From aacc559ed9674ab8b2040a0ec3e27935b60a950f Mon Sep 17 00:00:00 2001 From: vitalii-dynamiq Date: Thu, 2 Apr 2026 15:52:41 +0000 Subject: [PATCH 1/3] perf: non-blocking tracing dispatch and flow polling optimization ## Changes ### dynamiq/clients/dynamiq.py - Add background daemon thread with SimpleQueue for trace dispatch. trace() now enqueues runs instead of issuing a blocking requests.post on the caller thread, removing up to 60s of latency per flush under high concurrency. - Reuse a single httpx.AsyncClient (lazy, double-checked lock) instead of creating one per _send_traces_async call, saving TCP/TLS handshake overhead. - Add close() + atexit hook for graceful shutdown. ### dynamiq/flows/flow.py - Guard time.sleep(0.003) so it only fires when no futures completed (empty results dict). futures.wait(FIRST_COMPLETED) already blocks when work is in progress, so the unconditional 3ms sleep added unnecessary per-iteration latency. --- dynamiq/clients/dynamiq.py | 58 ++++++++++++++++++++++++++++++++++++-- dynamiq/flows/flow.py | 6 ++-- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/dynamiq/clients/dynamiq.py b/dynamiq/clients/dynamiq.py index 3541f64e5..b0fcb1510 100644 --- a/dynamiq/clients/dynamiq.py +++ b/dynamiq/clients/dynamiq.py @@ -1,4 +1,7 @@ import asyncio +import atexit +import threading +from queue import SimpleQueue from typing import TYPE_CHECKING, Any from urllib.parse import urljoin @@ -19,6 +22,8 @@ DYNAMIQ_BASE_URL = "https://collector.getdynamiq.ai" +_FLUSH_TIMEOUT = 0.5 # seconds to wait for remaining traces on shutdown + class HttpBaseError(Exception): pass @@ -45,6 +50,37 @@ def __init__(self, base_url: str | None = None, access_key: str | None = None, t raise ValueError("No API key provided") self.timeout = timeout + # Background queue and thread for non-blocking sync trace dispatch + self._trace_queue: SimpleQueue[list["Run"] | None] = SimpleQueue() + self._bg_thread = threading.Thread(target=self._trace_worker, daemon=True) + self._bg_thread.start() + atexit.register(self.close) + + # Lazily initialised async HTTP client (reused across calls) + self._async_client: httpx.AsyncClient | None = None + self._async_client_lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Background worker + # ------------------------------------------------------------------ + + def _trace_worker(self) -> None: + """Drain *_trace_queue* in a background daemon thread.""" + while True: + runs = self._trace_queue.get() + if runs is None: # sentinel → shut down + break + self._send_traces_sync(runs) + + def close(self) -> None: + """Flush pending traces and stop the background thread.""" + self._trace_queue.put(None) # send sentinel + self._bg_thread.join(timeout=_FLUSH_TIMEOUT) + + # ------------------------------------------------------------------ + # Sync transport + # ------------------------------------------------------------------ + def _send_traces_sync(self, runs: list["Run"]) -> None: """Synchronous method to send traces using requests""" try: @@ -66,12 +102,24 @@ def _send_traces_sync(self, runs: list["Run"]) -> None: except Exception as e: logger.error(f"Failed to send traces (sync). Error: {e}") + # ------------------------------------------------------------------ + # Async transport + # ------------------------------------------------------------------ + + async def _get_async_client(self) -> httpx.AsyncClient: + """Return a shared *httpx.AsyncClient*, creating it lazily.""" + if self._async_client is None or self._async_client.is_closed: + async with self._async_client_lock: + if self._async_client is None or self._async_client.is_closed: + self._async_client = httpx.AsyncClient() # nosec B113 + return self._async_client + async def request(self, method: str, url_path: URLTypes, **kwargs: Any) -> Response: logger.debug(f'[{self.__class__.__name__}] REQ "{method} {url_path}". Kwargs: {kwargs}') url = f"{self.base_url}/{str(url_path).lstrip('/')}" if self.base_url else str(url_path).lstrip("/") try: - async with httpx.AsyncClient() as client: # nosec B113 - response = await client.request(method, url=url, timeout=self.timeout, **kwargs) + client = await self._get_async_client() + response = await client.request(method, url=url, timeout=self.timeout, **kwargs) except (httpx.TimeoutException, httpx.NetworkError) as e: raise HttpConnectionError(e) from e @@ -105,6 +153,10 @@ async def _send_traces_async(self, runs: list["Run"]) -> None: except Exception as e: logger.error(f"Failed to send traces (async). Error: {e}") + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + def trace(self, runs: list["Run"]) -> None: """Sync method required by BaseTracingClient interface""" if not runs: @@ -115,6 +167,6 @@ def trace(self, runs: list["Run"]) -> None: loop = asyncio.get_running_loop() loop.create_task(self._send_traces_async(runs)) else: - self._send_traces_sync(runs) + self._trace_queue.put(runs) except Exception as e: logger.error(f"Failed to send traces. Error: {e}") diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 26347f3ff..d340eefee 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -316,8 +316,10 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> self._results.update(results) self._ts.done(*results.keys()) - # Wait for ready nodes to be processed and reduce CPU usage - time.sleep(0.003) + if not results: + # Yield CPU only when no futures completed; futures.wait + # already blocks when work is in progress. + time.sleep(0.003) run_executor.shutdown() From 72ec4af45153cf289f85cb6b4d15033ba109d092 Mon Sep 17 00:00:00 2001 From: dynamiq-bot Date: Thu, 2 Apr 2026 16:20:13 +0000 Subject: [PATCH 2/3] fix: loop-aware async state and proper httpx.AsyncClient cleanup Resolves two Cursor Bugbot issues: 1. (Medium) asyncio.Lock created in __init__ binds to the first event loop and raises RuntimeError when reused across different loops (e.g. successive asyncio.run() calls in test suites). Fixed by lazily initialising both the lock and client per event loop, tracked via weakref to reliably detect loop changes even when CPython reuses memory addresses. 2. (Low) Shared httpx.AsyncClient was never closed - close() only shut down the sync background thread. Added best-effort cleanup in close() and a new async aclose() method. --- dynamiq/clients/dynamiq.py | 78 ++++++++++++++++++++++++++++++++++---- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/dynamiq/clients/dynamiq.py b/dynamiq/clients/dynamiq.py index b0fcb1510..c919a2ddf 100644 --- a/dynamiq/clients/dynamiq.py +++ b/dynamiq/clients/dynamiq.py @@ -1,6 +1,7 @@ import asyncio import atexit import threading +import weakref from queue import SimpleQueue from typing import TYPE_CHECKING, Any from urllib.parse import urljoin @@ -56,9 +57,14 @@ def __init__(self, base_url: str | None = None, access_key: str | None = None, t self._bg_thread.start() atexit.register(self.close) - # Lazily initialised async HTTP client (reused across calls) + # Lazily initialised async HTTP client (reused across calls). + # Both the lock and the client are bound to a specific event loop; + # we track the loop via weakref so we can detect when it changes + # (e.g. successive asyncio.run() calls in a test suite). self._async_client: httpx.AsyncClient | None = None - self._async_client_lock = asyncio.Lock() + self._async_client_lock: asyncio.Lock | None = None + self._async_client_loop: weakref.ref | None = None + self._async_init_mutex = threading.Lock() # guards loop-change detection # ------------------------------------------------------------------ # Background worker @@ -68,7 +74,7 @@ def _trace_worker(self) -> None: """Drain *_trace_queue* in a background daemon thread.""" while True: runs = self._trace_queue.get() - if runs is None: # sentinel → shut down + if runs is None: # sentinel -> shut down break self._send_traces_sync(runs) @@ -76,6 +82,26 @@ def close(self) -> None: """Flush pending traces and stop the background thread.""" self._trace_queue.put(None) # send sentinel self._bg_thread.join(timeout=_FLUSH_TIMEOUT) + # Best-effort async client cleanup + client = self._async_client + if client is not None and not client.is_closed: + try: + loop = asyncio.get_running_loop() + loop.create_task(client.aclose()) + except RuntimeError: + # No running loop - run synchronously in a fresh loop + try: + asyncio.run(client.aclose()) + except Exception: + pass + self._async_client = None + + async def aclose(self) -> None: + """Async counterpart of *close()* - shuts down the async HTTP client.""" + client = self._async_client + if client is not None and not client.is_closed: + await client.aclose() + self._async_client = None # ------------------------------------------------------------------ # Sync transport @@ -106,12 +132,48 @@ def _send_traces_sync(self, runs: list["Run"]) -> None: # Async transport # ------------------------------------------------------------------ + def _is_same_loop(self, loop: asyncio.AbstractEventLoop) -> bool: + """Check if *loop* is the same event loop we last bound to.""" + if self._async_client_loop is None: + return False + prev = self._async_client_loop() # dereference weakref + return prev is loop + async def _get_async_client(self) -> httpx.AsyncClient: - """Return a shared *httpx.AsyncClient*, creating it lazily.""" - if self._async_client is None or self._async_client.is_closed: - async with self._async_client_lock: - if self._async_client is None or self._async_client.is_closed: - self._async_client = httpx.AsyncClient() # nosec B113 + """Return a shared *httpx.AsyncClient*, creating it lazily. + + The lock and client are bound to the current event loop. When the + loop changes (e.g. a new ``asyncio.run()``), both are recreated so + we never use an ``asyncio.Lock`` from a dead loop. + """ + current_loop = asyncio.get_running_loop() + + # Fast path - same loop, client alive + if ( + self._async_client is not None + and not self._async_client.is_closed + and self._is_same_loop(current_loop) + ): + return self._async_client + + # Slow path - need to (re)create lock and/or client + with self._async_init_mutex: + # Loop changed -> discard old lock (bound to old loop) and client + if not self._is_same_loop(current_loop): + old_client = self._async_client + if old_client is not None and not old_client.is_closed: + try: + await old_client.aclose() + except Exception: + pass + self._async_client = None + self._async_client_lock = asyncio.Lock() + self._async_client_loop = weakref.ref(current_loop) + + # Now acquire the (loop-local) async lock for client creation + async with self._async_client_lock: # type: ignore[union-attr] + if self._async_client is None or self._async_client.is_closed: + self._async_client = httpx.AsyncClient() # nosec B113 return self._async_client async def request(self, method: str, url_path: URLTypes, **kwargs: Any) -> Response: From 47ea5640eb65dad0434f36e5a840ed31cc674e94 Mon Sep 17 00:00:00 2001 From: dynamiq-bot Date: Thu, 2 Apr 2026 16:26:59 +0000 Subject: [PATCH 3/3] fix: add nosec B110 to intentional best-effort cleanup blocks --- dynamiq/clients/dynamiq.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dynamiq/clients/dynamiq.py b/dynamiq/clients/dynamiq.py index c919a2ddf..3bf97c270 100644 --- a/dynamiq/clients/dynamiq.py +++ b/dynamiq/clients/dynamiq.py @@ -92,7 +92,7 @@ def close(self) -> None: # No running loop - run synchronously in a fresh loop try: asyncio.run(client.aclose()) - except Exception: + except Exception: # nosec B110 pass self._async_client = None @@ -164,7 +164,7 @@ async def _get_async_client(self) -> httpx.AsyncClient: if old_client is not None and not old_client.is_closed: try: await old_client.aclose() - except Exception: + except Exception: # nosec B110 pass self._async_client = None self._async_client_lock = asyncio.Lock()