Releases: nilpntr/tributary
v0.1.5
implement single-step fetching for optimal work distribution
Changed from fixed batch limit of 10 steps to dynamic single-step
fetching per worker, following River's proven approach. This prevents
worker hoarding where one worker could grab multiple long-running tasks
while others remain idle.
Key changes:
- Add maxStepsToFetch() method that returns 1 step per worker
- Update fetchAndExecute to use dynamic limit calculation
- Ensure each worker processes exactly one step at a time
- Enable true parallel execution for long-running tasks
Before: One worker could fetch 10 steps and process them sequentially
After: Each worker fetches 1 step, enabling optimal distribution across all workers
Combined with subscription-based notifications, this delivers true parallel execution matching River's architecture.
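The before/after difference can be illustrated with a small model. This is a sketch, not Tributary's code: claimSteps is a hypothetical helper that assumes idle workers take turns fetching from a shared queue and that every step takes about the same time to run.

```go
package main

import "fmt"

// claimSteps models idle workers taking turns to fetch from a shared queue.
// Each fetch claims at most `limit` pending steps. It returns the number of
// steps claimed per worker. Illustrative model only (hypothetical helper).
func claimSteps(pending, workers, limit int) []int {
	if workers <= 0 || limit <= 0 {
		return nil
	}
	claimed := make([]int, workers)
	for w := 0; pending > 0; w = (w + 1) % workers {
		n := limit
		if pending < n {
			n = pending
		}
		claimed[w] += n
		pending -= n
	}
	return claimed
}

func main() {
	fmt.Println(claimSteps(10, 4, 10)) // batch limit of 10: [10 0 0 0]
	fmt.Println(claimSteps(10, 4, 1))  // single-step fetch: [3 3 2 2]
}
```

With a limit of 10 the first worker to wake up claims the whole backlog, while a limit of 1 leaves the remaining steps available to the other workers.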
v0.1.4 - subscription-based worker notification
Implement subscription-based worker notification for true parallel execution
Replaced shared channel bottleneck with River-style subscription system
where each worker registers a callback that gets invoked directly when
work becomes available. This eliminates the sequential processing issue
where only one worker could receive notifications at a time.
Key changes:
- Replace single workerCh with per-worker callback subscriptions
- Add WorkerCallback type and subscription management methods
- Implement runWorkerWithSubscription with private worker channels
- Update notifyWorkers to call all registered callbacks simultaneously
- Remove old runWorker function that used shared channel
Before: Only one worker processed steps sequentially despite multiple
workers configured
After: All workers receive notifications simultaneously, enabling true
parallel execution
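A minimal sketch of the subscription idea follows. Only the names WorkerCallback and notifyWorkers come from the notes above; the notifier type, subscribe, newWakeChannel and countWoken are hypothetical, and the real implementation may differ. In particular, this sketch invokes the callbacks one after another without blocking rather than literally at the same instant.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkerCallback is invoked when new work may be available.
type WorkerCallback func()

// notifier keeps one callback per subscribed worker (hypothetical type).
type notifier struct {
	mu        sync.Mutex
	nextID    int
	callbacks map[int]WorkerCallback
}

func newNotifier() *notifier {
	return &notifier{callbacks: make(map[int]WorkerCallback)}
}

// subscribe registers cb and returns a function that removes it again.
func (n *notifier) subscribe(cb WorkerCallback) (unsubscribe func()) {
	n.mu.Lock()
	defer n.mu.Unlock()
	id := n.nextID
	n.nextID++
	n.callbacks[id] = cb
	return func() {
		n.mu.Lock()
		defer n.mu.Unlock()
		delete(n.callbacks, id)
	}
}

// notifyWorkers invokes every registered callback outside the lock.
func (n *notifier) notifyWorkers() {
	n.mu.Lock()
	cbs := make([]WorkerCallback, 0, len(n.callbacks))
	for _, cb := range n.callbacks {
		cbs = append(cbs, cb)
	}
	n.mu.Unlock()
	for _, cb := range cbs {
		cb()
	}
}

// newWakeChannel returns a private buffered channel plus a callback that
// signals it without ever blocking the notifier.
func newWakeChannel() (<-chan struct{}, WorkerCallback) {
	ch := make(chan struct{}, 1)
	return ch, func() {
		select {
		case ch <- struct{}{}:
		default: // a wake-up is already pending; coalesce
		}
	}
}

// countWoken subscribes `workers` private channels, notifies twice and
// reports how many workers have a wake-up pending.
func countWoken(workers int) int {
	n := newNotifier()
	chans := make([]<-chan struct{}, workers)
	for i := range chans {
		ch, cb := newWakeChannel()
		chans[i] = ch
		n.subscribe(cb)
	}
	n.notifyWorkers()
	n.notifyWorkers() // coalesces into the pending wake-up
	woken := 0
	for _, ch := range chans {
		select {
		case <-ch:
			woken++
		default:
		}
	}
	return woken
}

func main() {
	fmt.Println(countWoken(4)) // 4: every worker is signalled, not just one
}
```

With a single shared channel, one send is received by exactly one worker. Giving each worker its own buffered channel means a notification reaches all of them.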
v0.1.3 - Parallel execution fix
Parallel execution fix: ensure all workers process steps concurrently
Fixed critical bottleneck where only one worker would process available
steps at a time due to shared notification channel. Added worker
notification after batch processing to wake up idle workers when more
work is available or dependencies are unlocked.
Changes:
- Added c.notifyWorkers() call after step execution in fetchAndExecute
- Ensures continuous work distribution across all available workers
- Eliminates sequential processing bottleneck in multi-worker setups
v0.1.2 - Race condition fix
Fixed critical race condition where multiple workers could fetch and execute the same step simultaneously, causing duplicate execution and inconsistent state transitions. Implemented atomic fetch-and-mark operation using PostgreSQL transactions with FOR UPDATE SKIP LOCKED to ensure each step executes exactly once.
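The fetch-and-mark pattern can be sketched as a single statement that selects one available row with FOR UPDATE SKIP LOCKED and marks it in the same operation. The table name steps, the columns state and priority, and the claimStep helper are assumptions made for illustration, not Tributary's schema or API. The sketch uses the standard database/sql package as a stand-in for whichever PostgreSQL driver is in use.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// claimStepSQL picks one available step, skipping rows that another
// transaction has already locked, and marks it as running.
// Table and column names are hypothetical.
const claimStepSQL = `
UPDATE steps
SET state = 'running'
WHERE id = (
    SELECT id
    FROM steps
    WHERE state = 'available'
    ORDER BY priority DESC, id
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id`

// claimStep returns the ID of the claimed step, or ok=false when no step is
// currently available. Requires a PostgreSQL connection to run.
func claimStep(ctx context.Context, db *sql.DB) (id int64, ok bool, err error) {
	err = db.QueryRowContext(ctx, claimStepSQL).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return 0, false, nil
	}
	if err != nil {
		return 0, false, err
	}
	return id, true, nil
}

func main() {
	// Without a database at hand, just show the statement.
	fmt.Println(claimStepSQL)
}
```

Because locked rows are skipped instead of waited on, two workers running this statement at the same time claim different steps rather than blocking each other or claiming the same one.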
v0.1.1 - Type Safety Improvements and AI Agent Guide
Tributary v0.1.1 - Type Safety Improvements and AI Agent Guide
This patch release improves type safety for dependency result helpers and adds comprehensive documentation for AI agents.
Bug Fixes
Generic Type Constraint Fixes
- Fixed GetAllResultsTyped, GetResultTyped, and GetResultsByPatternTyped to work with any step type
- Changed function signatures from Step[StepArgs] to Step[T StepArgs] for proper generic support
- Resolved compilation errors when using result helpers with typed steps like Step[AggregatorArgs]
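The underlying reason is that Go generic types are invariant: *Step[AggregatorArgs] is a different type from *Step[StepArgs], even though AggregatorArgs implements StepArgs. The sketch below reproduces the situation with simplified stand-ins. The StepArgs interface and Step struct here are assumptions modelled on the Quick Start, and kindOfAny and kindOf are hypothetical helpers, not part of the library.

```go
package main

import "fmt"

// StepArgs is a simplified stand-in: every args type names its kind.
type StepArgs interface {
	Kind() string
}

// Step is a simplified stand-in for tributary.Step.
type Step[T StepArgs] struct {
	Args T
}

type AggregatorArgs struct{ Name string }

func (AggregatorArgs) Kind() string { return "aggregator" }

// kindOfAny accepts only *Step[StepArgs]. Because generic types are
// invariant, a *Step[AggregatorArgs] cannot be passed here.
func kindOfAny(s *Step[StepArgs]) string { return s.Args.Kind() }

// kindOf is generic over the args type, so it accepts any *Step[T].
func kindOf[T StepArgs](s *Step[T]) string { return s.Args.Kind() }

func main() {
	step := &Step[AggregatorArgs]{Args: AggregatorArgs{Name: "sum"}}
	// kindOfAny(step) // does not compile: wrong type argument
	fmt.Println(kindOf(step)) // T is inferred as AggregatorArgs; prints "aggregator"
}
```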
Before:
// This failed to compile
processorResults := tributary.GetAllResultsTyped[ProcessorResult](step)
// Error: Cannot use 'step' (type *Step[AggregatorArgs]) as type *Step[StepArgs]
After:
// Now works perfectly
processorResults := tributary.GetAllResultsTyped[ProcessorResult](step)
Documentation
New AI Agent Guide
- Added comprehensive AGENTS.md guide specifically for AI agents
- Documented essential patterns for workflow orchestration
- Included examples for data pipelines, fan-out/fan-in, and conditional execution
- Explained three-level configuration system (step/task/global)
- Provided error handling best practices and security considerations
Tributary v0.1.0 - Initial Release
We're excited to announce the first release of
Tributary, a type-safe DAG workflow library for Go
that makes building complex, reliable workflows simple
and intuitive.
What is Tributary?
Tributary allows you to build workflows as directed
acyclic graphs (DAGs) where steps can depend on each
other, pass data between steps, and execute in
parallel when possible. All backed by PostgreSQL for
reliability and persistence.
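To make "execute in parallel when possible" concrete, here is a minimal sketch that groups the steps of a DAG into waves, where every step in a wave has all of its dependencies satisfied by earlier waves. The waves function and the step names are hypothetical. A real scheduler such as Tributary's would more likely start each step as soon as its own dependencies finish rather than wait for a whole wave.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// waves groups steps into batches that could run in parallel. deps maps a
// step name to the names it depends on; every step must appear as a key.
func waves(deps map[string][]string) ([][]string, error) {
	remaining := make(map[string]int, len(deps)) // unmet dependency count
	dependents := make(map[string][]string)      // reverse edges
	for step, ds := range deps {
		remaining[step] = len(ds)
		for _, d := range ds {
			dependents[d] = append(dependents[d], step)
		}
	}

	var ready []string
	for step, n := range remaining {
		if n == 0 {
			ready = append(ready, step)
		}
	}

	var out [][]string
	done := 0
	for len(ready) > 0 {
		sort.Strings(ready) // deterministic output
		out = append(out, ready)
		done += len(ready)
		var next []string
		for _, step := range ready {
			for _, dep := range dependents[step] {
				remaining[dep]--
				if remaining[dep] == 0 {
					next = append(next, dep)
				}
			}
		}
		ready = next
	}
	if done != len(deps) {
		return nil, errors.New("dependency cycle or unknown step")
	}
	return out, nil
}

func main() {
	plan, err := waves(map[string][]string{
		"fetch":     nil,
		"process_a": {"fetch"},
		"process_b": {"fetch"},
		"aggregate": {"process_a", "process_b"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(plan) // [[fetch] [process_a process_b] [aggregate]]
}
```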
Key Features
Type-Safe Workflows
- Compile-time guarantees for step arguments and results
- Generic step definitions with strongly-typed interfaces
- No runtime type assertion errors
Parallel Execution
- Steps with no blocking dependencies run concurrently
- Automatic dependency resolution and scheduling
- Configurable worker pools per queue
Reliable Processing
- Automatic retries with exponential backoff
- PostgreSQL-backed persistence survives restarts
- LISTEN/NOTIFY for real-time step execution
- Graceful shutdown with in-flight step completion
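As an illustration of retries with exponential backoff, the sketch below doubles the wait after each failed attempt up to a cap. The backoff function, the base delay and the cap are assumptions chosen for the example. The release notes do not state which schedule Tributary actually uses.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the wait before retry number `attempt` (1-based):
// base, 2*base, 4*base, ... capped at max. Hypothetical helper.
func backoff(attempt int, base, max time.Duration) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max // stop early so the value cannot overflow
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Waits for attempts 1..5: 1s, 2s, 4s, 8s, 10s
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Println(attempt, backoff(attempt, time.Second, 10*time.Second))
	}
}
```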
Security & Observability
- Built-in encryption hooks using NaCl secretbox
- Structured logging with zap integration
- Composable hook system for cross-cutting concerns
- Three-level configuration (step/task/global) with smart precedence
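One plausible reading of three-level precedence is "the most specific level that sets a value wins". The sketch below assumes exactly that, with step overriding task and task overriding global, and treats the zero value as "not set". Both the rule and the firstSet helper are assumptions, since the release notes do not spell out the precedence.

```go
package main

import (
	"fmt"
	"time"
)

// firstSet returns the first non-zero value, so callers pass levels from
// most specific to least specific. Hypothetical helper.
func firstSet[T comparable](levels ...T) T {
	var zero T
	for _, v := range levels {
		if v != zero {
			return v
		}
	}
	return zero
}

func main() {
	var stepTimeout time.Duration // not set at step level
	taskTimeout := 2 * time.Minute
	globalTimeout := 30 * time.Second

	fmt.Println(firstSet(stepTimeout, taskTimeout, globalTimeout)) // 2m0s
	fmt.Println(firstSet("", "", "default"))                       // default
}
```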
Production Ready
- Comprehensive migration system based on bun migrations
- Support for SQL migrations with --bun:split directives
- Automatic and manual migration options
- Database schema versioning
Installation
go get github.com/nilpntr/tributary
Quick Start
// 1. Define your step
type CreateUserArgs struct {
	Email string `json:"email"`
}
func (CreateUserArgs) Kind() string {
	return "create_user"
}
// 2. Implement the worker
type CreateUserWorker struct {
	tributary.WorkerDefaults[CreateUserArgs]
}
func (w *CreateUserWorker) Work(ctx context.Context, step *tributary.Step[CreateUserArgs]) error {
	// Your business logic here
	fmt.Printf("Creating user: %s\n", step.Args.Email)
	return nil
}
// 3. Build and execute workflow
workflow := client.NewWorkflow(&tributary.WorkflowOpts{
	Name: "user_signup",
})
workflow.AddTask("create_user", CreateUserArgs{
	Email: "alice@example.com",
}, nil)
workflowExecutionID, err := workflow.Execute(ctx)
Examples Included
- User Signup: Sequential workflow with dependency passing
- Retry Logic: Demonstrates automatic failure recovery
- Parallel Aggregation: Shows parallel execution with result collection
Configuration Options
Step-Level Configuration
func (MyStepArgs) InsertOpts() tributary.StepInsertOpts {
	return tributary.StepInsertOpts{
		MaxAttempts: 3,
		Timeout:     5 * time.Minute,
		Queue:       "critical",
		Priority:    10,
	}
}
Workflow-Level Scheduling
workflow := client.NewWorkflow(&tributary.WorkflowOpts{
	Name:        "scheduled_workflow",
	ScheduledAt: time.Now().Add(1 * time.Hour),
})
Encryption
client, err := tributary.NewClient(pool, &tributary.Config{
	Hooks: []tributary.Hook{
		tributary.NewEncryptHook(tributary.NewSecretboxEncryptor(key)),
	},
})
Architecture Highlights
- PostgreSQL-native: Uses advanced PostgreSQL features like LISTEN/NOTIFY
- Worker pools: Configurable concurrency per queue
- Hook system: Composable middleware for encryption, logging, metrics
- Migration system: Full schema evolution support
- Type safety: Leverages Go's type system for reliability
Performance Features
- Real-time step execution via PostgreSQL LISTEN/NOTIFY
- Configurable worker pools with queue isolation
- Optimized dependency resolution queries
- Connection pooling with pgx
What's Next
- Comprehensive test suite and benchmarks
- CLI tooling for workflow management
- Enhanced observability and metrics
- Additional examples and documentation
Migration & Compatibility
This is the initial release (v0.x.x), so the API may evolve before v1.0.0. We'll follow semantic versioning:
- Patch versions (v0.1.x): Bug fixes
- Minor versions (v0.x.0): New features, possible breaking changes
- v1.0.0: API stability guarantee
Contributing
We welcome contributions! Please see our examples for
patterns and feel free to open issues or pull
requests.
License
MIT License - see LICENSE file for details.
Try Tributary today and experience the power of
type-safe, reliable workflow orchestration in Go!
go get github.com/nilpntr/tributary@v0.1.0