Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cookbook/async-workflows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ Advanced asynchronous workflow patterns using CosmoFlow with async/await.

## Examples

- **Flow Builder Example**: Demonstrates declarative workflow construction with custom action routing using conditional paths, error handling, and convergence patterns. Built with FlowBuilder for maximum compatibility.
- **Flow Builder Example**: Demonstrates declarative workflow construction with custom action routing using conditional paths, error handling, and convergence patterns. Uses CosmoFlow's built-in MemoryStorage for optimal performance.

## Features Demonstrated

- **Async Node Implementation**: Full async/await support with tokio
- **Custom Action Routing**: Named actions like "default", "error", "continue" for explicit control flow
- **Decision-Based Routing**: Conditional workflow paths based on business logic
- **Path Convergence**: Multiple execution paths that merge at common endpoints
- **Custom Storage Backend**: Complete implementation with JSON serialization
- **Built-in MemoryStorage**: High-performance in-memory storage with automatic JSON serialization
- **Error Handling**: Comprehensive error management in async contexts

## Running Examples
Expand Down
95 changes: 3 additions & 92 deletions cookbook/async-workflows/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,97 +39,8 @@
//! cd cookbook/async-workflows && cargo run
//! ```

use std::collections::HashMap;

use cosmoflow::shared_store::backends::MemoryStorage;
use cosmoflow::{FlowBackend, SharedStore};
use serde::{de::DeserializeOwned, Serialize};

/// A simple in-memory storage implementation for the workflow
///
/// This storage backend provides JSON-based serialization and maintains
/// all data in memory using a HashMap. It implements the complete
/// SharedStore trait required by CosmoFlow.
#[derive(Debug, Clone)]
pub struct SimpleStorage {
/// Internal data store using JSON values for flexible data types
data: HashMap<String, serde_json::Value>,
}

impl SimpleStorage {
/// Creates a new empty storage instance
pub fn new() -> Self {
Self {
data: HashMap::new(),
}
}
}

impl Default for SimpleStorage {
fn default() -> Self {
Self::new()
}
}

impl SharedStore for SimpleStorage {
type Error = SimpleStorageError;

fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, Self::Error> {
match self.data.get(key) {
Some(value) => {
let deserialized = serde_json::from_value(value.clone())
.map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?;
Ok(Some(deserialized))
}
None => Ok(None),
}
}

fn set<T: Serialize>(&mut self, key: String, value: T) -> Result<(), Self::Error> {
let json_value = serde_json::to_value(value)
.map_err(|e| SimpleStorageError::SerializationError(e.to_string()))?;
self.data.insert(key, json_value);
Ok(())
}

fn remove<T: DeserializeOwned>(&mut self, key: &str) -> Result<Option<T>, Self::Error> {
match self.data.remove(key) {
Some(value) => {
let deserialized = serde_json::from_value(value)
.map_err(|e| SimpleStorageError::DeserializationError(e.to_string()))?;
Ok(Some(deserialized))
}
None => Ok(None),
}
}

fn contains_key(&self, key: &str) -> Result<bool, Self::Error> {
Ok(self.data.contains_key(key))
}

fn keys(&self) -> Result<Vec<String>, Self::Error> {
Ok(self.data.keys().cloned().collect())
}

fn clear(&mut self) -> Result<(), Self::Error> {
self.data.clear();
Ok(())
}

fn len(&self) -> Result<usize, Self::Error> {
Ok(self.data.len())
}
}

/// Error types for the simple storage implementation
#[derive(Debug, thiserror::Error)]
pub enum SimpleStorageError {
/// Error during JSON serialization
#[error("Serialization error: {0}")]
SerializationError(String),
/// Error during JSON deserialization
#[error("Deserialization error: {0}")]
DeserializationError(String),
}

/// Decision node that evaluates conditions and chooses execution paths
///
Expand Down Expand Up @@ -311,7 +222,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
use cosmoflow::FlowBuilder;

// Create a workflow using FlowBuilder with custom action routing
let mut workflow = FlowBuilder::<SimpleStorage>::new()
let mut workflow = FlowBuilder::<MemoryStorage>::new()
.start_node("decision")
.node("decision", DecisionNode)
.node("success_path", SuccessNode)
Expand All @@ -324,7 +235,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.terminal_route("final", "complete")
.build();

let mut store = SimpleStorage::new();
let mut store = MemoryStorage::new();
let result = workflow.execute(&mut store).await?;

println!("Workflow execution completed!");
Expand Down
14 changes: 13 additions & 1 deletion cookbook/unified-workflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "unified-workflow"
version = "0.1.0"
edition = "2021"
description = "Advanced workflow composition and unified sync/async patterns"
description = "Advanced async workflow patterns: multi-node composition, state management, and unified Node trait usage"

[dependencies]
cosmoflow = { workspace = true, features = ["async", "storage-memory"] }
Expand All @@ -12,3 +12,15 @@ serde_json = { workspace = true }
async-trait = { workspace = true }
rand = "0.8"
thiserror = { workspace = true }

[[bin]]
name = "unified-workflow"
path = "src/main.rs"

[[bin]]
name = "unified_counter"
path = "src/unified_counter.rs"

[[bin]]
name = "unified_shared_store"
path = "src/unified_shared_store.rs"
20 changes: 20 additions & 0 deletions cookbook/unified-workflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Unified Workflow Patterns

This cookbook demonstrates advanced async workflow patterns in CosmoFlow, focusing on the unified Node trait and complex workflow composition.

## Running the Examples

### Basic Unified Node Pattern
```bash
cargo run --bin unified-workflow
```

### Multi-Node Counter Workflow
```bash
cargo run --bin unified_counter
```

### SharedStore Operations Demo
```bash
cargo run --bin unified_shared_store
```
Loading
Loading