Fix stream recorder with orientation transforms (#80370)

Find moov instead of using fixed location
This commit is contained in:
uvjustin 2022-10-18 09:12:45 +08:00 committed by GitHub
parent 8d3de53592
commit d38d21ab3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 6 deletions

View file

@ -4,6 +4,8 @@ from __future__ import annotations
from collections.abc import Generator
from typing import TYPE_CHECKING
from homeassistant.exceptions import HomeAssistantError
if TYPE_CHECKING:
from io import BufferedIOBase
@ -11,7 +13,7 @@ if TYPE_CHECKING:
def find_box(
mp4_bytes: bytes, target_type: bytes, box_start: int = 0
) -> Generator[int, None, None]:
"""Find location of first box (or sub_box if box_start provided) of given type."""
"""Find location of first box (or sub box if box_start provided) of given type."""
if box_start == 0:
index = 0
box_end = len(mp4_bytes)
@ -141,12 +143,26 @@ def get_codec_string(mp4_bytes: bytes) -> str:
return ",".join(codecs)
def find_moov(mp4_io: BufferedIOBase) -> int:
"""Find location of moov atom in a BufferedIOBase mp4."""
index = 0
while 1:
mp4_io.seek(index)
box_header = mp4_io.read(8)
if len(box_header) != 8:
raise HomeAssistantError("moov atom not found")
if box_header[4:8] == b"moov":
return index
index += int.from_bytes(box_header[0:4], byteorder="big")
def read_init(bytes_io: BufferedIOBase) -> bytes:
"""Read the init from a mp4 file."""
bytes_io.seek(24)
moov_loc = find_moov(bytes_io)
bytes_io.seek(moov_loc)
moov_len = int.from_bytes(bytes_io.read(4), byteorder="big")
bytes_io.seek(0)
return bytes_io.read(24 + moov_len)
return bytes_io.read(moov_loc + moov_len)
ZERO32 = b"\x00\x00\x00\x00"

View file

@ -29,7 +29,7 @@ from .common import (
from tests.common import async_fire_time_changed
STREAM_SOURCE = "some-stream-source"
INIT_BYTES = b"init"
INIT_BYTES = b"\x00\x00\x00\x08moov"
FAKE_PAYLOAD = b"fake-payload"
SEGMENT_DURATION = 10
TEST_TIMEOUT = 5.0 # Lower than 9s home assistant timeout

View file

@ -30,7 +30,7 @@ TEST_PART_DURATION = 0.75
NUM_PART_SEGMENTS = int(-(-SEGMENT_DURATION // TEST_PART_DURATION))
PART_INDEPENDENT_PERIOD = int(1 / TEST_PART_DURATION) or 1
BYTERANGE_LENGTH = 1
INIT_BYTES = b"init"
INIT_BYTES = b"\x00\x00\x00\x08moov"
SEQUENCE_BYTES = bytearray(range(NUM_PART_SEGMENTS * BYTERANGE_LENGTH))
ALT_SEQUENCE_BYTES = bytearray(range(20, 20 + NUM_PART_SEGMENTS * BYTERANGE_LENGTH))
VERY_LARGE_LAST_BYTE_POS = 9007199254740991

View file

@ -242,7 +242,7 @@ class FakePyAvBuffer:
# Forward to appropriate FakeStream
packet.stream.mux(packet)
# Make new init/part data available to the worker
self.memory_file.write(b"0")
self.memory_file.write(b"\x00\x00\x00\x00moov")
def close(self):
"""Close the buffer."""