diff --git a/cookbook/async-workflows/README.md b/cookbook/async-workflows/README.md index 4fba55e..10a74eb 100644 --- a/cookbook/async-workflows/README.md +++ b/cookbook/async-workflows/README.md @@ -4,7 +4,7 @@ Advanced asynchronous workflow patterns using CosmoFlow with async/await. ## Examples -- **Flow Builder Example**: Demonstrates declarative workflow construction with custom action routing using conditional paths, error handling, and convergence patterns. Built with FlowBuilder for maximum compatibility. +- **Flow Builder Example**: Demonstrates declarative workflow construction with custom action routing using conditional paths, error handling, and convergence patterns. Uses CosmoFlow's built-in MemoryStorage backend instead of a hand-rolled storage implementation. ## Features Demonstrated @@ -12,7 +12,7 @@ Advanced asynchronous workflow patterns using CosmoFlow with async/await. - **Custom Action Routing**: Named actions like "default", "error", "continue" for explicit control flow - **Decision-Based Routing**: Conditional workflow paths based on business logic - **Path Convergence**: Multiple execution paths that merge at common endpoints -- **Custom Storage Backend**: Complete implementation with JSON serialization +- **Built-in MemoryStorage**: In-memory storage with automatic JSON serialization - **Error Handling**: Comprehensive error management in async contexts ## Running Examples diff --git a/cookbook/async-workflows/src/main.rs b/cookbook/async-workflows/src/main.rs index 9901a41..aa9831d 100644 --- a/cookbook/async-workflows/src/main.rs +++ b/cookbook/async-workflows/src/main.rs @@ -39,97 +39,8 @@ //! cd cookbook/async-workflows && cargo run //! ``` -use std::collections::HashMap; - +use cosmoflow::shared_store::backends::MemoryStorage; use cosmoflow::{FlowBackend, SharedStore}; -use serde::{de::DeserializeOwned, Serialize}; - -/// A simple in-memory storage implementation for the workflow -/// -/// This storage backend provides JSON-based serialization and maintains -/// all data in memory using a HashMap. It implements the complete -/// SharedStore trait required by CosmoFlow.
-#[derive(Debug, Clone)] -pub struct SimpleStorage { - /// Internal data store using JSON values for flexible data types - data: HashMap<String, serde_json::Value>, -} - -impl SimpleStorage { - /// Creates a new empty storage instance - pub fn new() -> Self { - Self { - data: HashMap::new(), - } - } -} - -impl Default for SimpleStorage { - fn default() -> Self { - Self::new() - } -} - -impl SharedStore for SimpleStorage { - type Error = SimpleStorageError; - - fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, Self::Error> { - match self.data.get(key) { - Some(value) => { - let deserialized = serde_json::from_value(value.clone()) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - fn set<T: Serialize>(&mut self, key: String, value: T) -> Result<(), Self::Error> { - let json_value = serde_json::to_value(value) - .map_err(|e| SimpleStorageError::SerializationError(e.to_string()))?; - self.data.insert(key, json_value); - Ok(()) - } - - fn remove<T: DeserializeOwned>(&mut self, key: &str) -> Result<Option<T>, Self::Error> { - match self.data.remove(key) { - Some(value) => { - let deserialized = serde_json::from_value(value) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - fn contains_key(&self, key: &str) -> Result<bool, Self::Error> { - Ok(self.data.contains_key(key)) - } - - fn keys(&self) -> Result<Vec<String>, Self::Error> { - Ok(self.data.keys().cloned().collect()) - } - - fn clear(&mut self) -> Result<(), Self::Error> { - self.data.clear(); - Ok(()) - } - - fn len(&self) -> Result<usize, Self::Error> { - Ok(self.data.len()) - } -} - -/// Error types for the simple storage implementation -#[derive(Debug, thiserror::Error)] -pub enum SimpleStorageError { - /// Error during JSON serialization - #[error("Serialization error: {0}")] - SerializationError(String), - /// Error during JSON deserialization - #[error("Deserialization error: {0}")] - DeserializationError(String), -} /// Decision node that evaluates conditions and chooses execution paths /// @@ -311,7 +222,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { use cosmoflow::FlowBuilder; // Create a workflow using FlowBuilder with custom action routing - let mut workflow = FlowBuilder::<SimpleStorage>::new() + let mut workflow = FlowBuilder::<MemoryStorage>::new() .start_node("decision") .node("decision", DecisionNode) .node("success_path", SuccessNode) @@ -324,7 +235,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { .terminal_route("final", "complete") .build(); - let mut store = SimpleStorage::new(); + let mut store = MemoryStorage::new(); let result = workflow.execute(&mut store).await?; println!("Workflow execution completed!"); diff --git a/cookbook/unified-workflow/Cargo.toml b/cookbook/unified-workflow/Cargo.toml index b318f79..6296359 100644 --- a/cookbook/unified-workflow/Cargo.toml +++ b/cookbook/unified-workflow/Cargo.toml @@ -2,7 +2,7 @@ name = "unified-workflow" version = "0.1.0" edition = "2021" -description = "Advanced workflow composition and unified sync/async patterns" +description = "Advanced async workflow patterns: multi-node composition, state management, and unified Node trait usage" [dependencies] cosmoflow = { workspace = true, features = ["async", "storage-memory"] } @@ -12,3 +12,15 @@ serde_json = { workspace = true } async-trait = { workspace = true } rand = "0.8" thiserror = { workspace = true } + +[[bin]] +name = "unified-workflow" +path = "src/main.rs" + +[[bin]] +name = "unified_counter" +path = "src/unified_counter.rs" + +[[bin]] +name = "unified_shared_store" +path = "src/unified_shared_store.rs" diff --git
a/cookbook/unified-workflow/README.md b/cookbook/unified-workflow/README.md new file mode 100644 index 0000000..fc6e683 --- /dev/null +++ b/cookbook/unified-workflow/README.md @@ -0,0 +1,20 @@ +# Unified Workflow Patterns + +This cookbook demonstrates advanced async workflow patterns in CosmoFlow, focusing on the unified Node trait and complex workflow composition. + +## Running the Examples + +### Basic Unified Node Pattern +```bash +cargo run --bin unified-workflow +``` + +### Multi-Node Counter Workflow +```bash +cargo run --bin unified_counter +``` + +### SharedStore Operations Demo +```bash +cargo run --bin unified_shared_store +``` diff --git a/cookbook/unified-workflow/src/flow_composition_sync.rs b/cookbook/unified-workflow/src/flow_composition_sync.rs deleted file mode 100644 index 8ad363c..0000000 --- a/cookbook/unified-workflow/src/flow_composition_sync.rs +++ /dev/null @@ -1,539 +0,0 @@ -//! Flow Composition Example - CosmoFlow Sync Version -//! -//! This example demonstrates flow composition in CosmoFlow workflows using -//! synchronous execution for faster compilation. -//! -//! ## Workflow Behavior -//! - **Decision Node**: Evaluates conditions and chooses between success/error paths -//! - **Success Path**: Handles positive outcomes and continues to final processing -//! - **Error Path**: Handles negative outcomes and also continues to final processing -//! - **Final Node**: Convergence point where both paths complete the workflow -//! - **Manual Orchestration**: Explicit control flow without macro complexity -//! -//! ## Advanced Features Demonstrated -//! - **Sync Node Composition**: Multiple nodes working together without async -//! - **Decision-Based Routing**: Nodes that choose different execution paths based on logic -//! - **Path Convergence**: Multiple execution paths that merge at a common endpoint -//! - **Built-in Storage Backend**: Uses CosmoFlow's MemoryStorage for simplicity -//! - **Manual Flow Control**: Explicit action handling and routing logic -//! - **Performance Optimization**: No async overhead for CPU-intensive decision logic -//! -//! ## Performance Benefits -//! - Faster compilation compared to async flow macro version -//! - Smaller binary size (no async runtime overhead) -//! - Perfect for CPU-intensive decision workflows -//! - Explicit control flow for better debugging -//! -//! ## Execution Flow -//! 1. Decision node evaluates business logic and selects execution path -//! 2. Success path processes positive outcomes OR Error path processes negative outcomes -//! 3. Both paths converge at the final node for completion -//! 4. Manual orchestration handles all routing decisions -//! -//! This example is perfect for understanding sync node composition and manual flow control. -//! -//! To run this example: -//! ```bash -//! cargo run --bin flow_composition_sync --no-default-features --features cosmoflow/storage-memory -//! ``` - -#[cfg(not(feature = "async"))] -use cosmoflow::{ - Node, - action::Action, - node::{ExecutionContext, NodeError}, - shared_store::SharedStore, - shared_store::backends::MemoryStorage, -}; -#[cfg(not(feature = "async"))] -use rand::Rng; - -/// Decision node that evaluates conditions and chooses execution paths (sync version) -/// -/// This node demonstrates conditional routing by analyzing business logic -/// and returning different actions based on the evaluation results. 
-#[cfg(not(feature = "async"))] -struct DecisionNode { - decision_criteria: f64, -} - -#[cfg(not(feature = "async"))] -impl DecisionNode { - fn new(criteria: f64) -> Self { - Self { - decision_criteria: criteria, - } - } -} - -#[cfg(not(feature = "async"))] -impl Node for DecisionNode { - type PrepResult = f64; - type ExecResult = bool; - type Error = NodeError; - - fn name(&self) -> &str { - "DecisionNode" - } - - fn prep( - &mut self, - _store: &MemoryStorage, - context: &ExecutionContext, - ) -> Result { - println!( - "šŸ”„ [PREP] DecisionNode (exec_id: {}) preparing evaluation...", - context.execution_id() - ); - - // Generate a random value for decision making - let mut rng = rand::thread_rng(); - let random_value: f64 = rng.gen_range(0.0..1.0); - - println!( - " Decision criteria: {:.2}, Random value: {:.2}", - self.decision_criteria, random_value - ); - - Ok(random_value) - } - - fn exec( - &mut self, - prep_result: Self::PrepResult, - _context: &ExecutionContext, - ) -> Result { - println!( - "⚔ [EXEC] DecisionNode evaluating: {:.2} > {:.2}?", - prep_result, self.decision_criteria - ); - - // Simulate some decision computation - std::thread::sleep(std::time::Duration::from_millis(10)); - - let decision = prep_result > self.decision_criteria; - println!( - " Decision result: {}", - if decision { "SUCCESS" } else { "ERROR" } - ); - - Ok(decision) - } - - fn post( - &mut self, - store: &mut MemoryStorage, - prep_result: Self::PrepResult, - exec_result: Self::ExecResult, - _context: &ExecutionContext, - ) -> Result { - // Store decision details for later analysis - store - .set("decision_value".to_string(), prep_result) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - store - .set("decision_result".to_string(), exec_result) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - if exec_result { - println!("āœ… [POST] DecisionNode: Routing to SUCCESS path"); - Ok(Action::simple("success")) - } else { - println!("āœ… [POST] DecisionNode: Routing to ERROR path"); - Ok(Action::simple("error")) - } - } -} - -/// Success path node that handles positive outcomes (sync version) -/// -/// This node processes successful scenarios and continues the workflow -/// toward the final convergence point. -#[cfg(not(feature = "async"))] -struct SuccessNode { - success_count: usize, -} - -#[cfg(not(feature = "async"))] -impl SuccessNode { - fn new() -> Self { - Self { success_count: 0 } - } -} - -#[cfg(not(feature = "async"))] -impl Node for SuccessNode { - type PrepResult = (); - type ExecResult = String; - type Error = NodeError; - - fn name(&self) -> &str { - "SuccessNode" - } - - fn prep( - &mut self, - store: &MemoryStorage, - context: &ExecutionContext, - ) -> Result { - println!( - "šŸ”„ [PREP] SuccessNode (exec_id: {}) handling positive outcome...", - context.execution_id() - ); - - // Read decision details from storage - if let Ok(Some(decision_value)) = store.get::("decision_value") { - println!(" Decision value was: {:.2}", decision_value); - } - - Ok(()) - } - - fn exec( - &mut self, - _prep_result: Self::PrepResult, - _context: &ExecutionContext, - ) -> Result { - println!("⚔ [EXEC] SuccessNode processing successful scenario..."); - - self.success_count += 1; - - // Simulate success processing - std::thread::sleep(std::time::Duration::from_millis(15)); - - let success_message = format!( - "šŸŽ‰ Success processing completed! 
(count: {})", - self.success_count - ); - println!(" {}", success_message); - - Ok(success_message) - } - - fn post( - &mut self, - store: &mut MemoryStorage, - _prep_result: Self::PrepResult, - exec_result: Self::ExecResult, - _context: &ExecutionContext, - ) -> Result { - println!("āœ… [POST] SuccessNode storing result and continuing..."); - - // Store success result - store - .set("success_message".to_string(), exec_result) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - store - .set("path_taken".to_string(), "success".to_string()) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - Ok(Action::simple("continue")) - } -} - -/// Error path node that handles negative outcomes (sync version) -/// -/// This node processes error scenarios and provides an alternative -/// execution path that also leads to the final convergence point. -#[cfg(not(feature = "async"))] -struct ErrorNode { - error_count: usize, -} - -#[cfg(not(feature = "async"))] -impl ErrorNode { - fn new() -> Self { - Self { error_count: 0 } - } -} - -#[cfg(not(feature = "async"))] -impl Node for ErrorNode { - type PrepResult = (); - type ExecResult = String; - type Error = NodeError; - - fn name(&self) -> &str { - "ErrorNode" - } - - fn prep( - &mut self, - store: &MemoryStorage, - context: &ExecutionContext, - ) -> Result { - println!( - "šŸ”„ [PREP] ErrorNode (exec_id: {}) handling negative outcome...", - context.execution_id() - ); - - // Read decision details from storage - if let Ok(Some(decision_value)) = store.get::("decision_value") { - println!(" Decision value was: {:.2}", decision_value); - } - - Ok(()) - } - - fn exec( - &mut self, - _prep_result: Self::PrepResult, - _context: &ExecutionContext, - ) -> Result { - println!("⚔ [EXEC] ErrorNode processing error scenario..."); - - self.error_count += 1; - - // Simulate error handling - std::thread::sleep(std::time::Duration::from_millis(12)); - - let error_message = format!("āš ļø Error handled gracefully! (count: {})", self.error_count); - println!(" {}", error_message); - - Ok(error_message) - } - - fn post( - &mut self, - store: &mut MemoryStorage, - _prep_result: Self::PrepResult, - exec_result: Self::ExecResult, - _context: &ExecutionContext, - ) -> Result { - println!("āœ… [POST] ErrorNode storing result and continuing..."); - - // Store error result - store - .set("error_message".to_string(), exec_result) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - store - .set("path_taken".to_string(), "error".to_string()) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - Ok(Action::simple("continue")) - } -} - -/// Final convergence node where all execution paths complete (sync version) -/// -/// This node serves as the endpoint for both success and error paths, -/// demonstrating how different workflow branches can converge. -#[cfg(not(feature = "async"))] -struct FinalNode; - -#[cfg(not(feature = "async"))] -impl Node for FinalNode { - type PrepResult = (String, Option, Option); - type ExecResult = String; - type Error = NodeError; - - fn name(&self) -> &str { - "FinalNode" - } - - fn prep( - &mut self, - store: &MemoryStorage, - context: &ExecutionContext, - ) -> Result { - println!( - "šŸ”„ [PREP] FinalNode (exec_id: {}) gathering results from all paths...", - context.execution_id() - ); - - let path_taken: String = store - .get("path_taken") - .map_err(|e| NodeError::StorageError(e.to_string()))? 
- .unwrap_or_else(|| "unknown".to_string()); - - let success_message: Option = store.get("success_message").ok().flatten(); - - let error_message: Option = store.get("error_message").ok().flatten(); - - println!(" Path taken: {}", path_taken); - - Ok((path_taken, success_message, error_message)) - } - - fn exec( - &mut self, - (path_taken, success_msg, error_msg): Self::PrepResult, - _context: &ExecutionContext, - ) -> Result { - println!("⚔ [EXEC] FinalNode creating workflow summary..."); - - // Simulate final processing - std::thread::sleep(std::time::Duration::from_millis(8)); - - let mut summary = String::new(); - summary.push_str("šŸŽÆ WORKFLOW SUMMARY\n"); - summary.push_str("==================\n"); - summary.push_str(&format!("Path taken: {}\n", path_taken)); - - if let Some(msg) = success_msg { - summary.push_str(&format!("Success result: {}\n", msg)); - } - - if let Some(msg) = error_msg { - summary.push_str(&format!("Error result: {}\n", msg)); - } - - summary.push_str("Workflow completed successfully!"); - - println!(" Summary generated"); - Ok(summary) - } - - fn post( - &mut self, - store: &mut MemoryStorage, - _prep_result: Self::PrepResult, - exec_result: Self::ExecResult, - _context: &ExecutionContext, - ) -> Result { - println!("āœ… [POST] FinalNode workflow completed!"); - - // Store final summary - store - .set("workflow_summary".to_string(), exec_result.clone()) - .map_err(|e| NodeError::StorageError(e.to_string()))?; - - println!("\n{}", exec_result); - - Ok(Action::simple("complete")) - } -} - -/// Manual workflow orchestrator that handles routing between nodes -#[cfg(not(feature = "async"))] -struct WorkflowOrchestrator { - decision_node: DecisionNode, - success_node: SuccessNode, - error_node: ErrorNode, - final_node: FinalNode, -} - -#[cfg(not(feature = "async"))] -impl WorkflowOrchestrator { - fn new(decision_threshold: f64) -> Self { - Self { - decision_node: DecisionNode::new(decision_threshold), - success_node: SuccessNode::new(), - error_node: ErrorNode::new(), - final_node: FinalNode, - } - } - - fn execute(&mut self, store: &mut MemoryStorage) -> Result<(), Box> { - println!("šŸŽ® Starting manual workflow orchestration..."); - println!("============================================\n"); - - // Step 1: Execute decision node - println!("1ļøāƒ£ Executing Decision Phase:"); - let decision_action = self.decision_node.run(store)?; - println!(" Decision Action: {}\n", decision_action.name()); - - // Step 2: Route based on decision - let path_action = match decision_action.name().as_str() { - "success" => { - println!("2ļøāƒ£ Executing Success Path:"); - let action = self.success_node.run(store)?; - println!(" Success Action: {}\n", action.name()); - action - } - "error" => { - println!("2ļøāƒ£ Executing Error Path:"); - let action = self.error_node.run(store)?; - println!(" Error Action: {}\n", action.name()); - action - } - _ => { - return Err( - format!("Unexpected decision action: {}", decision_action.name()).into(), - ); - } - }; - - // Step 3: Execute final convergence if appropriate - if path_action.name() == "continue" { - println!("3ļøāƒ£ Executing Final Convergence:"); - let final_action = self.final_node.run(store)?; - println!(" Final Action: {}\n", final_action.name()); - } - - Ok(()) - } -} - -/// Main function demonstrating sync node composition with manual orchestration -#[cfg(all(feature = "sync", not(feature = "async")))] -#[cfg(not(feature = "async"))] -fn sync_main() -> Result<(), Box> { - println!("šŸš€ CosmoFlow Flow Composition (Sync 
Version)"); - println!("============================================="); - println!("šŸ“¦ Manual node orchestration without async complexity!\n"); - - // Create shared storage - let mut store = MemoryStorage::new(); - - // Create workflow orchestrator with decision threshold - let mut orchestrator = WorkflowOrchestrator::new(0.5); - - // Execute the workflow - orchestrator.execute(&mut store)?; - - // Display final storage contents - println!("šŸ“Š Final Storage Contents:"); - println!("=========================="); - - if let Ok(Some(decision_result)) = store.get::("decision_result") { - println!("Decision Result: {}", decision_result); - } - - if let Ok(Some(path_taken)) = store.get::("path_taken") { - println!("Path Taken: {}", path_taken); - } - - if let Ok(Some(summary)) = store.get::("workflow_summary") { - println!("\nWorkflow Summary:"); - println!("{}", summary); - } - - println!("\nšŸŽÆ Sync Composition Benefits:"); - println!("============================="); - println!("• ⚔ Faster compilation than async flow macro"); - println!("• šŸ“¦ Smaller binary size (no async runtime)"); - println!("• šŸŽÆ Perfect for CPU-intensive decision logic"); - println!("• šŸ”§ Explicit control flow for easier debugging"); - println!("• šŸš€ No async overhead for decision workflows"); - println!("• šŸ’” Manual orchestration for maximum control"); - - println!("\nšŸ’” Note: This example demonstrates manual node composition"); - println!(" without relying on the flow macro or async features."); - println!(" Each routing decision is handled explicitly for"); - println!(" maximum performance and control."); - - Ok(()) -} - -fn main() { - #[cfg(not(feature = "async"))] - { - if let Err(e) = sync_main() { - eprintln!("Error running sync flow composition example: {}", e); - std::process::exit(1); - } - } - - #[cfg(feature = "async")] - { - println!("This sync example is not available when async features are enabled."); - println!("To run this example, use:"); - println!( - "cargo run --bin flow_composition_sync --no-default-features --features cosmoflow/storage-memory" - ); - } -} diff --git a/cookbook/unified-workflow/src/main.rs b/cookbook/unified-workflow/src/main.rs index ec63384..01d7b3d 100644 --- a/cookbook/unified-workflow/src/main.rs +++ b/cookbook/unified-workflow/src/main.rs @@ -19,145 +19,13 @@ //! Run with: `cargo run --bin unified_hello_world` use async_trait::async_trait; -use cosmoflow::prelude::*; -use cosmoflow::shared_store::SharedStore; -use serde::{de::DeserializeOwned, Serialize}; -use std::{collections::HashMap, time::Duration}; - -/// A simple in-memory storage implementation for demonstration purposes. -/// -/// This storage backend uses a HashMap to store JSON values and provides -/// all the required operations for CosmoFlow workflows. It demonstrates -/// how to implement a custom storage backend from scratch. -/// -/// ## Features: -/// - JSON serialization/deserialization for type safety -/// - Error handling for serialization failures -/// - Complete SharedStore trait implementation -/// - Thread-safe operations (when wrapped appropriately) -#[derive(Debug, Clone)] -pub struct SimpleStorage { - /// Internal data store using JSON values for flexibility - data: HashMap, -} - -impl SimpleStorage { - /// Creates a new empty storage instance. - /// - /// This initializes the internal HashMap that will store all data - /// as JSON values, allowing for flexible type storage and retrieval. 
- pub fn new() -> Self { - Self { - data: HashMap::new(), - } - } -} - -impl Default for SimpleStorage { - fn default() -> Self { - Self::new() - } -} - -impl SharedStore for SimpleStorage { - type Error = SimpleStorageError; - - /// Retrieves a value from storage and deserializes it to the requested type. - /// - /// # Arguments - /// * `key` - The key to look up in storage - /// - /// # Returns - /// * `Ok(Some(T))` - If the key exists and can be deserialized to type T - /// * `Ok(None)` - If the key doesn't exist - /// * `Err(SimpleStorageError)` - If deserialization fails - fn get(&self, key: &str) -> Result, Self::Error> { - match self.data.get(key) { - Some(value) => { - let deserialized = serde_json::from_value(value.clone()) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - /// Stores a value in the storage after serializing it to JSON. - /// - /// # Arguments - /// * `key` - The key to store the value under - /// * `value` - The value to store (must be serializable) - /// - /// # Returns - /// * `Ok(())` - If the value was successfully stored - /// * `Err(SimpleStorageError)` - If serialization fails - fn set(&mut self, key: String, value: T) -> Result<(), Self::Error> { - let json_value = serde_json::to_value(value) - .map_err(|e| SimpleStorageError::SerializationError(e.to_string()))?; - self.data.insert(key, json_value); - Ok(()) - } - - /// Removes a value from storage and returns it if it existed. - /// - /// # Arguments - /// * `key` - The key to remove from storage - /// - /// # Returns - /// * `Ok(Some(T))` - If the key existed and could be deserialized - /// * `Ok(None)` - If the key didn't exist - /// * `Err(SimpleStorageError)` - If deserialization fails - fn remove(&mut self, key: &str) -> Result, Self::Error> { - match self.data.remove(key) { - Some(value) => { - let deserialized = serde_json::from_value(value) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - /// Checks if a key exists in storage. - fn contains_key(&self, key: &str) -> Result { - Ok(self.data.contains_key(key)) - } - - /// Returns all keys currently stored. - fn keys(&self) -> Result, Self::Error> { - Ok(self.data.keys().cloned().collect()) - } - - /// Clears all data from storage. - fn clear(&mut self) -> Result<(), Self::Error> { - self.data.clear(); - Ok(()) - } - - /// Returns the number of items in storage. - fn len(&self) -> Result { - Ok(self.data.len()) - } - - /// Checks if storage is empty. - fn is_empty(&self) -> Result { - Ok(self.data.is_empty()) - } -} - -/// Error types for SimpleStorage operations. -/// -/// This enum covers the two main categories of errors that can occur -/// when working with JSON serialization/deserialization in storage operations. -#[derive(Debug, thiserror::Error)] -pub enum SimpleStorageError { - /// Error occurred during serialization to JSON - #[error("Serialization error: {0}")] - SerializationError(String), - /// Error occurred during deserialization from JSON - #[error("Deserialization error: {0}")] - DeserializationError(String), -} +use cosmoflow::{ + action::Action, + node::{ExecutionContext, NodeError}, + shared_store::{backends::MemoryStorage, SharedStore}, + FlowBackend, FlowBuilder, Node, +}; +use std::time::Duration; /// A simple greeting node that demonstrates the unified Node trait. 
/// @@ -189,7 +57,7 @@ impl HelloNode { } #[async_trait] -impl Node<SimpleStorage> for HelloNode { +impl Node<MemoryStorage> for HelloNode { /// Preparation phase returns unit type (no data needed for execution) type PrepResult = (); /// Execution phase returns the generated greeting string @@ -211,7 +79,7 @@ impl Node<SimpleStorage> for HelloNode { /// * `Ok(())` - Always succeeds in this example async fn prep( &mut self, - _store: &SimpleStorage, + _store: &MemoryStorage, _context: &ExecutionContext, ) -> Result<Self::PrepResult, Self::Error> { println!("Preparing HelloNode..."); @@ -259,7 +127,7 @@ impl Node<SimpleStorage> for HelloNode { /// * `Err(NodeError)` - If storage operations fail async fn post( &mut self, - store: &mut SimpleStorage, + store: &mut MemoryStorage, _prep_result: Self::PrepResult, exec_result: Self::ExecResult, _context: &ExecutionContext, @@ -311,8 +179,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { println!("šŸš€ CosmoFlow Unified Node Trait Example"); println!("========================================"); - // Create shared storage with our custom SimpleStorage backend - let mut store = SimpleStorage::new(); + // Create shared storage with MemoryStorage backend + let mut store = MemoryStorage::new(); // Build the workflow using the new unified API // - start_with() creates the starting node and sets it as the entry point diff --git a/cookbook/unified-workflow/src/unified_counter.rs b/cookbook/unified-workflow/src/unified_counter.rs index 47da740..40663f3 100644 --- a/cookbook/unified-workflow/src/unified_counter.rs +++ b/cookbook/unified-workflow/src/unified_counter.rs @@ -19,110 +19,13 @@ //! //! Run with: `cargo run --bin unified_counter` -use std::collections::HashMap; - use async_trait::async_trait; -use cosmoflow::{Action, ExecutionContext, FlowBackend, FlowBuilder, Node, NodeError, SharedStore}; -use serde::{Serialize, de::DeserializeOwned}; - -/// A simple in-memory storage implementation for demonstration purposes. -/// -/// This storage backend uses a HashMap to store JSON values and provides -/// all the required operations for CosmoFlow workflows. It's identical to -/// the one in unified_hello_world.rs but duplicated here for independence. -/// -/// ## Features: -/// - JSON serialization/deserialization for type safety -/// - Error handling for serialization failures -/// - Complete SharedStore trait implementation -/// - Thread-safe operations (when wrapped appropriately) -#[derive(Debug, Clone)] -pub struct SimpleStorage { - /// Internal data store using JSON values for flexibility - data: HashMap<String, serde_json::Value>, -} - -impl SimpleStorage { - /// Creates a new empty storage instance. - /// - /// This initializes the internal HashMap that will store all data - /// as JSON values, allowing for flexible type storage and retrieval.
- pub fn new() -> Self { - Self { - data: HashMap::new(), - } - } -} - -impl Default for SimpleStorage { - fn default() -> Self { - Self::new() - } -} - -impl SharedStore for SimpleStorage { - type Error = SimpleStorageError; - - fn get(&self, key: &str) -> Result, Self::Error> { - match self.data.get(key) { - Some(value) => { - let deserialized = serde_json::from_value(value.clone()) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - fn set(&mut self, key: String, value: T) -> Result<(), Self::Error> { - let json_value = serde_json::to_value(value) - .map_err(|e| SimpleStorageError::SerializationError(e.to_string()))?; - self.data.insert(key, json_value); - Ok(()) - } - - fn remove(&mut self, key: &str) -> Result, Self::Error> { - match self.data.remove(key) { - Some(value) => { - let deserialized = serde_json::from_value(value) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - fn contains_key(&self, key: &str) -> Result { - Ok(self.data.contains_key(key)) - } - - fn keys(&self) -> Result, Self::Error> { - Ok(self.data.keys().cloned().collect()) - } - - fn clear(&mut self) -> Result<(), Self::Error> { - self.data.clear(); - Ok(()) - } - - fn len(&self) -> Result { - Ok(self.data.len()) - } -} - -/// Error types for SimpleStorage operations. -/// -/// This enum covers the two main categories of errors that can occur -/// when working with JSON serialization/deserialization in storage operations. -#[derive(Debug, thiserror::Error)] -pub enum SimpleStorageError { - /// Error occurred during serialization to JSON - #[error("Serialization error: {0}")] - SerializationError(String), - /// Error occurred during deserialization from JSON - #[error("Deserialization error: {0}")] - DeserializationError(String), -} +use cosmoflow::{ + action::Action, + node::{ExecutionContext, NodeError}, + shared_store::{backends::MemoryStorage, SharedStore}, + FlowBackend, FlowBuilder, Node, +}; /// A counter node that demonstrates stateful computation and conditional routing. 
/// @@ -280,8 +183,8 @@ async fn main() -> Result<(), Box> { println!("šŸš€ CosmoFlow Unified Node Trait Example"); println!("========================================"); - // Create shared storage with our custom SimpleStorage backend - let mut store = SimpleStorage::new(); + // Create shared storage with MemoryStorage backend + let mut store = MemoryStorage::new(); // Build a multi-node workflow with sequential counter processing // Each counter node will: diff --git a/cookbook/unified-workflow/src/unified_shared_store.rs b/cookbook/unified-workflow/src/unified_shared_store.rs index cfdc5b4..9803230 100644 --- a/cookbook/unified-workflow/src/unified_shared_store.rs +++ b/cookbook/unified-workflow/src/unified_shared_store.rs @@ -1,85 +1,5 @@ -use std::collections::HashMap; - -use cosmoflow::shared_store::SharedStore; -use serde::{Deserialize, Serialize, de::DeserializeOwned}; - -/// A simple in-memory storage implementation -#[derive(Debug, Clone)] -pub struct SimpleStorage { - data: HashMap, -} - -impl SimpleStorage { - pub fn new() -> Self { - Self { - data: HashMap::new(), - } - } -} - -impl Default for SimpleStorage { - fn default() -> Self { - Self::new() - } -} - -impl SharedStore for SimpleStorage { - type Error = SimpleStorageError; - - fn get(&self, key: &str) -> Result, Self::Error> { - match self.data.get(key) { - Some(value) => { - let deserialized = serde_json::from_value(value.clone()) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - fn set(&mut self, key: String, value: T) -> Result<(), Self::Error> { - let json_value = serde_json::to_value(value) - .map_err(|e| SimpleStorageError::SerializationError(e.to_string()))?; - self.data.insert(key, json_value); - Ok(()) - } - - fn remove(&mut self, key: &str) -> Result, Self::Error> { - match self.data.remove(key) { - Some(value) => { - let deserialized = serde_json::from_value(value) - .map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?; - Ok(Some(deserialized)) - } - None => Ok(None), - } - } - - fn contains_key(&self, key: &str) -> Result { - Ok(self.data.contains_key(key)) - } - - fn keys(&self) -> Result, Self::Error> { - Ok(self.data.keys().cloned().collect()) - } - - fn clear(&mut self) -> Result<(), Self::Error> { - self.data.clear(); - Ok(()) - } - - fn len(&self) -> Result { - Ok(self.data.len()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum SimpleStorageError { - #[error("Serialization error: {0}")] - SerializationError(String), - #[error("Deserialization error: {0}")] - DeserializationError(String), -} +use cosmoflow::shared_store::{backends::MemoryStorage, SharedStore}; +use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Debug, PartialEq)] struct UserData { @@ -91,8 +11,8 @@ struct UserData { fn main() -> Result<(), Box> { println!("=== CosmoFlow: Unified SharedStore Trait Demo ===\n"); - // Create a memory storage that directly implements SharedStore trait - let mut store = SimpleStorage::new(); + // Create shared storage with MemoryStorage backend + let mut store = MemoryStorage::new(); // Store different types of data println!("1. 
Storing data using the unified SharedStore trait:"); diff --git a/cosmoflow/Cargo.toml b/cosmoflow/Cargo.toml index 35c77fd..933b20d 100644 --- a/cosmoflow/Cargo.toml +++ b/cosmoflow/Cargo.toml @@ -24,7 +24,7 @@ tempfile = { workspace = true } tokio-test = { workspace = true } [features] -default = [] +default = ["basic"] # Async support async = ["async-trait", "tokio"] @@ -36,7 +36,10 @@ storage-redis = ["redis"] storage-full = ["storage-memory", "storage-file", "storage-redis"] # Convenience feature combinations -minimal = [] -basic = ["storage-memory"] -standard = ["storage-memory", "async"] -full = ["storage-full", "async"] +minimal = [] # Core engine only (bring your own storage) +basic = ["storage-memory"] # Basic usable configuration with memory storage +standard = [ + "storage-memory", + "async", +] # Standard configuration with async support +full = ["storage-full", "async"] # Full feature set with all storage backends diff --git a/cosmoflow/src/lib.rs b/cosmoflow/src/lib.rs index 374ddca..4984f85 100644 --- a/cosmoflow/src/lib.rs +++ b/cosmoflow/src/lib.rs @@ -111,8 +111,8 @@ //! //! ### Convenience Features //! -//! * `minimal`: Just the core engine (bring your own storage) - sync only. -//! * `basic`: Core + memory storage - sync only. +//! * `minimal`: Core engine only (bring your own storage). +//! * `basic`: Basic usable configuration with memory storage. //! * `standard`: Core + memory storage + async support. //! * `full`: All storage backends + async support enabled. //!
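Since the diff above flips `default` to `["basic"]`, downstream crates that depend on `cosmoflow` with default features now get the built-in memory storage automatically. A minimal sketch of two ways a consumer might declare the dependency under the new defaults — the version number and the explicit opt-out are illustrative only, not part of this change:

```toml
[dependencies]
# Picks up the new "basic" default: core engine + built-in MemoryStorage.
cosmoflow = "0.1"

# Or opt out of defaults and choose a convenience feature explicitly,
# e.g. "standard" for memory storage plus async support.
# cosmoflow = { version = "0.1", default-features = false, features = ["standard"] }
```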