Composable, async, stream-first computation in pure Rust
Build fully composable, async data pipelines using a fluent API.
StreamWeave is a general-purpose Rust framework built around the concept of streaming data, with a focus on simplicity, composability, and performance.
High-Performance Streaming: Process 2-6 million messages per second with in-process zero-copy execution. Perfect for high-throughput data processing pipelines.
- Pure Rust API with zero-cost abstractions
- Full async/await compatibility via `futures::Stream`
- Fluent pipeline-style API with type-safe builder pattern
- Graph-based API for complex topologies with fan-in/fan-out patterns
- Flow-Based Programming (FBP) patterns with type-safe routing
- Comprehensive error handling system with multiple strategies
- Code-as-configuration: no external DSLs
- Extensive package ecosystem for I/O, transformations, and integrations
StreamWeave breaks computation into three primary building blocks:
| Component | Description |
|---|---|
| Producer | Starts a stream of data |
| Transformer | Transforms stream items (e.g., map/filter) |
| Consumer | Terminates the stream (e.g., writing to a sink, logging) |
All components can be chained together fluently. These components can be used in both the Pipeline API (for simple linear flows) and the Graph API (for complex topologies with fan-in/fan-out patterns).
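To make the three roles concrete, here is a minimal, dependency-free sketch that models them with plain iterators. This is conceptual only: StreamWeave's actual components are async and built on `futures::Stream`, not the synchronous functions shown here.

```rust
// Producer: starts a stream of data.
fn produce() -> impl Iterator<Item = i32> {
    vec![1, 2, 3, 4, 5].into_iter()
}

// Transformer: reshapes items as they flow through (map/filter).
fn transform(input: impl Iterator<Item = i32>) -> impl Iterator<Item = i32> {
    input.map(|x| x * 2).filter(|x| *x > 4)
}

// Consumer: terminates the stream, here by collecting into a Vec.
fn consume(input: impl Iterator<Item = i32>) -> Vec<i32> {
    input.collect()
}

fn main() {
    // Components chain fluently: produce -> transform -> consume.
    let out = consume(transform(produce()));
    println!("{:?}", out); // [6, 8, 10]
}
```

The same composition idea carries over to StreamWeave, where the builder wires these stages together and the runtime drives the stream asynchronously.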
StreamWeave provides two APIs for building data processing workflows:
| Feature | Pipeline API | Graph API |
|---|---|---|
| Use Case | Simple linear flows | Complex topologies |
| Topology | Single path: Producer → Transformer → Consumer | Multiple paths, fan-in/fan-out |
| Routing | Sequential processing | Configurable routing strategies |
| Complexity | Lower complexity, easier to use | Higher flexibility, more powerful |
| Best For | ETL pipelines, simple transformations | Complex workflows, parallel processing, data distribution |
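The fan-out idea behind the Graph API can be sketched without the framework: route each item to one of several downstream branches by a predicate, the way a routing transformer would split a stream across graph edges. This is an illustrative model, not the actual Graph API.

```rust
// Route items into two branches based on a predicate, mimicking a
// two-way fan-out node in a processing graph.
fn fan_out<T, F: Fn(&T) -> bool>(items: Vec<T>, pred: F) -> (Vec<T>, Vec<T>) {
    let mut matched = Vec::new();
    let mut rest = Vec::new();
    for item in items {
        if pred(&item) {
            matched.push(item);
        } else {
            rest.push(item);
        }
    }
    (matched, rest)
}

fn main() {
    // Split a stream of numbers into even and odd branches.
    let (evens, odds) = fan_out(vec![1, 2, 3, 4, 5], |x| x % 2 == 0);
    println!("evens: {:?}, odds: {:?}", evens, odds); // evens: [2, 4], odds: [1, 3, 5]
}
```

In the real Graph API, each branch would feed its own chain of transformers and consumers, and routing strategies are configurable rather than hard-coded.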
Add StreamWeave to your Cargo.toml:
```toml
[dependencies]
streamweave = "0.8.0"
```

```rust
use streamweave::PipelineBuilder;
use streamweave_array::ArrayProducer;
use streamweave_transformers::MapTransformer;
use streamweave_vec::VecConsumer;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pipeline = PipelineBuilder::new()
        .producer(ArrayProducer::new(vec![1, 2, 3, 4, 5]))
        .transformer(MapTransformer::new(|x: i32| x * 2))
        .consumer(VecConsumer::new());

    let ((), result) = pipeline.run().await?;
    println!("Result: {:?}", result.collected);
    Ok(())
}
```

For more examples and detailed documentation, see the package documentation below.
StreamWeave is organized as a monorepo with 39 packages, each providing specific functionality. Each package has its own README with detailed documentation, examples, and API reference.
These are the foundational packages that other packages depend on:
- streamweave - Core traits and types (Producer, Transformer, Consumer)
- error - Error handling system with multiple strategies
- message - Message envelope and metadata
- offset - Offset management for exactly-once processing
- transaction - Transaction support and boundaries
Core system functionality:
- pipeline - Pipeline builder and execution
- graph - Graph API for complex topologies
- stateful - Stateful processing and state management
- window - Windowing operations (tumbling, sliding, session)
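The window package's tumbling and sliding semantics can be illustrated with a count-based sketch using standard slice methods. This is a simplification: the actual package also supports time-based and session windows over async streams.

```rust
// Tumbling windows: fixed size, non-overlapping.
fn tumbling(items: &[i32], size: usize) -> Vec<Vec<i32>> {
    items.chunks(size).map(|c| c.to_vec()).collect()
}

// Sliding windows: fixed size, advancing one item at a time.
fn sliding(items: &[i32], size: usize) -> Vec<Vec<i32>> {
    items.windows(size).map(|w| w.to_vec()).collect()
}

fn main() {
    let data = [1, 2, 3, 4, 5];
    println!("tumbling: {:?}", tumbling(&data, 2)); // [[1, 2], [3, 4], [5]]
    println!("sliding:  {:?}", sliding(&data, 2));  // [[1, 2], [2, 3], [3, 4], [4, 5]]
}
```

A session window, by contrast, closes only after a gap of inactivity, so its size is data-dependent rather than fixed.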
Standard I/O and file system operations:
- stdio - Standard input/output streaming
- file - File I/O operations
- fs - File system operations and directory traversal
- tempfile - Temporary file handling
- path - Path manipulation and transformations
Data format parsing and serialization:
Database integration:
- database - Generic database support
- database-mysql - MySQL integration
- database-postgresql - PostgreSQL integration
- database-sqlite - SQLite integration
Network protocol integration:
- kafka - Apache Kafka producer and consumer
- redis - Redis Streams integration
- http-server - HTTP graph server with Axum integration
Various data source and sink implementations:
- array - Array-based streaming
- vec - Vector-based streaming
- env - Environment variable streaming
- command - Command execution and output streaming
- process - Process management and monitoring
- signal - Unix signal handling
- timer - Time-based and interval-based streaming
- tokio - Tokio channel integration
Comprehensive transformer implementations:
- transformers - All transformer types including:
- Basic: Map, Filter, Reduce
- Advanced: Batch, Retry, CircuitBreaker, RateLimit
- Stateful: RunningSum, MovingAverage
- Routing: Router, Partition, RoundRobin
- Merging: Merge, OrderedMerge, Interleave
- ML: Inference, BatchedInference
- Utility: Sample, Skip, Take, Limit, Sort, Split, Zip, Timeout, MessageDedupe
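A stateful transformer such as MovingAverage carries state across items. The sketch below models that idea as a plain struct with a bounded buffer; StreamWeave's real transformers are async stream adapters, so treat this as conceptual.

```rust
use std::collections::VecDeque;

// State carried between items: a bounded buffer of recent values.
struct MovingAverage {
    window: usize,
    buf: VecDeque<f64>,
}

impl MovingAverage {
    fn new(window: usize) -> Self {
        Self { window, buf: VecDeque::new() }
    }

    // Feed one item; return the average of the last `window` items seen.
    fn push(&mut self, x: f64) -> f64 {
        self.buf.push_back(x);
        if self.buf.len() > self.window {
            self.buf.pop_front();
        }
        self.buf.iter().sum::<f64>() / self.buf.len() as f64
    }
}

fn main() {
    let mut avg = MovingAverage::new(3);
    for x in [1.0, 2.0, 3.0, 4.0] {
        println!("{}", avg.push(x)); // 1, 1.5, 2, 3
    }
}
```

The stateful package generalizes this pattern with managed state, so transformers like RunningSum and MovingAverage stay correct across the lifetime of a stream.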
Observability and integration capabilities:
- integrations/opentelemetry - OpenTelemetry integration
- integrations/sql - SQL query support
- metrics - Metrics collection and Prometheus integration
- visualization - Pipeline and graph visualization
- API Documentation - Full API reference on docs.rs
- Local Documentation - Generated with rustdoc (run `./bin/docs`)
- Graph API Guide - Advanced graph patterns, routing strategies, and Flow-Based Programming
- Getting Started Guide
- Architecture Overview
- Common Use Cases
- Troubleshooting
- Contributing Guide
StreamWeave includes comprehensive examples demonstrating all major features. See the examples directory for:
- Integration examples (Kafka, Redis, Database, HTTP)
- File format examples (CSV, JSONL, Parquet)
- Processing examples (Stateful, Error Handling, Windowing)
- Visualization examples
- Graph API examples
Run any example with:
```shell
cargo run --example <example_name> --features <required_features>
```

Contributions are welcome! Please see our Contributing Guide for details.
This project is licensed under the Creative Commons Attribution-ShareAlike 4.0 International License.
See [LICENSE](LICENSE) for details.