Add multi-factor authentication modules (#15489)

* Get user after login flow finished

* Add multi factor authentication support

* Typings
This commit is contained in:
Jason Hu 2018-08-22 00:52:34 -07:00 committed by Paulus Schoutsen
parent ae63980152
commit 7e7f9bc6ac
18 changed files with 925 additions and 46 deletions

View file

@ -6,21 +6,26 @@ from typing import Any, Dict, List, Optional, Tuple, cast
import jwt
import voluptuous as vol
from homeassistant import data_entry_flow
from homeassistant.core import callback, HomeAssistant
from homeassistant.util import dt as dt_util
from . import auth_store, models
from .providers import auth_provider_from_config, AuthProvider
from .mfa_modules import auth_mfa_module_from_config, MultiFactorAuthModule
from .providers import auth_provider_from_config, AuthProvider, LoginFlow
_LOGGER = logging.getLogger(__name__)
_MfaModuleDict = Dict[str, MultiFactorAuthModule]
_ProviderKey = Tuple[str, Optional[str]]
_ProviderDict = Dict[_ProviderKey, AuthProvider]
async def auth_manager_from_config(
hass: HomeAssistant,
provider_configs: List[Dict[str, Any]]) -> 'AuthManager':
provider_configs: List[Dict[str, Any]],
module_configs: List[Dict[str, Any]]) -> 'AuthManager':
"""Initialize an auth manager from config."""
store = auth_store.AuthStore(hass)
if provider_configs:
@ -44,7 +49,28 @@ async def auth_manager_from_config(
continue
provider_hash[key] = provider
manager = AuthManager(hass, store, provider_hash)
if module_configs:
modules = await asyncio.gather(
*[auth_mfa_module_from_config(hass, config)
for config in module_configs])
else:
modules = ()
# So returned auth modules are in same order as config
module_hash = OrderedDict() # type: _MfaModuleDict
for module in modules:
if module is None:
continue
if module.id in module_hash:
_LOGGER.error(
'Found duplicate multi-factor module: %s. Please add unique '
'IDs if you want to have the same module twice.', module.id)
continue
module_hash[module.id] = module
manager = AuthManager(hass, store, provider_hash, module_hash)
return manager
@ -52,10 +78,13 @@ class AuthManager:
"""Manage the authentication for Home Assistant."""
def __init__(self, hass: HomeAssistant, store: auth_store.AuthStore,
providers: _ProviderDict) -> None:
providers: _ProviderDict, mfa_modules: _MfaModuleDict) \
-> None:
"""Initialize the auth manager."""
self.hass = hass
self._store = store
self._providers = providers
self._mfa_modules = mfa_modules
self.login_flow = data_entry_flow.FlowManager(
hass, self._async_create_login_flow,
self._async_finish_login_flow)
@ -82,6 +111,16 @@ class AuthManager:
"""Return a list of available auth providers."""
return list(self._providers.values())
@property
def auth_mfa_modules(self) -> List[MultiFactorAuthModule]:
"""Return a list of available auth modules."""
return list(self._mfa_modules.values())
def get_auth_mfa_module(self, module_id: str) \
-> Optional[MultiFactorAuthModule]:
"""Return an multi-factor auth module, None if not found."""
return self._mfa_modules.get(module_id)
async def async_get_users(self) -> List[models.User]:
"""Retrieve all users."""
return await self._store.async_get_users()
@ -90,6 +129,16 @@ class AuthManager:
"""Retrieve a user."""
return await self._store.async_get_user(user_id)
async def async_get_user_by_credentials(
self, credentials: models.Credentials) -> Optional[models.User]:
"""Get a user by credential, return None if not found."""
for user in await self.async_get_users():
for creds in user.credentials:
if creds.id == credentials.id:
return user
return None
async def async_create_system_user(self, name: str) -> models.User:
"""Create a system user."""
return await self._store.async_create_user(
@ -114,12 +163,11 @@ class AuthManager:
-> models.User:
"""Get or create a user."""
if not credentials.is_new:
for user in await self._store.async_get_users():
for creds in user.credentials:
if creds.id == credentials.id:
return user
raise ValueError('Unable to find the user.')
user = await self.async_get_user_by_credentials(credentials)
if user is None:
raise ValueError('Unable to find the user.')
else:
return user
auth_provider = self._async_get_auth_provider(credentials)
@ -175,6 +223,49 @@ class AuthManager:
await self._store.async_remove_credentials(credentials)
async def async_enable_user_mfa(self, user: models.User,
mfa_module_id: str, data: Any) -> None:
"""Enable a multi-factor auth module for user."""
if user.system_generated:
raise ValueError('System generated users cannot enable '
'multi-factor auth module.')
module = self.get_auth_mfa_module(mfa_module_id)
if module is None:
raise ValueError('Unable find multi-factor auth module: {}'
.format(mfa_module_id))
if module.setup_schema is not None:
try:
# pylint: disable=not-callable
data = module.setup_schema(data)
except vol.Invalid as err:
raise ValueError('Data does not match schema: {}'.format(err))
await module.async_setup_user(user.id, data)
async def async_disable_user_mfa(self, user: models.User,
mfa_module_id: str) -> None:
"""Disable a multi-factor auth module for user."""
if user.system_generated:
raise ValueError('System generated users cannot disable '
'multi-factor auth module.')
module = self.get_auth_mfa_module(mfa_module_id)
if module is None:
raise ValueError('Unable find multi-factor auth module: {}'
.format(mfa_module_id))
await module.async_depose_user(user.id)
async def async_get_enabled_mfa(self, user: models.User) -> List[str]:
"""List enabled mfa modules for user."""
module_ids = []
for module_id, module in self._mfa_modules.items():
if await module.async_is_user_setup(user.id):
module_ids.append(module_id)
return module_ids
async def async_create_refresh_token(self, user: models.User,
client_id: Optional[str] = None) \
-> models.RefreshToken:
@ -262,12 +353,17 @@ class AuthManager:
return await auth_provider.async_login_flow(context)
async def _async_finish_login_flow(
self, flow: data_entry_flow.FlowHandler, result: Dict[str, Any]) \
self, flow: LoginFlow, result: Dict[str, Any]) \
-> Dict[str, Any]:
"""Return a user as result of login flow."""
if result['type'] != data_entry_flow.RESULT_TYPE_CREATE_ENTRY:
return result
# we got final result
if isinstance(result['data'], models.User):
result['result'] = result['data']
return result
auth_provider = self._providers[result['handler']]
credentials = await auth_provider.async_get_or_create_credentials(
result['data'])
@ -276,8 +372,19 @@ class AuthManager:
result['result'] = credentials
return result
user = await self.async_get_or_create_user(credentials)
result['result'] = user
# multi-factor module cannot enabled for new credential
# which has not linked to a user yet
if auth_provider.support_mfa and not credentials.is_new:
user = await self.async_get_user_by_credentials(credentials)
if user is not None:
modules = await self.async_get_enabled_mfa(user)
if modules:
flow.user = user
flow.available_mfa_modules = modules
return await flow.async_step_select_mfa_module()
result['result'] = await self.async_get_or_create_user(credentials)
return result
@callback

View file

@ -0,0 +1,141 @@
"""Plugable auth modules for Home Assistant."""
from datetime import timedelta
import importlib
import logging
import types
from typing import Any, Dict, Optional
import voluptuous as vol
from voluptuous.humanize import humanize_error
from homeassistant import requirements
from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE
from homeassistant.core import HomeAssistant
from homeassistant.util.decorator import Registry
MULTI_FACTOR_AUTH_MODULES = Registry()
MULTI_FACTOR_AUTH_MODULE_SCHEMA = vol.Schema({
vol.Required(CONF_TYPE): str,
vol.Optional(CONF_NAME): str,
# Specify ID if you have two mfa auth module for same type.
vol.Optional(CONF_ID): str,
}, extra=vol.ALLOW_EXTRA)
SESSION_EXPIRATION = timedelta(minutes=5)
DATA_REQS = 'mfa_auth_module_reqs_processed'
_LOGGER = logging.getLogger(__name__)
class MultiFactorAuthModule:
"""Multi-factor Auth Module of validation function."""
DEFAULT_TITLE = 'Unnamed auth module'
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize an auth module."""
self.hass = hass
self.config = config
@property
def id(self) -> str: # pylint: disable=invalid-name
"""Return id of the auth module.
Default is same as type
"""
return self.config.get(CONF_ID, self.type)
@property
def type(self) -> str:
"""Return type of the module."""
return self.config[CONF_TYPE] # type: ignore
@property
def name(self) -> str:
"""Return the name of the auth module."""
return self.config.get(CONF_NAME, self.DEFAULT_TITLE)
# Implement by extending class
@property
def input_schema(self) -> vol.Schema:
"""Return a voluptuous schema to define mfa auth module's input."""
raise NotImplementedError
@property
def setup_schema(self) -> Optional[vol.Schema]:
"""Return a vol schema to validate mfa auth module's setup input.
Optional
"""
return None
async def async_setup_user(self, user_id: str, setup_data: Any) -> None:
"""Set up user for mfa auth module."""
raise NotImplementedError
async def async_depose_user(self, user_id: str) -> None:
"""Remove user from mfa module."""
raise NotImplementedError
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
raise NotImplementedError
async def async_validation(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
raise NotImplementedError
async def auth_mfa_module_from_config(
hass: HomeAssistant, config: Dict[str, Any]) \
-> Optional[MultiFactorAuthModule]:
"""Initialize an auth module from a config."""
module_name = config[CONF_TYPE]
module = await _load_mfa_module(hass, module_name)
if module is None:
return None
try:
config = module.CONFIG_SCHEMA(config) # type: ignore
except vol.Invalid as err:
_LOGGER.error('Invalid configuration for multi-factor module %s: %s',
module_name, humanize_error(config, err))
return None
return MULTI_FACTOR_AUTH_MODULES[module_name](hass, config) # type: ignore
async def _load_mfa_module(hass: HomeAssistant, module_name: str) \
-> Optional[types.ModuleType]:
"""Load an mfa auth module."""
module_path = 'homeassistant.auth.mfa_modules.{}'.format(module_name)
try:
module = importlib.import_module(module_path)
except ImportError:
_LOGGER.warning('Unable to find %s', module_path)
return None
if hass.config.skip_pip or not hasattr(module, 'REQUIREMENTS'):
return module
processed = hass.data.get(DATA_REQS)
if processed and module_name in processed:
return module
processed = hass.data[DATA_REQS] = set()
# https://github.com/python/mypy/issues/1424
req_success = await requirements.async_process_requirements(
hass, module_path, module.REQUIREMENTS) # type: ignore
if not req_success:
return None
processed.add(module_name)
return module

View file

@ -0,0 +1,82 @@
"""Example auth module."""
import logging
from typing import Any, Dict, Optional
import voluptuous as vol
from homeassistant.core import HomeAssistant
from . import MultiFactorAuthModule, MULTI_FACTOR_AUTH_MODULES, \
MULTI_FACTOR_AUTH_MODULE_SCHEMA
CONFIG_SCHEMA = MULTI_FACTOR_AUTH_MODULE_SCHEMA.extend({
vol.Required('data'): [vol.Schema({
vol.Required('user_id'): str,
vol.Required('pin'): str,
})]
}, extra=vol.PREVENT_EXTRA)
_LOGGER = logging.getLogger(__name__)
@MULTI_FACTOR_AUTH_MODULES.register('insecure_example')
class InsecureExampleModule(MultiFactorAuthModule):
"""Example auth module validate pin."""
DEFAULT_TITLE = 'Insecure Personal Identify Number'
def __init__(self, hass: HomeAssistant, config: Dict[str, Any]) -> None:
"""Initialize the user data store."""
super().__init__(hass, config)
self._data = config['data']
@property
def input_schema(self) -> vol.Schema:
"""Validate login flow input data."""
return vol.Schema({'pin': str})
@property
def setup_schema(self) -> Optional[vol.Schema]:
"""Validate async_setup_user input data."""
return vol.Schema({'pin': str})
async def async_setup_user(self, user_id: str, setup_data: Any) -> None:
"""Set up user to use mfa module."""
# data shall has been validate in caller
pin = setup_data['pin']
for data in self._data:
if data['user_id'] == user_id:
# already setup, override
data['pin'] = pin
return
self._data.append({'user_id': user_id, 'pin': pin})
async def async_depose_user(self, user_id: str) -> None:
"""Remove user from mfa module."""
found = None
for data in self._data:
if data['user_id'] == user_id:
found = data
break
if found:
self._data.remove(found)
async def async_is_user_setup(self, user_id: str) -> bool:
"""Return whether user is setup."""
for data in self._data:
if data['user_id'] == user_id:
return True
return False
async def async_validation(
self, user_id: str, user_input: Dict[str, Any]) -> bool:
"""Return True if validation passed."""
for data in self._data:
if data['user_id'] == user_id:
# user_input has been validate in caller
if data['pin'] == user_input['pin']:
return True
return False

View file

@ -9,12 +9,13 @@ from voluptuous.humanize import humanize_error
from homeassistant import data_entry_flow, requirements
from homeassistant.core import callback, HomeAssistant
from homeassistant.const import CONF_TYPE, CONF_NAME, CONF_ID
from homeassistant.const import CONF_ID, CONF_NAME, CONF_TYPE
from homeassistant.util import dt as dt_util
from homeassistant.util.decorator import Registry
from ..auth_store import AuthStore
from ..models import Credentials, UserMeta
from ..models import Credentials, User, UserMeta # noqa: F401
from ..mfa_modules import SESSION_EXPIRATION
_LOGGER = logging.getLogger(__name__)
DATA_REQS = 'auth_prov_reqs_processed'
@ -59,6 +60,11 @@ class AuthProvider:
"""Return the name of the auth provider."""
return self.config.get(CONF_NAME, self.DEFAULT_TITLE)
@property
def support_mfa(self) -> bool:
"""Return whether multi-factor auth supported by the auth provider."""
return True
async def async_credentials(self) -> List[Credentials]:
"""Return all credentials of this provider."""
users = await self.store.async_get_users()
@ -160,8 +166,11 @@ class LoginFlow(data_entry_flow.FlowHandler):
def __init__(self, auth_provider: AuthProvider) -> None:
"""Initialize the login flow."""
self._auth_provider = auth_provider
self._auth_module_id = None # type: Optional[str]
self._auth_manager = auth_provider.hass.auth # type: ignore
self.available_mfa_modules = [] # type: List
self.created_at = dt_util.utcnow()
self.user = None
self.user = None # type: Optional[User]
async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None) \
@ -173,6 +182,63 @@ class LoginFlow(data_entry_flow.FlowHandler):
"""
raise NotImplementedError
async def async_step_select_mfa_module(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Handle the step of select mfa module."""
errors = {}
if user_input is not None:
auth_module = user_input.get('multi_factor_auth_module')
if auth_module in self.available_mfa_modules:
self._auth_module_id = auth_module
return await self.async_step_mfa()
errors['base'] = 'invalid_auth_module'
if len(self.available_mfa_modules) == 1:
self._auth_module_id = self.available_mfa_modules[0]
return await self.async_step_mfa()
return self.async_show_form(
step_id='select_mfa_module',
data_schema=vol.Schema({
'multi_factor_auth_module': vol.In(self.available_mfa_modules)
}),
errors=errors,
)
async def async_step_mfa(
self, user_input: Optional[Dict[str, str]] = None) \
-> Dict[str, Any]:
"""Handle the step of mfa validation."""
errors = {}
auth_module = self._auth_manager.get_auth_mfa_module(
self._auth_module_id)
if auth_module is None:
# Given an invalid input to async_step_select_mfa_module
# will show invalid_auth_module error
return await self.async_step_select_mfa_module(user_input={})
if user_input is not None:
expires = self.created_at + SESSION_EXPIRATION
if dt_util.utcnow() > expires:
errors['base'] = 'login_expired'
else:
result = await auth_module.async_validation(
self.user.id, user_input) # type: ignore
if not result:
errors['base'] = 'invalid_auth'
if not errors:
return await self.async_finish(self.user)
return self.async_show_form(
step_id='mfa',
data_schema=auth_module.input_schema,
errors=errors,
)
async def async_finish(self, flow_result: Any) -> Dict:
"""Handle the pass of login flow."""
return self.async_create_entry(

View file

@ -35,6 +35,11 @@ class TrustedNetworksAuthProvider(AuthProvider):
DEFAULT_TITLE = 'Trusted Networks'
@property
def support_mfa(self) -> bool:
"""Trusted Networks auth provider does not support MFA."""
return False
async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow:
"""Return a flow to login."""
assert context is not None

View file

@ -14,14 +14,16 @@ import voluptuous as vol
from voluptuous.humanize import humanize_error
from homeassistant import auth
from homeassistant.auth import providers as auth_providers
from homeassistant.auth import providers as auth_providers,\
mfa_modules as auth_mfa_modules
from homeassistant.const import (
ATTR_FRIENDLY_NAME, ATTR_HIDDEN, ATTR_ASSUMED_STATE,
CONF_LATITUDE, CONF_LONGITUDE, CONF_NAME, CONF_PACKAGES, CONF_UNIT_SYSTEM,
CONF_TIME_ZONE, CONF_ELEVATION, CONF_UNIT_SYSTEM_METRIC,
CONF_UNIT_SYSTEM_IMPERIAL, CONF_TEMPERATURE_UNIT, TEMP_CELSIUS,
__version__, CONF_CUSTOMIZE, CONF_CUSTOMIZE_DOMAIN, CONF_CUSTOMIZE_GLOB,
CONF_WHITELIST_EXTERNAL_DIRS, CONF_AUTH_PROVIDERS, CONF_TYPE)
CONF_WHITELIST_EXTERNAL_DIRS, CONF_AUTH_PROVIDERS, CONF_AUTH_MFA_MODULES,
CONF_TYPE)
from homeassistant.core import callback, DOMAIN as CONF_CORE, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import get_component, get_platform
@ -166,7 +168,10 @@ CORE_CONFIG_SCHEMA = CUSTOMIZE_CONFIG_SCHEMA.extend({
CONF_TYPE: vol.NotIn(['insecure_example'],
'The insecure_example auth provider'
' is for testing only.')
})])
})]),
vol.Optional(CONF_AUTH_MFA_MODULES):
vol.All(cv.ensure_list,
[auth_mfa_modules.MULTI_FACTOR_AUTH_MODULE_SCHEMA]),
})
@ -412,7 +417,9 @@ async def async_process_ha_core_config(
# Only load auth during startup.
if not hasattr(hass, 'auth'):
setattr(hass, 'auth', await auth.auth_manager_from_config(
hass, config.get(CONF_AUTH_PROVIDERS, [])))
hass,
config.get(CONF_AUTH_PROVIDERS, []),
config.get(CONF_AUTH_MFA_MODULES, [])))
hac = hass.config

View file

@ -30,6 +30,7 @@ CONF_API_KEY = 'api_key'
CONF_API_VERSION = 'api_version'
CONF_AT = 'at'
CONF_AUTHENTICATION = 'authentication'
CONF_AUTH_MFA_MODULES = 'auth_mfa_modules'
CONF_AUTH_PROVIDERS = 'auth_providers'
CONF_BASE = 'base'
CONF_BEFORE = 'before'

View file

@ -5,15 +5,15 @@ import logging
import os
from homeassistant.auth import auth_manager_from_config
from homeassistant.auth.providers import homeassistant as hass_auth
from homeassistant.core import HomeAssistant
from homeassistant.config import get_default_config_dir
from homeassistant.auth.providers import homeassistant as hass_auth
def run(args):
"""Handle Home Assistant auth provider script."""
parser = argparse.ArgumentParser(
description=("Manage Home Assistant users"))
description="Manage Home Assistant users")
parser.add_argument(
'--script', choices=['auth'])
parser.add_argument(
@ -56,7 +56,7 @@ async def run_command(hass, args):
hass.config.config_dir = os.path.join(os.getcwd(), args.config)
hass.auth = await auth_manager_from_config(hass, [{
'type': 'homeassistant',
}])
}], [])
provider = hass.auth.auth_providers[0]
await provider.async_initialize()
await args.func(hass, provider, args)

View file

@ -165,8 +165,10 @@ def gather_modules():
errors = []
for package in sorted(explore_module('homeassistant.components', True) +
explore_module('homeassistant.scripts', True)):
for package in sorted(
explore_module('homeassistant.components', True) +
explore_module('homeassistant.scripts', True) +
explore_module('homeassistant.auth', True)):
try:
module = importlib.import_module(package)
except ImportError:

View file

@ -0,0 +1 @@
"""Tests for the multi-factor auth modules."""

View file

@ -0,0 +1,127 @@
"""Test the example module auth module."""
from homeassistant import auth, data_entry_flow
from homeassistant.auth.mfa_modules import auth_mfa_module_from_config
from homeassistant.auth.models import Credentials
from tests.common import MockUser
async def test_validate(hass):
"""Test validating pin."""
auth_module = await auth_mfa_module_from_config(hass, {
'type': 'insecure_example',
'data': [{'user_id': 'test-user', 'pin': '123456'}]
})
result = await auth_module.async_validation(
'test-user', {'pin': '123456'})
assert result is True
result = await auth_module.async_validation(
'test-user', {'pin': 'invalid'})
assert result is False
result = await auth_module.async_validation(
'invalid-user', {'pin': '123456'})
assert result is False
async def test_setup_user(hass):
"""Test setup user."""
auth_module = await auth_mfa_module_from_config(hass, {
'type': 'insecure_example',
'data': []
})
await auth_module.async_setup_user(
'test-user', {'pin': '123456'})
assert len(auth_module._data) == 1
result = await auth_module.async_validation(
'test-user', {'pin': '123456'})
assert result is True
async def test_depose_user(hass):
"""Test despose user."""
auth_module = await auth_mfa_module_from_config(hass, {
'type': 'insecure_example',
'data': [{'user_id': 'test-user', 'pin': '123456'}]
})
assert len(auth_module._data) == 1
await auth_module.async_depose_user('test-user')
assert len(auth_module._data) == 0
async def test_is_user_setup(hass):
"""Test is user setup."""
auth_module = await auth_mfa_module_from_config(hass, {
'type': 'insecure_example',
'data': [{'user_id': 'test-user', 'pin': '123456'}]
})
assert await auth_module.async_is_user_setup('test-user') is True
assert await auth_module.async_is_user_setup('invalid-user') is False
async def test_login(hass):
"""Test login flow with auth module."""
hass.auth = await auth.auth_manager_from_config(hass, [{
'type': 'insecure_example',
'users': [{'username': 'test-user', 'password': 'test-pass'}],
}], [{
'type': 'insecure_example',
'data': [{'user_id': 'mock-user', 'pin': '123456'}]
}])
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(hass.auth)
await hass.auth.async_link_user(user, Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
provider = hass.auth.auth_providers[0]
result = await hass.auth.login_flow.async_init(
(provider.type, provider.id))
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {
'username': 'incorrect-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {
'username': 'test-user',
'password': 'incorrect-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {
'username': 'test-user',
'password': 'test-pass',
})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['step_id'] == 'mfa'
assert result['data_schema'].schema.get('pin') == str
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'pin': 'invalid-code'})
assert result['type'] == data_entry_flow.RESULT_TYPE_FORM
assert result['errors']['base'] == 'invalid_auth'
result = await hass.auth.login_flow.async_configure(
result['flow_id'], {'pin': '123456'})
assert result['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result['data'].id == 'mock-user'

View file

@ -124,7 +124,7 @@ async def test_new_users_populate_values(hass, data):
manager = await auth_manager_from_config(hass, [{
'type': 'homeassistant'
}])
}], [])
provider = manager.auth_providers[0]
credentials = await provider.async_get_or_create_credentials({
'username': 'hello'

View file

@ -40,7 +40,7 @@ def manager(hass, store, provider):
"""Mock manager."""
return AuthManager(hass, store, {
(provider.type, provider.id): provider
})
}, {})
async def test_create_new_credential(manager, provider):

View file

@ -27,7 +27,7 @@ def manager(hass, store, provider):
"""Mock manager."""
return auth.AuthManager(hass, store, {
(provider.type, provider.id): provider
})
}, {})
async def test_create_new_credential(manager, provider):

View file

@ -28,7 +28,7 @@ def manager(hass, store, provider):
"""Mock manager."""
return auth.AuthManager(hass, store, {
(provider.type, provider.id): provider
})
}, {})
async def test_trusted_networks_credentials(manager, provider):

View file

@ -7,6 +7,7 @@ import pytest
from homeassistant import auth, data_entry_flow
from homeassistant.auth import (
models as auth_models, auth_store, const as auth_const)
from homeassistant.auth.mfa_modules import SESSION_EXPIRATION
from homeassistant.util import dt as dt_util
from tests.common import (
MockUser, ensure_auth_manager_loaded, flush_store, CLIENT_ID)
@ -40,7 +41,7 @@ async def test_auth_manager_from_config_validates_config_and_id(mock_hass):
'type': 'insecure_example',
'id': 'another',
'users': [],
}])
}], [])
providers = [{
'name': provider.name,
@ -58,7 +59,65 @@ async def test_auth_manager_from_config_validates_config_and_id(mock_hass):
}]
async def test_create_new_user(hass, hass_storage):
async def test_auth_manager_from_config_auth_modules(mock_hass):
"""Test get auth modules."""
manager = await auth.auth_manager_from_config(mock_hass, [{
'name': 'Test Name',
'type': 'insecure_example',
'users': [],
}, {
'name': 'Test Name 2',
'type': 'insecure_example',
'id': 'another',
'users': [],
}], [{
'name': 'Module 1',
'type': 'insecure_example',
'data': [],
}, {
'name': 'Module 2',
'type': 'insecure_example',
'id': 'another',
'data': [],
}, {
'name': 'Duplicate ID',
'type': 'insecure_example',
'id': 'another',
'data': [],
}])
providers = [{
'name': provider.name,
'type': provider.type,
'id': provider.id,
} for provider in manager.auth_providers]
assert providers == [{
'name': 'Test Name',
'type': 'insecure_example',
'id': None,
}, {
'name': 'Test Name 2',
'type': 'insecure_example',
'id': 'another',
}]
modules = [{
'name': module.name,
'type': module.type,
'id': module.id,
} for module in manager.auth_mfa_modules]
assert modules == [{
'name': 'Module 1',
'type': 'insecure_example',
'id': 'insecure_example',
}, {
'name': 'Module 2',
'type': 'insecure_example',
'id': 'another',
}]
async def test_create_new_user(hass):
"""Test creating new user."""
manager = await auth.auth_manager_from_config(hass, [{
'type': 'insecure_example',
@ -67,7 +126,7 @@ async def test_create_new_user(hass, hass_storage):
'password': 'test-pass',
'name': 'Test Name'
}]
}])
}], [])
step = await manager.login_flow.async_init(('insecure_example', None))
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
@ -92,7 +151,8 @@ async def test_login_as_existing_user(mock_hass):
'password': 'test-pass',
'name': 'Test Name'
}]
}])
}], [])
mock_hass.auth = manager
ensure_auth_manager_loaded(manager)
# Add a fake user that we're not going to log in with
@ -157,7 +217,7 @@ async def test_linking_user_to_two_auth_providers(hass, hass_storage):
'username': 'another-user',
'password': 'another-password',
}]
}])
}], [])
step = await manager.login_flow.async_init(('insecure_example', None))
step = await manager.login_flow.async_configure(step['flow_id'], {
@ -190,7 +250,7 @@ async def test_saving_loading(hass, hass_storage):
'username': 'test-user',
'password': 'test-pass',
}]
}])
}], [])
step = await manager.login_flow.async_init(('insecure_example', None))
step = await manager.login_flow.async_configure(step['flow_id'], {
@ -211,7 +271,7 @@ async def test_saving_loading(hass, hass_storage):
async def test_cannot_retrieve_expired_access_token(hass):
"""Test that we cannot retrieve expired access tokens."""
manager = await auth.auth_manager_from_config(hass, [])
manager = await auth.auth_manager_from_config(hass, [], [])
user = MockUser().add_to_auth_manager(manager)
refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
assert refresh_token.user.id is user.id
@ -236,7 +296,7 @@ async def test_cannot_retrieve_expired_access_token(hass):
async def test_generating_system_user(hass):
"""Test that we can add a system user."""
manager = await auth.auth_manager_from_config(hass, [])
manager = await auth.auth_manager_from_config(hass, [], [])
user = await manager.async_create_system_user('Hass.io')
token = await manager.async_create_refresh_token(user)
assert user.system_generated
@ -246,7 +306,7 @@ async def test_generating_system_user(hass):
async def test_refresh_token_requires_client_for_user(hass):
"""Test that we can add a system user."""
manager = await auth.auth_manager_from_config(hass, [])
manager = await auth.auth_manager_from_config(hass, [], [])
user = MockUser().add_to_auth_manager(manager)
assert user.system_generated is False
@ -260,7 +320,7 @@ async def test_refresh_token_requires_client_for_user(hass):
async def test_refresh_token_not_requires_client_for_system_user(hass):
"""Test that we can add a system user."""
manager = await auth.auth_manager_from_config(hass, [])
manager = await auth.auth_manager_from_config(hass, [], [])
user = await manager.async_create_system_user('Hass.io')
assert user.system_generated is True
@ -274,7 +334,7 @@ async def test_refresh_token_not_requires_client_for_system_user(hass):
async def test_cannot_deactive_owner(mock_hass):
"""Test that we cannot deactive the owner."""
manager = await auth.auth_manager_from_config(mock_hass, [])
manager = await auth.auth_manager_from_config(mock_hass, [], [])
owner = MockUser(
is_owner=True,
).add_to_auth_manager(manager)
@ -285,7 +345,7 @@ async def test_cannot_deactive_owner(mock_hass):
async def test_remove_refresh_token(mock_hass):
"""Test that we can remove a refresh token."""
manager = await auth.auth_manager_from_config(mock_hass, [])
manager = await auth.auth_manager_from_config(mock_hass, [], [])
user = MockUser().add_to_auth_manager(manager)
refresh_token = await manager.async_create_refresh_token(user, CLIENT_ID)
access_token = manager.async_create_access_token(refresh_token)
@ -298,3 +358,280 @@ async def test_remove_refresh_token(mock_hass):
assert (
await manager.async_validate_access_token(access_token) is None
)
async def test_login_with_auth_module(mock_hass):
"""Test login as existing user with auth module."""
manager = await auth.auth_manager_from_config(mock_hass, [{
'type': 'insecure_example',
'users': [{
'username': 'test-user',
'password': 'test-pass',
'name': 'Test Name'
}],
}], [{
'type': 'insecure_example',
'data': [{
'user_id': 'mock-user',
'pin': 'test-pin'
}]
}])
mock_hass.auth = manager
ensure_auth_manager_loaded(manager)
# Add fake user with credentials for example auth provider.
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(manager)
user.credentials.append(auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
step = await manager.login_flow.async_init(('insecure_example', None))
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
step = await manager.login_flow.async_configure(step['flow_id'], {
'username': 'test-user',
'password': 'test-pass',
})
# After auth_provider validated, request auth module input form
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'mfa'
step = await manager.login_flow.async_configure(step['flow_id'], {
'pin': 'invalid-pin',
})
# Invalid auth error
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'mfa'
assert step['errors'] == {'base': 'invalid_auth'}
step = await manager.login_flow.async_configure(step['flow_id'], {
'pin': 'test-pin',
})
# Finally passed, get user
assert step['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
user = step['result']
assert user is not None
assert user.id == 'mock-user'
assert user.is_owner is False
assert user.is_active is False
assert user.name == 'Paulus'
async def test_login_with_multi_auth_module(mock_hass):
"""Test login as existing user with multiple auth modules."""
manager = await auth.auth_manager_from_config(mock_hass, [{
'type': 'insecure_example',
'users': [{
'username': 'test-user',
'password': 'test-pass',
'name': 'Test Name'
}],
}], [{
'type': 'insecure_example',
'data': [{
'user_id': 'mock-user',
'pin': 'test-pin'
}]
}, {
'type': 'insecure_example',
'id': 'module2',
'data': [{
'user_id': 'mock-user',
'pin': 'test-pin2'
}]
}])
mock_hass.auth = manager
ensure_auth_manager_loaded(manager)
# Add fake user with credentials for example auth provider.
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(manager)
user.credentials.append(auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
step = await manager.login_flow.async_init(('insecure_example', None))
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
step = await manager.login_flow.async_configure(step['flow_id'], {
'username': 'test-user',
'password': 'test-pass',
})
# After auth_provider validated, request select auth module
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'select_mfa_module'
step = await manager.login_flow.async_configure(step['flow_id'], {
'multi_factor_auth_module': 'module2',
})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'mfa'
step = await manager.login_flow.async_configure(step['flow_id'], {
'pin': 'test-pin2',
})
# Finally passed, get user
assert step['type'] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
user = step['result']
assert user is not None
assert user.id == 'mock-user'
assert user.is_owner is False
assert user.is_active is False
assert user.name == 'Paulus'
async def test_auth_module_expired_session(mock_hass):
"""Test login as existing user."""
manager = await auth.auth_manager_from_config(mock_hass, [{
'type': 'insecure_example',
'users': [{
'username': 'test-user',
'password': 'test-pass',
'name': 'Test Name'
}],
}], [{
'type': 'insecure_example',
'data': [{
'user_id': 'mock-user',
'pin': 'test-pin'
}]
}])
mock_hass.auth = manager
ensure_auth_manager_loaded(manager)
# Add fake user with credentials for example auth provider.
user = MockUser(
id='mock-user',
is_owner=False,
is_active=False,
name='Paulus',
).add_to_auth_manager(manager)
user.credentials.append(auth_models.Credentials(
id='mock-id',
auth_provider_type='insecure_example',
auth_provider_id=None,
data={'username': 'test-user'},
is_new=False,
))
step = await manager.login_flow.async_init(('insecure_example', None))
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
step = await manager.login_flow.async_configure(step['flow_id'], {
'username': 'test-user',
'password': 'test-pass',
})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'mfa'
with patch('homeassistant.util.dt.utcnow',
return_value=dt_util.utcnow() + SESSION_EXPIRATION):
step = await manager.login_flow.async_configure(step['flow_id'], {
'pin': 'test-pin',
})
# Invalid auth due session timeout
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'mfa'
assert step['errors']['base'] == 'login_expired'
# The second try will fail as well
step = await manager.login_flow.async_configure(step['flow_id'], {
'pin': 'test-pin',
})
assert step['type'] == data_entry_flow.RESULT_TYPE_FORM
assert step['step_id'] == 'mfa'
assert step['errors']['base'] == 'login_expired'
async def test_enable_mfa_for_user(hass, hass_storage):
"""Test enable mfa module for user."""
manager = await auth.auth_manager_from_config(hass, [{
'type': 'insecure_example',
'users': [{
'username': 'test-user',
'password': 'test-pass',
}]
}], [{
'type': 'insecure_example',
'data': [],
}])
step = await manager.login_flow.async_init(('insecure_example', None))
step = await manager.login_flow.async_configure(step['flow_id'], {
'username': 'test-user',
'password': 'test-pass',
})
user = step['result']
assert user is not None
# new user don't have mfa enabled
modules = await manager.async_get_enabled_mfa(user)
assert len(modules) == 0
module = manager.get_auth_mfa_module('insecure_example')
# mfa module don't have data
assert bool(module._data) is False
# test enable mfa for user
await manager.async_enable_user_mfa(user, 'insecure_example',
{'pin': 'test-pin'})
assert len(module._data) == 1
assert module._data[0] == {'user_id': user.id, 'pin': 'test-pin'}
# test get enabled mfa
modules = await manager.async_get_enabled_mfa(user)
assert len(modules) == 1
assert 'insecure_example' in modules
# re-enable mfa for user will override
await manager.async_enable_user_mfa(user, 'insecure_example',
{'pin': 'test-pin-new'})
assert len(module._data) == 1
assert module._data[0] == {'user_id': user.id, 'pin': 'test-pin-new'}
modules = await manager.async_get_enabled_mfa(user)
assert len(modules) == 1
assert 'insecure_example' in modules
# system user cannot enable mfa
system_user = await manager.async_create_system_user('system-user')
with pytest.raises(ValueError):
await manager.async_enable_user_mfa(system_user, 'insecure_example',
{'pin': 'test-pin'})
assert len(module._data) == 1
modules = await manager.async_get_enabled_mfa(system_user)
assert len(modules) == 0
# disable mfa for user
await manager.async_disable_user_mfa(user, 'insecure_example')
assert bool(module._data) is False
# test get enabled mfa
modules = await manager.async_get_enabled_mfa(user)
assert len(modules) == 0
# disable mfa for user don't enabled just silent fail
await manager.async_disable_user_mfa(user, 'insecure_example')

View file

@ -118,7 +118,7 @@ def async_test_home_assistant(loop):
hass = ha.HomeAssistant(loop)
hass.config.async_load = Mock()
store = auth_store.AuthStore(hass)
hass.auth = auth.AuthManager(hass, store, {})
hass.auth = auth.AuthManager(hass, store, {}, {})
ensure_auth_manager_loaded(hass.auth)
INSTANCES.append(hass)
@ -342,7 +342,7 @@ class MockUser(auth_models.User):
'is_owner': is_owner,
'is_active': is_active,
'name': name,
'system_generated': system_generated
'system_generated': system_generated,
}
if id is not None:
kwargs['id'] = id

View file

@ -15,11 +15,14 @@ BASE_CONFIG = [{
}]
}]
EMPTY_CONFIG = []
async def async_setup_auth(hass, aiohttp_client, provider_configs=BASE_CONFIG,
setup_api=False):
"""Helper to set up authentication and create an HTTP client."""
hass.auth = await auth.auth_manager_from_config(hass, provider_configs)
module_configs=EMPTY_CONFIG, setup_api=False):
"""Helper to set up authentication and create a HTTP client."""
hass.auth = await auth.auth_manager_from_config(
hass, provider_configs, module_configs)
ensure_auth_manager_loaded(hass.auth)
await async_setup_component(hass, 'auth', {
'http': {