Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions src/devialetctl/application/daemon.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import time
from typing import Callable

from devialetctl.application.router import EventRouter
from devialetctl.application.service import VolumeService
Expand All @@ -16,14 +17,26 @@
_VENDOR_COMPAT_VENDOR_ID: dict[str, int] = {
"samsung": 0x0000F0,
}
_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, str] = {
InputEventType.SYSTEM_AUDIO_MODE_REQUEST: "50:72:01",
InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: "50:7E:01",
InputEventType.REQUEST_ARC_INITIATION: "50:C0",
InputEventType.REQUEST_ARC_TERMINATION: "50:C5",


def _fixed_system_frame(frame: str) -> Callable[["DaemonRunner", CecKernelAdapter], str]:
return lambda _runner, _adapter: frame


_CecSystemFrameBuilder = Callable[["DaemonRunner", CecKernelAdapter], str | None]

_CEC_SYSTEM_RESPONSE_MAP: dict[InputEventType, _CecSystemFrameBuilder] = {
InputEventType.SYSTEM_AUDIO_MODE_REQUEST: _fixed_system_frame("50:72:01"),
InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS: _fixed_system_frame("50:7E:01"),
InputEventType.REQUEST_ARC_INITIATION: _fixed_system_frame("50:C0"),
InputEventType.REQUEST_ARC_TERMINATION: _fixed_system_frame("50:C5"),
# REPORT_SHORT_AUDIO_DESCRIPTOR with one valid LPCM SAD:
# format=LPCM (1), channels=2, rates=32/44.1/48kHz, sizes=16/20/24bit
InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: "50:A3:09:07:07",
InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR: _fixed_system_frame("50:A3:09:07:07"),
InputEventType.GIVE_DEVICE_VENDOR_ID: (
lambda runner, adapter: runner._vendor_response_frame(adapter)
),
InputEventType.GIVE_OSD_NAME: lambda runner, adapter: runner._osd_name_frame(),
}


Expand Down Expand Up @@ -144,14 +157,18 @@ async def _handle_cec_event_async(self, adapter: CecKernelAdapter, event: InputE
return

def _handle_cec_system_request(self, adapter: CecKernelAdapter, kind: InputEventType) -> bool:
frame = _CEC_SYSTEM_RESPONSE_MAP.get(kind)
if frame is None:
frame_builder = _CEC_SYSTEM_RESPONSE_MAP.get(kind)
if frame_builder is None:
return False
frame = frame_builder(self, adapter)
if not frame:
LOG.debug("cannot build CEC system response frame for event=%s", kind.value)
return True
sent = self._send_tx(adapter, frame)
if sent:
LOG.debug("sent CEC system/ARC response frame: %s", frame)
LOG.debug("sent CEC system response frame: %s", frame)
else:
LOG.debug("cannot send CEC system/ARC response frame: %s", frame)
LOG.debug("cannot send CEC system response frame: %s", frame)
return True

async def _report_audio_status_async(self, adapter: CecKernelAdapter) -> None:
Expand Down Expand Up @@ -355,6 +372,22 @@ def _should_spoof_vendor_id(self) -> bool:
def _vendor_id_for_profile(self) -> int:
return _VENDOR_COMPAT_VENDOR_ID.get(self.cfg.cec_vendor_compat, 0)

def _vendor_response_frame(self, adapter: CecKernelAdapter) -> str | None:
if hasattr(adapter, "get_effective_vendor_id"):
vid = int(adapter.get_effective_vendor_id()) & 0xFFFFFF
else:
vid = int(self._vendor_id_for_profile()) & 0xFFFFFF
if not self._should_spoof_vendor_id():
return None
return f"50:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}"

def _osd_name_frame(self) -> str:
encoded = self.cfg.cec_osd_name.encode("ascii", errors="ignore")[:14]
if not encoded:
encoded = b"Audio"
payload = ":".join(f"{byte:02X}" for byte in encoded)
return f"50:47:{payload}"

async def _relative_step_async(self, delta: int, fallback) -> None:
try:
current = int(await self.gateway.get_volume_async())
Expand Down
2 changes: 2 additions & 0 deletions src/devialetctl/domain/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class InputEventType(str, Enum):
REQUEST_ARC_INITIATION = "request_arc_initiation"
REQUEST_ARC_TERMINATION = "request_arc_termination"
REQUEST_SHORT_AUDIO_DESCRIPTOR = "request_short_audio_descriptor"
GIVE_DEVICE_VENDOR_ID = "give_device_vendor_id"
GIVE_OSD_NAME = "give_osd_name"
SET_AUDIO_VOLUME_LEVEL = "set_audio_volume_level"
SAMSUNG_VENDOR_COMMAND = "samsung_vendor_command"
SAMSUNG_VENDOR_COMMAND_WITH_ID = "samsung_vendor_command_with_id"
Expand Down
19 changes: 17 additions & 2 deletions src/devialetctl/infrastructure/cec_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
0xE: "Free Use",
0xF: "Broadcast",
}

_CEC_OPCODE_NAMES: dict[str, str] = {
"00": "FEATURE_ABORT",
"44": "USER_CONTROL_PRESSED",
"45": "USER_CONTROL_RELEASED",
"46": "GIVE_OSD_NAME",
"47": "SET_OSD_NAME",
"70": "SYSTEM_AUDIO_MODE_REQUEST",
"71": "GIVE_AUDIO_STATUS",
"72": "SET_SYSTEM_AUDIO_MODE",
Expand Down Expand Up @@ -72,6 +75,7 @@
"42": (InputEventType.VOLUME_DOWN, "VOLUME_DOWN"),
"43": (InputEventType.MUTE, "MUTE"),
}

_SYSTEM_REQUEST_OPCODE_MAP: dict[str, tuple[InputEventType, str]] = {
"70": (InputEventType.SYSTEM_AUDIO_MODE_REQUEST, "SYSTEM_AUDIO_MODE_REQUEST"),
"7D": (InputEventType.GIVE_SYSTEM_AUDIO_MODE_STATUS, "GIVE_SYSTEM_AUDIO_MODE_STATUS"),
Expand All @@ -81,6 +85,8 @@
InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR,
"REQUEST_SHORT_AUDIO_DESCRIPTOR",
),
"8C": (InputEventType.GIVE_DEVICE_VENDOR_ID, "GIVE_DEVICE_VENDOR_ID"),
"46": (InputEventType.GIVE_OSD_NAME, "GIVE_OSD_NAME"),
}


Expand Down Expand Up @@ -246,10 +252,11 @@ class CecKernelAdapter:
spoof_vendor_id: bool = False
source: str = "cec"
_fd: int | None = None
_effective_vendor_id: int | None = None
_log_addrs_busy_retries: tuple[float, ...] = (0.1, 0.25, 0.5)
_async_poll_interval_s: float = 0.05

def _vendor_announce_frame(self) -> str:
def _vendor_broadcast_announce_frame(self) -> str:
vid = int(self.vendor_id) & 0xFFFFFF
return f"5F:87:{(vid >> 16) & 0xFF:02X}:{(vid >> 8) & 0xFF:02X}:{vid & 0xFF:02X}"

Expand All @@ -263,6 +270,7 @@ def _configure(self, fd: int) -> None:

current = CecLogAddrs()
fcntl.ioctl(fd, CEC_ADAP_G_LOG_ADDRS, current)
self._effective_vendor_id = int(current.vendor_id) & 0xFFFFFF
if self._has_audio_system_claim(current):
LOG.info(
"kernel cec adapter already configured as Audio System "
Expand All @@ -276,9 +284,11 @@ def _configure(self, fd: int) -> None:
addrs.cec_version = CEC_OP_CEC_VERSION_1_4
if self.spoof_vendor_id:
addrs.vendor_id = int(self.vendor_id) & 0xFFFFFF
self._effective_vendor_id = int(addrs.vendor_id) & 0xFFFFFF
else:
# Preserve current adapter vendor identity unless explicitly spoofing.
addrs.vendor_id = int(current.vendor_id) & 0xFFFFFF
self._effective_vendor_id = int(current.vendor_id) & 0xFFFFFF
encoded_name = self.osd_name.encode("ascii", errors="ignore")[:14]
addrs.osd_name = encoded_name
addrs.primary_device_type[0] = CEC_OP_PRIM_DEVTYPE_AUDIOSYSTEM
Expand Down Expand Up @@ -308,6 +318,11 @@ def _configure(self, fd: int) -> None:
retry_delays[idx + 1],
)

def get_effective_vendor_id(self) -> int:
if self._effective_vendor_id is not None:
return int(self._effective_vendor_id) & 0xFFFFFF
return int(self.vendor_id) & 0xFFFFFF

@staticmethod
def _msg_from_frame(frame: str) -> CecMsg:
parts = _parse_frame_parts(frame)
Expand Down Expand Up @@ -345,7 +360,7 @@ async def async_events(self) -> AsyncIterator[InputEvent]:
try:
self._configure(fd)
if self.announce_vendor_id and self.spoof_vendor_id:
self.send_tx(self._vendor_announce_frame())
self.send_tx(self._vendor_broadcast_announce_frame())

while True:
try:
Expand Down
5 changes: 4 additions & 1 deletion tests/test_cec_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test_parse_cec_system_audio_and_arc_requests() -> None:
arc_init = parse_cec_frame("05:c3")
arc_term = parse_cec_frame("05:c4")
sad_req = parse_cec_frame("05:a4:02:0a")
give_vendor = parse_cec_frame("05:8c")
give_osd = parse_cec_frame("05:46")
assert (
sys_mode_req is not None and sys_mode_req.kind == InputEventType.SYSTEM_AUDIO_MODE_REQUEST
)
Expand All @@ -44,6 +46,8 @@ def test_parse_cec_system_audio_and_arc_requests() -> None:
assert arc_init is not None and arc_init.kind == InputEventType.REQUEST_ARC_INITIATION
assert arc_term is not None and arc_term.kind == InputEventType.REQUEST_ARC_TERMINATION
assert sad_req is not None and sad_req.kind == InputEventType.REQUEST_SHORT_AUDIO_DESCRIPTOR
assert give_vendor is not None and give_vendor.kind == InputEventType.GIVE_DEVICE_VENDOR_ID
assert give_osd is not None and give_osd.kind == InputEventType.GIVE_OSD_NAME


def test_parse_cec_set_audio_volume_level() -> None:
Expand All @@ -63,7 +67,6 @@ def test_parse_cec_parses_outgoing_set_audio_volume_level_echo() -> None:


def test_parse_cec_ignores_non_pressed_frames() -> None:
assert parse_cec_frame("05:46") is None
assert parse_cec_frame("50:47:44:65:76:69:61:6c:65:74") is None


Expand Down
23 changes: 22 additions & 1 deletion tests/test_daemon_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ class OneShotAdapter:
def __init__(self, **kwargs):
self.kwargs = kwargs

def get_effective_vendor_id(self) -> int:
return 0x123456

async def async_events(self):
yield InputEvent(
kind=InputEventType.SYSTEM_AUDIO_MODE_REQUEST,
Expand All @@ -197,6 +200,16 @@ async def async_events(self):
source="cec",
key="REQUEST_SHORT_AUDIO_DESCRIPTOR",
)
yield InputEvent(
kind=InputEventType.GIVE_DEVICE_VENDOR_ID,
source="cec",
key="GIVE_DEVICE_VENDOR_ID",
)
yield InputEvent(
kind=InputEventType.GIVE_OSD_NAME,
source="cec",
key="GIVE_OSD_NAME",
)
raise KeyboardInterrupt()

def send_tx(self, frame: str) -> bool:
Expand All @@ -211,7 +224,15 @@ def send_tx(self, frame: str) -> bool:
except KeyboardInterrupt:
pass

assert sent_frames == ["50:72:01", "50:7E:01", "50:C0", "50:C5", "50:A3:09:07:07"]
assert sent_frames == [
"50:72:01",
"50:7E:01",
"50:C0",
"50:C5",
"50:A3:09:07:07",
"50:87:12:34:56",
"50:47:44:65:76:69:61:6C:65:74",
]


def test_daemon_runner_handles_set_audio_volume_level(monkeypatch) -> None:
Expand Down
Loading