A lightweight, configuration-driven data pipeline framework for Rust.
Source → Transform → Sink
- YAML Configuration: Declarative pipeline definition with DAG validation (duplicate IDs, missing targets, cycles)
- Fan-out: One source can fan out to multiple sinks
- Built-in Nodes:
  - Sources: `http_client`, `http_server`, `file`, `redis`, `sql`
  - Transforms: `filter`, `compute`, `remap`, `split`, `switch`
  - Sinks: `console`, `file`, `blackhole`, `http_client`, `http_server`, `redis`, `sql`, `notify::email`, `notify::telegram`, `notify::webhook`
- Management API: Optional HTTP API (`/health`, `/metrics`, `/config`, `/config/graph`), internal use only
- CLI: `run`, `config validate`, `config show`, `config graph`
Pipeflow uses Cargo features to keep optional dependencies behind flags.
- `api`: Enables the management API server.
- `http-client` (default): Enables the `http_client` source and sink.
- `http-server`: Enables the `http_server` source and sink.
- `database`: Enables the `sql` source and sink.
- `redis`: Enables the `redis` source and sink.
- `file` (default): Enables the `file` source and sink.
- `notify` (default): Enables the `notify::email`, `notify::telegram`, and `notify::webhook` sinks.
Core-only build (no optional sources/sinks):

```bash
cargo build --no-default-features
```

If a pipeline config references a node behind a disabled feature, `Engine::build()` returns a
configuration error explaining which feature is required.
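For example, to depend on Pipeflow with only the HTTP client and file nodes, a `Cargo.toml` entry could look like the sketch below (the version is a placeholder; pick the release you actually use):

```toml
[dependencies]
# Disable default features, then opt back in to only what you need.
pipeflow = { version = "*", default-features = false, features = ["http-client", "file"] }
```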
- Rust 1.92 or later (uses Rust 2024 edition)
```bash
cargo add pipeflow
```

Create a pipeline configuration file `pipeline.yaml`:
```yaml
system:
  # Management API configuration (optional, internal use only)
  # Binds to localhost by default. Use a reverse proxy for external access.
  api:
    enabled: true
    port: 8000
    # bind: "127.0.0.1" # Default: localhost only
  # channel_size: default buffer size for transform/sink/internal channels (default: 256)
  channel_size: 1024

pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        urls:
          - name: "default"
            url: "https://httpbin.org/json"
        interval: "10s"
        # schedule: "0 0 * * *" # Run daily at 00:00 (local time, 5 fields; seconds default to 0)
      to: [pass_through]

  transforms:
    - id: pass_through
      to: [console]

  sinks:
    - id: console
      type: console
      config:
        format: pretty
```

Pipeflow wiring is source -> transform -> sink:
- Sources and transforms declare `to` (one or more downstream transforms, sinks, or internal channels). `to` must be non-empty for sources and transforms.
- Transforms may omit `steps` to act as pass-through nodes.
- Sinks are terminal and do not declare `to`; their target is defined by sink type/config (e.g. file path).
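As a sketch of fan-out, a source can list several downstream targets in `to` (node IDs here are illustrative, and the file sink's `path` key is an assumption; see docs/configuration/INDEX.md for the exact sink parameters):

```yaml
pipeline:
  sources:
    - id: api_poller
      type: http_client
      config:
        urls:
          - name: "default"
            url: "https://httpbin.org/json"
        interval: "10s"
      # Fan-out: one source feeding two sinks directly
      to: [console, audit_log]

  sinks:
    - id: console
      type: console
    - id: audit_log
      type: file
      config:
        path: "./data/audit.jsonl" # key name assumed
```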
All commands that accept CONFIG also accept a directory. When a directory is provided,
pipeflow loads all *.yaml / *.yml files in lexical order and merges them into a single
configuration before normalization and validation.
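For instance, a pipeline could be split across files whose numeric prefixes make the lexical merge order explicit (the file names here are hypothetical):

```text
configs/
├── 00-system.yaml      # system: api, channel_size
├── 10-sources.yaml     # pipeline.sources
├── 20-transforms.yaml  # pipeline.transforms
└── 30-sinks.yaml       # pipeline.sinks
```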
This is useful for larger pipelines:
```bash
# Directory-based config
pipeflow run ./configs/
pipeflow config validate ./configs/
pipeflow config show ./configs/ --format yaml
```

```rust
use pipeflow::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::from_file("pipeline.yaml")?;
    let mut engine = Engine::from_config(config)?;
    engine.build().await?;
    engine.run().await
}
```

| Type | Description | Status |
|---|---|---|
| `http_client` | HTTP polling | Implemented |
| `http_server` | HTTP push/webhook | Implemented |
| `redis` | Redis GET polling | Implemented |
| `sql` | SQL polling | Implemented |
| `file` | File watching | Implemented |
| Type | Description | I/O | Status |
|---|---|---|---|
| `filter` | Conditional filtering | 1:0/1 | Implemented |
| `compute` | Math expression eval | 1:1 | Implemented |
| `remap` | Field mapping (step) | 1:1 | Implemented |
| `split` | Split 1 to N messages | 1:N | Implemented |
| `switch` | Route to different targets | 1:1 | Implemented |
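A transform entry follows the same shape as sources and sinks. The sketch below puts a `filter` step in front of a sink; the step's `condition` key and expression syntax are assumptions for illustration (the exact step schema is in docs/configuration/INDEX.md):

```yaml
pipeline:
  transforms:
    - id: drop_errors
      steps:
        - type: filter
          # 1:0/1 -- messages failing the condition are dropped (field name assumed)
          condition: "status == 200"
      to: [console]
```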
| Type | Description | Status |
|---|---|---|
| `blackhole` | Discard messages | Implemented |
| `console` | Print to stdout | Implemented |
| `file` | Write to file | Implemented |
| `sql` | SQL database insert | Implemented |
| `redis` | Redis operations | Implemented |
| `http_client` | HTTP API calls | Implemented |
| `http_server` | HTTP pull endpoint | Implemented |
| `notify::email` | Email notifications | Implemented |
| `notify::telegram` | Telegram notifications | Implemented |
| `notify::webhook` | Webhook notifications | Implemented |
Internal channels are built-in message streams that can be routed like any other target:
- IDs: `internal::audit`, `internal::event`, `internal::notify`, `internal::metric`, `internal::dlq`
- Configure under `pipeline.internal` with `to`, optional `channel_size`, and optional `log`. `to` can be empty (messages are dropped after optional logging).
- Default log levels: `audit`/`metric` = debug, `notify` = info, `event`/`dlq` = warn
- If a message targets an internal channel, its payload must match that channel's schema; otherwise it is dropped with a warning.
See docs/configuration/INDEX.md for detailed configuration parameters for all supported sources and sinks.
Pipeflow uses per-node `tokio::sync::mpsc` channels for transforms, sinks, and internal channels.
Set the default buffer size via `system.channel_size`, and override per node with `channel_size`.
```yaml
system:
  channel_size: 1024

pipeline:
  transforms:
    - id: enrich
      to: [sink1]
      channel_size: 2048

  sinks:
    - id: sink1
      type: file
      channel_size: 512

  internal:
    event:
      channel_size: 128
```

Failed messages are routed to the built-in `internal::dlq` channel. You can attach sinks or transforms
via `pipeline.internal.dlq.to`. Chain-depth protection prevents infinite loops (max depth: 8).
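A minimal sketch of DLQ routing (the file sink's `path` key is an assumption; consult the configuration reference for the real parameter names):

```yaml
pipeline:
  internal:
    dlq:
      # Failed messages flow to this sink instead of being dropped
      to: [dlq_file]

  sinks:
    - id: dlq_file
      type: file
      config:
        path: "./data/dlq.jsonl" # key name assumed
```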
Current status:

- `internal::dlq` routing: Implemented
- Chain-depth protection: Implemented
- Automatic DLQ routing on transform/sink errors: Implemented
See docs/DESIGN.md for the full design.
```bash
# Run pipeline
pipeflow run config.yaml

# Run with verbose/debug output
pipeflow -v run config.yaml

# Validate configuration
pipeflow config validate config.yaml

# Show pipeline graph (ASCII)
pipeflow config graph config.yaml

# Show merged + normalized configuration
pipeflow config show config.yaml --format yaml
```

Global flags:

- `-v, --verbose` - Enable debug logging

Notes:

- `pipeflow config validate` checks YAML structure and pipeline wiring (IDs, references, cycles, internal routing). It does not validate node-specific `config` contents (e.g. required `http_client.urls`); those are validated during `Engine::build()` (and therefore `pipeflow run`).
- If you use directory-based configs, `config show` displays the merged + normalized result.
Pipeflow is stand-alone by design.
To keep the architecture simple and robust (KISS principle), Pipeflow does not implement complex distributed coordination protocols (like Raft or Paxos).
- Persistence: State (like silence records) is stored on the local filesystem (`./data` by default). We removed complex distributed backends like Redis for silence state in favor of simplicity and filesystem atomicity.
- Scaling: We recommend manual sharding. Deploy multiple independent instances, each handling a different subset of configuration files.
- High Availability: Use detailed health checks (e.g. Kubernetes liveness probes) to restart failed instances.

If you need shared state across instances (e.g. shared silence records), mount a shared volume (NFS/EFS) to the `data_dir`.
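If the management API is enabled, its `/health` endpoint can back such a health check. A hedged Kubernetes sketch, assuming the API is reachable on port 8000 as in the quick-start config (timing values are illustrative):

```yaml
livenessProbe:
  httpGet:
    path: /health
    port: 8000
  initialDelaySeconds: 5
  periodSeconds: 10
```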
- docs/INDEX.md - Documentation navigation guide
- docs/configuration/INDEX.md - Complete configuration reference
- docs/DESIGN.md - Architecture and design decisions
- configs/examples/ - Working pipeline examples
```bash
# Unit + integration tests
cargo test --all-features

# Lint (clippy)
cargo clippy --all-targets --all-features -- -D warnings

# Format check
cargo fmt --all -- --check
```

MIT