Graph-based workflow orchestration engine #1
Merged
Add CRUD methods to Store for all new tables from migration 004:
- Workflow methods: create, get, list, update, delete
- Checkpoint methods: create, get, list, getLatest
- Agent event methods: create, listByRunStep
- Run state methods: updateRunState, incrementCheckpointCount, createRunWithState, createForkedRun
- Pending injection methods: create, consume, discard

Also adds PendingInjectionRow to types.zig and fixes migration 004 to preserve step_deps/cycle_state/saga_state tables until the engine rewrite (Task 8) removes their usage. Includes comprehensive tests for all new methods.
Replace the old 14-step-type engine with a graph-based state model using 6 node types (task, route, interrupt, agent, send, transform). The engine now processes workflows as a DAG with edges, applying state updates through reducers and saving checkpoints after each node.

Key changes:
- processRun loops finding ready nodes until no more progress
- findReadyNodes with dead-node detection for conditional routing
- State flows through applyUpdates/reducers instead of templates
- Route nodes use conditional edges (source:value) for branching
- Interrupt nodes pause the run with checkpoint preservation
- Transform nodes apply static updates without worker dispatch
- Send nodes dispatch target_node per item from state array

Also adds state_json to RunRow and updates store queries to include it.
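The ready-node selection rule above can be sketched language-agnostically. This Python sketch uses illustrative names (`find_ready_nodes`, `completed`, `dead`), not the engine's actual Zig identifiers, and assumes edges are (source, target) pairs:

```python
def find_ready_nodes(edges, completed, dead):
    """Return nodes whose live predecessors have all completed.

    `dead` holds nodes pruned by conditional routing (the branch that
    would reach them was not taken). If every predecessor of a node is
    dead, the node itself is unreachable and is marked dead rather than
    ready; the engine re-scans each iteration, so dead-ness propagates
    over successive passes until no more progress is made.
    """
    targets = {t for _, t in edges}
    ready = []
    for node in targets:
        if node in completed or node in dead:
            continue
        preds = [s for s, t in edges if t == node]
        live = [p for p in preds if p not in dead]
        if not live:
            dead.add(node)  # all branches into this node were pruned
            continue
        if all(p in completed for p in live):
            ready.append(node)
    return ready
```

With a diamond graph where the `c` branch was pruned by a route node, `d` becomes ready as soon as `b` completes, since only live predecessors are counted.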
Add workflow CRUD (create/list/get/update/delete/validate/run), checkpoint endpoints (list/get), state control (resume/fork/inject), SSE stream snapshot, and agent events callback. Update handleGetRun with state_json and checkpoint_count. Add SSE hub cleanup to the cancel endpoint. Remove old signal endpoint (replaced by state inject).
- Add AgentOpts struct and dispatchStepWithOpts() in dispatch.zig so webhook bodies for agent steps can include mode, callback_url, max_iterations, tools, and state fields
- Wire SseHub into main.zig: create instance on startup and set ctx.sse_hub for every request handler
- Add TODO in tracker.zig pollAndClaim noting workflow format update needed when nulltickets schema changes land (Task 14)
Add native support for nullclaw's Agent-to-Agent protocol (JSON-RPC 2.0 over /a2a endpoint) as a new worker protocol. Agent nodes now prefer A2A-protocol workers for dispatch, falling back to other protocols when no A2A worker is available.

- Add 'a2a' variant to worker_protocol.Protocol enum with URL builder that appends /a2a to the worker base URL
- Add buildA2aRequestBody() producing tasks/send JSON-RPC requests with contextId for session persistence
- Add parseA2aResponse() extracting text from result.artifacts with proper error handling for JSON-RPC errors and failed task status
- Route A2A responses through dedicated parser instead of generic worker_response.parse
- Engine's executeTaskNode prefers A2A workers for agent-type nodes
- All existing webhook/api_chat/openai_chat paths remain unchanged
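The shape of the tasks/send request can be sketched as follows. This is a simplified Python sketch, not the Zig buildA2aRequestBody(): the method name and contextId field come from the commit above, while the message/parts payload layout is an assumption about the A2A task format.

```python
import json

def build_a2a_request_body(prompt: str, context_id: str, request_id: int = 1) -> str:
    """Build a JSON-RPC 2.0 tasks/send request for an A2A worker.

    contextId ties successive tasks to one session so the agent keeps
    conversational state across turns. The message/parts shape below is
    an illustrative sketch of the task payload, not a spec-exact form.
    """
    body = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tasks/send",
        "params": {
            "contextId": context_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": prompt}],
            },
        },
    }
    return json.dumps(body)
```

The response side then needs to distinguish a JSON-RPC `error` member from a successful `result` whose task status is failed, which is why a dedicated parseA2aResponse() exists instead of reusing the generic worker_response.parse.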
…, managed values
Gap 2: Per-node retry with exponential backoff (max_attempts, initial_interval_ms, backoff_factor, max_interval_ms)
Gap 3: Per-node cache with TTL (node_cache table, FNV hash cache keys, skip dispatch on hit)
Gap 4: Pending writes table for parallel execution resilience
Gap 5: Overwrite bypass (__overwrite: true) skips reducer in applyUpdates
Gap 6: Deferred nodes (defer: true) execute just before __end__
Gap 7: Managed values (__meta with step, is_last_step, remaining_steps, run_id, node_name)
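Gap 2's four knobs define a standard capped exponential curve. A sketch of the delay computation (the field names are from the commit; the function itself is illustrative, not the engine's Zig):

```python
def retry_delay_ms(attempt: int, initial_interval_ms: int,
                   backoff_factor: float, max_interval_ms: int) -> int:
    """Delay before the given attempt (1-based): the first retry waits
    initial_interval_ms, each later one multiplies by backoff_factor,
    and the result is capped at max_interval_ms."""
    delay = initial_interval_ms * (backoff_factor ** (attempt - 1))
    return min(int(delay), max_interval_ms)
```

With initial_interval_ms=500, backoff_factor=2.0, and max_interval_ms=10000, the schedule runs 500, 1000, 2000, ... until it saturates at the cap; max_attempts bounds the total number of tries independently.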
Gap 8: Extend SseEvent with StreamMode enum (values, updates, tasks,
debug, custom). Engine broadcasts multi-mode events after each node
execution. Stream endpoint accepts ?mode= query param to filter.
Gap 9: Workflow version tracking in checkpoint metadata. On resume,
detect version mismatch and migrate completed_nodes by filtering
out nodes that no longer exist in the new workflow definition.
Store functions support versioned workflow CRUD.
Gap 10: POST /runs/{id}/replay endpoint resets run state to a
checkpoint and marks it as running for the engine to pick up.
Validates checkpoint belongs to the target run.
Gap 1: WorkflowWatcher in workflow_loader.zig periodically scans
workflows_dir for changed JSON files (FNV1a hash comparison) and
upserts into the workflows table. Wired into engine tick loop and
main.zig startup.
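FNV-1a over the file bytes is a cheap way to detect changes without storing full copies. The 64-bit variant, shown here as a Python sketch of the standard algorithm (the watcher's actual Zig helper name is not shown in the commit):

```python
def fnv1a_64(data: bytes) -> int:
    """64-bit FNV-1a: XOR each byte into the hash, then multiply by
    the FNV prime, truncating to 64 bits. Order-sensitive, so any
    edit to a workflow JSON file changes the hash."""
    h = 0xcbf29ce484222325  # FNV-1a 64-bit offset basis
    for b in data:
        h ^= b
        h = (h * 0x100000001b3) & 0xFFFFFFFFFFFFFFFF  # FNV prime, mod 2^64
    return h
```

The watcher only needs to store one u64 per file; on each scan it rehashes the file contents and upserts into the workflows table when the hash differs from the stored one.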
Gap 2: Token accounting columns on runs/steps (migration 004).
Store methods updateStepTokens/updateRunTokens accumulate usage.
Engine extracts usage from worker responses and records per-step
and per-run totals. GET /runs/{id} includes token fields.
Gap 3: In-memory rate_limits map on Engine, populated from worker
response rate_limit objects. GET /rate-limits endpoint returns
current rate limit info for all workers.
Gap 4: Agent multi-turn loop checks turn_timeout_ms from node
config. If elapsed time exceeds the timeout, the loop stops and
uses the last successful response.
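The Gap 4 timeout amounts to a wall-clock guard at the top of the turn loop. A Python sketch under stated assumptions: `dispatch`, the dict-shaped response with a `done` flag, and `run_agent_turns` are illustrative stand-ins, not the engine's actual API.

```python
import time

def run_agent_turns(dispatch, max_iterations: int, turn_timeout_ms: int):
    """Multi-turn loop with a wall-clock budget: once elapsed time
    exceeds turn_timeout_ms, stop before starting another turn and
    return the last successful response instead of raising."""
    start = time.monotonic()
    last_response = None
    for _ in range(max_iterations):
        elapsed_ms = (time.monotonic() - start) * 1000
        if elapsed_ms > turn_timeout_ms:
            break  # budget exhausted: keep the last successful response
        response = dispatch()  # one worker round-trip per turn
        last_response = response
        if response.get("done"):
            break
    return last_response
```

Checking the budget before dispatching (rather than after) means a turn already in flight is never abandoned mid-request; the loop simply declines to start the next one.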
Gap 5: Add startupCleanup() to Tracker for workspace cleanup on start.
Gap 6: Add validateWorkspacePath() and sanitizeDirectoryName for symlink
escape prevention; validate paths before workspace operations.
Gap 7: Add OrchestratorEvent struct with typed events; emit structured
events at run/step lifecycle points and broadcast via SseHub.
Gap 8: Add validateConfig() check per engine tick; skip dispatch when
no workers registered or store is unhealthy.
Remove backward-compatibility cruft from the pre-orchestration architecture:

Dead types removed:
- ChatMessageRow, SagaStateRow from types.zig

Dead store methods removed:
- getCycleState, upsertCycleState (cycle_state table)
- insertChatMessage, getChatMessages (chat_messages table)
- insertSagaState, updateSagaState, getSagaStates (saga_state table)
- getReadySteps, getStepDeps (step_deps-based DAG scheduling)
- setStepStartedAt (wait step timer tracking)

Dead template features removed:
- debate_responses, chat_history, role context fields and resolvers
- StepOutput.outputs field (fan_out/map multi-output)
- serializeOutputs helper function

Dead API handlers removed:
- handleApproveStep, handleRejectStep (approval steps -> 410 Gone)
- handleGetChatTranscript (group_chat -> removed)

Dead validation rules removed:
- loop, sub_workflow, wait, router, saga, debate, group_chat step type rules
- All associated error variants and tests

CLAUDE.md rewritten to reflect current architecture:
- 7 step types (not 14)
- 7 reducer types
- 35+ API endpoints (was 19)
- 4 migrations (was 2)
- Full module map with all 27 source files
- Unified state model, checkpoints, SSE streaming docs
Engine used "schema" key to look up state schema from workflow JSON, but the API and validation modules use "state_schema". This caused reducers to silently fall back to last_value for all API-created runs. Add getSchemaJson() helper that checks both "state_schema" (canonical) and "schema" (fallback for inline test workflows).

Remove dead code:
- isNodeDeferred (unused, collectDeferredNodes used instead)
- getNodeFieldBool (only caller was isNodeDeferred)
- mergeWorkflowVersionIntoMeta (unused, serializeRouteResultsWithVersion handles this)
- InvalidNumber error set declaration (error.InvalidNumber works via implicit sets)
- _field_name unused parameter in checkStateRefs
Test count is 322 after dead code removal. The canonical schema key in workflow definitions is "state_schema", not "schema".
Pending state injections consumed between agent continuation turns
were discarded (assigned to _ and never applied). This caused
injections submitted via POST /runs/{id}/state during multi-turn
agent execution to be silently lost.
Re-save consumed injections so they are properly applied after the
full agent node completes its multi-turn loop.
This handler was defined but never wired to any API route. The approval/signal step concept was replaced by interrupt + resume. The legacy approve/reject routes already return 410 Gone.
- getActiveRuns no longer queries for 'paused' status which doesn't exist as a valid RunStatus enum value. Only 'running' is queried.
- Worker registration error message now lists all 6 supported protocols (was missing mqtt, redis_stream, a2a).
No backward compatibility needed. These endpoints were deprecated in favor of interrupt + resume pattern. Remove the routes, their test, and the now-unused seg5 path segment.
The approval step test was a leftover from the removed approve/reject endpoint. No backward compatibility needed.
The body of `for (ready_nodes) |node_name|` was at the same indent level as the `for` statement itself, making it look like the code was outside the loop. Re-indent the ~420 lines to proper nesting.
Extract max_nodes_per_tick (1000) and max_subgraph_depth (10) into module-level constants for clarity and single-point-of-change.
Use cmd.exe instead of /bin/sh on Windows. Skip shell-dependent tests on Windows since hook commands are Unix shell syntax.
Replace std.Thread.sleep in retry loop with non-blocking retry scheduling. Instead of blocking the engine thread, failed retry attempts now create a step record with next_attempt_at_ms and return control to the tick loop. Future ticks check the timestamp and re-execute when the delay has elapsed. Also fix stale checkpoint parent_id: latest_checkpoint was fetched once at processRunWithDepth start, causing all checkpoints within a tick to point to the same parent. Introduce latest_checkpoint_id that updates after each checkpoint creation for correct chaining.
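The scheduling half of this change can be sketched as a pair of helpers. Names (`schedule_retry`, `due_for_retry`) and the dict-shaped step record are illustrative; only the next_attempt_at_ms field comes from the commit:

```python
def schedule_retry(step: dict, now_ms: int, delay_ms: int) -> dict:
    """Instead of sleeping on the engine thread, record when the next
    attempt may run and return control to the tick loop."""
    step["next_attempt_at_ms"] = now_ms + delay_ms
    return step

def due_for_retry(step: dict, now_ms: int) -> bool:
    """A step with no scheduled time is eligible immediately; otherwise
    it waits until the tick clock passes next_attempt_at_ms."""
    next_at = step.get("next_attempt_at_ms")
    return next_at is None or now_ms >= next_at
```

Each tick simply filters steps through due_for_retry, so a single engine thread can interleave many pending retries without ever blocking.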
Remove the explicit ctx.allocator.free(sse_events) call which was misleading since ctx.allocator is a per-request arena where free is a no-op. Add comment explaining why inner strings (event_type, data) must not be freed here as they originate from the engine's per-tick arena, not the request arena.
validateConfig runs listWorkers + getActiveRuns every 200ms tick. Cache the result with a timestamp, only re-validate every 30s. On validation failure, immediately invalidate the cache so the next tick re-checks.
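The cache-with-invalidation described above can be sketched as a small state holder. The class and method names here are illustrative, not the engine's Zig:

```python
class ValidationCache:
    """Cache the validateConfig verdict for ttl_ms; a failed check
    leaves the cache expired so the next tick re-validates."""

    def __init__(self, ttl_ms: int = 30_000):
        self.ttl_ms = ttl_ms
        self.checked_at_ms = None  # None means a fresh check is due
        self.ok = False

    def get(self, now_ms: int):
        """Return the cached verdict, or None when a re-check is due."""
        if self.checked_at_ms is None:
            return None
        if now_ms - self.checked_at_ms >= self.ttl_ms:
            return None
        return self.ok

    def put(self, now_ms: int, ok: bool):
        self.ok = ok
        # On failure, mark the cache expired immediately so the next
        # 200ms tick runs the full listWorkers + getActiveRuns check.
        self.checked_at_ms = now_ms if ok else None
```

This turns roughly 150 validations per 30-second window into one, while a failure (no workers, unhealthy store) still surfaces on the very next tick.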
executeSendNode was calling store.listWorkers and building worker_infos for every item in the send array. Move both the worker list fetch and worker_infos construction before the loop since the worker list doesn't change between items.
Previously, run was created with 'pending' status then updated to 'running' in a separate operation. The engine could miss the run between these two DB operations. Add createRunWithStateAndStatus to store and use it in handleRunWorkflow to create the run directly with 'running' status in a single INSERT.
When replaying from a checkpoint, steps and checkpoints created after the replay point remained in the DB, causing stale data. Add deleteStepsAfterTimestamp and deleteCheckpointsAfterVersion to the store, and call both in handleReplayRun before resetting the run state.
workflow_json was parsed independently by ~15 helper calls per tick. Parse it once at the top of processRunWithDepth and pre-extract the state schema. Replace all getSchemaJson(alloc, workflow_json) calls within the function with the cached value, eliminating ~9 redundant JSON parses per node execution.
Summary
Complete orchestration engine with graph-based workflow execution and unified state model.
Core Engine
- Workflows are {nodes, edges, state_schema} with __start__/__end__ synthetic nodes
- Route nodes emit {"goto": "node_name"}; any node can jump to __end__ regardless of graph position
- interrupt_before/interrupt_after arrays pause execution for human review

Unified State Model
- {"__overwrite": true, "value": ...} skips the reducer
- __meta injected before each node (step count, run_id, node_name, remaining_steps)

Checkpoints & Time Travel
Streaming
Worker Dispatch
Operations
- Watches the workflows/ directory for JSON changes

Pull-Mode (NullTickets Integration)
Infrastructure