A lightweight, flexible event-driven integration platform designed to provide durable event processing, workflow orchestration, and reliable messaging across distributed systems.
Flo serves as a unified abstraction layer over various message brokers. It helps you build event-driven applications with minimal code changes to your existing systems.
- Integration-First: Connect to existing systems with minimal code changes
- Flexible Orchestration: Multiple approaches from simple event handling to declarative workflows
- Unified Messaging: Write once, deploy on any supported message broker
- Operational Simplicity: Single server component with modular scaling
- Developer-Friendly: Multiple integration patterns with varying degrees of coupling
- Configuration-Driven: Define complex orchestration through configuration, not just code
- Go 1.18+
- Redis (for default configuration)
- Protocol Buffers compiler (for development)
# Install Flo CLI
go install github.com/deepconduit/flo/cmd/flo@latest
# Start the Flo server
flo startpackage main
import (
"fmt"
"github.com/deepconduit/flo"
)
func main() {
// Connect to Flo
bus := flo.Connect(flo.ConnectOptions{
Endpoint: "localhost:7233",
})
// Subscribe to events
bus.Subscribe("greeting", func(event flo.Event) {
var message string
event.Unmarshal(&message)
fmt.Println("Received:", message)
// Respond with another event
bus.Publish("reply", "Hello back!")
})
// Publish an event
bus.Publish("greeting", "Hello, Flo!")
// Wait for events
select {}
}Flo offers multiple integration approaches with varying levels of coupling:
Minimal coupling, ideal for quick integration with existing services:
// In your existing service
bus := flo.Connect("localhost:7233")
// Subscribe to events
bus.Subscribe("orders.created", func(event flo.Event) {
var order Order
event.Unmarshal(&order)
// Process with existing business logic
result, err := service.ProcessOrder(order)
// Publish result as event
if err != nil {
bus.Publish("orders.failed", map[string]any{
"orderId": order.ID,
"error": err.Error(),
})
} else {
bus.Publish("orders.completed", result)
}
})Configuration-driven approach:
# flo-routes.yaml
routes:
payment.requested:
handler: processPayment
output:
success: payment.completed
error: payment.failed
payment.completed:
handler: sendReceipt// Register existing functions as handlers
bus.RegisterHandler("processPayment", paymentService.ProcessPayment)
bus.RegisterHandler("sendReceipt", notificationService.SendReceipt)
// Load routes from configuration
bus.LoadRoutesFromConfig("flo-routes.yaml")For complex scenarios:
// Define workflow through builder pattern
workflow := bus.NewWorkflow("payment-processing")
workflow.
OnEvent("payment.requested").ExecuteAction("authorize-payment").
OnSuccess().ExecuteAction("capture-payment").
OnError().ExecuteAction("void-authorization").
OnEvent("payment.captured").ExecuteAction("settle-payment")
// Register handlers with existing service functions
workflow.RegisterAction("authorize-payment", paymentService.AuthorizePayment)
workflow.RegisterAction("capture-payment", paymentService.CapturePayment)
workflow.RegisterAction("void-authorization", paymentService.VoidAuthorization)
workflow.RegisterAction("settle-payment", paymentService.SettlePayment)
// Register the workflow
bus.RegisterWorkflow(workflow)Connect microservices with reliable, durable event delivery and processing:
// In Order Service
orderBus := flo.Connect("flo-server:7233")
orderBus.Publish("orders.created", newOrder)
// In Inventory Service
inventoryBus := flo.Connect("flo-server:7233")
inventoryBus.Subscribe("orders.created", handleOrderCreated)Use request-response pattern for synchronous communications:
// Send a request and wait for response (with timeout)
response, err := bus.Request("payment.authorize", authRequest, 5*time.Second)
if err != nil {
// Handle timeout or error
return nil, fmt.Errorf("authorization failed: %w", err)
}
// Use the response
var authResult AuthorizationResult
response.Unmarshal(&authResult)Orchestrate multi-step payment flows with automatic compensation:
workflow := bus.NewWorkflow("payment-processing")
workflow.
OnEvent("payment.requested").ExecuteAction("authorize").
OnSuccess().ExecuteAction("capture").
OnError().ExecuteAction("void-authorization")Create reliable data processing pipelines with error handling:
// Define a data processing pipeline
pipeline := bus.NewPipeline("customer-data-processing")
pipeline.
FromSource("customer-data-topic").
Transform("normalize-customer-data").
Validate("validate-customer-schema").
EnrichWith("add-geo-data").
EnrichWith("add-segment-data").
ToDestination("processed-customer-topic")To build Flo from source:
# Clone the repository
git clone https://github.com/deepconduit/flo.git
cd flo
# Install development dependencies
make install-tools
# Generate protocol buffers
make proto
# Build the application
make build
# Run tests
make testFlo's architecture is designed for simplicity and flexibility in deployment:
- Flo Server: A single scalable component for event routing and orchestration
- Event Sources: Connect various event sources (Kafka, RabbitMQ, HTTP webhooks)
- Service Workers: Your existing services with minimal Flo integration
- Storage Backend: Flexible storage options (Redis, PostgreSQL, MongoDB, etc.)
For production deployments, we recommend:
- Using Redis or PostgreSQL as the storage backend
- Setting up proper monitoring and alerting
- Configuring TLS for secure communication
- Using namespaces for environment isolation
For detailed documentation, visit:
MIT License