From fbf00cb7018c398a0bb0c4703ffbf1ac0467e3e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Tue, 17 Feb 2026 13:18:01 +0100 Subject: [PATCH 01/14] Add UPnP discovery --- README.md | 2 +- src/devialetctl/discovery.py | 12 +- src/devialetctl/domain/events.py | 2 +- src/devialetctl/infrastructure/__init__.py | 3 + .../infrastructure/upnp_gateway.py | 139 ++++++++++++++++++ src/devialetctl/interfaces/cli.py | 22 ++- 6 files changed, 172 insertions(+), 8 deletions(-) create mode 100644 src/devialetctl/infrastructure/upnp_gateway.py diff --git a/README.md b/README.md index 001dd7b..4863468 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Use cases: ## Features -- mDNS discovery (`_http._tcp.local`) +- mDNS discovery (`_http._tcp.local`) with UPnP fallback - volume commands: `getvol`, `setvol`, `volup`, `voldown`, `mute` - manual target override (`--ip`, `--port`, `--base-path`) - long-running daemon mode (`daemon --input cec`) diff --git a/src/devialetctl/discovery.py b/src/devialetctl/discovery.py index 3260696..206a6fb 100644 --- a/src/devialetctl/discovery.py +++ b/src/devialetctl/discovery.py @@ -2,6 +2,7 @@ from typing import List from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway +from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway @dataclass(frozen=True) @@ -15,8 +16,15 @@ class DevialetService: def discover( timeout_s: float = 3.0, service_type: str = "_whatsup._tcp.local." ) -> List[DevialetService]: - gateway = MdnsDiscoveryGateway(service_type=service_type) - results = gateway.discover(timeout_s=timeout_s) + results = [] + seen: set[tuple[str, int, str]] = set() + for gateway in (MdnsDiscoveryGateway(service_type=service_type), UpnpDiscoveryGateway()): + for svc in gateway.discover(timeout_s=timeout_s): + key = (svc.address, svc.port, svc.base_path) + if key in seen: + continue + seen.add(key) + results.append(svc) return [ DevialetService(name=r.name, address=r.address, port=r.port, base_path=r.base_path) for r in results diff --git a/src/devialetctl/domain/events.py b/src/devialetctl/domain/events.py index aa6e54b..ac816cd 100644 --- a/src/devialetctl/domain/events.py +++ b/src/devialetctl/domain/events.py @@ -27,4 +27,4 @@ class InputEvent: muted: bool | None = None vendor_subcommand: int | None = None vendor_mode: int | None = None - vendor_payload: tuple[int, ...] | None = None + vendor_payload: tuple[int, ...] | None = None \ No newline at end of file diff --git a/src/devialetctl/infrastructure/__init__.py b/src/devialetctl/infrastructure/__init__.py index 7645887..28bc681 100644 --- a/src/devialetctl/infrastructure/__init__.py +++ b/src/devialetctl/infrastructure/__init__.py @@ -2,6 +2,7 @@ from .devialet_gateway import DevialetHttpGateway from .keyboard_adapter import KeyboardAdapter, parse_keyboard_command from .mdns_gateway import MdnsDiscoveryGateway, MdnsService +from .upnp_gateway import UpnpDiscoveryGateway, UpnpService __all__ = [ "DaemonConfig", @@ -12,4 +13,6 @@ "parse_keyboard_command", "MdnsDiscoveryGateway", "MdnsService", + "UpnpDiscoveryGateway", + "UpnpService", ] diff --git a/src/devialetctl/infrastructure/upnp_gateway.py b/src/devialetctl/infrastructure/upnp_gateway.py new file mode 100644 index 0000000..878fa2f --- /dev/null +++ b/src/devialetctl/infrastructure/upnp_gateway.py @@ -0,0 +1,139 @@ +import logging +import re +import socket +import time +from dataclasses import dataclass +from typing import Iterable +from urllib.parse import urlparse + +import httpx # type: ignore[reportMissingImports] + +from devialetctl.application.ports import DiscoveryPort, Target + +_SSDP_ADDR = ("239.255.255.250", 1900) +_SSDP_SEARCH_TARGET = "urn:schemas-upnp-org:device:MediaRenderer:2" +_DEFAULT_BASE_PATH = "/ipcontrol/v1" +_DEFAULT_PORT = 80 +LOG = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class UpnpService: + name: str + address: str + port: int + base_path: str + + +def _parse_ssdp_headers(payload: bytes) -> dict[str, str]: + text = payload.decode("utf-8", errors="ignore") + lines = [line.strip() for line in text.splitlines() if line.strip()] + headers: dict[str, str] = {} + for line in lines[1:]: + if ":" not in line: + continue + key, value = line.split(":", 1) + headers[key.strip().lower()] = value.strip() + return headers + + +def _iter_ssdp_responses(timeout_s: float) -> Iterable[dict[str, str]]: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock: + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + sock.settimeout(max(0.2, min(timeout_s, 1.0))) + msg = "\r\n".join( + [ + "M-SEARCH * HTTP/1.1", + "HOST: 239.255.255.250:1900", + 'MAN: "ssdp:discover"', + "MX: 1", + f"ST: {_SSDP_SEARCH_TARGET}", + "", + "", + ] + ).encode("ascii") + LOG.debug("UPnP SSDP M-SEARCH start st=%s timeout_s=%.2f", _SSDP_SEARCH_TARGET, timeout_s) + sock.sendto(msg, _SSDP_ADDR) + + deadline = time.monotonic() + max(0.1, timeout_s) + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + LOG.debug("UPnP SSDP M-SEARCH finished (timeout reached)") + return + sock.settimeout(max(0.05, min(remaining, 0.5))) + try: + payload, _ = sock.recvfrom(8192) + except TimeoutError: + continue + except OSError: + LOG.debug("UPnP SSDP receive aborted due to socket error") + return + headers = _parse_ssdp_headers(payload) + if headers: + LOG.debug( + "UPnP SSDP response location=%s st=%s usn=%s", + headers.get("location", ""), + headers.get("st", ""), + headers.get("usn", ""), + ) + yield headers + + +def _is_devialet_manufacturer(location: str, timeout_s: float) -> bool: + timeout = max(0.3, min(timeout_s, 1.5)) + try: + with httpx.Client(timeout=timeout) as client: + response = client.get(location) + response.raise_for_status() + xml_text = response.text + except Exception as exc: + LOG.debug("UPnP XML fetch failed location=%s err=%s", location, exc) + return False + + if not re.search( + r"\s*Devialet\s*", + xml_text, + flags=re.IGNORECASE, + ): + LOG.debug("UPnP XML rejected location=%s manufacturer_tag_not_found", location) + return False + + LOG.debug("UPnP XML accepted location=%s manufacturer=Devialet", location) + return True + + +class UpnpDiscoveryGateway(DiscoveryPort): + def discover(self, timeout_s: float = 3.0) -> list[Target]: + uniq: dict[str, UpnpService] = {} + LOG.debug( + "UPnP discovery begin timeout_s=%.2f target=%s", + timeout_s, + _SSDP_SEARCH_TARGET, + ) + for headers in _iter_ssdp_responses(timeout_s): + location = headers.get("location", "") + if not _is_devialet_manufacturer(location=location, timeout_s=timeout_s): + continue + parsed = urlparse(location) + host = parsed.hostname + if not host: + LOG.debug("UPnP response ignored (missing host in location): %s", location) + continue + if host in uniq: + continue + + uniq[host] = UpnpService( + name=f"UPnP:{host}", + address=host, + port=_DEFAULT_PORT, + base_path=_DEFAULT_BASE_PATH, + ) + LOG.debug("UPnP device accepted host=%s base_path=%s", host, _DEFAULT_BASE_PATH) + + targets = [ + Target(address=s.address, port=s.port, base_path=s.base_path, name=s.name) + for s in uniq.values() + ] + LOG.debug("UPnP discovery done found=%d", len(targets)) + return targets diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 70124dd..dbd3477 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -10,12 +10,13 @@ from devialetctl.infrastructure.config import load_config from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway, normalize_base_path from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway +from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway def _pick(services: list[Target], index: int | None): if not services: raise RuntimeError( - "No service detected via mDNS (Bonjour). Check network / Wi-Fi isolation." + "No service detected via mDNS/UPnP. Check network / Wi-Fi isolation." ) if index is None: if len(services) == 1: @@ -28,6 +29,19 @@ def _pick(services: list[Target], index: int | None): return services[index] +def _discover_targets(timeout_s: float) -> list[Target]: + seen: set[tuple[str, int, str]] = set() + merged: list[Target] = [] + for gateway in (MdnsDiscoveryGateway(), UpnpDiscoveryGateway()): + for svc in gateway.discover(timeout_s=timeout_s): + key = (svc.address, svc.port, svc.base_path) + if key in seen: + continue + seen.add(key) + merged.append(svc) + return merged + + def _target_from_args(args) -> Target: ip = args.ip port = args.port @@ -50,7 +64,7 @@ def _target_from_args(args) -> Target: return Target( address=ip, port=port, base_path=normalize_base_path(base_path), name="manual" ) - services = MdnsDiscoveryGateway().discover(timeout_s=discover_timeout) + services = _discover_targets(timeout_s=discover_timeout) return _pick(services, index) @@ -79,7 +93,7 @@ def _target_from_config(args) -> Target: else cfg.target.discover_timeout ) index = args.daemon_index if args.daemon_index is not None else cfg.target.index - services = MdnsDiscoveryGateway().discover(timeout_s=timeout) + services = _discover_targets(timeout_s=timeout) return _pick(services, index) @@ -142,7 +156,7 @@ def main() -> None: _configure_logging(requested_log_level) if args.cmd == "list": - services = MdnsDiscoveryGateway().discover(timeout_s=args.discover_timeout) + services = _discover_targets(timeout_s=args.discover_timeout) if not services: print("No service detected.") return From ad737517bce8d5a633cfc5cc9570de533bb5fd7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Tue, 17 Feb 2026 13:45:47 +0100 Subject: [PATCH 02/14] Add tree cmd --- .../infrastructure/devialet_gateway.py | 3 + src/devialetctl/interfaces/cli.py | 159 ++++++++++++++++++ tests/test_cli_regression.py | 95 +++++++++++ 3 files changed, 257 insertions(+) diff --git a/src/devialetctl/infrastructure/devialet_gateway.py b/src/devialetctl/infrastructure/devialet_gateway.py index 0135d17..4e0015d 100644 --- a/src/devialetctl/infrastructure/devialet_gateway.py +++ b/src/devialetctl/infrastructure/devialet_gateway.py @@ -40,6 +40,9 @@ async def _apost(self, path: str, payload: dict[str, Any] | None = None) -> None ) r.raise_for_status() + async def fetch_json_async(self, path: str) -> dict[str, Any]: + return await self._aget(path) + async def systems_async(self) -> dict[str, Any]: try: return await self._aget("/systems") diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index dbd3477..012a920 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -1,5 +1,7 @@ import argparse +import asyncio import dataclasses +import json import logging import os import sys @@ -12,6 +14,8 @@ from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway +LOG = logging.getLogger(__name__) + def _pick(services: list[Target], index: int | None): if not services: @@ -42,6 +46,147 @@ def _discover_targets(timeout_s: float) -> list[Target]: return merged +def _safe_fetch_json(gateway: DevialetHttpGateway, path: str) -> dict | None: + try: + data = asyncio.run(gateway.fetch_json_async(path)) + if isinstance(data, dict): + return data + except Exception as exc: + LOG.debug("tree fetch failed path=%s host=%s err=%s", path, gateway.address, exc) + return None + + +def _build_topology_tree(targets: list[Target]) -> dict: + devices_by_id: dict[str, dict] = {} + systems: dict[str, dict] = {} + groups: dict[str, dict] = {} + + for target in targets: + gateway = DevialetHttpGateway(target.address, target.port, target.base_path) + device = _safe_fetch_json(gateway, "/devices/current") + if not device: + continue + + device_id = str(device.get("deviceId") or f"dispatcher:{target.address}") + system_id = str(device.get("systemId") or "") or None + group_id = str(device.get("groupId") or "") or None + devices_by_id[device_id] = { + "device_id": device_id, + "device_name": device.get("deviceName") or device.get("model") or device_id, + "model": str(device.get("model") or ""), + "role": str(device.get("role") or ""), + "serial": str(device.get("serial") or ""), + "system_id": system_id, + "group_id": group_id, + "address": target.address, + "port": target.port, + } + + if not devices_by_id: + return {"groups": [], "ungrouped_devices": [], "errors": ["No Devialet devices detected."]} + + for dev in devices_by_id.values(): + if not dev["system_id"]: + continue + systems.setdefault( + dev["system_id"], + {"name": None, "group_id": dev["group_id"], "devices": []}, + ) + systems[dev["system_id"]]["devices"].append(dev) + + for system_id, system_data in systems.items(): + probe_device = system_data["devices"][0] + gateway = DevialetHttpGateway( + probe_device["address"], + probe_device["port"], + "/ipcontrol/v1", + ) + sys_info = _safe_fetch_json(gateway, "/systems/current") + if sys_info: + system_data["name"] = str(sys_info.get("systemName") or system_id) + sys_group_id = str(sys_info.get("groupId") or "") or None + if sys_group_id: + system_data["group_id"] = sys_group_id + else: + system_data["name"] = system_id + + group_id = system_data["group_id"] or "ungrouped" + groups.setdefault(group_id, {"systems": {}}) + groups[group_id]["systems"][system_id] = system_data + + group_items: list[dict] = [] + for group_id in sorted(groups.keys()): + group_data = groups[group_id] + systems_items: list[dict] = [] + for system_id in sorted(group_data["systems"].keys()): + system_data = group_data["systems"][system_id] + devices_items = [ + { + "device_id": dev["device_id"], + "device_name": dev["device_name"], + "model": dev["model"], + "role": dev["role"], + "serial": dev["serial"], + "address": dev["address"], + "port": dev["port"], + } + for dev in sorted(system_data["devices"], key=lambda d: d["device_name"]) + ] + systems_items.append( + { + "system_id": system_id, + "system_name": system_data["name"], + "devices": devices_items, + } + ) + group_items.append( + { + "group_id": group_id, + "systems": systems_items, + } + ) + + ungrouped_devices = [ + { + "device_id": dev["device_id"], + "device_name": dev["device_name"], + "model": dev["model"], + "role": dev["role"], + "serial": dev["serial"], + "address": dev["address"], + "port": dev["port"], + } + for dev in sorted( + [d for d in devices_by_id.values() if d["system_id"] is None], + key=lambda d: d["device_name"], + ) + ] + + return {"groups": group_items, "ungrouped_devices": ungrouped_devices, "errors": []} + + +def _render_topology_tree_lines(tree: dict) -> list[str]: + if tree.get("errors"): + return tree["errors"] + + lines: list[str] = [] + for group in tree.get("groups", []): + lines.append(f"Group {group['group_id']}") + for system in group.get("systems", []): + lines.append(f" System {system['system_name']} ({system['system_id']})") + for dev in system.get("devices", []): + role = f" role={dev['role']}" if dev.get("role") else "" + model = f" model={dev['model']}" if dev.get("model") else "" + lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}{role}") + + if tree.get("ungrouped_devices"): + lines.append("Ungrouped devices") + for dev in tree["ungrouped_devices"]: + model = f" model={dev['model']}" if dev.get("model") else "" + lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}") + return lines + + def _target_from_args(args) -> Target: ip = args.ip port = args.port @@ -123,6 +268,8 @@ def main() -> None: sub = p.add_subparsers(dest="cmd", required=True) sub.add_parser("list") + tree = sub.add_parser("tree") + tree.add_argument("--json", action="store_true", dest="tree_json") sub.add_parser("systems") sub.add_parser("getvol") sub.add_parser("volup") @@ -163,6 +310,18 @@ def main() -> None: for i, s in enumerate(services): print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") return + if args.cmd == "tree": + services = _discover_targets(timeout_s=args.discover_timeout) + if not services: + print("No service detected.") + return + tree = _build_topology_tree(services) + if args.tree_json: + print(json.dumps(tree, indent=2, ensure_ascii=False)) + return + for line in _render_topology_tree_lines(tree): + print(line) + return if args.cmd == "daemon": try: diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index 3fab9c3..7d7c1bd 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -1,3 +1,4 @@ +import json import sys import pytest @@ -5,6 +6,15 @@ from devialetctl.interfaces import cli +@pytest.fixture(autouse=True) +def _disable_real_upnp_discovery(monkeypatch) -> None: + class FakeUpnpDiscovery: + def discover(self, timeout_s): + return [] + + monkeypatch.setattr(cli, "UpnpDiscoveryGateway", lambda: FakeUpnpDiscovery()) + + def test_cli_list_prints_discovered_services(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): @@ -186,6 +196,91 @@ def run_forever(self, input_name): assert "Daemon error: boom" in err +def test_cli_tree_prints_groups_systems_devices(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + return { + "deviceId": "dev-1", + "systemId": "sys-1", + "groupId": "grp-1", + "deviceName": "Living Room", + "model": "Phantom I", + "role": "Mono", + } + if path == "/systems/current": + return {"systemId": "sys-1", "groupId": "grp-1", "systemName": "Salon"} + return {} + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "tree"]) + cli.main() + + out = capsys.readouterr().out + assert "Group grp-1" in out + assert "System Salon (sys-1)" in out + assert "Device Living Room @ 10.0.0.2 model=Phantom I role=Mono" in out + + +def test_cli_tree_json_outputs_structured_topology(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + return { + "deviceId": "dev-1", + "systemId": "sys-1", + "groupId": "grp-1", + "deviceName": "Living Room", + "model": "Phantom I", + "role": "Mono", + } + if path == "/systems/current": + return {"systemId": "sys-1", "groupId": "grp-1", "systemName": "Salon"} + return {} + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "tree", "--json"]) + cli.main() + + data = json.loads(capsys.readouterr().out) + assert data["groups"][0]["group_id"] == "grp-1" + assert data["groups"][0]["systems"][0]["system_name"] == "Salon" + assert data["groups"][0]["systems"][0]["devices"][0]["device_name"] == "Living Room" + assert "sources" not in data["groups"][0] + + def test_cli_list_applies_log_level_from_env(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): From f6d5633d377737cf744ee803adcad4a6cef512e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Tue, 17 Feb 2026 14:04:53 +0100 Subject: [PATCH 03/14] Update ARCHITECTURE.md --- ARCHITECTURE.md | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 601506f..64b3c30 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -26,7 +26,8 @@ Provide a maintainable local-control platform for Devialet Phantom volume, with: - `ports.py`: contracts (`VolumeGateway`, discovery target models) - `src/devialetctl/infrastructure` - `devialet_gateway.py`: async HTTP calls to Devialet API (`httpx.AsyncClient`) - - `mdns_gateway.py`: zeroconf discovery + filtering + - `mdns_gateway.py`: mDNS/zeroconf discovery + filtering + - `upnp_gateway.py`: SSDP/UPnP discovery (`MediaRenderer:2`) - `cec_adapter.py`: Linux CEC kernel adapter (`/dev/cec0`, ioctl, async event stream) - `keyboard_adapter.py`: single-key or line-based keyboard input - `config.py`: typed runtime config (TOML + env overrides) @@ -62,6 +63,7 @@ flowchart LR keyboardAdapter[KeyboardAdapter] httpGateway[DevialetHttpGateway] mdnsGateway[MdnsDiscoveryGateway] + upnpGateway[UpnpDiscoveryGateway] configLoader[ConfigLoader] end @@ -79,8 +81,11 @@ flowchart LR volumeService --> httpGateway httpGateway --> devialetSpeaker devialetSpeaker --> mdnsGateway + devialetSpeaker --> upnpGateway daemonInterface --> mdnsGateway + daemonInterface --> upnpGateway cliInterface --> mdnsGateway + cliInterface --> upnpGateway configLoader --> daemonInterface configLoader --> cliInterface ``` @@ -88,7 +93,7 @@ flowchart LR ## Command Surface - Direct control: - - `list`, `systems`, `getvol`, `setvol`, `volup`, `voldown`, `mute` + - `list`, `tree`, `systems`, `getvol`, `setvol`, `volup`, `voldown`, `mute` - Daemon: - `daemon --input cec` - `daemon --input keyboard` @@ -97,7 +102,14 @@ flowchart LR ## Runtime Behavior -- mDNS discovery filters likely Devialet devices by service identity heuristics. +- Discovery uses merged mDNS + UPnP: + - mDNS path: `_http._tcp.local` browsing with TXT/path-based filtering. + - UPnP path: SSDP `M-SEARCH` with target `urn:schemas-upnp-org:device:MediaRenderer:2`. + - targets are deduplicated by `(address, port, base_path)` before selection. +- `tree` command builds a topology from "current" endpoints: + - per discovered dispatcher: `/devices/current` + - per inferred system: `/systems/current` + - groups are rebuilt from `groupId`/`systemId` relationships. - Base path is normalized defensively: - `None`, `""`, `/` -> `/ipcontrol/v1` - missing leading slash is corrected. @@ -178,8 +190,8 @@ Current test suite covers: - keyboard parser behavior - event policy dedupe/rate-limit - daemon routing in CEC and keyboard modes -- CLI regressions (including daemon argument handling) -- mDNS filtering +- CLI regressions (including daemon argument handling and `tree --json`) +- mDNS + UPnP discovery integration - base-path normalization - compatibility wrappers From 4e3507bbca7841380965ec677d406583acd763f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 08:42:55 +0100 Subject: [PATCH 04/14] Pick by system name --- src/devialetctl/interfaces/cli.py | 52 +++++++++++++ tests/test_cli_regression.py | 124 ++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+) diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 012a920..20fc664 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -33,6 +33,49 @@ def _pick(services: list[Target], index: int | None): return services[index] +def _pick_by_system_name(services: list[Target], system_name: str) -> Target: + if not services: + raise RuntimeError( + "No service detected via mDNS/UPnP. Check network / Wi-Fi isolation." + ) + requested = system_name.strip() + if not requested: + raise RuntimeError("System name cannot be empty.") + + tree = _build_topology_tree(services) + matches: list[tuple[str, dict]] = [] + for group in tree.get("groups", []): + group_id = str(group.get("group_id", "ungrouped")) + for system in group.get("systems", []): + if str(system.get("system_name", "")).casefold() == requested.casefold(): + matches.append((group_id, system)) + + if not matches: + raise RuntimeError( + f"System '{requested}' not found. Run 'devialetctl tree' to list available systems." + ) + if len(matches) > 1: + groups = ", ".join(sorted({m[0] for m in matches})) + raise RuntimeError( + f"System name '{requested}' is ambiguous across groups: {groups}. " + "Use --index or rename systems." + ) + + group_id, system = matches[0] + devices = system.get("devices", []) + if not devices: + raise RuntimeError( + f"System '{requested}' has no reachable devices in group {group_id}." + ) + selected = devices[0] + return Target( + address=str(selected["address"]), + port=int(selected["port"]), + base_path="/ipcontrol/v1", + name=f"{requested}@{group_id}", + ) + + def _discover_targets(timeout_s: float) -> list[Target]: seen: set[tuple[str, int, str]] = set() merged: list[Target] = [] @@ -193,6 +236,7 @@ def _target_from_args(args) -> Target: base_path = args.base_path discover_timeout = args.discover_timeout index = args.index + system = args.system if getattr(args, "cmd", None) == "daemon": ip = args.daemon_ip if args.daemon_ip is not None else ip @@ -204,12 +248,15 @@ def _target_from_args(args) -> Target: else discover_timeout ) index = args.daemon_index if args.daemon_index is not None else index + system = args.daemon_system if args.daemon_system is not None else system if ip: return Target( address=ip, port=port, base_path=normalize_base_path(base_path), name="manual" ) services = _discover_targets(timeout_s=discover_timeout) + if system is not None: + return _pick_by_system_name(services, system) return _pick(services, index) @@ -237,6 +284,9 @@ def _target_from_config(args) -> Target: if args.daemon_discover_timeout is not None else cfg.target.discover_timeout ) + if args.daemon_system is not None: + services = _discover_targets(timeout_s=timeout) + return _pick_by_system_name(services, args.daemon_system) index = args.daemon_index if args.daemon_index is not None else cfg.target.index services = _discover_targets(timeout_s=timeout) return _pick(services, index) @@ -262,6 +312,7 @@ def main() -> None: ) p.add_argument("--discover-timeout", type=float, default=3.0) p.add_argument("--index", type=int, default=None) + p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") p.add_argument("--port", type=int, default=80) p.add_argument("--base-path", type=str, default="/ipcontrol/v1") @@ -285,6 +336,7 @@ def main() -> None: "--discover-timeout", dest="daemon_discover_timeout", type=float, default=None ) daemon.add_argument("--index", dest="daemon_index", type=int, default=None) + daemon.add_argument("--system", dest="daemon_system", type=str, default=None) daemon.add_argument("--ip", dest="daemon_ip", type=str, default=None) daemon.add_argument("--port", dest="daemon_port", type=int, default=None) daemon.add_argument("--base-path", dest="daemon_base_path", type=str, default=None) diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index 7d7c1bd..65899cb 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -150,6 +150,130 @@ def run_forever(self, input_name): assert FakeGateway.picked_address == "10.0.0.11" +def test_cli_getvol_accepts_system_name_selection(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Salon: + name = "salon" + address = "10.0.0.50" + port = 80 + base_path = "/ipcontrol/v1" + + class TvLeft: + name = "tv-left" + address = "10.0.0.71" + port = 80 + base_path = "/ipcontrol/v1" + + class TvRight: + name = "tv-right" + address = "10.0.0.184" + port = 80 + base_path = "/ipcontrol/v1" + + return [Salon(), TvLeft(), TvRight()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + if self.address == "10.0.0.50": + return { + "deviceId": "salon-1", + "systemId": "sys-salon", + "groupId": "grp-salon", + "deviceName": "Salon", + "model": "Phantom", + "role": "Mono", + } + if self.address == "10.0.0.71": + return { + "deviceId": "tv-left", + "systemId": "sys-tv", + "groupId": "grp-tv", + "deviceName": "TV Gauche", + "model": "Phantom", + "role": "FrontLeft", + } + return { + "deviceId": "tv-right", + "systemId": "sys-tv", + "groupId": "grp-tv", + "deviceName": "TV Droite", + "model": "Phantom", + "role": "FrontRight", + } + if path == "/systems/current": + if self.address in {"10.0.0.71", "10.0.0.184"}: + return {"systemName": "TV", "groupId": "grp-tv"} + return {"systemName": "Salon", "groupId": "grp-salon"} + return {} + + async def systems_async(self): + return {} + + async def get_volume_async(self): + return int(self.address.split(".")[-1]) + + async def set_volume_async(self, value): + return None + + async def volume_up_async(self): + return None + + async def volume_down_async(self): + return None + + async def mute_toggle_async(self): + return None + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "getvol"]) + cli.main() + out = capsys.readouterr().out + assert out.strip() in {"71", "184"} + + +def test_cli_getvol_system_name_not_found(monkeypatch) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + + async def fetch_json_async(self, path): + if path == "/devices/current": + return { + "deviceId": "dev-1", + "systemId": "sys-1", + "groupId": "grp-1", + "deviceName": "Living Room", + } + if path == "/systems/current": + return {"systemName": "Salon", "groupId": "grp-1"} + return {} + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "getvol"]) + with pytest.raises(RuntimeError) as exc: + cli.main() + assert "System 'TV' not found" in str(exc.value) + + def test_cli_list_when_empty(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): From f743699d982aaf6129795ddab899043638f9e42d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 08:49:22 +0100 Subject: [PATCH 05/14] Remove --index and --base-path --- ARCHITECTURE.md | 6 ++--- README.md | 11 +++----- src/devialetctl/interfaces/cli.py | 43 ++++++++++--------------------- tests/test_cli_helpers.py | 13 +++------- tests/test_cli_regression.py | 32 ++++++++++++++++++----- 5 files changed, 50 insertions(+), 55 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 64b3c30..0602dc2 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -98,12 +98,12 @@ flowchart LR - `daemon --input cec` - `daemon --input keyboard` - Target selection: - - global args and daemon-specific target args (`--ip`, `--port`, `--base-path`, `--index`, `--discover-timeout`) + - global args and daemon-specific target args (`--ip`, `--port`, `--system`, `--discover-timeout`) ## Runtime Behavior - Discovery uses merged mDNS + UPnP: - - mDNS path: `_http._tcp.local` browsing with TXT/path-based filtering. + - mDNS path: `_whatsup._tcp.local` browsing. - UPnP path: SSDP `M-SEARCH` with target `urn:schemas-upnp-org:device:MediaRenderer:2`. - targets are deduplicated by `(address, port, base_path)` before selection. - `tree` command builds a topology from "current" endpoints: @@ -171,7 +171,7 @@ Config source priority: 4. built-in defaults Relevant settings: -- target: `ip`, `port`, `base_path`, `discover_timeout`, `index` +- target: `ip`, `port`, `base_path`, `discover_timeout` - daemon: `cec_device`, `cec_osd_name`, `cec_vendor_compat`, `reconnect_delay_s`, `log_level` - policy: `dedupe_window_s`, `min_interval_s` diff --git a/README.md b/README.md index 4863468..03a2c52 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,9 @@ Use cases: ## Features -- mDNS discovery (`_http._tcp.local`) with UPnP fallback +- mDNS discovery (`_whatsup._tcp.local`) merged with UPnP discovery - volume commands: `getvol`, `setvol`, `volup`, `voldown`, `mute` -- manual target override (`--ip`, `--port`, `--base-path`) +- manual target override (`--ip`, `--port`) - long-running daemon mode (`daemon --input cec`) - keyboard test mode (`daemon --input keyboard`) - typed config via TOML + env overrides @@ -66,7 +66,7 @@ uv run devialetctl setvol 35 Use explicit target: ```bash -uv run devialetctl --ip 192.168.1.42 --port 80 --base-path /ipcontrol/v1 getvol +uv run devialetctl --ip 192.168.1.42 --port 80 getvol ``` ## Daemon (CEC Input) @@ -106,7 +106,7 @@ docker run --rm -it \ Notes: - `--device /dev/cec0:/dev/cec0` passes the Linux CEC device into the container. -- `--network host` is required for mDNS discovery (`_http._tcp.local`) from the container. +- `--network host` is required for mDNS discovery (`_whatsup._tcp.local`) from the container. - `--cec-vendor-compat samsung` enables Samsung vendor compatibility mode for this run. - alternatively, set `DEVIALETCTL_CEC_VENDOR_COMPAT=samsung` to make it the environment default. - with a fixed target IP (`DEVIALETCTL_IP`), you can usually run without host networking. @@ -145,9 +145,6 @@ min_interval_s = 0.12 [target] ip = "192.168.1.42" port = 80 -base_path = "/ipcontrol/v1" -discover_timeout = 3.0 -index = 0 ``` Use `log_level = "DEBUG"` (or `DEVIALETCTL_LOG_LEVEL=DEBUG`) to log raw HDMI-CEC frames: diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 20fc664..42f0429 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -10,27 +10,25 @@ from devialetctl.application.ports import Target from devialetctl.application.service import VolumeService from devialetctl.infrastructure.config import load_config -from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway, normalize_base_path +from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway LOG = logging.getLogger(__name__) -def _pick(services: list[Target], index: int | None): +def _pick(services: list[Target]) -> Target: if not services: raise RuntimeError( "No service detected via mDNS/UPnP. Check network / Wi-Fi isolation." ) - if index is None: - if len(services) == 1: - return services[0] - for i, s in enumerate(services): - print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") - raise RuntimeError("Multiple services detected. Run again with --index N.") - if index < 0 or index >= len(services): - raise RuntimeError(f"Invalid index: {index}") - return services[index] + if len(services) == 1: + return services[0] + for i, s in enumerate(services): + print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") + raise RuntimeError( + "Multiple services detected. Use --system to pick one, or --ip to force a target." + ) def _pick_by_system_name(services: list[Target], system_name: str) -> Target: @@ -58,7 +56,7 @@ def _pick_by_system_name(services: list[Target], system_name: str) -> Target: groups = ", ".join(sorted({m[0] for m in matches})) raise RuntimeError( f"System name '{requested}' is ambiguous across groups: {groups}. " - "Use --index or rename systems." + "Use --ip or rename systems." ) group_id, system = matches[0] @@ -233,31 +231,25 @@ def _render_topology_tree_lines(tree: dict) -> list[str]: def _target_from_args(args) -> Target: ip = args.ip port = args.port - base_path = args.base_path discover_timeout = args.discover_timeout - index = args.index system = args.system if getattr(args, "cmd", None) == "daemon": ip = args.daemon_ip if args.daemon_ip is not None else ip port = args.daemon_port if args.daemon_port is not None else port - base_path = args.daemon_base_path if args.daemon_base_path is not None else base_path discover_timeout = ( args.daemon_discover_timeout if args.daemon_discover_timeout is not None else discover_timeout ) - index = args.daemon_index if args.daemon_index is not None else index system = args.daemon_system if args.daemon_system is not None else system if ip: - return Target( - address=ip, port=port, base_path=normalize_base_path(base_path), name="manual" - ) + return Target(address=ip, port=port, base_path="/ipcontrol/v1", name="manual") services = _discover_targets(timeout_s=discover_timeout) if system is not None: return _pick_by_system_name(services, system) - return _pick(services, index) + return _pick(services) def _target_from_config(args) -> Target: @@ -266,9 +258,7 @@ def _target_from_config(args) -> Target: return Target( address=args.daemon_ip, port=args.daemon_port if args.daemon_port is not None else 80, - base_path=normalize_base_path( - args.daemon_base_path if args.daemon_base_path is not None else "/ipcontrol/v1" - ), + base_path="/ipcontrol/v1", name="manual", ) @@ -287,9 +277,8 @@ def _target_from_config(args) -> Target: if args.daemon_system is not None: services = _discover_targets(timeout_s=timeout) return _pick_by_system_name(services, args.daemon_system) - index = args.daemon_index if args.daemon_index is not None else cfg.target.index services = _discover_targets(timeout_s=timeout) - return _pick(services, index) + return _pick(services) def _configure_logging(level: str) -> None: @@ -311,11 +300,9 @@ def main() -> None: help="Override log level (e.g. DEBUG, INFO, WARNING).", ) p.add_argument("--discover-timeout", type=float, default=3.0) - p.add_argument("--index", type=int, default=None) p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") p.add_argument("--port", type=int, default=80) - p.add_argument("--base-path", type=str, default="/ipcontrol/v1") sub = p.add_subparsers(dest="cmd", required=True) sub.add_parser("list") @@ -335,11 +322,9 @@ def main() -> None: daemon.add_argument( "--discover-timeout", dest="daemon_discover_timeout", type=float, default=None ) - daemon.add_argument("--index", dest="daemon_index", type=int, default=None) daemon.add_argument("--system", dest="daemon_system", type=str, default=None) daemon.add_argument("--ip", dest="daemon_ip", type=str, default=None) daemon.add_argument("--port", dest="daemon_port", type=int, default=None) - daemon.add_argument("--base-path", dest="daemon_base_path", type=str, default=None) daemon.add_argument("--cec-device", dest="daemon_cec_device", type=str, default=None) daemon.add_argument("--cec-osd-name", dest="daemon_cec_osd_name", type=str, default=None) daemon.add_argument( diff --git a/tests/test_cli_helpers.py b/tests/test_cli_helpers.py index 731d450..dfdb6b0 100644 --- a/tests/test_cli_helpers.py +++ b/tests/test_cli_helpers.py @@ -11,12 +11,7 @@ def _service(name="dev", address="10.0.0.2", port=80, base_path="/ipcontrol/v1") def test_pick_raises_on_empty_list() -> None: with pytest.raises(RuntimeError, match="No service"): - cli._pick([], None) - - -def test_pick_raises_on_invalid_index() -> None: - with pytest.raises(RuntimeError, match="Invalid index"): - cli._pick([_service()], 5) + cli._pick([]) def test_target_from_args_uses_daemon_overrides() -> None: @@ -24,14 +19,12 @@ def test_target_from_args_uses_daemon_overrides() -> None: cmd="daemon", ip=None, port=80, - base_path="/ipcontrol/v1", discover_timeout=3.0, - index=None, + system=None, daemon_ip="10.0.0.9", daemon_port=8080, - daemon_base_path="/", daemon_discover_timeout=1.0, - daemon_index=1, + daemon_system=None, ) target = cli._target_from_args(args) assert target.address == "10.0.0.9" diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index 65899cb..dc347cb 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -107,7 +107,7 @@ def run_forever(self, input_name): assert FakeRunner.called_with == "keyboard" -def test_cli_daemon_accepts_subcommand_index(monkeypatch) -> None: +def test_cli_daemon_accepts_subcommand_system(monkeypatch) -> None: class FakeDiscovery: def discover(self, timeout_s): class S0: @@ -129,6 +129,30 @@ class FakeGateway: def __init__(self, address, port, base_path): FakeGateway.picked_address = address + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + if self.address == "10.0.0.10": + return { + "deviceId": "dev0", + "systemId": "sys-other", + "groupId": "grp-other", + "deviceName": "Other", + } + return { + "deviceId": "dev1", + "systemId": "sys-tv", + "groupId": "grp-tv", + "deviceName": "TV", + } + if path == "/systems/current": + if self.address == "10.0.0.10": + return {"systemName": "Other", "groupId": "grp-other"} + return {"systemName": "TV", "groupId": "grp-tv"} + return {} class FakeRunner: def __init__(self, cfg, gateway): @@ -141,11 +165,7 @@ def run_forever(self, input_name): monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) monkeypatch.setattr(cli, "DaemonRunner", FakeRunner) - monkeypatch.setattr( - sys, - "argv", - ["devialetctl", "daemon", "--input", "keyboard", "--index", "1"], - ) + monkeypatch.setattr(sys, "argv", ["devialetctl", "daemon", "--input", "keyboard", "--system", "TV"]) cli.main() assert FakeGateway.picked_address == "10.0.0.11" From 8e977488ff11f75c7df9328046b7481912573292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:07:52 +0100 Subject: [PATCH 06/14] cli: refactor --- src/devialetctl/interfaces/cli.py | 161 +++++++++++++----------------- tests/test_cli_helpers.py | 15 ++- tests/test_cli_regression.py | 24 ++++- 3 files changed, 100 insertions(+), 100 deletions(-) diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 42f0429..acb95b5 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -17,6 +17,40 @@ LOG = logging.getLogger(__name__) +@dataclasses.dataclass(frozen=True) +class _EffectiveOptions: + ip: str | None + port: int + discover_timeout: float + system: str | None + cec_device: str + cec_osd_name: str + cec_vendor_compat: str + + +def _effective_options(args, cfg) -> _EffectiveOptions: + cec_device_arg = getattr(args, "cec_device", None) + cec_osd_name_arg = getattr(args, "cec_osd_name", None) + cec_vendor_compat_arg = getattr(args, "cec_vendor_compat", None) + return _EffectiveOptions( + ip=args.ip if args.ip is not None else cfg.target.ip, + port=args.port if args.port is not None else cfg.target.port, + discover_timeout=( + args.discover_timeout + if args.discover_timeout is not None + else cfg.target.discover_timeout + ), + system=args.system, + cec_device=cec_device_arg if cec_device_arg is not None else cfg.cec_device, + cec_osd_name=cec_osd_name_arg if cec_osd_name_arg is not None else cfg.cec_osd_name, + cec_vendor_compat=( + cec_vendor_compat_arg + if cec_vendor_compat_arg is not None + else cfg.cec_vendor_compat + ), + ) + + def _pick(services: list[Target]) -> Target: if not services: raise RuntimeError( @@ -228,56 +262,12 @@ def _render_topology_tree_lines(tree: dict) -> list[str]: return lines -def _target_from_args(args) -> Target: - ip = args.ip - port = args.port - discover_timeout = args.discover_timeout - system = args.system - - if getattr(args, "cmd", None) == "daemon": - ip = args.daemon_ip if args.daemon_ip is not None else ip - port = args.daemon_port if args.daemon_port is not None else port - discover_timeout = ( - args.daemon_discover_timeout - if args.daemon_discover_timeout is not None - else discover_timeout - ) - system = args.daemon_system if args.daemon_system is not None else system - - if ip: - return Target(address=ip, port=port, base_path="/ipcontrol/v1", name="manual") - services = _discover_targets(timeout_s=discover_timeout) - if system is not None: - return _pick_by_system_name(services, system) - return _pick(services) - - -def _target_from_config(args) -> Target: - cfg = load_config(args.config) - if args.daemon_ip is not None: - return Target( - address=args.daemon_ip, - port=args.daemon_port if args.daemon_port is not None else 80, - base_path="/ipcontrol/v1", - name="manual", - ) - - if cfg.target.ip: - return Target( - address=cfg.target.ip, - port=cfg.target.port, - base_path=cfg.target.base_path, - name="config", - ) - timeout = ( - args.daemon_discover_timeout - if args.daemon_discover_timeout is not None - else cfg.target.discover_timeout - ) - if args.daemon_system is not None: - services = _discover_targets(timeout_s=timeout) - return _pick_by_system_name(services, args.daemon_system) - services = _discover_targets(timeout_s=timeout) +def _target_from_resolved(resolved: _EffectiveOptions) -> Target: + if resolved.ip: + return Target(address=resolved.ip, port=resolved.port, base_path="/ipcontrol/v1", name="manual") + services = _discover_targets(timeout_s=resolved.discover_timeout) + if resolved.system is not None: + return _pick_by_system_name(services, resolved.system) return _pick(services) @@ -289,20 +279,35 @@ def _configure_logging(level: str) -> None: ) +def _validate_target_selection_args(parser: argparse.ArgumentParser, args) -> None: + if args.ip and args.system: + parser.error( + "--ip and --system are not compatible: --ip disables discovery while --system requires discovery." + ) + + def main() -> None: p = argparse.ArgumentParser( prog="devialetctl", description="Devialet Phantom IP Control (discover + commands)" ) + p.add_argument("--config", type=str, default=None) p.add_argument( "--log-level", type=str, default=None, help="Override log level (e.g. DEBUG, INFO, WARNING).", ) - p.add_argument("--discover-timeout", type=float, default=3.0) + p.add_argument("--discover-timeout", type=float, default=None) p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") - p.add_argument("--port", type=int, default=80) + p.add_argument("--port", type=int, default=None) + p.add_argument("--cec-device", type=str, default=None) + p.add_argument("--cec-osd-name", type=str, default=None) + p.add_argument( + "--cec-vendor-compat", + choices=["none", "samsung"], + default=None, + ) sub = p.add_subparsers(dest="cmd", required=True) sub.add_parser("list") @@ -318,29 +323,16 @@ def main() -> None: daemon = sub.add_parser("daemon") daemon.add_argument("--input", choices=["cec", "keyboard"], default="cec") - daemon.add_argument("--config", type=str, default=None) - daemon.add_argument( - "--discover-timeout", dest="daemon_discover_timeout", type=float, default=None - ) - daemon.add_argument("--system", dest="daemon_system", type=str, default=None) - daemon.add_argument("--ip", dest="daemon_ip", type=str, default=None) - daemon.add_argument("--port", dest="daemon_port", type=int, default=None) - daemon.add_argument("--cec-device", dest="daemon_cec_device", type=str, default=None) - daemon.add_argument("--cec-osd-name", dest="daemon_cec_osd_name", type=str, default=None) - daemon.add_argument( - "--cec-vendor-compat", - dest="daemon_cec_vendor_compat", - choices=["none", "samsung"], - default=None, - ) args = p.parse_args() + _validate_target_selection_args(p, args) + cfg = load_config(args.config) + resolved = _effective_options(args, cfg) requested_log_level = args.log_level or os.getenv("DEVIALETCTL_LOG_LEVEL") - if requested_log_level is not None: - _configure_logging(requested_log_level) + _configure_logging(requested_log_level if requested_log_level is not None else cfg.log_level) if args.cmd == "list": - services = _discover_targets(timeout_s=args.discover_timeout) + services = _discover_targets(timeout_s=resolved.discover_timeout) if not services: print("No service detected.") return @@ -348,7 +340,7 @@ def main() -> None: print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") return if args.cmd == "tree": - services = _discover_targets(timeout_s=args.discover_timeout) + services = _discover_targets(timeout_s=resolved.discover_timeout) if not services: print("No service detected.") return @@ -360,29 +352,17 @@ def main() -> None: print(line) return + target = _target_from_resolved(resolved) + gateway = DevialetHttpGateway(target.address, target.port, target.base_path) + if args.cmd == "daemon": try: - cfg = load_config(args.config) cfg = dataclasses.replace( cfg, - cec_device=( - args.daemon_cec_device if args.daemon_cec_device is not None else cfg.cec_device - ), - cec_osd_name=( - args.daemon_cec_osd_name - if args.daemon_cec_osd_name is not None - else cfg.cec_osd_name - ), - cec_vendor_compat=( - args.daemon_cec_vendor_compat - if args.daemon_cec_vendor_compat is not None - else cfg.cec_vendor_compat - ), + cec_device=resolved.cec_device, + cec_osd_name=resolved.cec_osd_name, + cec_vendor_compat=resolved.cec_vendor_compat, ) - if requested_log_level is None: - _configure_logging(cfg.log_level) - target = _target_from_config(args) - gateway = DevialetHttpGateway(target.address, target.port, target.base_path) runner = DaemonRunner(cfg=cfg, gateway=gateway) runner.run_forever(input_name=args.input) return @@ -392,8 +372,7 @@ def main() -> None: print(f"Daemon error: {exc}", file=sys.stderr) raise SystemExit(2) - target = _target_from_args(args) - client = VolumeService(DevialetHttpGateway(target.address, target.port, target.base_path)) + client = VolumeService(gateway) try: if args.cmd == "systems": diff --git a/tests/test_cli_helpers.py b/tests/test_cli_helpers.py index dfdb6b0..cccaa14 100644 --- a/tests/test_cli_helpers.py +++ b/tests/test_cli_helpers.py @@ -3,6 +3,7 @@ import pytest from devialetctl.interfaces import cli +from devialetctl.infrastructure.config import DaemonConfig, RuntimeTarget def _service(name="dev", address="10.0.0.2", port=80, base_path="/ipcontrol/v1"): @@ -15,18 +16,16 @@ def test_pick_raises_on_empty_list() -> None: def test_target_from_args_uses_daemon_overrides() -> None: + cfg = DaemonConfig(target=RuntimeTarget()) args = SimpleNamespace( cmd="daemon", - ip=None, - port=80, - discover_timeout=3.0, + ip="10.0.0.9", + port=8080, + discover_timeout=None, system=None, - daemon_ip="10.0.0.9", - daemon_port=8080, - daemon_discover_timeout=1.0, - daemon_system=None, ) - target = cli._target_from_args(args) + resolved = cli._effective_options(args, cfg) + target = cli._target_from_resolved(resolved) assert target.address == "10.0.0.9" assert target.port == 8080 assert target.base_path == "/ipcontrol/v1" diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index dc347cb..2bb2901 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -165,7 +165,7 @@ def run_forever(self, input_name): monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) monkeypatch.setattr(cli, "DaemonRunner", FakeRunner) - monkeypatch.setattr(sys, "argv", ["devialetctl", "daemon", "--input", "keyboard", "--system", "TV"]) + monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "daemon", "--input", "keyboard"]) cli.main() assert FakeGateway.picked_address == "10.0.0.11" @@ -294,6 +294,28 @@ async def fetch_json_async(self, path): assert "System 'TV' not found" in str(exc.value) +def test_cli_rejects_ip_and_system_together(monkeypatch, capsys) -> None: + monkeypatch.setattr(sys, "argv", ["devialetctl", "--ip", "10.0.0.2", "--system", "TV", "getvol"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip and --system are not compatible" in err + + +def test_cli_daemon_rejects_ip_and_system_together(monkeypatch, capsys) -> None: + monkeypatch.setattr( + sys, + "argv", + ["devialetctl", "--ip", "10.0.0.2", "--system", "TV", "daemon", "--input", "keyboard"], + ) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip and --system are not compatible" in err + + def test_cli_list_when_empty(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): From 8146550536b4a673cb09c4db6debbde4380fb59b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:19:35 +0100 Subject: [PATCH 07/14] Introduce topology struct --- src/devialetctl/interfaces/cli.py | 305 ++++++------------------- src/devialetctl/interfaces/topology.py | 236 +++++++++++++++++++ 2 files changed, 300 insertions(+), 241 deletions(-) create mode 100644 src/devialetctl/interfaces/topology.py diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index acb95b5..1cb210c 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -1,5 +1,4 @@ import argparse -import asyncio import dataclasses import json import logging @@ -13,6 +12,11 @@ from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway +from devialetctl.interfaces.topology import ( + build_topology_tree, + pick_target_by_system_name, + render_topology_tree_lines, +) LOG = logging.getLogger(__name__) @@ -65,49 +69,6 @@ def _pick(services: list[Target]) -> Target: ) -def _pick_by_system_name(services: list[Target], system_name: str) -> Target: - if not services: - raise RuntimeError( - "No service detected via mDNS/UPnP. Check network / Wi-Fi isolation." - ) - requested = system_name.strip() - if not requested: - raise RuntimeError("System name cannot be empty.") - - tree = _build_topology_tree(services) - matches: list[tuple[str, dict]] = [] - for group in tree.get("groups", []): - group_id = str(group.get("group_id", "ungrouped")) - for system in group.get("systems", []): - if str(system.get("system_name", "")).casefold() == requested.casefold(): - matches.append((group_id, system)) - - if not matches: - raise RuntimeError( - f"System '{requested}' not found. Run 'devialetctl tree' to list available systems." - ) - if len(matches) > 1: - groups = ", ".join(sorted({m[0] for m in matches})) - raise RuntimeError( - f"System name '{requested}' is ambiguous across groups: {groups}. " - "Use --ip or rename systems." - ) - - group_id, system = matches[0] - devices = system.get("devices", []) - if not devices: - raise RuntimeError( - f"System '{requested}' has no reachable devices in group {group_id}." - ) - selected = devices[0] - return Target( - address=str(selected["address"]), - port=int(selected["port"]), - base_path="/ipcontrol/v1", - name=f"{requested}@{group_id}", - ) - - def _discover_targets(timeout_s: float) -> list[Target]: seen: set[tuple[str, int, str]] = set() merged: list[Target] = [] @@ -121,153 +82,12 @@ def _discover_targets(timeout_s: float) -> list[Target]: return merged -def _safe_fetch_json(gateway: DevialetHttpGateway, path: str) -> dict | None: - try: - data = asyncio.run(gateway.fetch_json_async(path)) - if isinstance(data, dict): - return data - except Exception as exc: - LOG.debug("tree fetch failed path=%s host=%s err=%s", path, gateway.address, exc) - return None - - -def _build_topology_tree(targets: list[Target]) -> dict: - devices_by_id: dict[str, dict] = {} - systems: dict[str, dict] = {} - groups: dict[str, dict] = {} - - for target in targets: - gateway = DevialetHttpGateway(target.address, target.port, target.base_path) - device = _safe_fetch_json(gateway, "/devices/current") - if not device: - continue - - device_id = str(device.get("deviceId") or f"dispatcher:{target.address}") - system_id = str(device.get("systemId") or "") or None - group_id = str(device.get("groupId") or "") or None - devices_by_id[device_id] = { - "device_id": device_id, - "device_name": device.get("deviceName") or device.get("model") or device_id, - "model": str(device.get("model") or ""), - "role": str(device.get("role") or ""), - "serial": str(device.get("serial") or ""), - "system_id": system_id, - "group_id": group_id, - "address": target.address, - "port": target.port, - } - - if not devices_by_id: - return {"groups": [], "ungrouped_devices": [], "errors": ["No Devialet devices detected."]} - - for dev in devices_by_id.values(): - if not dev["system_id"]: - continue - systems.setdefault( - dev["system_id"], - {"name": None, "group_id": dev["group_id"], "devices": []}, - ) - systems[dev["system_id"]]["devices"].append(dev) - - for system_id, system_data in systems.items(): - probe_device = system_data["devices"][0] - gateway = DevialetHttpGateway( - probe_device["address"], - probe_device["port"], - "/ipcontrol/v1", - ) - sys_info = _safe_fetch_json(gateway, "/systems/current") - if sys_info: - system_data["name"] = str(sys_info.get("systemName") or system_id) - sys_group_id = str(sys_info.get("groupId") or "") or None - if sys_group_id: - system_data["group_id"] = sys_group_id - else: - system_data["name"] = system_id - - group_id = system_data["group_id"] or "ungrouped" - groups.setdefault(group_id, {"systems": {}}) - groups[group_id]["systems"][system_id] = system_data - - group_items: list[dict] = [] - for group_id in sorted(groups.keys()): - group_data = groups[group_id] - systems_items: list[dict] = [] - for system_id in sorted(group_data["systems"].keys()): - system_data = group_data["systems"][system_id] - devices_items = [ - { - "device_id": dev["device_id"], - "device_name": dev["device_name"], - "model": dev["model"], - "role": dev["role"], - "serial": dev["serial"], - "address": dev["address"], - "port": dev["port"], - } - for dev in sorted(system_data["devices"], key=lambda d: d["device_name"]) - ] - systems_items.append( - { - "system_id": system_id, - "system_name": system_data["name"], - "devices": devices_items, - } - ) - group_items.append( - { - "group_id": group_id, - "systems": systems_items, - } - ) - - ungrouped_devices = [ - { - "device_id": dev["device_id"], - "device_name": dev["device_name"], - "model": dev["model"], - "role": dev["role"], - "serial": dev["serial"], - "address": dev["address"], - "port": dev["port"], - } - for dev in sorted( - [d for d in devices_by_id.values() if d["system_id"] is None], - key=lambda d: d["device_name"], - ) - ] - - return {"groups": group_items, "ungrouped_devices": ungrouped_devices, "errors": []} - - -def _render_topology_tree_lines(tree: dict) -> list[str]: - if tree.get("errors"): - return tree["errors"] - - lines: list[str] = [] - for group in tree.get("groups", []): - lines.append(f"Group {group['group_id']}") - for system in group.get("systems", []): - lines.append(f" System {system['system_name']} ({system['system_id']})") - for dev in system.get("devices", []): - role = f" role={dev['role']}" if dev.get("role") else "" - model = f" model={dev['model']}" if dev.get("model") else "" - lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}{role}") - - if tree.get("ungrouped_devices"): - lines.append("Ungrouped devices") - for dev in tree["ungrouped_devices"]: - model = f" model={dev['model']}" if dev.get("model") else "" - lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}") - return lines - - def _target_from_resolved(resolved: _EffectiveOptions) -> Target: if resolved.ip: return Target(address=resolved.ip, port=resolved.port, base_path="/ipcontrol/v1", name="manual") services = _discover_targets(timeout_s=resolved.discover_timeout) if resolved.system is not None: - return _pick_by_system_name(services, resolved.system) + return pick_target_by_system_name(services, resolved.system) return _pick(services) @@ -286,51 +106,7 @@ def _validate_target_selection_args(parser: argparse.ArgumentParser, args) -> No ) -def main() -> None: - p = argparse.ArgumentParser( - prog="devialetctl", description="Devialet Phantom IP Control (discover + commands)" - ) - p.add_argument("--config", type=str, default=None) - p.add_argument( - "--log-level", - type=str, - default=None, - help="Override log level (e.g. DEBUG, INFO, WARNING).", - ) - p.add_argument("--discover-timeout", type=float, default=None) - p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") - p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") - p.add_argument("--port", type=int, default=None) - p.add_argument("--cec-device", type=str, default=None) - p.add_argument("--cec-osd-name", type=str, default=None) - p.add_argument( - "--cec-vendor-compat", - choices=["none", "samsung"], - default=None, - ) - - sub = p.add_subparsers(dest="cmd", required=True) - sub.add_parser("list") - tree = sub.add_parser("tree") - tree.add_argument("--json", action="store_true", dest="tree_json") - sub.add_parser("systems") - sub.add_parser("getvol") - sub.add_parser("volup") - sub.add_parser("voldown") - sub.add_parser("mute") - set_parser = sub.add_parser("setvol") - set_parser.add_argument("value", type=int) - - daemon = sub.add_parser("daemon") - daemon.add_argument("--input", choices=["cec", "keyboard"], default="cec") - - args = p.parse_args() - _validate_target_selection_args(p, args) - cfg = load_config(args.config) - resolved = _effective_options(args, cfg) - requested_log_level = args.log_level or os.getenv("DEVIALETCTL_LOG_LEVEL") - _configure_logging(requested_log_level if requested_log_level is not None else cfg.log_level) - +def _dispatch_command(args, daemon_cfg, resolved: _EffectiveOptions) -> None: if args.cmd == "list": services = _discover_targets(timeout_s=resolved.discover_timeout) if not services: @@ -339,16 +115,17 @@ def main() -> None: for i, s in enumerate(services): print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") return + if args.cmd == "tree": services = _discover_targets(timeout_s=resolved.discover_timeout) if not services: print("No service detected.") return - tree = _build_topology_tree(services) + tree = build_topology_tree(services) if args.tree_json: print(json.dumps(tree, indent=2, ensure_ascii=False)) return - for line in _render_topology_tree_lines(tree): + for line in render_topology_tree_lines(tree): print(line) return @@ -357,13 +134,7 @@ def main() -> None: if args.cmd == "daemon": try: - cfg = dataclasses.replace( - cfg, - cec_device=resolved.cec_device, - cec_osd_name=resolved.cec_osd_name, - cec_vendor_compat=resolved.cec_vendor_compat, - ) - runner = DaemonRunner(cfg=cfg, gateway=gateway) + runner = DaemonRunner(cfg=daemon_cfg, gateway=gateway) runner.run_forever(input_name=args.input) return except KeyboardInterrupt: @@ -373,7 +144,6 @@ def main() -> None: raise SystemExit(2) client = VolumeService(gateway) - try: if args.cmd == "systems": print(client.systems()) @@ -394,3 +164,56 @@ def main() -> None: except Exception as exc: print(f"Error: {exc}", file=sys.stderr) raise SystemExit(2) + + +def main() -> None: + p = argparse.ArgumentParser( + prog="devialetctl", description="Devialet Phantom IP Control (discover + commands)" + ) + p.add_argument("--config", type=str, default=None) + p.add_argument( + "--log-level", + type=str, + default=None, + help="Override log level (e.g. DEBUG, INFO, WARNING).", + ) + p.add_argument("--discover-timeout", type=float, default=None) + p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") + p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") + p.add_argument("--port", type=int, default=None) + p.add_argument("--cec-device", type=str, default=None) + p.add_argument("--cec-osd-name", type=str, default=None) + p.add_argument( + "--cec-vendor-compat", + choices=["none", "samsung"], + default=None, + ) + + sub = p.add_subparsers(dest="cmd", required=True) + sub.add_parser("list") + tree = sub.add_parser("tree") + tree.add_argument("--json", action="store_true", dest="tree_json") + sub.add_parser("systems") + sub.add_parser("getvol") + sub.add_parser("volup") + sub.add_parser("voldown") + sub.add_parser("mute") + set_parser = sub.add_parser("setvol") + set_parser.add_argument("value", type=int) + + daemon = sub.add_parser("daemon") + daemon.add_argument("--input", choices=["cec", "keyboard"], default="cec") + + args = p.parse_args() + _validate_target_selection_args(p, args) + cfg = load_config(args.config) + resolved = _effective_options(args, cfg) + daemon_cfg = dataclasses.replace( + cfg, + cec_device=resolved.cec_device, + cec_osd_name=resolved.cec_osd_name, + cec_vendor_compat=resolved.cec_vendor_compat, + ) + requested_log_level = args.log_level or os.getenv("DEVIALETCTL_LOG_LEVEL") + _configure_logging(requested_log_level if requested_log_level is not None else cfg.log_level) + _dispatch_command(args, daemon_cfg, resolved) diff --git a/src/devialetctl/interfaces/topology.py b/src/devialetctl/interfaces/topology.py new file mode 100644 index 0000000..e171ac2 --- /dev/null +++ b/src/devialetctl/interfaces/topology.py @@ -0,0 +1,236 @@ +import asyncio +import dataclasses +import logging + +from devialetctl.application.ports import Target +from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway + +LOG = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class DeviceRow: + device_id: str + device_name: str + model: str + role: str + serial: str + address: str + port: int + + +@dataclasses.dataclass(frozen=True) +class SystemRow: + system_id: str + system_name: str + devices: list[DeviceRow] + + +@dataclasses.dataclass(frozen=True) +class GroupRow: + group_id: str + systems: list[SystemRow] + + +def _safe_fetch_json(gateway: DevialetHttpGateway, path: str) -> dict | None: + try: + data = asyncio.run(gateway.fetch_json_async(path)) + if isinstance(data, dict): + return data + except Exception as exc: + LOG.debug("tree fetch failed path=%s host=%s err=%s", path, gateway.address, exc) + return None + + +def _device_row_to_dict(dev: DeviceRow) -> dict: + return { + "device_id": dev.device_id, + "device_name": dev.device_name, + "model": dev.model, + "role": dev.role, + "serial": dev.serial, + "address": dev.address, + "port": dev.port, + } + + +def build_topology_tree(targets: list[Target]) -> dict: + devices_by_id: dict[str, dict] = {} + systems: dict[str, dict] = {} + groups: dict[str, dict] = {} + + for target in targets: + gateway = DevialetHttpGateway(target.address, target.port, target.base_path) + device = _safe_fetch_json(gateway, "/devices/current") + if not device: + continue + + device_id = str(device.get("deviceId") or f"dispatcher:{target.address}") + system_id = str(device.get("systemId") or "") or None + group_id = str(device.get("groupId") or "") or None + devices_by_id[device_id] = { + "device_id": device_id, + "device_name": device.get("deviceName") or device.get("model") or device_id, + "model": str(device.get("model") or ""), + "role": str(device.get("role") or ""), + "serial": str(device.get("serial") or ""), + "system_id": system_id, + "group_id": group_id, + "address": target.address, + "port": target.port, + } + + if not devices_by_id: + return {"groups": [], "ungrouped_devices": [], "errors": ["No Devialet devices detected."]} + + for dev in devices_by_id.values(): + if not dev["system_id"]: + continue + systems.setdefault( + dev["system_id"], + {"name": None, "group_id": dev["group_id"], "devices": []}, + ) + systems[dev["system_id"]]["devices"].append(dev) + + for system_id, system_data in systems.items(): + probe_device = system_data["devices"][0] + gateway = DevialetHttpGateway( + probe_device["address"], + probe_device["port"], + "/ipcontrol/v1", + ) + sys_info = _safe_fetch_json(gateway, "/systems/current") + if sys_info: + system_data["name"] = str(sys_info.get("systemName") or system_id) + sys_group_id = str(sys_info.get("groupId") or "") or None + if sys_group_id: + system_data["group_id"] = sys_group_id + else: + system_data["name"] = system_id + + group_id = system_data["group_id"] or "ungrouped" + groups.setdefault(group_id, {"systems": {}}) + groups[group_id]["systems"][system_id] = system_data + + group_rows: list[GroupRow] = [] + for group_id in sorted(groups.keys()): + group_data = groups[group_id] + systems_rows: list[SystemRow] = [] + for system_id in sorted(group_data["systems"].keys()): + system_data = group_data["systems"][system_id] + devices_rows = [ + DeviceRow( + device_id=str(dev["device_id"]), + device_name=str(dev["device_name"]), + model=str(dev["model"]), + role=str(dev["role"]), + serial=str(dev["serial"]), + address=str(dev["address"]), + port=int(dev["port"]), + ) + for dev in sorted(system_data["devices"], key=lambda d: d["device_name"]) + ] + systems_rows.append( + SystemRow( + system_id=str(system_id), + system_name=str(system_data["name"]), + devices=devices_rows, + ) + ) + group_rows.append(GroupRow(group_id=str(group_id), systems=systems_rows)) + + ungrouped_devices_rows = [ + DeviceRow( + device_id=str(dev["device_id"]), + device_name=str(dev["device_name"]), + model=str(dev["model"]), + role=str(dev["role"]), + serial=str(dev["serial"]), + address=str(dev["address"]), + port=int(dev["port"]), + ) + for dev in sorted( + [d for d in devices_by_id.values() if d["system_id"] is None], + key=lambda d: d["device_name"], + ) + ] + + return { + "groups": [ + { + "group_id": group.group_id, + "systems": [ + { + "system_id": system.system_id, + "system_name": system.system_name, + "devices": [_device_row_to_dict(dev) for dev in system.devices], + } + for system in group.systems + ], + } + for group in group_rows + ], + "ungrouped_devices": [_device_row_to_dict(dev) for dev in ungrouped_devices_rows], + "errors": [], + } + + +def render_topology_tree_lines(tree: dict) -> list[str]: + if tree.get("errors"): + return tree["errors"] + + lines: list[str] = [] + for group in tree.get("groups", []): + lines.append(f"Group {group['group_id']}") + for system in group.get("systems", []): + lines.append(f" System {system['system_name']} ({system['system_id']})") + for dev in system.get("devices", []): + role = f" role={dev['role']}" if dev.get("role") else "" + model = f" model={dev['model']}" if dev.get("model") else "" + lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}{role}") + + if tree.get("ungrouped_devices"): + lines.append("Ungrouped devices") + for dev in tree["ungrouped_devices"]: + model = f" model={dev['model']}" if dev.get("model") else "" + lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}") + return lines + + +def pick_target_by_system_name(services: list[Target], system_name: str) -> Target: + if not services: + raise RuntimeError("No service detected via mDNS/UPnP. Check network / Wi-Fi isolation.") + requested = system_name.strip() + if not requested: + raise RuntimeError("System name cannot be empty.") + + tree = build_topology_tree(services) + matches: list[tuple[str, dict]] = [] + for group in tree.get("groups", []): + group_id = str(group.get("group_id", "ungrouped")) + for system in group.get("systems", []): + if str(system.get("system_name", "")).casefold() == requested.casefold(): + matches.append((group_id, system)) + + if not matches: + raise RuntimeError( + f"System '{requested}' not found. Run 'devialetctl tree' to list available systems." + ) + if len(matches) > 1: + groups = ", ".join(sorted({m[0] for m in matches})) + raise RuntimeError( + f"System name '{requested}' is ambiguous across groups: {groups}. " + "Use --ip or rename systems." + ) + + group_id, system = matches[0] + devices = system.get("devices", []) + if not devices: + raise RuntimeError(f"System '{requested}' has no reachable devices in group {group_id}.") + selected = devices[0] + return Target( + address=str(selected["address"]), + port=int(selected["port"]), + base_path="/ipcontrol/v1", + name=f"{requested}@{group_id}", + ) From 9a76430a356bae902302dc8bddfc887186de373e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:22:55 +0100 Subject: [PATCH 08/14] Improve cli validation --- src/devialetctl/interfaces/cli.py | 14 ++++++++++++-- src/devialetctl/interfaces/topology.py | 14 +++++++++----- tests/test_cli_regression.py | 18 ++++++++++++++++++ 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 1cb210c..072ec03 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -87,7 +87,11 @@ def _target_from_resolved(resolved: _EffectiveOptions) -> Target: return Target(address=resolved.ip, port=resolved.port, base_path="/ipcontrol/v1", name="manual") services = _discover_targets(timeout_s=resolved.discover_timeout) if resolved.system is not None: - return pick_target_by_system_name(services, resolved.system) + return pick_target_by_system_name( + services, + resolved.system, + gateway_factory=DevialetHttpGateway, + ) return _pick(services) @@ -104,6 +108,12 @@ def _validate_target_selection_args(parser: argparse.ArgumentParser, args) -> No parser.error( "--ip and --system are not compatible: --ip disables discovery while --system requires discovery." ) + if args.cmd in {"list", "tree"} and args.ip: + parser.error(f"--ip is not supported with '{args.cmd}': this command is discovery-based.") + if args.cmd in {"list", "tree"} and args.system: + parser.error( + f"--system is not supported with '{args.cmd}': this command already lists discovered systems." + ) def _dispatch_command(args, daemon_cfg, resolved: _EffectiveOptions) -> None: @@ -121,7 +131,7 @@ def _dispatch_command(args, daemon_cfg, resolved: _EffectiveOptions) -> None: if not services: print("No service detected.") return - tree = build_topology_tree(services) + tree = build_topology_tree(services, gateway_factory=DevialetHttpGateway) if args.tree_json: print(json.dumps(tree, indent=2, ensure_ascii=False)) return diff --git a/src/devialetctl/interfaces/topology.py b/src/devialetctl/interfaces/topology.py index e171ac2..8534859 100644 --- a/src/devialetctl/interfaces/topology.py +++ b/src/devialetctl/interfaces/topology.py @@ -54,13 +54,13 @@ def _device_row_to_dict(dev: DeviceRow) -> dict: } -def build_topology_tree(targets: list[Target]) -> dict: +def build_topology_tree(targets: list[Target], gateway_factory=DevialetHttpGateway) -> dict: devices_by_id: dict[str, dict] = {} systems: dict[str, dict] = {} groups: dict[str, dict] = {} for target in targets: - gateway = DevialetHttpGateway(target.address, target.port, target.base_path) + gateway = gateway_factory(target.address, target.port, target.base_path) device = _safe_fetch_json(gateway, "/devices/current") if not device: continue @@ -94,7 +94,7 @@ def build_topology_tree(targets: list[Target]) -> dict: for system_id, system_data in systems.items(): probe_device = system_data["devices"][0] - gateway = DevialetHttpGateway( + gateway = gateway_factory( probe_device["address"], probe_device["port"], "/ipcontrol/v1", @@ -197,14 +197,18 @@ def render_topology_tree_lines(tree: dict) -> list[str]: return lines -def pick_target_by_system_name(services: list[Target], system_name: str) -> Target: +def pick_target_by_system_name( + services: list[Target], + system_name: str, + gateway_factory=DevialetHttpGateway, +) -> Target: if not services: raise RuntimeError("No service detected via mDNS/UPnP. Check network / Wi-Fi isolation.") requested = system_name.strip() if not requested: raise RuntimeError("System name cannot be empty.") - tree = build_topology_tree(services) + tree = build_topology_tree(services, gateway_factory=gateway_factory) matches: list[tuple[str, dict]] = [] for group in tree.get("groups", []): group_id = str(group.get("group_id", "ungrouped")) diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index 2bb2901..3e5b1c3 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -316,6 +316,24 @@ def test_cli_daemon_rejects_ip_and_system_together(monkeypatch, capsys) -> None: assert "--ip and --system are not compatible" in err +def test_cli_list_rejects_ip_override(monkeypatch, capsys) -> None: + monkeypatch.setattr(sys, "argv", ["devialetctl", "--ip", "10.0.0.2", "list"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip is not supported with 'list'" in err + + +def test_cli_tree_rejects_ip_override(monkeypatch, capsys) -> None: + monkeypatch.setattr(sys, "argv", ["devialetctl", "--ip", "10.0.0.2", "tree"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip is not supported with 'tree'" in err + + def test_cli_list_when_empty(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): From 5bf31d2510481a6d1ae6fcf49b6f4a417529aca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:23:31 +0100 Subject: [PATCH 09/14] topology: select systemLeader --- src/devialetctl/interfaces/topology.py | 7 ++++++- tests/test_cli_regression.py | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/devialetctl/interfaces/topology.py b/src/devialetctl/interfaces/topology.py index 8534859..6ecd6a4 100644 --- a/src/devialetctl/interfaces/topology.py +++ b/src/devialetctl/interfaces/topology.py @@ -17,6 +17,7 @@ class DeviceRow: serial: str address: str port: int + is_system_leader: bool @dataclasses.dataclass(frozen=True) @@ -51,6 +52,7 @@ def _device_row_to_dict(dev: DeviceRow) -> dict: "serial": dev.serial, "address": dev.address, "port": dev.port, + "is_system_leader": dev.is_system_leader, } @@ -78,6 +80,7 @@ def build_topology_tree(targets: list[Target], gateway_factory=DevialetHttpGatew "group_id": group_id, "address": target.address, "port": target.port, + "is_system_leader": bool(device.get("isSystemLeader")), } if not devices_by_id: @@ -127,6 +130,7 @@ def build_topology_tree(targets: list[Target], gateway_factory=DevialetHttpGatew serial=str(dev["serial"]), address=str(dev["address"]), port=int(dev["port"]), + is_system_leader=bool(dev["is_system_leader"]), ) for dev in sorted(system_data["devices"], key=lambda d: d["device_name"]) ] @@ -148,6 +152,7 @@ def build_topology_tree(targets: list[Target], gateway_factory=DevialetHttpGatew serial=str(dev["serial"]), address=str(dev["address"]), port=int(dev["port"]), + is_system_leader=bool(dev["is_system_leader"]), ) for dev in sorted( [d for d in devices_by_id.values() if d["system_id"] is None], @@ -231,7 +236,7 @@ def pick_target_by_system_name( devices = system.get("devices", []) if not devices: raise RuntimeError(f"System '{requested}' has no reachable devices in group {group_id}.") - selected = devices[0] + selected = next((d for d in devices if d.get("is_system_leader")), devices[0]) return Target( address=str(selected["address"]), port=int(selected["port"]), diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index 3e5b1c3..c463c75 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -141,12 +141,14 @@ async def fetch_json_async(self, path): "systemId": "sys-other", "groupId": "grp-other", "deviceName": "Other", + "isSystemLeader": True, } return { "deviceId": "dev1", "systemId": "sys-tv", "groupId": "grp-tv", "deviceName": "TV", + "isSystemLeader": True, } if path == "/systems/current": if self.address == "10.0.0.10": @@ -209,6 +211,7 @@ async def fetch_json_async(self, path): "deviceName": "Salon", "model": "Phantom", "role": "Mono", + "isSystemLeader": True, } if self.address == "10.0.0.71": return { @@ -218,6 +221,7 @@ async def fetch_json_async(self, path): "deviceName": "TV Gauche", "model": "Phantom", "role": "FrontLeft", + "isSystemLeader": True, } return { "deviceId": "tv-right", @@ -226,6 +230,7 @@ async def fetch_json_async(self, path): "deviceName": "TV Droite", "model": "Phantom", "role": "FrontRight", + "isSystemLeader": False, } if path == "/systems/current": if self.address in {"10.0.0.71", "10.0.0.184"}: @@ -256,7 +261,7 @@ async def mute_toggle_async(self): monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "getvol"]) cli.main() out = capsys.readouterr().out - assert out.strip() in {"71", "184"} + assert out.strip() == "71" def test_cli_getvol_system_name_not_found(monkeypatch) -> None: From 7299b956419c94ef748a40ea0cd50c41d44f05e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:32:24 +0100 Subject: [PATCH 10/14] cmd: clean cec --- src/devialetctl/interfaces/cli.py | 47 +++++++++++++------------------ 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 072ec03..c9bf1b7 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -27,15 +27,9 @@ class _EffectiveOptions: port: int discover_timeout: float system: str | None - cec_device: str - cec_osd_name: str - cec_vendor_compat: str def _effective_options(args, cfg) -> _EffectiveOptions: - cec_device_arg = getattr(args, "cec_device", None) - cec_osd_name_arg = getattr(args, "cec_osd_name", None) - cec_vendor_compat_arg = getattr(args, "cec_vendor_compat", None) return _EffectiveOptions( ip=args.ip if args.ip is not None else cfg.target.ip, port=args.port if args.port is not None else cfg.target.port, @@ -45,13 +39,6 @@ def _effective_options(args, cfg) -> _EffectiveOptions: else cfg.target.discover_timeout ), system=args.system, - cec_device=cec_device_arg if cec_device_arg is not None else cfg.cec_device, - cec_osd_name=cec_osd_name_arg if cec_osd_name_arg is not None else cfg.cec_osd_name, - cec_vendor_compat=( - cec_vendor_compat_arg - if cec_vendor_compat_arg is not None - else cfg.cec_vendor_compat - ), ) @@ -116,7 +103,7 @@ def _validate_target_selection_args(parser: argparse.ArgumentParser, args) -> No ) -def _dispatch_command(args, daemon_cfg, resolved: _EffectiveOptions) -> None: +def _dispatch_command(args, cfg, resolved: _EffectiveOptions) -> None: if args.cmd == "list": services = _discover_targets(timeout_s=resolved.discover_timeout) if not services: @@ -144,6 +131,16 @@ def _dispatch_command(args, daemon_cfg, resolved: _EffectiveOptions) -> None: if args.cmd == "daemon": try: + daemon_cfg = dataclasses.replace( + cfg, + cec_device=args.cec_device if args.cec_device is not None else cfg.cec_device, + cec_osd_name=args.cec_osd_name if args.cec_osd_name is not None else cfg.cec_osd_name, + cec_vendor_compat=( + args.cec_vendor_compat + if args.cec_vendor_compat is not None + else cfg.cec_vendor_compat + ), + ) runner = DaemonRunner(cfg=daemon_cfg, gateway=gateway) runner.run_forever(input_name=args.input) return @@ -191,13 +188,6 @@ def main() -> None: p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") p.add_argument("--port", type=int, default=None) - p.add_argument("--cec-device", type=str, default=None) - p.add_argument("--cec-osd-name", type=str, default=None) - p.add_argument( - "--cec-vendor-compat", - choices=["none", "samsung"], - default=None, - ) sub = p.add_subparsers(dest="cmd", required=True) sub.add_parser("list") @@ -213,17 +203,18 @@ def main() -> None: daemon = sub.add_parser("daemon") daemon.add_argument("--input", choices=["cec", "keyboard"], default="cec") + daemon.add_argument("--cec-device", type=str, default=None) + daemon.add_argument("--cec-osd-name", type=str, default=None) + daemon.add_argument( + "--cec-vendor-compat", + choices=["none", "samsung"], + default=None, + ) args = p.parse_args() _validate_target_selection_args(p, args) cfg = load_config(args.config) resolved = _effective_options(args, cfg) - daemon_cfg = dataclasses.replace( - cfg, - cec_device=resolved.cec_device, - cec_osd_name=resolved.cec_osd_name, - cec_vendor_compat=resolved.cec_vendor_compat, - ) requested_log_level = args.log_level or os.getenv("DEVIALETCTL_LOG_LEVEL") _configure_logging(requested_log_level if requested_log_level is not None else cfg.log_level) - _dispatch_command(args, daemon_cfg, resolved) + _dispatch_command(args, cfg, resolved) From 329bd9bbef9c9fca5bd4fd20a080f21375893e21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:35:11 +0100 Subject: [PATCH 11/14] Fix lint --- src/devialetctl/interfaces/cli.py | 17 +++++++++++++---- tests/test_cli_helpers.py | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index c9bf1b7..896ef50 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -71,7 +71,12 @@ def _discover_targets(timeout_s: float) -> list[Target]: def _target_from_resolved(resolved: _EffectiveOptions) -> Target: if resolved.ip: - return Target(address=resolved.ip, port=resolved.port, base_path="/ipcontrol/v1", name="manual") + return Target( + address=resolved.ip, + port=resolved.port, + base_path="/ipcontrol/v1", + name="manual", + ) services = _discover_targets(timeout_s=resolved.discover_timeout) if resolved.system is not None: return pick_target_by_system_name( @@ -93,13 +98,15 @@ def _configure_logging(level: str) -> None: def _validate_target_selection_args(parser: argparse.ArgumentParser, args) -> None: if args.ip and args.system: parser.error( - "--ip and --system are not compatible: --ip disables discovery while --system requires discovery." + "--ip and --system are not compatible: " + "--ip disables discovery while --system requires discovery." ) if args.cmd in {"list", "tree"} and args.ip: parser.error(f"--ip is not supported with '{args.cmd}': this command is discovery-based.") if args.cmd in {"list", "tree"} and args.system: parser.error( - f"--system is not supported with '{args.cmd}': this command already lists discovered systems." + f"--system is not supported with '{args.cmd}': " + "this command already lists discovered systems." ) @@ -134,7 +141,9 @@ def _dispatch_command(args, cfg, resolved: _EffectiveOptions) -> None: daemon_cfg = dataclasses.replace( cfg, cec_device=args.cec_device if args.cec_device is not None else cfg.cec_device, - cec_osd_name=args.cec_osd_name if args.cec_osd_name is not None else cfg.cec_osd_name, + cec_osd_name=( + args.cec_osd_name if args.cec_osd_name is not None else cfg.cec_osd_name + ), cec_vendor_compat=( args.cec_vendor_compat if args.cec_vendor_compat is not None diff --git a/tests/test_cli_helpers.py b/tests/test_cli_helpers.py index cccaa14..d69d869 100644 --- a/tests/test_cli_helpers.py +++ b/tests/test_cli_helpers.py @@ -2,8 +2,8 @@ import pytest -from devialetctl.interfaces import cli from devialetctl.infrastructure.config import DaemonConfig, RuntimeTarget +from devialetctl.interfaces import cli def _service(name="dev", address="10.0.0.2", port=80, base_path="/ipcontrol/v1"): From 0dce222a4cb265604407761cbe75383e4e44d407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:38:24 +0100 Subject: [PATCH 12/14] Update command --- ARCHITECTURE.md | 9 +++++++-- README.md | 19 ++++++++++++++++++- tests/test_cli_regression.py | 12 ++++++++++-- 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 0602dc2..b54c772 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -33,6 +33,7 @@ Provide a maintainable local-control platform for Devialet Phantom volume, with: - `config.py`: typed runtime config (TOML + env overrides) - `src/devialetctl/interfaces` - `cli.py`: argparse and command wiring + - `topology.py`: topology tree building/rendering and system-name target selection - Compatibility shims - `src/devialetctl/api.py` - `src/devialetctl/discovery.py` @@ -98,7 +99,10 @@ flowchart LR - `daemon --input cec` - `daemon --input keyboard` - Target selection: - - global args and daemon-specific target args (`--ip`, `--port`, `--system`, `--discover-timeout`) + - global args (`--ip`, `--port`, `--system`, `--discover-timeout`) + - daemon-specific CEC args (`--cec-device`, `--cec-osd-name`, `--cec-vendor-compat`) + - `--ip` and `--system` are mutually exclusive + - `list` and `tree` reject `--ip` / `--system` because they are discovery-only ## Runtime Behavior @@ -110,6 +114,7 @@ flowchart LR - per discovered dispatcher: `/devices/current` - per inferred system: `/systems/current` - groups are rebuilt from `groupId`/`systemId` relationships. + - system-targeted selection (`--system`) prefers `isSystemLeader` when available. - Base path is normalized defensively: - `None`, `""`, `/` -> `/ipcontrol/v1` - missing leading slash is corrected. @@ -165,7 +170,7 @@ sequenceDiagram ## Configuration Model Config source priority: -1. CLI daemon target args (when provided) +1. CLI global target args (when provided) 2. Environment variables (`DEVIALETCTL_IP`, `DEVIALETCTL_PORT`, `DEVIALETCTL_BASE_PATH`) 3. TOML config (`~/.config/devialetctl/config.toml`) 4. built-in defaults diff --git a/README.md b/README.md index 03a2c52..fb32ace 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Use cases: - mDNS discovery (`_whatsup._tcp.local`) merged with UPnP discovery - volume commands: `getvol`, `setvol`, `volup`, `voldown`, `mute` +- target selection with `--system ` (preferred for multi-device setups) - manual target override (`--ip`, `--port`) - long-running daemon mode (`daemon --input cec`) - keyboard test mode (`daemon --input keyboard`) @@ -69,6 +70,12 @@ Use explicit target: uv run devialetctl --ip 192.168.1.42 --port 80 getvol ``` +Use system-name target selection (from `tree` output): + +```bash +uv run devialetctl --system "TV" getvol +``` + ## Daemon (CEC Input) Run daemon with config: @@ -77,6 +84,12 @@ Run daemon with config: uv run devialetctl daemon --input cec ``` +Override daemon CEC settings for one run (global options stay before subcommand): + +```bash +uv run devialetctl --system "TV" daemon --input cec --cec-vendor-compat samsung +``` + The daemon: - consumes CEC key events from Linux CEC (`/dev/cec0`, ioctl backend) - normalizes to volume actions @@ -101,7 +114,7 @@ docker run --rm -it \ --network host \ --device /dev/cec0:/dev/cec0 \ ghcr.io//:latest \ - daemon --input cec --cec-vendor-compat="samsung" + --system "TV" daemon --input cec --cec-vendor-compat="samsung" ``` Notes: @@ -159,6 +172,10 @@ Environment overrides: - `DEVIALETCTL_LOG_LEVEL` - `DEVIALETCTL_CEC_DEVICE` +CLI target selection notes: +- `--ip` and `--system` are mutually exclusive. +- `list` and `tree` are discovery-only commands and reject `--ip` / `--system`. + Kernel CEC permissions note: - the daemon user must have read/write access to `/dev/cec0` (typically via `video` group or udev rule) - if startup fails with ioctl/device access errors, verify `ls -l /dev/cec*` and group membership diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index c463c75..a99b4a1 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -167,7 +167,11 @@ def run_forever(self, input_name): monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) monkeypatch.setattr(cli, "DaemonRunner", FakeRunner) - monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "daemon", "--input", "keyboard"]) + monkeypatch.setattr( + sys, + "argv", + ["devialetctl", "--system", "TV", "daemon", "--input", "keyboard"], + ) cli.main() assert FakeGateway.picked_address == "10.0.0.11" @@ -300,7 +304,11 @@ async def fetch_json_async(self, path): def test_cli_rejects_ip_and_system_together(monkeypatch, capsys) -> None: - monkeypatch.setattr(sys, "argv", ["devialetctl", "--ip", "10.0.0.2", "--system", "TV", "getvol"]) + monkeypatch.setattr( + sys, + "argv", + ["devialetctl", "--ip", "10.0.0.2", "--system", "TV", "getvol"], + ) with pytest.raises(SystemExit) as exc: cli.main() assert exc.value.code == 2 From 20831d5221d6df8fafb2efde9bd0baebccfe6586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:42:55 +0100 Subject: [PATCH 13/14] tests: increase coverage --- tests/test_cli_regression.py | 107 ++++++++++++++++++++++++++++++++ tests/test_topology.py | 98 +++++++++++++++++++++++++++++ tests/test_upnp_gateway.py | 116 +++++++++++++++++++++++++++++++++++ 3 files changed, 321 insertions(+) create mode 100644 tests/test_topology.py create mode 100644 tests/test_upnp_gateway.py diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index a99b4a1..234ed52 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -495,3 +495,110 @@ def fake_configure(level): cli.main() _ = capsys.readouterr().out assert captured["level"] == "DEBUG" + + +@pytest.mark.parametrize( + ("argv", "expected_output"), + [ + (["devialetctl", "systems"], "{}"), + (["devialetctl", "setvol", "35"], "OK"), + (["devialetctl", "volup"], "OK"), + (["devialetctl", "voldown"], "OK"), + (["devialetctl", "mute"], "OK"), + ], +) +def test_cli_control_commands_paths(monkeypatch, capsys, argv, expected_output) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + set_value = None + calls = [] + + def __init__(self, address, port, base_path): + self.address = address + + async def systems_async(self): + return {} + + async def get_volume_async(self): + return 31 + + async def set_volume_async(self, value): + FakeGateway.set_value = value + return None + + async def volume_up_async(self): + FakeGateway.calls.append("up") + return None + + async def volume_down_async(self): + FakeGateway.calls.append("down") + return None + + async def mute_toggle_async(self): + FakeGateway.calls.append("mute") + return None + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", argv) + + cli.main() + + out = capsys.readouterr().out + assert out.strip() == expected_output + if argv[1] == "setvol": + assert FakeGateway.set_value == 35 + + +def test_cli_tree_when_empty(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + return [] + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(sys, "argv", ["devialetctl", "tree"]) + + cli.main() + + out = capsys.readouterr().out + assert "No service detected." in out + + +def test_cli_daemon_handles_keyboard_interrupt(monkeypatch) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + + class FakeRunner: + def __init__(self, cfg, gateway): + self.cfg = cfg + self.gateway = gateway + + def run_forever(self, input_name): + raise KeyboardInterrupt() + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(cli, "DaemonRunner", FakeRunner) + monkeypatch.setattr(sys, "argv", ["devialetctl", "daemon", "--input", "keyboard"]) + + cli.main() diff --git a/tests/test_topology.py b/tests/test_topology.py new file mode 100644 index 0000000..1977568 --- /dev/null +++ b/tests/test_topology.py @@ -0,0 +1,98 @@ +import pytest + +from devialetctl.application.ports import Target +from devialetctl.interfaces.topology import ( + build_topology_tree, + pick_target_by_system_name, + render_topology_tree_lines, +) + + +def _svc(name: str, address: str) -> Target: + return Target(name=name, address=address, port=80, base_path="/ipcontrol/v1") + + +def test_build_topology_tree_returns_error_when_no_device_payload() -> None: + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + + async def fetch_json_async(self, path): + return None + + tree = build_topology_tree([_svc("d1", "10.0.0.2")], gateway_factory=FakeGateway) + assert tree["groups"] == [] + assert tree["ungrouped_devices"] == [] + assert tree["errors"] == ["No Devialet devices detected."] + + +def test_render_topology_tree_lines_handles_errors_and_ungrouped() -> None: + lines = render_topology_tree_lines({"errors": ["boom"], "groups": [], "ungrouped_devices": []}) + assert lines == ["boom"] + + tree = { + "errors": [], + "groups": [], + "ungrouped_devices": [ + { + "device_name": "Kitchen", + "address": "10.0.0.42", + "model": "Phantom II", + } + ], + } + lines = render_topology_tree_lines(tree) + assert "Ungrouped devices" in lines + assert " Device Kitchen @ 10.0.0.42 model=Phantom II" in lines + + +def test_pick_target_by_system_name_validation_and_ambiguity(monkeypatch) -> None: + with pytest.raises(RuntimeError, match="No service"): + pick_target_by_system_name([], "TV") + + with pytest.raises(RuntimeError, match="cannot be empty"): + pick_target_by_system_name([_svc("d1", "10.0.0.2")], " ") + + monkeypatch.setattr( + "devialetctl.interfaces.topology.build_topology_tree", + lambda services, gateway_factory=None: { + "errors": [], + "groups": [ + { + "group_id": "g1", + "systems": [ + { + "system_name": "TV", + "devices": [{"address": "10.0.0.2", "port": 80}], + } + ], + }, + { + "group_id": "g2", + "systems": [ + { + "system_name": "TV", + "devices": [{"address": "10.0.0.3", "port": 80}], + } + ], + }, + ], + "ungrouped_devices": [], + }, + ) + with pytest.raises(RuntimeError, match="ambiguous across groups"): + pick_target_by_system_name([_svc("d1", "10.0.0.2")], "TV") + + +def test_pick_target_by_system_name_raises_when_system_has_no_devices(monkeypatch) -> None: + monkeypatch.setattr( + "devialetctl.interfaces.topology.build_topology_tree", + lambda services, gateway_factory=None: { + "errors": [], + "groups": [{"group_id": "grp-tv", "systems": [{"system_name": "TV", "devices": []}]}], + "ungrouped_devices": [], + }, + ) + + with pytest.raises(RuntimeError, match="has no reachable devices"): + pick_target_by_system_name([_svc("d1", "10.0.0.2")], "TV") diff --git a/tests/test_upnp_gateway.py b/tests/test_upnp_gateway.py new file mode 100644 index 0000000..50a359c --- /dev/null +++ b/tests/test_upnp_gateway.py @@ -0,0 +1,116 @@ +from devialetctl.infrastructure import upnp_gateway + + +def test_parse_ssdp_headers_lowercases_and_ignores_invalid_lines() -> None: + payload = ( + b"HTTP/1.1 200 OK\r\n" + b"LOCATION: http://10.0.0.2:1400/desc.xml\r\n" + b"ST: urn:schemas-upnp-org:device:MediaRenderer:2\r\n" + b"USN: uuid:abc::urn:schemas-upnp-org:device:MediaRenderer:2\r\n" + b"bad line without colon\r\n" + b"\r\n" + ) + headers = upnp_gateway._parse_ssdp_headers(payload) + assert headers["location"] == "http://10.0.0.2:1400/desc.xml" + assert headers["st"] == "urn:schemas-upnp-org:device:MediaRenderer:2" + assert headers["usn"] == "uuid:abc::urn:schemas-upnp-org:device:MediaRenderer:2" + assert "bad line without colon" not in headers + + +def test_is_devialet_manufacturer_accepts_devialet_tag_and_caps_timeout(monkeypatch) -> None: + captured = {"timeout": None} + + class FakeResponse: + text = " deViaLet " + + @staticmethod + def raise_for_status(): + return None + + class FakeClient: + def __init__(self, timeout): + captured["timeout"] = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, location): + return FakeResponse() + + monkeypatch.setattr(upnp_gateway.httpx, "Client", FakeClient) + ok = upnp_gateway._is_devialet_manufacturer("http://10.0.0.2:1400/desc.xml", timeout_s=9.0) + assert ok is True + assert captured["timeout"] == 1.5 + + +def test_is_devialet_manufacturer_rejects_non_devialet_tag(monkeypatch) -> None: + class FakeResponse: + text = "OtherBrand" + + @staticmethod + def raise_for_status(): + return None + + class FakeClient: + def __init__(self, timeout): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, location): + return FakeResponse() + + monkeypatch.setattr(upnp_gateway.httpx, "Client", FakeClient) + ok = upnp_gateway._is_devialet_manufacturer("http://10.0.0.3:1400/desc.xml", timeout_s=0.05) + assert ok is False + + +def test_is_devialet_manufacturer_returns_false_on_http_error(monkeypatch) -> None: + class FakeClient: + def __init__(self, timeout): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, location): + raise RuntimeError("boom") + + monkeypatch.setattr(upnp_gateway.httpx, "Client", FakeClient) + ok = upnp_gateway._is_devialet_manufacturer("http://10.0.0.4:1400/desc.xml", timeout_s=1.0) + assert ok is False + + +def test_discover_filters_non_devialet_missing_host_and_duplicates(monkeypatch) -> None: + def fake_iter(timeout_s): + return iter( + [ + {"location": "http://10.0.0.2:1400/desc.xml"}, + {"location": "http://10.0.0.2:1400/desc.xml"}, + {"location": "not-a-url"}, + {"location": "http://10.0.0.3:1400/desc.xml"}, + ] + ) + + def fake_is_devialet_manufacturer(location, timeout_s): + return "10.0.0.3" not in location + + monkeypatch.setattr(upnp_gateway, "_iter_ssdp_responses", fake_iter) + monkeypatch.setattr(upnp_gateway, "_is_devialet_manufacturer", fake_is_devialet_manufacturer) + + targets = upnp_gateway.UpnpDiscoveryGateway().discover(timeout_s=0.1) + assert len(targets) == 1 + assert targets[0].address == "10.0.0.2" + assert targets[0].port == 80 + assert targets[0].base_path == "/ipcontrol/v1" + assert targets[0].name == "UPnP:10.0.0.2" From 3135c91519f87932160228749da5b30f2c94934c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20P=C3=A9ron?= Date: Wed, 18 Feb 2026 09:43:26 +0100 Subject: [PATCH 14/14] Update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fb32ace..e4ae70d 100644 --- a/README.md +++ b/README.md @@ -113,7 +113,7 @@ Run daemon in a container (CEC mode): docker run --rm -it \ --network host \ --device /dev/cec0:/dev/cec0 \ - ghcr.io//:latest \ + ghcr.io/clementperon/devialet-phantom-ctl:latest \ --system "TV" daemon --input cec --cec-vendor-compat="samsung" ```