A demonstration of reactive programming using pure Elixir/OTP primitives. ReactiveFlow shows how to build sophisticated reactive systems without additional libraries, leveraging the actor model and message passing for event-driven architecture.
Reactive programming is a paradigm where systems automatically respond to changes in data or events. Instead of imperatively controlling execution flow, you declare how components should react when conditions change. Think of it like a spreadsheet where changing one cell automatically updates all dependent formulas.
Elixir's OTP (Open Telecom Platform) is inherently designed for reactive systems:
- Actor Model: Each process reacts to messages independently
- Message Passing: Event-driven communication between processes
- Supervision Trees: Reactive failure handling and recovery
- Process Monitoring: Automatic reaction to process lifecycle events
- Distributed Computing: Reactive behavior across nodes
ReactiveFlow demonstrates a reactive order processing pipeline where each component automatically responds to events from upstream components:
Order Submitted → Inventory Check → Payment Processing → Fulfillment → Completion
↓
Analytics & Monitoring
- OrderProcessor: Main coordinator that triggers reactive chains
- InventoryChecker: Reacts to inventory validation requests
- PaymentProcessor: Reacts to payment processing needs
- Fulfillment: Reacts to fulfillment requests
- Analytics: Reacts to business events for monitoring and alerts
Orders trigger cascading reactions through the system without centralized orchestration.
# Order submission triggers the reactive chain
def handle_cast({:new_order, order}, state) do
GenServer.cast(InventoryChecker, {:check_inventory, order})
{:noreply, state}
endComponents use timers to simulate async operations and react to results.
# Simulate async inventory check
Process.send_after(self(), {:inventory_result, order, :available}, 100)
# React to the result
def handle_info({:inventory_result, order, :available}, state) do
GenServer.cast(PaymentProcessor, {:process_payment, updated_order})
{:noreply, state}
endComponents react differently based on current state conditions.
# React to high order volume
if new_state.hourly_orders > 5 do
IO.puts("🚨 High order volume alert!")
endPeriodic reactive behavior using process timers.
# Reactive analytics reporting every 5 seconds
defp schedule_metrics_check do
Process.send_after(self(), :check_metrics, 5_000)
endSupervisor trees provide reactive failure handling.
# Automatic process restart on failure
Supervisor.init(children, strategy: :one_for_one)- Elixir 1.12+
- No additional dependencies required - uses only OTP primitives!
mix deps.get
iex -S mixThen in IEx:
ReactiveFlow.Demo.run_demo()🚀 ReactiveFlow System Started!
📝 Submitting orders...
📦 Processing order 1 for Alice
🔍 Checking inventory for order 1
📦 Processing order 2 for Bob
🔍 Checking inventory for order 2
✓ Inventory available for order 1
💳 Processing payment for order 1 - $41.49
✓ Inventory available for order 2
💳 Processing payment for order 2 - $99.99
💰 Payment successful for order 1
📋 Fulfilling order 1
💰 Payment successful for order 2
📋 Fulfilling order 2
📦 Order 1 shipped!
✅ Order 1 completed! Revenue: $41.49
📦 Order 2 shipped!
✅ Order 2 completed! Revenue: $99.99
📊 Analytics: 3 orders this period
🚨 High order volume alert! 6 orders processed
📈 Final Stats:
Orders processed: 3
Total revenue: $159.47
Alerts: 1
# Start all reactive components
Demo.start_system()# Create and submit orders
order = Order.new(1, "Customer", [{"Item", 25.99}])
OrderProcessor.submit_order(order)# Get processing statistics
OrderProcessor.get_stats()
# => %{processed: 3, revenue: 159.47}
# Get analytics metrics
Analytics.get_metrics()
# => %{hourly_orders: 0, alerts: ["High volume: 6 orders"]}- Order Submission:
OrderProcessorreceives new order - Inventory Check: Automatically forwards to
InventoryChecker - Payment Processing: Successful inventory check triggers
PaymentProcessor - Fulfillment: Successful payment triggers
Fulfillment - Completion: Fulfillment completion updates
OrderProcessorstatistics - Analytics: All events are reactively tracked by
Analytics
Each step happens automatically based on the previous step's completion - no central coordinator needed!
- Create a GenServer that reacts to specific message patterns
- Add message handlers for reactive behavior
- Send messages to trigger reactions in other components
- Add to the supervision tree
Example:
defmodule ShippingTracker do
@moduledoc "Reacts to shipment events"
use GenServer
def handle_cast({:track_shipment, order}, state) do
# React to shipment tracking request
Process.send_after(self(), {:shipment_update, order}, 1000)
{:noreply, state}
end
def handle_info({:shipment_update, order}, state) do
# React to shipment status change
GenServer.cast(CustomerNotifications, {:notify_shipment, order})
{:noreply, state}
end
endEnhance existing components with new reactive patterns:
# React to failed payments
def handle_info({:payment_result, order, :failed}, state) do
GenServer.cast(CustomerService, {:payment_failed, order})
{:noreply, state}
end
# React to inventory shortages
def handle_info({:inventory_result, order, :out_of_stock}, state) do
GenServer.cast(SupplierNotifications, {:restock_needed, order})
{:noreply, state}
end- Scalability: Each component is an independent process
- Fault Tolerance: Process isolation and supervision
- Maintainability: Clear reactive relationships between components
- Testability: Each component can be tested in isolation
- Distributed: Easily scales across multiple nodes
- No Dependencies: Uses only Elixir/OTP primitives
MIT License - Feel free to use this as a learning resource or starting point for your own reactive systems!