perf: reuse executors and share resources across flow runs#674
vitalii-dynamiq wants to merge 1 commit into `main`.
- Reuse `ThreadPoolExecutor` across `Flow.run_sync()` calls instead of creating/destroying a pool per request (`PoolExecutor.reset()`)
- Share a module-level timeout executor for node timeout enforcement instead of creating one per node execution
- Share a default `ConnectionManager` singleton across `Flow` instances to avoid redundant connection client initialization
- Make async flow polling sleep conditional (only when no nodes dispatched)
- Add `Flow` context manager support (`close`/`__enter__`/`__exit__`) for proper resource cleanup
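The last bullet, the context-manager lifecycle, can be sketched as follows. This is a minimal, hypothetical stand-in for dynamiq's `Flow` (only `run_sync`, `close`, `__enter__`, and `__exit__` are illustrated; the real class does far more):

```python
from concurrent.futures import ThreadPoolExecutor

class Flow:
    """Sketch of the cleanup protocol described above, not dynamiq's Flow."""

    def __init__(self, max_workers=None):
        self._executor = None
        self._max_workers = max_workers

    def _get_run_executor(self):
        # Lazily create and cache the pool instead of one pool per run.
        if self._executor is None:
            self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self._executor

    def run_sync(self, fn, *args):
        return self._get_run_executor().submit(fn, *args).result()

    def close(self):
        # Explicit teardown; also invoked by __exit__.
        if self._executor is not None:
            self._executor.shutdown(wait=True)
            self._executor = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

with Flow(max_workers=4) as flow:
    results = [flow.run_sync(lambda x: x * 2, i) for i in range(3)]
# results == [0, 2, 4]; one pool served all three runs, shut down on exit
```

The point of the pattern is that repeated `run_sync()` calls hit the same cached pool, while `with` guarantees the threads are released even on error.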
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```diff
 try:
     if timeout is not None:
-        executor = ContextAwareThreadPoolExecutor()
+        executor = ContextAwareThreadPoolExecutor(max_workers=1)
```
Single-worker timeout executor blocks retries after timeout
High Severity
The timeout executor changed from default max_workers (typically ≥5) to max_workers=1. The executor is created once before the retry loop and reused across all attempts. After a timeout, the timed-out task continues running on the sole worker thread (Python can't interrupt running threads, and future.cancel() only works on queued tasks). When a retry submits a new task, it's queued behind the still-running timed-out task and can never start until that task finishes — causing every subsequent retry to also time out immediately without executing.
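The failure mode Bugbot describes can be reproduced with the stdlib executor alone (a stand-in here for `ContextAwareThreadPoolExecutor`): with `max_workers=1` and the executor created once outside the retry loop, the timed-out task keeps the sole worker busy, so the retry's task never leaves the queue:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# One single-worker executor shared across attempts -- the buggy shape.
executor = ThreadPoolExecutor(max_workers=1)

def slow_task():
    time.sleep(1.0)  # outlives the 0.1 s timeout below
    return "done"

outcomes = []
for attempt in range(2):
    future = executor.submit(slow_task)
    try:
        outcomes.append(future.result(timeout=0.1))
    except TimeoutError:
        # Python cannot interrupt a running thread, and future.cancel()
        # only works on tasks still in the queue, so the worker stays busy.
        outcomes.append("timeout")

executor.shutdown(wait=True)
print(outcomes)  # ['timeout', 'timeout'] -- the retry never got to run
```

The first attempt times out while still running; the second attempt's task is queued behind it and times out before it ever starts, exactly as the review describes.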
Summary
Performance optimization to reduce per-request overhead in flow execution by reusing executors, sharing resources across runs, and scaling thread pools to match system capacity. Targets scenarios with many sequential or parallel requests (e.g. 50+ concurrent) where thread pool churn, undersized pools, and redundant resource creation caused significant overhead.
Changes
1. `PoolExecutor.reset()` (`dynamiq/executors/pool.py`, `dynamiq/executors/base.py`)

- Adds `reset()` to the `BaseExecutor` interface (no-op default for backward compatibility)
- `PoolExecutor.reset()` clears the `node_by_future` dict without destroying the thread pool

2. Flow executor caching (`dynamiq/flows/flow.py`)
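A minimal sketch of the `reset()` pattern, using the names from this PR (`node_by_future`) on a simplified stand-in for `PoolExecutor`:

```python
from concurrent.futures import ThreadPoolExecutor

class PoolExecutor:
    """Sketch: reset() drops per-run bookkeeping but keeps the pool alive."""

    def __init__(self, max_workers=None):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.node_by_future = {}

    def submit(self, node, fn, *args):
        future = self.executor.submit(fn, *args)
        self.node_by_future[future] = node  # track which node a future belongs to
        return future

    def reset(self):
        # Cheap between-run cleanup: only the future->node mapping is
        # cleared; the worker threads survive for the next run.
        self.node_by_future.clear()

    def shutdown(self):
        # Full teardown, now reserved for explicit close.
        self.executor.shutdown(wait=True)

pool = PoolExecutor(max_workers=2)
f = pool.submit("node-a", lambda: 41 + 1)
assert f.result() == 42 and len(pool.node_by_future) == 1
pool.reset()   # between runs: bookkeeping gone, threads kept
assert pool.node_by_future == {}
pool.shutdown()
```

This is what makes executor reuse safe across runs: each run starts with empty bookkeeping without paying thread creation/destruction costs.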
- `_get_run_executor()` lazily creates and caches the executor
- Detects `max_workers` changes and recreates the executor when needed
- Handles `max_workers=None` (default) correctly by skipping comparison against the resolved integer value
- `run_sync()` calls `reset()` in a `finally` block (exception-safe) instead of `shutdown()`
- Adds a `close()` method and `__enter__`/`__exit__` context manager support

3. Per-node timeout executor (`dynamiq/nodes/node.py`)
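The caching and change-detection logic can be sketched like this (hypothetical class and attribute names; only the decision rule from the bullets above is shown):

```python
from concurrent.futures import ThreadPoolExecutor

class FlowExecutorCache:
    """Sketch: create the pool lazily, reuse it across runs, rebuild only
    on an explicit max_workers change. max_workers=None skips the
    comparison, so the default never forces a rebuild against the
    pool's resolved integer size."""

    def __init__(self):
        self._executor = None
        self._executor_max_workers = None

    def get_run_executor(self, max_workers=None):
        rebuild = (
            self._executor is None
            or (max_workers is not None
                and max_workers != self._executor_max_workers)
        )
        if rebuild:
            if self._executor is not None:
                self._executor.shutdown(wait=False)
            self._executor = ThreadPoolExecutor(max_workers=max_workers)
            self._executor_max_workers = max_workers
        return self._executor

cache = FlowExecutorCache()
a = cache.get_run_executor()                 # first call: creates the pool
b = cache.get_run_executor()                 # default None: reuse, no comparison
c = cache.get_run_executor(max_workers=4)    # explicit change: rebuild
assert a is b and b is not c
c.shutdown(wait=False)
```

Skipping the comparison for `None` is the fix for the "None mismatch" issue listed later: a caller passing the default must not be compared against the integer the pool actually resolved to.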
- Creates a `ContextAwareThreadPoolExecutor(max_workers=1)` per node execution
- Calls `shutdown(wait=False)` in the `finally` block of `execute_with_retry`
- With a shared pool, `future.result(timeout=T)` measured wall clock from submission rather than task start, causing queued tasks to time out before executing

4. Shared ConnectionManager (`dynamiq/connections/managers.py`, `dynamiq/flows/flow.py`)
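One way to apply this per-node pattern (a sketch, with the stdlib `ThreadPoolExecutor` standing in for `ContextAwareThreadPoolExecutor`; dynamiq's exact placement of executor creation may differ) is a fresh single-worker executor per attempt, so a timed-out task left running never occupies the next attempt's worker:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def execute_with_retry(fn, timeout, max_retries=3):
    """Sketch of per-node timeout enforcement with retries."""
    last_error = None
    for _ in range(max_retries):
        executor = ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(fn)
            # The task is guaranteed to be running here (sole worker,
            # empty queue), so the timeout measures the task itself,
            # not time spent queued behind other work.
            return future.result(timeout=timeout)
        except TimeoutError as e:
            last_error = e
        finally:
            # Never block on a possibly still-running timed-out task.
            executor.shutdown(wait=False)
    raise last_error

calls = []
def flaky():
    calls.append(1)
    if len(calls) < 2:
        time.sleep(0.5)  # first attempt exceeds the timeout
    return "ok"

assert execute_with_retry(flaky, timeout=0.1) == "ok"
```

The first attempt times out, but the second runs on its own worker and succeeds, avoiding the retry-blocking issue from the Bugbot report.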
- `Flow` instances without an explicit manager share a `get_default_connection_manager()` singleton

5. Async conditional sleep (`dynamiq/flows/flow.py`)
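The singleton accessor can be sketched as a module-level lazy initializer (hypothetical body; dynamiq's `ConnectionManager` builds real connection clients):

```python
import threading

class ConnectionManager:
    """Hypothetical stand-in: the real class initializes connection clients."""
    def __init__(self):
        self.clients = {}

_default_cm = None
_default_cm_lock = threading.Lock()

def get_default_connection_manager():
    # Module-level singleton: every Flow that is not given an explicit
    # manager shares this one, so clients are initialized once, not per Flow.
    global _default_cm
    if _default_cm is None:
        with _default_cm_lock:
            if _default_cm is None:  # double-checked under the lock
                _default_cm = ConnectionManager()
    return _default_cm

assert get_default_connection_manager() is get_default_connection_manager()
```

The lock matters because concurrent flow runs may race to create the default manager; double-checking keeps the fast path lock-free.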
- `asyncio.sleep(0.003)` only triggers when no nodes were dispatched (idle polling)

6. Smart thread pool sizing (`dynamiq/executors/pool.py`)
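The polling-loop change, in sketch form (`pending_nodes` and `dispatch` are hypothetical stand-ins for the flow's real scheduling state):

```python
import asyncio

async def run_flow(pending_nodes, dispatch):
    """Sketch: sleep only on idle iterations of the dispatch loop."""
    while pending_nodes:
        dispatched = 0
        for node in list(pending_nodes):
            if dispatch(node):  # node's dependencies are ready
                pending_nodes.remove(node)
                dispatched += 1
        if dispatched == 0:
            # Idle: nothing became ready this pass, so yield briefly
            # instead of spinning. When work was dispatched, loop again
            # immediately -- no artificial 3 ms delay per iteration.
            await asyncio.sleep(0.003)

order = []
def dispatch(node):
    order.append(node)
    return True  # everything is ready in this toy example

asyncio.run(run_flow({"a", "b"}, dispatch))
assert sorted(order) == ["a", "b"]
```

Making the sleep conditional removes a fixed latency from every dispatch pass while still keeping idle polling cheap.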
- Replaces `MAX_WORKERS_THREAD_POOL_EXECUTOR = 8` with `min(32, os.cpu_count() + 4)`, the Python stdlib's proven heuristic for I/O-bound workloads
- Adds a `DYNAMIQ_MAX_WORKERS` environment variable for deployment-level tuning

Configuration
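The sizing rule can be sketched as (hypothetical helper name; `min(32, cpu_count + 4)` is the same default `ThreadPoolExecutor` has used since Python 3.8):

```python
import os

def resolve_max_workers():
    """Sketch: the env var wins if set, else the stdlib I/O-bound heuristic."""
    env = os.environ.get("DYNAMIQ_MAX_WORKERS")
    if env is not None:
        return int(env)
    # os.cpu_count() can return None on exotic platforms; fall back to 1.
    return min(32, (os.cpu_count() or 1) + 4)

os.environ["DYNAMIQ_MAX_WORKERS"] = "64"
assert resolve_max_workers() == 64
del os.environ["DYNAMIQ_MAX_WORKERS"]
assert resolve_max_workers() == min(32, (os.cpu_count() or 1) + 4)
```

The cap at 32 keeps the default sane on large machines, while the env var lets high-concurrency deployments raise it deliberately.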
- `DYNAMIQ_MAX_WORKERS` environment variable; default: `min(32, cpu_count + 4)`
- Per-flow override: set `max_node_workers` on the `Flow` instance or in `RunnableConfig`

Cursor Bugbot Issues - All Resolved
- `reset()` only called in success path → moved to a `finally` block
- `max_workers` `None` mismatch → comparison skipped when `max_workers` is `None` (default); only recreate when explicitly changed
- Timeout executor blocking retries → per-node `ContextAwareThreadPoolExecutor(max_workers=1)` with `shutdown(wait=False)` in `finally`
- `reset()` not defined on `BaseExecutor` → added as a no-op default on `BaseExecutor`

Test Results
- `test_background_thread_runs_until_streaming_timeout_after_execute_timeout` passes with the per-node timeout executor

Impact on 50 Parallel Requests
Before: each `run_sync()` call created and destroyed a thread pool (8 threads), a timeout executor, and a connection manager. 50 parallel requests meant 150+ redundant resource allocations plus constant thread churn.

After:
- The flow thread pool is created once and reused across runs
- Timeout executors are minimal (`max_workers=1`) and properly cleaned up
- `DYNAMIQ_MAX_WORKERS=64` can be set for high-concurrency scenarios