Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 117 additions & 3 deletions dynamiq/clients/dynamiq.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import asyncio
import atexit
import threading
import weakref
from queue import SimpleQueue
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

Expand All @@ -19,6 +23,8 @@

# Collector endpoint for Dynamiq traces.
# NOTE(review): presumably the fallback when no base_url is passed to the client - confirm in __init__.
DYNAMIQ_BASE_URL = "https://collector.getdynamiq.ai"

# Upper bound on how long close() blocks while joining the background trace thread.
_FLUSH_TIMEOUT = 0.5 # seconds to wait for remaining traces on shutdown


class HttpBaseError(Exception):
    """Base exception type for HTTP client errors."""
Expand All @@ -45,6 +51,62 @@ def __init__(self, base_url: str | None = None, access_key: str | None = None, t
raise ValueError("No API key provided")
self.timeout = timeout

# Background queue and thread for non-blocking sync trace dispatch
self._trace_queue: SimpleQueue[list["Run"] | None] = SimpleQueue()
self._bg_thread = threading.Thread(target=self._trace_worker, daemon=True)
self._bg_thread.start()
atexit.register(self.close)

# Lazily initialised async HTTP client (reused across calls).
# Both the lock and the client are bound to a specific event loop;
# we track the loop via weakref so we can detect when it changes
# (e.g. successive asyncio.run() calls in a test suite).
self._async_client: httpx.AsyncClient | None = None
self._async_client_lock: asyncio.Lock | None = None
self._async_client_loop: weakref.ref | None = None
self._async_init_mutex = threading.Lock() # guards loop-change detection

# ------------------------------------------------------------------
# Background worker
# ------------------------------------------------------------------

def _trace_worker(self) -> None:
    """Consume queued trace batches until the ``None`` sentinel arrives.

    Runs as the target of the background thread: each batch taken from
    ``self._trace_queue`` is handed to ``_send_traces_sync`` in FIFO order.
    Anything queued after the sentinel is left untouched.
    """
    # Blocking get(); an empty list is still a batch - only ``None`` stops the loop.
    while (batch := self._trace_queue.get()) is not None:
        self._send_traces_sync(batch)

def close(self) -> None:
    """Flush pending traces, stop the background thread, and close the async client.

    Blocks for at most ``_FLUSH_TIMEOUT`` seconds while the worker drains the
    queue. Closing the async HTTP client is best-effort: inside a running
    event loop the close is scheduled as a task, otherwise it is run to
    completion in a temporary loop. Calling ``close()`` again is harmless.
    """
    # An explicit close makes the interpreter-exit hook redundant. Removing it
    # also drops the reference the atexit registry holds on this instance, so
    # a closed client can be garbage-collected. Unregistering is a no-op when
    # the hook is absent (e.g. on the second call or during exit itself).
    atexit.unregister(self.close)

    # Sentinel goes after every batch already queued, so the worker sends
    # those first and then exits.
    self._trace_queue.put(None)
    self._bg_thread.join(timeout=_FLUSH_TIMEOUT)

    # Detach the client up front so the attribute is cleared on every path.
    client, self._async_client = self._async_client, None
    if client is None or client.is_closed:
        return

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread - drive the close in a fresh one.
        try:
            asyncio.run(client.aclose())
        except Exception as e:  # best-effort: shutdown must never raise here
            logger.debug(f"Failed to close async HTTP client. Error: {e}")
        return

    # The loop only keeps weak references to tasks; hold a strong one so the
    # scheduled close cannot be garbage-collected before it runs.
    self._async_close_task = loop.create_task(client.aclose())

async def aclose(self) -> None:
    """Async counterpart of *close()* - shuts down the async HTTP client.

    Only the shared async client is released here; the background trace
    thread is not touched. Any exception raised by the client's own
    ``aclose()`` propagates to the caller.
    """
    # Detach before awaiting so a failing aclose() cannot leave a half-closed
    # client cached on the instance.
    client, self._async_client = self._async_client, None
    if client is not None and not client.is_closed:
        await client.aclose()

# ------------------------------------------------------------------
# Sync transport
# ------------------------------------------------------------------

def _send_traces_sync(self, runs: list["Run"]) -> None:
"""Synchronous method to send traces using requests"""
try:
Expand All @@ -66,12 +128,60 @@ def _send_traces_sync(self, runs: list["Run"]) -> None:
except Exception as e:
logger.error(f"Failed to send traces (sync). Error: {e}")

# ------------------------------------------------------------------
# Async transport
# ------------------------------------------------------------------

def _is_same_loop(self, loop: asyncio.AbstractEventLoop) -> bool:
    """Tell whether *loop* is the event loop recorded in ``_async_client_loop``.

    Returns ``False`` when no loop has been recorded yet. A recorded loop
    that has since been garbage-collected dereferences to ``None`` and so
    will not match a live loop.
    """
    loop_ref = self._async_client_loop
    # Calling the weakref yields the referent (or None once it is gone).
    return loop_ref is not None and loop_ref() is loop

async def _get_async_client(self) -> "httpx.AsyncClient":
    """Return a shared *httpx.AsyncClient*, creating it lazily.

    The lock and client are bound to the current event loop. When the
    loop changes (e.g. a new ``asyncio.run()``), both are recreated so
    we never use an ``asyncio.Lock`` from a dead loop.

    Nothing is awaited while ``_async_init_mutex`` is held. It is a
    ``threading.Lock``: if a coroutine suspended inside it, any other
    coroutine on the same loop reaching the same ``with`` would block the
    loop thread in ``acquire()``, and the suspended holder could never
    resume to release it.
    """
    current_loop = asyncio.get_running_loop()

    # Fast path - same loop, client alive
    cached = self._async_client
    if cached is not None and not cached.is_closed and self._is_same_loop(current_loop):
        return cached

    # Slow path - purely synchronous bookkeeping under the thread lock.
    stale_client = None
    with self._async_init_mutex:
        if not self._is_same_loop(current_loop):
            # Loop changed -> detach the old client and replace the old
            # lock (bound to the old loop). The client is closed below,
            # after the mutex has been released.
            stale_client, self._async_client = self._async_client, None
            self._async_client_lock = asyncio.Lock()
            self._async_client_loop = weakref.ref(current_loop)
        # Snapshot the loop-local lock while still holding the mutex so a
        # concurrent loop switch in another thread cannot swap it under us.
        creation_lock = self._async_client_lock

    if stale_client is not None and not stale_client.is_closed:
        try:
            await stale_client.aclose()
        except Exception as e:  # best-effort: the old loop may already be gone
            logger.debug(f"Failed to close stale async HTTP client. Error: {e}")

    # Now acquire the (loop-local) async lock for client creation
    async with creation_lock:
        if self._async_client is None or self._async_client.is_closed:
            self._async_client = httpx.AsyncClient()  # nosec B113
        return self._async_client

async def request(self, method: str, url_path: URLTypes, **kwargs: Any) -> Response:
logger.debug(f'[{self.__class__.__name__}] REQ "{method} {url_path}". Kwargs: {kwargs}')
url = f"{self.base_url}/{str(url_path).lstrip('/')}" if self.base_url else str(url_path).lstrip("/")
try:
async with httpx.AsyncClient() as client: # nosec B113
response = await client.request(method, url=url, timeout=self.timeout, **kwargs)
client = await self._get_async_client()
response = await client.request(method, url=url, timeout=self.timeout, **kwargs)
except (httpx.TimeoutException, httpx.NetworkError) as e:
raise HttpConnectionError(e) from e

Expand Down Expand Up @@ -105,6 +215,10 @@ async def _send_traces_async(self, runs: list["Run"]) -> None:
except Exception as e:
logger.error(f"Failed to send traces (async). Error: {e}")

# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------

def trace(self, runs: list["Run"]) -> None:
"""Sync method required by BaseTracingClient interface"""
if not runs:
Expand All @@ -115,6 +229,6 @@ def trace(self, runs: list["Run"]) -> None:
loop = asyncio.get_running_loop()
loop.create_task(self._send_traces_async(runs))
else:
self._send_traces_sync(runs)
self._trace_queue.put(runs)
except Exception as e:
logger.error(f"Failed to send traces. Error: {e}")
6 changes: 4 additions & 2 deletions dynamiq/flows/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,10 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) ->
self._results.update(results)
self._ts.done(*results.keys())

# Wait for ready nodes to be processed and reduce CPU usage
time.sleep(0.003)
if not results:
# Yield CPU only when no futures completed; futures.wait
# already blocks when work is in progress.
time.sleep(0.003)

run_executor.shutdown()

Expand Down
Loading