diff --git a/cosmoflow/src/flow/macros.rs b/cosmoflow/src/flow/macros.rs index 22dbdc3..13233f9 100644 --- a/cosmoflow/src/flow/macros.rs +++ b/cosmoflow/src/flow/macros.rs @@ -216,49 +216,93 @@ macro_rules! flow { } $(,)? ) => { { - #[cfg(not(feature = "async"))] - { - let mut builder = $crate::flow::FlowBuilder::<$storage>::new() - .start_node($start); + let mut builder = $crate::FlowBuilder::<$storage>::new() + .start_node($start); - $( - builder = builder.node($id, $backend); - )* - - $( - builder = builder.route($from, $action, $to); - )* + $( + builder = builder.node($id, $backend); + )* - $( - builder = builder.terminal_route($term_from, $term_action); - )* + $( + builder = builder.route($from, $action, $to); + )* - builder.build() - } + $( + builder = builder.terminal_route($term_from, $term_action); + )* - #[cfg(feature = "async")] - { - let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new() - .start_node($start); + builder.build() + } + }; - $( - builder = builder.node($id, $backend); - )* + // Structured syntax with only routes (no terminal routes - for backward compatibility) + ( + storage: $storage:ty, + start: $start:expr, + nodes: { + $( + $id:literal : $backend:expr + ),* $(,)? + }, + routes: { + $( + $from:literal - $action:literal => $to:literal + ),* $(,)? + } $(,)? + ) => { + { + let mut builder = $crate::FlowBuilder::<$storage>::new() + .start_node($start); - $( - builder = builder.route($from, $action, $to); - )* + $( + builder = builder.node($id, $backend); + )* - $( - builder = builder.terminal_route($term_from, $term_action); - )* + $( + builder = builder.route($from, $action, $to); + )* - builder.build() - } + builder.build() } }; +} - // Structured syntax with only routes (no terminal routes - for backward compatibility) +/// Declarative async workflow construction with explicit terminal routes and type inference. +/// +/// This macro provides a clean, declarative syntax for building async CosmoFlow workflows. +/// It automatically handles node registration, routing setup, and explicit terminal route +/// configuration with the new type-safe termination system. +/// +/// **Note**: This macro creates async flows. For sync flows, use `flow!` macro. +/// This macro is only available when the "async" feature is enabled. +/// +/// # Syntax +/// +/// Same as `flow!` macro but creates async flows instead of sync flows. +/// +/// # Example +/// +/// ```rust,ignore +/// #[cfg(feature = "async")] +/// let workflow = async_flow! { +/// storage: MemoryStorage, +/// start: "start", +/// nodes: { +/// "start": AsyncStartNode, +/// "end": AsyncEndNode, +/// }, +/// routes: { +/// "start" - "next" => "end", +/// }, +/// terminals: { +/// "end" - "complete", +/// } +/// }; +/// ``` +#[cfg(feature = "async")] +#[macro_export] +macro_rules! async_flow { + // Structured syntax with explicit terminal routes ( storage: $storage:ty, start: $start:expr, @@ -271,40 +315,61 @@ macro_rules! flow { $( $from:literal - $action:literal => $to:literal ),* $(,)? + }, + terminals: { + $( + $term_from:literal - $term_action:literal + ),* $(,)? } $(,)? ) => { { - #[cfg(not(feature = "async"))] - { - let mut builder = $crate::flow::FlowBuilder::<$storage>::new() - .start_node($start); + let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new() + .start_node($start); - $( - builder = builder.node($id, $backend); - )* + $( + builder = builder.node($id, $backend); + )* - $( - builder = builder.route($from, $action, $to); - )* + $( + builder = builder.route($from, $action, $to); + )* - builder.build() - } + $( + builder = builder.terminal_route($term_from, $term_action); + )* - #[cfg(feature = "async")] - { - let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new() - .start_node($start); + builder.build() + } + }; + + // Structured syntax with only routes (no terminal routes - for backward compatibility) + ( + storage: $storage:ty, + start: $start:expr, + nodes: { + $( + $id:literal : $backend:expr + ),* $(,)? + }, + routes: { + $( + $from:literal - $action:literal => $to:literal + ),* $(,)? + } $(,)? + ) => { + { + let mut builder = $crate::flow::r#async::FlowBuilder::<$storage>::new() + .start_node($start); - $( - builder = builder.node($id, $backend); - )* + $( + builder = builder.node($id, $backend); + )* - $( - builder = builder.route($from, $action, $to); - )* + $( + builder = builder.route($from, $action, $to); + )* - builder.build() - } + builder.build() } }; } @@ -886,5 +951,148 @@ mod tests { vec!["check", "success_handler"] ); } + + #[tokio::test] + async fn test_async_flow_macro_with_terminal_routes_execution() { + let mut workflow = async_flow! { + storage: MemoryStorage, + start: "entry", + nodes: { + "entry": TestCustomNode::new("default"), + "process": TestCustomNode::new("continue"), + "end": TestEndNode, + }, + routes: { + "entry" - "default" => "process", + "process" - "continue" => "end", + }, + terminals: { + "end" - "complete", + } + }; + + let mut store = MemoryStorage::new(); + let result = workflow.execute(&mut store).await; + + assert!(result.is_ok(), "Async flow macro execution should succeed"); + let execution_result = result.unwrap(); + assert!(execution_result.success); + assert_eq!(execution_result.steps_executed, 3); + assert_eq!( + execution_result.execution_path, + vec!["entry", "process", "end"] + ); + } + + #[tokio::test] + async fn test_async_flow_macro_legacy_syntax() { + let mut workflow = async_flow! { + storage: MemoryStorage, + start: "start", + nodes: { + "start": TestCustomNode::new("next"), + "end": TestEndNode, + }, + routes: { + "start" - "next" => "end", + }, + terminals: { + "end" - "complete", + } + }; + + let mut store = MemoryStorage::new(); + let result = workflow.execute(&mut store).await; + + assert!(result.is_ok(), "Async flow macro legacy syntax should work"); + let execution_result = result.unwrap(); + assert!(execution_result.success); + assert_eq!(execution_result.steps_executed, 2); + assert_eq!(execution_result.execution_path, vec!["start", "end"]); + } + + #[tokio::test] + async fn test_async_flow_macro_multiple_terminal_routes() { + let mut success_workflow = async_flow! { + storage: MemoryStorage, + start: "check", + nodes: { + "check": TestCustomNode::new("success"), + "success_handler": TestEndNode, + "error_handler": TestEndNode, + }, + routes: { + "check" - "success" => "success_handler", + "check" - "error" => "error_handler", + }, + terminals: { + "success_handler" - "complete", + "error_handler" - "failed", + } + }; + + let mut store = MemoryStorage::new(); + let result = success_workflow.execute(&mut store).await; + + assert!( + result.is_ok(), + "Async flow macro with multiple terminal routes should succeed" + ); + let execution_result = result.unwrap(); + assert!(execution_result.success); + assert_eq!( + execution_result.execution_path, + vec!["check", "success_handler"] + ); + } + + #[tokio::test] + async fn test_async_flow_macro_vs_regular_flow_consistency() { + // Test that async_flow! produces the same result as manual FlowBuilder + let mut async_flow_workflow = async_flow! { + storage: MemoryStorage, + start: "start", + nodes: { + "start": TestCustomNode::new("proceed"), + "middle": TestCustomNode::new("finish"), + "end": TestEndNode, + }, + routes: { + "start" - "proceed" => "middle", + "middle" - "finish" => "end", + }, + terminals: { + "end" - "complete", + } + }; + + let mut manual_workflow = crate::flow::r#async::FlowBuilder::::new() + .start_node("start") + .node("start", TestCustomNode::new("proceed")) + .node("middle", TestCustomNode::new("finish")) + .node("end", TestEndNode) + .route("start", "proceed", "middle") + .route("middle", "finish", "end") + .terminal_route("end", "complete") + .build(); + + let mut store1 = MemoryStorage::new(); + let mut store2 = MemoryStorage::new(); + + let result1 = async_flow_workflow.execute(&mut store1).await; + let result2 = manual_workflow.execute(&mut store2).await; + + assert!( + result1.is_ok() && result2.is_ok(), + "Both workflows should succeed" + ); + + let exec1 = result1.unwrap(); + let exec2 = result2.unwrap(); + + assert_eq!(exec1.success, exec2.success); + assert_eq!(exec1.steps_executed, exec2.steps_executed); + assert_eq!(exec1.execution_path, exec2.execution_path); + } } }