Reduce zeroconf matcher complexity (#105880)

This commit is contained in:
J. Nick Koston 2023-12-23 00:04:05 -10:00 committed by GitHub
parent 321dc3984c
commit bb30bfa225
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 42 deletions

View file

@ -33,6 +33,7 @@ from homeassistant.helpers.network import NoURLAvailableError, get_url
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import (
HomeKitDiscoveredIntegration,
ZeroconfMatcher,
async_get_homekit,
async_get_zeroconf,
bind_hass,
@ -54,9 +55,6 @@ HOMEKIT_TYPES = [
]
_HOMEKIT_MODEL_SPLITS = (None, " ", "-")
# Top level keys we support matching against in properties that are always matched in
# lower case. ex: ZeroconfServiceInfo.name
LOWER_MATCH_ATTRS = {"name"}
CONF_DEFAULT_INTERFACE = "default_interface"
CONF_IPV6 = "ipv6"
@ -74,6 +72,8 @@ MAX_PROPERTY_VALUE_LEN = 230
# Dns label max length
MAX_NAME_LEN = 63
ATTR_DOMAIN: Final = "domain"
ATTR_NAME: Final = "name"
ATTR_PROPERTIES: Final = "properties"
# Attributes for ZeroconfServiceInfo[ATTR_PROPERTIES]
@ -319,24 +319,6 @@ async def _async_register_hass_zc_service(
await aio_zc.async_register_service(info, allow_name_change=True)
def _match_against_data(
matcher: dict[str, str | dict[str, str]], match_data: dict[str, str]
) -> bool:
"""Check a matcher to ensure all values in match_data match."""
for key in LOWER_MATCH_ATTRS:
if key not in matcher:
continue
if key not in match_data:
return False
match_val = matcher[key]
if TYPE_CHECKING:
assert isinstance(match_val, str)
if not _memorized_fnmatch(match_data[key], match_val):
return False
return True
def _match_against_props(matcher: dict[str, str], props: dict[str, str | None]) -> bool:
"""Check a matcher to ensure all values in props."""
return not any(
@ -365,7 +347,7 @@ class ZeroconfDiscovery:
self,
hass: HomeAssistant,
zeroconf: HaZeroconf,
zeroconf_types: dict[str, list[dict[str, str | dict[str, str]]]],
zeroconf_types: dict[str, list[ZeroconfMatcher]],
homekit_model_lookups: dict[str, HomeKitDiscoveredIntegration],
homekit_model_matchers: dict[re.Pattern, HomeKitDiscoveredIntegration],
) -> None:
@ -496,27 +478,23 @@ class ZeroconfDiscovery:
# discover it, we can stop here.
return
match_data: dict[str, str] = {}
for key in LOWER_MATCH_ATTRS:
attr_value: str = getattr(info, key)
match_data[key] = attr_value.lower()
if not (matchers := self.zeroconf_types.get(service_type)):
return
# Not all homekit types are currently used for discovery
# so not all service type exist in zeroconf_types
for matcher in self.zeroconf_types.get(service_type, []):
for matcher in matchers:
if len(matcher) > 1:
if not _match_against_data(matcher, match_data):
if ATTR_NAME in matcher and not _memorized_fnmatch(
info.name.lower(), matcher[ATTR_NAME]
):
continue
if ATTR_PROPERTIES in matcher and not _match_against_props(
matcher[ATTR_PROPERTIES], props
):
continue
if ATTR_PROPERTIES in matcher:
matcher_props = matcher[ATTR_PROPERTIES]
if TYPE_CHECKING:
assert isinstance(matcher_props, dict)
if not _match_against_props(matcher_props, props):
continue
matcher_domain = matcher["domain"]
if TYPE_CHECKING:
assert isinstance(matcher_domain, str)
matcher_domain = matcher[ATTR_DOMAIN]
context = {
"source": config_entries.SOURCE_ZEROCONF,
}

View file

@ -131,6 +131,14 @@ class HomeKitDiscoveredIntegration:
always_discover: bool
class ZeroconfMatcher(TypedDict, total=False):
"""Matcher for zeroconf."""
domain: str
name: str
properties: dict[str, str]
class Manifest(TypedDict, total=False):
"""Integration manifest.
@ -374,7 +382,7 @@ async def async_get_application_credentials(hass: HomeAssistant) -> list[str]:
]
def async_process_zeroconf_match_dict(entry: dict[str, Any]) -> dict[str, Any]:
def async_process_zeroconf_match_dict(entry: dict[str, Any]) -> ZeroconfMatcher:
"""Handle backwards compat with zeroconf matchers."""
entry_without_type: dict[str, Any] = entry.copy()
del entry_without_type["type"]
@ -396,21 +404,21 @@ def async_process_zeroconf_match_dict(entry: dict[str, Any]) -> dict[str, Any]:
else:
prop_dict = entry_without_type["properties"]
prop_dict[moved_prop] = value.lower()
return entry_without_type
return cast(ZeroconfMatcher, entry_without_type)
async def async_get_zeroconf(
hass: HomeAssistant,
) -> dict[str, list[dict[str, str | dict[str, str]]]]:
) -> dict[str, list[ZeroconfMatcher]]:
"""Return cached list of zeroconf types."""
zeroconf: dict[str, list[dict[str, str | dict[str, str]]]] = ZEROCONF.copy() # type: ignore[assignment]
zeroconf: dict[str, list[ZeroconfMatcher]] = ZEROCONF.copy() # type: ignore[assignment]
integrations = await async_get_custom_components(hass)
for integration in integrations.values():
if not integration.zeroconf:
continue
for entry in integration.zeroconf:
data: dict[str, str | dict[str, str]] = {"domain": integration.domain}
data: ZeroconfMatcher = {"domain": integration.domain}
if isinstance(entry, dict):
typ = entry["type"]
data.update(async_process_zeroconf_match_dict(entry))