Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
da99ecf
feat: rewrite types for orchestration engine
DonPrus Mar 13, 2026
531d9ea
feat: add migration 004 for orchestration schema
DonPrus Mar 13, 2026
046126f
feat: add store methods for workflows, checkpoints, agent events
DonPrus Mar 13, 2026
665d0b1
feat: add self_url config for callback URL construction
DonPrus Mar 13, 2026
ffb36fb
feat: state management module with reducers
DonPrus Mar 13, 2026
689fd3e
feat: rewrite template engine for state-based interpolation
DonPrus Mar 13, 2026
3f41245
feat: rewrite workflow validation for new graph format
DonPrus Mar 13, 2026
28c7bd9
feat: rewrite engine with unified state model
DonPrus Mar 13, 2026
0c0bbf9
feat: SSE hub for streaming run events
DonPrus Mar 13, 2026
22ddb3c
feat: add orchestration API endpoints
DonPrus Mar 13, 2026
934e367
feat: wire SSE hub and extend dispatch for agent steps
DonPrus Mar 13, 2026
b5f5202
feat: use A2A protocol for agent step instead of custom webhook
DonPrus Mar 13, 2026
6654f51
feat: add subgraph type, add_messages reducer, run config/parent columns
DonPrus Mar 13, 2026
5a97bd1
feat: command goto, breakpoints, subgraph, multi-turn, configurable runs
DonPrus Mar 13, 2026
83f70d3
feat: add config template alias, phase 2 gaps spec
DonPrus Mar 13, 2026
997e2cb
feat: per-node retry/cache, pending writes, overwrite, deferred nodes…
DonPrus Mar 13, 2026
8e273c2
feat: rich streaming modes, graph migrations, replay endpoint
DonPrus Mar 13, 2026
e709cb2
feat: hot reload, token accounting, rate limits, turn timeout
DonPrus Mar 14, 2026
8f22e3e
feat: startup cleanup, path safety, structured events, config validation
DonPrus Mar 14, 2026
b2d7a56
feat: UI messages, Mermaid export, ephemeral state, push_message
DonPrus Mar 14, 2026
d2311e4
refactor: clean architecture, remove dead code, update docs
DonPrus Mar 14, 2026
c20ce7e
Fix schema key mismatch and remove dead code
DonPrus Mar 14, 2026
dd3ed67
Update CLAUDE.md: fix test count and schema key name
DonPrus Mar 14, 2026
2381cc7
Fix multi-turn agent injections silently dropped
DonPrus Mar 14, 2026
95a5c8d
Remove dead handleSignalStep function
DonPrus Mar 14, 2026
4012c8e
fix: remove vestigial paused status and update protocol error message
DonPrus Mar 14, 2026
b4aeec3
Remove legacy approve/reject 410 routes
DonPrus Mar 14, 2026
6860ac7
docs: update test count in CLAUDE.md to 321
DonPrus Mar 14, 2026
931b5c4
Remove legacy approve route test
DonPrus Mar 14, 2026
a0f5637
Update CLAUDE.md test count to 320
DonPrus Mar 14, 2026
f5feccd
style: fix misleading indentation in processRunWithDepth for loop
DonPrus Mar 14, 2026
3b8443d
Replace magic numbers with named constants
DonPrus Mar 14, 2026
2882436
Add MIT license
DonPrus Mar 14, 2026
f6eb068
Fix workspace hook tests on Windows
DonPrus Mar 14, 2026
67964a2
Fix blocking retry loop and stale checkpoint parent_id
DonPrus Mar 14, 2026
46af7ba
Fix misleading free in handleStream SSE event drain
DonPrus Mar 14, 2026
05a46bb
Cache validateConfig result to avoid 2 DB queries per tick
DonPrus Mar 14, 2026
864cbdf
Hoist worker list fetch out of send node per-item loop
DonPrus Mar 14, 2026
3b96e77
Eliminate race in handleRunWorkflow by creating run as running
DonPrus Mar 14, 2026
47d7be3
Clear old steps and checkpoints on replay
DonPrus Mar 14, 2026
6e93a94
Parse workflow_json once and cache schema in processRunWithDepth
DonPrus Mar 14, 2026
5c9156f
Decode encoded orchestration path ids
DonPrus Mar 14, 2026
56fa13f
Support workflow filtering in runs API
DonPrus Mar 14, 2026
0510d62
Accept replay checkpoint_id alias
DonPrus Mar 14, 2026
ff43f14
Honor workflow output contracts in engine
DonPrus Mar 14, 2026
2b74774
Add graph store read/write support
DonPrus Mar 14, 2026
e4d129d
Document graph store memory behavior
DonPrus Mar 14, 2026
25222e3
Restrict tracker access to trusted config
DonPrus Mar 14, 2026
b93ca55
Reuse parsed workflows in engine hot path
DonPrus Mar 14, 2026
3aa3dc7
Align route contracts across validation and runtime
DonPrus Mar 14, 2026
66de615
Extract shared worker snapshot builder
DonPrus Mar 14, 2026
f61af52
Make runtime bindings explicit in engine
DonPrus Mar 14, 2026
dfa9035
Unify tracker runtime bindings
DonPrus Mar 14, 2026
84e9bda
Own SSE queue event payloads
DonPrus Mar 14, 2026
2a6bf9f
Make run stream multi-consumer
DonPrus Mar 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 93 additions & 58 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -1,76 +1,104 @@
# NullBoiler

DAG-based workflow orchestrator for NullClaw AI bot agents. Part of the Null ecosystem (NullTracker, NullClaw).
Graph-based workflow orchestrator with unified state model for NullClaw AI bot agents. Part of the Null ecosystem (NullTracker, NullClaw).

## Tech Stack

- **Language**: Zig 0.15.2
- **Database**: SQLite (vendored in `deps/sqlite/`), WAL mode
- **Protocol**: HTTP/1.1 REST API with JSON payloads
- **Dispatch**: HTTP (webhook/api_chat/openai_chat), MQTT, Redis Streams
- **Dispatch**: HTTP (webhook/api_chat/openai_chat/a2a), MQTT, Redis Streams
- **Vendored C libs**: SQLite (`deps/sqlite/`), hiredis (`deps/hiredis/`), libmosquitto (`deps/mosquitto/`)

## Module Map

| File | Role |
|------|------|
| `main.zig` | CLI args (`--port`, `--db`, `--config`, `--version`), HTTP accept loop, engine thread, tracker thread |
| `api.zig` | REST API routing and 19 endpoint handlers (incl. signal, chat, tracker status) |
| `store.zig` | SQLite layer, 30+ CRUD methods, schema migrations |
| `engine.zig` | DAG scheduler: tick loop, 14 step type handlers, graph cycles, worker handoff |
| `dispatch.zig` | Worker selection (tags, capacity), protocol-aware dispatch (`webhook`, `api_chat`, `openai_chat`, `mqtt`, `redis_stream`) |
| `main.zig` | CLI args (`--port`, `--db`, `--config`, `--version`, `--export-manifest`, `--from-json`), HTTP accept loop, engine thread, tracker thread |
| `api.zig` | REST API routing and 30+ endpoint handlers (runs, workers, workflows, checkpoints, state, SSE stream, tracker) |
| `store.zig` | SQLite layer, CRUD methods for all tables, schema migrations (4 migration files) |
| `engine.zig` | Graph-based state scheduler: tick loop, 7 node type handlers, checkpoints, reducers, goto, breakpoints, deferred nodes, reconciliation |
| `state.zig` | Unified state model: 7 reducer types (last_value, append, merge, add, min, max, add_messages), overwrite bypass, ephemeral keys, state path resolution |
| `sse.zig` | Server-Sent Events hub: per-run event queues, 5 stream modes (values, updates, tasks, debug, custom) |
| `dispatch.zig` | Worker selection (tags, capacity, A2A preference), protocol-aware dispatch |
| `async_dispatch.zig` | Thread-safe response queue for async MQTT/Redis dispatch (keyed by correlation_id) |
| `redis_client.zig` | Hiredis wrapper: connect, XADD, listener thread for response streams |
| `mqtt_client.zig` | Libmosquitto wrapper: connect, publish, subscribe, listener thread for response topics |
| `templates.zig` | Prompt template rendering: `{{input.X}}`, `{{steps.ID.output}}`, `{{item}}`, `{{task.X}}`, `{{debate_responses}}`, `{{chat_history}}`, `{{role}}` |
| `templates.zig` | Prompt template rendering: state-based `{{state.X}}`, legacy `{{input.X}}`, `{{item}}`, `{{task.X}}`, `{{attempt}}`, conditional blocks |
| `callbacks.zig` | Fire-and-forget webhook callbacks on step/run events |
| `config.zig` | JSON config loader (`Config`, `WorkerConfig`, `EngineConfig`, `TrackerConfig`) |
| `types.zig` | `RunStatus`, `StepStatus`, `StepType` (14 types), `WorkerStatus`, `TrackerTaskState`, row types |
| `types.zig` | `RunStatus`, `StepStatus`, `StepType` (7 types), `WorkerStatus`, `ReducerType`, row types |
| `tracker.zig` | Pull-mode tracker thread: poll NullTickets, claim tasks, heartbeat leases, stall detection |
| `tracker_client.zig` | HTTP client for NullTickets API (claim, heartbeat, transition, fail, artifacts) |
| `workspace.zig` | Workspace lifecycle: create, hook execution, cleanup, path sanitization |
| `subprocess.zig` | NullClaw subprocess: spawn, health check, prompt sending, kill |
| `workflow_loader.zig` | Load JSON workflow definitions from `workflows/` directory |
| `workflow_loader.zig` | Load JSON workflow definitions from `workflows/` directory, hot-reload watcher |
| `workflow_validation.zig` | Graph-based workflow validation: reachability, cycles, state key refs, route/send targets |
| `ids.zig` | UUID v4 generation, `nowMs()` |
| `migrations/001_init.sql` | 6 tables: workers, runs, steps, step_deps, events, artifacts |
| `migrations/002_advanced_steps.sql` | 3 tables: cycle_state, chat_messages, saga_state + ALTER TABLE |
| `metrics.zig` | Prometheus-style metrics counters |
| `strategy.zig` | Pluggable strategy map for workflow execution |
| `worker_protocol.zig` | Protocol-specific request body builders |
| `worker_response.zig` | Protocol-specific response parsers |
| `export_manifest.zig` | Export tool manifest for CLI integration |
| `from_json.zig` | Import workflow from JSON CLI command |

## Build / Test / Run

```sh
zig build # build
zig build test # unit tests
zig build test # unit tests (320 tests)
zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock workers)
./zig-out/bin/nullboiler --port 8080 --db nullboiler.db --config config.json
```

## Step Types (7)

`task`, `route`, `interrupt`, `agent`, `send`, `transform`, `subgraph`

## Reducers (7)

`last_value`, `append`, `merge`, `add`, `min`, `max`, `add_messages`

## API Endpoints

| Method | Path | Description |
|--------|------|-------------|
| GET | `/health` | Health check |
| GET | `/metrics` | Prometheus metrics |
| POST | `/workers` | Register worker |
| GET | `/workers` | List workers |
| DELETE | `/workers/{id}` | Remove worker |
| POST | `/runs` | Create workflow run |
| GET | `/runs` | List runs |
| POST | `/runs` | Create workflow run (legacy step-array or graph format) |
| GET | `/runs` | List runs (supports ?status= filter) |
| GET | `/runs/{id}` | Get run details |
| POST | `/runs/{id}/cancel` | Cancel run |
| POST | `/runs/{id}/retry` | Retry failed run |
| POST | `/runs/{id}/resume` | Resume interrupted run (with optional state updates) |
| POST | `/runs/{id}/state` | Inject state into running run (pending injection) |
| POST | `/runs/{id}/replay` | Replay run from a checkpoint |
| POST | `/runs/fork` | Fork run from a checkpoint into a new run |
| GET | `/runs/{id}/steps` | List steps for run |
| GET | `/runs/{id}/steps/{step_id}` | Get step details |
| POST | `/runs/{id}/steps/{step_id}/approve` | Approve approval step |
| POST | `/runs/{id}/steps/{step_id}/reject` | Reject approval step |
| GET | `/runs/{id}/events` | List run events |
| POST | `/runs/{id}/steps/{step_id}/signal` | Signal a waiting step |
| GET | `/runs/{id}/steps/{step_id}/chat` | Get group_chat transcript |
| GET | `/tracker/status` | Pull-mode tracker status (running tasks, concurrency, counters) |
| GET | `/runs/{id}/checkpoints` | List checkpoints for run |
| GET | `/runs/{id}/checkpoints/{cpId}` | Get checkpoint details |
| GET | `/runs/{id}/stream` | SSE stream (supports ?mode=values\|updates\|tasks\|debug) |
| POST | `/workflows` | Create workflow definition |
| GET | `/workflows` | List workflow definitions |
| GET | `/workflows/{id}` | Get workflow definition |
| PUT | `/workflows/{id}` | Update workflow definition |
| DELETE | `/workflows/{id}` | Delete workflow definition |
| POST | `/workflows/{id}/validate` | Validate workflow definition |
| GET | `/workflows/{id}/mermaid` | Export workflow as Mermaid diagram |
| POST | `/workflows/{id}/run` | Start a run from a stored workflow |
| GET | `/rate-limits` | Get current rate limit info per worker |
| POST | `/admin/drain` | Enable drain mode |
| GET | `/tracker/status` | Pull-mode tracker status |
| GET | `/tracker/tasks` | List running pull-mode tasks |
| GET | `/tracker/tasks/{task_id}` | Get single pull-mode task details |

## Step Types

`task`, `fan_out`, `map`, `condition`, `approval`, `reduce`, `loop`, `sub_workflow`, `wait`, `router`, `transform`, `saga`, `debate`, `group_chat`
| GET | `/tracker/stats` | Tracker statistics |
| POST | `/tracker/refresh` | Force tracker poll |
| POST | `/internal/agent-events/{run_id}/{step_id}` | Agent event callback (from NullClaw) |

## Coding Conventions

Expand All @@ -83,16 +111,47 @@ zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock wo

## Architecture

- Single-threaded HTTP accept loop on main thread
- Background engine thread polls DB for active runs (+ polls async response queue for MQTT/Redis steps)
- `std.atomic.Value(bool)` for coordinated shutdown
- Config workers seeded into DB on startup (source = "config")
- Schema in `migrations/001_init.sql` + `002_advanced_steps.sql`, applied on `Store.init`
- Graph cycles: condition/router can route back to completed steps, engine creates new step instances per iteration
- Worker handoff: dispatch result can include `handoff_to` for chained delegation (max 5)
- Async dispatch: MQTT/Redis workers use two-phase dispatch (publish → engine polls response queue)
- Background listener threads (MQTT/Redis) started conditionally when async workers are configured
- Pull-mode tracker thread (conditional): polls NullTickets for tasks, claims work, manages subprocess lifecycles
- **Unified state model**: Every node reads from state, returns partial updates, engine applies reducers
- **Graph-based execution**: Workflow = `{nodes: {}, edges: [], state_schema: {}}` with `__start__` and `__end__` synthetic nodes
- **Checkpoints**: State snapshot after every node, enabling fork/replay/resume
- **Conditional edges**: Route nodes produce values, edges like `["router:yes", "next"]` are taken when route result matches
- **Deferred nodes**: Nodes with `"defer": true` execute right before `__end__`
- **Command primitive**: Workers can return `{"goto": "node_name"}` to override normal graph traversal
- **Breakpoints**: `interrupt_before` / `interrupt_after` arrays pause execution
- **Subgraph**: Inline child workflow execution with input/output mapping (max recursion depth 10)
- **Multi-turn agents**: Agent nodes can loop with `continuation_prompt` up to `max_turns`
- **Configurable runs**: Per-run config stored as `state.__config`
- **Node-level cache**: FNV hash of (node_name, rendered_prompt) with configurable TTL
- **Token accounting**: Cumulative input/output token tracking per step and per run
- **Workflow hot-reload**: `WorkflowWatcher` polls `workflows/` directory for JSON changes, upserts into DB
- **Reconciliation**: Check NullTickets task status between steps, cancel if task is terminal

### Thread Model

```
Main thread: HTTP accept loop (push API)
Engine thread: Graph tick loop (state-based scheduler)
Tracker thread: Poll NullTickets -> claim -> workspace -> subprocess/dispatch
MQTT listener: (conditional, for async MQTT workers)
Redis listener: (conditional, for async Redis workers)
```

### SSE Streaming

5 modes for real-time consumption via `GET /runs/{id}/stream?mode=X`:
- `values` -- full state after each step
- `updates` -- node name + partial state updates
- `tasks` -- task start/finish with metadata
- `debug` -- everything with step number + timestamp
- `custom` -- user-defined events from worker output (`ui_messages`, `stream_messages`)

## Database

SQLite with WAL mode. Schema across 4 migrations:
- `001_init.sql`: workers, runs, steps, step_deps, events, artifacts
- `002_advanced_steps.sql`: cycle_state, chat_messages, saga_state (legacy, unused by current engine)
- `003_tracker.sql`: tracker_runs
- `004_orchestration.sql`: workflows, checkpoints, agent_events, pending_state_injections, node_cache, pending_writes + ALTER TABLE extensions for state_json, config_json, parent_run_id, token accounting

## Pull-Mode (NullTickets Integration)

Expand Down Expand Up @@ -131,27 +190,3 @@ Optional pull-mode where NullBoiler acts as an agent polling NullTickets for wor
```

If `tracker` is absent or null, the tracker thread does not start and push-mode operates unchanged.

### Workflow Definitions

JSON files in `workflows/` directory. Two execution modes:
- `subprocess` — spawn NullClaw child process per task (isolated workspace)
- `dispatch` — use existing registered workers (no workspace)

Three-axis concurrency: global (`max_concurrent_tasks`) + per-pipeline + per-role limits.

### Thread Model

```
Main thread: HTTP accept loop (push API — unchanged)
Engine thread: DAG tick loop (unchanged)
Tracker thread: Poll NullTickets → claim → workspace → subprocess/dispatch
MQTT listener: (unchanged, conditional)
Redis listener: (unchanged, conditional)
```

## Database

SQLite with WAL mode. Schema: 9 tables across 2 migrations.
- `001_init.sql`: workers, runs, steps, step_deps, events, artifacts
- `002_advanced_steps.sql`: cycle_state, chat_messages, saga_state + iteration_index/child_run_id columns on steps
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2026 nullclaw contributors

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ This keeps the architecture modular, simpler to reason about, and easier to evol

See additional integration docs in [`docs/`](./docs).

## Workflow Graph Features

The orchestration graph runtime supports:

- `task`, `agent`, `route`, `interrupt`, `send`, `transform`, and `subgraph` nodes
- run replay, checkpoint forking, breakpoint interrupts, and post-start state injection
- `send` fan-out with canonical `items_key` and configurable `output_key`
- task/agent output shaping via `output_key` and `output_mapping`
- template access to `state.*`, `input.*`, `item.*`, `config.*`, and `store.<namespace>.<key>`
- `transform.store_updates` for writing durable workflow memory back to NullTickets

Store-backed templates and `store_updates` require a NullTickets base URL. The
runtime resolves it from workflow fields such as `tracker_url` or from run config
(`config.tracker_url` / `config.tracker_api_token`), which are injected into
state as `__config`.

## Config Location

- Default config path: `~/.nullboiler/config.json`
Expand Down
9 changes: 9 additions & 0 deletions config.example.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"port": 8080,
"db": "nullboiler.db",
"api_token": null,
"self_url": null,
"workers": [
{
"id": "nullclaw-1",
Expand All @@ -28,6 +29,14 @@
"model": "anthropic/claude-sonnet-4-6",
"tags": ["writer", "editor"],
"max_concurrent": 2
},
{
"id": "nullclaw-a2a",
"url": "http://localhost:3000",
"token": "set_same_value_as_nullclaw_gateway_paired_tokens",
"protocol": "a2a",
"tags": ["coder", "agent"],
"max_concurrent": 3
}
],
"engine": {
Expand Down
Loading
Loading