
Flow - Queue-Based Workflow Orchestration 🌊

Give your AI agents a workflow engine

Flow is a lightweight, powerful workflow orchestration system where work items flow through queues, automated by rules and processed by AI agents.

Perfect for:

  • 🤖 AI Agent Coordination - Orchestrate multiple specialized agents
  • 🔄 Automated Pipelines - ML training, CI/CD, content publishing
  • 📋 Task Management - Track work through stages automatically
  • 🎯 Sub-Agent Workflows - Design → Implement → Test → Deploy

Instant start:

curl -fsSL https://raw.githubusercontent.com/sethdford/flow/main/install.sh | bash

Then initialize in your project:

flow init
flow node create my-pipeline
flow queue create backlog -n my-pipeline
flow agent spawn worker-1

That's it! Flow is now ready to orchestrate your work.

Overview

Flow is a lightweight workflow orchestration system where:

  • Queues are work containers where items wait to be processed
  • Nodes are logical groupings of related queues (many queues per node)
  • Work Items are units of work that flow from queue to queue
  • Agents claim and process work items from specific queues
  • Rules automate work transfer between queues

Think of it as a production line where work items move through queues, with AI agents processing them and rules automating the flow.

Key Concepts

Queues

Queues are containers for work items waiting to be processed:

  • data-prep - Prepare data for training
  • model-training - Train ML models
  • evaluation - Evaluate model performance
  • staging - Stage for deployment

Multiple queues can belong to the same node for logical grouping.

Nodes

Nodes are logical groupings of related queues:

  • training - Contains queues like data-prep, model-training, evaluation
  • deployment - Contains queues like staging, production

Nodes help organize your workflow, but work items flow between queues, not nodes.

Work Items

Work items flow through queues:

  • Created in a starting queue
  • Claimed by agents for processing
  • Automatically transferred to next queue when completed (if rules exist)
  • Can be manually transferred between queues
  • Track complete history of their journey

Agents

Workers that process work items:

  • Claim work from specific queues
  • Process the work
  • Complete work items (triggering rules)

Rules

Automate work transfer between queues:

  • on_complete - Transfer when work is completed
  • on_condition - Transfer based on conditions (future)
  • on_schedule - Time-based transfers (future)
  • manual - Explicit manual transfers

Installation

Quick Install (Recommended)

curl -fsSL https://raw.githubusercontent.com/sethdford/flow/main/install.sh | bash

The installer will:

  • Detect your platform (macOS/Linux, amd64/arm64)
  • Install via go install if Go is available
  • Fall back to building from source if needed
  • Guide you through PATH setup if necessary

Manual Install

# Using go install (requires Go 1.21+)
go install github.com/sethdford/flow@latest

# Or build from source
git clone https://github.com/sethdford/flow
cd flow
go build -o flow
sudo mv flow /usr/local/bin/  # or anywhere in your PATH

Quick Start

# 1. Initialize
./flow init

# 2. Create nodes (logical groupings)
./flow node create training -d "ML training pipeline"
./flow node create deployment -d "Deployment pipeline"

# 3. Create queues in nodes
./flow queue create data-prep -n training -d "Prepare training data"
./flow queue create model-training -n training -d "Train models" -p 1
./flow queue create evaluation -n training -d "Evaluate performance" -p 2
./flow queue create staging -n deployment -d "Staging environment"

# 4. Create rules to automate queue transfers
./flow rule create prep-to-train -f data-prep -t model-training --type on_complete
./flow rule create train-to-eval -f model-training -t evaluation --type on_complete
./flow rule create eval-to-staging -f evaluation -t staging --type on_complete

# 5. Create a work item
./flow work create "Train sentiment model" -q data-prep -p 0

# 6. Spawn an agent
./flow agent spawn ml-worker

# 7. Agent claims and processes work (automatic flow via rules)
./flow agent claim data-prep ml-worker
./flow work complete work-xxx ml-worker --rules=true  # Auto-transfers to model-training

# 8. Continue through workflow
./flow agent claim model-training ml-worker
./flow work complete work-xxx ml-worker --rules=true  # Auto-transfers to evaluation

./flow agent claim evaluation ml-worker
./flow work complete work-xxx ml-worker --rules=true  # Auto-transfers to staging

# 9. Check ready work at any time
./flow ready

Commands

Initialize

flow init                    # Initialize flow workspace

Nodes (Logical Groupings)

flow node create <name> [-d description] [-t type]
flow node list
flow node show <name>        # Shows queues in this node

Node types: stage (default), decision, parallel, merge

Queues (Work Containers)

flow queue create <name> -n <node> [-d description] [-p priority]
flow queue list [--node node]
flow queue show <name>

Work Items

flow work create <title> [-q queue] [-p priority] [-d description]
flow work list [--status status] [--queue queue]
flow work show <work-item-id>
flow work transfer <work-item-id> <to-queue> [--notes notes]  # Manual transfer
flow work complete <work-item-id> <agent-name> [--rules=true]  # Auto-transfer

Agents

flow agent spawn <name> [--capability cap1 --capability cap2]
flow agent list
flow agent claim <queue-name> <agent-name>

Rules (Automate Transfers)

flow rule create <name> -f <from-queue> -t <to-queue> [options]
  --type <type>              # on_complete, on_condition, on_schedule, manual
  --condition <json>         # JSON condition expression
  --priority <num>           # Rule priority
flow rule list [--queue queue]

Ready Work

flow ready                   # Show all claimable work items

Example Workflows

ML Training Pipeline

# 1. Setup nodes and queues
flow node create ml-pipeline -d "Complete ML training pipeline"

flow queue create data-prep -n ml-pipeline -p 0
flow queue create feature-eng -n ml-pipeline -p 1
flow queue create training -n ml-pipeline -p 2
flow queue create validation -n ml-pipeline -p 3
flow queue create deployment -n ml-pipeline -p 4

# 2. Create automation rules
flow rule create prep-to-features -f data-prep -t feature-eng --type on_complete
flow rule create features-to-train -f feature-eng -t training --type on_complete
flow rule create train-to-validate -f training -t validation --type on_complete
flow rule create validate-to-deploy -f validation -t deployment --type on_complete

# 3. Create work and let it flow
flow work create "Train sentiment classifier" -q data-prep -p 0
flow agent spawn ml-agent

# 4. Work automatically flows through entire pipeline
flow agent claim data-prep ml-agent
flow work complete work-xxx ml-agent --rules=true    # → feature-eng

flow agent claim feature-eng ml-agent
flow work complete work-xxx ml-agent --rules=true    # → training

flow agent claim training ml-agent
flow work complete work-xxx ml-agent --rules=true    # → validation

flow agent claim validation ml-agent
flow work complete work-xxx ml-agent --rules=true    # → deployment

Software Development Workflow

# 1. Setup development stages
flow node create dev -d "Development stages"

flow queue create backlog -n dev -p 0
flow queue create in-progress -n dev -p 1
flow queue create code-review -n dev -p 2
flow queue create testing -n dev -p 3
flow queue create done -n dev -p 4

# 2. Create workflow rules
flow rule create start-work -f backlog -t in-progress --type on_complete
flow rule create ready-review -f in-progress -t code-review --type on_complete
flow rule create ready-test -f code-review -t testing --type on_complete
flow rule create complete-work -f testing -t done --type on_complete

# 3. Create work items
flow work create "Implement user authentication" -q backlog -p 0
flow work create "Add search functionality" -q backlog -p 1

# 4. Multiple agents work concurrently
flow agent spawn developer1
flow agent spawn developer2
flow agent spawn reviewer

# 5. Work flows through pipeline
flow agent claim backlog developer1
flow work complete work-xxx developer1 --rules=true  # → in-progress

flow agent claim in-progress developer1
flow work complete work-xxx developer1 --rules=true  # → code-review

flow agent claim code-review reviewer
flow work complete work-xxx reviewer --rules=true    # → testing

# Meanwhile developer2 works on next item
flow agent claim backlog developer2
flow work complete work-xxx developer2 --rules=true  # → in-progress

Architecture

Database Schema

Flow uses SQLite with these main tables:

  • nodes - Logical groupings of queues
  • queues - Work containers, many queues per node
  • work_items - Work units flowing through queues
  • agents - Worker agents
  • rules - Automation rules for queue-to-queue transfers
  • work_item_history - Complete audit trail
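The tables above can be mirrored as Go structs. This is an illustrative sketch only; the field names and types are assumptions for the example, not Flow's actual schema:

```go
package main

import (
	"fmt"
	"time"
)

// Node is a logical grouping of queues.
type Node struct {
	ID, Name, Description string
}

// Queue is a work container; many queues can share a NodeID.
type Queue struct {
	ID, NodeID, Name string
	Priority         int
}

// WorkItem flows from queue to queue.
type WorkItem struct {
	ID, QueueID, Title string
	Priority           int
	Status             string // pending | running | completed
}

// Rule automates queue-to-queue transfers.
type Rule struct {
	ID, FromQueueID, ToQueueID, Type string // e.g. on_complete
}

// HistoryEntry is one row of the audit trail.
type HistoryEntry struct {
	WorkItemID, Event string
	At                time.Time
}

func main() {
	item := WorkItem{ID: "work-1", QueueID: "data-prep", Title: "Train model", Status: "pending"}
	fmt.Println(item.QueueID, item.Status)
}
```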

Work Item Lifecycle

  1. Created - Work item enters system in a queue
  2. Pending - Waiting to be claimed in current queue
  3. Running - Claimed by an agent
  4. Completed - Finished, rules may auto-transfer to next queue
  5. Repeat - Steps 2-4 recur as the item moves through the queue chain
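The lifecycle above amounts to a small state machine. Here is a minimal Go sketch using the status names from the list; the real implementation may differ:

```go
package main

import "fmt"

// nextStatus models the lifecycle transitions described above:
// pending -> running on claim, running -> completed on complete,
// and completed -> pending when a rule transfers the item into
// the next queue. Any other (status, event) pair is a no-op.
func nextStatus(status, event string) string {
	switch {
	case status == "pending" && event == "claim":
		return "running"
	case status == "running" && event == "complete":
		return "completed"
	case status == "completed" && event == "transfer":
		return "pending" // ready to be claimed in the next queue
	}
	return status // no transition for this event
}

func main() {
	s := "pending"
	for _, e := range []string{"claim", "complete", "transfer"} {
		s = nextStatus(s, e)
		fmt.Println(e, "->", s)
	}
}
```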

Queue-Based Flow

When an agent completes work with --rules=true:

  1. System finds rules where from_queue_id matches current queue
  2. Applies first matching on_complete rule
  3. Work item transfers to to_queue_id
  4. Status resets to pending
  5. Work is ready to be claimed by another agent
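Steps 1-3 above can be sketched in Go. The `Rule` fields and the function here are illustrative assumptions, not Flow's internals:

```go
package main

import "fmt"

// Rule is a simplified automation rule for this sketch.
type Rule struct {
	FromQueue, ToQueue, Type string
}

// applyOnComplete finds rules whose FromQueue matches the item's
// current queue, applies the first matching on_complete rule, and
// returns the destination queue (where status would reset to pending).
// If no rule matches, the item stays in its current queue.
func applyOnComplete(currentQueue string, rules []Rule) string {
	for _, r := range rules {
		if r.Type == "on_complete" && r.FromQueue == currentQueue {
			return r.ToQueue
		}
	}
	return currentQueue
}

func main() {
	rules := []Rule{
		{FromQueue: "data-prep", ToQueue: "model-training", Type: "on_complete"},
		{FromQueue: "model-training", ToQueue: "evaluation", Type: "on_complete"},
	}
	fmt.Println(applyOnComplete("data-prep", rules)) // model-training
}
```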

Manual vs Automatic Transfers

  • Automatic: flow work complete work-xxx agent --rules=true applies rules
  • Manual: flow work transfer work-xxx target-queue moves explicitly

Use Cases

  • ML Pipelines - Data prep → Training → Evaluation → Deployment
  • Development Workflows - Backlog → Dev → Review → Testing → Done
  • Data Processing - Ingest → Transform → Validate → Store
  • Content Pipelines - Draft → Edit → Review → Publish
  • Order Fulfillment - Received → Processing → Shipping → Delivered

Advanced Features

Work Item History

Every movement and state change is tracked in work_item_history:

  • When work entered a queue
  • Which agent claimed it
  • When it was completed
  • When it moved to the next queue

Priority-Based Claiming

Agents automatically claim the highest-priority work first (P0 before P4).
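A minimal Go sketch of this selection, assuming lower numbers mean higher priority (P0 before P4); the names here are illustrative, not Flow's code:

```go
package main

import "fmt"

// WorkItem is a simplified work item for this sketch.
type WorkItem struct {
	ID       string
	Priority int // 0 is highest (P0); larger numbers are lower priority
	Status   string
}

// claimHighestPriority scans pending items and returns the one with
// the lowest Priority number, or nil if nothing is claimable.
func claimHighestPriority(items []WorkItem) *WorkItem {
	var best *WorkItem
	for i := range items {
		it := &items[i]
		if it.Status != "pending" {
			continue // running/completed items are not claimable
		}
		if best == nil || it.Priority < best.Priority {
			best = it
		}
	}
	return best
}

func main() {
	items := []WorkItem{
		{ID: "work-a", Priority: 2, Status: "pending"},
		{ID: "work-b", Priority: 0, Status: "pending"},
		{ID: "work-c", Priority: 1, Status: "running"},
	}
	fmt.Println(claimHighestPriority(items).ID) // work-b
}
```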

Concurrent Processing

Multiple agents can process different items from the same queue concurrently.

MCP Integration

Flow includes an MCP (Model Context Protocol) server that integrates with Claude Code, enabling AI-powered workflow orchestration.

Quick Start with Claude Code

  1. Install the MCP server:

    cd mcp-server-flow
    npm install
  2. Configure Claude Code by adding to ~/Library/Application Support/Claude/claude_desktop_config.json:

    {
      "mcpServers": {
        "flow": {
          "command": "node",
          "args": ["/absolute/path/to/flow/mcp-server-flow/index.js"]
        }
      }
    }
  3. Restart Claude Code and start orchestrating workflows through natural language!

See mcp-server-flow/SETUP.md for complete integration guide.

MCP Tools Available

  • Node management: create, list, show nodes
  • Queue management: create, list, show queues
  • Work items: create, list, show, transfer, complete work
  • Agents: spawn, list, claim work
  • Rules: create, list automation rules
  • Utility: view ready work

Example with Claude Code

You: "Create an ML training pipeline with Flow"

Claude uses MCP tools to:
- Create nodes and queues (data-prep → training → evaluation)
- Set up automation rules
- Create work items
- Coordinate AI agents through the pipeline

Development

Built with:

  • Go 1.21+
  • SQLite3
  • Cobra CLI framework
  • MCP server: Node.js + @modelcontextprotocol/sdk

Version

0.3.0 - Queue-based workflow with rules engine for automated transfers

  • Work flows through queues, not nodes
  • Many queues per node for logical grouping
  • Rules automate queue-to-queue transfers
  • Both manual and automatic transfer modes

License

MIT

Author

Built with Claude Code

About

A node-and-queue workflow inspired by beads.
