Enable type checks for stream component (#50527)

* Enable type checks for stream component

* Fix pylint
This commit is contained in:
Ruslan Sayfutdinov 2021-05-13 22:26:11 +01:00 committed by GitHub
parent e956a726a0
commit 35f304450c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 35 additions and 27 deletions

View file

@ -14,6 +14,8 @@ are no active output formats, the background worker is shut down and access
tokens are expired. Alternatively, a Stream can be configured with keepalive
to always keep workers active.
"""
from __future__ import annotations
import logging
import re
import secrets
@ -34,7 +36,7 @@ from .const import (
STREAM_RESTART_INCREMENT,
STREAM_RESTART_RESET_TIME,
)
from .core import PROVIDERS, IdleTimer
from .core import PROVIDERS, IdleTimer, StreamOutput
from .hls import async_setup_hls
_LOGGER = logging.getLogger(__name__)
@ -118,7 +120,7 @@ class Stream:
self.access_token = None
self._thread = None
self._thread_quit = threading.Event()
self._outputs = {}
self._outputs: dict[str, StreamOutput] = {}
self._fast_restart_once = False
if self.options is None:
@ -274,4 +276,4 @@ class Stream:
num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS)
# Wait for latest segment, then add the lookback
await hls.recv()
recorder.prepend(list(hls.get_segment())[-num_segments:])
recorder.prepend(list(hls.get_segments())[-num_segments:])

View file

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio
from collections import deque
import io
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Callable
from aiohttp import web
import attr
@ -95,12 +95,12 @@ class StreamOutput:
"""Initialize a stream output."""
self._hass = hass
self._idle_timer = idle_timer
self._cursor = None
self._cursor: int | None = None
self._event = asyncio.Event()
self._segments = deque(maxlen=deque_maxlen)
self._segments: deque[Segment] = deque(maxlen=deque_maxlen)
@property
def name(self) -> str:
def name(self) -> str | None:
"""Return provider name."""
return None
@ -123,19 +123,21 @@ class StreamOutput:
durations = [s.duration for s in self._segments]
return round(max(durations)) or 1
def get_segment(self, sequence: int = None) -> Any:
"""Retrieve a specific segment, or the whole list."""
def get_segment(self, sequence: int) -> Segment | None:
"""Retrieve a specific segment."""
self._idle_timer.awake()
if not sequence:
return self._segments
for segment in self._segments:
if segment.sequence == sequence:
return segment
return None
async def recv(self) -> Segment:
def get_segments(self) -> deque[Segment]:
"""Retrieve all segments."""
self._idle_timer.awake()
return self._segments
async def recv(self) -> Segment | None:
"""Wait for and retrieve the latest segment."""
last_segment = max(self.segments, default=0)
if self._cursor is None or self._cursor <= last_segment:
@ -144,7 +146,7 @@ class StreamOutput:
if not self._segments:
return None
segment = self.get_segment()[-1]
segment = self.get_segments()[-1]
self._cursor = segment.sequence
return segment

View file

@ -1,8 +1,13 @@
"""Utilities to help convert mp4s to fmp4s."""
from __future__ import annotations
from collections.abc import Generator
import io
def find_box(segment: io.BytesIO, target_type: bytes, box_start: int = 0) -> int:
def find_box(
segment: io.BytesIO, target_type: bytes, box_start: int = 0
) -> Generator[int, None, None]:
"""Find location of first box (or sub_box if box_start provided) of given type."""
if box_start == 0:
box_end = segment.seek(0, io.SEEK_END)

View file

@ -75,7 +75,7 @@ class HlsPlaylistView(StreamView):
@staticmethod
def render_playlist(track):
"""Render playlist."""
segments = list(track.get_segment())[-NUM_PLAYLIST_SEGMENTS:]
segments = list(track.get_segments())[-NUM_PLAYLIST_SEGMENTS:]
if not segments:
return []
@ -125,7 +125,7 @@ class HlsInitView(StreamView):
async def handle(self, request, stream, sequence):
"""Return init.mp4."""
track = stream.add_provider("hls")
segments = track.get_segment()
segments = track.get_segments()
if not segments:
return web.HTTPNotFound()
headers = {"Content-Type": "video/mp4"}

View file

@ -7,6 +7,7 @@ import os
import threading
import av
from av.container import OutputContainer
from homeassistant.core import HomeAssistant, callback
@ -31,8 +32,8 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]):
if not os.path.exists(os.path.dirname(file_out)):
os.makedirs(os.path.dirname(file_out), exist_ok=True)
pts_adjuster = {"video": None, "audio": None}
output = None
pts_adjuster: dict[str, int | None] = {"video": None, "audio": None}
output: OutputContainer | None = None
output_v = None
output_a = None
@ -100,7 +101,8 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]):
source.close()
output.close()
if output is not None:
output.close()
@PROVIDERS.register("recorder")

View file

@ -1,4 +1,6 @@
"""Provides the worker thread needed for processing streams."""
from __future__ import annotations
from collections import deque
import io
import logging
@ -15,7 +17,7 @@ from .const import (
SEGMENT_CONTAINER_FORMAT,
STREAM_TIMEOUT,
)
from .core import Segment, StreamBuffer
from .core import Segment, StreamBuffer, StreamOutput
_LOGGER = logging.getLogger(__name__)
@ -56,8 +58,7 @@ class SegmentBuffer:
self._video_stream = None
self._audio_stream = None
self._outputs_callback = outputs_callback
# Each element is a StreamOutput
self._outputs = []
self._outputs: list[StreamOutput] = []
self._sequence = 0
self._segment_start_pts = None
self._stream_buffer = None

View file

@ -1154,9 +1154,6 @@ ignore_errors = true
[mypy-homeassistant.components.spotify.*]
ignore_errors = true
[mypy-homeassistant.components.stream.*]
ignore_errors = true
[mypy-homeassistant.components.stt.*]
ignore_errors = true

View file

@ -197,7 +197,6 @@ IGNORED_MODULES: Final[list[str]] = [
"homeassistant.components.songpal.*",
"homeassistant.components.sonos.*",
"homeassistant.components.spotify.*",
"homeassistant.components.stream.*",
"homeassistant.components.stt.*",
"homeassistant.components.surepetcare.*",
"homeassistant.components.switchbot.*",