Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9c0485a
feat: add execute_async() and has_native_async to Node base class
tyaroshko Apr 5, 2026
e46da79
feat: add execute_async_with_retry() with asyncio timeout and non-blo…
tyaroshko Apr 5, 2026
518fd07
feat: rewrite Node.run_async() with native async routing and per-flow…
tyaroshko Apr 5, 2026
ec209c7
feat: per-flow executor isolation in Flow.run_async(), replace 3ms sl…
tyaroshko Apr 5, 2026
b0d60fe
feat: add BaseLLM.execute_async() using litellm.acompletion() with as…
tyaroshko Apr 5, 2026
3260d5b
feat: add BaseLLM.run_async() with async fallback support
tyaroshko Apr 5, 2026
9451e01
fix: skip passing sync client to litellm.acompletion in execute_async
tyaroshko Apr 5, 2026
078b5ff
fix: update test mocks for BaseLLM.run_async override
tyaroshko Apr 5, 2026
cd06232
chore: remove redundant tests
tyaroshko Apr 7, 2026
8cf0c2e
fix: update linting
tyaroshko Apr 7, 2026
ef160d6
fix: remove blocking sync execution when caching is enabled
tyaroshko Apr 7, 2026
07c39e1
fix: add lazy import for RecoverableAgentException
tyaroshko Apr 7, 2026
ec02882
fix: update asyncio.sleep() to remove CPU busy-wait
tyaroshko Apr 7, 2026
3cbf65b
fix: update tests for async execution
tyaroshko Apr 7, 2026
7021675
fix: update finally block for ThreadPoolExecutor
tyaroshko Apr 7, 2026
fdedc07
fix: update MCP tool tests
tyaroshko Apr 7, 2026
06679f9
fix: update Node for async retries and queue reads
tyaroshko Apr 7, 2026
b5428ad
fix: fix comments from Bugbot
tyaroshko Apr 7, 2026
f8f0eb2
fix: update params for async completion response
tyaroshko Apr 7, 2026
921d008
refactor: extract _build_completion_params to DRY execute/execute_asy…
tyaroshko Apr 7, 2026
e73d946
refactor: restore streaming comment in _build_completion_params
tyaroshko Apr 7, 2026
40656ed
feat: add cache_wf_entity_async for non-blocking async cache support
tyaroshko Apr 7, 2026
96264ea
fix: clean up cache_wf_entity_async per review
tyaroshko Apr 7, 2026
9c1d45f
fix: use async cache path in _run_async_native
tyaroshko Apr 7, 2026
67558e8
fix: skip redundant context copy for ContextAwareTPE
tyaroshko Apr 7, 2026
356b16e
fix: trailing newline in test_node_async.py
tyaroshko Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions dynamiq/cache/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from functools import wraps
from typing import Any, Callable

Expand Down Expand Up @@ -82,3 +83,65 @@ def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]:
return wrapper

return _cache


def cache_wf_entity_async(
entity_id: str,
cache_enabled: bool = False,
cache_manager_cls: type[WorkflowCacheManager] = WorkflowCacheManager,
cache_config: CacheConfig | None = None,
func_kwargs_to_remove: tuple[str] = FUNC_KWARGS_TO_REMOVE,
) -> Callable:
"""Async decorator to cache workflow entity outputs.

Like cache_wf_entity but wraps an async function. Cache I/O (Redis get/set)
is offloaded to threads via asyncio.to_thread to avoid blocking the event loop.

Args:
entity_id (str): Identifier for the entity.
cache_enabled (bool): Flag to enable caching.
cache_manager_cls (type[WorkflowCacheManager]): Cache manager class.
cache_config (CacheConfig | None): Cache configuration.
func_kwargs_to_remove (tuple[str]): List of params to remove from callable function kwargs.

Returns:
Callable: Wrapped async function with caching.
"""
def _cache(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> tuple[Any, bool]:
cache_manager = None
from_cache = False
input_data = kwargs.pop("input_data", args[0] if args else {})
input_data = dict(input_data) if isinstance(input_data, BaseModel) else input_data

cleaned_kwargs = {k: v for k, v in kwargs.items() if k not in func_kwargs_to_remove}
if cache_enabled and cache_config:
logger.debug(f"Entity_id {entity_id}: async cache used")
cache_manager = cache_manager_cls(config=cache_config)
output = await asyncio.to_thread(
cache_manager.get_entity_output,
entity_id=entity_id,
input_data=input_data,
**cleaned_kwargs,
)
if output is not None:
from_cache = True
return output, from_cache

output = await func(*args, **kwargs)

if cache_manager:
await asyncio.to_thread(
cache_manager.set_entity_output,
entity_id=entity_id,
input_data=input_data,
output_data=output,
**cleaned_kwargs,
)

return output, from_cache

return wrapper

return _cache
16 changes: 9 additions & 7 deletions dynamiq/flows/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from dynamiq.connections.managers import ConnectionManager
from dynamiq.executors.base import BaseExecutor
from dynamiq.executors.context import ContextAwareThreadPoolExecutor
from dynamiq.executors.pool import ThreadExecutor
from dynamiq.flows.base import BaseFlow
from dynamiq.nodes.node import Node, NodeReadyToRun
Expand Down Expand Up @@ -360,13 +361,8 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar
"""
Run the flow asynchronously with the given input data and configuration.

Args:
input_data (Any): Input data for the flow.
config (RunnableConfig, optional): Configuration for the run. Defaults to None.
**kwargs: Additional keyword arguments.

Returns:
RunnableResult: Result of the flow execution.
Creates a dedicated ContextAwareThreadPoolExecutor for this flow run,
isolating it from other concurrent flow executions.
"""
self.reset_run_state()
run_id = uuid4()
Expand All @@ -375,6 +371,9 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar
"parent_run_id": kwargs.get("parent_run_id", run_id),
}

max_workers = (config.max_node_workers if config else None) or self.max_node_workers
executor = ContextAwareThreadPoolExecutor(max_workers=max_workers)

logger.info(f"Flow {self.id}: execution started.")
self.run_on_flow_start(input_data, config, **merged_kwargs)
time_start = datetime.now()
Expand All @@ -391,6 +390,7 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar
input_data=node.input_data,
depends_result=node.depends_result,
config=config,
executor=executor,
**(merged_kwargs | {"parent_run_id": run_id}),
)
for node in nodes_to_run
Expand Down Expand Up @@ -435,6 +435,8 @@ async def run_async(self, input_data: Any, config: RunnableConfig = None, **kwar
error=RunnableResultError.from_exception(e, failed_nodes=failed_nodes),
)
finally:
# wait=False is safe: all node tasks have been awaited via asyncio.gather()
executor.shutdown(wait=False)
try:
await self._cleanup_dry_run_async(config)
except Exception as e:
Expand Down
Loading
Loading