Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions dynamiq/connections/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,23 @@ def loads(self, value: str):
return self.serializer.loads(value)


# Process-wide default ConnectionManager singleton.
# ConnectionManager is thread-safe (per-connection locks with double-checked locking),
# so one shared instance can back any number of Flow objects.
_default_connection_manager: ConnectionManager | None = None
_default_connection_manager_lock = threading.Lock()


def get_default_connection_manager() -> ConnectionManager:
    """Lazily create and return the shared default ConnectionManager.

    Uses double-checked locking: the unlocked fast path avoids lock
    contention once the singleton exists, while the locked re-check
    guarantees at most one instance is created under concurrent calls.
    """
    global _default_connection_manager
    if _default_connection_manager is not None:
        return _default_connection_manager
    with _default_connection_manager_lock:
        if _default_connection_manager is None:
            _default_connection_manager = ConnectionManager()
        return _default_connection_manager


@contextmanager
def get_connection_manager():
"""
Expand Down
8 changes: 8 additions & 0 deletions dynamiq/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ def shutdown(self, wait: bool = True):
"""
raise NotImplementedError

def reset(self):
    """Clear any per-run bookkeeping so this executor can serve another run.

    The base implementation intentionally does nothing; subclasses that
    accumulate state between ``execute`` calls (e.g. future-to-node maps)
    should override this method.
    """
    return None

@abstractmethod
def execute(
self,
Expand Down
25 changes: 24 additions & 1 deletion dynamiq/executors/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,23 @@
from dynamiq.runnables.base import RunnableResultError
from dynamiq.utils.logger import logger

MAX_WORKERS_THREAD_POOL_EXECUTOR = 8

def _default_thread_max_workers() -> int:
"""Return the default max_workers for thread pool executors.

Resolution order:
1. ``DYNAMIQ_MAX_WORKERS`` environment variable (explicit deployment override).
2. ``min(32, os.cpu_count() + 4)`` — the same heuristic used by Python's
stdlib ``ThreadPoolExecutor`` since 3.8, well-suited for I/O-bound
workloads such as LLM API calls.
"""
env_val = os.environ.get("DYNAMIQ_MAX_WORKERS")
if env_val is not None:
return int(env_val)
return min(32, (os.cpu_count() or 1) + 4)


MAX_WORKERS_THREAD_POOL_EXECUTOR = _default_thread_max_workers()
MAX_WORKERS_PROCESS_POOL_EXECUTOR = os.cpu_count()


Expand Down Expand Up @@ -45,6 +61,13 @@ def shutdown(self, wait: bool = True):
"""
self.executor.shutdown(wait=wait)

def reset(self):
    """Drop the future-to-node bookkeeping from the previous run so this
    executor can be reused by the next flow run without rebuilding its
    thread pool."""
    # Rebind (rather than .clear()) so any external references to the old
    # mapping are left untouched — same semantics as the original.
    self.node_by_future = {}

def execute(
self,
ready_nodes: list[NodeReadyToRun],
Expand Down
54 changes: 46 additions & 8 deletions dynamiq/flows/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from pydantic import Field, computed_field, field_validator

from dynamiq.connections.managers import ConnectionManager
from dynamiq.connections.managers import ConnectionManager, get_default_connection_manager
from dynamiq.executors.base import BaseExecutor
from dynamiq.executors.pool import ThreadExecutor
from dynamiq.flows.base import BaseFlow
Expand Down Expand Up @@ -44,7 +44,7 @@ class Flow(BaseFlow):
nodes: list[Node] = []
executor: type[BaseExecutor] = ThreadExecutor
max_node_workers: int | None = None
connection_manager: ConnectionManager = Field(default_factory=ConnectionManager)
connection_manager: ConnectionManager = Field(default_factory=get_default_connection_manager)

def __init__(self, **kwargs):
"""
Expand All @@ -56,6 +56,7 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self._node_by_id = {node.id: node for node in self.nodes}
self._ts = None
self._run_executor = None

self._init_components()
self.reset_run_state()
Expand Down Expand Up @@ -222,6 +223,42 @@ def reset_run_state(self):
}
self._ts = self.init_node_topological_sorter(nodes=self.nodes)

def _get_run_executor(self, max_workers: int | None = None) -> BaseExecutor:
    """Return the cached run executor, creating it lazily on first use.

    The underlying thread pool survives across ``run_sync`` calls so worker
    threads are not torn down and rebuilt on every request.

    When *max_workers* is given and disagrees with the cached executor's
    configuration, the stale executor is shut down and a new one is built.
    """
    cached = self._run_executor
    if cached is not None and max_workers is not None and cached.max_workers != max_workers:
        cached.shutdown(wait=True)
        self._run_executor = None
        cached = None
    if cached is None:
        self._run_executor = self.executor(max_workers=max_workers)
    return self._run_executor

def close(self):
    """Release the cached run executor and its worker threads.

    Invoke this once the Flow will no longer be used (e.g. during
    application shutdown), or rely on the context-manager form::

        with Flow(nodes=[...]) as flow:
            flow.run_sync(data)
    """
    executor = self._run_executor
    if executor is not None:
        # Shut down first, then drop the reference, so a failed shutdown
        # leaves the executor reachable for a retry.
        executor.shutdown(wait=True)
        self._run_executor = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False

def _cleanup_dry_run(self, config: RunnableConfig = None):
"""
Clean up resources created during dry run.
Expand Down Expand Up @@ -304,7 +341,7 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) ->
max_workers = (
config.max_node_workers if config else self.max_node_workers
)
run_executor = self.executor(max_workers=max_workers)
run_executor = self._get_run_executor(max_workers=max_workers)

while self._ts.is_active():
ready_nodes = self._get_nodes_ready_to_run(input_data=input_data)
Expand All @@ -319,8 +356,6 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) ->
# Wait for ready nodes to be processed and reduce CPU usage
time.sleep(0.003)

run_executor.shutdown()

output = self._get_output()
failed_nodes = self._get_failed_nodes_with_raise_behavior()

Expand Down Expand Up @@ -354,6 +389,8 @@ def run_sync(self, input_data: Any, config: RunnableConfig = None, **kwargs) ->
error=RunnableResultError.from_exception(e, failed_nodes=failed_nodes),
)
finally:
if self._run_executor is not None:
self._run_executor.reset()
self._cleanup_dry_run(config)

async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwargs) -> RunnableResult:
Expand Down Expand Up @@ -402,9 +439,10 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar

self._results.update(results)
self._ts.done(*results.keys())

# Wait for ready nodes to be processed and reduce CPU usage by yielding control to the event loop
await asyncio.sleep(0.003)
else:
# Only sleep when no nodes were dispatched to reduce
# CPU usage while waiting for dependencies to complete.
await asyncio.sleep(0.003)

output = self._get_output()
failed_nodes = self._get_failed_nodes_with_raise_behavior()
Expand Down
14 changes: 6 additions & 8 deletions dynamiq/nodes/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,11 +990,10 @@ def execute_with_retry(self, input_data: dict[str, Any] | BaseModel, config: Run
error = None
n_attempt = self.error_handling.max_retries + 1
executor = None
timed_out = False

try:
if timeout is not None:
executor = ContextAwareThreadPoolExecutor()
executor = ContextAwareThreadPoolExecutor(max_workers=1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single-worker timeout executor blocks retries after timeout

High Severity

The timeout executor changed from default max_workers (typically ≥5) to max_workers=1. The executor is created once before the retry loop and reused across all attempts. After a timeout, the timed-out task continues running on the sole worker thread (Python can't interrupt running threads, and future.cancel() only works on queued tasks). When a retry submits a new task, it's queued behind the still-running timed-out task and can never start until that task finishes — causing every subsequent retry to also time out immediately without executing.

Additional Locations (1)
Fix in Cursor Fix in Web


for attempt in range(n_attempt):
merged_kwargs = merge(kwargs, {"execution_run_id": uuid4()})
Expand Down Expand Up @@ -1032,7 +1031,6 @@ def execute_with_retry(self, input_data: dict[str, Any] | BaseModel, config: Run
return output
except TimeoutError as e:
error = e
timed_out = True
self.run_on_node_execute_error(config.callbacks, error, **merged_kwargs)
logger.warning(f"Node {self.name} - {self.id}: timeout.")
except Exception as e:
Expand All @@ -1052,9 +1050,7 @@ def execute_with_retry(self, input_data: dict[str, Any] | BaseModel, config: Run
raise error
finally:
if executor is not None:
# Use cancel_futures=True and wait=False when timeout occurred to prevent
# blocking on threads that may be stuck waiting (e.g., on input_queue.get())
executor.shutdown(wait=not timed_out, cancel_futures=timed_out)
executor.shutdown(wait=False)

def execute_with_timeout(
self,
Expand Down Expand Up @@ -1087,8 +1083,10 @@ def execute_with_timeout(
return future.result(timeout=timeout)
except TimeoutError:
# Cancel the future to prevent further execution if possible.
# Note: cancel() only works if the task hasn't started yet.
# For running tasks, we rely on executor.shutdown(cancel_futures=True).
# Note: cancel() only prevents tasks that have not started yet.
# Already-running tasks will continue until they complete; this is
# a Python threading limitation. The per-node executor is shut
# down (wait=False) in the finally block of execute_with_retry.
future.cancel()
raise

Expand Down
Loading