mirror of
https://github.com/home-assistant/core
synced 2024-07-21 02:25:24 +00:00
Check if requirements are installed in the executor (#71611)
This commit is contained in:
parent
c660fae8d8
commit
0ffeb6c304
|
@ -15,10 +15,7 @@ from .util import package as pkg_util
|
|||
|
||||
PIP_TIMEOUT = 60 # The default is too low when the internet connection is satellite or high latency
|
||||
MAX_INSTALL_FAILURES = 3
|
||||
DATA_PIP_LOCK = "pip_lock"
|
||||
DATA_PKG_CACHE = "pkg_cache"
|
||||
DATA_INTEGRATIONS_WITH_REQS = "integrations_with_reqs"
|
||||
DATA_INSTALL_FAILURE_HISTORY = "install_failure_history"
|
||||
DATA_REQUIREMENTS_MANAGER = "requirements_manager"
|
||||
CONSTRAINT_FILE = "package_constraints.txt"
|
||||
DISCOVERY_INTEGRATIONS: dict[str, Iterable[str]] = {
|
||||
"dhcp": ("dhcp",),
|
||||
|
@ -40,7 +37,7 @@ class RequirementsNotFound(HomeAssistantError):
|
|||
|
||||
|
||||
async def async_get_integration_with_requirements(
|
||||
hass: HomeAssistant, domain: str, done: set[str] | None = None
|
||||
hass: HomeAssistant, domain: str
|
||||
) -> Integration:
|
||||
"""Get an integration with all requirements installed, including the dependencies.
|
||||
|
||||
|
@ -48,97 +45,8 @@ async def async_get_integration_with_requirements(
|
|||
is invalid, RequirementNotFound if there was some type of
|
||||
failure to install requirements.
|
||||
"""
|
||||
if done is None:
|
||||
done = {domain}
|
||||
else:
|
||||
done.add(domain)
|
||||
|
||||
integration = await async_get_integration(hass, domain)
|
||||
|
||||
if hass.config.skip_pip:
|
||||
return integration
|
||||
|
||||
if (cache := hass.data.get(DATA_INTEGRATIONS_WITH_REQS)) is None:
|
||||
cache = hass.data[DATA_INTEGRATIONS_WITH_REQS] = {}
|
||||
|
||||
int_or_evt: Integration | asyncio.Event | None | UndefinedType = cache.get(
|
||||
domain, UNDEFINED
|
||||
)
|
||||
|
||||
if isinstance(int_or_evt, asyncio.Event):
|
||||
await int_or_evt.wait()
|
||||
|
||||
# When we have waited and it's UNDEFINED, it doesn't exist
|
||||
# We don't cache that it doesn't exist, or else people can't fix it
|
||||
# and then restart, because their config will never be valid.
|
||||
if (int_or_evt := cache.get(domain, UNDEFINED)) is UNDEFINED:
|
||||
raise IntegrationNotFound(domain)
|
||||
|
||||
if int_or_evt is not UNDEFINED:
|
||||
return cast(Integration, int_or_evt)
|
||||
|
||||
event = cache[domain] = asyncio.Event()
|
||||
|
||||
try:
|
||||
await _async_process_integration(hass, integration, done)
|
||||
except Exception:
|
||||
del cache[domain]
|
||||
event.set()
|
||||
raise
|
||||
|
||||
cache[domain] = integration
|
||||
event.set()
|
||||
return integration
|
||||
|
||||
|
||||
async def _async_process_integration(
|
||||
hass: HomeAssistant, integration: Integration, done: set[str]
|
||||
) -> None:
|
||||
"""Process an integration and requirements."""
|
||||
if integration.requirements:
|
||||
await async_process_requirements(
|
||||
hass, integration.domain, integration.requirements
|
||||
)
|
||||
|
||||
deps_to_check = [
|
||||
dep
|
||||
for dep in integration.dependencies + integration.after_dependencies
|
||||
if dep not in done
|
||||
]
|
||||
|
||||
for check_domain, to_check in DISCOVERY_INTEGRATIONS.items():
|
||||
if (
|
||||
check_domain not in done
|
||||
and check_domain not in deps_to_check
|
||||
and any(check in integration.manifest for check in to_check)
|
||||
):
|
||||
deps_to_check.append(check_domain)
|
||||
|
||||
if not deps_to_check:
|
||||
return
|
||||
|
||||
results = await asyncio.gather(
|
||||
*(
|
||||
async_get_integration_with_requirements(hass, dep, done)
|
||||
for dep in deps_to_check
|
||||
),
|
||||
return_exceptions=True,
|
||||
)
|
||||
for result in results:
|
||||
if not isinstance(result, BaseException):
|
||||
continue
|
||||
if not isinstance(result, IntegrationNotFound) or not (
|
||||
not integration.is_built_in
|
||||
and result.domain in integration.after_dependencies
|
||||
):
|
||||
raise result
|
||||
|
||||
|
||||
@callback
|
||||
def async_clear_install_history(hass: HomeAssistant) -> None:
|
||||
"""Forget the install history."""
|
||||
if install_failure_history := hass.data.get(DATA_INSTALL_FAILURE_HISTORY):
|
||||
install_failure_history.clear()
|
||||
manager = _async_get_manager(hass)
|
||||
return await manager.async_get_integration_with_requirements(domain)
|
||||
|
||||
|
||||
async def async_process_requirements(
|
||||
|
@ -149,49 +57,24 @@ async def async_process_requirements(
|
|||
This method is a coroutine. It will raise RequirementsNotFound
|
||||
if an requirement can't be satisfied.
|
||||
"""
|
||||
if (pip_lock := hass.data.get(DATA_PIP_LOCK)) is None:
|
||||
pip_lock = hass.data[DATA_PIP_LOCK] = asyncio.Lock()
|
||||
install_failure_history = hass.data.get(DATA_INSTALL_FAILURE_HISTORY)
|
||||
if install_failure_history is None:
|
||||
install_failure_history = hass.data[DATA_INSTALL_FAILURE_HISTORY] = set()
|
||||
|
||||
kwargs = pip_kwargs(hass.config.config_dir)
|
||||
|
||||
async with pip_lock:
|
||||
for req in requirements:
|
||||
await _async_process_requirements(
|
||||
hass, name, req, install_failure_history, kwargs
|
||||
)
|
||||
await _async_get_manager(hass).async_process_requirements(name, requirements)
|
||||
|
||||
|
||||
async def _async_process_requirements(
|
||||
hass: HomeAssistant,
|
||||
name: str,
|
||||
req: str,
|
||||
install_failure_history: set[str],
|
||||
kwargs: Any,
|
||||
) -> None:
|
||||
"""Install a requirement and save failures."""
|
||||
if req in install_failure_history:
|
||||
_LOGGER.info(
|
||||
"Multiple attempts to install %s failed, install will be retried after next configuration check or restart",
|
||||
req,
|
||||
)
|
||||
raise RequirementsNotFound(name, [req])
|
||||
@callback
|
||||
def _async_get_manager(hass: HomeAssistant) -> RequirementsManager:
|
||||
"""Get the requirements manager."""
|
||||
if DATA_REQUIREMENTS_MANAGER in hass.data:
|
||||
manager: RequirementsManager = hass.data[DATA_REQUIREMENTS_MANAGER]
|
||||
return manager
|
||||
|
||||
if pkg_util.is_installed(req):
|
||||
return
|
||||
manager = hass.data[DATA_REQUIREMENTS_MANAGER] = RequirementsManager(hass)
|
||||
return manager
|
||||
|
||||
def _install(req: str, kwargs: dict[str, Any]) -> bool:
|
||||
"""Install requirement."""
|
||||
return pkg_util.install_package(req, **kwargs)
|
||||
|
||||
for _ in range(MAX_INSTALL_FAILURES):
|
||||
if await hass.async_add_executor_job(_install, req, kwargs):
|
||||
return
|
||||
|
||||
install_failure_history.add(req)
|
||||
raise RequirementsNotFound(name, [req])
|
||||
@callback
|
||||
def async_clear_install_history(hass: HomeAssistant) -> None:
|
||||
"""Forget the install history."""
|
||||
_async_get_manager(hass).install_failure_history.clear()
|
||||
|
||||
|
||||
def pip_kwargs(config_dir: str | None) -> dict[str, Any]:
|
||||
|
@ -207,3 +90,178 @@ def pip_kwargs(config_dir: str | None) -> dict[str, Any]:
|
|||
if not (config_dir is None or pkg_util.is_virtual_env()) and not is_docker:
|
||||
kwargs["target"] = os.path.join(config_dir, "deps")
|
||||
return kwargs
|
||||
|
||||
|
||||
def _install_with_retry(requirement: str, kwargs: dict[str, Any]) -> bool:
|
||||
"""Try to install a package up to MAX_INSTALL_FAILURES times."""
|
||||
for _ in range(MAX_INSTALL_FAILURES):
|
||||
if pkg_util.install_package(requirement, **kwargs):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _install_requirements_if_missing(
|
||||
requirements: list[str], kwargs: dict[str, Any]
|
||||
) -> tuple[set[str], set[str]]:
|
||||
"""Install requirements if missing."""
|
||||
installed: set[str] = set()
|
||||
failures: set[str] = set()
|
||||
for req in requirements:
|
||||
if pkg_util.is_installed(req) or _install_with_retry(req, kwargs):
|
||||
installed.add(req)
|
||||
continue
|
||||
failures.add(req)
|
||||
return installed, failures
|
||||
|
||||
|
||||
class RequirementsManager:
|
||||
"""Manage requirements."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant) -> None:
|
||||
"""Init the requirements manager."""
|
||||
self.hass = hass
|
||||
self.pip_lock = asyncio.Lock()
|
||||
self.integrations_with_reqs: dict[
|
||||
str, Integration | asyncio.Event | None | UndefinedType
|
||||
] = {}
|
||||
self.install_failure_history: set[str] = set()
|
||||
self.is_installed_cache: set[str] = set()
|
||||
|
||||
async def async_get_integration_with_requirements(
|
||||
self, domain: str, done: set[str] | None = None
|
||||
) -> Integration:
|
||||
"""Get an integration with all requirements installed, including the dependencies.
|
||||
|
||||
This can raise IntegrationNotFound if manifest or integration
|
||||
is invalid, RequirementNotFound if there was some type of
|
||||
failure to install requirements.
|
||||
"""
|
||||
|
||||
if done is None:
|
||||
done = {domain}
|
||||
else:
|
||||
done.add(domain)
|
||||
|
||||
integration = await async_get_integration(self.hass, domain)
|
||||
|
||||
if self.hass.config.skip_pip:
|
||||
return integration
|
||||
|
||||
cache = self.integrations_with_reqs
|
||||
int_or_evt = cache.get(domain, UNDEFINED)
|
||||
|
||||
if isinstance(int_or_evt, asyncio.Event):
|
||||
await int_or_evt.wait()
|
||||
|
||||
# When we have waited and it's UNDEFINED, it doesn't exist
|
||||
# We don't cache that it doesn't exist, or else people can't fix it
|
||||
# and then restart, because their config will never be valid.
|
||||
if (int_or_evt := cache.get(domain, UNDEFINED)) is UNDEFINED:
|
||||
raise IntegrationNotFound(domain)
|
||||
|
||||
if int_or_evt is not UNDEFINED:
|
||||
return cast(Integration, int_or_evt)
|
||||
|
||||
event = cache[domain] = asyncio.Event()
|
||||
|
||||
try:
|
||||
await self._async_process_integration(integration, done)
|
||||
except Exception:
|
||||
del cache[domain]
|
||||
event.set()
|
||||
raise
|
||||
|
||||
cache[domain] = integration
|
||||
event.set()
|
||||
return integration
|
||||
|
||||
async def _async_process_integration(
|
||||
self, integration: Integration, done: set[str]
|
||||
) -> None:
|
||||
"""Process an integration and requirements."""
|
||||
if integration.requirements:
|
||||
await self.async_process_requirements(
|
||||
integration.domain, integration.requirements
|
||||
)
|
||||
|
||||
deps_to_check = [
|
||||
dep
|
||||
for dep in integration.dependencies + integration.after_dependencies
|
||||
if dep not in done
|
||||
]
|
||||
|
||||
for check_domain, to_check in DISCOVERY_INTEGRATIONS.items():
|
||||
if (
|
||||
check_domain not in done
|
||||
and check_domain not in deps_to_check
|
||||
and any(check in integration.manifest for check in to_check)
|
||||
):
|
||||
deps_to_check.append(check_domain)
|
||||
|
||||
if not deps_to_check:
|
||||
return
|
||||
|
||||
results = await asyncio.gather(
|
||||
*(
|
||||
self.async_get_integration_with_requirements(dep, done)
|
||||
for dep in deps_to_check
|
||||
),
|
||||
return_exceptions=True,
|
||||
)
|
||||
for result in results:
|
||||
if not isinstance(result, BaseException):
|
||||
continue
|
||||
if not isinstance(result, IntegrationNotFound) or not (
|
||||
not integration.is_built_in
|
||||
and result.domain in integration.after_dependencies
|
||||
):
|
||||
raise result
|
||||
|
||||
async def async_process_requirements(
|
||||
self, name: str, requirements: list[str]
|
||||
) -> None:
|
||||
"""Install the requirements for a component or platform.
|
||||
|
||||
This method is a coroutine. It will raise RequirementsNotFound
|
||||
if an requirement can't be satisfied.
|
||||
"""
|
||||
if not (missing := self._find_missing_requirements(requirements)):
|
||||
return
|
||||
self._raise_for_failed_requirements(name, missing)
|
||||
|
||||
async with self.pip_lock:
|
||||
# Recaculate missing again now that we have the lock
|
||||
await self._async_process_requirements(
|
||||
name, self._find_missing_requirements(requirements)
|
||||
)
|
||||
|
||||
def _find_missing_requirements(self, requirements: list[str]) -> list[str]:
|
||||
"""Find requirements that are missing in the cache."""
|
||||
return [req for req in requirements if req not in self.is_installed_cache]
|
||||
|
||||
def _raise_for_failed_requirements(
|
||||
self, integration: str, missing: list[str]
|
||||
) -> None:
|
||||
"""Raise RequirementsNotFound so we do not keep trying requirements that have already failed."""
|
||||
for req in missing:
|
||||
if req in self.install_failure_history:
|
||||
_LOGGER.info(
|
||||
"Multiple attempts to install %s failed, install will be retried after next configuration check or restart",
|
||||
req,
|
||||
)
|
||||
raise RequirementsNotFound(integration, [req])
|
||||
|
||||
async def _async_process_requirements(
|
||||
self,
|
||||
name: str,
|
||||
requirements: list[str],
|
||||
) -> None:
|
||||
"""Install a requirement and save failures."""
|
||||
kwargs = pip_kwargs(self.hass.config.config_dir)
|
||||
installed, failures = await self.hass.async_add_executor_job(
|
||||
_install_requirements_if_missing, requirements, kwargs
|
||||
)
|
||||
self.is_installed_cache |= installed
|
||||
self.install_failure_history |= failures
|
||||
if failures:
|
||||
raise RequirementsNotFound(name, list(failures))
|
||||
|
|
|
@ -213,16 +213,9 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha
|
|||
assert integration
|
||||
assert integration.domain == "test_component"
|
||||
|
||||
assert len(mock_is_installed.mock_calls) == 1
|
||||
assert sorted(mock_call[1][0] for mock_call in mock_is_installed.mock_calls) == [
|
||||
"test-comp==1.0.0",
|
||||
]
|
||||
|
||||
assert len(mock_is_installed.mock_calls) == 0
|
||||
# On another attempt we remember failures and don't try again
|
||||
assert len(mock_inst.mock_calls) == 1
|
||||
assert sorted(mock_call[1][0] for mock_call in mock_inst.mock_calls) == [
|
||||
"test-comp==1.0.0"
|
||||
]
|
||||
assert len(mock_inst.mock_calls) == 0
|
||||
|
||||
# Now clear the history and so we try again
|
||||
async_clear_install_history(hass)
|
||||
|
@ -239,14 +232,13 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha
|
|||
assert integration
|
||||
assert integration.domain == "test_component"
|
||||
|
||||
assert len(mock_is_installed.mock_calls) == 3
|
||||
assert len(mock_is_installed.mock_calls) == 2
|
||||
assert sorted(mock_call[1][0] for mock_call in mock_is_installed.mock_calls) == [
|
||||
"test-comp-after-dep==1.0.0",
|
||||
"test-comp-dep==1.0.0",
|
||||
"test-comp==1.0.0",
|
||||
]
|
||||
|
||||
assert len(mock_inst.mock_calls) == 7
|
||||
assert len(mock_inst.mock_calls) == 6
|
||||
assert sorted(mock_call[1][0] for mock_call in mock_inst.mock_calls) == [
|
||||
"test-comp-after-dep==1.0.0",
|
||||
"test-comp-after-dep==1.0.0",
|
||||
|
@ -254,7 +246,6 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha
|
|||
"test-comp-dep==1.0.0",
|
||||
"test-comp-dep==1.0.0",
|
||||
"test-comp-dep==1.0.0",
|
||||
"test-comp==1.0.0",
|
||||
]
|
||||
|
||||
# Now clear the history and mock success
|
||||
|
@ -272,18 +263,16 @@ async def test_get_integration_with_requirements_pip_install_fails_two_passes(ha
|
|||
assert integration
|
||||
assert integration.domain == "test_component"
|
||||
|
||||
assert len(mock_is_installed.mock_calls) == 3
|
||||
assert len(mock_is_installed.mock_calls) == 2
|
||||
assert sorted(mock_call[1][0] for mock_call in mock_is_installed.mock_calls) == [
|
||||
"test-comp-after-dep==1.0.0",
|
||||
"test-comp-dep==1.0.0",
|
||||
"test-comp==1.0.0",
|
||||
]
|
||||
|
||||
assert len(mock_inst.mock_calls) == 3
|
||||
assert len(mock_inst.mock_calls) == 2
|
||||
assert sorted(mock_call[1][0] for mock_call in mock_inst.mock_calls) == [
|
||||
"test-comp-after-dep==1.0.0",
|
||||
"test-comp-dep==1.0.0",
|
||||
"test-comp==1.0.0",
|
||||
]
|
||||
|
||||
|
||||
|
@ -408,12 +397,12 @@ async def test_discovery_requirements_mqtt(hass):
|
|||
hass, MockModule("mqtt_comp", partial_manifest={"mqtt": ["foo/discovery"]})
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.requirements.async_process_requirements",
|
||||
"homeassistant.requirements.RequirementsManager.async_process_requirements",
|
||||
) as mock_process:
|
||||
await async_get_integration_with_requirements(hass, "mqtt_comp")
|
||||
|
||||
assert len(mock_process.mock_calls) == 2 # mqtt also depends on http
|
||||
assert mock_process.mock_calls[0][1][2] == mqtt.requirements
|
||||
assert mock_process.mock_calls[0][1][1] == mqtt.requirements
|
||||
|
||||
|
||||
async def test_discovery_requirements_ssdp(hass):
|
||||
|
@ -425,17 +414,17 @@ async def test_discovery_requirements_ssdp(hass):
|
|||
hass, MockModule("ssdp_comp", partial_manifest={"ssdp": [{"st": "roku:ecp"}]})
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.requirements.async_process_requirements",
|
||||
"homeassistant.requirements.RequirementsManager.async_process_requirements",
|
||||
) as mock_process:
|
||||
await async_get_integration_with_requirements(hass, "ssdp_comp")
|
||||
|
||||
assert len(mock_process.mock_calls) == 4
|
||||
assert mock_process.mock_calls[0][1][2] == ssdp.requirements
|
||||
assert mock_process.mock_calls[0][1][1] == ssdp.requirements
|
||||
# Ensure zeroconf is a dep for ssdp
|
||||
assert {
|
||||
mock_process.mock_calls[1][1][1],
|
||||
mock_process.mock_calls[2][1][1],
|
||||
mock_process.mock_calls[3][1][1],
|
||||
mock_process.mock_calls[1][1][0],
|
||||
mock_process.mock_calls[2][1][0],
|
||||
mock_process.mock_calls[3][1][0],
|
||||
} == {"network", "zeroconf", "http"}
|
||||
|
||||
|
||||
|
@ -454,12 +443,12 @@ async def test_discovery_requirements_zeroconf(hass, partial_manifest):
|
|||
)
|
||||
|
||||
with patch(
|
||||
"homeassistant.requirements.async_process_requirements",
|
||||
"homeassistant.requirements.RequirementsManager.async_process_requirements",
|
||||
) as mock_process:
|
||||
await async_get_integration_with_requirements(hass, "comp")
|
||||
|
||||
assert len(mock_process.mock_calls) == 3 # zeroconf also depends on http
|
||||
assert mock_process.mock_calls[0][1][2] == zeroconf.requirements
|
||||
assert mock_process.mock_calls[0][1][1] == zeroconf.requirements
|
||||
|
||||
|
||||
async def test_discovery_requirements_dhcp(hass):
|
||||
|
@ -477,9 +466,9 @@ async def test_discovery_requirements_dhcp(hass):
|
|||
),
|
||||
)
|
||||
with patch(
|
||||
"homeassistant.requirements.async_process_requirements",
|
||||
"homeassistant.requirements.RequirementsManager.async_process_requirements",
|
||||
) as mock_process:
|
||||
await async_get_integration_with_requirements(hass, "comp")
|
||||
|
||||
assert len(mock_process.mock_calls) == 1 # dhcp does not depend on http
|
||||
assert mock_process.mock_calls[0][1][2] == dhcp.requirements
|
||||
assert mock_process.mock_calls[0][1][1] == dhcp.requirements
|
||||
|
|
Loading…
Reference in a new issue