diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 601506f..b54c772 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -26,12 +26,14 @@ Provide a maintainable local-control platform for Devialet Phantom volume, with: - `ports.py`: contracts (`VolumeGateway`, discovery target models) - `src/devialetctl/infrastructure` - `devialet_gateway.py`: async HTTP calls to Devialet API (`httpx.AsyncClient`) - - `mdns_gateway.py`: zeroconf discovery + filtering + - `mdns_gateway.py`: mDNS/zeroconf discovery + filtering + - `upnp_gateway.py`: SSDP/UPnP discovery (`MediaRenderer:2`) - `cec_adapter.py`: Linux CEC kernel adapter (`/dev/cec0`, ioctl, async event stream) - `keyboard_adapter.py`: single-key or line-based keyboard input - `config.py`: typed runtime config (TOML + env overrides) - `src/devialetctl/interfaces` - `cli.py`: argparse and command wiring + - `topology.py`: topology tree building/rendering and system-name target selection - Compatibility shims - `src/devialetctl/api.py` - `src/devialetctl/discovery.py` @@ -62,6 +64,7 @@ flowchart LR keyboardAdapter[KeyboardAdapter] httpGateway[DevialetHttpGateway] mdnsGateway[MdnsDiscoveryGateway] + upnpGateway[UpnpDiscoveryGateway] configLoader[ConfigLoader] end @@ -79,8 +82,11 @@ flowchart LR volumeService --> httpGateway httpGateway --> devialetSpeaker devialetSpeaker --> mdnsGateway + devialetSpeaker --> upnpGateway daemonInterface --> mdnsGateway + daemonInterface --> upnpGateway cliInterface --> mdnsGateway + cliInterface --> upnpGateway configLoader --> daemonInterface configLoader --> cliInterface ``` @@ -88,16 +94,27 @@ flowchart LR ## Command Surface - Direct control: - - `list`, `systems`, `getvol`, `setvol`, `volup`, `voldown`, `mute` + - `list`, `tree`, `systems`, `getvol`, `setvol`, `volup`, `voldown`, `mute` - Daemon: - `daemon --input cec` - `daemon --input keyboard` - Target selection: - - global args and daemon-specific target args (`--ip`, `--port`, `--base-path`, `--index`, `--discover-timeout`) + - global args (`--ip`, `--port`, `--system`, `--discover-timeout`) + - daemon-specific CEC args (`--cec-device`, `--cec-osd-name`, `--cec-vendor-compat`) + - `--ip` and `--system` are mutually exclusive + - `list` and `tree` reject `--ip` / `--system` because they are discovery-only ## Runtime Behavior -- mDNS discovery filters likely Devialet devices by service identity heuristics. +- Discovery uses merged mDNS + UPnP: + - mDNS path: `_whatsup._tcp.local` browsing. + - UPnP path: SSDP `M-SEARCH` with target `urn:schemas-upnp-org:device:MediaRenderer:2`. + - targets are deduplicated by `(address, port, base_path)` before selection. +- `tree` command builds a topology from "current" endpoints: + - per discovered dispatcher: `/devices/current` + - per inferred system: `/systems/current` + - groups are rebuilt from `groupId`/`systemId` relationships. + - system-targeted selection (`--system`) prefers `isSystemLeader` when available. - Base path is normalized defensively: - `None`, `""`, `/` -> `/ipcontrol/v1` - missing leading slash is corrected. @@ -153,13 +170,13 @@ sequenceDiagram ## Configuration Model Config source priority: -1. CLI daemon target args (when provided) +1. CLI global target args (when provided) 2. Environment variables (`DEVIALETCTL_IP`, `DEVIALETCTL_PORT`, `DEVIALETCTL_BASE_PATH`) 3. TOML config (`~/.config/devialetctl/config.toml`) 4. built-in defaults Relevant settings: -- target: `ip`, `port`, `base_path`, `discover_timeout`, `index` +- target: `ip`, `port`, `base_path`, `discover_timeout` - daemon: `cec_device`, `cec_osd_name`, `cec_vendor_compat`, `reconnect_delay_s`, `log_level` - policy: `dedupe_window_s`, `min_interval_s` @@ -178,8 +195,8 @@ Current test suite covers: - keyboard parser behavior - event policy dedupe/rate-limit - daemon routing in CEC and keyboard modes -- CLI regressions (including daemon argument handling) -- mDNS filtering +- CLI regressions (including daemon argument handling and `tree --json`) +- mDNS + UPnP discovery integration - base-path normalization - compatibility wrappers diff --git a/README.md b/README.md index 001dd7b..e4ae70d 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,10 @@ Use cases: ## Features -- mDNS discovery (`_http._tcp.local`) +- mDNS discovery (`_whatsup._tcp.local`) merged with UPnP discovery - volume commands: `getvol`, `setvol`, `volup`, `voldown`, `mute` -- manual target override (`--ip`, `--port`, `--base-path`) +- target selection with `--system ` (preferred for multi-device setups) +- manual target override (`--ip`, `--port`) - long-running daemon mode (`daemon --input cec`) - keyboard test mode (`daemon --input keyboard`) - typed config via TOML + env overrides @@ -66,7 +67,13 @@ uv run devialetctl setvol 35 Use explicit target: ```bash -uv run devialetctl --ip 192.168.1.42 --port 80 --base-path /ipcontrol/v1 getvol +uv run devialetctl --ip 192.168.1.42 --port 80 getvol +``` + +Use system-name target selection (from `tree` output): + +```bash +uv run devialetctl --system "TV" getvol ``` ## Daemon (CEC Input) @@ -77,6 +84,12 @@ Run daemon with config: uv run devialetctl daemon --input cec ``` +Override daemon CEC settings for one run (global options stay before subcommand): + +```bash +uv run devialetctl --system "TV" daemon --input cec --cec-vendor-compat samsung +``` + The daemon: - consumes CEC key events from Linux CEC (`/dev/cec0`, ioctl backend) - normalizes to volume actions @@ -100,13 +113,13 @@ Run daemon in a container (CEC mode): docker run --rm -it \ --network host \ --device /dev/cec0:/dev/cec0 \ - ghcr.io//:latest \ - daemon --input cec --cec-vendor-compat="samsung" + ghcr.io/clementperon/devialet-phantom-ctl:latest \ + --system "TV" daemon --input cec --cec-vendor-compat="samsung" ``` Notes: - `--device /dev/cec0:/dev/cec0` passes the Linux CEC device into the container. -- `--network host` is required for mDNS discovery (`_http._tcp.local`) from the container. +- `--network host` is required for mDNS discovery (`_whatsup._tcp.local`) from the container. - `--cec-vendor-compat samsung` enables Samsung vendor compatibility mode for this run. - alternatively, set `DEVIALETCTL_CEC_VENDOR_COMPAT=samsung` to make it the environment default. - with a fixed target IP (`DEVIALETCTL_IP`), you can usually run without host networking. @@ -145,9 +158,6 @@ min_interval_s = 0.12 [target] ip = "192.168.1.42" port = 80 -base_path = "/ipcontrol/v1" -discover_timeout = 3.0 -index = 0 ``` Use `log_level = "DEBUG"` (or `DEVIALETCTL_LOG_LEVEL=DEBUG`) to log raw HDMI-CEC frames: @@ -162,6 +172,10 @@ Environment overrides: - `DEVIALETCTL_LOG_LEVEL` - `DEVIALETCTL_CEC_DEVICE` +CLI target selection notes: +- `--ip` and `--system` are mutually exclusive. +- `list` and `tree` are discovery-only commands and reject `--ip` / `--system`. + Kernel CEC permissions note: - the daemon user must have read/write access to `/dev/cec0` (typically via `video` group or udev rule) - if startup fails with ioctl/device access errors, verify `ls -l /dev/cec*` and group membership diff --git a/src/devialetctl/discovery.py b/src/devialetctl/discovery.py index 3260696..206a6fb 100644 --- a/src/devialetctl/discovery.py +++ b/src/devialetctl/discovery.py @@ -2,6 +2,7 @@ from typing import List from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway +from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway @dataclass(frozen=True) @@ -15,8 +16,15 @@ class DevialetService: def discover( timeout_s: float = 3.0, service_type: str = "_whatsup._tcp.local." ) -> List[DevialetService]: - gateway = MdnsDiscoveryGateway(service_type=service_type) - results = gateway.discover(timeout_s=timeout_s) + results = [] + seen: set[tuple[str, int, str]] = set() + for gateway in (MdnsDiscoveryGateway(service_type=service_type), UpnpDiscoveryGateway()): + for svc in gateway.discover(timeout_s=timeout_s): + key = (svc.address, svc.port, svc.base_path) + if key in seen: + continue + seen.add(key) + results.append(svc) return [ DevialetService(name=r.name, address=r.address, port=r.port, base_path=r.base_path) for r in results diff --git a/src/devialetctl/domain/events.py b/src/devialetctl/domain/events.py index aa6e54b..ac816cd 100644 --- a/src/devialetctl/domain/events.py +++ b/src/devialetctl/domain/events.py @@ -27,4 +27,4 @@ class InputEvent: muted: bool | None = None vendor_subcommand: int | None = None vendor_mode: int | None = None - vendor_payload: tuple[int, ...] | None = None + vendor_payload: tuple[int, ...] | None = None \ No newline at end of file diff --git a/src/devialetctl/infrastructure/__init__.py b/src/devialetctl/infrastructure/__init__.py index 7645887..28bc681 100644 --- a/src/devialetctl/infrastructure/__init__.py +++ b/src/devialetctl/infrastructure/__init__.py @@ -2,6 +2,7 @@ from .devialet_gateway import DevialetHttpGateway from .keyboard_adapter import KeyboardAdapter, parse_keyboard_command from .mdns_gateway import MdnsDiscoveryGateway, MdnsService +from .upnp_gateway import UpnpDiscoveryGateway, UpnpService __all__ = [ "DaemonConfig", @@ -12,4 +13,6 @@ "parse_keyboard_command", "MdnsDiscoveryGateway", "MdnsService", + "UpnpDiscoveryGateway", + "UpnpService", ] diff --git a/src/devialetctl/infrastructure/devialet_gateway.py b/src/devialetctl/infrastructure/devialet_gateway.py index 0135d17..4e0015d 100644 --- a/src/devialetctl/infrastructure/devialet_gateway.py +++ b/src/devialetctl/infrastructure/devialet_gateway.py @@ -40,6 +40,9 @@ async def _apost(self, path: str, payload: dict[str, Any] | None = None) -> None ) r.raise_for_status() + async def fetch_json_async(self, path: str) -> dict[str, Any]: + return await self._aget(path) + async def systems_async(self) -> dict[str, Any]: try: return await self._aget("/systems") diff --git a/src/devialetctl/infrastructure/upnp_gateway.py b/src/devialetctl/infrastructure/upnp_gateway.py new file mode 100644 index 0000000..878fa2f --- /dev/null +++ b/src/devialetctl/infrastructure/upnp_gateway.py @@ -0,0 +1,139 @@ +import logging +import re +import socket +import time +from dataclasses import dataclass +from typing import Iterable +from urllib.parse import urlparse + +import httpx # type: ignore[reportMissingImports] + +from devialetctl.application.ports import DiscoveryPort, Target + +_SSDP_ADDR = ("239.255.255.250", 1900) +_SSDP_SEARCH_TARGET = "urn:schemas-upnp-org:device:MediaRenderer:2" +_DEFAULT_BASE_PATH = "/ipcontrol/v1" +_DEFAULT_PORT = 80 +LOG = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class UpnpService: + name: str + address: str + port: int + base_path: str + + +def _parse_ssdp_headers(payload: bytes) -> dict[str, str]: + text = payload.decode("utf-8", errors="ignore") + lines = [line.strip() for line in text.splitlines() if line.strip()] + headers: dict[str, str] = {} + for line in lines[1:]: + if ":" not in line: + continue + key, value = line.split(":", 1) + headers[key.strip().lower()] = value.strip() + return headers + + +def _iter_ssdp_responses(timeout_s: float) -> Iterable[dict[str, str]]: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock: + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) + sock.settimeout(max(0.2, min(timeout_s, 1.0))) + msg = "\r\n".join( + [ + "M-SEARCH * HTTP/1.1", + "HOST: 239.255.255.250:1900", + 'MAN: "ssdp:discover"', + "MX: 1", + f"ST: {_SSDP_SEARCH_TARGET}", + "", + "", + ] + ).encode("ascii") + LOG.debug("UPnP SSDP M-SEARCH start st=%s timeout_s=%.2f", _SSDP_SEARCH_TARGET, timeout_s) + sock.sendto(msg, _SSDP_ADDR) + + deadline = time.monotonic() + max(0.1, timeout_s) + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + LOG.debug("UPnP SSDP M-SEARCH finished (timeout reached)") + return + sock.settimeout(max(0.05, min(remaining, 0.5))) + try: + payload, _ = sock.recvfrom(8192) + except TimeoutError: + continue + except OSError: + LOG.debug("UPnP SSDP receive aborted due to socket error") + return + headers = _parse_ssdp_headers(payload) + if headers: + LOG.debug( + "UPnP SSDP response location=%s st=%s usn=%s", + headers.get("location", ""), + headers.get("st", ""), + headers.get("usn", ""), + ) + yield headers + + +def _is_devialet_manufacturer(location: str, timeout_s: float) -> bool: + timeout = max(0.3, min(timeout_s, 1.5)) + try: + with httpx.Client(timeout=timeout) as client: + response = client.get(location) + response.raise_for_status() + xml_text = response.text + except Exception as exc: + LOG.debug("UPnP XML fetch failed location=%s err=%s", location, exc) + return False + + if not re.search( + r"\s*Devialet\s*", + xml_text, + flags=re.IGNORECASE, + ): + LOG.debug("UPnP XML rejected location=%s manufacturer_tag_not_found", location) + return False + + LOG.debug("UPnP XML accepted location=%s manufacturer=Devialet", location) + return True + + +class UpnpDiscoveryGateway(DiscoveryPort): + def discover(self, timeout_s: float = 3.0) -> list[Target]: + uniq: dict[str, UpnpService] = {} + LOG.debug( + "UPnP discovery begin timeout_s=%.2f target=%s", + timeout_s, + _SSDP_SEARCH_TARGET, + ) + for headers in _iter_ssdp_responses(timeout_s): + location = headers.get("location", "") + if not _is_devialet_manufacturer(location=location, timeout_s=timeout_s): + continue + parsed = urlparse(location) + host = parsed.hostname + if not host: + LOG.debug("UPnP response ignored (missing host in location): %s", location) + continue + if host in uniq: + continue + + uniq[host] = UpnpService( + name=f"UPnP:{host}", + address=host, + port=_DEFAULT_PORT, + base_path=_DEFAULT_BASE_PATH, + ) + LOG.debug("UPnP device accepted host=%s base_path=%s", host, _DEFAULT_BASE_PATH) + + targets = [ + Target(address=s.address, port=s.port, base_path=s.base_path, name=s.name) + for s in uniq.values() + ] + LOG.debug("UPnP discovery done found=%d", len(targets)) + return targets diff --git a/src/devialetctl/interfaces/cli.py b/src/devialetctl/interfaces/cli.py index 70124dd..896ef50 100644 --- a/src/devialetctl/interfaces/cli.py +++ b/src/devialetctl/interfaces/cli.py @@ -1,5 +1,6 @@ import argparse import dataclasses +import json import logging import os import sys @@ -8,79 +9,82 @@ from devialetctl.application.ports import Target from devialetctl.application.service import VolumeService from devialetctl.infrastructure.config import load_config -from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway, normalize_base_path +from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway from devialetctl.infrastructure.mdns_gateway import MdnsDiscoveryGateway +from devialetctl.infrastructure.upnp_gateway import UpnpDiscoveryGateway +from devialetctl.interfaces.topology import ( + build_topology_tree, + pick_target_by_system_name, + render_topology_tree_lines, +) + +LOG = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class _EffectiveOptions: + ip: str | None + port: int + discover_timeout: float + system: str | None + + +def _effective_options(args, cfg) -> _EffectiveOptions: + return _EffectiveOptions( + ip=args.ip if args.ip is not None else cfg.target.ip, + port=args.port if args.port is not None else cfg.target.port, + discover_timeout=( + args.discover_timeout + if args.discover_timeout is not None + else cfg.target.discover_timeout + ), + system=args.system, + ) -def _pick(services: list[Target], index: int | None): +def _pick(services: list[Target]) -> Target: if not services: raise RuntimeError( - "No service detected via mDNS (Bonjour). Check network / Wi-Fi isolation." - ) - if index is None: - if len(services) == 1: - return services[0] - for i, s in enumerate(services): - print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") - raise RuntimeError("Multiple services detected. Run again with --index N.") - if index < 0 or index >= len(services): - raise RuntimeError(f"Invalid index: {index}") - return services[index] - - -def _target_from_args(args) -> Target: - ip = args.ip - port = args.port - base_path = args.base_path - discover_timeout = args.discover_timeout - index = args.index - - if getattr(args, "cmd", None) == "daemon": - ip = args.daemon_ip if args.daemon_ip is not None else ip - port = args.daemon_port if args.daemon_port is not None else port - base_path = args.daemon_base_path if args.daemon_base_path is not None else base_path - discover_timeout = ( - args.daemon_discover_timeout - if args.daemon_discover_timeout is not None - else discover_timeout + "No service detected via mDNS/UPnP. Check network / Wi-Fi isolation." ) - index = args.daemon_index if args.daemon_index is not None else index + if len(services) == 1: + return services[0] + for i, s in enumerate(services): + print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") + raise RuntimeError( + "Multiple services detected. Use --system to pick one, or --ip to force a target." + ) - if ip: - return Target( - address=ip, port=port, base_path=normalize_base_path(base_path), name="manual" - ) - services = MdnsDiscoveryGateway().discover(timeout_s=discover_timeout) - return _pick(services, index) +def _discover_targets(timeout_s: float) -> list[Target]: + seen: set[tuple[str, int, str]] = set() + merged: list[Target] = [] + for gateway in (MdnsDiscoveryGateway(), UpnpDiscoveryGateway()): + for svc in gateway.discover(timeout_s=timeout_s): + key = (svc.address, svc.port, svc.base_path) + if key in seen: + continue + seen.add(key) + merged.append(svc) + return merged -def _target_from_config(args) -> Target: - cfg = load_config(args.config) - if args.daemon_ip is not None: + +def _target_from_resolved(resolved: _EffectiveOptions) -> Target: + if resolved.ip: return Target( - address=args.daemon_ip, - port=args.daemon_port if args.daemon_port is not None else 80, - base_path=normalize_base_path( - args.daemon_base_path if args.daemon_base_path is not None else "/ipcontrol/v1" - ), + address=resolved.ip, + port=resolved.port, + base_path="/ipcontrol/v1", name="manual", ) - - if cfg.target.ip: - return Target( - address=cfg.target.ip, - port=cfg.target.port, - base_path=cfg.target.base_path, - name="config", + services = _discover_targets(timeout_s=resolved.discover_timeout) + if resolved.system is not None: + return pick_target_by_system_name( + services, + resolved.system, + gateway_factory=DevialetHttpGateway, ) - timeout = ( - args.daemon_discover_timeout - if args.daemon_discover_timeout is not None - else cfg.target.discover_timeout - ) - index = args.daemon_index if args.daemon_index is not None else cfg.target.index - services = MdnsDiscoveryGateway().discover(timeout_s=timeout) - return _pick(services, index) + return _pick(services) def _configure_logging(level: str) -> None: @@ -91,58 +95,24 @@ def _configure_logging(level: str) -> None: ) -def main() -> None: - p = argparse.ArgumentParser( - prog="devialetctl", description="Devialet Phantom IP Control (discover + commands)" - ) - p.add_argument( - "--log-level", - type=str, - default=None, - help="Override log level (e.g. DEBUG, INFO, WARNING).", - ) - p.add_argument("--discover-timeout", type=float, default=3.0) - p.add_argument("--index", type=int, default=None) - p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") - p.add_argument("--port", type=int, default=80) - p.add_argument("--base-path", type=str, default="/ipcontrol/v1") - - sub = p.add_subparsers(dest="cmd", required=True) - sub.add_parser("list") - sub.add_parser("systems") - sub.add_parser("getvol") - sub.add_parser("volup") - sub.add_parser("voldown") - sub.add_parser("mute") - set_parser = sub.add_parser("setvol") - set_parser.add_argument("value", type=int) +def _validate_target_selection_args(parser: argparse.ArgumentParser, args) -> None: + if args.ip and args.system: + parser.error( + "--ip and --system are not compatible: " + "--ip disables discovery while --system requires discovery." + ) + if args.cmd in {"list", "tree"} and args.ip: + parser.error(f"--ip is not supported with '{args.cmd}': this command is discovery-based.") + if args.cmd in {"list", "tree"} and args.system: + parser.error( + f"--system is not supported with '{args.cmd}': " + "this command already lists discovered systems." + ) - daemon = sub.add_parser("daemon") - daemon.add_argument("--input", choices=["cec", "keyboard"], default="cec") - daemon.add_argument("--config", type=str, default=None) - daemon.add_argument( - "--discover-timeout", dest="daemon_discover_timeout", type=float, default=None - ) - daemon.add_argument("--index", dest="daemon_index", type=int, default=None) - daemon.add_argument("--ip", dest="daemon_ip", type=str, default=None) - daemon.add_argument("--port", dest="daemon_port", type=int, default=None) - daemon.add_argument("--base-path", dest="daemon_base_path", type=str, default=None) - daemon.add_argument("--cec-device", dest="daemon_cec_device", type=str, default=None) - daemon.add_argument("--cec-osd-name", dest="daemon_cec_osd_name", type=str, default=None) - daemon.add_argument( - "--cec-vendor-compat", - dest="daemon_cec_vendor_compat", - choices=["none", "samsung"], - default=None, - ) - - args = p.parse_args() - requested_log_level = args.log_level or os.getenv("DEVIALETCTL_LOG_LEVEL") - if requested_log_level is not None: - _configure_logging(requested_log_level) +def _dispatch_command(args, cfg, resolved: _EffectiveOptions) -> None: if args.cmd == "list": - services = MdnsDiscoveryGateway().discover(timeout_s=args.discover_timeout) + services = _discover_targets(timeout_s=resolved.discover_timeout) if not services: print("No service detected.") return @@ -150,30 +120,37 @@ def main() -> None: print(f"[{i}] {s.name} -> {s.address}:{s.port}{s.base_path}") return + if args.cmd == "tree": + services = _discover_targets(timeout_s=resolved.discover_timeout) + if not services: + print("No service detected.") + return + tree = build_topology_tree(services, gateway_factory=DevialetHttpGateway) + if args.tree_json: + print(json.dumps(tree, indent=2, ensure_ascii=False)) + return + for line in render_topology_tree_lines(tree): + print(line) + return + + target = _target_from_resolved(resolved) + gateway = DevialetHttpGateway(target.address, target.port, target.base_path) + if args.cmd == "daemon": try: - cfg = load_config(args.config) - cfg = dataclasses.replace( + daemon_cfg = dataclasses.replace( cfg, - cec_device=( - args.daemon_cec_device if args.daemon_cec_device is not None else cfg.cec_device - ), + cec_device=args.cec_device if args.cec_device is not None else cfg.cec_device, cec_osd_name=( - args.daemon_cec_osd_name - if args.daemon_cec_osd_name is not None - else cfg.cec_osd_name + args.cec_osd_name if args.cec_osd_name is not None else cfg.cec_osd_name ), cec_vendor_compat=( - args.daemon_cec_vendor_compat - if args.daemon_cec_vendor_compat is not None + args.cec_vendor_compat + if args.cec_vendor_compat is not None else cfg.cec_vendor_compat ), ) - if requested_log_level is None: - _configure_logging(cfg.log_level) - target = _target_from_config(args) - gateway = DevialetHttpGateway(target.address, target.port, target.base_path) - runner = DaemonRunner(cfg=cfg, gateway=gateway) + runner = DaemonRunner(cfg=daemon_cfg, gateway=gateway) runner.run_forever(input_name=args.input) return except KeyboardInterrupt: @@ -182,9 +159,7 @@ def main() -> None: print(f"Daemon error: {exc}", file=sys.stderr) raise SystemExit(2) - target = _target_from_args(args) - client = VolumeService(DevialetHttpGateway(target.address, target.port, target.base_path)) - + client = VolumeService(gateway) try: if args.cmd == "systems": print(client.systems()) @@ -205,3 +180,50 @@ def main() -> None: except Exception as exc: print(f"Error: {exc}", file=sys.stderr) raise SystemExit(2) + + +def main() -> None: + p = argparse.ArgumentParser( + prog="devialetctl", description="Devialet Phantom IP Control (discover + commands)" + ) + p.add_argument("--config", type=str, default=None) + p.add_argument( + "--log-level", + type=str, + default=None, + help="Override log level (e.g. DEBUG, INFO, WARNING).", + ) + p.add_argument("--discover-timeout", type=float, default=None) + p.add_argument("--system", type=str, default=None, help="System name from 'tree' output.") + p.add_argument("--ip", type=str, default=None, help="Manual IP (bypass discovery)") + p.add_argument("--port", type=int, default=None) + + sub = p.add_subparsers(dest="cmd", required=True) + sub.add_parser("list") + tree = sub.add_parser("tree") + tree.add_argument("--json", action="store_true", dest="tree_json") + sub.add_parser("systems") + sub.add_parser("getvol") + sub.add_parser("volup") + sub.add_parser("voldown") + sub.add_parser("mute") + set_parser = sub.add_parser("setvol") + set_parser.add_argument("value", type=int) + + daemon = sub.add_parser("daemon") + daemon.add_argument("--input", choices=["cec", "keyboard"], default="cec") + daemon.add_argument("--cec-device", type=str, default=None) + daemon.add_argument("--cec-osd-name", type=str, default=None) + daemon.add_argument( + "--cec-vendor-compat", + choices=["none", "samsung"], + default=None, + ) + + args = p.parse_args() + _validate_target_selection_args(p, args) + cfg = load_config(args.config) + resolved = _effective_options(args, cfg) + requested_log_level = args.log_level or os.getenv("DEVIALETCTL_LOG_LEVEL") + _configure_logging(requested_log_level if requested_log_level is not None else cfg.log_level) + _dispatch_command(args, cfg, resolved) diff --git a/src/devialetctl/interfaces/topology.py b/src/devialetctl/interfaces/topology.py new file mode 100644 index 0000000..6ecd6a4 --- /dev/null +++ b/src/devialetctl/interfaces/topology.py @@ -0,0 +1,245 @@ +import asyncio +import dataclasses +import logging + +from devialetctl.application.ports import Target +from devialetctl.infrastructure.devialet_gateway import DevialetHttpGateway + +LOG = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class DeviceRow: + device_id: str + device_name: str + model: str + role: str + serial: str + address: str + port: int + is_system_leader: bool + + +@dataclasses.dataclass(frozen=True) +class SystemRow: + system_id: str + system_name: str + devices: list[DeviceRow] + + +@dataclasses.dataclass(frozen=True) +class GroupRow: + group_id: str + systems: list[SystemRow] + + +def _safe_fetch_json(gateway: DevialetHttpGateway, path: str) -> dict | None: + try: + data = asyncio.run(gateway.fetch_json_async(path)) + if isinstance(data, dict): + return data + except Exception as exc: + LOG.debug("tree fetch failed path=%s host=%s err=%s", path, gateway.address, exc) + return None + + +def _device_row_to_dict(dev: DeviceRow) -> dict: + return { + "device_id": dev.device_id, + "device_name": dev.device_name, + "model": dev.model, + "role": dev.role, + "serial": dev.serial, + "address": dev.address, + "port": dev.port, + "is_system_leader": dev.is_system_leader, + } + + +def build_topology_tree(targets: list[Target], gateway_factory=DevialetHttpGateway) -> dict: + devices_by_id: dict[str, dict] = {} + systems: dict[str, dict] = {} + groups: dict[str, dict] = {} + + for target in targets: + gateway = gateway_factory(target.address, target.port, target.base_path) + device = _safe_fetch_json(gateway, "/devices/current") + if not device: + continue + + device_id = str(device.get("deviceId") or f"dispatcher:{target.address}") + system_id = str(device.get("systemId") or "") or None + group_id = str(device.get("groupId") or "") or None + devices_by_id[device_id] = { + "device_id": device_id, + "device_name": device.get("deviceName") or device.get("model") or device_id, + "model": str(device.get("model") or ""), + "role": str(device.get("role") or ""), + "serial": str(device.get("serial") or ""), + "system_id": system_id, + "group_id": group_id, + "address": target.address, + "port": target.port, + "is_system_leader": bool(device.get("isSystemLeader")), + } + + if not devices_by_id: + return {"groups": [], "ungrouped_devices": [], "errors": ["No Devialet devices detected."]} + + for dev in devices_by_id.values(): + if not dev["system_id"]: + continue + systems.setdefault( + dev["system_id"], + {"name": None, "group_id": dev["group_id"], "devices": []}, + ) + systems[dev["system_id"]]["devices"].append(dev) + + for system_id, system_data in systems.items(): + probe_device = system_data["devices"][0] + gateway = gateway_factory( + probe_device["address"], + probe_device["port"], + "/ipcontrol/v1", + ) + sys_info = _safe_fetch_json(gateway, "/systems/current") + if sys_info: + system_data["name"] = str(sys_info.get("systemName") or system_id) + sys_group_id = str(sys_info.get("groupId") or "") or None + if sys_group_id: + system_data["group_id"] = sys_group_id + else: + system_data["name"] = system_id + + group_id = system_data["group_id"] or "ungrouped" + groups.setdefault(group_id, {"systems": {}}) + groups[group_id]["systems"][system_id] = system_data + + group_rows: list[GroupRow] = [] + for group_id in sorted(groups.keys()): + group_data = groups[group_id] + systems_rows: list[SystemRow] = [] + for system_id in sorted(group_data["systems"].keys()): + system_data = group_data["systems"][system_id] + devices_rows = [ + DeviceRow( + device_id=str(dev["device_id"]), + device_name=str(dev["device_name"]), + model=str(dev["model"]), + role=str(dev["role"]), + serial=str(dev["serial"]), + address=str(dev["address"]), + port=int(dev["port"]), + is_system_leader=bool(dev["is_system_leader"]), + ) + for dev in sorted(system_data["devices"], key=lambda d: d["device_name"]) + ] + systems_rows.append( + SystemRow( + system_id=str(system_id), + system_name=str(system_data["name"]), + devices=devices_rows, + ) + ) + group_rows.append(GroupRow(group_id=str(group_id), systems=systems_rows)) + + ungrouped_devices_rows = [ + DeviceRow( + device_id=str(dev["device_id"]), + device_name=str(dev["device_name"]), + model=str(dev["model"]), + role=str(dev["role"]), + serial=str(dev["serial"]), + address=str(dev["address"]), + port=int(dev["port"]), + is_system_leader=bool(dev["is_system_leader"]), + ) + for dev in sorted( + [d for d in devices_by_id.values() if d["system_id"] is None], + key=lambda d: d["device_name"], + ) + ] + + return { + "groups": [ + { + "group_id": group.group_id, + "systems": [ + { + "system_id": system.system_id, + "system_name": system.system_name, + "devices": [_device_row_to_dict(dev) for dev in system.devices], + } + for system in group.systems + ], + } + for group in group_rows + ], + "ungrouped_devices": [_device_row_to_dict(dev) for dev in ungrouped_devices_rows], + "errors": [], + } + + +def render_topology_tree_lines(tree: dict) -> list[str]: + if tree.get("errors"): + return tree["errors"] + + lines: list[str] = [] + for group in tree.get("groups", []): + lines.append(f"Group {group['group_id']}") + for system in group.get("systems", []): + lines.append(f" System {system['system_name']} ({system['system_id']})") + for dev in system.get("devices", []): + role = f" role={dev['role']}" if dev.get("role") else "" + model = f" model={dev['model']}" if dev.get("model") else "" + lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}{role}") + + if tree.get("ungrouped_devices"): + lines.append("Ungrouped devices") + for dev in tree["ungrouped_devices"]: + model = f" model={dev['model']}" if dev.get("model") else "" + lines.append(f" Device {dev['device_name']} @ {dev['address']}{model}") + return lines + + +def pick_target_by_system_name( + services: list[Target], + system_name: str, + gateway_factory=DevialetHttpGateway, +) -> Target: + if not services: + raise RuntimeError("No service detected via mDNS/UPnP. Check network / Wi-Fi isolation.") + requested = system_name.strip() + if not requested: + raise RuntimeError("System name cannot be empty.") + + tree = build_topology_tree(services, gateway_factory=gateway_factory) + matches: list[tuple[str, dict]] = [] + for group in tree.get("groups", []): + group_id = str(group.get("group_id", "ungrouped")) + for system in group.get("systems", []): + if str(system.get("system_name", "")).casefold() == requested.casefold(): + matches.append((group_id, system)) + + if not matches: + raise RuntimeError( + f"System '{requested}' not found. Run 'devialetctl tree' to list available systems." + ) + if len(matches) > 1: + groups = ", ".join(sorted({m[0] for m in matches})) + raise RuntimeError( + f"System name '{requested}' is ambiguous across groups: {groups}. " + "Use --ip or rename systems." + ) + + group_id, system = matches[0] + devices = system.get("devices", []) + if not devices: + raise RuntimeError(f"System '{requested}' has no reachable devices in group {group_id}.") + selected = next((d for d in devices if d.get("is_system_leader")), devices[0]) + return Target( + address=str(selected["address"]), + port=int(selected["port"]), + base_path="/ipcontrol/v1", + name=f"{requested}@{group_id}", + ) diff --git a/tests/test_cli_helpers.py b/tests/test_cli_helpers.py index 731d450..d69d869 100644 --- a/tests/test_cli_helpers.py +++ b/tests/test_cli_helpers.py @@ -2,6 +2,7 @@ import pytest +from devialetctl.infrastructure.config import DaemonConfig, RuntimeTarget from devialetctl.interfaces import cli @@ -11,29 +12,20 @@ def _service(name="dev", address="10.0.0.2", port=80, base_path="/ipcontrol/v1") def test_pick_raises_on_empty_list() -> None: with pytest.raises(RuntimeError, match="No service"): - cli._pick([], None) - - -def test_pick_raises_on_invalid_index() -> None: - with pytest.raises(RuntimeError, match="Invalid index"): - cli._pick([_service()], 5) + cli._pick([]) def test_target_from_args_uses_daemon_overrides() -> None: + cfg = DaemonConfig(target=RuntimeTarget()) args = SimpleNamespace( cmd="daemon", - ip=None, - port=80, - base_path="/ipcontrol/v1", - discover_timeout=3.0, - index=None, - daemon_ip="10.0.0.9", - daemon_port=8080, - daemon_base_path="/", - daemon_discover_timeout=1.0, - daemon_index=1, + ip="10.0.0.9", + port=8080, + discover_timeout=None, + system=None, ) - target = cli._target_from_args(args) + resolved = cli._effective_options(args, cfg) + target = cli._target_from_resolved(resolved) assert target.address == "10.0.0.9" assert target.port == 8080 assert target.base_path == "/ipcontrol/v1" diff --git a/tests/test_cli_regression.py b/tests/test_cli_regression.py index 3fab9c3..234ed52 100644 --- a/tests/test_cli_regression.py +++ b/tests/test_cli_regression.py @@ -1,3 +1,4 @@ +import json import sys import pytest @@ -5,6 +6,15 @@ from devialetctl.interfaces import cli +@pytest.fixture(autouse=True) +def _disable_real_upnp_discovery(monkeypatch) -> None: + class FakeUpnpDiscovery: + def discover(self, timeout_s): + return [] + + monkeypatch.setattr(cli, "UpnpDiscoveryGateway", lambda: FakeUpnpDiscovery()) + + def test_cli_list_prints_discovered_services(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): @@ -97,7 +107,7 @@ def run_forever(self, input_name): assert FakeRunner.called_with == "keyboard" -def test_cli_daemon_accepts_subcommand_index(monkeypatch) -> None: +def test_cli_daemon_accepts_subcommand_system(monkeypatch) -> None: class FakeDiscovery: def discover(self, timeout_s): class S0: @@ -119,6 +129,32 @@ class FakeGateway: def __init__(self, address, port, base_path): FakeGateway.picked_address = address + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + if self.address == "10.0.0.10": + return { + "deviceId": "dev0", + "systemId": "sys-other", + "groupId": "grp-other", + "deviceName": "Other", + "isSystemLeader": True, + } + return { + "deviceId": "dev1", + "systemId": "sys-tv", + "groupId": "grp-tv", + "deviceName": "TV", + "isSystemLeader": True, + } + if path == "/systems/current": + if self.address == "10.0.0.10": + return {"systemName": "Other", "groupId": "grp-other"} + return {"systemName": "TV", "groupId": "grp-tv"} + return {} class FakeRunner: def __init__(self, cfg, gateway): @@ -134,12 +170,183 @@ def run_forever(self, input_name): monkeypatch.setattr( sys, "argv", - ["devialetctl", "daemon", "--input", "keyboard", "--index", "1"], + ["devialetctl", "--system", "TV", "daemon", "--input", "keyboard"], ) cli.main() assert FakeGateway.picked_address == "10.0.0.11" +def test_cli_getvol_accepts_system_name_selection(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Salon: + name = "salon" + address = "10.0.0.50" + port = 80 + base_path = "/ipcontrol/v1" + + class TvLeft: + name = "tv-left" + address = "10.0.0.71" + port = 80 + base_path = "/ipcontrol/v1" + + class TvRight: + name = "tv-right" + address = "10.0.0.184" + port = 80 + base_path = "/ipcontrol/v1" + + return [Salon(), TvLeft(), TvRight()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + if self.address == "10.0.0.50": + return { + "deviceId": "salon-1", + "systemId": "sys-salon", + "groupId": "grp-salon", + "deviceName": "Salon", + "model": "Phantom", + "role": "Mono", + "isSystemLeader": True, + } + if self.address == "10.0.0.71": + return { + "deviceId": "tv-left", + "systemId": "sys-tv", + "groupId": "grp-tv", + "deviceName": "TV Gauche", + "model": "Phantom", + "role": "FrontLeft", + "isSystemLeader": True, + } + return { + "deviceId": "tv-right", + "systemId": "sys-tv", + "groupId": "grp-tv", + "deviceName": "TV Droite", + "model": "Phantom", + "role": "FrontRight", + "isSystemLeader": False, + } + if path == "/systems/current": + if self.address in {"10.0.0.71", "10.0.0.184"}: + return {"systemName": "TV", "groupId": "grp-tv"} + return {"systemName": "Salon", "groupId": "grp-salon"} + return {} + + async def systems_async(self): + return {} + + async def get_volume_async(self): + return int(self.address.split(".")[-1]) + + async def set_volume_async(self, value): + return None + + async def volume_up_async(self): + return None + + async def volume_down_async(self): + return None + + async def mute_toggle_async(self): + return None + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "getvol"]) + cli.main() + out = capsys.readouterr().out + assert out.strip() == "71" + + +def test_cli_getvol_system_name_not_found(monkeypatch) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + + async def fetch_json_async(self, path): + if path == "/devices/current": + return { + "deviceId": "dev-1", + "systemId": "sys-1", + "groupId": "grp-1", + "deviceName": "Living Room", + } + if path == "/systems/current": + return {"systemName": "Salon", "groupId": "grp-1"} + return {} + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "--system", "TV", "getvol"]) + with pytest.raises(RuntimeError) as exc: + cli.main() + assert "System 'TV' not found" in str(exc.value) + + +def test_cli_rejects_ip_and_system_together(monkeypatch, capsys) -> None: + monkeypatch.setattr( + sys, + "argv", + ["devialetctl", "--ip", "10.0.0.2", "--system", "TV", "getvol"], + ) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip and --system are not compatible" in err + + +def test_cli_daemon_rejects_ip_and_system_together(monkeypatch, capsys) -> None: + monkeypatch.setattr( + sys, + "argv", + ["devialetctl", "--ip", "10.0.0.2", "--system", "TV", "daemon", "--input", "keyboard"], + ) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip and --system are not compatible" in err + + +def test_cli_list_rejects_ip_override(monkeypatch, capsys) -> None: + monkeypatch.setattr(sys, "argv", ["devialetctl", "--ip", "10.0.0.2", "list"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip is not supported with 'list'" in err + + +def test_cli_tree_rejects_ip_override(monkeypatch, capsys) -> None: + monkeypatch.setattr(sys, "argv", ["devialetctl", "--ip", "10.0.0.2", "tree"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 2 + err = capsys.readouterr().err + assert "--ip is not supported with 'tree'" in err + + def test_cli_list_when_empty(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): @@ -186,6 +393,91 @@ def run_forever(self, input_name): assert "Daemon error: boom" in err +def test_cli_tree_prints_groups_systems_devices(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + return { + "deviceId": "dev-1", + "systemId": "sys-1", + "groupId": "grp-1", + "deviceName": "Living Room", + "model": "Phantom I", + "role": "Mono", + } + if path == "/systems/current": + return {"systemId": "sys-1", "groupId": "grp-1", "systemName": "Salon"} + return {} + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "tree"]) + cli.main() + + out = capsys.readouterr().out + assert "Group grp-1" in out + assert "System Salon (sys-1)" in out + assert "Device Living Room @ 10.0.0.2 model=Phantom I role=Mono" in out + + +def test_cli_tree_json_outputs_structured_topology(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + self.port = port + self.base_path = base_path + + async def fetch_json_async(self, path): + if path == "/devices/current": + return { + "deviceId": "dev-1", + "systemId": "sys-1", + "groupId": "grp-1", + "deviceName": "Living Room", + "model": "Phantom I", + "role": "Mono", + } + if path == "/systems/current": + return {"systemId": "sys-1", "groupId": "grp-1", "systemName": "Salon"} + return {} + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", ["devialetctl", "tree", "--json"]) + cli.main() + + data = json.loads(capsys.readouterr().out) + assert data["groups"][0]["group_id"] == "grp-1" + assert data["groups"][0]["systems"][0]["system_name"] == "Salon" + assert data["groups"][0]["systems"][0]["devices"][0]["device_name"] == "Living Room" + assert "sources" not in data["groups"][0] + + def test_cli_list_applies_log_level_from_env(monkeypatch, capsys) -> None: class FakeDiscovery: def discover(self, timeout_s): @@ -203,3 +495,110 @@ def fake_configure(level): cli.main() _ = capsys.readouterr().out assert captured["level"] == "DEBUG" + + +@pytest.mark.parametrize( + ("argv", "expected_output"), + [ + (["devialetctl", "systems"], "{}"), + (["devialetctl", "setvol", "35"], "OK"), + (["devialetctl", "volup"], "OK"), + (["devialetctl", "voldown"], "OK"), + (["devialetctl", "mute"], "OK"), + ], +) +def test_cli_control_commands_paths(monkeypatch, capsys, argv, expected_output) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + set_value = None + calls = [] + + def __init__(self, address, port, base_path): + self.address = address + + async def systems_async(self): + return {} + + async def get_volume_async(self): + return 31 + + async def set_volume_async(self, value): + FakeGateway.set_value = value + return None + + async def volume_up_async(self): + FakeGateway.calls.append("up") + return None + + async def volume_down_async(self): + FakeGateway.calls.append("down") + return None + + async def mute_toggle_async(self): + FakeGateway.calls.append("mute") + return None + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(sys, "argv", argv) + + cli.main() + + out = capsys.readouterr().out + assert out.strip() == expected_output + if argv[1] == "setvol": + assert FakeGateway.set_value == 35 + + +def test_cli_tree_when_empty(monkeypatch, capsys) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + return [] + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(sys, "argv", ["devialetctl", "tree"]) + + cli.main() + + out = capsys.readouterr().out + assert "No service detected." in out + + +def test_cli_daemon_handles_keyboard_interrupt(monkeypatch) -> None: + class FakeDiscovery: + def discover(self, timeout_s): + class Row: + name = "phantom" + address = "10.0.0.2" + port = 80 + base_path = "/ipcontrol/v1" + + return [Row()] + + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + + class FakeRunner: + def __init__(self, cfg, gateway): + self.cfg = cfg + self.gateway = gateway + + def run_forever(self, input_name): + raise KeyboardInterrupt() + + monkeypatch.setattr(cli, "MdnsDiscoveryGateway", lambda: FakeDiscovery()) + monkeypatch.setattr(cli, "DevialetHttpGateway", FakeGateway) + monkeypatch.setattr(cli, "DaemonRunner", FakeRunner) + monkeypatch.setattr(sys, "argv", ["devialetctl", "daemon", "--input", "keyboard"]) + + cli.main() diff --git a/tests/test_topology.py b/tests/test_topology.py new file mode 100644 index 0000000..1977568 --- /dev/null +++ b/tests/test_topology.py @@ -0,0 +1,98 @@ +import pytest + +from devialetctl.application.ports import Target +from devialetctl.interfaces.topology import ( + build_topology_tree, + pick_target_by_system_name, + render_topology_tree_lines, +) + + +def _svc(name: str, address: str) -> Target: + return Target(name=name, address=address, port=80, base_path="/ipcontrol/v1") + + +def test_build_topology_tree_returns_error_when_no_device_payload() -> None: + class FakeGateway: + def __init__(self, address, port, base_path): + self.address = address + + async def fetch_json_async(self, path): + return None + + tree = build_topology_tree([_svc("d1", "10.0.0.2")], gateway_factory=FakeGateway) + assert tree["groups"] == [] + assert tree["ungrouped_devices"] == [] + assert tree["errors"] == ["No Devialet devices detected."] + + +def test_render_topology_tree_lines_handles_errors_and_ungrouped() -> None: + lines = render_topology_tree_lines({"errors": ["boom"], "groups": [], "ungrouped_devices": []}) + assert lines == ["boom"] + + tree = { + "errors": [], + "groups": [], + "ungrouped_devices": [ + { + "device_name": "Kitchen", + "address": "10.0.0.42", + "model": "Phantom II", + } + ], + } + lines = render_topology_tree_lines(tree) + assert "Ungrouped devices" in lines + assert " Device Kitchen @ 10.0.0.42 model=Phantom II" in lines + + +def test_pick_target_by_system_name_validation_and_ambiguity(monkeypatch) -> None: + with pytest.raises(RuntimeError, match="No service"): + pick_target_by_system_name([], "TV") + + with pytest.raises(RuntimeError, match="cannot be empty"): + pick_target_by_system_name([_svc("d1", "10.0.0.2")], " ") + + monkeypatch.setattr( + "devialetctl.interfaces.topology.build_topology_tree", + lambda services, gateway_factory=None: { + "errors": [], + "groups": [ + { + "group_id": "g1", + "systems": [ + { + "system_name": "TV", + "devices": [{"address": "10.0.0.2", "port": 80}], + } + ], + }, + { + "group_id": "g2", + "systems": [ + { + "system_name": "TV", + "devices": [{"address": "10.0.0.3", "port": 80}], + } + ], + }, + ], + "ungrouped_devices": [], + }, + ) + with pytest.raises(RuntimeError, match="ambiguous across groups"): + pick_target_by_system_name([_svc("d1", "10.0.0.2")], "TV") + + +def test_pick_target_by_system_name_raises_when_system_has_no_devices(monkeypatch) -> None: + monkeypatch.setattr( + "devialetctl.interfaces.topology.build_topology_tree", + lambda services, gateway_factory=None: { + "errors": [], + "groups": [{"group_id": "grp-tv", "systems": [{"system_name": "TV", "devices": []}]}], + "ungrouped_devices": [], + }, + ) + + with pytest.raises(RuntimeError, match="has no reachable devices"): + pick_target_by_system_name([_svc("d1", "10.0.0.2")], "TV") diff --git a/tests/test_upnp_gateway.py b/tests/test_upnp_gateway.py new file mode 100644 index 0000000..50a359c --- /dev/null +++ b/tests/test_upnp_gateway.py @@ -0,0 +1,116 @@ +from devialetctl.infrastructure import upnp_gateway + + +def test_parse_ssdp_headers_lowercases_and_ignores_invalid_lines() -> None: + payload = ( + b"HTTP/1.1 200 OK\r\n" + b"LOCATION: http://10.0.0.2:1400/desc.xml\r\n" + b"ST: urn:schemas-upnp-org:device:MediaRenderer:2\r\n" + b"USN: uuid:abc::urn:schemas-upnp-org:device:MediaRenderer:2\r\n" + b"bad line without colon\r\n" + b"\r\n" + ) + headers = upnp_gateway._parse_ssdp_headers(payload) + assert headers["location"] == "http://10.0.0.2:1400/desc.xml" + assert headers["st"] == "urn:schemas-upnp-org:device:MediaRenderer:2" + assert headers["usn"] == "uuid:abc::urn:schemas-upnp-org:device:MediaRenderer:2" + assert "bad line without colon" not in headers + + +def test_is_devialet_manufacturer_accepts_devialet_tag_and_caps_timeout(monkeypatch) -> None: + captured = {"timeout": None} + + class FakeResponse: + text = " deViaLet " + + @staticmethod + def raise_for_status(): + return None + + class FakeClient: + def __init__(self, timeout): + captured["timeout"] = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, location): + return FakeResponse() + + monkeypatch.setattr(upnp_gateway.httpx, "Client", FakeClient) + ok = upnp_gateway._is_devialet_manufacturer("http://10.0.0.2:1400/desc.xml", timeout_s=9.0) + assert ok is True + assert captured["timeout"] == 1.5 + + +def test_is_devialet_manufacturer_rejects_non_devialet_tag(monkeypatch) -> None: + class FakeResponse: + text = "OtherBrand" + + @staticmethod + def raise_for_status(): + return None + + class FakeClient: + def __init__(self, timeout): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, location): + return FakeResponse() + + monkeypatch.setattr(upnp_gateway.httpx, "Client", FakeClient) + ok = upnp_gateway._is_devialet_manufacturer("http://10.0.0.3:1400/desc.xml", timeout_s=0.05) + assert ok is False + + +def test_is_devialet_manufacturer_returns_false_on_http_error(monkeypatch) -> None: + class FakeClient: + def __init__(self, timeout): + self.timeout = timeout + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, location): + raise RuntimeError("boom") + + monkeypatch.setattr(upnp_gateway.httpx, "Client", FakeClient) + ok = upnp_gateway._is_devialet_manufacturer("http://10.0.0.4:1400/desc.xml", timeout_s=1.0) + assert ok is False + + +def test_discover_filters_non_devialet_missing_host_and_duplicates(monkeypatch) -> None: + def fake_iter(timeout_s): + return iter( + [ + {"location": "http://10.0.0.2:1400/desc.xml"}, + {"location": "http://10.0.0.2:1400/desc.xml"}, + {"location": "not-a-url"}, + {"location": "http://10.0.0.3:1400/desc.xml"}, + ] + ) + + def fake_is_devialet_manufacturer(location, timeout_s): + return "10.0.0.3" not in location + + monkeypatch.setattr(upnp_gateway, "_iter_ssdp_responses", fake_iter) + monkeypatch.setattr(upnp_gateway, "_is_devialet_manufacturer", fake_is_devialet_manufacturer) + + targets = upnp_gateway.UpnpDiscoveryGateway().discover(timeout_s=0.1) + assert len(targets) == 1 + assert targets[0].address == "10.0.0.2" + assert targets[0].port == 80 + assert targets[0].base_path == "/ipcontrol/v1" + assert targets[0].name == "UPnP:10.0.0.2"