From 77e2b3be464f515e25e67eaf98c70f4e824d68ca Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Fri, 5 Dec 2025 09:29:17 -0500 Subject: [PATCH 1/4] fixed log statement for time reporting --- src/ezmsg/util/perf/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ezmsg/util/perf/run.py b/src/ezmsg/util/perf/run.py index 3f794f2..5c6f6b4 100644 --- a/src/ezmsg/util/perf/run.py +++ b/src/ezmsg/util/perf/run.py @@ -231,7 +231,7 @@ def perf_run( server.stop() d = datetime(1, 1, 1) + timedelta(seconds=time.time() - start_time) dur_str = ":".join( - [str(n) for n in [d.day - 1, d.hour, d.minute, d.second] if n != 0] + [str(n) for n in [d.hour, d.minute, d.second]] ) ez.logger.info(f"Tests concluded. Wallclock Runtime: {dur_str}s") From bc4e2179bbd6c481794f81c3b7efd6139cc24078 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Fri, 5 Dec 2025 15:38:27 -0500 Subject: [PATCH 2/4] initial implementation --- src/ezmsg/core/backpressure.py | 13 +++- src/ezmsg/core/channelmanager.py | 15 ++++- src/ezmsg/core/command.py | 17 ++++- src/ezmsg/core/graphserver.py | 104 +++++++++++++++++++++++++++++++ src/ezmsg/core/messagechannel.py | 45 ++++++++++++- src/ezmsg/core/netprotocol.py | 2 + src/ezmsg/core/pubclient.py | 30 ++++++++- src/ezmsg/core/subclient.py | 24 ++++++- 8 files changed, 241 insertions(+), 9 deletions(-) diff --git a/src/ezmsg/core/backpressure.py b/src/ezmsg/core/backpressure.py index 56c0e7c..a0970fe 100644 --- a/src/ezmsg/core/backpressure.py +++ b/src/ezmsg/core/backpressure.py @@ -1,9 +1,11 @@ import asyncio +import time from uuid import UUID from typing import Literal +from .profiling import LeaseDurationTelemetry class BufferLease: """ @@ -83,7 +85,9 @@ class Backpressure: empty: asyncio.Event pressure: int - def __init__(self, num_buffers: int) -> None: + def __init__( + self, num_buffers: int, telemetry: LeaseDurationTelemetry | None = None + ) -> None: """ Initialize backpressure management for the specified number of buffers. @@ -94,6 +98,7 @@ def __init__(self, num_buffers: int) -> None: self.empty = asyncio.Event() self.empty.set() self.pressure = 0 + self._telemetry = telemetry @property def is_empty(self) -> bool: @@ -138,6 +143,8 @@ def lease(self, uuid: UUID, buf_idx: int) -> None: self.pressure += 1 self.buffers[buf_idx].add(uuid) self.empty.clear() + if self._telemetry is not None: + self._telemetry.on_lease(uuid, buf_idx, time.perf_counter()) def _free(self, uuid: UUID, buf_idx: int) -> None: """ @@ -152,6 +159,8 @@ def _free(self, uuid: UUID, buf_idx: int) -> None: self.buffers[buf_idx].remove(uuid) if self.buffers[buf_idx].is_empty: self.pressure -= 1 + if self._telemetry is not None: + self._telemetry.on_free(uuid, buf_idx, time.perf_counter()) except KeyError: pass @@ -170,6 +179,8 @@ def free(self, uuid: UUID, buf_idx: int | None = None) -> None: if buf_idx is None: for idx in range(len(self.buffers)): self._free(uuid, idx) + if self._telemetry is not None: + self._telemetry.on_free(uuid, None, time.perf_counter()) else: self._free(uuid, buf_idx) diff --git a/src/ezmsg/core/channelmanager.py b/src/ezmsg/core/channelmanager.py index 42231d0..1fe69e0 100644 --- a/src/ezmsg/core/channelmanager.py +++ b/src/ezmsg/core/channelmanager.py @@ -37,6 +37,7 @@ async def register( client_id: UUID, queue: NotificationQueue, graph_address: AddressType | None = None, + handle: str | None = None, ) -> Channel: """ Acquire the channel associated with a particular publisher, creating it if necessary @@ -49,10 +50,14 @@ async def register( :type queue: asyncio.Queue[tuple[UUID, int]] :param graph_address: The address to the GraphServer that the requested publisher is managed by :type graph_address: AddressType | None + :param handle: Optional label to associate with the subscribing client for profiling output + :type handle: str | None :return: A Channel for retreiving messages from the requested Publisher :rtype: Channel """ - return await self._register(pub_id, client_id, queue, graph_address, None) + return await self._register( + pub_id, client_id, queue, graph_address, None, handle=handle + ) async def register_local_pub( self, @@ -85,6 +90,7 @@ async def _register( queue: NotificationQueue | None = None, graph_address: AddressType | None = None, local_backpressure: Backpressure | None = None, + handle: str | None = None, ) -> Channel: graph_address = _ensure_address(graph_address) try: @@ -94,7 +100,12 @@ async def _register( channels = self._registry.get(graph_address, dict()) channels[pub_id] = channel self._registry[graph_address] = channels - channel.register_client(client_id, queue, local_backpressure) + channel.register_client( + client_id, + queue, + local_backpressure, + handle=handle, + ) return channel async def unregister( diff --git a/src/ezmsg/core/command.py b/src/ezmsg/core/command.py index 09ce2dc..ac283ce 100644 --- a/src/ezmsg/core/command.py +++ b/src/ezmsg/core/command.py @@ -42,7 +42,7 @@ def cmdline() -> None: parser.add_argument( "command", help="command for ezmsg", - choices=["serve", "start", "shutdown", "graphviz", "mermaid"], + choices=["serve", "start", "shutdown", "graphviz", "mermaid", "profile"], ) parser.add_argument("--address", help="Address for GraphServer", default=None) @@ -70,12 +70,21 @@ def cmdline() -> None: action="store_true", ) + parser.add_argument( + "-w", + "--window", + help="Profiling window (seconds) for the 'profile' command", + type=float, + default=None, + ) + class Args: command: str address: str | None target: str compact: int | None nobrowser: bool + window: float | None args = parser.parse_args(namespace=Args) @@ -93,6 +102,7 @@ class Args: args.target, args.compact, args.nobrowser, + args.window, ) ) @@ -103,6 +113,7 @@ async def run_command( target: str = "live", compact: int | None = None, nobrowser: bool = False, + window: float | None = None, ) -> None: """ Run an ezmsg command with the specified parameters. @@ -166,6 +177,10 @@ async def run_command( f"Could not issue shutdown command to GraphServer @ {graph_service.address}; server not running?" ) + elif cmd == "profile": + profile = await graph_service.profile(window) + print(json.dumps(profile, indent=2)) + elif cmd in ["graphviz", "mermaid"]: graph_out = await graph_service.get_formatted_graph( fmt=cmd, compact_level=compact diff --git a/src/ezmsg/core/graphserver.py b/src/ezmsg/core/graphserver.py index cb97b9e..581161a 100644 --- a/src/ezmsg/core/graphserver.py +++ b/src/ezmsg/core/graphserver.py @@ -1,9 +1,11 @@ import asyncio +import json import logging import pickle import os import socket import threading +from typing import Any from contextlib import suppress from uuid import UUID, uuid1 @@ -11,6 +13,7 @@ from . import __version__ from .dag import DAG, CyclicException from .graph_util import get_compactified_graph, graph_string, prune_graph_connections +from .profiling import PROFILE_WINDOW_S from .netprotocol import ( Address, Command, @@ -78,6 +81,7 @@ def __init__(self, **kwargs) -> None: self.clients = {} self._client_tasks = {} self.shms = {} + self._profile_requests: dict[UUID, asyncio.Future] = {} @property def address(self) -> Address: @@ -119,6 +123,10 @@ async def _shutdown_async(self) -> None: with suppress(asyncio.CancelledError): await asyncio.gather(*self._client_tasks.values(), return_exceptions=True) self._client_tasks.clear() + for fut in list(self._profile_requests.values()): + if not fut.done(): + fut.cancel() + self._profile_requests.clear() # Cancel SHM leases for info in self.shms.values(): @@ -338,6 +346,14 @@ async def api( writer.write(uint64_to_bytes(len(dag_bytes)) + dag_bytes) writer.write(Command.COMPLETE.value) + elif req == Command.PROFILE.value: + window_ms = await read_int(reader) + profile = await self._collect_profiles(window_ms / 1000.0) + writer.write(Command.PROFILE_DATA.value) + writer.write(encode_str(json.dumps(profile))) + writer.write(Command.COMPLETE.value) + await writer.drain() + else: logger.warning(f"GraphServer received unknown command {req}") @@ -380,6 +396,15 @@ async def _handle_client( if req == Command.COMPLETE.value: self.clients[client_id].set_sync() + elif req == Command.PROFILE_DATA.value: + payload = await read_str(reader) + future = self._profile_requests.pop(client_id, None) + if future is not None and not future.done(): + try: + future.set_result(json.loads(payload)) + except json.JSONDecodeError as e: + future.set_exception(e) + self.clients[client_id].set_sync() except (ConnectionResetError, BrokenPipeError) as e: logger.debug(f"Client {client_id} disconnected from GraphServer: {e}") @@ -388,6 +413,9 @@ async def _handle_client( # Ensure any waiter on this client unblocks # with suppress(Exception): self.clients[client_id].set_sync() + future = self._profile_requests.pop(client_id, None) + if future is not None and not future.done(): + future.cancel() self.clients.pop(client_id, None) await close_stream_writer(writer) @@ -428,6 +456,64 @@ def _downstream_subs(self, topic: str) -> list[SubscriberInfo]: downstream_topics = self.graph.downstream(topic) return [sub for sub in self._subscribers() if sub.topic in downstream_topics] + async def _profile_client( + self, info: ClientInfo, window_s: float + ) -> dict[str, Any] | None: + loop = asyncio.get_running_loop() + fut: asyncio.Future[dict[str, Any]] = loop.create_future() + self._profile_requests[info.id] = fut + + try: + async with info.sync_writer() as writer: + writer.write(Command.PROFILE.value) + writer.write(uint64_to_bytes(int(window_s * 1000))) + await writer.drain() + + return await asyncio.wait_for(fut, timeout=1.0) + + except asyncio.TimeoutError: + logger.debug(f"Profile request to {info.id} timed out") + return None + + finally: + self._profile_requests.pop(info.id, None) + + async def _collect_profiles(self, window_s: float) -> dict[str, Any]: + client_infos = [ + info + for info in self.clients.values() + if isinstance(info, (PublisherInfo, ChannelInfo)) + ] + results = await asyncio.gather( + *[self._profile_client(info, window_s) for info in client_infos], + return_exceptions=True, + ) + + publishers: list[dict[str, Any]] = [] + channels: list[dict[str, Any]] = [] + + for info, result in zip(client_infos, results): + if isinstance(result, Exception) or result is None: + continue + + if isinstance(info, PublisherInfo) and result.get("type") == "publisher": + publishers.append(result) + + elif isinstance(info, ChannelInfo) and result.get("type") == "channel": + try: + pub_info = self.clients.get(UUID(result["pub_id"]), None) + if isinstance(pub_info, PublisherInfo): + result.setdefault("topic", pub_info.topic) + except Exception: + ... + channels.append(result) + + return { + "window_s": window_s, + "publishers": publishers, + "channels": channels, + } + class GraphService: ADDR_ENV = GRAPHSERVER_ADDR_ENV @@ -544,6 +630,24 @@ async def dag(self, timeout: float | None = None) -> DAG: await close_stream_writer(writer) return dag + async def profile(self, window_s: float | None = None) -> dict[str, Any]: + reader, writer = await self.open_connection() + if window_s is None: + window_s = PROFILE_WINDOW_S + writer.write(Command.PROFILE.value) + writer.write(uint64_to_bytes(int(window_s * 1000))) + await writer.drain() + + response = await reader.read(1) + if response != Command.PROFILE_DATA.value: + await close_stream_writer(writer) + raise ValueError("Unexpected response to profile request") + + payload = await read_str(reader) + await reader.read(1) # COMPLETE + await close_stream_writer(writer) + return json.loads(payload) + async def get_formatted_graph( self, fmt: str, diff --git a/src/ezmsg/core/messagechannel.py b/src/ezmsg/core/messagechannel.py index e016f10..a95fb70 100644 --- a/src/ezmsg/core/messagechannel.py +++ b/src/ezmsg/core/messagechannel.py @@ -1,5 +1,6 @@ import os import asyncio +import json import typing import logging @@ -11,6 +12,7 @@ from .backpressure import Backpressure from .messagecache import MessageCache from .graphserver import GraphService +from .profiling import ChannelTelemetry, PROFILE_WINDOW_S from .netprotocol import ( Command, Address, @@ -52,6 +54,9 @@ class Channel: shm: SHMContext | None clients: dict[UUID, NotificationQueue | None] backpressure: Backpressure + _telemetry: ChannelTelemetry + _client_handles: dict[UUID, str] + _mode: str _graph_task: asyncio.Task[None] _pub_task: asyncio.Task[None] @@ -78,12 +83,18 @@ def __init__( self.pub_id = pub_id self.num_buffers = num_buffers self.shm = shm + self.pid = os.getpid() self.cache = MessageCache(self.num_buffers) - self.backpressure = Backpressure(self.num_buffers) + self._telemetry = ChannelTelemetry(PROFILE_WINDOW_S) + self.backpressure = Backpressure( + self.num_buffers, telemetry=self._telemetry.leases + ) self.clients = dict() self._graph_address = graph_address self._local_backpressure = None + self._client_handles = dict() + self._mode = "unknown" @classmethod async def create( @@ -190,6 +201,13 @@ async def _graph_connection( if not cmd: break + elif cmd == Command.PROFILE.value: + window_ms = await read_int(reader) + payload = json.dumps(self.profile(window_ms / 1000.0)) + writer.write(Command.PROFILE_DATA.value) + writer.write(encode_str(payload)) + await writer.drain() + else: logger.warning( f"Channel {self.id} rx unknown command from GraphServer: {cmd}" @@ -215,6 +233,7 @@ async def _publisher_connection(self, reader: asyncio.StreamReader) -> None: buf_idx = msg_id % self.num_buffers if msg == Command.TX_SHM.value: + self._mode = "shm" shm_name = await read_str(reader) if self.shm is not None and self.shm.name != shm_name: @@ -241,6 +260,7 @@ async def _publisher_connection(self, reader: asyncio.StreamReader) -> None: self.cache.put_from_mem(self.shm[buf_idx]) elif msg == Command.TX_TCP.value: + self._mode = "tcp" buf_size = await read_int(reader) obj_bytes = await reader.readexactly(buf_size) assert MessageMarshal.msg_id(obj_bytes) == msg_id @@ -290,6 +310,7 @@ def put_local(self, msg_id: int, msg: typing.Any) -> None: if self._notify_clients(msg_id): self.cache.put_local(msg, msg_id) self._local_backpressure.lease(self.id, buf_idx) + self._mode = "local" @contextmanager def get( @@ -333,6 +354,7 @@ def register_client( client_id: UUID, queue: NotificationQueue | None = None, local_backpressure: Backpressure | None = None, + handle: str | None = None, ) -> None: """ Register an interested client and provide a queue for incoming message notifications. @@ -343,8 +365,12 @@ def register_client( :type queue: asyncio.Queue[tuple[UUID, int]] | None :param local_backpressure: The backpressure object for the Publisher if it is in the same process :type local_backpressure: Backpressure + :param handle: Optional user-friendly handle for profiling + :type handle: str | None """ self.clients[client_id] = queue + if handle is not None: + self._client_handles[client_id] = handle if client_id == self.pub_id: self._local_backpressure = local_backpressure @@ -371,3 +397,20 @@ def unregister_client(self, client_id: UUID) -> None: self._local_backpressure = None del self.clients[client_id] + self._client_handles.pop(client_id, None) + + def profile(self, window_s: float | None = None) -> dict[str, typing.Any]: + snapshot = self._telemetry.snapshot( + window_s=window_s, handles=self._client_handles + ) + snapshot.update( + { + "type": "channel", + "channel_id": str(self.id), + "pub_id": str(self.pub_id), + "pid": self.pid, + "num_buffers": self.num_buffers, + "mode": self._mode, + } + ) + return snapshot diff --git a/src/ezmsg/core/netprotocol.py b/src/ezmsg/core/netprotocol.py index ee1d903..35899bc 100644 --- a/src/ezmsg/core/netprotocol.py +++ b/src/ezmsg/core/netprotocol.py @@ -296,6 +296,8 @@ def _generate_next_value_(name, start, count, last_values) -> bytes: CHANNEL = enum.auto() SHM_OK = enum.auto() SHM_ATTACH_FAILED = enum.auto() + PROFILE = enum.auto() + PROFILE_DATA = enum.auto() def create_socket( diff --git a/src/ezmsg/core/pubclient.py b/src/ezmsg/core/pubclient.py index e12b71e..701cb64 100644 --- a/src/ezmsg/core/pubclient.py +++ b/src/ezmsg/core/pubclient.py @@ -1,4 +1,5 @@ import os +import json import asyncio import logging import time @@ -13,6 +14,7 @@ from .channelmanager import CHANNELS from .messagechannel import Channel from .messagemarshal import MessageMarshal, UninitializedMemory +from .profiling import PublisherTelemetry, PROFILE_WINDOW_S from .netprotocol import ( Address, @@ -78,6 +80,7 @@ class Publisher: _last_backpressure_event: float _graph_address: AddressType | None + _telemetry: PublisherTelemetry @staticmethod def client_type() -> bytes: @@ -226,7 +229,10 @@ def __init__( if not start_paused: self._running.set() self._num_buffers = num_buffers - self._backpressure = Backpressure(num_buffers) + self._telemetry = PublisherTelemetry(PROFILE_WINDOW_S) + self._backpressure = Backpressure( + num_buffers, telemetry=self._telemetry.backpressure + ) self._force_tcp = force_tcp self._last_backpressure_event = -1 self._graph_address = graph_address @@ -264,6 +270,19 @@ async def wait_closed(self) -> None: with suppress(asyncio.CancelledError): await task + def profile(self, window_s: float | None = None) -> dict[str, Any]: + snapshot = self._telemetry.snapshot(window_s=window_s) + snapshot.update( + { + "type": "publisher", + "id": str(self.id), + "topic": self.topic, + "pid": self.pid, + "num_buffers": self._num_buffers, + } + ) + return snapshot + async def _graph_connection( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: @@ -294,6 +313,12 @@ async def _graph_connection( await self.sync() writer.write(Command.COMPLETE.value) + elif cmd == Command.PROFILE.value: + window_ms = await read_int(reader) + payload = json.dumps(self.profile(window_ms / 1000.0)) + writer.write(Command.PROFILE_DATA.value) + writer.write(encode_str(payload)) + else: logger.warning( f"Publisher {self.id} rx unknown command from GraphServer {cmd}" @@ -427,6 +452,7 @@ async def broadcast(self, obj: Any) -> None: buf_idx = self._msg_id % self._num_buffers msg_id_bytes = uint64_to_bytes(self._msg_id) + msg_size: int | None = None if not self._backpressure.available(buf_idx): delta = time.time() - self._last_backpressure_event @@ -446,6 +472,7 @@ async def broadcast(self, obj: Any) -> None: header, buffers, ): + msg_size = total_size total_size_bytes = uint64_to_bytes(total_size) if not self._force_tcp and any( @@ -507,4 +534,5 @@ async def broadcast(self, obj: Any) -> None: f"Publisher {self.id}: Channel {channel.id} connection fail" ) + self._telemetry.record_message(msg_size) self._msg_id += 1 diff --git a/src/ezmsg/core/subclient.py b/src/ezmsg/core/subclient.py index fee285c..67d640d 100644 --- a/src/ezmsg/core/subclient.py +++ b/src/ezmsg/core/subclient.py @@ -37,6 +37,7 @@ class Subscriber: topic: str _graph_address: AddressType | None + handle: str _graph_task: asyncio.Task[None] _cur_pubs: set[UUID] _incoming: NotificationQueue @@ -53,7 +54,11 @@ class Subscriber: @classmethod async def create( - cls, topic: str, graph_address: AddressType | None, **kwargs + cls, + topic: str, + graph_address: AddressType | None, + handle: str | None = None, + **kwargs, ) -> "Subscriber": """ Create a new Subscriber instance and register it with the graph server. @@ -74,7 +79,14 @@ async def create( sub_id_str = await read_str(reader) sub_id = UUID(sub_id_str) - sub = cls(sub_id, topic, graph_address, _guard=cls._SENTINEL, **kwargs) + sub = cls( + sub_id, + topic, + graph_address, + handle=handle, + _guard=cls._SENTINEL, + **kwargs, + ) sub._graph_task = asyncio.create_task( sub._graph_connection(reader, writer), @@ -95,6 +107,7 @@ def __init__( id: UUID, topic: str, graph_address: AddressType | None, + handle: str | None = None, _guard = None, **kwargs ) -> None: @@ -118,6 +131,7 @@ def __init__( ) self.id = id self.topic = topic + self.handle = handle if handle is not None else topic self._graph_address = graph_address self._cur_pubs = set() @@ -197,7 +211,11 @@ async def _graph_connection( for pub_id in set(pub_ids - self._cur_pubs): channel = await CHANNELS.register( - pub_id, self.id, self._incoming, self._graph_address + pub_id, + self.id, + self._incoming, + self._graph_address, + handle=self.handle, ) self._channels[pub_id] = channel From a8223ab5377cc94c1df93198e1af805a652ef720 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Fri, 5 Dec 2025 15:51:21 -0500 Subject: [PATCH 3/4] missing file --- src/ezmsg/core/profiling.py | 220 ++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 src/ezmsg/core/profiling.py diff --git a/src/ezmsg/core/profiling.py b/src/ezmsg/core/profiling.py new file mode 100644 index 0000000..2cd53f7 --- /dev/null +++ b/src/ezmsg/core/profiling.py @@ -0,0 +1,220 @@ +import time +from collections import defaultdict, deque +from typing import Any +from uuid import UUID + +# Sliding window used for profiling summaries. Keep a small window to bound memory. +PROFILE_WINDOW_S = 10.0 + + +class SlidingWindow: + """ + Maintain a rolling window of timestamped values and provide simple summaries. + """ + + def __init__(self, max_window_s: float = PROFILE_WINDOW_S) -> None: + self.max_window_s = max_window_s + self._events: deque[tuple[float, float]] = deque() + self._total = 0.0 + + def add(self, value: float, now: float | None = None) -> None: + now = time.perf_counter() if now is None else now + self._events.append((now, value)) + self._total += value + self._prune(now) + + def _prune(self, now: float, window_s: float | None = None) -> None: + window = self.max_window_s if window_s is None else min( + window_s, self.max_window_s + ) + cutoff = now - window + while self._events and self._events[0][0] < cutoff: + _, val = self._events.popleft() + self._total -= val + + def summary( + self, window_s: float | None = None, now: float | None = None + ) -> dict[str, float]: + now = time.perf_counter() if now is None else now + self._prune(now) + + if window_s is None or window_s > self.max_window_s: + window_s = self.max_window_s + + cutoff = now - window_s + total = 0.0 + count = 0 + max_val = 0.0 + for ts, val in self._events: + if ts >= cutoff: + total += val + count += 1 + if val > max_val: + max_val = val + + avg = total / count if count else 0.0 + + return { + "count": count, + "total": total, + "avg": avg, + "max": max_val, + "window_s": window_s, + } + + +class LeaseDurationTelemetry: + """ + Track lease/free durations from Backpressure to estimate processing latency. + """ + + def __init__(self, max_window_s: float = PROFILE_WINDOW_S, per_client: bool = False): + self.max_window_s = max_window_s + self.per_client = per_client + self._starts: dict[tuple[UUID, int], float] = {} + + if per_client: + self._windows: dict[UUID, SlidingWindow] = defaultdict( + lambda: SlidingWindow(max_window_s) + ) + else: + self._window = SlidingWindow(max_window_s) + + def on_lease(self, client_id: UUID, buf_idx: int, now: float | None = None) -> None: + self._starts[(client_id, buf_idx)] = time.perf_counter() if now is None else now + + def on_free( + self, client_id: UUID, buf_idx: int | None, now: float | None = None + ) -> None: + now = time.perf_counter() if now is None else now + + if buf_idx is None: + # Client disconnected; drop any inflight timers. + for key in [k for k in self._starts if k[0] == client_id]: + self._starts.pop(key, None) + return + + start = self._starts.pop((client_id, buf_idx), None) + if start is None: + return + + duration = now - start + if self.per_client: + self._windows[client_id].add(duration, now) + else: + self._window.add(duration, now) + + def summary( + self, window_s: float | None = None, now: float | None = None + ) -> dict[Any, dict[str, float]]: + now = time.perf_counter() if now is None else now + if self.per_client: + return { + client_id: window.summary(window_s=window_s, now=now) + for client_id, window in self._windows.items() + } + else: + return {None: self._window.summary(window_s=window_s, now=now)} + + def in_flight(self, client_id: UUID | None = None) -> int: + if client_id is None: + return len(self._starts) + return sum(1 for cid, _ in self._starts if cid == client_id) + + +class PublisherTelemetry: + """ + Record publish throughput and backpressure latency for a Publisher. + """ + + def __init__(self, max_window_s: float = PROFILE_WINDOW_S) -> None: + self.max_window_s = max_window_s + self.msg_window = SlidingWindow(max_window_s) + self.byte_window = SlidingWindow(max_window_s) + self.backpressure = LeaseDurationTelemetry(max_window_s) + self.total_messages = 0 + self.total_bytes = 0 + + def record_message(self, num_bytes: int | None, now: float | None = None) -> None: + now = time.perf_counter() if now is None else now + self.msg_window.add(1.0, now) + self.total_messages += 1 + + if num_bytes is not None: + self.byte_window.add(float(num_bytes), now) + self.total_bytes += num_bytes + + def snapshot(self, window_s: float | None = None) -> dict[str, Any]: + now = time.perf_counter() + msg_stats = self.msg_window.summary(window_s=window_s, now=now) + byte_stats = self.byte_window.summary(window_s=window_s, now=now) + bp_stats = list(self.backpressure.summary(window_s=window_s, now=now).values())[ + 0 + ] + + window = msg_stats["window_s"] + message_rate = ( + msg_stats["count"] / window if window > 0 else 0.0 + ) + byte_rate = ( + byte_stats["total"] / window if window > 0 else 0.0 + ) + + return { + "window_s": window, + "message_rate_hz": message_rate, + "byte_rate_per_s": byte_rate, + "messages": { + "total": self.total_messages, + "window": msg_stats["count"], + }, + "bytes": { + "total": self.total_bytes, + "window": int(byte_stats["total"]), + }, + "backpressure": { + "avg_ms": bp_stats["avg"] * 1000.0 if bp_stats["count"] else 0.0, + "max_ms": bp_stats["max"] * 1000.0 if bp_stats["count"] else 0.0, + "samples": bp_stats["count"], + "in_flight": self.backpressure.in_flight(), + }, + } + + +class ChannelTelemetry: + """ + Track per-subscriber processing time on a Channel. + """ + + def __init__(self, max_window_s: float = PROFILE_WINDOW_S) -> None: + self.max_window_s = max_window_s + self.leases = LeaseDurationTelemetry(max_window_s, per_client=True) + + def snapshot( + self, + window_s: float | None = None, + handles: dict[UUID, str] | None = None, + ) -> dict[str, Any]: + now = time.perf_counter() + lease_stats = self.leases.summary(window_s=window_s, now=now) + + subscribers: list[dict[str, Any]] = [] + for client_id, stats in lease_stats.items(): + window = stats["window_s"] + subscribers.append( + { + "subscriber_id": str(client_id), + "handle": handles.get(client_id, None) if handles else None, + "avg_ms": stats["avg"] * 1000.0 if stats["count"] else 0.0, + "max_ms": stats["max"] * 1000.0 if stats["count"] else 0.0, + "samples": stats["count"], + "message_rate_hz": stats["count"] / window if window > 0 else 0.0, + "in_flight": self.leases.in_flight(client_id), + } + ) + + return { + "window_s": window_s if window_s is not None else self.max_window_s, + "subscribers": subscribers, + "in_flight": self.leases.in_flight(), + } From 7a931a220b93e4975593f07efc64af2f9f0c9496 Mon Sep 17 00:00:00 2001 From: Griffin Milsap Date: Fri, 5 Dec 2025 16:09:08 -0500 Subject: [PATCH 4/4] fixed tests --- tests/test_channelmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_channelmanager.py b/tests/test_channelmanager.py index 0f78a6f..7bf101d 100644 --- a/tests/test_channelmanager.py +++ b/tests/test_channelmanager.py @@ -20,7 +20,7 @@ def __init__(self): self.waited = False self.local_bp: dict = {} - def register_client(self, client_id, queue, local_backpressure): + def register_client(self, client_id, queue, local_backpressure, handle=None): self.clients[client_id] = queue if local_backpressure is not None: self.local_bp[client_id] = local_backpressure