diff --git a/src/devialetctl/application/daemon.py b/src/devialetctl/application/daemon.py index 9c2c300..3681e52 100644 --- a/src/devialetctl/application/daemon.py +++ b/src/devialetctl/application/daemon.py @@ -1,6 +1,7 @@ import asyncio import logging import time +from typing import Callable from devialetctl.application.router import EventRouter from devialetctl.application.service import VolumeService @@ -16,14 +17,26 @@ _VENDOR_COMPAT_VENDOR_ID: dict[str, int] = { "samsung": 0x0000F0, } -_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, str] = { - InputEventType.SYSTEM_AUDIO_MODE_REQUEST: "50:72:01", - InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: "50:7E:01", - InputEventType.REQUEST_ARC_INITIATION: "50:C0", - InputEventType.REQUEST_ARC_TERMINATION: "50:C5", + + +def _fixed_system_frame(frame: str) -> Callable[["DaemonRunner", CecKernelAdapter], str]: + return lambda _runner, _adapter: frame + + +_CecSystemFrameBuilder = Callable[["DaemonRunner", CecKernelAdapter], str | None] + +_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, _CecSystemFrameBuilder] = { + InputEventType.SYSTEM_AUDIO_MODE_REQUEST: _fixed_system_frame("50:72:01"), + InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: _fixed_system_frame("50:7E:01"), + InputEventType.REQUEST_ARC_INITIATION: _fixed_system_frame("50:C0"), + InputEventType.REQUEST_ARC_TERMINATION: _fixed_system_frame("50:C5"), # REPORT_SHORT_AUDIO_DESCRIPTOR with one valid LPCM SAD: # format=LPCM (1), channels=2, rates=32/44.1/48kHz, sizes=16/20/24bit - InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: "50:A3:09:07:07", + InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: _fixed_system_frame("50:A3:09:07:07"), + InputEventType.GIVE_DEVICE_VENDOR_ID: ( + lambda runner, adapter: runner._vendor_response_frame(adapter) + ), + InputEventType.GIVE_OSD_NAME: lambda runner, adapter: runner._osd_name_frame(), } @@ -144,14 +157,18 @@ async def _handle_cec_event_async(self, adapter: CecKernelAdapter, event: InputE return def _handle_cec_system_request(self, adapter: CecKernelAdapter, kind: InputEventType) -> bool: - frame = _CEC_SYSTEM_RESPONSE_MAP.get(kind) - if frame is None: + frame_builder = _CEC_SYSTEM_RESPONSE_MAP.get(kind) + if frame_builder is None: return False + frame = frame_builder(self, adapter) + if not frame: + LOG.debug("cannot build CEC system response frame for event=%s", kind.value) + return True sent = self._send_tx(adapter, frame) if sent: - LOG.debug("sent CEC system/ARC response frame: %s", frame) + LOG.debug("sent CEC system response frame: %s", frame) else: - LOG.debug("cannot send CEC system/ARC response frame: %s", frame) + LOG.debug("cannot send CEC system response frame: %s", frame) return True async def _report_audio_status_async(self, adapter: CecKernelAdapter) -> None: @@ -355,6 +372,22 @@ def _should_spoof_vendor_id(self) -> bool: def _vendor_id_for_profile(self) -> int: return _VENDOR_COMPAT_VENDOR_ID.get(self.cfg.cec_vendor_compat, 0) + def _vendor_response_frame(self, adapter: CecKernelAdapter) -> str | None: + if hasattr(adapter, "get_effective_vendor_id"): + vid = int(adapter.get_effective_vendor_id()) & 0xFFFFFF + else: + vid = int(self._vendor_id_for_profile()) & 0xFFFFFF + if not self._should_spoof_vendor_id(): + return None + return f"50:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}" + + def _osd_name_frame(self) -> str: + encoded = self.cfg.cec_osd_name.encode("ascii", errors="ignore")[:14] + if not encoded: + encoded = b"Audio" + payload = ":".join(f"{byte:02X}" for byte in encoded) + return f"50:47:{payload}" + async def _relative_step_async(self, delta: int, fallback) -> None: try: current = int(await self.gateway.get_volume_async()) diff --git a/src/devialetctl/domain/events.py b/src/devialetctl/domain/events.py index aa6e54b..ca9768e 100644 --- a/src/devialetctl/domain/events.py +++ b/src/devialetctl/domain/events.py @@ -13,6 +13,8 @@ class InputEventType(str, Enum): REQUEST_ARC_INITIATION = "request_arc_initiation" REQUEST_ARC_TERMINATION = "request_arc_termination" REQUEST_SHORT_AUDIO_DESCRIPTOR = "request_short_audio_descriptor" + GIVE_DEVICE_VENDOR_ID = "give_device_vendor_id" + GIVE_OSD_NAME = "give_osd_name" SET_AUDIO_VOLUME_LEVEL = "set_audio_volume_level" SAMSUNG_VENDOR_COMMAND = "samsung_vendor_command" SAMSUNG_VENDOR_COMMAND_WITH_ID = "samsung_vendor_command_with_id" diff --git a/src/devialetctl/infrastructure/cec_adapter.py b/src/devialetctl/infrastructure/cec_adapter.py index 8ceb3d0..0a4634e 100644 --- a/src/devialetctl/infrastructure/cec_adapter.py +++ b/src/devialetctl/infrastructure/cec_adapter.py @@ -41,10 +41,13 @@ 0xE: "Free Use", 0xF: "Broadcast", } + _CEC_OPCODE_NAMES: dict[str, str] = { "00": "FEATURE_ABORT", "44": "USER_CONTROL_PRESSED", "45": "USER_CONTROL_RELEASED", + "46": "GIVE_OSD_NAME", + "47": "SET_OSD_NAME", "70": "SYSTEM_AUDIO_MODE_REQUEST", "71": "GIVE_AUDIO_STATUS", "72": "SET_SYSTEM_AUDIO_MODE", @@ -72,6 +75,7 @@ "42": (InputEventType.VOLUME_DOWN, "VOLUME_DOWN"), "43": (InputEventType.MUTE, "MUTE"), } + _SYSTEM_REQUEST_OPCODE_MAP: dict[str, tuple[InputEventType, str]] = { "70": (InputEventType.SYSTEM_AUDIO_MODE_REQUEST, "SYSTEM_AUDIO_MODE_REQUEST"), "7D": (InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS, "GIVE_SYSTEM_AUDIO_MODE_STATUS"), @@ -81,6 +85,8 @@ InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR, "REQUEST_SHORT_AUDIO_DESCRIPTOR", ), + "8C": (InputEventType.GIVE_DEVICE_VENDOR_ID, "GIVE_DEVICE_VENDOR_ID"), + "46": (InputEventType.GIVE_OSD_NAME, "GIVE_OSD_NAME"), } @@ -246,10 +252,11 @@ class CecKernelAdapter: spoof_vendor_id: bool = False source: str = "cec" _fd: int | None = None + _effective_vendor_id: int | None = None _log_addrs_busy_retries: tuple[float, ...] = (0.1, 0.25, 0.5) _async_poll_interval_s: float = 0.05 - def _vendor_announce_frame(self) -> str: + def _vendor_broadcast_announce_frame(self) -> str: vid = int(self.vendor_id) & 0xFFFFFF return f"5F:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}" @@ -263,6 +270,7 @@ def _configure(self, fd: int) -> None: current = CecLogAddrs() fcntl.ioctl(fd, CEC_ADAP_G_LOG_ADDRS, current) + self._effective_vendor_id = int(current.vendor_id) & 0xFFFFFF if self._has_audio_system_claim(current): LOG.info( "kernel cec adapter already configured as Audio System " @@ -276,9 +284,11 @@ def _configure(self, fd: int) -> None: addrs.cec_version = CEC_OP_CEC_VERSION_1_4 if self.spoof_vendor_id: addrs.vendor_id = int(self.vendor_id) & 0xFFFFFF + self._effective_vendor_id = int(addrs.vendor_id) & 0xFFFFFF else: # Preserve current adapter vendor identity unless explicitly spoofing. addrs.vendor_id = int(current.vendor_id) & 0xFFFFFF + self._effective_vendor_id = int(current.vendor_id) & 0xFFFFFF encoded_name = self.osd_name.encode("ascii", errors="ignore")[:14] addrs.osd_name = encoded_name addrs.primary_device_type[0] = CEC_OP_PRIM_DEVTYPE_AUDIOSYSTEM @@ -308,6 +318,11 @@ def _configure(self, fd: int) -> None: retry_delays[idx + 1], ) + def get_effective_vendor_id(self) -> int: + if self._effective_vendor_id is not None: + return int(self._effective_vendor_id) & 0xFFFFFF + return int(self.vendor_id) & 0xFFFFFF + @staticmethod def _msg_from_frame(frame: str) -> CecMsg: parts = _parse_frame_parts(frame) @@ -345,7 +360,7 @@ async def async_events(self) -> AsyncIterator[InputEvent]: try: self._configure(fd) if self.announce_vendor_id and self.spoof_vendor_id: - self.send_tx(self._vendor_announce_frame()) + self.send_tx(self._vendor_broadcast_announce_frame()) while True: try: diff --git a/tests/test_cec_parser.py b/tests/test_cec_parser.py index 6e81773..3c30184 100644 --- a/tests/test_cec_parser.py +++ b/tests/test_cec_parser.py @@ -34,6 +34,8 @@ def test_parse_cec_system_audio_and_arc_requests() -> None: arc_init = parse_cec_frame("05:c3") arc_term = parse_cec_frame("05:c4") sad_req = parse_cec_frame("05:a4:02:0a") + give_vendor = parse_cec_frame("05:8c") + give_osd = parse_cec_frame("05:46") assert ( sys_mode_req is not None and sys_mode_req.kind == InputEventType.SYSTEM_AUDIO_MODE_REQUEST ) @@ -44,6 +46,8 @@ def test_parse_cec_system_audio_and_arc_requests() -> None: assert arc_init is not None and arc_init.kind == InputEventType.REQUEST_ARC_INITIATION assert arc_term is not None and arc_term.kind == InputEventType.REQUEST_ARC_TERMINATION assert sad_req is not None and sad_req.kind == InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR + assert give_vendor is not None and give_vendor.kind == InputEventType.GIVE_DEVICE_VENDOR_ID + assert give_osd is not None and give_osd.kind == InputEventType.GIVE_OSD_NAME def test_parse_cec_set_audio_volume_level() -> None: @@ -63,7 +67,6 @@ def test_parse_cec_parses_outgoing_set_audio_volume_level_echo() -> None: def test_parse_cec_ignores_non_pressed_frames() -> None: - assert parse_cec_frame("05:46") is None assert parse_cec_frame("50:47:44:65:76:69:61:6c:65:74") is None diff --git a/tests/test_daemon_runner.py b/tests/test_daemon_runner.py index 4fd38d7..f27d07e 100644 --- a/tests/test_daemon_runner.py +++ b/tests/test_daemon_runner.py @@ -171,6 +171,9 @@ class OneShotAdapter: def __init__(self, **kwargs): self.kwargs = kwargs + def get_effective_vendor_id(self) -> int: + return 0x123456 + async def async_events(self): yield InputEvent( kind=InputEventType.SYSTEM_AUDIO_MODE_REQUEST, @@ -197,6 +200,16 @@ async def async_events(self): source="cec", key="REQUEST_SHORT_AUDIO_DESCRIPTOR", ) + yield InputEvent( + kind=InputEventType.GIVE_DEVICE_VENDOR_ID, + source="cec", + key="GIVE_DEVICE_VENDOR_ID", + ) + yield InputEvent( + kind=InputEventType.GIVE_OSD_NAME, + source="cec", + key="GIVE_OSD_NAME", + ) raise KeyboardInterrupt() def send_tx(self, frame: str) -> bool: @@ -211,7 +224,15 @@ def send_tx(self, frame: str) -> bool: except KeyboardInterrupt: pass - assert sent_frames == ["50:72:01", "50:7E:01", "50:C0", "50:C5", "50:A3:09:07:07"] + assert sent_frames == [ + "50:72:01", + "50:7E:01", + "50:C0", + "50:C5", + "50:A3:09:07:07", + "50:87:12:34:56", + "50:47:44:65:76:69:61:6C:65:74", + ] def test_daemon_runner_handles_set_audio_volume_level(monkeypatch) -> None: