diff --git a/dynamiq/clients/dynamiq.py b/dynamiq/clients/dynamiq.py index 3541f64e5..3bf97c270 100644 --- a/dynamiq/clients/dynamiq.py +++ b/dynamiq/clients/dynamiq.py @@ -1,4 +1,8 @@ import asyncio +import atexit +import threading +import weakref +from queue import SimpleQueue from typing import TYPE_CHECKING, Any from urllib.parse import urljoin @@ -19,6 +23,8 @@ DYNAMIQ_BASE_URL = "https://collector.getdynamiq.ai" +_FLUSH_TIMEOUT = 0.5 # seconds to wait for remaining traces on shutdown + class HttpBaseError(Exception): pass @@ -45,6 +51,62 @@ def __init__(self, base_url: str | None = None, access_key: str | None = None, t raise ValueError("No API key provided") self.timeout = timeout + # Background queue and thread for non-blocking sync trace dispatch + self._trace_queue: SimpleQueue[list["Run"] | None] = SimpleQueue() + self._bg_thread = threading.Thread(target=self._trace_worker, daemon=True) + self._bg_thread.start() + atexit.register(self.close) + + # Lazily initialised async HTTP client (reused across calls). + # Both the lock and the client are bound to a specific event loop; + # we track the loop via weakref so we can detect when it changes + # (e.g. successive asyncio.run() calls in a test suite). + self._async_client: httpx.AsyncClient | None = None + self._async_client_lock: asyncio.Lock | None = None + self._async_client_loop: weakref.ref | None = None + self._async_init_mutex = threading.Lock() # guards loop-change detection + + # ------------------------------------------------------------------ + # Background worker + # ------------------------------------------------------------------ + + def _trace_worker(self) -> None: + """Drain *_trace_queue* in a background daemon thread.""" + while True: + runs = self._trace_queue.get() + if runs is None: # sentinel -> shut down + break + self._send_traces_sync(runs) + + def close(self) -> None: + """Flush pending traces and stop the background thread.""" + self._trace_queue.put(None) # send sentinel + self._bg_thread.join(timeout=_FLUSH_TIMEOUT) + # Best-effort async client cleanup + client = self._async_client + if client is not None and not client.is_closed: + try: + loop = asyncio.get_running_loop() + loop.create_task(client.aclose()) + except RuntimeError: + # No running loop - run synchronously in a fresh loop + try: + asyncio.run(client.aclose()) + except Exception: # nosec B110 + pass + self._async_client = None + + async def aclose(self) -> None: + """Async counterpart of *close()* - shuts down the async HTTP client.""" + client = self._async_client + if client is not None and not client.is_closed: + await client.aclose() + self._async_client = None + + # ------------------------------------------------------------------ + # Sync transport + # ------------------------------------------------------------------ + def _send_traces_sync(self, runs: list["Run"]) -> None: """Synchronous method to send traces using requests""" try: @@ -66,12 +128,60 @@ def _send_traces_sync(self, runs: list["Run"]) -> None: except Exception as e: logger.error(f"Failed to send traces (sync). Error: {e}") + # ------------------------------------------------------------------ + # Async transport + # ------------------------------------------------------------------ + + def _is_same_loop(self, loop: asyncio.AbstractEventLoop) -> bool: + """Check if *loop* is the same event loop we last bound to.""" + if self._async_client_loop is None: + return False + prev = self._async_client_loop() # dereference weakref + return prev is loop + + async def _get_async_client(self) -> httpx.AsyncClient: + """Return a shared *httpx.AsyncClient*, creating it lazily. + + The lock and client are bound to the current event loop. When the + loop changes (e.g. a new ``asyncio.run()``), both are recreated so + we never use an ``asyncio.Lock`` from a dead loop. + """ + current_loop = asyncio.get_running_loop() + + # Fast path - same loop, client alive + if ( + self._async_client is not None + and not self._async_client.is_closed + and self._is_same_loop(current_loop) + ): + return self._async_client + + # Slow path - need to (re)create lock and/or client + with self._async_init_mutex: + # Loop changed -> discard old lock (bound to old loop) and client + if not self._is_same_loop(current_loop): + old_client = self._async_client + if old_client is not None and not old_client.is_closed: + try: + await old_client.aclose() + except Exception: # nosec B110 + pass + self._async_client = None + self._async_client_lock = asyncio.Lock() + self._async_client_loop = weakref.ref(current_loop) + + # Now acquire the (loop-local) async lock for client creation + async with self._async_client_lock: # type: ignore[union-attr] + if self._async_client is None or self._async_client.is_closed: + self._async_client = httpx.AsyncClient() # nosec B113 + return self._async_client + async def request(self, method: str, url_path: URLTypes, **kwargs: Any) -> Response: logger.debug(f'[{self.__class__.__name__}] REQ "{method} {url_path}". Kwargs: {kwargs}') url = f"{self.base_url}/{str(url_path).lstrip('/')}" if self.base_url else str(url_path).lstrip("/") try: - async with httpx.AsyncClient() as client: # nosec B113 - response = await client.request(method, url=url, timeout=self.timeout, **kwargs) + client = await self._get_async_client() + response = await client.request(method, url=url, timeout=self.timeout, **kwargs) except (httpx.TimeoutException, httpx.NetworkError) as e: raise HttpConnectionError(e) from e @@ -105,6 +215,10 @@ async def _send_traces_async(self, runs: list["Run"]) -> None: except Exception as e: logger.error(f"Failed to send traces (async). Error: {e}") + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + def trace(self, runs: list["Run"]) -> None: """Sync method required by BaseTracingClient interface""" if not runs: @@ -115,6 +229,6 @@ def trace(self, runs: list["Run"]) -> None: loop = asyncio.get_running_loop() loop.create_task(self._send_traces_async(runs)) else: - self._send_traces_sync(runs) + self._trace_queue.put(runs) except Exception as e: logger.error(f"Failed to send traces. Error: {e}") diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 26347f3ff..d340eefee 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -316,8 +316,10 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> self._results.update(results) self._ts.done(*results.keys()) - # Wait for ready nodes to be processed and reduce CPU usage - time.sleep(0.003) + if not results: + # Yield CPU only when no futures completed; futures.wait + # already blocks when work is in progress. + time.sleep(0.003) run_executor.shutdown()