From 9c0485a9cacde1ae2343740dcc5f7ed8f98544dc Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 16:33:23 +0300 Subject: [PATCH 01/26] feat: add execute_async() and has_native_async to Node base class --- dynamiq/nodes/node.py | 14 +++++++++ tests/unit/nodes/test_node_async.py | 46 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 tests/unit/nodes/test_node_async.py diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index a0b3d0c1a..d43501e26 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -267,6 +267,11 @@ class Node(BaseModel, Runnable, DryRunMixin, ABC): _output_references: NodeOutputReferences = PrivateAttr() + @property + def has_native_async(self) -> bool: + """Check if the subclass provides a native async execute implementation.""" + return type(self).execute_async is not Node.execute_async + model_config = ConfigDict(arbitrary_types_allowed=True) input_schema: type[BaseModel] | None = None callbacks: list[NodeCallbackHandler] = [] @@ -1396,6 +1401,15 @@ def execute(self, input_data: dict[str, Any] | BaseModel, config: RunnableConfig """ pass + async def execute_async( + self, input_data: dict[str, Any] | BaseModel, config: RunnableConfig = None, **kwargs + ) -> Any: + """ + Async execution of the node. Override in subclasses for native async support. + Returns NotImplemented to signal fallback to sync execute() in a thread. + """ + return NotImplemented + def depends_on(self, nodes: Union["Node", list["Node"]], condition: ChoiceCondition | None = None) -> "Node": """ Add dependencies for this node. Accepts either a single node or a list of nodes. 
diff --git a/tests/unit/nodes/test_node_async.py new file mode 100644 index 000000000..c071e04f5 --- /dev/null +++ b/tests/unit/nodes/test_node_async.py @@ -0,0 +1,46 @@ +import asyncio + +import pytest + +from dynamiq.nodes.node import Node, ErrorHandling +from dynamiq.nodes.types import NodeGroup +from dynamiq.runnables import RunnableConfig, RunnableResult, RunnableStatus + + +class SyncOnlyNode(Node): + """Test node with only sync execute.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "SyncOnly" + + def execute(self, input_data, config=None, **kwargs): + return {"result": "sync"} + + +class NativeAsyncNode(Node): + """Test node with both sync and async execute.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "NativeAsync" + + def execute(self, input_data, config=None, **kwargs): + return {"result": "sync"} + + async def execute_async(self, input_data, config=None, **kwargs): + await asyncio.sleep(0.01) + return {"result": "async"} + + +class TestNodeAsyncProtocol: + def test_sync_only_node_has_no_native_async(self): + node = SyncOnlyNode() + assert node.has_native_async is False + + def test_native_async_node_has_native_async(self): + node = NativeAsyncNode() + assert node.has_native_async is True + + def test_base_execute_async_returns_not_implemented(self): + node = SyncOnlyNode() + result = asyncio.run( + node.execute_async(input_data={}) + ) + assert result is NotImplemented From e46da7941a5727a6f0bc7d6f7e37b3c24de6e4c1 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 16:38:40 +0300 Subject: [PATCH 02/26] feat: add execute_async_with_retry() with asyncio timeout and non-blocking backoff --- dynamiq/nodes/node.py | 67 +++++++++++++++++++++++++ tests/unit/nodes/test_node_async.py | 58 +++++++++++++++++++++++++ 2 files changed, 125 insertions(+) diff --git a/dynamiq/nodes/node.py index d43501e26..ef45c87ae 100644 ---
a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -1097,6 +1097,73 @@ def execute_with_timeout( future.cancel() raise + async def execute_async_with_retry( + self, input_data: dict[str, Any] | BaseModel, config: RunnableConfig = None, **kwargs + ): + """ + Execute the node asynchronously with retry logic. + Uses asyncio.wait_for for timeout instead of thread-based timeout. + Uses asyncio.sleep for non-blocking retry backoff. + """ + config = ensure_config(config) + timeout = self.error_handling.timeout_seconds + error = None + n_attempt = self.error_handling.max_retries + 1 + + for attempt in range(n_attempt): + merged_kwargs = merge(kwargs, {"execution_run_id": uuid4()}) + + try: + self.ensure_client() + except Exception as conn_error: + logger.error( + f"Node {self.name} - {self.id}: Failed to ensure client connection: {conn_error}" + ) + error = conn_error + if attempt < n_attempt - 1: + time_to_sleep = self.error_handling.retry_interval_seconds * ( + self.error_handling.backoff_rate ** attempt + ) + logger.info( + f"Node {self.name} - {self.id}: retrying connection in {time_to_sleep} seconds." 
+ ) + await asyncio.sleep(time_to_sleep) + continue + else: + raise + + self.run_on_node_execute_start(config.callbacks, input_data, **merged_kwargs) + + try: + if timeout is not None: + output = await asyncio.wait_for( + self.execute_async(input_data=input_data, config=config, **merged_kwargs), + timeout=timeout, + ) + else: + output = await self.execute_async(input_data=input_data, config=config, **merged_kwargs) + + self.run_on_node_execute_end(config.callbacks, output, **merged_kwargs) + return output + except asyncio.TimeoutError as e: + error = e + self.run_on_node_execute_error(config.callbacks, error, **merged_kwargs) + logger.warning(f"Node {self.name} - {self.id}: timeout.") + except Exception as e: + error = e + self.run_on_node_execute_error(config.callbacks, error, **merged_kwargs) + logger.error(f"Node {self.name} - {self.id}: execution error: {e}") + + if attempt < n_attempt - 1: + time_to_sleep = self.error_handling.retry_interval_seconds * ( + self.error_handling.backoff_rate ** attempt + ) + logger.info(f"Node {self.name} - {self.id}: retrying in {time_to_sleep} seconds.") + await asyncio.sleep(time_to_sleep) + + logger.error(f"Node {self.name} - {self.id}: execution failed after {n_attempt} attempts.") + raise error + def get_context_for_input_schema(self) -> dict: """Provides context for input schema that is required for proper validation.""" return {} diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index c071e04f5..45edd48b0 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -44,3 +44,61 @@ def test_base_execute_async_returns_not_implemented(self): node.execute_async(input_data={}) ) assert result is NotImplemented + + +class FailThenSucceedAsyncNode(Node): + """Test node that fails N times then succeeds.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "FailThenSucceed" + attempt_count: int = 0 + fail_times: int = 2 + error_handling: ErrorHandling = 
ErrorHandling( + max_retries=3, retry_interval_seconds=0.01, backoff_rate=1 + ) + + def execute(self, input_data, config=None, **kwargs): + return {"result": "sync"} + + async def execute_async(self, input_data, config=None, **kwargs): + self.attempt_count += 1 + if self.attempt_count <= self.fail_times: + raise ValueError(f"Attempt {self.attempt_count} failed") + return {"result": "success", "attempts": self.attempt_count} + + +class TimeoutAsyncNode(Node): + """Test node that takes too long.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "TimeoutAsync" + error_handling: ErrorHandling = ErrorHandling(timeout_seconds=0.05) + + def execute(self, input_data, config=None, **kwargs): + return {"result": "sync"} + + async def execute_async(self, input_data, config=None, **kwargs): + await asyncio.sleep(10) # Way longer than timeout + return {"result": "should not reach"} + + +class TestExecuteAsyncWithRetry: + @pytest.mark.asyncio + async def test_retry_succeeds_after_failures(self): + node = FailThenSucceedAsyncNode() + config = RunnableConfig(callbacks=[]) + result = await node.execute_async_with_retry(input_data={}, config=config) + assert result == {"result": "success", "attempts": 3} + assert node.attempt_count == 3 + + @pytest.mark.asyncio + async def test_retry_exhausted_raises(self): + node = FailThenSucceedAsyncNode(fail_times=10) + config = RunnableConfig(callbacks=[]) + with pytest.raises(ValueError, match="Attempt .* failed"): + await node.execute_async_with_retry(input_data={}, config=config) + + @pytest.mark.asyncio + async def test_timeout_raises(self): + node = TimeoutAsyncNode() + config = RunnableConfig(callbacks=[]) + with pytest.raises(asyncio.TimeoutError): + await node.execute_async_with_retry(input_data={}, config=config) From 518fd07e7ab12e140193afb1a442f95b5ad32663 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 16:57:16 +0300 Subject: [PATCH 03/26] feat: rewrite Node.run_async() with native async routing and per-flow 
executor support --- dynamiq/nodes/node.py | 122 ++++++++++++++++++++++++++-- tests/unit/nodes/test_node_async.py | 38 +++++++++ 2 files changed, 154 insertions(+), 6 deletions(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index ef45c87ae..947b68084 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -1,5 +1,7 @@ import asyncio +import contextvars import copy +import functools import inspect import time from abc import ABC, abstractmethod @@ -943,29 +945,137 @@ def run_sync( ) return result + async def _run_async_native( + self, + input_data: dict, + config: RunnableConfig = None, + depends_result: dict = None, + **kwargs, + ) -> RunnableResult: + """ + Run the node asynchronously using native async execute. + Mirrors run_sync() lifecycle but calls execute_async_with_retry(). + """ + from dynamiq.nodes.agents.exceptions import RecoverableAgentException + + logger.info(f"Node {self.name} - {self.id}: async execution started.") + transformed_input = input_data + time_start = datetime.now() + + config = ensure_config(config) + + run_id = uuid4() + merged_kwargs = merge(kwargs, {"run_id": run_id, "parent_run_id": kwargs.get("parent_run_id", None)}) + if depends_result is None: + depends_result = {} + + try: + try: + self.validate_depends(depends_result) + input_data = self.get_approved_data_or_origin(input_data, config=config, **merged_kwargs) + except NodeException as e: + transformed_input = input_data | { + k: result.to_tracing_depend_dict() for k, result in depends_result.items() + } + skip_data = {"failed_dependency": e.failed_depend.to_dict(for_tracing=True)} + self.run_on_node_skip( + callbacks=config.callbacks, + skip_data=skip_data, + input_data=transformed_input, + human_feedback=getattr(e, "human_feedback", None), + **merged_kwargs, + ) + logger.info(f"Node {self.name} - {self.id}: execution skipped.") + return RunnableResult( + status=RunnableStatus.SKIP, + input=transformed_input, + output=None, + 
error=RunnableResultError.from_exception(e, recoverable=e.recoverable), + ) + + transformed_input = self.validate_input_schema( + self.transform_input(input_data=input_data, depends_result=depends_result, config=config, **kwargs), + **kwargs, + ) + self.run_on_node_start(config.callbacks, dict(transformed_input), **merged_kwargs) + cache = cache_wf_entity( + entity_id=self.id, + cache_enabled=self.caching.enabled, + cache_config=config.cache, + ) + + # When caching is enabled, fall back to sync execute (cache is sync) + if self.caching.enabled: + output, from_cache = cache(self.execute_with_retry)(transformed_input, config, **merged_kwargs) + else: + output = await self.execute_async_with_retry(transformed_input, config, **merged_kwargs) + from_cache = False + + merged_kwargs["is_output_from_cache"] = from_cache + transformed_output = self.transform_output(output, config=config, **kwargs) + + self.run_on_node_end(config.callbacks, transformed_output, **merged_kwargs) + + logger.info( + f"Node {self.name} - {self.id}: async execution succeeded in " + f"{format_duration(time_start, datetime.now())}." + ) + return RunnableResult( + status=RunnableStatus.SUCCESS, input=dict(transformed_input), output=transformed_output + ) + except Exception as e: + self.run_on_node_error(callbacks=config.callbacks, error=e, input_data=transformed_input, **merged_kwargs) + logger.error( + f"Node {self.name} - {self.id}: async execution failed in " + f"{format_duration(time_start, datetime.now())}. 
{e}" + ) + + recoverable = isinstance(e, RecoverableAgentException) + result = RunnableResult( + status=RunnableStatus.FAILURE, + input=transformed_input, + output=None, + error=RunnableResultError.from_exception(e, recoverable=recoverable), + ) + return result + async def run_async( self, input_data: dict, config: RunnableConfig = None, depends_result: dict = None, + executor: "ThreadPoolExecutor | None" = None, **kwargs, ) -> RunnableResult: """ Run the node asynchronously with given input data and configuration. - This runs the synchronous implementation in a thread pool to avoid blocking the event loop. + + If the node has a native async execute implementation (has_native_async), + runs directly on the event loop. Otherwise, offloads sync execution to + the provided executor (or the default asyncio executor if None). Args: input_data (Any): Input data for the node. - config (RunnableConfig, optional): Configuration for the run. Defaults to None. - depends_result (dict, optional): Results of dependent nodes. Defaults to None. + config (RunnableConfig, optional): Configuration for the run. + depends_result (dict, optional): Results of dependent nodes. + executor (ThreadPoolExecutor, optional): Thread pool executor for sync fallback. **kwargs: Additional keyword arguments. Returns: RunnableResult: Result of the node execution. 
""" - return await asyncio.to_thread( - self.run_sync, input_data=input_data, config=config, depends_result=depends_result, **kwargs - ) + if self.has_native_async: + return await self._run_async_native( + input_data=input_data, config=config, depends_result=depends_result, **kwargs + ) + else: + loop = asyncio.get_running_loop() + ctx = contextvars.copy_context() + fn = functools.partial( + self.run_sync, input_data=input_data, config=config, + depends_result=depends_result, **kwargs + ) + return await loop.run_in_executor(executor, ctx.run, fn) def ensure_client(self) -> None: """ diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index 45edd48b0..c1dcaa26e 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -1,4 +1,6 @@ import asyncio +import time +from concurrent.futures import ThreadPoolExecutor import pytest @@ -102,3 +104,39 @@ async def test_timeout_raises(self): config = RunnableConfig(callbacks=[]) with pytest.raises(asyncio.TimeoutError): await node.execute_async_with_retry(input_data={}, config=config) + + +class TestRunAsyncRouting: + @pytest.mark.asyncio + async def test_sync_node_uses_executor(self): + """Sync-only node should offload to the provided executor.""" + node = SyncOnlyNode() + executor = ThreadPoolExecutor(max_workers=2) + try: + result = await node.run_async( + input_data={"input": "test"}, config=RunnableConfig(callbacks=[]), executor=executor + ) + assert result.status == RunnableStatus.SUCCESS + assert result.output == {"result": "sync"} + finally: + executor.shutdown(wait=False) + + @pytest.mark.asyncio + async def test_async_node_runs_on_event_loop(self): + """Async-native node should NOT use executor — runs directly on event loop.""" + node = NativeAsyncNode() + result = await node.run_async( + input_data={"input": "test"}, config=RunnableConfig(callbacks=[]), executor=None + ) + assert result.status == RunnableStatus.SUCCESS + assert result.output == 
{"result": "async"} + + @pytest.mark.asyncio + async def test_sync_node_without_executor_falls_back_to_default(self): + """Sync-only node with executor=None should use default executor (backward compat).""" + node = SyncOnlyNode() + result = await node.run_async( + input_data={"input": "test"}, config=RunnableConfig(callbacks=[]) + ) + assert result.status == RunnableStatus.SUCCESS + assert result.output == {"result": "sync"} From ec209c75dd10cbf5f49aae2c30588aa52f7718d9 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 17:08:23 +0300 Subject: [PATCH 04/26] feat: per-flow executor isolation in Flow.run_async(), replace 3ms sleep with sleep(0) --- dynamiq/flows/flow.py | 20 +++--- tests/unit/nodes/test_flow_async.py | 100 ++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 9 deletions(-) create mode 100644 tests/unit/nodes/test_flow_async.py diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 26347f3ff..47b95f3b2 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -11,6 +11,7 @@ from dynamiq.connections.managers import ConnectionManager from dynamiq.executors.base import BaseExecutor +from dynamiq.executors.context import ContextAwareThreadPoolExecutor from dynamiq.executors.pool import ThreadExecutor from dynamiq.flows.base import BaseFlow from dynamiq.nodes.node import Node, NodeReadyToRun @@ -360,13 +361,8 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar """ Run the flow asynchronously with the given input data and configuration. - Args: - input_data (Any): Input data for the flow. - config (RunnableConfig, optional): Configuration for the run. Defaults to None. - **kwargs: Additional keyword arguments. - - Returns: - RunnableResult: Result of the flow execution. + Creates a dedicated ContextAwareThreadPoolExecutor for this flow run, + isolating it from other concurrent flow executions. 
""" self.reset_run_state() run_id = uuid4() @@ -375,6 +371,9 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar "parent_run_id": kwargs.get("parent_run_id", run_id), } + max_workers = (config.max_node_workers if config else None) or self.max_node_workers + executor = ContextAwareThreadPoolExecutor(max_workers=max_workers) + logger.info(f"Flow {self.id}: execution started.") self.run_on_flow_start(input_data, config, **merged_kwargs) time_start = datetime.now() @@ -391,6 +390,7 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar input_data=node.input_data, depends_result=node.depends_result, config=config, + executor=executor, **(merged_kwargs | {"parent_run_id": run_id}), ) for node in nodes_to_run @@ -403,8 +403,8 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar self._results.update(results) self._ts.done(*results.keys()) - # Wait for ready nodes to be processed and reduce CPU usage by yielding control to the event loop - await asyncio.sleep(0.003) + # Yield to event loop without artificial delay + await asyncio.sleep(0) output = self._get_output() failed_nodes = self._get_failed_nodes_with_raise_behavior() @@ -435,6 +435,8 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar error=RunnableResultError.from_exception(e, failed_nodes=failed_nodes), ) finally: + # wait=False is safe: all node tasks have been awaited via asyncio.gather() + executor.shutdown(wait=False) try: await self._cleanup_dry_run_async(config) except Exception as e: diff --git a/tests/unit/nodes/test_flow_async.py b/tests/unit/nodes/test_flow_async.py new file mode 100644 index 000000000..f2e6e62a0 --- /dev/null +++ b/tests/unit/nodes/test_flow_async.py @@ -0,0 +1,100 @@ +import asyncio +import time +from unittest.mock import patch + +import pytest + +from dynamiq.flows.flow import Flow +from dynamiq.nodes.node import Node, ErrorHandling +from dynamiq.nodes.types 
import NodeGroup +from dynamiq.runnables import RunnableConfig, RunnableStatus + + +class SlowSyncNode(Node): + """Sync-only node that takes time.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "SlowSync" + latency: float = 0.1 + + def execute(self, input_data, config=None, **kwargs): + time.sleep(self.latency) + return {"result": "sync_done"} + + +class FastAsyncNode(Node): + """Async node that is fast.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "FastAsync" + + def execute(self, input_data, config=None, **kwargs): + time.sleep(0.1) + return {"result": "sync_done"} + + async def execute_async(self, input_data, config=None, **kwargs): + await asyncio.sleep(0.01) + return {"result": "async_done"} + + +class TestFlowAsyncExecutor: + @pytest.mark.asyncio + async def test_flow_run_async_creates_dedicated_executor(self): + """Each flow run should create its own ContextAwareThreadPoolExecutor.""" + node = SlowSyncNode(id="slow1") + flow = Flow(nodes=[node]) + + with patch("dynamiq.flows.flow.ContextAwareThreadPoolExecutor") as mock_executor_cls: + from dynamiq.executors.context import ContextAwareThreadPoolExecutor + real_executor = ContextAwareThreadPoolExecutor(max_workers=4) + mock_executor_cls.return_value = real_executor + + try: + result = await flow.run_async( + input_data={}, config=RunnableConfig(callbacks=[]) + ) + finally: + real_executor.shutdown(wait=False) + + mock_executor_cls.assert_called_once() + + @pytest.mark.asyncio + async def test_concurrent_flows_have_separate_executors(self): + """Two concurrent flow runs should not share executors.""" + node_a = SlowSyncNode(id="a", latency=0.05) + node_b = SlowSyncNode(id="b", latency=0.05) + flow_a = Flow(nodes=[node_a]) + flow_b = Flow(nodes=[node_b]) + + config = RunnableConfig(callbacks=[]) + + results = await asyncio.gather( + flow_a.run_async(input_data={}, config=config), + flow_b.run_async(input_data={}, config=config), + ) + + assert results[0].status == RunnableStatus.SUCCESS + assert 
results[1].status == RunnableStatus.SUCCESS + + @pytest.mark.asyncio + async def test_flow_uses_sleep_zero_not_three_ms(self): + """Flow should use asyncio.sleep(0) not asyncio.sleep(0.003).""" + node = FastAsyncNode(id="fast1") + flow = Flow(nodes=[node]) + + sleep_args = [] + original_sleep = asyncio.sleep + + async def tracking_sleep(delay, *args, **kwargs): + sleep_args.append(delay) + return await original_sleep(delay, *args, **kwargs) + + with patch("dynamiq.flows.flow.asyncio.sleep", side_effect=tracking_sleep): + result = await flow.run_async( + input_data={}, config=RunnableConfig(callbacks=[]) + ) + + assert result.status == RunnableStatus.SUCCESS + # Filter out sleeps from node execute_async (e.g. 0.01) — only check flow-level sleeps + flow_sleeps = [a for a in sleep_args if a != 0.01] + assert len(flow_sleeps) > 0, "Expected at least one flow-level sleep(0) call" + for arg in flow_sleeps: + assert arg == 0, f"Expected sleep(0), got sleep({arg})" From b0d60fe645fce495e9f0bf4aff790956213627d5 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 17:19:22 +0300 Subject: [PATCH 05/26] feat: add BaseLLM.execute_async() using litellm.acompletion() with async streaming --- dynamiq/nodes/llms/base.py | 117 +++++++++++++++++++++++- tests/unit/nodes/llms/test_llm_async.py | 90 ++++++++++++++++++ 2 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 tests/unit/nodes/llms/test_llm_async.py diff --git a/dynamiq/nodes/llms/base.py b/dynamiq/nodes/llms/base.py index 6227e6b30..25d5e246e 100644 --- a/dynamiq/nodes/llms/base.py +++ b/dynamiq/nodes/llms/base.py @@ -210,6 +210,7 @@ class BaseLLM(ConnectionNode): model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True) _completion: Callable = PrivateAttr() + _acompletion: Callable = PrivateAttr() _stream_chunk_builder: Callable = PrivateAttr() _is_fallback_run: bool = PrivateAttr(default=False) _json_schema_fields: ClassVar[list[str]] = ["model", "temperature", "max_tokens", 
"prompt"] @@ -261,10 +262,11 @@ def __init__(self, **kwargs): super().__init__(**kwargs) # Save a bit of loading time as litellm is slow - from litellm import completion, stream_chunk_builder + from litellm import acompletion, completion, stream_chunk_builder # Avoid the same imports multiple times and for future usage in execute self._completion = completion + self._acompletion = acompletion self._stream_chunk_builder = stream_chunk_builder def init_components(self, connection_manager=None): @@ -499,6 +501,36 @@ def _handle_streaming_completion_response( full_response = self._stream_chunk_builder(chunks=chunks, messages=messages) return self._handle_completion_response(response=full_response, config=config, **kwargs) + async def _handle_streaming_completion_response_async( + self, + response: Union["ModelResponse", "CustomStreamWrapper"], + messages: list[dict], + config: RunnableConfig = None, + **kwargs, + ): + """Handle async streaming completion response. + + Args: + response (ModelResponse | CustomStreamWrapper): The async streaming response from the LLM. + messages (list[dict]): The messages used for the LLM. + config (RunnableConfig, optional): The configuration for the execution. Defaults to None. + **kwargs: Additional keyword arguments. + + Returns: + dict: A dictionary containing the generated content and tool calls. 
+ """ + chunks = [] + async for chunk in response: + chunks.append(chunk) + self.run_on_node_execute_stream( + config.callbacks, + chunk.model_dump(), + **kwargs, + ) + + full_response = self._stream_chunk_builder(chunks=chunks, messages=messages) + return self._handle_completion_response(response=full_response, config=config, **kwargs) + def _get_response_format_and_tools( self, prompt: Prompt | None = None, @@ -645,6 +677,89 @@ def execute( response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs ) + async def execute_async( + self, + input_data: BaseLLMInputSchema, + config: RunnableConfig = None, + prompt: Prompt | None = None, + tools: list[Tool | dict] | None = None, + response_format: dict[str, Any] | None = None, + parallel_tool_calls: bool | None = None, + **kwargs, + ): + """Execute the LLM node asynchronously using litellm.acompletion. + + This method mirrors execute() but uses await self._acompletion(...) + and async streaming iteration. + + Args: + input_data (BaseLLMInputSchema): The input data for the LLM. + config (RunnableConfig, optional): The configuration for the execution. Defaults to None. + prompt (Prompt, optional): The prompt to use for this execution. Defaults to None. + tools (list[Tool|dict]): List of tools that llm can call. + response_format (dict[str, Any]): JSON schema that specifies the structure of the llm's output. + parallel_tool_calls (bool | None): Whether to allow the LLM to return multiple tool calls + in a single response. None means provider decides. + **kwargs: Additional keyword arguments. + + Returns: + dict: A dictionary containing the generated content and tool calls. 
+ """ + config = ensure_config(config) + self.reset_run_state() + prompt = prompt or self.prompt or Prompt(messages=[], tools=None, response_format=None) + messages = self.get_messages(prompt, input_data) + self.run_on_node_execute_run(callbacks=config.callbacks, prompt_messages=messages, **kwargs) + + extra = copy.deepcopy(self.__pydantic_extra__) + params = self.connection.conn_params.copy() + if self.client and not isinstance(self.connection, HttpApiKey): + params.update({"client": self.client}) + if self.thinking_enabled: + params.update({"thinking": {"type": "enabled", "budget_tokens": self.budget_tokens}}) + if extra: + params.update(extra) + + response_format, tools = self._get_response_format_and_tools( + prompt=prompt, + tools=tools, + response_format=response_format, + ) + is_streaming_callback_available = any( + isinstance(callback, BaseStreamingCallbackHandler) for callback in config.callbacks + ) + common_params: dict[str, Any] = { + "model": self.model, + "messages": messages, + "stream": self.streaming.enabled and is_streaming_callback_available, + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "tools": tools, + "tool_choice": self.tool_choice, + "stop": self.stop if self.stop else None, + "top_p": self.top_p, + "seed": self.seed, + "presence_penalty": self.presence_penalty, + "frequency_penalty": self.frequency_penalty, + "response_format": response_format, + "drop_params": True, + **params, + } + if parallel_tool_calls is not None: + common_params["parallel_tool_calls"] = parallel_tool_calls + + common_params = self.update_completion_params(common_params) + + response = await self._acompletion(**common_params) + + if self.streaming.enabled and is_streaming_callback_available: + return await self._handle_streaming_completion_response_async( + response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs + ) + return self._handle_completion_response( + response=response, config=config, 
input_data=dict(input_data), **kwargs + ) + def _is_rate_limit_error(self, exception_type: type[Exception], error_str: str) -> bool: """Check if the error is a rate limit error. diff --git a/tests/unit/nodes/llms/test_llm_async.py b/tests/unit/nodes/llms/test_llm_async.py new file mode 100644 index 000000000..fffc8fe58 --- /dev/null +++ b/tests/unit/nodes/llms/test_llm_async.py @@ -0,0 +1,90 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from dynamiq.connections import OpenAI as OpenAIConnection +from dynamiq.nodes.llms.openai import OpenAI +from dynamiq.prompts import Prompt +from dynamiq.runnables import RunnableConfig + + +def make_mock_response(content="test response"): + """Create a mock litellm ModelResponse.""" + choice = MagicMock() + choice.message.content = content + choice.message.tool_calls = None + response = MagicMock() + response.choices = [choice] + return response + + +class TestBaseLLMAsync: + def test_base_llm_has_native_async(self): + """BaseLLM should report has_native_async=True after we add execute_async.""" + with patch("litellm.completion"), \ + patch("litellm.stream_chunk_builder"): + node = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + ) + assert node.has_native_async is True + + @pytest.mark.asyncio + async def test_execute_async_calls_acompletion(self): + """execute_async should call litellm.acompletion, not completion.""" + mock_response = make_mock_response("async response") + + with patch("litellm.completion"), \ + patch("litellm.stream_chunk_builder"): + node = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + ) + node._acompletion = AsyncMock(return_value=mock_response) + + result = await node.execute_async( + input_data=MagicMock(messages=None, files=None), + config=RunnableConfig(callbacks=[]), + ) + + node._acompletion.assert_called_once() + 
assert result["content"] == "async response" + + @pytest.mark.asyncio + async def test_execute_async_streaming(self): + """execute_async should handle streaming via async for.""" + chunk1 = MagicMock() + chunk1.model_dump.return_value = {"choices": [{"delta": {"content": "hel"}}]} + chunk2 = MagicMock() + chunk2.model_dump.return_value = {"choices": [{"delta": {"content": "lo"}}]} + + async def async_chunk_iter(): + for chunk in [chunk1, chunk2]: + yield chunk + + full_response = make_mock_response("hello") + + from dynamiq.callbacks.streaming import StreamingIteratorCallbackHandler + from dynamiq.types.streaming import StreamingConfig + + with patch("litellm.completion"), \ + patch("litellm.stream_chunk_builder"): + node = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + streaming=StreamingConfig(enabled=True), + ) + node._acompletion = AsyncMock(return_value=async_chunk_iter()) + node._stream_chunk_builder = MagicMock(return_value=full_response) + + streaming_handler = StreamingIteratorCallbackHandler() + result = await node.execute_async( + input_data=MagicMock(messages=None, files=None), + config=RunnableConfig(callbacks=[streaming_handler]), + ) + + assert result["content"] == "hello" + node._stream_chunk_builder.assert_called_once() From 3260d5b45a231a5cf4443d1f8a5d12657516767d Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 20:37:06 +0300 Subject: [PATCH 06/26] feat: add BaseLLM.run_async() with async fallback support --- dynamiq/nodes/llms/base.py | 77 +++++++++++++++++++++++++ tests/unit/nodes/llms/test_llm_async.py | 59 ++++++++++++++++++- 2 files changed, 135 insertions(+), 1 deletion(-) diff --git a/dynamiq/nodes/llms/base.py b/dynamiq/nodes/llms/base.py index 25d5e246e..318835c55 100644 --- a/dynamiq/nodes/llms/base.py +++ b/dynamiq/nodes/llms/base.py @@ -28,6 +28,8 @@ from dynamiq.utils.logger import logger if TYPE_CHECKING: + from 
concurrent.futures import ThreadPoolExecutor + from litellm import CustomStreamWrapper, ModelResponse @@ -887,3 +889,78 @@ def run_sync( logger.error(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) failed.") return result + + async def run_async( + self, + input_data: dict, + config: RunnableConfig = None, + depends_result: dict = None, + executor: "ThreadPoolExecutor | None" = None, + **kwargs, + ) -> RunnableResult: + """Run the LLM asynchronously with fallback support. + + If the primary LLM fails and a fallback is configured, the primary failure + is traced first, then the fallback LLM is executed separately. + + The fallback receives the same transformed input that the primary received, + and the primary's output_transformer is applied to the fallback's output. + + Args: + input_data: Input data for the LLM. + config: Configuration for the run. + depends_result: Results of dependent nodes. + executor: Optional thread pool executor for sync fallback. + **kwargs: Additional keyword arguments. + + Returns: + RunnableResult: Result of the LLM execution. + """ + result = await super().run_async( + input_data=input_data, config=config, depends_result=depends_result, + executor=executor, **kwargs + ) + + if result.status != RunnableStatus.FAILURE: + return result + + if not self.fallback or not self.fallback.llm: + return result + + if not result.error: + return result + + if not self._should_trigger_fallback(result.error.type, result.error.message): + return result + + fallback_llm = self.fallback.llm + fallback_llm._is_fallback_run = True + logger.warning( + f"LLM {self.name} - {self.id}: Primary LLM ({self.model}) failed. " + f"Error: {result.error.type.__name__}: {result.error.message}. 
" + f"Attempting fallback to {fallback_llm.name} - {fallback_llm.id}" + ) + + fallback_kwargs = {k: v for k, v in kwargs.items() if k != "run_depends"} + fallback_kwargs["parent_run_id"] = kwargs.get("parent_run_id") + + fallback_input = result.input.model_dump() if hasattr(result.input, "model_dump") else result.input + fallback_result = await fallback_llm.run_async( + input_data=fallback_input, + config=config, + depends_result=None, + executor=executor, + **fallback_kwargs, + ) + + if fallback_result.status == RunnableStatus.SUCCESS: + logger.info(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) succeeded") + transformed_output = self.transform_output(fallback_result.output, config=config, **kwargs) + return RunnableResult( + status=RunnableStatus.SUCCESS, + input=result.input, + output=transformed_output, + ) + + logger.error(f"LLM {self.name} - {self.id}: Fallback LLM ({fallback_llm.model}) failed.") + return result diff --git a/tests/unit/nodes/llms/test_llm_async.py b/tests/unit/nodes/llms/test_llm_async.py index fffc8fe58..dfdb2622f 100644 --- a/tests/unit/nodes/llms/test_llm_async.py +++ b/tests/unit/nodes/llms/test_llm_async.py @@ -6,7 +6,8 @@ from dynamiq.connections import OpenAI as OpenAIConnection from dynamiq.nodes.llms.openai import OpenAI from dynamiq.prompts import Prompt -from dynamiq.runnables import RunnableConfig +from dynamiq.nodes.llms.base import FallbackConfig +from dynamiq.runnables import RunnableConfig, RunnableStatus def make_mock_response(content="test response"): @@ -88,3 +89,59 @@ async def async_chunk_iter(): assert result["content"] == "hello" node._stream_chunk_builder.assert_called_once() + + +class TestBaseLLMAsyncFallback: + @pytest.mark.asyncio + async def test_run_async_no_fallback_on_success(self): + """Successful run should not trigger fallback.""" + mock_response = make_mock_response("primary response") + + node = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + 
prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + ) + node._acompletion = AsyncMock(return_value=mock_response) + + result = await node.run_async( + input_data={"input": "test"}, + config=RunnableConfig(callbacks=[]), + ) + assert result.status == RunnableStatus.SUCCESS + + @pytest.mark.asyncio + async def test_run_async_triggers_fallback_on_rate_limit(self): + """Failed primary with rate limit should trigger fallback LLM via async path.""" + mock_fallback_response = make_mock_response("fallback response") + + primary = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + ) + fallback_llm = OpenAI( + model="gpt-4o", + connection=OpenAIConnection(api_key="test-key"), + prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + ) + primary.fallback = FallbackConfig(llm=fallback_llm, enabled=True) + + # Primary raises a rate limit error + from litellm.exceptions import RateLimitError + primary._acompletion = AsyncMock( + side_effect=RateLimitError( + message="Rate limit exceeded", + model="gpt-4o-mini", + llm_provider="openai", + ) + ) + # Fallback succeeds + fallback_llm._acompletion = AsyncMock(return_value=mock_fallback_response) + + result = await primary.run_async( + input_data={"input": "test"}, + config=RunnableConfig(callbacks=[]), + ) + assert result.status == RunnableStatus.SUCCESS + assert result.output["content"] == "fallback response" From 9451e0162e295c3b167d6fc90dca0c7640bc22ba Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 21:35:55 +0300 Subject: [PATCH 07/26] fix: skip passing sync client to litellm.acompletion in execute_async --- dynamiq/nodes/llms/base.py | 4 +- tests/benchmarks/__init__.py | 0 tests/benchmarks/conftest.py | 150 +++++++++++++++++++++ tests/benchmarks/test_async_performance.py | 105 +++++++++++++++ 4 files changed, 257 insertions(+), 2 deletions(-) create mode 100644 
tests/benchmarks/__init__.py create mode 100644 tests/benchmarks/conftest.py create mode 100644 tests/benchmarks/test_async_performance.py diff --git a/dynamiq/nodes/llms/base.py b/dynamiq/nodes/llms/base.py index 318835c55..a6caf8dcf 100644 --- a/dynamiq/nodes/llms/base.py +++ b/dynamiq/nodes/llms/base.py @@ -715,8 +715,8 @@ async def execute_async( extra = copy.deepcopy(self.__pydantic_extra__) params = self.connection.conn_params.copy() - if self.client and not isinstance(self.connection, HttpApiKey): - params.update({"client": self.client}) + # Do not pass the sync client to acompletion — litellm will create + # its own async HTTP client using the connection params (api_key, api_base). if self.thinking_enabled: params.update({"thinking": {"type": "enabled", "budget_tokens": self.budget_tokens}}) if extra: diff --git a/tests/benchmarks/__init__.py b/tests/benchmarks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py new file mode 100644 index 000000000..14bfe5883 --- /dev/null +++ b/tests/benchmarks/conftest.py @@ -0,0 +1,150 @@ +import asyncio +import statistics +import time +from dataclasses import dataclass, field +from datetime import datetime + +import pytest + +from dynamiq.callbacks import TracingCallbackHandler +from dynamiq.nodes.node import Node +from dynamiq.nodes.types import NodeGroup +from dynamiq.runnables import RunnableConfig + + +class MockAsyncLLMNode(Node): + """Simulates I/O-bound LLM node with native async support.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "MockAsyncLLM" + latency: float = 0.15 + + def execute(self, input_data, config=None, **kwargs): + time.sleep(self.latency) + return {"content": "sync response"} + + async def execute_async(self, input_data, config=None, **kwargs): + await asyncio.sleep(self.latency) + return {"content": "async response"} + + +class MockSyncLLMNode(Node): + """Simulates I/O-bound LLM node WITHOUT async support 
(baseline).""" + group: NodeGroup = NodeGroup.UTILS + name: str = "MockSyncLLM" + latency: float = 0.15 + + def execute(self, input_data, config=None, **kwargs): + time.sleep(self.latency) + return {"content": "sync response"} + + +class MockSyncCPUNode(Node): + """Simulates CPU-bound node without async support.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "MockCPU" + + def execute(self, input_data, config=None, **kwargs): + total = sum(i * i for i in range(100_000)) + return {"result": total} + + +class MockPassthroughNode(Node): + """Lightweight passthrough node.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "Passthrough" + + def execute(self, input_data, config=None, **kwargs): + return {"result": "pass"} + + +@dataclass +class BenchmarkMetrics: + """Collected metrics from a benchmark run.""" + latencies: list[float] = field(default_factory=list) + node_gaps: list[float] = field(default_factory=list) + total_wall_time: float = 0.0 + num_workflows: int = 0 + + @property + def p50_latency(self) -> float: + if not self.latencies: + return 0.0 + return statistics.median(self.latencies) + + @property + def p95_latency(self) -> float: + if not self.latencies: + return 0.0 + sorted_l = sorted(self.latencies) + idx = int(len(sorted_l) * 0.95) + return sorted_l[min(idx, len(sorted_l) - 1)] + + @property + def p99_latency(self) -> float: + if not self.latencies: + return 0.0 + sorted_l = sorted(self.latencies) + idx = int(len(sorted_l) * 0.99) + return sorted_l[min(idx, len(sorted_l) - 1)] + + @property + def avg_gap(self) -> float: + if not self.node_gaps: + return 0.0 + return statistics.mean(self.node_gaps) + + @property + def throughput(self) -> float: + if self.total_wall_time == 0: + return 0.0 + return self.num_workflows / self.total_wall_time + + +def compute_node_gaps(tracing: TracingCallbackHandler) -> list[float]: + """Extract inter-node gaps from tracing runs.""" + gaps = [] + runs = list(tracing.runs.values()) + node_runs = [r for r in runs 
if r.type.value == "node"] + node_runs.sort(key=lambda r: r.start_time) + + for i in range(1, len(node_runs)): + prev_end = node_runs[i - 1].end_time + curr_start = node_runs[i].start_time + if prev_end and curr_start: + gap = (curr_start - prev_end).total_seconds() + if gap > 0: + gaps.append(gap) + + return gaps + + +def print_comparison(scenario: str, before: BenchmarkMetrics, after: BenchmarkMetrics): + """Print a before/after comparison table.""" + def improvement(before_val, after_val): + if before_val == 0: + return "N/A" + pct = ((before_val - after_val) / before_val) * 100 + return f"{pct:+.1f}%" + + print(f"\n{'=' * 70}") + print(f"Scenario: {scenario}") + print(f"{'=' * 70}") + print(f"{'Metric':<25} | {'Before':>12} | {'After':>12} | {'Change':>10}") + print(f"{'-' * 25}-+-{'-' * 12}-+-{'-' * 12}-+-{'-' * 10}") + print( + f"{'e2e p50 (ms)':<25} | {before.p50_latency*1000:>12.1f} | " + f"{after.p50_latency*1000:>12.1f} | {improvement(before.p50_latency, after.p50_latency):>10}" + ) + print( + f"{'e2e p95 (ms)':<25} | {before.p95_latency*1000:>12.1f} | " + f"{after.p95_latency*1000:>12.1f} | {improvement(before.p95_latency, after.p95_latency):>10}" + ) + print( + f"{'avg node gap (ms)':<25} | {before.avg_gap*1000:>12.1f} | " + f"{after.avg_gap*1000:>12.1f} | {improvement(before.avg_gap, after.avg_gap):>10}" + ) + print( + f"{'throughput (wf/sec)':<25} | {before.throughput:>12.1f} | " + f"{after.throughput:>12.1f} | {improvement(-before.throughput, -after.throughput):>10}" + ) + print(f"{'=' * 70}\n") diff --git a/tests/benchmarks/test_async_performance.py b/tests/benchmarks/test_async_performance.py new file mode 100644 index 000000000..a225da44e --- /dev/null +++ b/tests/benchmarks/test_async_performance.py @@ -0,0 +1,105 @@ +""" +Synthetic performance benchmarks for async execution optimization. 
+ +Run: pytest tests/benchmarks/test_async_performance.py -v -s +""" +import asyncio +import time + +import pytest + +from dynamiq import Workflow +from dynamiq.callbacks import TracingCallbackHandler +from dynamiq.flows import Flow +from dynamiq.runnables import RunnableConfig, RunnableStatus + +from .conftest import ( + BenchmarkMetrics, + MockAsyncLLMNode, + MockPassthroughNode, + MockSyncCPUNode, + MockSyncLLMNode, + compute_node_gaps, + print_comparison, +) + + +def build_workflow_before(): + """Build test workflow using sync-only nodes (baseline).""" + llm_a = MockSyncLLMNode(id="llm_a", name="LLM-A", latency=0.15) + llm_b = MockSyncLLMNode(id="llm_b", name="LLM-B", latency=0.15) + llm_c = MockSyncLLMNode(id="llm_c", name="LLM-C", latency=0.10) + aggregator = MockPassthroughNode(id="agg", name="Aggregator") + cpu_node = MockSyncCPUNode(id="cpu", name="CPU-Work") + final = MockPassthroughNode(id="final", name="Final") + + # DAG: [A, B] -> Aggregator -> Final + # C -> CPU -> Final + aggregator.depends_on([llm_a, llm_b]) + cpu_node.depends_on(llm_c) + final.depends_on([aggregator, cpu_node]) + + return Workflow(flow=Flow(nodes=[llm_a, llm_b, llm_c, aggregator, cpu_node, final])) + + +def build_workflow_after(): + """Build test workflow using native async nodes (optimized).""" + llm_a = MockAsyncLLMNode(id="llm_a", name="LLM-A", latency=0.15) + llm_b = MockAsyncLLMNode(id="llm_b", name="LLM-B", latency=0.15) + llm_c = MockAsyncLLMNode(id="llm_c", name="LLM-C", latency=0.10) + aggregator = MockPassthroughNode(id="agg", name="Aggregator") + cpu_node = MockSyncCPUNode(id="cpu", name="CPU-Work") + final = MockPassthroughNode(id="final", name="Final") + + aggregator.depends_on([llm_a, llm_b]) + cpu_node.depends_on(llm_c) + final.depends_on([aggregator, cpu_node]) + + return Workflow(flow=Flow(nodes=[llm_a, llm_b, llm_c, aggregator, cpu_node, final])) + + +async def run_concurrent_workflows(build_fn, concurrency: int) -> BenchmarkMetrics: + """Run multiple workflows 
concurrently and collect metrics.""" + metrics = BenchmarkMetrics(num_workflows=concurrency) + + async def run_single(): + wf = build_fn() + tracing = TracingCallbackHandler() + config = RunnableConfig(callbacks=[tracing]) + + t0 = time.perf_counter() + result = await wf.run(input_data={}, config=config) + elapsed = time.perf_counter() - t0 + + metrics.latencies.append(elapsed) + metrics.node_gaps.extend(compute_node_gaps(tracing)) + return result + + wall_start = time.perf_counter() + results = await asyncio.gather(*[run_single() for _ in range(concurrency)]) + metrics.total_wall_time = time.perf_counter() - wall_start + + for r in results: + assert r.status == RunnableStatus.SUCCESS + + return metrics + + +class TestSyntheticBenchmarks: + """Before/after benchmarks with synthetic mock nodes.""" + + @pytest.mark.asyncio + @pytest.mark.parametrize("concurrency", [1, 10, 25, 50]) + async def test_concurrent_workflows(self, concurrency): + """Compare before vs after at different concurrency levels.""" + before = await run_concurrent_workflows(build_workflow_before, concurrency) + after = await run_concurrent_workflows(build_workflow_after, concurrency) + + print_comparison(f"{concurrency} concurrent workflows", before, after) + + # After should be no worse than before + if concurrency >= 10: + assert after.p50_latency <= before.p50_latency * 1.1, ( + f"p50 regression at concurrency={concurrency}: " + f"before={before.p50_latency:.3f}s, after={after.p50_latency:.3f}s" + ) From 078b5ff346acc7f346f9c1ebaa8bc75540e9f0c4 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Sun, 5 Apr 2026 22:00:33 +0300 Subject: [PATCH 08/26] fix: update test mocks for BaseLLM.run_async override --- tests/unit/nodes/test_node.py | 2 +- tests/unit/nodes/test_node_async.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tests/unit/nodes/test_node.py b/tests/unit/nodes/test_node.py index c5d869285..51083c800 100644 --- a/tests/unit/nodes/test_node.py +++ 
b/tests/unit/nodes/test_node.py @@ -91,7 +91,7 @@ def node_async_result(): @pytest.fixture def openai_node(mocker, node_sync_result, node_async_result): mocker.patch("dynamiq.nodes.llms.base.BaseLLM.run_sync", return_value=node_sync_result) - mocker.patch("dynamiq.nodes.node.Node.run_async", return_value=node_async_result) + mocker.patch("dynamiq.nodes.llms.base.BaseLLM.run_async", return_value=node_async_result) yield OpenAI(model="gpt-4", connection=OpenAIConnection(api_key="test_api_key")) diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index c1dcaa26e..cc4ab535f 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -40,11 +40,10 @@ def test_native_async_node_has_native_async(self): node = NativeAsyncNode() assert node.has_native_async is True - def test_base_execute_async_returns_not_implemented(self): + @pytest.mark.asyncio + async def test_base_execute_async_returns_not_implemented(self): node = SyncOnlyNode() - result = asyncio.get_event_loop().run_until_complete( - node.execute_async(input_data={}) - ) + result = await node.execute_async(input_data={}) assert result is NotImplemented From cd062320198e77da0df4cf666d428931ea00201a Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 12:37:41 +0300 Subject: [PATCH 09/26] chore: remove redundant tests --- tests/benchmarks/__init__.py | 0 tests/benchmarks/conftest.py | 150 --------------------- tests/benchmarks/test_async_performance.py | 105 --------------- 3 files changed, 255 deletions(-) delete mode 100644 tests/benchmarks/__init__.py delete mode 100644 tests/benchmarks/conftest.py delete mode 100644 tests/benchmarks/test_async_performance.py diff --git a/tests/benchmarks/__init__.py b/tests/benchmarks/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py deleted file mode 100644 index 14bfe5883..000000000 --- 
a/tests/benchmarks/conftest.py +++ /dev/null @@ -1,150 +0,0 @@ -import asyncio -import statistics -import time -from dataclasses import dataclass, field -from datetime import datetime - -import pytest - -from dynamiq.callbacks import TracingCallbackHandler -from dynamiq.nodes.node import Node -from dynamiq.nodes.types import NodeGroup -from dynamiq.runnables import RunnableConfig - - -class MockAsyncLLMNode(Node): - """Simulates I/O-bound LLM node with native async support.""" - group: NodeGroup = NodeGroup.UTILS - name: str = "MockAsyncLLM" - latency: float = 0.15 - - def execute(self, input_data, config=None, **kwargs): - time.sleep(self.latency) - return {"content": "sync response"} - - async def execute_async(self, input_data, config=None, **kwargs): - await asyncio.sleep(self.latency) - return {"content": "async response"} - - -class MockSyncLLMNode(Node): - """Simulates I/O-bound LLM node WITHOUT async support (baseline).""" - group: NodeGroup = NodeGroup.UTILS - name: str = "MockSyncLLM" - latency: float = 0.15 - - def execute(self, input_data, config=None, **kwargs): - time.sleep(self.latency) - return {"content": "sync response"} - - -class MockSyncCPUNode(Node): - """Simulates CPU-bound node without async support.""" - group: NodeGroup = NodeGroup.UTILS - name: str = "MockCPU" - - def execute(self, input_data, config=None, **kwargs): - total = sum(i * i for i in range(100_000)) - return {"result": total} - - -class MockPassthroughNode(Node): - """Lightweight passthrough node.""" - group: NodeGroup = NodeGroup.UTILS - name: str = "Passthrough" - - def execute(self, input_data, config=None, **kwargs): - return {"result": "pass"} - - -@dataclass -class BenchmarkMetrics: - """Collected metrics from a benchmark run.""" - latencies: list[float] = field(default_factory=list) - node_gaps: list[float] = field(default_factory=list) - total_wall_time: float = 0.0 - num_workflows: int = 0 - - @property - def p50_latency(self) -> float: - if not self.latencies: - 
return 0.0 - return statistics.median(self.latencies) - - @property - def p95_latency(self) -> float: - if not self.latencies: - return 0.0 - sorted_l = sorted(self.latencies) - idx = int(len(sorted_l) * 0.95) - return sorted_l[min(idx, len(sorted_l) - 1)] - - @property - def p99_latency(self) -> float: - if not self.latencies: - return 0.0 - sorted_l = sorted(self.latencies) - idx = int(len(sorted_l) * 0.99) - return sorted_l[min(idx, len(sorted_l) - 1)] - - @property - def avg_gap(self) -> float: - if not self.node_gaps: - return 0.0 - return statistics.mean(self.node_gaps) - - @property - def throughput(self) -> float: - if self.total_wall_time == 0: - return 0.0 - return self.num_workflows / self.total_wall_time - - -def compute_node_gaps(tracing: TracingCallbackHandler) -> list[float]: - """Extract inter-node gaps from tracing runs.""" - gaps = [] - runs = list(tracing.runs.values()) - node_runs = [r for r in runs if r.type.value == "node"] - node_runs.sort(key=lambda r: r.start_time) - - for i in range(1, len(node_runs)): - prev_end = node_runs[i - 1].end_time - curr_start = node_runs[i].start_time - if prev_end and curr_start: - gap = (curr_start - prev_end).total_seconds() - if gap > 0: - gaps.append(gap) - - return gaps - - -def print_comparison(scenario: str, before: BenchmarkMetrics, after: BenchmarkMetrics): - """Print a before/after comparison table.""" - def improvement(before_val, after_val): - if before_val == 0: - return "N/A" - pct = ((before_val - after_val) / before_val) * 100 - return f"{pct:+.1f}%" - - print(f"\n{'=' * 70}") - print(f"Scenario: {scenario}") - print(f"{'=' * 70}") - print(f"{'Metric':<25} | {'Before':>12} | {'After':>12} | {'Change':>10}") - print(f"{'-' * 25}-+-{'-' * 12}-+-{'-' * 12}-+-{'-' * 10}") - print( - f"{'e2e p50 (ms)':<25} | {before.p50_latency*1000:>12.1f} | " - f"{after.p50_latency*1000:>12.1f} | {improvement(before.p50_latency, after.p50_latency):>10}" - ) - print( - f"{'e2e p95 (ms)':<25} | 
{before.p95_latency*1000:>12.1f} | " - f"{after.p95_latency*1000:>12.1f} | {improvement(before.p95_latency, after.p95_latency):>10}" - ) - print( - f"{'avg node gap (ms)':<25} | {before.avg_gap*1000:>12.1f} | " - f"{after.avg_gap*1000:>12.1f} | {improvement(before.avg_gap, after.avg_gap):>10}" - ) - print( - f"{'throughput (wf/sec)':<25} | {before.throughput:>12.1f} | " - f"{after.throughput:>12.1f} | {improvement(-before.throughput, -after.throughput):>10}" - ) - print(f"{'=' * 70}\n") diff --git a/tests/benchmarks/test_async_performance.py b/tests/benchmarks/test_async_performance.py deleted file mode 100644 index a225da44e..000000000 --- a/tests/benchmarks/test_async_performance.py +++ /dev/null @@ -1,105 +0,0 @@ -""" -Synthetic performance benchmarks for async execution optimization. - -Run: pytest tests/benchmarks/test_async_performance.py -v -s -""" -import asyncio -import time - -import pytest - -from dynamiq import Workflow -from dynamiq.callbacks import TracingCallbackHandler -from dynamiq.flows import Flow -from dynamiq.runnables import RunnableConfig, RunnableStatus - -from .conftest import ( - BenchmarkMetrics, - MockAsyncLLMNode, - MockPassthroughNode, - MockSyncCPUNode, - MockSyncLLMNode, - compute_node_gaps, - print_comparison, -) - - -def build_workflow_before(): - """Build test workflow using sync-only nodes (baseline).""" - llm_a = MockSyncLLMNode(id="llm_a", name="LLM-A", latency=0.15) - llm_b = MockSyncLLMNode(id="llm_b", name="LLM-B", latency=0.15) - llm_c = MockSyncLLMNode(id="llm_c", name="LLM-C", latency=0.10) - aggregator = MockPassthroughNode(id="agg", name="Aggregator") - cpu_node = MockSyncCPUNode(id="cpu", name="CPU-Work") - final = MockPassthroughNode(id="final", name="Final") - - # DAG: [A, B] -> Aggregator -> Final - # C -> CPU -> Final - aggregator.depends_on([llm_a, llm_b]) - cpu_node.depends_on(llm_c) - final.depends_on([aggregator, cpu_node]) - - return Workflow(flow=Flow(nodes=[llm_a, llm_b, llm_c, aggregator, cpu_node, final])) 
- - -def build_workflow_after(): - """Build test workflow using native async nodes (optimized).""" - llm_a = MockAsyncLLMNode(id="llm_a", name="LLM-A", latency=0.15) - llm_b = MockAsyncLLMNode(id="llm_b", name="LLM-B", latency=0.15) - llm_c = MockAsyncLLMNode(id="llm_c", name="LLM-C", latency=0.10) - aggregator = MockPassthroughNode(id="agg", name="Aggregator") - cpu_node = MockSyncCPUNode(id="cpu", name="CPU-Work") - final = MockPassthroughNode(id="final", name="Final") - - aggregator.depends_on([llm_a, llm_b]) - cpu_node.depends_on(llm_c) - final.depends_on([aggregator, cpu_node]) - - return Workflow(flow=Flow(nodes=[llm_a, llm_b, llm_c, aggregator, cpu_node, final])) - - -async def run_concurrent_workflows(build_fn, concurrency: int) -> BenchmarkMetrics: - """Run multiple workflows concurrently and collect metrics.""" - metrics = BenchmarkMetrics(num_workflows=concurrency) - - async def run_single(): - wf = build_fn() - tracing = TracingCallbackHandler() - config = RunnableConfig(callbacks=[tracing]) - - t0 = time.perf_counter() - result = await wf.run(input_data={}, config=config) - elapsed = time.perf_counter() - t0 - - metrics.latencies.append(elapsed) - metrics.node_gaps.extend(compute_node_gaps(tracing)) - return result - - wall_start = time.perf_counter() - results = await asyncio.gather(*[run_single() for _ in range(concurrency)]) - metrics.total_wall_time = time.perf_counter() - wall_start - - for r in results: - assert r.status == RunnableStatus.SUCCESS - - return metrics - - -class TestSyntheticBenchmarks: - """Before/after benchmarks with synthetic mock nodes.""" - - @pytest.mark.asyncio - @pytest.mark.parametrize("concurrency", [1, 10, 25, 50]) - async def test_concurrent_workflows(self, concurrency): - """Compare before vs after at different concurrency levels.""" - before = await run_concurrent_workflows(build_workflow_before, concurrency) - after = await run_concurrent_workflows(build_workflow_after, concurrency) - - 
print_comparison(f"{concurrency} concurrent workflows", before, after) - - # After should be no worse than before - if concurrency >= 10: - assert after.p50_latency <= before.p50_latency * 1.1, ( - f"p50 regression at concurrency={concurrency}: " - f"before={before.p50_latency:.3f}s, after={after.p50_latency:.3f}s" - ) From 8cf0c2ee862b610d5cfce5d501dedbe2962920d7 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 12:43:41 +0300 Subject: [PATCH 10/26] fix: update linting --- dynamiq/nodes/node.py | 8 +++++--- tests/unit/nodes/llms/test_llm_async.py | 1 - tests/unit/nodes/test_flow_async.py | 6 ++---- tests/unit/nodes/test_node_async.py | 3 +-- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index 947b68084..489610582 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -10,7 +10,7 @@ from functools import cached_property from queue import Empty from types import FunctionType, ModuleType -from typing import Any, Callable, ClassVar, Union +from typing import TYPE_CHECKING, Any, Callable, ClassVar, Union from uuid import uuid4 from jinja2 import Template @@ -21,6 +21,7 @@ from dynamiq.connections import BaseConnection from dynamiq.connections.managers import ConnectionManager, ConnectionManagerException from dynamiq.executors.context import ContextAwareThreadPoolExecutor +from dynamiq.nodes.agents.exceptions import RecoverableAgentException from dynamiq.nodes.dry_run import DryRunMixin from dynamiq.nodes.exceptions import ( NodeConditionFailedException, @@ -49,6 +50,9 @@ from dynamiq.utils.logger import logger from dynamiq.utils.utils import clear_annotation +if TYPE_CHECKING: + from concurrent.futures import ThreadPoolExecutor + def ensure_config(config: RunnableConfig = None) -> RunnableConfig: """ @@ -483,8 +487,6 @@ def validate_input_schema(self, input_data: dict[str, Any], **kwargs) -> dict[st Raises: NodeException: If input data does not match the input schema. 
""" - from dynamiq.nodes.agents.exceptions import RecoverableAgentException - if self.input_schema: try: return self.input_schema.model_validate( diff --git a/tests/unit/nodes/llms/test_llm_async.py b/tests/unit/nodes/llms/test_llm_async.py index dfdb2622f..c6f0c5401 100644 --- a/tests/unit/nodes/llms/test_llm_async.py +++ b/tests/unit/nodes/llms/test_llm_async.py @@ -1,4 +1,3 @@ -import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest diff --git a/tests/unit/nodes/test_flow_async.py b/tests/unit/nodes/test_flow_async.py index f2e6e62a0..b5762d79d 100644 --- a/tests/unit/nodes/test_flow_async.py +++ b/tests/unit/nodes/test_flow_async.py @@ -5,7 +5,7 @@ import pytest from dynamiq.flows.flow import Flow -from dynamiq.nodes.node import Node, ErrorHandling +from dynamiq.nodes.node import Node from dynamiq.nodes.types import NodeGroup from dynamiq.runnables import RunnableConfig, RunnableStatus @@ -48,9 +48,7 @@ async def test_flow_run_async_creates_dedicated_executor(self): mock_executor_cls.return_value = real_executor try: - result = await flow.run_async( - input_data={}, config=RunnableConfig(callbacks=[]) - ) + _ = await flow.run_async(input_data={}, config=RunnableConfig(callbacks=[])) finally: real_executor.shutdown(wait=False) diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index cc4ab535f..f49dcc771 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -1,12 +1,11 @@ import asyncio -import time from concurrent.futures import ThreadPoolExecutor import pytest from dynamiq.nodes.node import Node, ErrorHandling from dynamiq.nodes.types import NodeGroup -from dynamiq.runnables import RunnableConfig, RunnableResult, RunnableStatus +from dynamiq.runnables import RunnableConfig, RunnableStatus class SyncOnlyNode(Node): From ef160d60f2b84c01625ed7323f2085e0526ffda5 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 12:59:10 +0300 Subject: [PATCH 
11/26] fix: remove blocking sync execution when caching is enabled --- dynamiq/nodes/node.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index 489610582..7810c8e62 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -1006,9 +1006,11 @@ async def _run_async_native( cache_config=config.cache, ) - # When caching is enabled, fall back to sync execute (cache is sync) + # When caching is enabled, offload sync execute to a thread to avoid blocking the event loop if self.caching.enabled: - output, from_cache = cache(self.execute_with_retry)(transformed_input, config, **merged_kwargs) + output, from_cache = await asyncio.to_thread( + cache(self.execute_with_retry), transformed_input, config, **merged_kwargs + ) else: output = await self.execute_async_with_retry(transformed_input, config, **merged_kwargs) from_cache = False From 07c39e1c8ea2d7aa773cf6fd71541d4b0bb2814e Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 13:01:36 +0300 Subject: [PATCH 12/26] fix: add lazy import for RecoverableAgentException --- dynamiq/nodes/node.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index 7810c8e62..792c4b2ec 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -21,7 +21,6 @@ from dynamiq.connections import BaseConnection from dynamiq.connections.managers import ConnectionManager, ConnectionManagerException from dynamiq.executors.context import ContextAwareThreadPoolExecutor -from dynamiq.nodes.agents.exceptions import RecoverableAgentException from dynamiq.nodes.dry_run import DryRunMixin from dynamiq.nodes.exceptions import ( NodeConditionFailedException, @@ -494,6 +493,8 @@ def validate_input_schema(self, input_data: dict[str, Any], **kwargs) -> dict[st ) except Exception as e: if kwargs.get("recoverable_error", False): + from dynamiq.nodes.agents.exceptions import RecoverableAgentException + 
raise RecoverableAgentException(f"Input data validation failed: {e}") raise e From ec028828658fd1ea65c7b08f97237c1c8c23452a Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 13:12:36 +0300 Subject: [PATCH 13/26] fix: update asyncio.sleep() to remove CPU busy-wait --- dynamiq/flows/flow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 47b95f3b2..14c666587 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -403,8 +403,8 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar self._results.update(results) self._ts.done(*results.keys()) - # Yield to event loop without artificial delay - await asyncio.sleep(0) + # Wait for ready nodes to be processed and reduce CPU usage by yielding control to the event loop + await asyncio.sleep(0.003) output = self._get_output() failed_nodes = self._get_failed_nodes_with_raise_behavior() From 3cbf65bdc00c552a36a95299d64c97bf3301c210 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 13:51:45 +0300 Subject: [PATCH 14/26] fix: update tests for async execution --- tests/conftest.py | 5 ++++ tests/integration/flows/test_flow.py | 45 ++++++++++++++-------------- tests/unit/nodes/test_flow_async.py | 25 ---------------- 3 files changed, 27 insertions(+), 48 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 38bfe46a7..391c948a7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -55,6 +55,11 @@ def response(stream: bool, *args, **kwargs): return model_r mock_llm = mocker.patch("dynamiq.nodes.llms.base.BaseLLM._completion", side_effect=response) + + async def async_response(*args, **kwargs): + return mock_llm(*args, **kwargs) + + mocker.patch("dynamiq.nodes.llms.base.BaseLLM._acompletion", side_effect=async_response) yield mock_llm diff --git a/tests/integration/flows/test_flow.py b/tests/integration/flows/test_flow.py index 26e15a5bb..af99605b4 100644 --- 
a/tests/integration/flows/test_flow.py +++ b/tests/integration/flows/test_flow.py @@ -382,40 +382,39 @@ async def test_workflow_with_depend_nodes_with_tracing_async( assert mock_llm_executor.call_count == 2 assert mock_llm_executor.call_args_list == [ mock.call( - tools=None, - tool_choice=None, model=openai_node.model, messages=expected_openai_messages, stream=False, temperature=openai_node.temperature, max_tokens=None, + tools=None, + tool_choice=None, stop=None, + top_p=None, seed=None, presence_penalty=None, frequency_penalty=None, - top_p=None, - api_key=openai_node.connection.api_key, - client=ANY, response_format=None, drop_params=True, api_base="https://api.openai.com/v1", + api_key=openai_node.connection.api_key, ), mock.call( - tools=None, - tool_choice=None, model=anthropic_node_with_dependency.model, messages=expected_anthropic_messages, stream=False, temperature=anthropic_node_with_dependency.temperature, max_tokens=None, + tools=None, + tool_choice=None, stop=None, + top_p=None, seed=None, presence_penalty=None, frequency_penalty=None, - top_p=None, - api_key=anthropic_node_with_dependency.connection.api_key, response_format=None, drop_params=True, + api_key=anthropic_node_with_dependency.connection.api_key, ), ] @@ -701,25 +700,25 @@ async def test_workflow_with_depend_nodes_and_depend_fail_async( assert response.error.failed_nodes[0].name == openai_node.name assert mock_llm_executor.call_count == 1 + # Async path uses _acompletion which does not pass `client` assert mock_llm_executor.call_args_list == [ mock.call( - tools=None, - tool_choice=None, model=openai_node.model, messages=expected_openai_messages, stream=False, temperature=openai_node.temperature, - api_key=openai_node.connection.api_key, - client=ANY, max_tokens=None, + tools=None, + tool_choice=None, stop=None, + top_p=None, seed=None, presence_penalty=None, frequency_penalty=None, - top_p=None, response_format=None, drop_params=True, api_base="https://api.openai.com/v1", + 
api_key=openai_node.connection.api_key, ) ] @@ -988,42 +987,42 @@ async def test_workflow_with_conditional_depend_nodes_with_tracing_async( assert response == RunnableResult(status=RunnableStatus.SUCCESS, input=input_data, output=expected_output) assert mock_llm_executor.call_count == 2 + # Async path uses _acompletion which does not pass `client` assert mock_llm_executor.call_args_list == [ mock.call( - tools=None, - tool_choice=None, model=openai_node_with_return_behavior.model, messages=expected_openai_messages, stream=False, temperature=openai_node_with_return_behavior.temperature, max_tokens=None, + tools=None, + tool_choice=None, stop=None, + top_p=None, seed=None, presence_penalty=None, frequency_penalty=None, - top_p=None, - api_key=openai_node_with_return_behavior.connection.api_key, - client=ANY, response_format=None, drop_params=True, api_base="https://api.openai.com/v1", + api_key=openai_node_with_return_behavior.connection.api_key, ), mock.call( - tools=None, - tool_choice=None, model=anthropic_node_with_success_status_conditional_depend.model, messages=expected_anthropic_messages, stream=False, temperature=anthropic_node_with_success_status_conditional_depend.temperature, max_tokens=None, + tools=None, + tool_choice=None, stop=None, + top_p=None, seed=None, presence_penalty=None, frequency_penalty=None, - top_p=None, - api_key=anthropic_node_with_success_status_conditional_depend.connection.api_key, response_format=None, drop_params=True, + api_key=anthropic_node_with_success_status_conditional_depend.connection.api_key, ), ] diff --git a/tests/unit/nodes/test_flow_async.py b/tests/unit/nodes/test_flow_async.py index b5762d79d..712d9b088 100644 --- a/tests/unit/nodes/test_flow_async.py +++ b/tests/unit/nodes/test_flow_async.py @@ -71,28 +71,3 @@ async def test_concurrent_flows_have_separate_executors(self): assert results[0].status == RunnableStatus.SUCCESS assert results[1].status == RunnableStatus.SUCCESS - - @pytest.mark.asyncio - async def 
test_flow_uses_sleep_zero_not_three_ms(self): - """Flow should use asyncio.sleep(0) not asyncio.sleep(0.003).""" - node = FastAsyncNode(id="fast1") - flow = Flow(nodes=[node]) - - sleep_args = [] - original_sleep = asyncio.sleep - - async def tracking_sleep(delay, *args, **kwargs): - sleep_args.append(delay) - return await original_sleep(delay, *args, **kwargs) - - with patch("dynamiq.flows.flow.asyncio.sleep", side_effect=tracking_sleep): - result = await flow.run_async( - input_data={}, config=RunnableConfig(callbacks=[]) - ) - - assert result.status == RunnableStatus.SUCCESS - # Filter out sleeps from node execute_async (e.g. 0.01) — only check flow-level sleeps - flow_sleeps = [a for a in sleep_args if a != 0.01] - assert len(flow_sleeps) > 0, "Expected at least one flow-level sleep(0) call" - for arg in flow_sleeps: - assert arg == 0, f"Expected sleep(0), got sleep({arg})" From 7021675e3e3e032b6d11c53d9e2457b013baec2a Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 13:55:35 +0300 Subject: [PATCH 15/26] fix: update finally block for ThreadPoolExecutor --- dynamiq/flows/flow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 14c666587..53293edd6 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -373,12 +373,12 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar max_workers = (config.max_node_workers if config else None) or self.max_node_workers executor = ContextAwareThreadPoolExecutor(max_workers=max_workers) - - logger.info(f"Flow {self.id}: execution started.") - self.run_on_flow_start(input_data, config, **merged_kwargs) time_start = datetime.now() try: + logger.info(f"Flow {self.id}: execution started.") + self.run_on_flow_start(input_data, config, **merged_kwargs) + if self.nodes: while self._ts.is_active(): ready_nodes = self._get_nodes_ready_to_run(input_data=input_data) From 
fdedc079902c1ae7ad65bef40b9704d0ed29d1b4 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 14:05:53 +0300 Subject: [PATCH 16/26] fix: update MCP tool tests --- tests/integration/nodes/tools/test_mcp_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/nodes/tools/test_mcp_tool.py b/tests/integration/nodes/tools/test_mcp_tool.py index 140470ec9..5064cef62 100644 --- a/tests/integration/nodes/tools/test_mcp_tool.py +++ b/tests/integration/nodes/tools/test_mcp_tool.py @@ -146,7 +146,7 @@ async def test_mock_tool_execute(mcp_server_tool): mock_exec.assert_called_once() assert result == mocked_result - with patch("dynamiq.nodes.tools.mcp.MCPTool.execute", return_value=mocked_result) as mock_exec: + with patch.object(MCPTool, "execute_async", new_callable=AsyncMock, return_value=mocked_result) as mock_exec: result = await tool.run(input_data={"a": 20, "b": 22}) mock_exec.assert_called_once() assert result.output == mocked_result From 06679f987ef0954bb7ef4df63c3ca44850d6b96a Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 14:12:16 +0300 Subject: [PATCH 17/26] fix: update Node for async retries and queue reads --- dynamiq/nodes/node.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index 792c4b2ec..4fadbeaae 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -975,7 +975,10 @@ async def _run_async_native( try: try: self.validate_depends(depends_result) - input_data = self.get_approved_data_or_origin(input_data, config=config, **merged_kwargs) + # Offload blocking approval queue read to a thread to avoid blocking the event loop + input_data = await asyncio.to_thread( + self.get_approved_data_or_origin, input_data, config=config, **merged_kwargs + ) except NodeException as e: transformed_input = input_data | { k: result.to_tracing_depend_dict() for k, result in depends_result.items() @@ -1229,7 +1232,8 @@ async 
def execute_async_with_retry( merged_kwargs = merge(kwargs, {"execution_run_id": uuid4()}) try: - self.ensure_client() + # Offload blocking client initialization to a thread to avoid blocking the event loop + await asyncio.to_thread(self.ensure_client) except Exception as conn_error: logger.error( f"Node {self.name} - {self.id}: Failed to ensure client connection: {conn_error}" From b5428ad91fd599ac037c670f65af3baf16f55e7a Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 14:23:39 +0300 Subject: [PATCH 18/26] fix: fix comments from Bugbot --- dynamiq/flows/flow.py | 6 +++--- dynamiq/nodes/node.py | 7 +++++++ dynamiq/nodes/tools/cua_desktop/cua_desktop.py | 1 + dynamiq/nodes/tools/e2b_desktop/e2b_desktop.py | 1 + 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/dynamiq/flows/flow.py b/dynamiq/flows/flow.py index 53293edd6..14c666587 100644 --- a/dynamiq/flows/flow.py +++ b/dynamiq/flows/flow.py @@ -373,12 +373,12 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar max_workers = (config.max_node_workers if config else None) or self.max_node_workers executor = ContextAwareThreadPoolExecutor(max_workers=max_workers) + + logger.info(f"Flow {self.id}: execution started.") + self.run_on_flow_start(input_data, config, **merged_kwargs) time_start = datetime.now() try: - logger.info(f"Flow {self.id}: execution started.") - self.run_on_flow_start(input_data, config, **merged_kwargs) - if self.nodes: while self._ts.is_active(): ready_nodes = self._get_nodes_ready_to_run(input_data=input_data) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index 4fadbeaae..b755604bb 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -272,9 +272,16 @@ class Node(BaseModel, Runnable, DryRunMixin, ABC): _output_references: NodeOutputReferences = PrivateAttr() + # Set to True in subclasses that manage their own background event loop + # (e.g. 
CuaDesktopTool, E2BDesktopTool) to force run_async to offload + # via run_in_executor instead of calling execute_async on the main loop. + _force_thread_executor: ClassVar[bool] = False + @property def has_native_async(self) -> bool: """Check if the subclass provides a native async execute implementation.""" + if self._force_thread_executor: + return False return type(self).execute_async is not Node.execute_async model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/dynamiq/nodes/tools/cua_desktop/cua_desktop.py b/dynamiq/nodes/tools/cua_desktop/cua_desktop.py index 47ba7b39b..5e7605c97 100644 --- a/dynamiq/nodes/tools/cua_desktop/cua_desktop.py +++ b/dynamiq/nodes/tools/cua_desktop/cua_desktop.py @@ -320,6 +320,7 @@ class CuaDesktopTool(ConnectionNode): input_schema: ClassVar[type[CuaDesktopToolInputSchema]] = CuaDesktopToolInputSchema timeout: int = 3600 is_files_allowed: bool = True + _force_thread_executor: ClassVar[bool] = True _computer: Any | None = PrivateAttr(default=None) _loop = PrivateAttr(default=None) diff --git a/dynamiq/nodes/tools/e2b_desktop/e2b_desktop.py b/dynamiq/nodes/tools/e2b_desktop/e2b_desktop.py index 018c897e3..4a073a099 100644 --- a/dynamiq/nodes/tools/e2b_desktop/e2b_desktop.py +++ b/dynamiq/nodes/tools/e2b_desktop/e2b_desktop.py @@ -253,6 +253,7 @@ class E2BDesktopTool(ConnectionNode): input_schema: ClassVar[type[E2BDesktopToolInputSchema]] = E2BDesktopToolInputSchema timeout: int = 3600 is_files_allowed: bool = True + _force_thread_executor: ClassVar[bool] = True _desktop: Sandbox | None = PrivateAttr(default=None) _sandbox_id: str | None = PrivateAttr(default=None) From f8f0eb231e0634f6e5d3d758aef88d4a2fc2830e Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 14:49:09 +0300 Subject: [PATCH 19/26] fix: update params for async completion response --- dynamiq/nodes/llms/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dynamiq/nodes/llms/base.py b/dynamiq/nodes/llms/base.py index 
a6caf8dcf..928355c85 100644 --- a/dynamiq/nodes/llms/base.py +++ b/dynamiq/nodes/llms/base.py @@ -759,7 +759,7 @@ async def execute_async( response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs ) return self._handle_completion_response( - response=response, config=config, input_data=dict(input_data), **kwargs + response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs ) def _is_rate_limit_error(self, exception_type: type[Exception], error_str: str) -> bool: From 921d00822f6241ca48876bbde84dab0ea1794cd0 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 18:46:33 +0300 Subject: [PATCH 20/26] refactor: extract _build_completion_params to DRY execute/execute_async in BaseLLM --- dynamiq/nodes/llms/base.py | 132 ++++++++++++------------ tests/unit/nodes/llms/test_llm_async.py | 53 ++++++++++ 2 files changed, 121 insertions(+), 64 deletions(-) diff --git a/dynamiq/nodes/llms/base.py b/dynamiq/nodes/llms/base.py index 928355c85..e5251f916 100644 --- a/dynamiq/nodes/llms/base.py +++ b/dynamiq/nodes/llms/base.py @@ -593,43 +593,34 @@ def update_completion_params(self, params: dict[str, Any]) -> dict[str, Any]: params["stream_options"]["include_usage"] = True return params - def execute( + def _build_completion_params( self, - input_data: BaseLLMInputSchema, - config: RunnableConfig = None, + messages: list[dict], + config: RunnableConfig, prompt: Prompt | None = None, tools: list[Tool | dict] | None = None, response_format: dict[str, Any] | None = None, parallel_tool_calls: bool | None = None, - **kwargs, - ): - """Execute the LLM node. - - This method processes the input data, formats the prompt, and generates a response using - the configured LLM. + include_sync_client: bool = True, + ) -> dict[str, Any]: + """Build the common parameter dict for litellm completion/acompletion calls. Args: - input_data (BaseLLMInputSchema): The input data for the LLM. 
- config (RunnableConfig, optional): The configuration for the execution. Defaults to None. - prompt (Prompt, optional): The prompt to use for this execution. Defaults to None. - tools (list[Tool|dict]): List of tools that llm can call. - response_format (dict[str, Any]): JSON schema that specifies the structure of the llm's output - parallel_tool_calls (bool | None): Whether to allow the LLM to return multiple tool calls - in a single response. None means provider decides. - **kwargs: Additional keyword arguments. + messages: Formatted prompt messages. + config: Runnable configuration (used to detect streaming callbacks). + prompt: Prompt with optional tools/response_format overrides. + tools: Explicit tool list override. + response_format: Explicit response format override. + parallel_tool_calls: Whether to allow parallel tool calls. + include_sync_client: If True and self.client exists, include it in params. + Set to False for async calls that should not receive the sync client. Returns: - dict: A dictionary containing the generated content and tool calls. + Dict of params ready to pass to _completion or _acompletion. 
""" - config = ensure_config(config) - self.reset_run_state() - prompt = prompt or self.prompt or Prompt(messages=[], tools=None, response_format=None) - messages = self.get_messages(prompt, input_data) - self.run_on_node_execute_run(callbacks=config.callbacks, prompt_messages=messages, **kwargs) - extra = copy.deepcopy(self.__pydantic_extra__) params = self.connection.conn_params.copy() - if self.client and not isinstance(self.connection, HttpApiKey): + if include_sync_client and self.client and not isinstance(self.connection, HttpApiKey): params.update({"client": self.client}) if self.thinking_enabled: params.update({"thinking": {"type": "enabled", "budget_tokens": self.budget_tokens}}) @@ -641,8 +632,6 @@ def execute( tools=tools, response_format=response_format, ) - # Check if a streaming callback is available in the config and enable streaming only if it is - # This is to avoid unnecessary streaming to reduce CPU usage is_streaming_callback_available = any( isinstance(callback, BaseStreamingCallbackHandler) for callback in config.callbacks ) @@ -666,12 +655,56 @@ def execute( if parallel_tool_calls is not None: common_params["parallel_tool_calls"] = parallel_tool_calls - common_params = self.update_completion_params(common_params) + return self.update_completion_params(common_params) + + def execute( + self, + input_data: BaseLLMInputSchema, + config: RunnableConfig = None, + prompt: Prompt | None = None, + tools: list[Tool | dict] | None = None, + response_format: dict[str, Any] | None = None, + parallel_tool_calls: bool | None = None, + **kwargs, + ): + """Execute the LLM node. + + This method processes the input data, formats the prompt, and generates a response using + the configured LLM. + + Args: + input_data (BaseLLMInputSchema): The input data for the LLM. + config (RunnableConfig, optional): The configuration for the execution. Defaults to None. + prompt (Prompt, optional): The prompt to use for this execution. Defaults to None. 
+ tools (list[Tool|dict]): List of tools that llm can call. + response_format (dict[str, Any]): JSON schema that specifies the structure of the llm's output + parallel_tool_calls (bool | None): Whether to allow the LLM to return multiple tool calls + in a single response. None means provider decides. + **kwargs: Additional keyword arguments. + + Returns: + dict: A dictionary containing the generated content and tool calls. + """ + config = ensure_config(config) + self.reset_run_state() + prompt = prompt or self.prompt or Prompt(messages=[], tools=None, response_format=None) + messages = self.get_messages(prompt, input_data) + self.run_on_node_execute_run(callbacks=config.callbacks, prompt_messages=messages, **kwargs) + + common_params = self._build_completion_params( + messages=messages, + config=config, + prompt=prompt, + tools=tools, + response_format=response_format, + parallel_tool_calls=parallel_tool_calls, + include_sync_client=True, + ) response = self._completion(**common_params) handle_completion = ( self._handle_streaming_completion_response - if self.streaming.enabled and is_streaming_callback_available + if common_params.get("stream") else self._handle_completion_response ) @@ -713,48 +746,19 @@ async def execute_async( messages = self.get_messages(prompt, input_data) self.run_on_node_execute_run(callbacks=config.callbacks, prompt_messages=messages, **kwargs) - extra = copy.deepcopy(self.__pydantic_extra__) - params = self.connection.conn_params.copy() - # Do not pass the sync client to acompletion — litellm will create - # its own async HTTP client using the connection params (api_key, api_base). 
- if self.thinking_enabled: - params.update({"thinking": {"type": "enabled", "budget_tokens": self.budget_tokens}}) - if extra: - params.update(extra) - - response_format, tools = self._get_response_format_and_tools( + common_params = self._build_completion_params( + messages=messages, + config=config, prompt=prompt, tools=tools, response_format=response_format, + parallel_tool_calls=parallel_tool_calls, + include_sync_client=False, ) - is_streaming_callback_available = any( - isinstance(callback, BaseStreamingCallbackHandler) for callback in config.callbacks - ) - common_params: dict[str, Any] = { - "model": self.model, - "messages": messages, - "stream": self.streaming.enabled and is_streaming_callback_available, - "temperature": self.temperature, - "max_tokens": self.max_tokens, - "tools": tools, - "tool_choice": self.tool_choice, - "stop": self.stop if self.stop else None, - "top_p": self.top_p, - "seed": self.seed, - "presence_penalty": self.presence_penalty, - "frequency_penalty": self.frequency_penalty, - "response_format": response_format, - "drop_params": True, - **params, - } - if parallel_tool_calls is not None: - common_params["parallel_tool_calls"] = parallel_tool_calls - - common_params = self.update_completion_params(common_params) response = await self._acompletion(**common_params) - if self.streaming.enabled and is_streaming_callback_available: + if common_params.get("stream"): return await self._handle_streaming_completion_response_async( response=response, messages=messages, config=config, input_data=dict(input_data), **kwargs ) diff --git a/tests/unit/nodes/llms/test_llm_async.py b/tests/unit/nodes/llms/test_llm_async.py index c6f0c5401..fe35e9575 100644 --- a/tests/unit/nodes/llms/test_llm_async.py +++ b/tests/unit/nodes/llms/test_llm_async.py @@ -90,6 +90,59 @@ async def async_chunk_iter(): node._stream_chunk_builder.assert_called_once() +class TestBuildCompletionParams: + def test_build_completion_params_returns_expected_keys(self): + 
"""_build_completion_params should return dict with model, messages, stream, etc.""" + with patch("litellm.completion"), \ + patch("litellm.stream_chunk_builder"): + node = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + ) + input_data = MagicMock(messages=None, files=None) + config = RunnableConfig(callbacks=[]) + prompt = node.prompt or Prompt(messages=[], tools=None, response_format=None) + messages = node.get_messages(prompt, input_data) + + params = node._build_completion_params( + messages=messages, + config=config, + prompt=prompt, + include_sync_client=True, + ) + + assert params["model"] == "openai/gpt-4o-mini" + assert params["messages"] == messages + assert "stream" in params + assert "temperature" in params + assert "drop_params" in params + + def test_build_completion_params_excludes_client_when_not_sync(self): + """When include_sync_client=False, client should not be in params.""" + with patch("litellm.completion"), \ + patch("litellm.stream_chunk_builder"): + node = OpenAI( + model="gpt-4o-mini", + connection=OpenAIConnection(api_key="test-key"), + prompt=Prompt(messages=[{"role": "user", "content": "Hello"}]), + ) + node.client = MagicMock() # Simulate a sync client + input_data = MagicMock(messages=None, files=None) + config = RunnableConfig(callbacks=[]) + prompt = node.prompt or Prompt(messages=[], tools=None, response_format=None) + messages = node.get_messages(prompt, input_data) + + params = node._build_completion_params( + messages=messages, + config=config, + prompt=prompt, + include_sync_client=False, + ) + + assert "client" not in params + + class TestBaseLLMAsyncFallback: @pytest.mark.asyncio async def test_run_async_no_fallback_on_success(self): From e73d94697a3c99e2fca0ad14a7d66c0c5e7a6cab Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 18:54:10 +0300 Subject: [PATCH 21/26] refactor: restore streaming comment in 
_build_completion_params --- dynamiq/nodes/llms/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dynamiq/nodes/llms/base.py b/dynamiq/nodes/llms/base.py index e5251f916..893d64976 100644 --- a/dynamiq/nodes/llms/base.py +++ b/dynamiq/nodes/llms/base.py @@ -632,6 +632,8 @@ def _build_completion_params( tools=tools, response_format=response_format, ) + # Check if a streaming callback is available in the config and enable streaming only if it is. + # This is to avoid unnecessary streaming to reduce CPU usage. is_streaming_callback_available = any( isinstance(callback, BaseStreamingCallbackHandler) for callback in config.callbacks ) From 40656edd86c85f81bb66808191a3bdd8c5d858ab Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 19:06:00 +0300 Subject: [PATCH 22/26] feat: add cache_wf_entity_async for non-blocking async cache support --- dynamiq/cache/utils.py | 66 +++++++++++++++++++++ tests/unit/cache/test_cache_utils_async.py | 67 ++++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 tests/unit/cache/test_cache_utils_async.py diff --git a/dynamiq/cache/utils.py b/dynamiq/cache/utils.py index d07af6840..29277ae20 100644 --- a/dynamiq/cache/utils.py +++ b/dynamiq/cache/utils.py @@ -82,3 +82,69 @@ def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]: return wrapper return _cache + + +def cache_wf_entity_async( + entity_id: str, + cache_enabled: bool = False, + cache_manager_cls: type[WorkflowCacheManager] | None = None, + cache_config: CacheConfig | None = None, + func_kwargs_to_remove: tuple[str] = FUNC_KWARGS_TO_REMOVE, +) -> Callable: + """Async decorator to cache workflow entity outputs. + + Like cache_wf_entity but wraps an async function. Cache I/O (Redis get/set) + is offloaded to threads via asyncio.to_thread to avoid blocking the event loop. + + Args: + entity_id (str): Identifier for the entity. + cache_enabled (bool): Flag to enable caching. 
+ cache_manager_cls (type[WorkflowCacheManager] | None): Cache manager class. + Defaults to WorkflowCacheManager when None. + cache_config (CacheConfig | None): Cache configuration. + func_kwargs_to_remove (tuple[str]): List of params to remove from callable function kwargs. + + Returns: + Callable: Wrapped async function with caching. + """ + import asyncio + + def _cache(func: Callable) -> Callable: + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]: + cache_manager = None + from_cache = False + input_data = kwargs.pop("input_data", args[0] if args else {}) + input_data = dict(input_data) if isinstance(input_data, BaseModel) else input_data + + resolved_cls = cache_manager_cls if cache_manager_cls is not None else WorkflowCacheManager + cleaned_kwargs = {k: v for k, v in kwargs.items() if k not in func_kwargs_to_remove} + if cache_enabled and cache_config: + logger.debug(f"Entity_id {entity_id}: async cache used") + cache_manager = resolved_cls(config=cache_config) + output = await asyncio.to_thread( + cache_manager.get_entity_output, + entity_id=entity_id, + input_data=input_data, + **cleaned_kwargs, + ) + if output: + from_cache = True + return output, from_cache + + output = await func(*args, **kwargs) + + if cache_manager: + await asyncio.to_thread( + cache_manager.set_entity_output, + entity_id=entity_id, + input_data=input_data, + output_data=output, + **cleaned_kwargs, + ) + + return output, from_cache + + return wrapper + + return _cache diff --git a/tests/unit/cache/test_cache_utils_async.py b/tests/unit/cache/test_cache_utils_async.py new file mode 100644 index 000000000..c1c0e8bfd --- /dev/null +++ b/tests/unit/cache/test_cache_utils_async.py @@ -0,0 +1,67 @@ +import asyncio +from unittest.mock import MagicMock, AsyncMock, patch + +import pytest + +from dynamiq.cache.utils import cache_wf_entity_async + + +class TestCacheWfEntityAsync: + @pytest.mark.asyncio + async def test_cache_miss_calls_async_func(self): + """On 
cache miss, the async wrapper should await the wrapped coroutine.""" + async def my_async_func(*args, **kwargs): + return {"result": "computed"} + + cache = cache_wf_entity_async( + entity_id="node-1", + cache_enabled=False, + ) + wrapped = cache(my_async_func) + output, from_cache = await wrapped({"key": "val"}, config=None) + + assert output == {"result": "computed"} + assert from_cache is False + + @pytest.mark.asyncio + async def test_cache_hit_returns_cached(self): + """On cache hit, should return cached output without calling the function.""" + mock_func = AsyncMock(return_value={"result": "computed"}) + + mock_cache_manager = MagicMock() + mock_cache_manager.get_entity_output.return_value = {"result": "cached"} + + with patch("dynamiq.cache.utils.WorkflowCacheManager", return_value=mock_cache_manager): + cache = cache_wf_entity_async( + entity_id="node-1", + cache_enabled=True, + cache_config=MagicMock(), + ) + wrapped = cache(mock_func) + output, from_cache = await wrapped({"key": "val"}, config=None) + + assert output == {"result": "cached"} + assert from_cache is True + mock_func.assert_not_called() + + @pytest.mark.asyncio + async def test_cache_miss_stores_result(self): + """On cache miss with caching enabled, should store the result.""" + async def my_async_func(*args, **kwargs): + return {"result": "computed"} + + mock_cache_manager = MagicMock() + mock_cache_manager.get_entity_output.return_value = None + + with patch("dynamiq.cache.utils.WorkflowCacheManager", return_value=mock_cache_manager): + cache = cache_wf_entity_async( + entity_id="node-1", + cache_enabled=True, + cache_config=MagicMock(), + ) + wrapped = cache(my_async_func) + output, from_cache = await wrapped({"key": "val"}, config=None) + + assert output == {"result": "computed"} + assert from_cache is False + mock_cache_manager.set_entity_output.assert_called_once() From 96264ea8948aeff8657664981e4abcac01fa50ae Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 19:17:09 
+0300 Subject: [PATCH 23/26] fix: clean up cache_wf_entity_async per review --- dynamiq/cache/utils.py | 13 +++----- tests/unit/cache/test_cache_utils_async.py | 37 +++++++++++----------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/dynamiq/cache/utils.py b/dynamiq/cache/utils.py index 29277ae20..25331bb25 100644 --- a/dynamiq/cache/utils.py +++ b/dynamiq/cache/utils.py @@ -1,3 +1,4 @@ +import asyncio from functools import wraps from typing import Any, Callable @@ -87,7 +88,7 @@ def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]: def cache_wf_entity_async( entity_id: str, cache_enabled: bool = False, - cache_manager_cls: type[WorkflowCacheManager] | None = None, + cache_manager_cls: type[WorkflowCacheManager] = WorkflowCacheManager, cache_config: CacheConfig | None = None, func_kwargs_to_remove: tuple[str] = FUNC_KWARGS_TO_REMOVE, ) -> Callable: @@ -99,16 +100,13 @@ def cache_wf_entity_async( Args: entity_id (str): Identifier for the entity. cache_enabled (bool): Flag to enable caching. - cache_manager_cls (type[WorkflowCacheManager] | None): Cache manager class. - Defaults to WorkflowCacheManager when None. + cache_manager_cls (type[WorkflowCacheManager]): Cache manager class. cache_config (CacheConfig | None): Cache configuration. func_kwargs_to_remove (tuple[str]): List of params to remove from callable function kwargs. Returns: Callable: Wrapped async function with caching. 
""" - import asyncio - def _cache(func: Callable) -> Callable: @wraps(func) async def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]: @@ -117,18 +115,17 @@ async def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]: input_data = kwargs.pop("input_data", args[0] if args else {}) input_data = dict(input_data) if isinstance(input_data, BaseModel) else input_data - resolved_cls = cache_manager_cls if cache_manager_cls is not None else WorkflowCacheManager cleaned_kwargs = {k: v for k, v in kwargs.items() if k not in func_kwargs_to_remove} if cache_enabled and cache_config: logger.debug(f"Entity_id {entity_id}: async cache used") - cache_manager = resolved_cls(config=cache_config) + cache_manager = cache_manager_cls(config=cache_config) output = await asyncio.to_thread( cache_manager.get_entity_output, entity_id=entity_id, input_data=input_data, **cleaned_kwargs, ) - if output: + if output is not None: from_cache = True return output, from_cache diff --git a/tests/unit/cache/test_cache_utils_async.py b/tests/unit/cache/test_cache_utils_async.py index c1c0e8bfd..c916f377b 100644 --- a/tests/unit/cache/test_cache_utils_async.py +++ b/tests/unit/cache/test_cache_utils_async.py @@ -1,5 +1,4 @@ -import asyncio -from unittest.mock import MagicMock, AsyncMock, patch +from unittest.mock import MagicMock, AsyncMock import pytest @@ -30,15 +29,16 @@ async def test_cache_hit_returns_cached(self): mock_cache_manager = MagicMock() mock_cache_manager.get_entity_output.return_value = {"result": "cached"} + mock_cls = MagicMock(return_value=mock_cache_manager) - with patch("dynamiq.cache.utils.WorkflowCacheManager", return_value=mock_cache_manager): - cache = cache_wf_entity_async( - entity_id="node-1", - cache_enabled=True, - cache_config=MagicMock(), - ) - wrapped = cache(mock_func) - output, from_cache = await wrapped({"key": "val"}, config=None) + cache = cache_wf_entity_async( + entity_id="node-1", + cache_enabled=True, + cache_manager_cls=mock_cls, + 
cache_config=MagicMock(), + ) + wrapped = cache(mock_func) + output, from_cache = await wrapped({"key": "val"}, config=None) assert output == {"result": "cached"} assert from_cache is True @@ -52,15 +52,16 @@ async def my_async_func(*args, **kwargs): mock_cache_manager = MagicMock() mock_cache_manager.get_entity_output.return_value = None + mock_cls = MagicMock(return_value=mock_cache_manager) - with patch("dynamiq.cache.utils.WorkflowCacheManager", return_value=mock_cache_manager): - cache = cache_wf_entity_async( - entity_id="node-1", - cache_enabled=True, - cache_config=MagicMock(), - ) - wrapped = cache(my_async_func) - output, from_cache = await wrapped({"key": "val"}, config=None) + cache = cache_wf_entity_async( + entity_id="node-1", + cache_enabled=True, + cache_manager_cls=mock_cls, + cache_config=MagicMock(), + ) + wrapped = cache(my_async_func) + output, from_cache = await wrapped({"key": "val"}, config=None) assert output == {"result": "computed"} assert from_cache is False From 9c1d45fb7228defdd552f1b1ca2a5e452b41e34f Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 19:54:03 +0300 Subject: [PATCH 24/26] fix: use async cache path in _run_async_native --- dynamiq/nodes/node.py | 233 ++++++++++++++-------------- tests/unit/nodes/test_node_async.py | 38 +++++ 2 files changed, 156 insertions(+), 115 deletions(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index b755604bb..e20dcc01d 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -16,7 +16,7 @@ from jinja2 import Template from pydantic import BaseModel, ConfigDict, Field, PrivateAttr, computed_field, create_model, model_validator -from dynamiq.cache.utils import cache_wf_entity +from dynamiq.cache.utils import cache_wf_entity, cache_wf_entity_async from dynamiq.callbacks import BaseCallbackHandler, NodeCallbackHandler, TracingCallbackHandler from dynamiq.connections import BaseConnection from dynamiq.connections.managers import ConnectionManager, 
ConnectionManagerException @@ -858,6 +858,102 @@ def get_approved_data_or_origin( return input_data + def _prepare_execution( + self, + input_data: dict, + config: RunnableConfig, + depends_result: dict | None, + **kwargs, + ) -> tuple[RunnableConfig, dict, dict]: + """Shared pre-execution setup for run_sync and _run_async_native. + + Returns: + Tuple of (config, merged_kwargs, depends_result). + """ + config = ensure_config(config) + run_id = uuid4() + merged_kwargs = merge(kwargs, {"run_id": run_id, "parent_run_id": kwargs.get("parent_run_id", None)}) + if depends_result is None: + depends_result = {} + return config, merged_kwargs, depends_result + + def _handle_skip( + self, + e: "NodeException", + input_data: dict, + depends_result: dict, + config: RunnableConfig, + **merged_kwargs, + ) -> RunnableResult: + """Handle node skip due to failed dependency or approval rejection.""" + transformed_input = input_data | { + k: result.to_tracing_depend_dict() for k, result in depends_result.items() + } + skip_data = {"failed_dependency": e.failed_depend.to_dict(for_tracing=True)} + self.run_on_node_skip( + callbacks=config.callbacks, + skip_data=skip_data, + input_data=transformed_input, + human_feedback=getattr(e, "human_feedback", None), + **merged_kwargs, + ) + logger.info(f"Node {self.name} - {self.id}: execution skipped.") + return RunnableResult( + status=RunnableStatus.SKIP, + input=transformed_input, + output=None, + error=RunnableResultError.from_exception(e, recoverable=e.recoverable), + ) + + def _handle_success( + self, + output: Any, + from_cache: bool, + transformed_input: dict, + config: RunnableConfig, + time_start: datetime, + log_prefix: str, + merged_kwargs: dict, + **kwargs, + ) -> RunnableResult: + """Handle successful execution — transform output, fire callbacks, return result.""" + callback_kwargs = {**merged_kwargs, "is_output_from_cache": from_cache} + # transform_output uses original kwargs; run_on_node_end needs merged_kwargs with run_id + 
transformed_output = self.transform_output(output, config=config, **kwargs) + self.run_on_node_end(config.callbacks, transformed_output, **callback_kwargs) + logger.info( + f"Node {self.name} - {self.id}: {log_prefix}execution succeeded in " + f"{format_duration(time_start, datetime.now())}." + ) + return RunnableResult( + status=RunnableStatus.SUCCESS, input=dict(transformed_input), output=transformed_output + ) + + def _handle_failure( + self, + e: Exception, + transformed_input: dict, + config: RunnableConfig, + time_start: datetime, + log_prefix: str, + **merged_kwargs, + ) -> RunnableResult: + """Handle execution failure — fire error callbacks, return failure result.""" + from dynamiq.nodes.agents.exceptions import RecoverableAgentException + + self.run_on_node_error(callbacks=config.callbacks, error=e, input_data=transformed_input, **merged_kwargs) + logger.error( + f"Node {self.name} - {self.id}: {log_prefix}execution failed in " + f"{format_duration(time_start, datetime.now())}. {e}" + ) + recoverable = isinstance(e, RecoverableAgentException) + return RunnableResult( + status=RunnableStatus.FAILURE, + input=transformed_input, + output=None, + error=RunnableResultError.from_exception(e, recoverable=recoverable), + ) + def run_sync( self, input_data: dict, @@ -877,42 +973,19 @@ def run_sync( Returns: RunnableResult: Result of the node execution. 
""" - from dynamiq.nodes.agents.exceptions import RecoverableAgentException - logger.info(f"Node {self.name} - {self.id}: execution started.") transformed_input = input_data time_start = datetime.now() - - config = ensure_config(config) - - run_id = uuid4() - merged_kwargs = merge(kwargs, {"run_id": run_id, "parent_run_id": kwargs.get("parent_run_id", None)}) - if depends_result is None: - depends_result = {} + config, merged_kwargs, depends_result = self._prepare_execution( + input_data, config, depends_result, **kwargs + ) try: try: self.validate_depends(depends_result) input_data = self.get_approved_data_or_origin(input_data, config=config, **merged_kwargs) except NodeException as e: - transformed_input = input_data | { - k: result.to_tracing_depend_dict() for k, result in depends_result.items() - } - skip_data = {"failed_dependency": e.failed_depend.to_dict(for_tracing=True)} - self.run_on_node_skip( - callbacks=config.callbacks, - skip_data=skip_data, - input_data=transformed_input, - human_feedback=getattr(e, "human_feedback", None), - **merged_kwargs, - ) - logger.info(f"Node {self.name} - {self.id}: execution skipped.") - return RunnableResult( - status=RunnableStatus.SKIP, - input=transformed_input, - output=None, - error=RunnableResultError.from_exception(e, recoverable=e.recoverable), - ) + return self._handle_skip(e, input_data, depends_result, config, **merged_kwargs) transformed_input = self.validate_input_schema( self.transform_input(input_data=input_data, depends_result=depends_result, config=config, **kwargs), @@ -927,33 +1000,12 @@ def run_sync( output, from_cache = cache(self.execute_with_retry)(transformed_input, config, **merged_kwargs) - merged_kwargs["is_output_from_cache"] = from_cache - transformed_output = self.transform_output(output, config=config, **kwargs) - - self.run_on_node_end(config.callbacks, transformed_output, **merged_kwargs) - - logger.info( - f"Node {self.name} - {self.id}: execution succeeded in " - 
f"{format_duration(time_start, datetime.now())}." - ) - return RunnableResult( - status=RunnableStatus.SUCCESS, input=dict(transformed_input), output=transformed_output + return self._handle_success( + output, from_cache, transformed_input, config, time_start, "", + merged_kwargs=merged_kwargs, **kwargs ) except Exception as e: - self.run_on_node_error(callbacks=config.callbacks, error=e, input_data=transformed_input, **merged_kwargs) - logger.error( - f"Node {self.name} - {self.id}: execution failed in " - f"{format_duration(time_start, datetime.now())}. {e}" - ) - - recoverable = isinstance(e, RecoverableAgentException) - result = RunnableResult( - status=RunnableStatus.FAILURE, - input=transformed_input, - output=None, - error=RunnableResultError.from_exception(e, recoverable=recoverable), - ) - return result + return self._handle_failure(e, transformed_input, config, time_start, "", **merged_kwargs) async def _run_async_native( self, @@ -966,18 +1018,12 @@ async def _run_async_native( Run the node asynchronously using native async execute. Mirrors run_sync() lifecycle but calls execute_async_with_retry(). 
""" - from dynamiq.nodes.agents.exceptions import RecoverableAgentException - logger.info(f"Node {self.name} - {self.id}: async execution started.") transformed_input = input_data time_start = datetime.now() - - config = ensure_config(config) - - run_id = uuid4() - merged_kwargs = merge(kwargs, {"run_id": run_id, "parent_run_id": kwargs.get("parent_run_id", None)}) - if depends_result is None: - depends_result = {} + config, merged_kwargs, depends_result = self._prepare_execution( + input_data, config, depends_result, **kwargs + ) try: try: @@ -987,72 +1033,29 @@ async def _run_async_native( self.get_approved_data_or_origin, input_data, config=config, **merged_kwargs ) except NodeException as e: - transformed_input = input_data | { - k: result.to_tracing_depend_dict() for k, result in depends_result.items() - } - skip_data = {"failed_dependency": e.failed_depend.to_dict(for_tracing=True)} - self.run_on_node_skip( - callbacks=config.callbacks, - skip_data=skip_data, - input_data=transformed_input, - human_feedback=getattr(e, "human_feedback", None), - **merged_kwargs, - ) - logger.info(f"Node {self.name} - {self.id}: execution skipped.") - return RunnableResult( - status=RunnableStatus.SKIP, - input=transformed_input, - output=None, - error=RunnableResultError.from_exception(e, recoverable=e.recoverable), - ) + return self._handle_skip(e, input_data, depends_result, config, **merged_kwargs) transformed_input = self.validate_input_schema( self.transform_input(input_data=input_data, depends_result=depends_result, config=config, **kwargs), **kwargs, ) self.run_on_node_start(config.callbacks, dict(transformed_input), **merged_kwargs) - cache = cache_wf_entity( + + cache = cache_wf_entity_async( entity_id=self.id, cache_enabled=self.caching.enabled, cache_config=config.cache, ) - - # When caching is enabled, offload sync execute to a thread to avoid blocking the event loop - if self.caching.enabled: - output, from_cache = await asyncio.to_thread( - 
cache(self.execute_with_retry), transformed_input, config, **merged_kwargs - ) - else: - output = await self.execute_async_with_retry(transformed_input, config, **merged_kwargs) - from_cache = False - - merged_kwargs["is_output_from_cache"] = from_cache - transformed_output = self.transform_output(output, config=config, **kwargs) - - self.run_on_node_end(config.callbacks, transformed_output, **merged_kwargs) - - logger.info( - f"Node {self.name} - {self.id}: async execution succeeded in " - f"{format_duration(time_start, datetime.now())}." - ) - return RunnableResult( - status=RunnableStatus.SUCCESS, input=dict(transformed_input), output=transformed_output - ) - except Exception as e: - self.run_on_node_error(callbacks=config.callbacks, error=e, input_data=transformed_input, **merged_kwargs) - logger.error( - f"Node {self.name} - {self.id}: async execution failed in " - f"{format_duration(time_start, datetime.now())}. {e}" + output, from_cache = await cache(self.execute_async_with_retry)( + transformed_input, config, **merged_kwargs ) - recoverable = isinstance(e, RecoverableAgentException) - result = RunnableResult( - status=RunnableStatus.FAILURE, - input=transformed_input, - output=None, - error=RunnableResultError.from_exception(e, recoverable=recoverable), + return self._handle_success( + output, from_cache, transformed_input, config, time_start, "async ", + merged_kwargs=merged_kwargs, **kwargs ) - return result + except Exception as e: + return self._handle_failure(e, transformed_input, config, time_start, "async ", **merged_kwargs) async def run_async( self, diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index f49dcc771..91c4ffed3 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -138,3 +138,41 @@ async def test_sync_node_without_executor_falls_back_to_default(self): ) assert result.status == RunnableStatus.SUCCESS assert result.output == {"result": "sync"} + + +class 
CachingAsyncNode(Node): + """Test node that tracks whether sync or async execute was called.""" + group: NodeGroup = NodeGroup.UTILS + name: str = "CachingAsync" + sync_called: bool = False + async_called: bool = False + + def execute(self, input_data, config=None, **kwargs): + self.sync_called = True + return {"result": "sync"} + + async def execute_async(self, input_data, config=None, **kwargs): + self.async_called = True + return {"result": "async"} + + +class TestAsyncCachingPath: + @pytest.mark.asyncio + async def test_cached_async_path_uses_execute_async(self): + """When caching is enabled in _run_async_native, it should still use + execute_async_with_retry (async path), not execute_with_retry (sync path).""" + from dynamiq.nodes.node import CachingConfig + + node = CachingAsyncNode( + caching=CachingConfig(enabled=True), + ) + # Run without actual cache config so cache decorator is a passthrough + result = await node.run_async( + input_data={"input": "test"}, + config=RunnableConfig(callbacks=[]), + ) + assert result.status == RunnableStatus.SUCCESS + assert node.async_called is True + assert node.sync_called is False + + From 67558e8e17fafc1cc3249502a0800aa794f3b623 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 20:00:59 +0300 Subject: [PATCH 25/26] fix: skip redundant context copy for ContextAwareTPE --- dynamiq/nodes/node.py | 9 ++++-- tests/unit/nodes/test_node_async.py | 43 +++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/dynamiq/nodes/node.py b/dynamiq/nodes/node.py index e20dcc01d..5e298ec0e 100644 --- a/dynamiq/nodes/node.py +++ b/dynamiq/nodes/node.py @@ -1088,12 +1088,17 @@ async def run_async( ) else: loop = asyncio.get_running_loop() - ctx = contextvars.copy_context() fn = functools.partial( self.run_sync, input_data=input_data, config=config, depends_result=depends_result, **kwargs ) - return await loop.run_in_executor(executor, ctx.run, fn) + # ContextAwareThreadPoolExecutor 
already propagates contextvars + # in its submit() method — no need to copy context again. + if isinstance(executor, ContextAwareThreadPoolExecutor): + return await loop.run_in_executor(executor, fn) + else: + ctx = contextvars.copy_context() + return await loop.run_in_executor(executor, ctx.run, fn) def ensure_client(self) -> None: """ diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index 91c4ffed3..548f9990d 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -1,8 +1,10 @@ import asyncio from concurrent.futures import ThreadPoolExecutor +from unittest.mock import patch, MagicMock import pytest +from dynamiq.executors.context import ContextAwareThreadPoolExecutor from dynamiq.nodes.node import Node, ErrorHandling from dynamiq.nodes.types import NodeGroup from dynamiq.runnables import RunnableConfig, RunnableStatus @@ -156,6 +158,47 @@ async def execute_async(self, input_data, config=None, **kwargs): return {"result": "async"} +class TestRunAsyncContextPropagation: + @pytest.mark.asyncio + async def test_context_aware_executor_does_not_double_copy(self): + """When executor is ContextAwareThreadPoolExecutor, run_async should not + wrap with ctx.run since the executor handles context propagation.""" + node = SyncOnlyNode() + executor = ContextAwareThreadPoolExecutor(max_workers=2) + try: + with patch("dynamiq.nodes.node.contextvars") as mock_contextvars: + result = await node.run_async( + input_data={"input": "test"}, + config=RunnableConfig(callbacks=[]), + executor=executor, + ) + mock_contextvars.copy_context.assert_not_called() + assert result.status == RunnableStatus.SUCCESS + finally: + executor.shutdown(wait=False) + + @pytest.mark.asyncio + async def test_regular_executor_still_copies_context(self): + """When executor is a regular ThreadPoolExecutor, run_async should + still copy context explicitly.""" + node = SyncOnlyNode() + executor = ThreadPoolExecutor(max_workers=2) + try: 
+ with patch("dynamiq.nodes.node.contextvars") as mock_contextvars: + mock_ctx = MagicMock() + mock_ctx.run = lambda fn, *a, **kw: fn(*a, **kw) + mock_contextvars.copy_context.return_value = mock_ctx + result = await node.run_async( + input_data={"input": "test"}, + config=RunnableConfig(callbacks=[]), + executor=executor, + ) + mock_contextvars.copy_context.assert_called_once() + assert result.status == RunnableStatus.SUCCESS + finally: + executor.shutdown(wait=False) + + class TestAsyncCachingPath: @pytest.mark.asyncio async def test_cached_async_path_uses_execute_async(self): From 356b16e809302521464e8dfd33e8330f2be667f4 Mon Sep 17 00:00:00 2001 From: Taras Yaroshko Date: Tue, 7 Apr 2026 20:03:27 +0300 Subject: [PATCH 26/26] fix: trailing newline in test_node_async.py --- tests/unit/nodes/test_node_async.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/unit/nodes/test_node_async.py b/tests/unit/nodes/test_node_async.py index 548f9990d..3b5a288d3 100644 --- a/tests/unit/nodes/test_node_async.py +++ b/tests/unit/nodes/test_node_async.py @@ -217,5 +217,3 @@ async def test_cached_async_path_uses_execute_async(self): assert result.status == RunnableStatus.SUCCESS assert node.async_called is True assert node.sync_called is False - -