Cast Integration Cleanup (#13275)

* Cast Integration Cleanup

* Fix long line

* Fixes and logging

* Fix tests

* Lint

* Report unknown state with None

* Lint

* Switch to async_add_job

Gets rid of those pesky "Setup of platform cast is taking over 10 seconds." messages.

* Re-introduce PlatformNotReady

* Add tests

* Remove unnecessary checks

* Test PlatformNotReady

* Fix async in sync context

* Blocking update

It's not using async anyway

* Upgrade pychromecast to 2.1.0

* Make reviewing easier

I like "protected" access, but I like reviewing more :)

* Make reviewing even easier :)

* Comment tests
This commit is contained in:
Otto Winter 2018-03-23 22:02:52 +01:00 committed by Paulus Schoutsen
parent 630734ca15
commit 6a625bdb37
3 changed files with 601 additions and 331 deletions

View file

@ -7,8 +7,10 @@ https://home-assistant.io/components/media_player.cast/
# pylint: disable=import-error
import logging
import threading
from typing import Optional, Tuple
import voluptuous as vol
import attr
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers.typing import HomeAssistantType, ConfigType
@ -22,11 +24,11 @@ from homeassistant.components.media_player import (
SUPPORT_STOP, SUPPORT_PLAY, MediaPlayerDevice, PLATFORM_SCHEMA)
from homeassistant.const import (
CONF_HOST, STATE_IDLE, STATE_OFF, STATE_PAUSED, STATE_PLAYING,
STATE_UNKNOWN, EVENT_HOMEASSISTANT_STOP)
EVENT_HOMEASSISTANT_STOP)
import homeassistant.helpers.config_validation as cv
import homeassistant.util.dt as dt_util
REQUIREMENTS = ['pychromecast==2.0.0']
REQUIREMENTS = ['pychromecast==2.1.0']
_LOGGER = logging.getLogger(__name__)
@ -39,23 +41,103 @@ SUPPORT_CAST = SUPPORT_PAUSE | SUPPORT_VOLUME_SET | SUPPORT_VOLUME_MUTE | \
SUPPORT_TURN_ON | SUPPORT_TURN_OFF | SUPPORT_PREVIOUS_TRACK | \
SUPPORT_NEXT_TRACK | SUPPORT_PLAY_MEDIA | SUPPORT_STOP | SUPPORT_PLAY
# Stores a threading.Lock that is held by the internal pychromecast discovery.
INTERNAL_DISCOVERY_RUNNING_KEY = 'cast_discovery_running'
# UUID -> CastDevice mapping; cast devices without UUID are not stored
# Stores all ChromecastInfo we encountered through discovery or config as a set
# If we find a chromecast with a new host, the old one will be removed again.
KNOWN_CHROMECAST_INFO_KEY = 'cast_known_chromecasts'
# Stores UUIDs of cast devices that were added as entities. Doesn't store
# None UUIDs.
ADDED_CAST_DEVICES_KEY = 'cast_added_cast_devices'
# Stores every discovered (host, port, uuid)
KNOWN_CHROMECASTS_KEY = 'cast_all_chromecasts'
# Dispatcher signal fired with a ChromecastInfo every time we discover a new
# Chromecast or receive it through configuration
SIGNAL_CAST_DISCOVERED = 'cast_discovered'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Optional(CONF_HOST): cv.string,
vol.Optional(CONF_IGNORE_CEC): [cv.string],
vol.Optional(CONF_IGNORE_CEC, default=[]): vol.All(cv.ensure_list,
[cv.string])
})
@attr.s(slots=True, frozen=True)
class ChromecastInfo(object):
"""Class to hold all data about a chromecast for creating connections.
This also has the same attributes as the mDNS fields by zeroconf.
"""
host = attr.ib(type=str)
port = attr.ib(type=int)
uuid = attr.ib(type=Optional[str], converter=attr.converters.optional(str),
default=None) # always convert UUID to string if not None
model_name = attr.ib(type=str, default='') # needed for cast type
friendly_name = attr.ib(type=Optional[str], default=None)
@property
def is_audio_group(self) -> bool:
"""Return if this is an audio group."""
return self.port != DEFAULT_PORT
@property
def is_information_complete(self) -> bool:
"""Return if all information is filled out."""
return all(attr.astuple(self))
@property
def host_port(self) -> Tuple[str, int]:
"""Return the host+port tuple."""
return self.host, self.port
def _fill_out_missing_chromecast_info(info: ChromecastInfo) -> ChromecastInfo:
"""Fill out missing attributes of ChromecastInfo using blocking HTTP."""
if info.is_information_complete or info.is_audio_group:
# We have all information, no need to check HTTP API. Or this is an
# audio group, so checking via HTTP won't give us any new information.
return info
# Fill out missing information via HTTP dial.
from pychromecast import dial
http_device_status = dial.get_device_status(info.host)
if http_device_status is None:
# HTTP dial didn't give us any new information.
return info
return ChromecastInfo(
host=info.host, port=info.port,
uuid=(info.uuid or http_device_status.uuid),
friendly_name=(info.friendly_name or http_device_status.friendly_name),
model_name=(info.model_name or http_device_status.model_name)
)
def _discover_chromecast(hass: HomeAssistantType, info: ChromecastInfo):
if info in hass.data[KNOWN_CHROMECAST_INFO_KEY]:
_LOGGER.debug("Discovered previous chromecast %s", info)
return
# Either discovered completely new chromecast or a "moved" one.
info = _fill_out_missing_chromecast_info(info)
_LOGGER.debug("Discovered chromecast %s", info)
if info.uuid is not None:
# Remove previous cast infos with same uuid from known chromecasts.
same_uuid = set(x for x in hass.data[KNOWN_CHROMECAST_INFO_KEY]
if info.uuid == x.uuid)
hass.data[KNOWN_CHROMECAST_INFO_KEY] -= same_uuid
hass.data[KNOWN_CHROMECAST_INFO_KEY].add(info)
dispatcher_send(hass, SIGNAL_CAST_DISCOVERED, info)
def _setup_internal_discovery(hass: HomeAssistantType) -> None:
"""Set up the pychromecast internal discovery."""
hass.data.setdefault(INTERNAL_DISCOVERY_RUNNING_KEY, threading.Lock())
if INTERNAL_DISCOVERY_RUNNING_KEY not in hass.data:
hass.data[INTERNAL_DISCOVERY_RUNNING_KEY] = threading.Lock()
if not hass.data[INTERNAL_DISCOVERY_RUNNING_KEY].acquire(blocking=False):
# Internal discovery is already running
return
@ -65,30 +147,14 @@ def _setup_internal_discovery(hass: HomeAssistantType) -> None:
def internal_callback(name):
"""Called when zeroconf has discovered a new chromecast."""
mdns = listener.services[name]
ip_address, port, uuid, _, _ = mdns
key = (ip_address, port, uuid)
if key in hass.data[KNOWN_CHROMECASTS_KEY]:
_LOGGER.debug("Discovered previous chromecast %s", mdns)
return
_LOGGER.debug("Discovered new chromecast %s", mdns)
try:
# pylint: disable=protected-access
chromecast = pychromecast._get_chromecast_from_host(
mdns, blocking=True)
except pychromecast.ChromecastConnectionError:
_LOGGER.debug("Can't set up cast with mDNS info %s. "
"Assuming it's not a Chromecast", mdns)
return
hass.data[KNOWN_CHROMECASTS_KEY][key] = chromecast
dispatcher_send(hass, SIGNAL_CAST_DISCOVERED, chromecast)
_discover_chromecast(hass, ChromecastInfo(*mdns))
_LOGGER.debug("Starting internal pychromecast discovery.")
listener, browser = pychromecast.start_discovery(internal_callback)
def stop_discovery(event):
"""Stop discovery of new chromecasts."""
_LOGGER.debug("Stopping internal pychromecast discovery.")
pychromecast.stop_discovery(browser)
hass.data[INTERNAL_DISCOVERY_RUNNING_KEY].release()
@ -96,40 +162,26 @@ def _setup_internal_discovery(hass: HomeAssistantType) -> None:
@callback
def _async_create_cast_device(hass, chromecast):
def _async_create_cast_device(hass: HomeAssistantType,
info: ChromecastInfo):
"""Create a CastDevice Entity from the chromecast object.
Returns None if the cast device has already been added. Additionally,
automatically updates existing chromecast entities.
Returns None if the cast device has already been added.
"""
if chromecast.uuid is None:
if info.uuid is None:
# Found a cast without UUID, we don't store it because we won't be able
# to update it anyway.
return CastDevice(chromecast)
return CastDevice(info)
# Found a cast with UUID
added_casts = hass.data[ADDED_CAST_DEVICES_KEY]
old_cast_device = added_casts.get(chromecast.uuid)
if old_cast_device is None:
# -> New cast device
cast_device = CastDevice(chromecast)
added_casts[chromecast.uuid] = cast_device
return cast_device
old_key = (old_cast_device.cast.host,
old_cast_device.cast.port,
old_cast_device.cast.uuid)
new_key = (chromecast.host, chromecast.port, chromecast.uuid)
if old_key == new_key:
# Re-discovered with same data, ignore
if info.uuid in added_casts:
# Already added this one, the entity will take care of moved hosts
# itself
return None
# -> Cast device changed host
# Remove old pychromecast.Chromecast from global list, because it isn't
# valid anymore
old_cast_device.async_set_chromecast(chromecast)
return None
# -> New cast device
added_casts.add(info.uuid)
return CastDevice(info)
async def async_setup_platform(hass: HomeAssistantType, config: ConfigType,
@ -139,98 +191,308 @@ async def async_setup_platform(hass: HomeAssistantType, config: ConfigType,
# Import CEC IGNORE attributes
pychromecast.IGNORE_CEC += config.get(CONF_IGNORE_CEC, [])
hass.data.setdefault(ADDED_CAST_DEVICES_KEY, {})
hass.data.setdefault(KNOWN_CHROMECASTS_KEY, {})
hass.data.setdefault(ADDED_CAST_DEVICES_KEY, set())
hass.data.setdefault(KNOWN_CHROMECAST_INFO_KEY, set())
# None -> use discovery; (host, port) -> manually specify chromecast.
want_host = None
if discovery_info:
want_host = (discovery_info.get('host'), discovery_info.get('port'))
info = None
if discovery_info is not None:
info = ChromecastInfo(host=discovery_info['host'],
port=discovery_info['port'])
elif CONF_HOST in config:
want_host = (config.get(CONF_HOST), DEFAULT_PORT)
info = ChromecastInfo(host=config[CONF_HOST],
port=DEFAULT_PORT)
enable_discovery = False
if want_host is None:
# We were explicitly told to enable pychromecast discovery.
enable_discovery = True
elif want_host[1] != DEFAULT_PORT:
# We're trying to add a group, so we have to use pychromecast's
# discovery to get the correct friendly name.
enable_discovery = True
@callback
def async_cast_discovered(discover: ChromecastInfo) -> None:
"""Callback for when a new chromecast is discovered."""
if info is not None and info.host_port != discover.host_port:
# Not our requested cast device.
return
if enable_discovery:
@callback
def async_cast_discovered(chromecast):
"""Callback for when a new chromecast is discovered."""
if want_host is not None and \
(chromecast.host, chromecast.port) != want_host:
return # for groups, only add requested device
cast_device = _async_create_cast_device(hass, chromecast)
cast_device = _async_create_cast_device(hass, discover)
if cast_device is not None:
async_add_devices([cast_device])
if cast_device is not None:
async_add_devices([cast_device])
async_dispatcher_connect(hass, SIGNAL_CAST_DISCOVERED,
async_cast_discovered)
# Re-play the callback for all past chromecasts, store the objects in
# a list to avoid concurrent modification resulting in exception.
for chromecast in list(hass.data[KNOWN_CHROMECASTS_KEY].values()):
async_cast_discovered(chromecast)
async_dispatcher_connect(hass, SIGNAL_CAST_DISCOVERED,
async_cast_discovered)
# Re-play the callback for all past chromecasts, store the objects in
# a list to avoid concurrent modification resulting in exception.
for chromecast in list(hass.data[KNOWN_CHROMECAST_INFO_KEY]):
async_cast_discovered(chromecast)
if info is None or info.is_audio_group:
# If we were a) explicitly told to enable discovery or
# b) have an audio group cast device, we need internal discovery.
hass.async_add_job(_setup_internal_discovery, hass)
else:
# Manually add a "normal" Chromecast, we can do that without discovery.
try:
chromecast = await hass.async_add_job(
pychromecast.Chromecast, *want_host)
except pychromecast.ChromecastConnectionError as err:
_LOGGER.warning("Can't set up chromecast on %s: %s",
want_host[0], err)
info = await hass.async_add_job(_fill_out_missing_chromecast_info,
info)
if info.friendly_name is None:
# HTTP dial failed, so we won't be able to connect.
raise PlatformNotReady
key = (chromecast.host, chromecast.port, chromecast.uuid)
cast_device = _async_create_cast_device(hass, chromecast)
if cast_device is not None:
hass.data[KNOWN_CHROMECASTS_KEY][key] = chromecast
async_add_devices([cast_device])
hass.async_add_job(_discover_chromecast, hass, info)
class CastStatusListener(object):
"""Helper class to handle pychromecast status callbacks.
Necessary because a CastDevice entity can create a new socket client
and therefore callbacks from multiple chromecast connections can
potentially arrive. This class allows invalidating past chromecast objects.
"""
def __init__(self, cast_device, chromecast):
"""Initialize the status listener."""
self._cast_device = cast_device
self._valid = True
chromecast.register_status_listener(self)
chromecast.socket_client.media_controller.register_status_listener(
self)
chromecast.register_connection_listener(self)
def new_cast_status(self, cast_status):
"""Called when a new CastStatus is received."""
if self._valid:
self._cast_device.new_cast_status(cast_status)
def new_media_status(self, media_status):
"""Called when a new MediaStatus is received."""
if self._valid:
self._cast_device.new_media_status(media_status)
def new_connection_status(self, connection_status):
"""Called when a new ConnectionStatus is received."""
if self._valid:
self._cast_device.new_connection_status(connection_status)
def invalidate(self):
"""Invalidate this status listener.
All following callbacks won't be forwarded.
"""
self._valid = False
class CastDevice(MediaPlayerDevice):
"""Representation of a Cast device on the network."""
"""Representation of a Cast device on the network.
def __init__(self, chromecast):
"""Initialize the Cast device."""
self.cast = None # type: pychromecast.Chromecast
This class is the holder of the pychromecast.Chromecast object and its
socket client. It therefore handles all reconnects and audio group changing
"elected leader" itself.
"""
def __init__(self, cast_info):
"""Initialize the cast device."""
self._cast_info = cast_info # type: ChromecastInfo
self._chromecast = None # type: Optional[pychromecast.Chromecast]
self.cast_status = None
self.media_status = None
self.media_status_received = None
self._available = False # type: bool
self._status_listener = None # type: Optional[CastStatusListener]
self.async_set_chromecast(chromecast)
async def async_added_to_hass(self):
"""Create chromecast object when added to hass."""
@callback
def async_cast_discovered(discover: ChromecastInfo):
"""Callback for changing elected leaders / IP."""
if self._cast_info.uuid is None:
# We can't handle empty UUIDs
return
if self._cast_info.uuid != discover.uuid:
# Discovered is not our device.
return
_LOGGER.debug("Discovered chromecast with same UUID: %s", discover)
self.hass.async_add_job(self.async_set_cast_info(discover))
async_dispatcher_connect(self.hass, SIGNAL_CAST_DISCOVERED,
async_cast_discovered)
self.hass.async_add_job(self.async_set_cast_info(self._cast_info))
async def async_will_remove_from_hass(self) -> None:
"""Disconnect Chromecast object when removed."""
self._async_disconnect()
if self._cast_info.uuid is not None:
# Remove the entity from the added casts so that it can dynamically
# be re-added again.
self.hass.data[ADDED_CAST_DEVICES_KEY].remove(self._cast_info.uuid)
async def async_set_cast_info(self, cast_info):
"""Set the cast information and set up the chromecast object."""
import pychromecast
old_cast_info = self._cast_info
self._cast_info = cast_info
if self._chromecast is not None:
if old_cast_info.host_port == cast_info.host_port:
# Nothing connection-related updated
return
self._async_disconnect()
# Failed connection will unfortunately never raise an exception, it
# will instead just try connecting indefinitely.
# pylint: disable=protected-access
_LOGGER.debug("Connecting to cast device %s", cast_info)
chromecast = await self.hass.async_add_job(
pychromecast._get_chromecast_from_host, attr.astuple(cast_info))
self._chromecast = chromecast
self._status_listener = CastStatusListener(self, chromecast)
# Initialise connection status as connected because we can only
# register the connection listener *after* the initial connection
# attempt. If the initial connection failed, we would never reach
# this code anyway.
self._available = True
self.cast_status = chromecast.status
self.media_status = chromecast.media_controller.status
_LOGGER.debug("Connection successful!")
self.async_schedule_update_ha_state()
@callback
def _async_disconnect(self):
"""Disconnect Chromecast object if it is set."""
if self._chromecast is None:
# Can't disconnect if not connected.
return
_LOGGER.debug("Disconnecting from previous chromecast socket.")
self._available = False
self._chromecast.disconnect(blocking=False)
# Invalidate some attributes
self._chromecast = None
self.cast_status = None
self.media_status = None
self.media_status_received = None
self._status_listener.invalidate()
self._status_listener = None
def update(self):
"""Periodically update the properties.
Even though we receive callbacks for most state changes, some 3rd party
apps don't always send them. Better poll every now and then if the
chromecast is active (i.e. an app is running).
"""
if not self._available:
# Not connected or not available.
return
if self._chromecast.media_controller.is_active:
# We can only update status if the media namespace is active
self._chromecast.media_controller.update_status()
# ========== Callbacks ==========
def new_cast_status(self, cast_status):
"""Handle updates of the cast status."""
self.cast_status = cast_status
self.schedule_update_ha_state()
def new_media_status(self, media_status):
"""Handle updates of the media status."""
self.media_status = media_status
self.media_status_received = dt_util.utcnow()
self.schedule_update_ha_state()
def new_connection_status(self, connection_status):
"""Handle updates of connection status."""
from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED
new_available = connection_status.status == CONNECTION_STATUS_CONNECTED
if new_available != self._available:
# Connection status callbacks happen often when disconnected.
# Only update state when availability changed to put less pressure
# on state machine.
_LOGGER.debug("Cast device availability changed: %s",
connection_status.status)
self._available = new_available
self.schedule_update_ha_state()
# ========== Service Calls ==========
def turn_on(self):
"""Turn on the cast device."""
import pychromecast
if not self._chromecast.is_idle:
# Already turned on
return
if self._chromecast.app_id is not None:
# Quit the previous app before starting splash screen
self._chromecast.quit_app()
# The only way we can turn the Chromecast is on is by launching an app
self._chromecast.play_media(CAST_SPLASH,
pychromecast.STREAM_TYPE_BUFFERED)
def turn_off(self):
"""Turn off the cast device."""
self._chromecast.quit_app()
def mute_volume(self, mute):
"""Mute the volume."""
self._chromecast.set_volume_muted(mute)
def set_volume_level(self, volume):
"""Set volume level, range 0..1."""
self._chromecast.set_volume(volume)
def media_play(self):
"""Send play command."""
self._chromecast.media_controller.play()
def media_pause(self):
"""Send pause command."""
self._chromecast.media_controller.pause()
def media_stop(self):
"""Send stop command."""
self._chromecast.media_controller.stop()
def media_previous_track(self):
"""Send previous track command."""
self._chromecast.media_controller.rewind()
def media_next_track(self):
"""Send next track command."""
self._chromecast.media_controller.skip()
def media_seek(self, position):
"""Seek the media to a specific location."""
self._chromecast.media_controller.seek(position)
def play_media(self, media_type, media_id, **kwargs):
"""Play media from a URL."""
self._chromecast.media_controller.play_media(media_id, media_type)
# ========== Properties ==========
@property
def should_poll(self):
"""No polling needed."""
return False
"""Polling needed for cast integration, see async_update."""
return True
@property
def name(self):
"""Return the name of the device."""
return self.cast.device.friendly_name
return self._cast_info.friendly_name
# MediaPlayerDevice properties and methods
@property
def state(self):
"""Return the state of the player."""
if self.media_status is None:
return STATE_UNKNOWN
return None
elif self.media_status.player_is_playing:
return STATE_PLAYING
elif self.media_status.player_is_paused:
return STATE_PAUSED
elif self.media_status.player_is_idle:
return STATE_IDLE
elif self.cast.is_idle:
elif self._chromecast is not None and self._chromecast.is_idle:
return STATE_OFF
return STATE_UNKNOWN
return None
@property
def available(self):
"""Return True if the cast device is connected."""
return self._available
@property
def volume_level(self):
@ -318,12 +580,12 @@ class CastDevice(MediaPlayerDevice):
@property
def app_id(self):
"""Return the ID of the current running app."""
return self.cast.app_id
return self._chromecast.app_id if self._chromecast else None
@property
def app_name(self):
"""Name of the current running app."""
return self.cast.app_display_name
return self._chromecast.app_display_name if self._chromecast else None
@property
def supported_features(self):
@ -349,101 +611,7 @@ class CastDevice(MediaPlayerDevice):
"""
return self.media_status_received
def turn_on(self):
"""Turn on the ChromeCast."""
# The only way we can turn the Chromecast is on is by launching an app
if not self.cast.status or not self.cast.status.is_active_input:
import pychromecast
if self.cast.app_id:
self.cast.quit_app()
self.cast.play_media(
CAST_SPLASH, pychromecast.STREAM_TYPE_BUFFERED)
def turn_off(self):
"""Turn Chromecast off."""
self.cast.quit_app()
def mute_volume(self, mute):
"""Mute the volume."""
self.cast.set_volume_muted(mute)
def set_volume_level(self, volume):
"""Set volume level, range 0..1."""
self.cast.set_volume(volume)
def media_play(self):
"""Send play command."""
self.cast.media_controller.play()
def media_pause(self):
"""Send pause command."""
self.cast.media_controller.pause()
def media_stop(self):
"""Send stop command."""
self.cast.media_controller.stop()
def media_previous_track(self):
"""Send previous track command."""
self.cast.media_controller.rewind()
def media_next_track(self):
"""Send next track command."""
self.cast.media_controller.skip()
def media_seek(self, position):
"""Seek the media to a specific location."""
self.cast.media_controller.seek(position)
def play_media(self, media_type, media_id, **kwargs):
"""Play media from a URL."""
self.cast.media_controller.play_media(media_id, media_type)
# Implementation of chromecast status_listener methods
def new_cast_status(self, status):
"""Handle updates of the cast status."""
self.cast_status = status
self.schedule_update_ha_state()
def new_media_status(self, status):
"""Handle updates of the media status."""
self.media_status = status
self.media_status_received = dt_util.utcnow()
self.schedule_update_ha_state()
@property
def unique_id(self) -> str:
def unique_id(self) -> Optional[str]:
"""Return a unique ID."""
if self.cast.uuid is not None:
return str(self.cast.uuid)
return None
@callback
def async_set_chromecast(self, chromecast):
"""Set the internal Chromecast object and disconnect the previous."""
self._async_disconnect()
self.cast = chromecast
self.cast.socket_client.receiver_controller.register_status_listener(
self)
self.cast.socket_client.media_controller.register_status_listener(self)
self.cast_status = self.cast.status
self.media_status = self.cast.media_controller.status
async def async_will_remove_from_hass(self) -> None:
"""Disconnect Chromecast object when removed."""
self._async_disconnect()
@callback
def _async_disconnect(self):
"""Disconnect Chromecast object if it is set."""
if self.cast is None:
return
_LOGGER.debug("Disconnecting existing chromecast object")
old_key = (self.cast.host, self.cast.port, self.cast.uuid)
self.hass.data[KNOWN_CHROMECASTS_KEY].pop(old_key)
self.cast.disconnect(blocking=False)
return self._cast_info.uuid

View file

@ -685,7 +685,7 @@ pybbox==0.0.5-alpha
pychannels==1.0.0
# homeassistant.components.media_player.cast
pychromecast==2.0.0
pychromecast==2.1.0
# homeassistant.components.media_player.cmus
pycmus==0.1.0

View file

@ -5,12 +5,17 @@ from typing import Optional
from unittest.mock import patch, MagicMock, Mock
from uuid import UUID
import attr
import pytest
from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.components.media_player.cast import ChromecastInfo
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.dispatcher import async_dispatcher_connect, \
async_dispatcher_send
from homeassistant.components.media_player import cast
from homeassistant.setup import async_setup_component
@pytest.fixture(autouse=True)
@ -26,57 +31,74 @@ def cast_mock():
FakeUUID = UUID('57355bce-9364-4aa6-ac1e-eb849dccf9e2')
def get_fake_chromecast(host='192.168.178.42', port=8009,
uuid: Optional[UUID] = FakeUUID):
def get_fake_chromecast(info: ChromecastInfo):
"""Generate a Fake Chromecast object with the specified arguments."""
return MagicMock(host=host, port=port, uuid=uuid)
mock = MagicMock(host=info.host, port=info.port, uuid=info.uuid)
mock.media_controller.status = None
return mock
@asyncio.coroutine
def async_setup_cast(hass, config=None, discovery_info=None):
def get_fake_chromecast_info(host='192.168.178.42', port=8009,
uuid: Optional[UUID] = FakeUUID):
"""Generate a Fake ChromecastInfo with the specified arguments."""
return ChromecastInfo(host=host, port=port, uuid=uuid,
friendly_name="Speaker")
async def async_setup_cast(hass, config=None, discovery_info=None):
"""Helper to setup the cast platform."""
if config is None:
config = {}
add_devices = Mock()
yield from cast.async_setup_platform(hass, config, add_devices,
discovery_info=discovery_info)
yield from hass.async_block_till_done()
await cast.async_setup_platform(hass, config, add_devices,
discovery_info=discovery_info)
await hass.async_block_till_done()
return add_devices
@asyncio.coroutine
def async_setup_cast_internal_discovery(hass, config=None,
discovery_info=None,
no_from_host_patch=False):
async def async_setup_cast_internal_discovery(hass, config=None,
discovery_info=None):
"""Setup the cast platform and the discovery."""
listener = MagicMock(services={})
with patch('pychromecast.start_discovery',
return_value=(listener, None)) as start_discovery:
add_devices = yield from async_setup_cast(hass, config, discovery_info)
yield from hass.async_block_till_done()
yield from hass.async_block_till_done()
add_devices = await async_setup_cast(hass, config, discovery_info)
await hass.async_block_till_done()
await hass.async_block_till_done()
assert start_discovery.call_count == 1
discovery_callback = start_discovery.call_args[0][0]
def discover_chromecast(service_name, chromecast):
def discover_chromecast(service_name: str, info: ChromecastInfo) -> None:
"""Discover a chromecast device."""
listener.services[service_name] = (
chromecast.host, chromecast.port, chromecast.uuid, None, None)
if no_from_host_patch:
discovery_callback(service_name)
else:
with patch('pychromecast._get_chromecast_from_host',
return_value=chromecast):
discovery_callback(service_name)
listener.services[service_name] = attr.astuple(info)
discovery_callback(service_name)
return discover_chromecast, add_devices
async def async_setup_media_player_cast(hass: HomeAssistantType,
info: ChromecastInfo):
"""Setup the cast platform with async_setup_component."""
chromecast = get_fake_chromecast(info)
cast.CastStatusListener = MagicMock()
with patch('pychromecast._get_chromecast_from_host',
return_value=chromecast) as get_chromecast:
await async_setup_component(hass, 'media_player', {
'media_player': {'platform': 'cast', 'host': info.host}})
await hass.async_block_till_done()
assert get_chromecast.call_count == 1
assert cast.CastStatusListener.call_count == 1
entity = cast.CastStatusListener.call_args[0][0]
return chromecast, entity
@asyncio.coroutine
def test_start_discovery_called_once(hass):
"""Test pychromecast.start_discovery called exactly once."""
@ -95,11 +117,13 @@ def test_stop_discovery_called_on_stop(hass):
"""Test pychromecast.stop_discovery called on shutdown."""
with patch('pychromecast.start_discovery',
return_value=(None, 'the-browser')) as start_discovery:
yield from async_setup_cast(hass)
# start_discovery should be called with empty config
yield from async_setup_cast(hass, {})
assert start_discovery.call_count == 1
with patch('pychromecast.stop_discovery') as stop_discovery:
# stop discovery should be called on shutdown
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
yield from hass.async_block_till_done()
@ -107,145 +131,223 @@ def test_stop_discovery_called_on_stop(hass):
with patch('pychromecast.start_discovery',
return_value=(None, 'the-browser')) as start_discovery:
# start_discovery should be called again on re-startup
yield from async_setup_cast(hass)
assert start_discovery.call_count == 1
@asyncio.coroutine
def test_internal_discovery_callback_only_generates_once(hass):
"""Test _get_chromecast_from_host only called once per device."""
discover_cast, _ = yield from async_setup_cast_internal_discovery(
hass, no_from_host_patch=True)
chromecast = get_fake_chromecast()
async def test_internal_discovery_callback_only_generates_once(hass):
"""Test discovery only called once per device."""
discover_cast, _ = await async_setup_cast_internal_discovery(hass)
info = get_fake_chromecast_info()
with patch('pychromecast._get_chromecast_from_host',
return_value=chromecast) as gen_chromecast:
discover_cast('the-service', chromecast)
mdns = (chromecast.host, chromecast.port, chromecast.uuid, None, None)
gen_chromecast.assert_called_once_with(mdns, blocking=True)
signal = MagicMock()
async_dispatcher_connect(hass, 'cast_discovered', signal)
discover_cast('the-service', chromecast)
gen_chromecast.reset_mock()
assert gen_chromecast.call_count == 0
@asyncio.coroutine
def test_internal_discovery_callback_calls_dispatcher(hass):
"""Test internal discovery calls dispatcher."""
discover_cast, _ = yield from async_setup_cast_internal_discovery(hass)
chromecast = get_fake_chromecast()
with patch('pychromecast._get_chromecast_from_host',
return_value=chromecast):
signal = MagicMock()
async_dispatcher_connect(hass, 'cast_discovered', signal)
discover_cast('the-service', chromecast)
yield from hass.async_block_till_done()
signal.assert_called_once_with(chromecast)
@asyncio.coroutine
def test_internal_discovery_callback_with_connection_error(hass):
"""Test internal discovery not calling dispatcher on ConnectionError."""
import pychromecast # imports mock pychromecast
pychromecast.ChromecastConnectionError = IOError
discover_cast, _ = yield from async_setup_cast_internal_discovery(
hass, no_from_host_patch=True)
chromecast = get_fake_chromecast()
with patch('pychromecast._get_chromecast_from_host',
side_effect=pychromecast.ChromecastConnectionError):
signal = MagicMock()
async_dispatcher_connect(hass, 'cast_discovered', signal)
discover_cast('the-service', chromecast)
yield from hass.async_block_till_done()
with patch('pychromecast.dial.get_device_status', return_value=None):
# discovering a cast device should call the dispatcher
discover_cast('the-service', info)
await hass.async_block_till_done()
discover = signal.mock_calls[0][1][0]
# attr's __eq__ somehow breaks here, use tuples instead
assert attr.astuple(discover) == attr.astuple(info)
signal.reset_mock()
# discovering it a second time shouldn't
discover_cast('the-service', info)
await hass.async_block_till_done()
assert signal.call_count == 0
def test_create_cast_device_without_uuid(hass):
"""Test create a cast device without a UUID."""
chromecast = get_fake_chromecast(uuid=None)
cast_device = cast._async_create_cast_device(hass, chromecast)
assert cast_device is not None
def test_create_cast_device_with_uuid(hass):
"""Test create cast devices with UUID."""
added_casts = hass.data[cast.ADDED_CAST_DEVICES_KEY] = {}
chromecast = get_fake_chromecast()
cast_device = cast._async_create_cast_device(hass, chromecast)
assert cast_device is not None
assert chromecast.uuid in added_casts
with patch.object(cast_device, 'async_set_chromecast') as mock_set:
assert cast._async_create_cast_device(hass, chromecast) is None
assert mock_set.call_count == 0
chromecast = get_fake_chromecast(host='192.168.178.1')
assert cast._async_create_cast_device(hass, chromecast) is None
assert mock_set.call_count == 1
mock_set.assert_called_once_with(chromecast)
@asyncio.coroutine
def test_normal_chromecast_not_starting_discovery(hass):
"""Test cast platform not starting discovery when not required."""
async def test_internal_discovery_callback_fill_out(hass):
"""Test internal discovery automatically filling out information."""
import pychromecast # imports mock pychromecast
pychromecast.ChromecastConnectionError = IOError
chromecast = get_fake_chromecast()
discover_cast, _ = await async_setup_cast_internal_discovery(hass)
info = get_fake_chromecast_info(uuid=None)
full_info = attr.evolve(info, model_name='google home',
friendly_name='Speaker', uuid=FakeUUID)
with patch('pychromecast.Chromecast', return_value=chromecast):
add_devices = yield from async_setup_cast(hass, {'host': 'host1'})
with patch('pychromecast.dial.get_device_status',
return_value=full_info):
signal = MagicMock()
async_dispatcher_connect(hass, 'cast_discovered', signal)
discover_cast('the-service', info)
await hass.async_block_till_done()
# when called with incomplete info, it should use HTTP to get missing
discover = signal.mock_calls[0][1][0]
# attr's __eq__ somehow breaks here, use tuples instead
assert attr.astuple(discover) == attr.astuple(full_info)
async def test_create_cast_device_without_uuid(hass):
"""Test create a cast device with no UUId should still create an entity."""
info = get_fake_chromecast_info(uuid=None)
cast_device = cast._async_create_cast_device(hass, info)
assert cast_device is not None
async def test_create_cast_device_with_uuid(hass):
"""Test create cast devices with UUID creates entities."""
added_casts = hass.data[cast.ADDED_CAST_DEVICES_KEY] = set()
info = get_fake_chromecast_info()
cast_device = cast._async_create_cast_device(hass, info)
assert cast_device is not None
assert info.uuid in added_casts
# Sending second time should not create new entity
cast_device = cast._async_create_cast_device(hass, info)
assert cast_device is None
async def test_normal_chromecast_not_starting_discovery(hass):
"""Test cast platform not starting discovery when not required."""
# pylint: disable=no-member
with patch('homeassistant.components.media_player.cast.'
'_setup_internal_discovery') as setup_discovery:
# normal (non-group) chromecast shouldn't start discovery.
add_devices = await async_setup_cast(hass, {'host': 'host1'})
await hass.async_block_till_done()
assert add_devices.call_count == 1
assert setup_discovery.call_count == 0
# Same entity twice
add_devices = yield from async_setup_cast(hass, {'host': 'host1'})
add_devices = await async_setup_cast(hass, {'host': 'host1'})
await hass.async_block_till_done()
assert add_devices.call_count == 0
assert setup_discovery.call_count == 0
hass.data[cast.ADDED_CAST_DEVICES_KEY] = {}
add_devices = yield from async_setup_cast(
hass.data[cast.ADDED_CAST_DEVICES_KEY] = set()
add_devices = await async_setup_cast(
hass, discovery_info={'host': 'host1', 'port': 8009})
await hass.async_block_till_done()
assert add_devices.call_count == 1
assert setup_discovery.call_count == 0
hass.data[cast.ADDED_CAST_DEVICES_KEY] = {}
add_devices = yield from async_setup_cast(
# group should start discovery.
hass.data[cast.ADDED_CAST_DEVICES_KEY] = set()
add_devices = await async_setup_cast(
hass, discovery_info={'host': 'host1', 'port': 42})
await hass.async_block_till_done()
assert add_devices.call_count == 0
assert setup_discovery.call_count == 1
with patch('pychromecast.Chromecast',
side_effect=pychromecast.ChromecastConnectionError):
async def test_normal_raises_platform_not_ready(hass):
"""Test cast platform raises PlatformNotReady if HTTP dial fails."""
with patch('pychromecast.dial.get_device_status', return_value=None):
with pytest.raises(PlatformNotReady):
yield from async_setup_cast(hass, {'host': 'host3'})
await async_setup_cast(hass, {'host': 'host1'})
@asyncio.coroutine
def test_replay_past_chromecasts(hass):
async def test_replay_past_chromecasts(hass):
"""Test cast platform re-playing past chromecasts when adding new one."""
cast_group1 = get_fake_chromecast(host='host1', port=42)
cast_group2 = get_fake_chromecast(host='host2', port=42, uuid=UUID(
cast_group1 = get_fake_chromecast_info(host='host1', port=42)
cast_group2 = get_fake_chromecast_info(host='host2', port=42, uuid=UUID(
'9462202c-e747-4af5-a66b-7dce0e1ebc09'))
discover_cast, add_dev1 = yield from async_setup_cast_internal_discovery(
discover_cast, add_dev1 = await async_setup_cast_internal_discovery(
hass, discovery_info={'host': 'host1', 'port': 42})
discover_cast('service2', cast_group2)
yield from hass.async_block_till_done()
await hass.async_block_till_done()
assert add_dev1.call_count == 0
discover_cast('service1', cast_group1)
yield from hass.async_block_till_done()
yield from hass.async_block_till_done() # having jobs that add jobs
await hass.async_block_till_done()
await hass.async_block_till_done() # having tasks that add jobs
assert add_dev1.call_count == 1
add_dev2 = yield from async_setup_cast(
add_dev2 = await async_setup_cast(
hass, discovery_info={'host': 'host2', 'port': 42})
yield from hass.async_block_till_done()
await hass.async_block_till_done()
assert add_dev2.call_count == 1
async def test_entity_media_states(hass: HomeAssistantType):
"""Test various entity media states."""
info = get_fake_chromecast_info()
full_info = attr.evolve(info, model_name='google home',
friendly_name='Speaker', uuid=FakeUUID)
with patch('pychromecast.dial.get_device_status',
return_value=full_info):
chromecast, entity = await async_setup_media_player_cast(hass, info)
state = hass.states.get('media_player.speaker')
assert state is not None
assert state.name == 'Speaker'
assert state.state == 'unknown'
assert entity.unique_id == full_info.uuid
media_status = MagicMock(images=None)
media_status.player_is_playing = True
entity.new_media_status(media_status)
await hass.async_block_till_done()
state = hass.states.get('media_player.speaker')
assert state.state == 'playing'
entity.new_media_status(media_status)
media_status.player_is_playing = False
media_status.player_is_paused = True
await hass.async_block_till_done()
state = hass.states.get('media_player.speaker')
assert state.state == 'paused'
entity.new_media_status(media_status)
media_status.player_is_paused = False
media_status.player_is_idle = True
await hass.async_block_till_done()
state = hass.states.get('media_player.speaker')
assert state.state == 'idle'
media_status.player_is_idle = False
chromecast.is_idle = True
entity.new_media_status(media_status)
await hass.async_block_till_done()
state = hass.states.get('media_player.speaker')
assert state.state == 'off'
chromecast.is_idle = False
entity.new_media_status(media_status)
await hass.async_block_till_done()
state = hass.states.get('media_player.speaker')
assert state.state == 'unknown'
async def test_switched_host(hass: HomeAssistantType):
"""Test cast device listens for changed hosts and disconnects old cast."""
info = get_fake_chromecast_info()
full_info = attr.evolve(info, model_name='google home',
friendly_name='Speaker', uuid=FakeUUID)
with patch('pychromecast.dial.get_device_status',
return_value=full_info):
chromecast, _ = await async_setup_media_player_cast(hass, full_info)
chromecast2 = get_fake_chromecast(info)
with patch('pychromecast._get_chromecast_from_host',
return_value=chromecast2) as get_chromecast:
async_dispatcher_send(hass, cast.SIGNAL_CAST_DISCOVERED, full_info)
await hass.async_block_till_done()
assert get_chromecast.call_count == 0
changed = attr.evolve(full_info, friendly_name='Speaker 2')
async_dispatcher_send(hass, cast.SIGNAL_CAST_DISCOVERED, changed)
await hass.async_block_till_done()
assert get_chromecast.call_count == 0
changed = attr.evolve(changed, host='host2')
async_dispatcher_send(hass, cast.SIGNAL_CAST_DISCOVERED, changed)
await hass.async_block_till_done()
assert get_chromecast.call_count == 1
chromecast.disconnect.assert_called_once_with(blocking=False)
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
await hass.async_block_till_done()
chromecast.disconnect.assert_called_once_with(blocking=False)