perf: non-blocking tracing dispatch and flow polling optimization#673
perf: non-blocking tracing dispatch and flow polling optimization#673vitalii-dynamiq wants to merge 3 commits intomainfrom
Conversation
## Changes ### dynamiq/clients/dynamiq.py - Add background daemon thread with SimpleQueue for trace dispatch. trace() now enqueues runs instead of issuing a blocking requests.post on the caller thread, removing up to 60s of latency per flush under high concurrency. - Reuse a single httpx.AsyncClient (lazy, double-checked lock) instead of creating one per _send_traces_async call, saving TCP/TLS handshake overhead. - Add close() + atexit hook for graceful shutdown. ### dynamiq/flows/flow.py - Guard time.sleep(0.003) so it only fires when no futures completed (empty results dict). futures.wait(FIRST_COMPLETED) already blocks when work is in progress, so the unconditional 3ms sleep added unnecessary per-iteration latency.
Coverage Report •
|
||||||||||||||||||||||||||||||||||||||||
Resolves two Cursor Bugbot issues: 1. (Medium) asyncio.Lock created in __init__ binds to the first event loop and raises RuntimeError when reused across different loops (e.g. successive asyncio.run() calls in test suites). Fixed by lazily initialising both the lock and client per event loop, tracked via weakref to reliably detect loop changes even when CPython reuses memory addresses. 2. (Low) Shared httpx.AsyncClient was never closed - close() only shut down the sync background thread. Added best-effort cleanup in close() and a new async aclose() method.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
| pass | ||
| self._async_client = None | ||
| self._async_client_lock = asyncio.Lock() | ||
| self._async_client_loop = weakref.ref(current_loop) |
There was a problem hiding this comment.
Await inside threading.Lock causes event loop deadlock
High Severity
_get_async_client calls await old_client.aclose() while holding self._async_init_mutex (a threading.Lock). When the await suspends the coroutine, any other coroutine scheduled on the same event loop that reaches with self._async_init_mutex: will call the blocking threading.Lock.acquire(), freezing the event loop thread entirely. Since the event loop is frozen, the original coroutine's aclose() can never complete — resulting in a deadlock. The await and associated cleanup need to happen outside the with self._async_init_mutex: block.
Triggered by project rule: Bugbot Rules for Dynamiq


Summary
Surgical performance optimizations targeting two high-impact bottlenecks identified under high parallel load (20/50/100 concurrent requests).
Changes
1. Non-blocking trace dispatch (
dynamiq/clients/dynamiq.py)Problem:
DynamiqTracingClient.trace()called_send_traces_sync()on the caller thread, issuing a blockingrequests.post()with a 60s timeout. Under high concurrency this blocked execution threads and added significant latency to every node/flow completion callback.Fix:
SimpleQueuefor trace dispatchtrace()now enqueues runs instead of blocking on HTTP POSTclose()method +atexithook for graceful shutdownloop.create_task)2. Reuse
httpx.AsyncClient(dynamiq/clients/dynamiq.py)Problem:
request()created a newhttpx.AsyncClientcontext manager per call, paying TCP/TLS handshake overhead every time.Fix:
httpx.AsyncClientwith double-checked locking3. Conditional flow polling sleep (
dynamiq/flows/flow.py)Problem:
time.sleep(0.003)fired unconditionally on every iteration of the sync flow polling loop, even afterfutures.wait(FIRST_COMPLETED)had already blocked waiting for real work. This added unnecessary 3ms per-iteration latency.Fix:
time.sleep(0.003)so it only fires when no futures completed (empty results dict)asyncio.sleep(0.003)is left as-is since it properly yields to the event loopImpact
Testing
test_example_yaml_files_loaddue to missingOPENAI_API_KEY- unrelated)Note
Medium Risk
Introduces background-thread/queue-based trace dispatch and shared
httpx.AsyncClientlifecycle management, which can affect shutdown behavior, resource cleanup, and trace delivery under concurrency. The flow-loop sleep gating is low risk but changes timing in synchronous execution.Overview
Improves performance in tracing and synchronous flow execution by making sync
DynamiqTracingClient.trace()fire-and-forget via a background daemon thread/SimpleQueue, with newclose()/aclose()cleanup and anatexithook to flush on shutdown.Reworks async HTTP usage to reuse a lazily created shared
httpx.AsyncClient, including loop-change detection to avoid reusing loop-bound locks across successive event loops.Reduces unnecessary latency in
Flow.run_sync()by only callingtime.sleep(0.003)when no node results completed in the polling loop.Written by Cursor Bugbot for commit 47ea564. This will update automatically on new commits. Configure here.