Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
320 changes: 264 additions & 56 deletions cosmoflow/src/flow/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,49 +216,93 @@ macro_rules! flow {
} $(,)?
) => {
{
#[cfg(not(feature = "async"))]
{
let mut builder = $crate::flow::FlowBuilder::<$storage>::new()
.start_node($start);
let mut builder = $crate::FlowBuilder::<$storage>::new()
.start_node($start);

$(
builder = builder.node($id, $backend);
)*

$(
builder = builder.route($from, $action, $to);
)*
$(
builder = builder.node($id, $backend);
)*

$(
builder = builder.terminal_route($term_from, $term_action);
)*
$(
builder = builder.route($from, $action, $to);
)*

builder.build()
}
$(
builder = builder.terminal_route($term_from, $term_action);
)*

#[cfg(feature = "async")]
{
let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new()
.start_node($start);
builder.build()
}
};

$(
builder = builder.node($id, $backend);
)*
// Structured syntax with only routes (no terminal routes - for backward compatibility)
(
storage: $storage:ty,
start: $start:expr,
nodes: {
$(
$id:literal : $backend:expr
),* $(,)?
},
routes: {
$(
$from:literal - $action:literal => $to:literal
),* $(,)?
} $(,)?
) => {
{
let mut builder = $crate::FlowBuilder::<$storage>::new()
.start_node($start);

$(
builder = builder.route($from, $action, $to);
)*
$(
builder = builder.node($id, $backend);
)*

$(
builder = builder.terminal_route($term_from, $term_action);
)*
$(
builder = builder.route($from, $action, $to);
)*

builder.build()
}
builder.build()
}
};
}

// Structured syntax with only routes (no terminal routes - for backward compatibility)
/// Declarative async workflow construction with explicit terminal routes and type inference.
///
/// This macro provides a clean, declarative syntax for building async CosmoFlow workflows.
/// It automatically handles node registration, routing setup, and explicit terminal route
/// configuration with the new type-safe termination system.
///
/// **Note**: This macro creates async flows. For sync flows, use `flow!` macro.
/// This macro is only available when the "async" feature is enabled.
///
/// # Syntax
///
/// Same as `flow!` macro but creates async flows instead of sync flows.
///
/// # Example
///
/// ```rust,ignore
/// #[cfg(feature = "async")]
/// let workflow = async_flow! {
/// storage: MemoryStorage,
/// start: "start",
/// nodes: {
/// "start": AsyncStartNode,
/// "end": AsyncEndNode,
/// },
/// routes: {
/// "start" - "next" => "end",
/// },
/// terminals: {
/// "end" - "complete",
/// }
/// };
/// ```
#[cfg(feature = "async")]
#[macro_export]
macro_rules! async_flow {
// Structured syntax with explicit terminal routes
(
storage: $storage:ty,
start: $start:expr,
Expand All @@ -271,40 +315,61 @@ macro_rules! flow {
$(
$from:literal - $action:literal => $to:literal
),* $(,)?
},
terminals: {
$(
$term_from:literal - $term_action:literal
),* $(,)?
} $(,)?
) => {
{
#[cfg(not(feature = "async"))]
{
let mut builder = $crate::flow::FlowBuilder::<$storage>::new()
.start_node($start);
let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new()
.start_node($start);

$(
builder = builder.node($id, $backend);
)*
$(
builder = builder.node($id, $backend);
)*

$(
builder = builder.route($from, $action, $to);
)*
$(
builder = builder.route($from, $action, $to);
)*

builder.build()
}
$(
builder = builder.terminal_route($term_from, $term_action);
)*

#[cfg(feature = "async")]
{
let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new()
.start_node($start);
builder.build()
}
};

// Structured syntax with only routes (no terminal routes - for backward compatibility)
(
storage: $storage:ty,
start: $start:expr,
nodes: {
$(
$id:literal : $backend:expr
),* $(,)?
},
routes: {
$(
$from:literal - $action:literal => $to:literal
),* $(,)?
} $(,)?
) => {
{
let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new()
.start_node($start);

$(
builder = builder.node($id, $backend);
)*
$(
builder = builder.node($id, $backend);
)*

$(
builder = builder.route($from, $action, $to);
)*
$(
builder = builder.route($from, $action, $to);
)*

builder.build()
}
builder.build()
}
};
}
Expand Down Expand Up @@ -886,5 +951,148 @@ mod tests {
vec!["check", "success_handler"]
);
}

#[tokio::test]
async fn test_async_flow_macro_with_terminal_routes_execution() {
let mut workflow = async_flow! {
storage: MemoryStorage,
start: "entry",
nodes: {
"entry": TestCustomNode::new("default"),
"process": TestCustomNode::new("continue"),
"end": TestEndNode,
},
routes: {
"entry" - "default" => "process",
"process" - "continue" => "end",
},
terminals: {
"end" - "complete",
}
};

let mut store = MemoryStorage::new();
let result = workflow.execute(&mut store).await;

assert!(result.is_ok(), "Async flow macro execution should succeed");
let execution_result = result.unwrap();
assert!(execution_result.success);
assert_eq!(execution_result.steps_executed, 3);
assert_eq!(
execution_result.execution_path,
vec!["entry", "process", "end"]
);
}

#[tokio::test]
async fn test_async_flow_macro_legacy_syntax() {
let mut workflow = async_flow! {
storage: MemoryStorage,
start: "start",
nodes: {
"start": TestCustomNode::new("next"),
"end": TestEndNode,
},
routes: {
"start" - "next" => "end",
},
terminals: {
"end" - "complete",
}
};

let mut store = MemoryStorage::new();
let result = workflow.execute(&mut store).await;

assert!(result.is_ok(), "Async flow macro legacy syntax should work");
let execution_result = result.unwrap();
assert!(execution_result.success);
assert_eq!(execution_result.steps_executed, 2);
assert_eq!(execution_result.execution_path, vec!["start", "end"]);
}

#[tokio::test]
async fn test_async_flow_macro_multiple_terminal_routes() {
let mut success_workflow = async_flow! {
storage: MemoryStorage,
start: "check",
nodes: {
"check": TestCustomNode::new("success"),
"success_handler": TestEndNode,
"error_handler": TestEndNode,
},
routes: {
"check" - "success" => "success_handler",
"check" - "error" => "error_handler",
},
terminals: {
"success_handler" - "complete",
"error_handler" - "failed",
}
};

let mut store = MemoryStorage::new();
let result = success_workflow.execute(&mut store).await;

assert!(
result.is_ok(),
"Async flow macro with multiple terminal routes should succeed"
);
let execution_result = result.unwrap();
assert!(execution_result.success);
assert_eq!(
execution_result.execution_path,
vec!["check", "success_handler"]
);
}

#[tokio::test]
async fn test_async_flow_macro_vs_regular_flow_consistency() {
// Test that async_flow! produces the same result as manual FlowBuilder
let mut async_flow_workflow = async_flow! {
storage: MemoryStorage,
start: "start",
nodes: {
"start": TestCustomNode::new("proceed"),
"middle": TestCustomNode::new("finish"),
"end": TestEndNode,
},
routes: {
"start" - "proceed" => "middle",
"middle" - "finish" => "end",
},
terminals: {
"end" - "complete",
}
};

let mut manual_workflow = crate::flow::r#async::FlowBuilder::<MemoryStorage>::new()
.start_node("start")
.node("start", TestCustomNode::new("proceed"))
.node("middle", TestCustomNode::new("finish"))
.node("end", TestEndNode)
.route("start", "proceed", "middle")
.route("middle", "finish", "end")
.terminal_route("end", "complete")
.build();

let mut store1 = MemoryStorage::new();
let mut store2 = MemoryStorage::new();

let result1 = async_flow_workflow.execute(&mut store1).await;
let result2 = manual_workflow.execute(&mut store2).await;

assert!(
result1.is_ok() && result2.is_ok(),
"Both workflows should succeed"
);

let exec1 = result1.unwrap();
let exec2 = result2.unwrap();

assert_eq!(exec1.success, exec2.success);
assert_eq!(exec1.steps_executed, exec2.steps_executed);
assert_eq!(exec1.execution_path, exec2.execution_path);
}
}
}