From 7b63fa61ba094eead9f1f0aad18a9889f03689fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 14:08:33 +0100 Subject: [PATCH 1/3] cec: support GIVE_OSD_NAME and GIVE_DEVICE_VENDOR_ID cmd --- src/devialetctl/application/daemon.py | 41 ++++++++++++++----- src/devialetctl/domain/events.py | 2 + src/devialetctl/infrastructure/cec_adapter.py | 6 +++ tests/test_cec_parser.py | 5 ++- tests/test_daemon_runner.py | 20 ++++++++- 5 files changed, 62 insertions(+), 12 deletions(-) diff --git a/src/devialetctl/application/daemon.py b/src/devialetctl/application/daemon.py index 9c2c300..6862eae 100644 --- a/src/devialetctl/application/daemon.py +++ b/src/devialetctl/application/daemon.py @@ -1,6 +1,7 @@ import asyncio import logging import time +from typing import Callable from devialetctl.application.router import EventRouter from devialetctl.application.service import VolumeService @@ -16,14 +17,22 @@ _VENDOR_COMPAT_VENDOR_ID: dict[str, int] = { "samsung": 0x0000F0, } -_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, str] = { - InputEventType.SYSTEM_AUDIO_MODE_REQUEST: "50:72:01", - InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: "50:7E:01", - InputEventType.REQUEST_ARC_INITIATION: "50:C0", - InputEventType.REQUEST_ARC_TERMINATION: "50:C5", + + +def _fixed_frame(frame: str) -> Callable[["DaemonRunner"], str]: + return lambda _runner: frame + + +_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, Callable[["DaemonRunner"], str]] = { + InputEventType.SYSTEM_AUDIO_MODE_REQUEST: _fixed_frame("50:72:01"), + InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: _fixed_frame("50:7E:01"), + InputEventType.REQUEST_ARC_INITIATION: _fixed_frame("50:C0"), + InputEventType.REQUEST_ARC_TERMINATION: _fixed_frame("50:C5"), # REPORT_SHORT_AUDIO_DESCRIPTOR with one valid LPCM SAD: # format=LPCM (1), channels=2, rates=32/44.1/48kHz, sizes=16/20/24bit - InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: "50:A3:09:07:07", + InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: _fixed_frame("50:A3:09:07:07"), + InputEventType.GIVE_DEVICE_VENDOR_ID: lambda runner: runner._vendor_announce_frame(), + InputEventType.GIVE_OSD_NAME: lambda runner: runner._osd_name_frame(), } @@ -144,14 +153,15 @@ async def _handle_cec_event_async(self, adapter: CecKernelAdapter, event: InputE return def _handle_cec_system_request(self, adapter: CecKernelAdapter, kind: InputEventType) -> bool: - frame = _CEC_SYSTEM_RESPONSE_MAP.get(kind) - if frame is None: + frame_builder = _CEC_SYSTEM_RESPONSE_MAP.get(kind) + if frame_builder is None: return False + frame = frame_builder(self) sent = self._send_tx(adapter, frame) if sent: - LOG.debug("sent CEC system/ARC response frame: %s", frame) + LOG.debug("sent CEC system response frame: %s", frame) else: - LOG.debug("cannot send CEC system/ARC response frame: %s", frame) + LOG.debug("cannot send CEC system response frame: %s", frame) return True async def _report_audio_status_async(self, adapter: CecKernelAdapter) -> None: @@ -355,6 +365,17 @@ def _should_spoof_vendor_id(self) -> bool: def _vendor_id_for_profile(self) -> int: return _VENDOR_COMPAT_VENDOR_ID.get(self.cfg.cec_vendor_compat, 0) + def _vendor_announce_frame(self) -> str: + vid = int(self._vendor_id_for_profile()) & 0xFFFFFF + return f"50:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}" + + def _osd_name_frame(self) -> str: + encoded = self.cfg.cec_osd_name.encode("ascii", errors="ignore")[:14] + if not encoded: + encoded = b"Audio" + payload = ":".join(f"{byte:02X}" for byte in encoded) + return f"50:47:{payload}" + async def _relative_step_async(self, delta: int, fallback) -> None: try: current = int(await self.gateway.get_volume_async()) diff --git a/src/devialetctl/domain/events.py b/src/devialetctl/domain/events.py index aa6e54b..ca9768e 100644 --- a/src/devialetctl/domain/events.py +++ b/src/devialetctl/domain/events.py @@ -13,6 +13,8 @@ class InputEventType(str, Enum): REQUEST_ARC_INITIATION = "request_arc_initiation" REQUEST_ARC_TERMINATION = "request_arc_termination" REQUEST_SHORT_AUDIO_DESCRIPTOR = "request_short_audio_descriptor" + GIVE_DEVICE_VENDOR_ID = "give_device_vendor_id" + GIVE_OSD_NAME = "give_osd_name" SET_AUDIO_VOLUME_LEVEL = "set_audio_volume_level" SAMSUNG_VENDOR_COMMAND = "samsung_vendor_command" SAMSUNG_VENDOR_COMMAND_WITH_ID = "samsung_vendor_command_with_id" diff --git a/src/devialetctl/infrastructure/cec_adapter.py b/src/devialetctl/infrastructure/cec_adapter.py index 8ceb3d0..d084a20 100644 --- a/src/devialetctl/infrastructure/cec_adapter.py +++ b/src/devialetctl/infrastructure/cec_adapter.py @@ -41,10 +41,13 @@ 0xE: "Free Use", 0xF: "Broadcast", } + _CEC_OPCODE_NAMES: dict[str, str] = { "00": "FEATURE_ABORT", "44": "USER_CONTROL_PRESSED", "45": "USER_CONTROL_RELEASED", + "46": "GIVE_OSD_NAME", + "47": "SET_OSD_NAME", "70": "SYSTEM_AUDIO_MODE_REQUEST", "71": "GIVE_AUDIO_STATUS", "72": "SET_SYSTEM_AUDIO_MODE", @@ -72,6 +75,7 @@ "42": (InputEventType.VOLUME_DOWN, "VOLUME_DOWN"), "43": (InputEventType.MUTE, "MUTE"), } + _SYSTEM_REQUEST_OPCODE_MAP: dict[str, tuple[InputEventType, str]] = { "70": (InputEventType.SYSTEM_AUDIO_MODE_REQUEST, "SYSTEM_AUDIO_MODE_REQUEST"), "7D": (InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS, "GIVE_SYSTEM_AUDIO_MODE_STATUS"), @@ -81,6 +85,8 @@ InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR, "REQUEST_SHORT_AUDIO_DESCRIPTOR", ), + "8C": (InputEventType.GIVE_DEVICE_VENDOR_ID, "GIVE_DEVICE_VENDOR_ID"), + "46": (InputEventType.GIVE_OSD_NAME, "GIVE_OSD_NAME"), } diff --git a/tests/test_cec_parser.py b/tests/test_cec_parser.py index 6e81773..3c30184 100644 --- a/tests/test_cec_parser.py +++ b/tests/test_cec_parser.py @@ -34,6 +34,8 @@ def test_parse_cec_system_audio_and_arc_requests() -> None: arc_init = parse_cec_frame("05:c3") arc_term = parse_cec_frame("05:c4") sad_req = parse_cec_frame("05:a4:02:0a") + give_vendor = parse_cec_frame("05:8c") + give_osd = parse_cec_frame("05:46") assert ( sys_mode_req is not None and sys_mode_req.kind == InputEventType.SYSTEM_AUDIO_MODE_REQUEST ) @@ -44,6 +46,8 @@ def test_parse_cec_system_audio_and_arc_requests() -> None: assert arc_init is not None and arc_init.kind == InputEventType.REQUEST_ARC_INITIATION assert arc_term is not None and arc_term.kind == InputEventType.REQUEST_ARC_TERMINATION assert sad_req is not None and sad_req.kind == InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR + assert give_vendor is not None and give_vendor.kind == InputEventType.GIVE_DEVICE_VENDOR_ID + assert give_osd is not None and give_osd.kind == InputEventType.GIVE_OSD_NAME def test_parse_cec_set_audio_volume_level() -> None: @@ -63,7 +67,6 @@ def test_parse_cec_parses_outgoing_set_audio_volume_level_echo() -> None: def test_parse_cec_ignores_non_pressed_frames() -> None: - assert parse_cec_frame("05:46") is None assert parse_cec_frame("50:47:44:65:76:69:61:6c:65:74") is None diff --git a/tests/test_daemon_runner.py b/tests/test_daemon_runner.py index 4fd38d7..cef98b2 100644 --- a/tests/test_daemon_runner.py +++ b/tests/test_daemon_runner.py @@ -197,6 +197,16 @@ async def async_events(self): source="cec", key="REQUEST_SHORT_AUDIO_DESCRIPTOR", ) + yield InputEvent( + kind=InputEventType.GIVE_DEVICE_VENDOR_ID, + source="cec", + key="GIVE_DEVICE_VENDOR_ID", + ) + yield InputEvent( + kind=InputEventType.GIVE_OSD_NAME, + source="cec", + key="GIVE_OSD_NAME", + ) raise KeyboardInterrupt() def send_tx(self, frame: str) -> bool: @@ -211,7 +221,15 @@ def send_tx(self, frame: str) -> bool: except KeyboardInterrupt: pass - assert sent_frames == ["50:72:01", "50:7E:01", "50:C0", "50:C5", "50:A3:09:07:07"] + assert sent_frames == [ + "50:72:01", + "50:7E:01", + "50:C0", + "50:C5", + "50:A3:09:07:07", + "50:87:00:00:00", + "50:47:44:65:76:69:61:6C:65:74", + ] def test_daemon_runner_handles_set_audio_volume_level(monkeypatch) -> None: From b4cb42c9fa5614b3599c864b6353be5cd6fcfc33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 14:17:38 +0100 Subject: [PATCH 2/3] cec: explicit first announce vendor is broadcast --- src/devialetctl/infrastructure/cec_adapter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/devialetctl/infrastructure/cec_adapter.py b/src/devialetctl/infrastructure/cec_adapter.py index d084a20..d0534d6 100644 --- a/src/devialetctl/infrastructure/cec_adapter.py +++ b/src/devialetctl/infrastructure/cec_adapter.py @@ -255,7 +255,7 @@ class CecKernelAdapter: _log_addrs_busy_retries: tuple[float, ...] = (0.1, 0.25, 0.5) _async_poll_interval_s: float = 0.05 - def _vendor_announce_frame(self) -> str: + def _vendor_broadcast_announce_frame(self) -> str: vid = int(self.vendor_id) & 0xFFFFFF return f"5F:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}" @@ -351,7 +351,7 @@ async def async_events(self) -> AsyncIterator[InputEvent]: try: self._configure(fd) if self.announce_vendor_id and self.spoof_vendor_id: - self.send_tx(self._vendor_announce_frame()) + self.send_tx(self._vendor_broadcast_announce_frame()) while True: try: From fa8e62aa00f28a3341544b3b52df09358ba76c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 14:19:05 +0100 Subject: [PATCH 3/3] cec: keep vendor id if we don't spoof --- src/devialetctl/application/daemon.py | 38 ++++++++++++------- src/devialetctl/infrastructure/cec_adapter.py | 9 +++++ tests/test_daemon_runner.py | 5 ++- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/devialetctl/application/daemon.py b/src/devialetctl/application/daemon.py index 6862eae..3681e52 100644 --- a/src/devialetctl/application/daemon.py +++ b/src/devialetctl/application/daemon.py @@ -19,20 +19,24 @@ } -def _fixed_frame(frame: str) -> Callable[["DaemonRunner"], str]: - return lambda _runner: frame +def _fixed_system_frame(frame: str) -> Callable[["DaemonRunner", CecKernelAdapter], str]: + return lambda _runner, _adapter: frame -_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, Callable[["DaemonRunner"], str]] = { - InputEventType.SYSTEM_AUDIO_MODE_REQUEST: _fixed_frame("50:72:01"), - InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: _fixed_frame("50:7E:01"), - InputEventType.REQUEST_ARC_INITIATION: _fixed_frame("50:C0"), - InputEventType.REQUEST_ARC_TERMINATION: _fixed_frame("50:C5"), +_CecSystemFrameBuilder = Callable[["DaemonRunner", CecKernelAdapter], str | None] + +_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, _CecSystemFrameBuilder] = { + InputEventType.SYSTEM_AUDIO_MODE_REQUEST: _fixed_system_frame("50:72:01"), + InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: _fixed_system_frame("50:7E:01"), + InputEventType.REQUEST_ARC_INITIATION: _fixed_system_frame("50:C0"), + InputEventType.REQUEST_ARC_TERMINATION: _fixed_system_frame("50:C5"), # REPORT_SHORT_AUDIO_DESCRIPTOR with one valid LPCM SAD: # format=LPCM (1), channels=2, rates=32/44.1/48kHz, sizes=16/20/24bit - InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: _fixed_frame("50:A3:09:07:07"), - InputEventType.GIVE_DEVICE_VENDOR_ID: lambda runner: runner._vendor_announce_frame(), - InputEventType.GIVE_OSD_NAME: lambda runner: runner._osd_name_frame(), + InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: _fixed_system_frame("50:A3:09:07:07"), + InputEventType.GIVE_DEVICE_VENDOR_ID: ( + lambda runner, adapter: runner._vendor_response_frame(adapter) + ), + InputEventType.GIVE_OSD_NAME: lambda runner, adapter: runner._osd_name_frame(), } @@ -156,7 +160,10 @@ def _handle_cec_system_request(self, adapter: CecKernelAdapter, kind: InputEvent frame_builder = _CEC_SYSTEM_RESPONSE_MAP.get(kind) if frame_builder is None: return False - frame = frame_builder(self) + frame = frame_builder(self, adapter) + if not frame: + LOG.debug("cannot build CEC system response frame for event=%s", kind.value) + return True sent = self._send_tx(adapter, frame) if sent: LOG.debug("sent CEC system response frame: %s", frame) @@ -365,8 +372,13 @@ def _should_spoof_vendor_id(self) -> bool: def _vendor_id_for_profile(self) -> int: return _VENDOR_COMPAT_VENDOR_ID.get(self.cfg.cec_vendor_compat, 0) - def _vendor_announce_frame(self) -> str: - vid = int(self._vendor_id_for_profile()) & 0xFFFFFF + def _vendor_response_frame(self, adapter: CecKernelAdapter) -> str | None: + if hasattr(adapter, "get_effective_vendor_id"): + vid = int(adapter.get_effective_vendor_id()) & 0xFFFFFF + else: + vid = int(self._vendor_id_for_profile()) & 0xFFFFFF + if not self._should_spoof_vendor_id(): + return None return f"50:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}" def _osd_name_frame(self) -> str: diff --git a/src/devialetctl/infrastructure/cec_adapter.py b/src/devialetctl/infrastructure/cec_adapter.py index d0534d6..0a4634e 100644 --- a/src/devialetctl/infrastructure/cec_adapter.py +++ b/src/devialetctl/infrastructure/cec_adapter.py @@ -252,6 +252,7 @@ class CecKernelAdapter: spoof_vendor_id: bool = False source: str = "cec" _fd: int | None = None + _effective_vendor_id: int | None = None _log_addrs_busy_retries: tuple[float, ...] = (0.1, 0.25, 0.5) _async_poll_interval_s: float = 0.05 @@ -269,6 +270,7 @@ def _configure(self, fd: int) -> None: current = CecLogAddrs() fcntl.ioctl(fd, CEC_ADAP_G_LOG_ADDRS, current) + self._effective_vendor_id = int(current.vendor_id) & 0xFFFFFF if self._has_audio_system_claim(current): LOG.info( "kernel cec adapter already configured as Audio System " @@ -282,9 +284,11 @@ def _configure(self, fd: int) -> None: addrs.cec_version = CEC_OP_CEC_VERSION_1_4 if self.spoof_vendor_id: addrs.vendor_id = int(self.vendor_id) & 0xFFFFFF + self._effective_vendor_id = int(addrs.vendor_id) & 0xFFFFFF else: # Preserve current adapter vendor identity unless explicitly spoofing. addrs.vendor_id = int(current.vendor_id) & 0xFFFFFF + self._effective_vendor_id = int(current.vendor_id) & 0xFFFFFF encoded_name = self.osd_name.encode("ascii", errors="ignore")[:14] addrs.osd_name = encoded_name addrs.primary_device_type[0] = CEC_OP_PRIM_DEVTYPE_AUDIOSYSTEM @@ -314,6 +318,11 @@ def _configure(self, fd: int) -> None: retry_delays[idx + 1], ) + def get_effective_vendor_id(self) -> int: + if self._effective_vendor_id is not None: + return int(self._effective_vendor_id) & 0xFFFFFF + return int(self.vendor_id) & 0xFFFFFF + @staticmethod def _msg_from_frame(frame: str) -> CecMsg: parts = _parse_frame_parts(frame) diff --git a/tests/test_daemon_runner.py b/tests/test_daemon_runner.py index cef98b2..f27d07e 100644 --- a/tests/test_daemon_runner.py +++ b/tests/test_daemon_runner.py @@ -171,6 +171,9 @@ class OneShotAdapter: def __init__(self, **kwargs): self.kwargs = kwargs + def get_effective_vendor_id(self) -> int: + return 0x123456 + async def async_events(self): yield InputEvent( kind=InputEventType.SYSTEM_AUDIO_MODE_REQUEST, @@ -227,7 +230,7 @@ def send_tx(self, frame: str) -> bool: "50:C0", "50:C5", "50:A3:09:07:07", - "50:87:00:00:00", + "50:87:12:34:56", "50:47:44:65:76:69:61:6C:65:74", ]