From 3718f825b597b5fb0d5bb339b5d29bbe9dac690e Mon Sep 17 00:00:00 2001 From: vitalii-dynamiq Date: Thu, 2 Apr 2026 16:08:02 +0000 Subject: [PATCH] perf: reuse executors and share resources across flow runs - Reuse ThreadPoolExecutor across Flow.run_sync() calls instead of creating/destroying a pool per request (PoolExecutor.reset()) - Limit the per-node timeout executor to a single worker thread (max_workers=1) and always shut it down without blocking (wait=False); it is still created once per node execution - Share a default ConnectionManager singleton across Flow instances to avoid redundant connection client initialization - Make async flow polling sleep conditional (only when no nodes dispatched) - Add Flow context manager support (close/__enter__/__exit__) for proper resource cleanup --- dynamiq/connections/managers.py | 17 +++++++++++ dynamiq/executors/base.py | 8 +++++ dynamiq/executors/pool.py | 25 ++++++++++++++- dynamiq/flows/flow.py | 54 ++++++++++++++++++++++++++++----- dynamiq/nodes/node.py | 14 ++++----- 5 files changed, 101 insertions(+), 17 deletions(-) diff --git a/dynamiq/connections/managers.py b/dynamiq/connections/managers.py index c6ce4d554..94a34bede 100644 --- a/dynamiq/connections/managers.py +++ b/dynamiq/connections/managers.py @@ -205,6 +205,23 @@ def loads(self, value: str): return self.serializer.loads(value) +# Shared default ConnectionManager instance. +# ConnectionManager is thread-safe (per-connection locks with double-checked locking), +# so a single instance can be safely shared across multiple Flow instances. 
+_default_connection_manager: ConnectionManager | None = None +_default_connection_manager_lock = threading.Lock() + + +def get_default_connection_manager() -> ConnectionManager: + """Return (and lazily create) the shared default ConnectionManager singleton.""" + global _default_connection_manager + if _default_connection_manager is None: + with _default_connection_manager_lock: + if _default_connection_manager is None: + _default_connection_manager = ConnectionManager() + return _default_connection_manager + + @contextmanager def get_connection_manager(): """ diff --git a/dynamiq/executors/base.py b/dynamiq/executors/base.py index 1e87c2c69..184b7a2c2 100644 --- a/dynamiq/executors/base.py +++ b/dynamiq/executors/base.py @@ -34,6 +34,14 @@ def shutdown(self, wait: bool = True): """ raise NotImplementedError + def reset(self): + """Reset per-run state so the executor can be reused for the next run. + + Subclasses that maintain per-run bookkeeping should override this. + The default implementation is a no-op. + """ + pass + @abstractmethod def execute( self, diff --git a/dynamiq/executors/pool.py b/dynamiq/executors/pool.py index b25655b86..1a332c9ab 100644 --- a/dynamiq/executors/pool.py +++ b/dynamiq/executors/pool.py @@ -10,7 +10,23 @@ from dynamiq.runnables.base import RunnableResultError from dynamiq.utils.logger import logger -MAX_WORKERS_THREAD_POOL_EXECUTOR = 8 + +def _default_thread_max_workers() -> int: + """Return the default max_workers for thread pool executors. + + Resolution order: + 1. ``DYNAMIQ_MAX_WORKERS`` environment variable (explicit deployment override). + 2. ``min(32, os.cpu_count() + 4)`` — the same heuristic used by Python's + stdlib ``ThreadPoolExecutor`` since 3.8, well-suited for I/O-bound + workloads such as LLM API calls. 
+ """ + env_val = os.environ.get("DYNAMIQ_MAX_WORKERS") + if env_val is not None: + return int(env_val) + return min(32, (os.cpu_count() or 1) + 4) + + +MAX_WORKERS_THREAD_POOL_EXECUTOR = _default_thread_max_workers() MAX_WORKERS_PROCESS_POOL_EXECUTOR = os.cpu_count() @@ -45,6 +61,13 @@ def shutdown(self, wait: bool = True): """ self.executor.shutdown(wait=wait) + def reset(self): + """ + Resets per-run state so the executor can be reused across multiple flow runs + without creating a new thread pool. + """ + self.node_by_future = {} + def execute( self, ready_nodes: list[NodeReadyToRun], diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 26347f3ff..574210776 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -9,7 +9,7 @@ from pydantic import Field, computed_field, field_validator -from dynamiq.connections.managers import ConnectionManager +from dynamiq.connections.managers import ConnectionManager, get_default_connection_manager from dynamiq.executors.base import BaseExecutor from dynamiq.executors.pool import ThreadExecutor from dynamiq.flows.base import BaseFlow @@ -44,7 +44,7 @@ class Flow(BaseFlow): nodes: list[Node] = [] executor: type[BaseExecutor] = ThreadExecutor max_node_workers: int | None = None - connection_manager: ConnectionManager = Field(default_factory=ConnectionManager) + connection_manager: ConnectionManager = Field(default_factory=get_default_connection_manager) def __init__(self, **kwargs): """ @@ -56,6 +56,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self._node_by_id = {node.id: node for node in self.nodes} self._ts = None + self._run_executor = None self._init_components() self.reset_run_state() @@ -222,6 +223,42 @@ def reset_run_state(self): } self._ts = self.init_node_topological_sorter(nodes=self.nodes) + def _get_run_executor(self, max_workers: int | None = None) -> BaseExecutor: + """Return the cached run executor, creating it lazily on first use. 
+ + The underlying thread pool is reused across multiple ``run_sync`` calls + to avoid the overhead of creating and destroying threads on every request. + + If *max_workers* differs from the cached executor's value the old + executor is shut down and a fresh one is created. + """ + if self._run_executor is not None and max_workers is not None and self._run_executor.max_workers != max_workers: + self._run_executor.shutdown(wait=True) + self._run_executor = None + if self._run_executor is None: + self._run_executor = self.executor(max_workers=max_workers) + return self._run_executor + + def close(self): + """Shut down the cached executor and release resources. + + Call this when the Flow instance will no longer be used (e.g. at + application shutdown) or use the Flow as a context manager:: + + with Flow(nodes=[...]) as flow: + flow.run_sync(data) + """ + if self._run_executor is not None: + self._run_executor.shutdown(wait=True) + self._run_executor = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + def _cleanup_dry_run(self, config: RunnableConfig = None): """ Clean up resources created during dry run. 
@@ -304,7 +341,7 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> max_workers = ( config.max_node_workers if config else self.max_node_workers ) - run_executor = self.executor(max_workers=max_workers) + run_executor = self._get_run_executor(max_workers=max_workers) while self._ts.is_active(): ready_nodes = self._get_nodes_ready_to_run(input_data=input_data) @@ -319,8 +356,6 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> # Wait for ready nodes to be processed and reduce CPU usage time.sleep(0.003) - run_executor.shutdown() - output = self._get_output() failed_nodes = self._get_failed_nodes_with_raise_behavior() @@ -354,6 +389,8 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> error=RunnableResultError.from_exception(e, failed_nodes=failed_nodes), ) finally: + if self._run_executor is not None: + self._run_executor.reset() self._cleanup_dry_run(config) async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> RunnableResult: @@ -402,9 +439,10 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar self._results.update(results) self._ts.done(*results.keys()) - - # Wait for ready nodes to be processed and reduce CPU usage by yielding control to the event loop - await asyncio.sleep(0.003) + else: + # Only sleep when no nodes were dispatched to reduce + # CPU usage while waiting for dependencies to complete. 
+ await asyncio.sleep(0.003) output = self._get_output() failed_nodes = self._get_failed_nodes_with_raise_behavior() diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index a0b3d0c1a..bf72f64a9 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -990,11 +990,10 @@ def execute_with_retry(self, input_data: dict[str, Any] | BaseModel, config: Run error = None n_attempt = self.error_handling.max_retries + 1 executor = None - timed_out = False try: if timeout is not None: - executor = ContextAwareThreadPoolExecutor() + executor = ContextAwareThreadPoolExecutor(max_workers=1) for attempt in range(n_attempt): merged_kwargs = merge(kwargs, {"execution_run_id": uuid4()}) @@ -1032,7 +1031,6 @@ def execute_with_retry(self, input_data: dict[str, Any] | BaseModel, config: Run return output except TimeoutError as e: error = e - timed_out = True self.run_on_node_execute_error(config.callbacks, error, **merged_kwargs) logger.warning(f"Node {self.name} - {self.id}: timeout.") except Exception as e: @@ -1052,9 +1050,7 @@ def execute_with_retry(self, input_data: dict[str, Any] | BaseModel, config: Run raise error finally: if executor is not None: - # Use cancel_futures=True and wait=False when timeout occurred to prevent - # blocking on threads that may be stuck waiting (e.g., on input_queue.get()) - executor.shutdown(wait=not timed_out, cancel_futures=timed_out) + executor.shutdown(wait=False) def execute_with_timeout( self, @@ -1087,8 +1083,10 @@ def execute_with_timeout( return future.result(timeout=timeout) except TimeoutError: # Cancel the future to prevent further execution if possible. - # Note: cancel() only works if the task hasn't started yet. - # For running tasks, we rely on executor.shutdown(cancel_futures=True). + # Note: cancel() only prevents tasks that have not started yet. + # Already-running tasks will continue until they complete; this is + # a Python threading limitation. 
The per-node executor is shut + # down (wait=False) in the finally block of execute_with_retry. future.cancel() raise