diff --git a/AGENTS.md b/AGENTS.md index b720aa1..dedce25 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -18,7 +18,12 @@ Primary modules: - `src/store.zig` - SQLite access, transactions, migrations, ownership/free helpers - `src/domain.zig` - pipeline FSM parse/validation/transition logic - `src/ids.zig` - UUID/token/hash/time helpers +- `src/config.zig` - config loading and resolution +- `src/export_manifest.zig` - nullhub manifest export +- `src/from_json.zig` - JSON config bootstrap - `src/migrations/001_init.sql` - schema +- `src/migrations/003_store.sql` - KV store table +- `src/migrations/004_store_fts.sql` - FTS5 search index Baseline commands: diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..bc38dea --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 nullclaw contributors + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md index 0d99457..f18587b 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ stages, and attach artifacts. - `nullTickets` (this repository) is responsible for durable task state: - pipelines, stages, transitions - runs, leases, events, artifacts - - dependencies, quality gates, assignments + - dependencies, assignments - idempotent writes and optimistic transition checks - `nullTickets` is intentionally orchestration-light: - it does not decide global scheduling strategy @@ -40,7 +40,7 @@ Practical architecture: You do not have to use all three components. - `nullclaw` + `nullTickets` is a valid setup for sequential execution. -- `nullTickets` can be used with other agent runtimes as long as they implement the tracker contract (`claim -> events/gates -> transition/fail`). +- `nullTickets` can be used with other agent runtimes as long as they implement the tracker contract (`claim -> events -> transition/fail`). - `nullboiler` is optional and is mainly needed for advanced multi-agent orchestration. ## Adoption Path @@ -87,10 +87,15 @@ bash tests/test_e2e.sh - `src/main.zig` - process entrypoint, argument parsing, socket accept loop - `src/api.zig` - HTTP routing, request validation, response serialization -- `src/store.zig` - SQLite queries, transactions, ownership/free helpers +- `src/store.zig` - SQLite queries, transactions, migrations, ownership/free helpers - `src/domain.zig` - pipeline FSM parsing and validation - `src/ids.zig` - UUID/token/hash/time helpers +- `src/config.zig` - config loading and resolution +- `src/export_manifest.zig` - nullhub manifest export +- `src/from_json.zig` - JSON config bootstrap - `src/migrations/001_init.sql` - database schema +- `src/migrations/003_store.sql` - KV store table +- `src/migrations/004_store_fts.sql` - FTS5 full-text search index - `tests/test_e2e.sh` - end-to-end API flow ## API Surface @@ -116,15 +121,26 @@ bash tests/test_e2e.sh | `DELETE` | `/tasks/{id}/assignments/{agent_id}` | Unassign task | | `POST` | `/leases/claim` | Claim next task by role | | `POST` | `/leases/{id}/heartbeat` | Extend lease | +| `GET` | `/tasks/{id}/run-state` | Get task run_id | | `POST` | `/runs/{id}/events` | Append run event | | `GET` | `/runs/{id}/events?limit=&cursor=` | List run events (cursor paginated) | -| `POST` | `/runs/{id}/gates` | Add quality gate result | -| `GET` | `/runs/{id}/gates` | List quality gate results | | `POST` | `/runs/{id}/transition` | Move task to next stage | | `POST` | `/runs/{id}/fail` | Mark run as failed | | `POST` | `/artifacts` | Attach artifact | | `GET` | `/artifacts?task_id=&run_id=&limit=&cursor=` | List artifacts (cursor paginated) | | `GET` | `/ops/queue` | Per-role queue stats for orchestrator | +| `PUT` | `/store/{namespace}/{key}` | Put KV store entry | +| `GET` | `/store/{namespace}/{key}` | Get KV store entry | +| `DELETE` | `/store/{namespace}/{key}` | Delete KV store entry | +| `GET` | `/store/{namespace}` | List entries in namespace | +| `DELETE` | `/store/{namespace}` | Delete all entries in namespace | +| `GET` | `/store/search?q=&namespace=&limit=&filter_path=&filter_value=` | Full-text search store | + +### Store API Notes + +- Store path segments are URL-decoded by the server. Clients should percent-encode reserved characters in `namespace` and `key` (for example spaces or `/`). +- The namespace name `search` is reserved for `GET /store/search` and cannot be listed via `GET /store/{namespace}`. +- `GET /store/search` also supports exact JSON filtering with `filter_path` and `filter_value` in addition to FTS search. ## Agent Loop @@ -134,7 +150,6 @@ POST /leases/claim { agent_id, agent_role, lease_ttl_ms? } -> 204 (no work) POST /runs/{run_id}/events (Bearer ) -POST /runs/{run_id}/gates (Bearer ) POST /runs/{run_id}/transition (Bearer ) POST /runs/{run_id}/fail (Bearer ) diff --git a/deps/sqlite/build.zig b/deps/sqlite/build.zig index f746702..24898fa 100644 --- a/deps/sqlite/build.zig +++ b/deps/sqlite/build.zig @@ -14,6 +14,7 @@ pub fn build(b: *std.Build) void { }); lib.root_module.addCSourceFile(.{ .file = b.path("sqlite3.c"), + .flags = &.{"-DSQLITE_ENABLE_FTS5"}, }); lib.installHeader(b.path("sqlite3.h"), "sqlite3.h"); lib.installHeader(b.path("sqlite3ext.h"), "sqlite3ext.h"); diff --git a/docs/api.md b/docs/api.md index 4debd25..aa20895 100644 --- a/docs/api.md +++ b/docs/api.md @@ -32,12 +32,6 @@ OTLP attribute mapping keys: - `GET /pipelines` - `GET /pipelines/{id}` -Pipeline transitions support `required_gates`: - -```json -{ "from": "coding", "to": "review", "trigger": "complete", "required_gates": ["tests_passed"] } -``` - ## Tasks - `POST /tasks` @@ -71,8 +65,6 @@ Pipeline transitions support `required_gates`: - `POST /runs/{id}/events` (Bearer) - `GET /runs/{id}/events?limit=&cursor=` -- `POST /runs/{id}/gates` (Bearer) -- `GET /runs/{id}/gates` - `POST /runs/{id}/transition` (Bearer) - `POST /runs/{id}/fail` (Bearer) @@ -86,7 +78,6 @@ Pipeline transitions support `required_gates`: Transition returns `409` when: -- required gates are not passed - `expected_stage` does not match - `expected_task_version` does not match @@ -95,6 +86,21 @@ Transition returns `409` when: - `POST /artifacts` - `GET /artifacts?task_id=&run_id=&limit=&cursor=` +## Store (KV) + +- `PUT /store/{namespace}/{key}` with `{ "value": ... }` +- `GET /store/{namespace}/{key}` +- `DELETE /store/{namespace}/{key}` +- `GET /store/{namespace}` (list entries) +- `DELETE /store/{namespace}` (delete all entries in namespace) +- `GET /store/search?q=&namespace=&limit=&filter_path=&filter_value=` (FTS5 full-text search) + +Notes: + +- `namespace` and `key` path segments are URL-decoded server-side, so clients should percent-encode reserved characters such as spaces or `/`. +- `search` is a reserved namespace name because `GET /store/search` is the full-text search endpoint. +- `filter_path` and `filter_value` apply an exact JSON filter on top of FTS results. + ## Ops - `GET /ops/queue?near_expiry_ms=&stuck_ms=` diff --git a/docs/architecture.md b/docs/architecture.md index a1f68ae..8c7bdd1 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -18,8 +18,8 @@ In scope: - Lease ownership and retries - Run events and artifacts - Task dependencies (DAG) -- Quality gate evidence and enforcement - Optional task assignments +- Key-value store with full-text search - Orchestrator-facing queue metrics (`/ops/queue`) Out of scope: @@ -44,8 +44,8 @@ Out of scope: - `events`: append-only run timeline - `artifacts`: task/run outputs - `task_dependencies`: DAG edges (`task -> depends_on_task`) -- `gate_results`: pass/fail evidence per run and gate - `task_assignments`: optional explicit owner binding for agents +- `store`: namespaced key-value entries with FTS5 search - `idempotency_keys`: deduplication store for write retries ## Execution Semantics @@ -53,17 +53,15 @@ Out of scope: 1. Agent claims work using role (`/leases/claim`). 2. Service starts a new run and grants a lease token. 3. Agent sends events and periodic heartbeats. -4. Agent (or orchestrator) may submit gate results for run quality checks. -5. Agent either transitions the run to the next stage or fails it. -6. Transition can require gate pass state and optimistic checks (`expected_stage`, `expected_task_version`). -7. Failures apply retry policy and optional dead-letter routing. -8. Lease is released on transition/failure or expires automatically. +4. Agent either transitions the run to the next stage or fails it. +5. Transition can enforce optimistic checks (`expected_stage`, `expected_task_version`). +6. Failures apply retry policy and optional dead-letter routing. +7. Lease is released on transition/failure or expires automatically. ## Safety and Correctness - State-changing paths use SQL transactions (`BEGIN IMMEDIATE`) to avoid double-claim races. - Lease tokens are stored as SHA-256 hashes, not plaintext. - Pipeline transitions are validated against declared FSM definitions. -- Required quality gates are enforced server-side on transition. - Claim excludes blocked dependencies, non-eligible retries, and foreign assignments. - API string fields are JSON-escaped at serialization time. diff --git a/docs/workflows.md b/docs/workflows.md index 433fccd..5969825 100644 --- a/docs/workflows.md +++ b/docs/workflows.md @@ -19,10 +19,6 @@ State metadata supports: - `description`: optional description - `terminal`: whether this stage is terminal -Transition metadata supports: - -- `required_gates`: quality gates that must be passed before transition - ## Transition Rules - Every transition must reference existing states. @@ -53,13 +49,6 @@ Transition metadata supports: Dependencies are resolved only when the upstream task reaches a terminal pipeline stage. -## Quality Gates - -- Add gate result: `POST /runs/{id}/gates` -- Inspect gate history: `GET /runs/{id}/gates` - -`/runs/{id}/transition` returns `409` if required gates are not passed. - ## Assignments (Optional) - Assign: `POST /tasks/{id}/assignments` diff --git a/src/api.zig b/src/api.zig index 1e9b7af..6003202 100644 --- a/src/api.zig +++ b/src/api.zig @@ -1,7 +1,6 @@ const std = @import("std"); const store_mod = @import("store.zig"); const Store = store_mod.Store; -const domain = @import("domain.zig"); const ids = @import("ids.zig"); const log = std.log.scoped(.api); @@ -94,17 +93,18 @@ pub fn handleRequest( raw_request: []const u8, ) HttpResponse { const path = parsePath(target); - const seg0 = getPathSegment(path.path, 0); - const seg1 = getPathSegment(path.path, 1); - const seg2 = getPathSegment(path.path, 2); - const seg3 = getPathSegment(path.path, 3); - const seg4 = getPathSegment(path.path, 4); + const seg0 = decodePathSegment(ctx.allocator, getPathSegment(path.path, 0)); + const seg1 = decodePathSegment(ctx.allocator, getPathSegment(path.path, 1)); + const seg2 = decodePathSegment(ctx.allocator, getPathSegment(path.path, 2)); + const seg3 = decodePathSegment(ctx.allocator, getPathSegment(path.path, 3)); + const seg4 = decodePathSegment(ctx.allocator, getPathSegment(path.path, 4)); const is_get = std.mem.eql(u8, method, "GET"); const is_post = std.mem.eql(u8, method, "POST"); + const is_put = std.mem.eql(u8, method, "PUT"); const is_delete = std.mem.eql(u8, method, "DELETE"); - const is_write = is_post or is_delete; + const is_write = is_post or is_delete or is_put; const request_token = extractBearerToken(raw_request); if (!isAuthorized(ctx, seg0, seg1, seg2, request_token)) { @@ -199,6 +199,11 @@ pub fn handleRequest( return response; } + if (is_get and seg1 != null and eql(seg2, "run-state") and seg3 == null) { + response = handleGetTaskRunState(ctx, seg1.?); + return response; + } + if (seg1 != null and eql(seg2, "dependencies")) { if (is_post and seg3 == null) { response = handleAddTaskDependency(ctx, seg1.?, body); @@ -248,14 +253,6 @@ pub fn handleRequest( response = handleListEvents(ctx, seg1.?, path.query); return response; } - if (is_post and eql(seg2, "gates")) { - response = handleAddGateResult(ctx, seg1.?, body, raw_request); - return finalizeWithIdempotency(ctx, method, path.path, idempotency, response); - } - if (is_get and eql(seg2, "gates")) { - response = handleListGateResults(ctx, seg1.?); - return response; - } if (is_post and eql(seg2, "transition")) { response = handleTransition(ctx, seg1.?, body, raw_request); return finalizeWithIdempotency(ctx, method, path.path, idempotency, response); @@ -283,6 +280,36 @@ pub fn handleRequest( return response; } + // Store (KV) + // NOTE: "search" is a reserved namespace — GET /store/search is the search endpoint, + // so a namespace literally named "search" cannot be listed via GET /store/{namespace}. + if (eql(seg0, "store") and seg1 != null) { + if (is_get and eql(seg1, "search") and seg2 == null) { + response = handleStoreSearch(ctx, path.query); + return response; + } + if (is_put and seg2 != null and seg3 == null) { + response = handleStorePut(ctx, seg1.?, seg2.?, body); + return finalizeWithIdempotency(ctx, method, path.path, idempotency, response); + } + if (is_get and seg2 != null and seg3 == null) { + response = handleStoreGet(ctx, seg1.?, seg2.?); + return response; + } + if (is_get and seg2 == null) { + response = handleStoreList(ctx, seg1.?); + return response; + } + if (is_delete and seg2 != null and seg3 == null) { + response = handleStoreDelete(ctx, seg1.?, seg2.?); + return finalizeWithIdempotency(ctx, method, path.path, idempotency, response); + } + if (is_delete and seg2 == null) { + response = handleStoreDeleteNamespace(ctx, seg1.?); + return finalizeWithIdempotency(ctx, method, path.path, idempotency, response); + } + } + return response; } @@ -736,92 +763,26 @@ fn handleListTasks(ctx: *Context, query: ?[]const u8) HttpResponse { } fn handleGetTask(ctx: *Context, id: []const u8) HttpResponse { - const task = (ctx.store.getTask(id) catch return serverError(ctx.allocator)) orelse { + const details = (ctx.store.getTaskDetails(id) catch return serverError(ctx.allocator)) orelse { return respondError(ctx.allocator, 404, "not_found", "Task not found"); }; - defer ctx.store.freeTaskRow(task); - - // Get pipeline definition for available transitions - const pipeline = ctx.store.getPipeline(task.pipeline_id) catch null; - defer if (pipeline) |p| ctx.store.freePipelineRow(p); - const latest_run = ctx.store.getLatestRun(id) catch null; - defer if (latest_run) |r| ctx.store.freeRunRow(r); - const dependencies = ctx.store.listTaskDependencies(id) catch return serverError(ctx.allocator); - defer ctx.store.freeDependencyRows(dependencies); - const assignments = ctx.store.listTaskAssignments(id) catch return serverError(ctx.allocator); - defer ctx.store.freeAssignmentRows(assignments); + defer ctx.store.freeTaskDetails(details); var buf: std.ArrayListUnmanaged(u8) = .empty; var w = buf.writer(ctx.allocator); w.writeAll("{") catch return serverError(ctx.allocator); - writeTaskJsonFields(&w, ctx.allocator, task) catch return serverError(ctx.allocator); + writeTaskJsonFields(&w, ctx.allocator, details.task) catch return serverError(ctx.allocator); // Latest run - if (latest_run) |run| { + if (details.latest_run) |run| { w.writeAll(",\"latest_run\":{") catch return serverError(ctx.allocator); writeRunFields(&w, ctx.allocator, run) catch return serverError(ctx.allocator); w.writeAll("}") catch return serverError(ctx.allocator); } - w.writeAll(",\"dependencies\":[") catch return serverError(ctx.allocator); - for (dependencies, 0..) |dep, i| { - if (i > 0) w.writeAll(",") catch return serverError(ctx.allocator); - w.writeAll("{") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "depends_on_task_id", dep.depends_on_task_id) catch return serverError(ctx.allocator); - w.print(",\"resolved\":{s}", .{if (dep.resolved) "true" else "false"}) catch return serverError(ctx.allocator); - w.writeAll("}") catch return serverError(ctx.allocator); - } - w.writeAll("]") catch return serverError(ctx.allocator); - - w.writeAll(",\"assignments\":[") catch return serverError(ctx.allocator); - for (assignments, 0..) |a, i| { - if (i > 0) w.writeAll(",") catch return serverError(ctx.allocator); - w.writeAll("{") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "agent_id", a.agent_id) catch return serverError(ctx.allocator); - w.writeAll(",") catch return serverError(ctx.allocator); - writeNullableStringField(&w, ctx.allocator, "assigned_by", a.assigned_by) catch return serverError(ctx.allocator); - w.print(",\"active\":{s},\"created_at_ms\":{d},\"updated_at_ms\":{d}", .{ - if (a.active) "true" else "false", - a.created_at_ms, - a.updated_at_ms, - }) catch return serverError(ctx.allocator); - w.writeAll("}") catch return serverError(ctx.allocator); - } - w.writeAll("]") catch return serverError(ctx.allocator); - - // Available transitions - if (pipeline) |pip| { - var parsed_pipeline = domain.parseAndValidate(ctx.allocator, pip.definition_json) catch { - w.writeAll(",\"available_transitions\":[]") catch return serverError(ctx.allocator); - w.writeAll("}") catch return serverError(ctx.allocator); - return .{ .status = "200 OK", .body = buf.items }; - }; - defer parsed_pipeline.deinit(); - - const transitions = domain.getAvailableTransitions(ctx.allocator, parsed_pipeline.value, task.stage) catch &.{}; - w.writeAll(",\"available_transitions\":[") catch return serverError(ctx.allocator); - for (transitions, 0..) |t, i| { - if (i > 0) w.writeAll(",") catch return serverError(ctx.allocator); - w.writeAll("{") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "trigger", t.trigger) catch return serverError(ctx.allocator); - w.writeAll(",") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "to", t.to) catch return serverError(ctx.allocator); - w.writeAll(",\"required_gates\":") catch return serverError(ctx.allocator); - if (t.required_gates) |required_gates| { - w.writeAll("[") catch return serverError(ctx.allocator); - for (required_gates, 0..) |gate, gi| { - if (gi > 0) w.writeAll(",") catch return serverError(ctx.allocator); - const gate_json = quoteJson(ctx.allocator, gate) catch return serverError(ctx.allocator); - w.writeAll(gate_json) catch return serverError(ctx.allocator); - } - w.writeAll("]") catch return serverError(ctx.allocator); - } else { - w.writeAll("[]") catch return serverError(ctx.allocator); - } - w.writeAll("}") catch return serverError(ctx.allocator); - } - w.writeAll("]") catch return serverError(ctx.allocator); - } + writeDependencyRows(&w, ctx.allocator, details.dependencies) catch return serverError(ctx.allocator); + writeAssignmentRows(&w, ctx.allocator, details.assignments) catch return serverError(ctx.allocator); + writeTaskTransitions(&w, ctx.allocator, details.available_transitions) catch return serverError(ctx.allocator); w.writeAll("}") catch return serverError(ctx.allocator); return .{ .status = "200 OK", .body = buf.items }; @@ -832,6 +793,9 @@ fn handleClaim(ctx: *Context, body: []const u8) HttpResponse { agent_id: []const u8, agent_role: []const u8, lease_ttl_ms: ?i64 = null, + concurrency: ?struct { + per_state: ?std.json.Value = null, + } = null, }, ctx.allocator, body, .{ .ignore_unknown_fields = true }) catch { return respondError(ctx.allocator, 400, "invalid_json", "Invalid JSON body"); }; @@ -839,7 +803,10 @@ fn handleClaim(ctx: *Context, body: []const u8) HttpResponse { const req = parsed.value; const ttl = req.lease_ttl_ms orelse 300_000; // 5 min default - const result = ctx.store.claimTask(req.agent_id, req.agent_role, ttl) catch |err| { + // Extract per_state concurrency map + const per_state_val: ?std.json.Value = if (req.concurrency) |conc| conc.per_state else null; + + const result = ctx.store.claimTask(req.agent_id, req.agent_role, ttl, per_state_val) catch |err| { log.err("claim failed: {}", .{err}); return serverError(ctx.allocator); }; @@ -981,7 +948,6 @@ fn handleTransition(ctx: *Context, run_id: []const u8, body: []const u8, raw_req error.RunNotFound => respondError(ctx.allocator, 404, "not_found", "Run not found"), error.RunNotRunning => respondError(ctx.allocator, 409, "conflict", "Run is not in running state"), error.InvalidTransition => respondError(ctx.allocator, 400, "invalid_transition", "No valid transition for this trigger from current stage"), - error.RequiredGatesNotPassed => respondError(ctx.allocator, 409, "required_gates_not_passed", "Required quality gates are not passed"), error.ExpectedStageMismatch => respondError(ctx.allocator, 409, "expected_stage_mismatch", "Current task stage does not match expected_stage"), error.TaskVersionMismatch => respondError(ctx.allocator, 409, "task_version_mismatch", "Current task version does not match expected_task_version"), else => serverError(ctx.allocator), @@ -1202,70 +1168,14 @@ fn handleUnassignTask(ctx: *Context, task_id: []const u8, agent_id: []const u8) return .{ .status = "200 OK", .body = "{\"status\":\"unassigned\"}" }; } -fn handleAddGateResult(ctx: *Context, run_id: []const u8, body: []const u8, raw_request: []const u8) HttpResponse { - const token = extractBearerToken(raw_request) orelse { - return respondError(ctx.allocator, 401, "unauthorized", "Missing Authorization header"); - }; - ctx.store.validateLeaseByRunId(run_id, token) catch |err| { - return switch (err) { - error.LeaseNotFound => respondError(ctx.allocator, 404, "not_found", "No active lease for this run"), - error.InvalidToken => respondError(ctx.allocator, 401, "unauthorized", "Invalid token"), - error.LeaseExpired => respondError(ctx.allocator, 410, "expired", "Lease expired"), - else => serverError(ctx.allocator), - }; - }; - - var parsed = std.json.parseFromSlice(struct { - gate: []const u8, - status: []const u8, - evidence: ?std.json.Value = null, - actor: ?[]const u8 = null, - }, ctx.allocator, body, .{ .ignore_unknown_fields = true }) catch { - return respondError(ctx.allocator, 400, "invalid_json", "Invalid JSON body"); - }; - defer parsed.deinit(); - - if (!std.mem.eql(u8, parsed.value.status, "pass") and !std.mem.eql(u8, parsed.value.status, "fail")) { - return respondError(ctx.allocator, 400, "invalid_status", "status must be pass or fail"); - } - - const evidence_json = if (parsed.value.evidence) |e| (jsonStringify(ctx.allocator, e) catch "{}") else "{}"; - const id = ctx.store.addGateResult(run_id, parsed.value.gate, parsed.value.status, evidence_json, parsed.value.actor) catch |err| { - return switch (err) { - error.RunNotFound => respondError(ctx.allocator, 404, "not_found", "Run not found"), - else => serverError(ctx.allocator), - }; - }; - - const resp = std.fmt.allocPrint(ctx.allocator, "{{\"id\":{d}}}", .{id}) catch return serverError(ctx.allocator); - return .{ .status = "201 Created", .body = resp, .status_code = 201 }; -} - -fn handleListGateResults(ctx: *Context, run_id: []const u8) HttpResponse { - const rows = ctx.store.listGateResults(run_id) catch return serverError(ctx.allocator); - defer ctx.store.freeGateResultRows(rows); - - var buf: std.ArrayListUnmanaged(u8) = .empty; - var w = buf.writer(ctx.allocator); - w.writeAll("[") catch return serverError(ctx.allocator); - for (rows, 0..) |row, i| { - if (i > 0) w.writeAll(",") catch return serverError(ctx.allocator); - w.writeAll("{") catch return serverError(ctx.allocator); - w.print("\"id\":{d},", .{row.id}) catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "run_id", row.run_id) catch return serverError(ctx.allocator); - w.writeAll(",") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "task_id", row.task_id) catch return serverError(ctx.allocator); - w.writeAll(",") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "gate", row.gate) catch return serverError(ctx.allocator); - w.writeAll(",") catch return serverError(ctx.allocator); - writeStringField(&w, ctx.allocator, "status", row.status) catch return serverError(ctx.allocator); - w.print(",\"evidence\":{s},", .{row.evidence_json}) catch return serverError(ctx.allocator); - writeNullableStringField(&w, ctx.allocator, "actor", row.actor) catch return serverError(ctx.allocator); - w.print(",\"ts_ms\":{d}", .{row.ts_ms}) catch return serverError(ctx.allocator); - w.writeAll("}") catch return serverError(ctx.allocator); +fn handleGetTaskRunState(ctx: *Context, task_id: []const u8) HttpResponse { + const run_id = ctx.store.getTaskRunId(task_id) catch return serverError(ctx.allocator); + if (run_id) |rid| { + defer ctx.store.freeOwnedString(rid); + const resp = std.fmt.allocPrint(ctx.allocator, "{{\"run_id\":\"{s}\"}}", .{rid}) catch return serverError(ctx.allocator); + return .{ .status = "200 OK", .body = resp }; } - w.writeAll("]") catch return serverError(ctx.allocator); - return .{ .status = "200 OK", .body = buf.items }; + return respondError(ctx.allocator, 404, "not_found", "Task has no run_id"); } fn handleQueueOps(ctx: *Context, query: ?[]const u8) HttpResponse { @@ -1391,6 +1301,187 @@ fn writeRunFields(w: anytype, allocator: std.mem.Allocator, r: store_mod.RunRow) } } +fn writeDependencyRows(w: anytype, allocator: std.mem.Allocator, rows: []store_mod.DependencyRow) !void { + try w.writeAll(",\"dependencies\":["); + for (rows, 0..) |dep, i| { + if (i > 0) try w.writeAll(","); + try w.writeAll("{"); + try writeStringField(w, allocator, "depends_on_task_id", dep.depends_on_task_id); + try w.print(",\"resolved\":{s}", .{if (dep.resolved) "true" else "false"}); + try w.writeAll("}"); + } + try w.writeAll("]"); +} + +fn writeAssignmentRows(w: anytype, allocator: std.mem.Allocator, rows: []store_mod.AssignmentRow) !void { + try w.writeAll(",\"assignments\":["); + for (rows, 0..) |assignment, i| { + if (i > 0) try w.writeAll(","); + try w.writeAll("{"); + try writeStringField(w, allocator, "agent_id", assignment.agent_id); + try w.writeAll(","); + try writeNullableStringField(w, allocator, "assigned_by", assignment.assigned_by); + try w.print(",\"active\":{s},\"created_at_ms\":{d},\"updated_at_ms\":{d}", .{ + if (assignment.active) "true" else "false", + assignment.created_at_ms, + assignment.updated_at_ms, + }); + try w.writeAll("}"); + } + try w.writeAll("]"); +} + +fn writeTaskTransitions(w: anytype, allocator: std.mem.Allocator, rows: []store_mod.TaskTransition) !void { + try w.writeAll(",\"available_transitions\":["); + for (rows, 0..) |transition, i| { + if (i > 0) try w.writeAll(","); + try w.writeAll("{"); + try writeStringField(w, allocator, "trigger", transition.trigger); + try w.writeAll(","); + try writeStringField(w, allocator, "to", transition.to); + try w.writeAll(",\"required_gates\":"); + if (transition.required_gates) |required_gates| { + try w.writeAll("["); + for (required_gates, 0..) |gate, gi| { + if (gi > 0) try w.writeAll(","); + const gate_json = try quoteJson(allocator, gate); + try w.writeAll(gate_json); + } + try w.writeAll("]"); + } else { + try w.writeAll("[]"); + } + try w.writeAll("}"); + } + try w.writeAll("]"); +} + +// ===== Store handlers ===== + +fn handleStorePut(ctx: *Context, namespace: []const u8, key: []const u8, body: []const u8) HttpResponse { + if (std.mem.eql(u8, namespace, "search")) { + return respondError(ctx.allocator, 400, "reserved_namespace", "\"search\" is a reserved namespace name"); + } + var parsed = std.json.parseFromSlice(struct { + value: std.json.Value, + }, ctx.allocator, body, .{ .ignore_unknown_fields = true }) catch { + return respondError(ctx.allocator, 400, "invalid_json", "Invalid JSON body; expected {\"value\": ...}"); + }; + defer parsed.deinit(); + const value_json = jsonStringify(ctx.allocator, parsed.value.value) catch return serverError(ctx.allocator); + + ctx.store.storePut(namespace, key, value_json) catch return serverError(ctx.allocator); + return .{ .status = "204 No Content", .body = "", .status_code = 204 }; +} + +fn handleStoreGet(ctx: *Context, namespace: []const u8, key: []const u8) HttpResponse { + const entry = ctx.store.storeGet(ctx.allocator, namespace, key) catch return serverError(ctx.allocator); + if (entry) |e| { + var buf: std.ArrayListUnmanaged(u8) = .empty; + var w = buf.writer(ctx.allocator); + w.writeAll("{") catch return serverError(ctx.allocator); + writeStringField(&w, ctx.allocator, "namespace", e.namespace) catch return serverError(ctx.allocator); + w.writeAll(",") catch return serverError(ctx.allocator); + writeStringField(&w, ctx.allocator, "key", e.key) catch return serverError(ctx.allocator); + w.print(",\"value\":{s}", .{e.value_json}) catch return serverError(ctx.allocator); + w.print(",\"created_at_ms\":{d},\"updated_at_ms\":{d}", .{ e.created_at_ms, e.updated_at_ms }) catch return serverError(ctx.allocator); + w.writeAll("}") catch return serverError(ctx.allocator); + return .{ .status = "200 OK", .body = buf.items }; + } else { + return respondError(ctx.allocator, 404, "not_found", "Key not found"); + } +} + +fn handleStoreList(ctx: *Context, namespace: []const u8) HttpResponse { + const entries = ctx.store.storeList(ctx.allocator, namespace) catch return serverError(ctx.allocator); + + var buf: std.ArrayListUnmanaged(u8) = .empty; + var w = buf.writer(ctx.allocator); + w.writeAll("[") catch return serverError(ctx.allocator); + for (entries, 0..) |e, i| { + if (i > 0) w.writeAll(",") catch return serverError(ctx.allocator); + w.writeAll("{") catch return serverError(ctx.allocator); + writeStringField(&w, ctx.allocator, "namespace", e.namespace) catch return serverError(ctx.allocator); + w.writeAll(",") catch return serverError(ctx.allocator); + writeStringField(&w, ctx.allocator, "key", e.key) catch return serverError(ctx.allocator); + w.print(",\"value\":{s}", .{e.value_json}) catch return serverError(ctx.allocator); + w.print(",\"created_at_ms\":{d},\"updated_at_ms\":{d}", .{ e.created_at_ms, e.updated_at_ms }) catch return serverError(ctx.allocator); + w.writeAll("}") catch return serverError(ctx.allocator); + } + w.writeAll("]") catch return serverError(ctx.allocator); + return .{ .status = "200 OK", .body = buf.items }; +} + +fn handleStoreDelete(ctx: *Context, namespace: []const u8, key: []const u8) HttpResponse { + ctx.store.storeDelete(namespace, key) catch return serverError(ctx.allocator); + return .{ .status = "204 No Content", .body = "", .status_code = 204 }; +} + +fn handleStoreDeleteNamespace(ctx: *Context, namespace: []const u8) HttpResponse { + ctx.store.storeDeleteNamespace(namespace) catch return serverError(ctx.allocator); + return .{ .status = "204 No Content", .body = "", .status_code = 204 }; +} + +fn sanitizeFts5Query(allocator: std.mem.Allocator, raw: []const u8) ?[]const u8 { + // Split on whitespace, wrap each token in double quotes (escaping internal quotes). + // This turns arbitrary user input into safe FTS5 literal phrases. + var buf: std.ArrayListUnmanaged(u8) = .empty; + var w = buf.writer(allocator); + var first = true; + var it = std.mem.tokenizeAny(u8, raw, " \t\n\r"); + while (it.next()) |token| { + if (!first) w.writeAll(" ") catch return null; + first = false; + w.writeAll("\"") catch return null; + for (token) |ch| { + if (ch == '"') { + w.writeAll("\"\"") catch return null; + } else { + w.writeByte(ch) catch return null; + } + } + w.writeAll("\"") catch return null; + } + if (first) return null; // all whitespace / empty + return buf.items; +} + +fn handleStoreSearch(ctx: *Context, query: ?[]const u8) HttpResponse { + const raw_q = parseQueryParam(query, "q") orelse { + return respondError(ctx.allocator, 400, "missing_param", "Query parameter 'q' is required"); + }; + const q = urlDecode(ctx.allocator, raw_q) orelse raw_q; + const sanitized = sanitizeFts5Query(ctx.allocator, q) orelse { + return respondError(ctx.allocator, 400, "invalid_query", "Search query must contain at least one non-whitespace term"); + }; + const namespace = decodeQueryParamValue(ctx.allocator, parseQueryParam(query, "namespace")); + const limit_str = parseQueryParam(query, "limit"); + const limit: usize = if (limit_str) |ls| (std.fmt.parseInt(usize, ls, 10) catch 10) else 10; + if (limit == 0 or limit > 1000) { + return respondError(ctx.allocator, 400, "invalid_limit", "limit must be between 1 and 1000"); + } + const filter_path = decodeQueryParamValue(ctx.allocator, parseQueryParam(query, "filter_path")); + const filter_value = decodeQueryParamValue(ctx.allocator, parseQueryParam(query, "filter_value")); + + const entries = ctx.store.storeSearch(ctx.allocator, namespace, sanitized, limit, filter_path, filter_value) catch return serverError(ctx.allocator); + + var buf: std.ArrayListUnmanaged(u8) = .empty; + var w = buf.writer(ctx.allocator); + w.writeAll("[") catch return serverError(ctx.allocator); + for (entries, 0..) |e, i| { + if (i > 0) w.writeAll(",") catch return serverError(ctx.allocator); + w.writeAll("{") catch return serverError(ctx.allocator); + writeStringField(&w, ctx.allocator, "namespace", e.namespace) catch return serverError(ctx.allocator); + w.writeAll(",") catch return serverError(ctx.allocator); + writeStringField(&w, ctx.allocator, "key", e.key) catch return serverError(ctx.allocator); + w.print(",\"value\":{s}", .{e.value_json}) catch return serverError(ctx.allocator); + w.print(",\"created_at_ms\":{d},\"updated_at_ms\":{d}", .{ e.created_at_ms, e.updated_at_ms }) catch return serverError(ctx.allocator); + w.writeAll("}") catch return serverError(ctx.allocator); + } + w.writeAll("]") catch return serverError(ctx.allocator); + return .{ .status = "200 OK", .body = buf.items }; +} + fn jsonStringify(allocator: std.mem.Allocator, value: std.json.Value) ![]const u8 { return std.json.Stringify.valueAlloc(allocator, value, .{}) catch return error.JsonStringifyFailed; } @@ -1433,6 +1524,66 @@ pub fn parseQueryParam(query: ?[]const u8, key: []const u8) ?[]const u8 { return null; } +fn decodePathSegment(allocator: std.mem.Allocator, segment: ?[]const u8) ?[]const u8 { + const raw = segment orelse return null; + if (std.mem.indexOfScalar(u8, raw, '%') == null) return raw; + return decodeComponent(allocator, raw, false) orelse raw; +} + +fn decodeQueryParamValue(allocator: std.mem.Allocator, value: ?[]const u8) ?[]const u8 { + const raw = value orelse return null; + return urlDecode(allocator, raw) orelse raw; +} + +/// URL-decode a query parameter value: decode %XX sequences and replace '+' with space. +pub fn urlDecode(allocator: std.mem.Allocator, input: []const u8) ?[]const u8 { + return decodeComponent(allocator, input, true); +} + +fn decodeComponent(allocator: std.mem.Allocator, input: []const u8, plus_as_space: bool) ?[]const u8 { + if (std.mem.indexOfScalar(u8, input, '%') == null and (!plus_as_space or std.mem.indexOfScalar(u8, input, '+') == null)) { + return input; + } + var buf = allocator.alloc(u8, input.len) catch return null; + var out: usize = 0; + var i: usize = 0; + while (i < input.len) { + if (plus_as_space and input[i] == '+') { + buf[out] = ' '; + out += 1; + i += 1; + } else if (input[i] == '%' and i + 2 < input.len) { + const hi = hexVal(input[i + 1]) orelse { + buf[out] = input[i]; + out += 1; + i += 1; + continue; + }; + const lo = hexVal(input[i + 2]) orelse { + buf[out] = input[i]; + out += 1; + i += 1; + continue; + }; + buf[out] = (@as(u8, hi) << 4) | @as(u8, lo); + out += 1; + i += 3; + } else { + buf[out] = input[i]; + out += 1; + i += 1; + } + } + return buf[0..out]; +} + +fn hexVal(c: u8) ?u4 { + if (c >= '0' and c <= '9') return @intCast(c - '0'); + if (c >= 'a' and c <= 'f') return @intCast(c - 'a' + 10); + if (c >= 'A' and c <= 'F') return @intCast(c - 'A' + 10); + return null; +} + const CompositeCursor = struct { ts_ms: i64, id: []const u8, @@ -1476,7 +1627,7 @@ fn isAuthorized( fn requiresLeaseOrAdminToken(seg0: ?[]const u8, seg1: ?[]const u8, seg2: ?[]const u8) bool { if (eql(seg0, "leases") and seg1 != null and eql(seg2, "heartbeat")) return true; if (eql(seg0, "runs") and seg1 != null and seg2 != null) { - return eql(seg2, "events") or eql(seg2, "gates") or eql(seg2, "transition") or eql(seg2, "fail"); + return eql(seg2, "events") or eql(seg2, "transition") or eql(seg2, "fail"); } return false; } @@ -1583,3 +1734,94 @@ test "auth accepts admin token for protected endpoint" { const resp = handleRequest(&ctx, "GET", "/tasks", "", raw); try std.testing.expectEqualStrings("200 OK", resp.status); } + +test "store API decodes percent-encoded namespace and key segments" { + const allocator = std.testing.allocator; + var store = try Store.init(allocator, ":memory:"); + defer store.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + + var ctx = Context{ + .store = &store, + .allocator = arena.allocator(), + }; + + const put_resp = handleRequest( + &ctx, + "PUT", + "/store/team%20alpha/key%2F1", + "{\"value\":{\"message\":\"hello\"}}", + "PUT /store/team%20alpha/key%2F1 HTTP/1.1\r\n\r\n", + ); + try std.testing.expectEqualStrings("204 No Content", put_resp.status); + + const get_resp = handleRequest( + &ctx, + "GET", + "/store/team%20alpha/key%2F1", + "", + "GET /store/team%20alpha/key%2F1 HTTP/1.1\r\n\r\n", + ); + try std.testing.expectEqualStrings("200 OK", get_resp.status); + try std.testing.expect(std.mem.indexOf(u8, get_resp.body, "\"namespace\":\"team alpha\"") != null); + try std.testing.expect(std.mem.indexOf(u8, get_resp.body, "\"key\":\"key/1\"") != null); +} + +test "store search decodes namespace and filter query params" { + const allocator = std.testing.allocator; + var store = try Store.init(allocator, ":memory:"); + defer store.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + + var ctx = Context{ + .store = &store, + .allocator = arena.allocator(), + }; + + const put_resp = handleRequest( + &ctx, + "PUT", + "/store/team%20alpha/key%2F1", + "{\"value\":{\"message\":\"hello world\",\"status\":\"release candidate\"}}", + "PUT /store/team%20alpha/key%2F1 HTTP/1.1\r\n\r\n", + ); + try std.testing.expectEqualStrings("204 No Content", put_resp.status); + + const search_resp = handleRequest( + &ctx, + "GET", + "/store/search?q=hello&namespace=team%20alpha&filter_path=%24.status&filter_value=release+candidate", + "", + "GET /store/search?q=hello&namespace=team%20alpha&filter_path=%24.status&filter_value=release+candidate HTTP/1.1\r\n\r\n", + ); + try std.testing.expectEqualStrings("200 OK", search_resp.status); + try std.testing.expect(std.mem.indexOf(u8, search_resp.body, "\"key\":\"key/1\"") != null); +} + +test "store search rejects excessive limit" { + const allocator = std.testing.allocator; + var store = try Store.init(allocator, ":memory:"); + defer store.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + + var ctx = Context{ + .store = &store, + .allocator = arena.allocator(), + }; + + const resp = handleRequest( + &ctx, + "GET", + "/store/search?q=hello&limit=1001", + "", + "GET /store/search?q=hello&limit=1001 HTTP/1.1\r\n\r\n", + ); + try std.testing.expectEqualStrings("400 Bad Request", resp.status); + try std.testing.expect(std.mem.indexOf(u8, resp.body, "\"invalid_limit\"") != null); +} diff --git a/src/domain.zig b/src/domain.zig index 8ce539f..53d6f7c 100644 --- a/src/domain.zig +++ b/src/domain.zig @@ -12,6 +12,7 @@ pub const StateDef = struct { agent_role: ?[]const u8 = null, description: ?[]const u8 = null, terminal: ?bool = null, + workflow_id: ?[]const u8 = null, }; pub const TransitionDef = struct { @@ -111,19 +112,6 @@ pub fn getAvailableTransitions(allocator: std.mem.Allocator, def: PipelineDefini return results.toOwnedSlice(allocator); } -pub fn getStagesForRole(allocator: std.mem.Allocator, def: PipelineDefinition, role: []const u8) ![]const []const u8 { - var stages: std.ArrayListUnmanaged([]const u8) = .empty; - var iter = def.states.map.iterator(); - while (iter.next()) |entry| { - if (entry.value_ptr.agent_role) |agent_role| { - if (std.mem.eql(u8, agent_role, role)) { - try stages.append(allocator, try allocator.dupe(u8, entry.key_ptr.*)); - } - } - } - return stages.toOwnedSlice(allocator); -} - pub fn validationErrorMessage(err: ValidationError) []const u8 { return switch (err) { ValidationError.InvalidJson => "Invalid JSON in pipeline definition", diff --git a/src/migrations/001_init.sql b/src/migrations/001_init.sql index b294013..4a08a98 100644 --- a/src/migrations/001_init.sql +++ b/src/migrations/001_init.sql @@ -21,6 +21,8 @@ CREATE TABLE IF NOT EXISTS tasks ( retry_delay_ms INTEGER NOT NULL DEFAULT 0, dead_letter_stage TEXT, dead_letter_reason TEXT, + run_id TEXT, + workflow_state_json TEXT, created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL ); @@ -87,19 +89,6 @@ CREATE TABLE IF NOT EXISTS task_dependencies ( ); CREATE INDEX IF NOT EXISTS idx_task_dependencies_depends_on ON task_dependencies(depends_on_task_id); -CREATE TABLE IF NOT EXISTS gate_results ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - run_id TEXT NOT NULL REFERENCES runs(id) ON DELETE CASCADE, - task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, - gate TEXT NOT NULL, - status TEXT NOT NULL, - evidence_json TEXT NOT NULL DEFAULT '{}', - actor TEXT, - ts_ms INTEGER NOT NULL -); -CREATE INDEX IF NOT EXISTS idx_gate_results_run ON gate_results(run_id, id); -CREATE INDEX IF NOT EXISTS idx_gate_results_gate ON gate_results(run_id, gate, id DESC); - CREATE TABLE IF NOT EXISTS task_assignments ( task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE, agent_id TEXT NOT NULL, diff --git a/src/migrations/003_store.sql b/src/migrations/003_store.sql new file mode 100644 index 0000000..5a98a70 --- /dev/null +++ b/src/migrations/003_store.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS store ( + namespace TEXT NOT NULL, + key TEXT NOT NULL, + value_json TEXT NOT NULL, + created_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL, + PRIMARY KEY (namespace, key) +); +CREATE INDEX IF NOT EXISTS idx_store_namespace ON store(namespace); diff --git a/src/migrations/004_store_fts.sql b/src/migrations/004_store_fts.sql new file mode 100644 index 0000000..b875339 --- /dev/null +++ b/src/migrations/004_store_fts.sql @@ -0,0 +1,22 @@ +-- Full-text search index for store values +CREATE VIRTUAL TABLE IF NOT EXISTS store_fts USING fts5( + namespace, + key, + content, + content='store', + content_rowid='rowid' +); + +-- Triggers to keep FTS in sync +CREATE TRIGGER IF NOT EXISTS store_fts_insert AFTER INSERT ON store BEGIN + INSERT INTO store_fts(rowid, namespace, key, content) VALUES (new.rowid, new.namespace, new.key, new.value_json); +END; + +CREATE TRIGGER IF NOT EXISTS store_fts_delete AFTER DELETE ON store BEGIN + INSERT INTO store_fts(store_fts, rowid, namespace, key, content) VALUES ('delete', old.rowid, old.namespace, old.key, old.value_json); +END; + +CREATE TRIGGER IF NOT EXISTS store_fts_update AFTER UPDATE ON store BEGIN + INSERT INTO store_fts(store_fts, rowid, namespace, key, content) VALUES ('delete', old.rowid, old.namespace, old.key, old.value_json); + INSERT INTO store_fts(rowid, namespace, key, content) VALUES (new.rowid, new.namespace, new.key, new.value_json); +END; diff --git a/src/openapi.json b/src/openapi.json index f74381a..5ced725 100644 --- a/src/openapi.json +++ b/src/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "nullTickets API", "version": "2026.3.2", - "description": "Headless AI task tracker API with leases, quality gates, dependencies, assignments, pagination, and orchestrator ops endpoints." + "description": "Headless AI task tracker API with leases, dependencies, assignments, pagination, KV store, and orchestrator ops endpoints." }, "servers": [ { "url": "http://127.0.0.1:7700" } @@ -327,40 +327,6 @@ } } }, - "/runs/{id}/gates": { - "post": { - "summary": "Record gate result", - "security": [{ "bearerAuth": [] }], - "parameters": [ - { "name": "id", "in": "path", "required": true, "schema": { "type": "string" } } - ], - "requestBody": { - "required": true, - "content": { "application/json": { "schema": { "$ref": "#/components/schemas/GateResultCreateRequest" } } } - }, - "responses": { - "201": { - "description": "Created", - "content": { "application/json": { "schema": { "type": "object", "properties": { "id": { "type": "integer" } }, "required": ["id"] } } } - }, - "401": { "$ref": "#/components/responses/Error401" }, - "404": { "$ref": "#/components/responses/Error404" }, - "410": { "$ref": "#/components/responses/Error410" } - } - }, - "get": { - "summary": "List gate results", - "parameters": [ - { "name": "id", "in": "path", "required": true, "schema": { "type": "string" } } - ], - "responses": { - "200": { - "description": "Gate results", - "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/GateResult" } } } } - } - } - } - }, "/runs/{id}/transition": { "post": { "summary": "Transition task stage", @@ -448,6 +414,105 @@ } } } + }, + "/tasks/{id}/run-state": { + "get": { + "summary": "Get task run_id", + "parameters": [ + { "name": "id", "in": "path", "required": true, "schema": { "type": "string" } } + ], + "responses": { + "200": { + "description": "Run state", + "content": { "application/json": { "schema": { "type": "object", "properties": { "run_id": { "type": "string" } }, "required": ["run_id"] } } } + }, + "404": { "$ref": "#/components/responses/Error404" } + } + } + }, + "/store/{namespace}/{key}": { + "put": { + "summary": "Put store entry", + "description": "Path segments are URL-decoded server-side. Percent-encode reserved characters such as spaces or '/'.", + "parameters": [ + { "name": "namespace", "in": "path", "required": true, "description": "Store namespace. The literal name 'search' is reserved for the search endpoint.", "schema": { "type": "string" } }, + { "name": "key", "in": "path", "required": true, "description": "Store key. Reserved characters should be percent-encoded by the client.", "schema": { "type": "string" } } + ], + "requestBody": { + "required": true, + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/StorePutRequest" } } } + }, + "responses": { + "204": { "description": "Stored" } + } + }, + "get": { + "summary": "Get store entry", + "parameters": [ + { "name": "namespace", "in": "path", "required": true, "description": "Store namespace. The literal name 'search' is reserved for the search endpoint.", "schema": { "type": "string" } }, + { "name": "key", "in": "path", "required": true, "description": "Store key. Reserved characters should be percent-encoded by the client.", "schema": { "type": "string" } } + ], + "responses": { + "200": { + "description": "Entry", + "content": { "application/json": { "schema": { "$ref": "#/components/schemas/StoreEntry" } } } + }, + "404": { "$ref": "#/components/responses/Error404" } + } + }, + "delete": { + "summary": "Delete store entry", + "parameters": [ + { "name": "namespace", "in": "path", "required": true, "description": "Store namespace. The literal name 'search' is reserved for the search endpoint.", "schema": { "type": "string" } }, + { "name": "key", "in": "path", "required": true, "description": "Store key. Reserved characters should be percent-encoded by the client.", "schema": { "type": "string" } } + ], + "responses": { + "204": { "description": "Deleted" } + } + } + }, + "/store/{namespace}": { + "get": { + "summary": "List store entries in namespace", + "parameters": [ + { "name": "namespace", "in": "path", "required": true, "description": "Store namespace. The literal name 'search' is reserved for the search endpoint.", "schema": { "type": "string" } } + ], + "responses": { + "200": { + "description": "Entries", + "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/StoreEntry" } } } } + } + } + }, + "delete": { + "summary": "Delete all entries in namespace", + "parameters": [ + { "name": "namespace", "in": "path", "required": true, "description": "Store namespace. The literal name 'search' is reserved for the search endpoint.", "schema": { "type": "string" } } + ], + "responses": { + "204": { "description": "Deleted" } + } + } + }, + "/store/search": { + "get": { + "summary": "Full-text search store entries", + "description": "Searches FTS5 content and can optionally narrow results with exact JSON filtering via filter_path/filter_value.", + "parameters": [ + { "name": "q", "in": "query", "required": true, "schema": { "type": "string" } }, + { "name": "namespace", "in": "query", "required": false, "schema": { "type": "string" } }, + { "name": "limit", "in": "query", "required": false, "schema": { "type": "integer" } }, + { "name": "filter_path", "in": "query", "required": false, "schema": { "type": "string" } }, + { "name": "filter_value", "in": "query", "required": false, "schema": { "type": "string" } } + ], + "responses": { + "200": { + "description": "Search results", + "content": { "application/json": { "schema": { "type": "array", "items": { "$ref": "#/components/schemas/StoreEntry" } } } } + }, + "400": { "$ref": "#/components/responses/Error400" } + } + } } }, "components": { @@ -525,7 +590,8 @@ "properties": { "agent_role": { "type": ["string", "null"] }, "description": { "type": ["string", "null"] }, - "terminal": { "type": ["boolean", "null"] } + "terminal": { "type": ["boolean", "null"] }, + "workflow_id": { "type": ["string", "null"] } } }, "PipelineTransitionDef": { @@ -700,7 +766,17 @@ "properties": { "agent_id": { "type": "string" }, "agent_role": { "type": "string" }, - "lease_ttl_ms": { "type": "integer" } + "lease_ttl_ms": { "type": "integer" }, + "concurrency": { + "type": ["object", "null"], + "properties": { + "per_state": { + "type": ["object", "null"], + "description": "Map of state name to max concurrent leased tasks", + "additionalProperties": { "type": "integer" } + } + } + } } }, "ClaimResponse": { @@ -741,30 +817,6 @@ "next_cursor": { "type": ["string", "null"] } } }, - "GateResultCreateRequest": { - "type": "object", - "required": ["gate", "status"], - "properties": { - "gate": { "type": "string" }, - "status": { "type": "string", "enum": ["pass", "fail"] }, - "evidence": { "type": ["object", "null"], "additionalProperties": true }, - "actor": { "type": ["string", "null"] } - } - }, - "GateResult": { - "type": "object", - "required": ["id", "run_id", "task_id", "gate", "status", "evidence", "ts_ms"], - "properties": { - "id": { "type": "integer" }, - "run_id": { "type": "string" }, - "task_id": { "type": "string" }, - "gate": { "type": "string" }, - "status": { "type": "string", "enum": ["pass", "fail"] }, - "evidence": { "type": "object", "additionalProperties": true }, - "actor": { "type": ["string", "null"] }, - "ts_ms": { "type": "integer" } - } - }, "TransitionRequest": { "type": "object", "required": ["trigger"], @@ -848,6 +900,24 @@ "roles": { "type": "array", "items": { "$ref": "#/components/schemas/QueueRoleStats" } }, "generated_at_ms": { "type": "integer" } } + }, + "StorePutRequest": { + "type": "object", + "required": ["value"], + "properties": { + "value": {} + } + }, + "StoreEntry": { + "type": "object", + "required": ["namespace", "key", "value", "created_at_ms", "updated_at_ms"], + "properties": { + "namespace": { "type": "string" }, + "key": { "type": "string" }, + "value": {}, + "created_at_ms": { "type": "integer" }, + "updated_at_ms": { "type": "integer" } + } } } } diff --git a/src/store.zig b/src/store.zig index e783d38..dd1b738 100644 --- a/src/store.zig +++ b/src/store.zig @@ -82,17 +82,6 @@ pub const DependencyRow = struct { resolved: bool, }; -pub const GateResultRow = struct { - id: i64, - run_id: []const u8, - task_id: []const u8, - gate: []const u8, - status: []const u8, - evidence_json: []const u8, - actor: ?[]const u8, - ts_ms: i64, -}; - pub const AssignmentRow = struct { task_id: []const u8, agent_id: []const u8, @@ -109,6 +98,14 @@ pub const IdempotencyRow = struct { created_at_ms: i64, }; +pub const StoreEntry = struct { + namespace: []const u8, + key: []const u8, + value_json: []const u8, + created_at_ms: i64, + updated_at_ms: i64, +}; + pub const QueueRoleStats = struct { role: []const u8, claimable_count: i64, @@ -118,6 +115,12 @@ pub const QueueRoleStats = struct { near_expiry_leases: i64, }; +const PipelineStageRoleRow = struct { + pipeline_id: []const u8, + stage: []const u8, + agent_role: []const u8, +}; + pub const TaskPage = struct { items: []TaskRow, next_cursor: ?[]const u8, @@ -147,6 +150,50 @@ pub const TransitionResult = struct { trigger: []const u8, }; +pub const TaskTransition = struct { + trigger: []const u8, + to: []const u8, + required_gates: ?[]const []const u8, +}; + +pub const TaskDetails = struct { + task: TaskRow, + latest_run: ?RunRow, + dependencies: []DependencyRow, + assignments: []AssignmentRow, + available_transitions: []TaskTransition, +}; + +const PipelineDefinitionView = struct { + parsed: domain.ParsedPipeline, + + fn init(allocator: std.mem.Allocator, definition_json: []const u8) !PipelineDefinitionView { + return .{ + .parsed = try domain.parseAndValidate(allocator, definition_json), + }; + } + + fn deinit(self: *PipelineDefinitionView) void { + self.parsed.deinit(); + } + + fn definition(self: PipelineDefinitionView) domain.PipelineDefinition { + return self.parsed.value; + } + + fn isTerminal(self: PipelineDefinitionView, stage: []const u8) bool { + return domain.isTerminal(self.definition(), stage); + } + + fn findTransition(self: PipelineDefinitionView, from_stage: []const u8, trigger: []const u8) ?domain.TransitionDef { + return domain.findTransition(self.definition(), from_stage, trigger); + } + + fn hasState(self: PipelineDefinitionView, stage: []const u8) bool { + return self.definition().states.map.contains(stage); + } +}; + pub const OtlpSpanInsert = struct { trace_id: []const u8, span_id: []const u8, @@ -263,6 +310,59 @@ pub const Store = struct { try self.execSimple("CREATE INDEX IF NOT EXISTS idx_tasks_next_eligible ON tasks(next_eligible_at_ms);"); try self.execSimple("CREATE INDEX IF NOT EXISTS idx_tasks_dead_letter_reason ON tasks(dead_letter_reason);"); + + // Migration 002: orchestration columns + drop gate_results + try self.ensureColumn( + "tasks", + "run_id", + "ALTER TABLE tasks ADD COLUMN run_id TEXT;", + ); + try self.ensureColumn( + "tasks", + "workflow_state_json", + "ALTER TABLE tasks ADD COLUMN workflow_state_json TEXT;", + ); + try self.execSimple("DROP TABLE IF EXISTS gate_results;"); + + // Migration 003: store table + { + const store_sql = @embedFile("migrations/003_store.sql"); + var store_err: [*c]u8 = null; + const store_rc = c.sqlite3_exec(self.db, store_sql.ptr, null, null, &store_err); + if (store_rc != c.SQLITE_OK) { + if (store_err) |msg| { + log.err("migration 003 failed (rc={d}): {s}", .{ store_rc, std.mem.span(msg) }); + c.sqlite3_free(msg); + } + return error.MigrationFailed; + } + } + + // Migration 004: store FTS5 full-text search + { + const fts_sql = @embedFile("migrations/004_store_fts.sql"); + var fts_err: [*c]u8 = null; + const fts_rc = c.sqlite3_exec(self.db, fts_sql.ptr, null, null, &fts_err); + if (fts_rc != c.SQLITE_OK) { + if (fts_err) |msg| { + log.err("migration 004 failed (rc={d}): {s}", .{ fts_rc, std.mem.span(msg) }); + c.sqlite3_free(msg); + } + return error.MigrationFailed; + } + } + + try self.execSimple( + "CREATE TABLE IF NOT EXISTS pipeline_stage_roles (" ++ + "pipeline_id TEXT NOT NULL REFERENCES pipelines(id) ON DELETE CASCADE," ++ + "stage TEXT NOT NULL," ++ + "agent_role TEXT NOT NULL," ++ + "PRIMARY KEY (pipeline_id, stage)" ++ + ");", + ); + try self.execSimple("CREATE INDEX IF NOT EXISTS idx_pipeline_stage_roles_role_stage ON pipeline_stage_roles(agent_role, stage);"); + try self.execSimple("CREATE INDEX IF NOT EXISTS idx_pipeline_stage_roles_pipeline ON pipeline_stage_roles(pipeline_id);"); + try self.rebuildPipelineStageRoles(); } fn ensureColumn(self: *Self, table_name: []const u8, column_name: []const u8, alter_sql: [*:0]const u8) !void { @@ -332,16 +432,20 @@ pub const Store = struct { // Validate the pipeline definition var validation_arena = std.heap.ArenaAllocator.init(self.allocator); defer validation_arena.deinit(); - domain.validatePipeline(validation_arena.allocator(), definition_json) catch |err| { + const parsed = domain.parseAndValidate(validation_arena.allocator(), definition_json) catch |err| { log.err("pipeline validation failed: {s}", .{domain.validationErrorMessage(err)}); return error.ValidationFailed; }; + const definition = parsed.value; const id_arr = ids.generateId(); const id = try self.allocator.dupe(u8, &id_arr); errdefer self.allocator.free(id); const now_ms = ids.nowMs(); + try self.execSimple("BEGIN IMMEDIATE;"); + errdefer self.execSimple("ROLLBACK;") catch {}; + const stmt = try self.prepare("INSERT INTO pipelines (id, name, definition_json, created_at_ms) VALUES (?, ?, ?, ?);"); defer _ = c.sqlite3_finalize(stmt); @@ -358,6 +462,8 @@ pub const Store = struct { return error.InsertFailed; } + try self.replacePipelineStageRoles(id, definition); + try self.execSimple("COMMIT;"); return id; } @@ -409,9 +515,9 @@ pub const Store = struct { var parse_arena = std.heap.ArenaAllocator.init(self.allocator); defer parse_arena.deinit(); - var parsed = domain.parseAndValidate(parse_arena.allocator(), pipeline.definition_json) catch return error.InvalidPipeline; - defer parsed.deinit(); - const def = parsed.value; + var pipeline_def = PipelineDefinitionView.init(parse_arena.allocator(), pipeline.definition_json) catch return error.InvalidPipeline; + defer pipeline_def.deinit(); + const def = pipeline_def.definition(); const id_arr = ids.generateId(); const id = try self.allocator.dupe(u8, &id_arr); @@ -450,63 +556,6 @@ pub const Store = struct { return self.readTaskRow(stmt); } - pub fn listTasks(self: *Self, stage_filter: ?[]const u8, pipeline_id_filter: ?[]const u8, limit: ?i64) ![]TaskRow { - // Build query dynamically - var sql_buf: [1024]u8 = undefined; - var sql_len: usize = 0; - const base = "SELECT id, pipeline_id, stage, title, description, priority, metadata_json, task_version, next_eligible_at_ms, max_attempts, retry_delay_ms, dead_letter_stage, dead_letter_reason, created_at_ms, updated_at_ms FROM tasks"; - @memcpy(sql_buf[0..base.len], base); - sql_len = base.len; - - var has_where = false; - if (stage_filter != null) { - const clause = " WHERE stage = ?"; - @memcpy(sql_buf[sql_len..][0..clause.len], clause); - sql_len += clause.len; - has_where = true; - } - if (pipeline_id_filter != null) { - const clause = if (has_where) " AND pipeline_id = ?" else " WHERE pipeline_id = ?"; - @memcpy(sql_buf[sql_len..][0..clause.len], clause); - sql_len += clause.len; - } - - const order = " ORDER BY priority DESC, created_at_ms ASC"; - @memcpy(sql_buf[sql_len..][0..order.len], order); - sql_len += order.len; - - if (limit != null) { - const lim = " LIMIT ?"; - @memcpy(sql_buf[sql_len..][0..lim.len], lim); - sql_len += lim.len; - } - - sql_buf[sql_len] = 0; - const sql_z: [*:0]const u8 = @ptrCast(sql_buf[0..sql_len :0]); - - const stmt = try self.prepare(sql_z); - defer _ = c.sqlite3_finalize(stmt); - - var bind_idx: c_int = 1; - if (stage_filter) |sf| { - self.bindText(stmt, bind_idx, sf); - bind_idx += 1; - } - if (pipeline_id_filter) |pf| { - self.bindText(stmt, bind_idx, pf); - bind_idx += 1; - } - if (limit) |l| { - _ = c.sqlite3_bind_int64(stmt, bind_idx, l); - } - - var results: std.ArrayListUnmanaged(TaskRow) = .empty; - while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { - try results.append(self.allocator, self.readTaskRow(stmt)); - } - return results.toOwnedSlice(self.allocator); - } - pub fn getLatestRun(self: *Self, task_id: []const u8) !?RunRow { const stmt = try self.prepare("SELECT id, task_id, attempt, status, agent_id, agent_role, started_at_ms, ended_at_ms, usage_json, error_text FROM runs WHERE task_id = ? ORDER BY attempt DESC LIMIT 1;"); defer _ = c.sqlite3_finalize(stmt); @@ -516,6 +565,31 @@ pub const Store = struct { return self.readRunRow(stmt); } + pub fn getTaskDetails(self: *Self, id: []const u8) !?TaskDetails { + const task = (try self.getTask(id)) orelse return null; + errdefer self.freeTaskRow(task); + + const latest_run = try self.getLatestRun(id); + errdefer if (latest_run) |row| self.freeRunRow(row); + + const dependencies = try self.listTaskDependencies(id); + errdefer self.freeDependencyRows(dependencies); + + const assignments = try self.listTaskAssignments(id); + errdefer self.freeAssignmentRows(assignments); + + const available_transitions = try self.listTaskAvailableTransitions(task.pipeline_id, task.stage); + errdefer self.freeTaskTransitions(available_transitions); + + return .{ + .task = task, + .latest_run = latest_run, + .dependencies = dependencies, + .assignments = assignments, + .available_transitions = available_transitions, + }; + } + pub fn addTaskDependency(self: *Self, task_id: []const u8, depends_on_task_id: []const u8) !void { if (std.mem.eql(u8, task_id, depends_on_task_id)) return error.InvalidDependency; @@ -563,11 +637,11 @@ pub const Store = struct { const dep_id = self.colTextView(stmt, 0); const dep_stage = self.colTextView(stmt, 1); const dep_def_json = self.colTextView(stmt, 2); - var parsed = domain.parseAndValidate(temp_alloc, dep_def_json) catch return error.InvalidPipeline; - defer parsed.deinit(); + var pipeline_def = PipelineDefinitionView.init(temp_alloc, dep_def_json) catch return error.InvalidPipeline; + defer pipeline_def.deinit(); try results.append(self.allocator, .{ .depends_on_task_id = try self.allocator.dupe(u8, dep_id), - .resolved = domain.isTerminal(parsed.value, dep_stage), + .resolved = pipeline_def.isTerminal(dep_stage), }); } return results.toOwnedSlice(self.allocator); @@ -629,43 +703,23 @@ pub const Store = struct { return results.toOwnedSlice(self.allocator); } - pub fn addGateResult(self: *Self, run_id: []const u8, gate: []const u8, status: []const u8, evidence_json: []const u8, actor: ?[]const u8) !i64 { - const run_stmt = try self.prepare("SELECT task_id FROM runs WHERE id = ?;"); - defer _ = c.sqlite3_finalize(run_stmt); - self.bindText(run_stmt, 1, run_id); - if (c.sqlite3_step(run_stmt) != c.SQLITE_ROW) return error.RunNotFound; - const task_id = self.colTextView(run_stmt, 0); - - const stmt = try self.prepare("INSERT INTO gate_results (run_id, task_id, gate, status, evidence_json, actor, ts_ms) VALUES (?, ?, ?, ?, ?, ?, ?);"); - defer _ = c.sqlite3_finalize(stmt); - self.bindText(stmt, 1, run_id); - self.bindText(stmt, 2, task_id); - self.bindText(stmt, 3, gate); - self.bindText(stmt, 4, status); - self.bindText(stmt, 5, evidence_json); - if (actor) |value| self.bindText(stmt, 6, value) else _ = c.sqlite3_bind_null(stmt, 6); - _ = c.sqlite3_bind_int64(stmt, 7, ids.nowMs()); - if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.InsertFailed; - return c.sqlite3_last_insert_rowid(self.db); - } + pub fn listTaskAvailableTransitions(self: *Self, pipeline_id: []const u8, stage: []const u8) ![]TaskTransition { + const pipeline = (try self.getPipeline(pipeline_id)) orelse return self.allocator.alloc(TaskTransition, 0); + defer self.freePipelineRow(pipeline); - pub fn listGateResults(self: *Self, run_id: []const u8) ![]GateResultRow { - const stmt = try self.prepare("SELECT id, run_id, task_id, gate, status, evidence_json, actor, ts_ms FROM gate_results WHERE run_id = ? ORDER BY id ASC;"); - defer _ = c.sqlite3_finalize(stmt); - self.bindText(stmt, 1, run_id); + var pipeline_def = PipelineDefinitionView.init(self.allocator, pipeline.definition_json) catch { + return self.allocator.alloc(TaskTransition, 0); + }; + defer pipeline_def.deinit(); - var results: std.ArrayListUnmanaged(GateResultRow) = .empty; - while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { - try results.append(self.allocator, .{ - .id = c.sqlite3_column_int64(stmt, 0), - .run_id = self.colText(stmt, 1), - .task_id = self.colText(stmt, 2), - .gate = self.colText(stmt, 3), - .status = self.colText(stmt, 4), - .evidence_json = self.colText(stmt, 5), - .actor = self.colTextNullable(stmt, 6), - .ts_ms = c.sqlite3_column_int64(stmt, 7), - }); + var results: std.ArrayListUnmanaged(TaskTransition) = .empty; + errdefer { + for (results.items) |transition| self.freeTaskTransition(transition); + results.deinit(self.allocator); + } + for (pipeline_def.definition().transitions) |transition| { + if (!std.mem.eql(u8, transition.from, stage)) continue; + try results.append(self.allocator, try self.dupeTaskTransition(transition)); } return results.toOwnedSlice(self.allocator); } @@ -827,7 +881,7 @@ pub const Store = struct { // ===== Claim + Lease ===== - pub fn claimTask(self: *Self, agent_id: []const u8, agent_role: []const u8, lease_ttl_ms: i64) !?ClaimResult { + pub fn claimTask(self: *Self, agent_id: []const u8, agent_role: []const u8, lease_ttl_ms: i64, per_state_concurrency: ?std.json.Value) !?ClaimResult { // BEGIN IMMEDIATE to prevent double-claim try self.execSimple("BEGIN IMMEDIATE;"); errdefer self.execSimple("ROLLBACK;") catch {}; @@ -845,56 +899,59 @@ pub const Store = struct { var stale_lease_ids: std.ArrayListUnmanaged([]const u8) = .empty; var stale_run_ids: std.ArrayListUnmanaged([]const u8) = .empty; + var seen_stale_runs = std.StringHashMap(void).init(temp_alloc); while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { try stale_lease_ids.append(temp_alloc, try temp_alloc.dupe(u8, self.colTextView(stmt, 0))); - try stale_run_ids.append(temp_alloc, try temp_alloc.dupe(u8, self.colTextView(stmt, 1))); + const run_id = try temp_alloc.dupe(u8, self.colTextView(stmt, 1)); + if (!seen_stale_runs.contains(run_id)) { + try seen_stale_runs.put(run_id, {}); + try stale_run_ids.append(temp_alloc, run_id); + } } - for (stale_run_ids.items) |run_id| { + if (stale_run_ids.items.len > 0) { const upd = try self.prepare("UPDATE runs SET status = 'stale', ended_at_ms = ? WHERE id = ?;"); defer _ = c.sqlite3_finalize(upd); - _ = c.sqlite3_bind_int64(upd, 1, now_ms); - self.bindText(upd, 2, run_id); - _ = c.sqlite3_step(upd); + for (stale_run_ids.items) |run_id| { + _ = c.sqlite3_reset(upd); + _ = c.sqlite3_clear_bindings(upd); + _ = c.sqlite3_bind_int64(upd, 1, now_ms); + self.bindText(upd, 2, run_id); + _ = c.sqlite3_step(upd); + } } - for (stale_lease_ids.items) |lease_id| { + if (stale_lease_ids.items.len > 0) { const del = try self.prepare("DELETE FROM leases WHERE id = ?;"); defer _ = c.sqlite3_finalize(del); - self.bindText(del, 1, lease_id); - _ = c.sqlite3_step(del); - } - } - - // Find stages matching this role across all pipelines - var all_stages: std.ArrayListUnmanaged([]const u8) = .empty; - { - const pstmt = try self.prepare("SELECT definition_json FROM pipelines;"); - defer _ = c.sqlite3_finalize(pstmt); - while (c.sqlite3_step(pstmt) == c.SQLITE_ROW) { - const def_json = self.colTextView(pstmt, 0); - var parsed = domain.parseAndValidate(temp_alloc, def_json) catch continue; - defer parsed.deinit(); - const stages = domain.getStagesForRole(temp_alloc, parsed.value, agent_role) catch continue; - for (stages) |s| { - try all_stages.append(temp_alloc, s); + for (stale_lease_ids.items) |lease_id| { + _ = c.sqlite3_reset(del); + _ = c.sqlite3_clear_bindings(del); + self.bindText(del, 1, lease_id); + _ = c.sqlite3_step(del); } } } - if (all_stages.items.len == 0) { + // Find pipeline+stage pairs matching this role. + const role_stages = try self.listPipelineStageRoles(temp_alloc, agent_role); + + if (role_stages.len == 0) { try self.execSimple("COMMIT;"); return null; } // Find task: stage matches, no active lease, ordered by priority var task_row: ?TaskRow = null; - for (all_stages.items) |stage| { - const find_sql = "SELECT t.id, t.pipeline_id, t.stage, t.title, t.description, t.priority, t.metadata_json, t.task_version, t.next_eligible_at_ms, t.max_attempts, t.retry_delay_ms, t.dead_letter_stage, t.dead_letter_reason, t.created_at_ms, t.updated_at_ms FROM tasks t WHERE t.stage = ? AND t.dead_letter_reason IS NULL AND t.next_eligible_at_ms <= ? AND NOT EXISTS (SELECT 1 FROM leases l JOIN runs r ON l.run_id = r.id WHERE r.task_id = t.id AND l.expires_at_ms > ?) ORDER BY t.priority DESC, t.created_at_ms ASC LIMIT 20;"; - const fstmt = try self.prepare(find_sql); - defer _ = c.sqlite3_finalize(fstmt); - self.bindText(fstmt, 1, stage); - _ = c.sqlite3_bind_int64(fstmt, 2, now_ms); + const find_sql = "SELECT t.id, t.pipeline_id, t.stage, t.title, t.description, t.priority, t.metadata_json, t.task_version, t.next_eligible_at_ms, t.max_attempts, t.retry_delay_ms, t.dead_letter_stage, t.dead_letter_reason, t.created_at_ms, t.updated_at_ms FROM tasks t WHERE t.pipeline_id = ? AND t.stage = ? AND t.dead_letter_reason IS NULL AND t.next_eligible_at_ms <= ? AND NOT EXISTS (SELECT 1 FROM leases l JOIN runs r ON l.run_id = r.id WHERE r.task_id = t.id AND l.expires_at_ms > ?) ORDER BY t.priority DESC, t.created_at_ms ASC LIMIT 20;"; + const fstmt = try self.prepare(find_sql); + defer _ = c.sqlite3_finalize(fstmt); + for (role_stages) |role_stage| { + _ = c.sqlite3_reset(fstmt); + _ = c.sqlite3_clear_bindings(fstmt); + self.bindText(fstmt, 1, role_stage.pipeline_id); + self.bindText(fstmt, 2, role_stage.stage); _ = c.sqlite3_bind_int64(fstmt, 3, now_ms); + _ = c.sqlite3_bind_int64(fstmt, 4, now_ms); while (c.sqlite3_step(fstmt) == c.SQLITE_ROW) { const candidate = try self.readTaskRowAlloc(temp_alloc, fstmt); @@ -902,6 +959,23 @@ pub const Store = struct { continue; } + // Per-state concurrency check + if (per_state_concurrency) |psc| { + if (psc == .object) { + if (psc.object.get(candidate.stage)) |limit_val| { + const limit: i64 = switch (limit_val) { + .integer => |v| v, + .float => |v| @intFromFloat(v), + else => 0, + }; + if (limit > 0) { + const leased_count = try self.countLeasedTasksInState(candidate.stage, now_ms); + if (leased_count >= limit) continue; + } + } + } + } + if (task_row) |existing| { if (candidate.priority > existing.priority or (candidate.priority == existing.priority and candidate.created_at_ms < existing.created_at_ms)) @@ -972,20 +1046,31 @@ pub const Store = struct { if (c.sqlite3_step(lstmt) != c.SQLITE_DONE) return error.InsertFailed; } + const run_task_id = try self.allocator.dupe(u8, task.id); + errdefer self.allocator.free(run_task_id); + const run_status = try self.allocator.dupe(u8, "running"); + errdefer self.allocator.free(run_status); + const run_agent_id = try self.allocator.dupe(u8, agent_id); + errdefer self.allocator.free(run_agent_id); + const run_agent_role = try self.allocator.dupe(u8, agent_role); + errdefer self.allocator.free(run_agent_role); + const run_usage_json = try self.allocator.dupe(u8, "{}"); + errdefer self.allocator.free(run_usage_json); + try self.execSimple("COMMIT;"); return .{ .task = task, .run = .{ .id = run_id, - .task_id = task.id, + .task_id = run_task_id, .attempt = attempt, - .status = "running", - .agent_id = agent_id, - .agent_role = agent_role, + .status = run_status, + .agent_id = run_agent_id, + .agent_role = run_agent_role, .started_at_ms = now_ms, .ended_at_ms = null, - .usage_json = "{}", + .usage_json = run_usage_json, .error_text = null, }, .lease_id = lease_id, @@ -1043,9 +1128,9 @@ pub const Store = struct { while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { const dep_stage = self.colTextView(stmt, 0); const def_json = self.colTextView(stmt, 1); - var parsed = domain.parseAndValidate(temp_alloc, def_json) catch return false; - defer parsed.deinit(); - if (!domain.isTerminal(parsed.value, dep_stage)) return false; + var pipeline_def = PipelineDefinitionView.init(temp_alloc, def_json) catch return false; + defer pipeline_def.deinit(); + if (!pipeline_def.isTerminal(dep_stage)) return false; } return true; @@ -1066,6 +1151,19 @@ pub const Store = struct { return !has_active_assignment; } + fn countLeasedTasksInState(self: *Self, state: []const u8, now_ms: i64) !i64 { + const stmt = try self.prepare( + "SELECT COUNT(*) FROM tasks t WHERE t.stage = ? AND EXISTS (SELECT 1 FROM leases l JOIN runs r ON l.run_id = r.id WHERE r.task_id = t.id AND l.expires_at_ms > ?);", + ); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, state); + _ = c.sqlite3_bind_int64(stmt, 2, now_ms); + if (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + return c.sqlite3_column_int64(stmt, 0); + } + return 0; + } + pub fn validateLeaseByRunId(self: *Self, run_id: []const u8, token_hex: []const u8) !void { var token_bytes: [32]u8 = undefined; _ = std.fmt.hexToBytes(&token_bytes, token_hex) catch return error.InvalidToken; @@ -1088,21 +1186,6 @@ pub const Store = struct { if (expires <= ids.nowMs()) return error.LeaseExpired; } - fn areRequiredGatesPassed(self: *Self, run_id: []const u8, required_gates: []const []const u8) !bool { - for (required_gates) |gate| { - const stmt = try self.prepare("SELECT status FROM gate_results WHERE run_id = ? AND gate = ? ORDER BY id DESC LIMIT 1;"); - defer _ = c.sqlite3_finalize(stmt); - self.bindText(stmt, 1, run_id); - self.bindText(stmt, 2, gate); - - if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return false; - const status = self.colTextView(stmt, 0); - if (!std.mem.eql(u8, status, "pass")) return false; - } - - return true; - } - // ===== Events ===== pub fn addEvent(self: *Self, run_id: []const u8, kind: []const u8, data_json: []const u8) !i64 { @@ -1117,24 +1200,6 @@ pub const Store = struct { return c.sqlite3_last_insert_rowid(self.db); } - pub fn listEvents(self: *Self, run_id: []const u8) ![]EventRow { - const stmt = try self.prepare("SELECT id, run_id, ts_ms, kind, data_json FROM events WHERE run_id = ? ORDER BY id ASC;"); - defer _ = c.sqlite3_finalize(stmt); - self.bindText(stmt, 1, run_id); - - var results: std.ArrayListUnmanaged(EventRow) = .empty; - while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { - try results.append(self.allocator, .{ - .id = c.sqlite3_column_int64(stmt, 0), - .run_id = self.colText(stmt, 1), - .ts_ms = c.sqlite3_column_int64(stmt, 2), - .kind = self.colText(stmt, 3), - .data_json = self.colText(stmt, 4), - }); - } - return results.toOwnedSlice(self.allocator); - } - // ===== Transition ===== pub fn transitionRun( @@ -1186,14 +1251,9 @@ pub const Store = struct { if (c.sqlite3_step(pip_stmt) != c.SQLITE_ROW) return error.PipelineNotFound; const def_json = self.colTextView(pip_stmt, 0); - var parsed = domain.parseAndValidate(temp_alloc, def_json) catch return error.InvalidPipeline; - defer parsed.deinit(); - const transition = domain.findTransition(parsed.value, current_stage, trigger) orelse return error.InvalidTransition; - - if (transition.required_gates) |required_gates| { - const gates_ok = try self.areRequiredGatesPassed(run_id, required_gates); - if (!gates_ok) return error.RequiredGatesNotPassed; - } + var pipeline_def = PipelineDefinitionView.init(temp_alloc, def_json) catch return error.InvalidPipeline; + defer pipeline_def.deinit(); + const transition = pipeline_def.findTransition(current_stage, trigger) orelse return error.InvalidTransition; // Update run { @@ -1316,11 +1376,11 @@ pub const Store = struct { self.bindText(pip_stmt, 1, pipeline_id); if (c.sqlite3_step(pip_stmt) == c.SQLITE_ROW) { const def_json = self.colTextView(pip_stmt, 0); - const parsed = domain.parseAndValidate(temp_alloc, def_json) catch null; + const parsed = PipelineDefinitionView.init(temp_alloc, def_json) catch null; if (parsed) |parsed_value| { - var p = parsed_value; - defer p.deinit(); - if (p.value.states.map.contains(candidate)) { + var pipeline_def = parsed_value; + defer pipeline_def.deinit(); + if (pipeline_def.hasState(candidate)) { dead_stage_to_use = candidate; } } @@ -1427,7 +1487,7 @@ pub const Store = struct { pub fn freeClaimResult(self: *Self, claim: ClaimResult) void { self.freeTaskRow(claim.task); - self.allocator.free(claim.run.id); + self.freeRunRow(claim.run); self.allocator.free(claim.lease_id); self.allocator.free(claim.lease_token); } @@ -1438,6 +1498,28 @@ pub const Store = struct { self.allocator.free(transition.trigger); } + pub fn freeTaskTransition(self: *Self, transition: TaskTransition) void { + self.allocator.free(transition.trigger); + self.allocator.free(transition.to); + if (transition.required_gates) |gates| { + for (gates) |gate| self.allocator.free(gate); + self.allocator.free(gates); + } + } + + pub fn freeTaskTransitions(self: *Self, rows: []TaskTransition) void { + for (rows) |row| self.freeTaskTransition(row); + self.allocator.free(rows); + } + + pub fn freeTaskDetails(self: *Self, details: TaskDetails) void { + self.freeTaskRow(details.task); + if (details.latest_run) |row| self.freeRunRow(row); + self.freeDependencyRows(details.dependencies); + self.freeAssignmentRows(details.assignments); + self.freeTaskTransitions(details.available_transitions); + } + pub fn freeEventRows(self: *Self, rows: []EventRow) void { for (rows) |row| { self.allocator.free(row.run_id); @@ -1467,18 +1549,6 @@ pub const Store = struct { self.allocator.free(rows); } - pub fn freeGateResultRows(self: *Self, rows: []GateResultRow) void { - for (rows) |row| { - self.allocator.free(row.run_id); - self.allocator.free(row.task_id); - self.allocator.free(row.gate); - self.allocator.free(row.status); - self.allocator.free(row.evidence_json); - if (row.actor) |actor| self.allocator.free(actor); - } - self.allocator.free(rows); - } - pub fn freeAssignmentRows(self: *Self, rows: []AssignmentRow) void { for (rows) |row| { self.allocator.free(row.task_id); @@ -1595,62 +1665,6 @@ pub const Store = struct { return id; } - pub fn listArtifacts(self: *Self, task_id: ?[]const u8, run_id: ?[]const u8) ![]ArtifactRow { - var sql_buf: [512]u8 = undefined; - var sql_len: usize = 0; - const base = "SELECT id, task_id, run_id, created_at_ms, kind, uri, sha256_hex, size_bytes, meta_json FROM artifacts"; - @memcpy(sql_buf[0..base.len], base); - sql_len = base.len; - - var has_where = false; - if (task_id != null) { - const clause = " WHERE task_id = ?"; - @memcpy(sql_buf[sql_len..][0..clause.len], clause); - sql_len += clause.len; - has_where = true; - } - if (run_id != null) { - const clause = if (has_where) " AND run_id = ?" else " WHERE run_id = ?"; - @memcpy(sql_buf[sql_len..][0..clause.len], clause); - sql_len += clause.len; - } - - const order = " ORDER BY created_at_ms DESC;"; - @memcpy(sql_buf[sql_len..][0..order.len], order); - sql_len += order.len; - - sql_buf[sql_len] = 0; - const sql_z: [*:0]const u8 = @ptrCast(sql_buf[0..sql_len :0]); - - const stmt = try self.prepare(sql_z); - defer _ = c.sqlite3_finalize(stmt); - - var bind_idx: c_int = 1; - if (task_id) |tid| { - self.bindText(stmt, bind_idx, tid); - bind_idx += 1; - } - if (run_id) |rid| { - self.bindText(stmt, bind_idx, rid); - } - - var results: std.ArrayListUnmanaged(ArtifactRow) = .empty; - while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { - try results.append(self.allocator, .{ - .id = self.colText(stmt, 0), - .task_id = self.colTextNullable(stmt, 1), - .run_id = self.colTextNullable(stmt, 2), - .created_at_ms = c.sqlite3_column_int64(stmt, 3), - .kind = self.colText(stmt, 4), - .uri = self.colText(stmt, 5), - .sha256_hex = self.colTextNullable(stmt, 6), - .size_bytes = self.colInt64Nullable(stmt, 7), - .meta_json = self.colText(stmt, 8), - }); - } - return results.toOwnedSlice(self.allocator); - } - pub fn listArtifactsPage(self: *Self, task_id: ?[]const u8, run_id: ?[]const u8, cursor_created_at_ms: ?i64, cursor_id: ?[]const u8, limit: i64) !ArtifactPage { const page_limit: usize = @intCast(limit); var sql_buf: [768]u8 = undefined; @@ -1757,6 +1771,66 @@ pub const Store = struct { return roles.items.len - 1; } + fn rebuildPipelineStageRoles(self: *Self) !void { + var scratch = std.heap.ArenaAllocator.init(self.allocator); + defer scratch.deinit(); + const temp_alloc = scratch.allocator(); + + const stmt = try self.prepare("SELECT id, definition_json FROM pipelines;"); + defer _ = c.sqlite3_finalize(stmt); + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + const pipeline_id = self.colTextView(stmt, 0); + const definition_json = self.colTextView(stmt, 1); + var pipeline_def = PipelineDefinitionView.init(temp_alloc, definition_json) catch { + log.warn("skipping pipeline role index rebuild for invalid pipeline {s}", .{pipeline_id}); + continue; + }; + defer pipeline_def.deinit(); + try self.replacePipelineStageRoles(pipeline_id, pipeline_def.definition()); + } + } + + fn replacePipelineStageRoles(self: *Self, pipeline_id: []const u8, def: domain.PipelineDefinition) !void { + const delete_stmt = try self.prepare("DELETE FROM pipeline_stage_roles WHERE pipeline_id = ?;"); + defer _ = c.sqlite3_finalize(delete_stmt); + self.bindText(delete_stmt, 1, pipeline_id); + if (c.sqlite3_step(delete_stmt) != c.SQLITE_DONE) return error.DeleteFailed; + + const insert_stmt = try self.prepare("INSERT INTO pipeline_stage_roles (pipeline_id, stage, agent_role) VALUES (?, ?, ?);"); + defer _ = c.sqlite3_finalize(insert_stmt); + + var states_it = def.states.map.iterator(); + while (states_it.next()) |entry| { + const agent_role = entry.value_ptr.agent_role orelse continue; + _ = c.sqlite3_reset(insert_stmt); + _ = c.sqlite3_clear_bindings(insert_stmt); + self.bindText(insert_stmt, 1, pipeline_id); + self.bindText(insert_stmt, 2, entry.key_ptr.*); + self.bindText(insert_stmt, 3, agent_role); + if (c.sqlite3_step(insert_stmt) != c.SQLITE_DONE) return error.InsertFailed; + } + } + + fn listPipelineStageRoles(self: *Self, alloc: std.mem.Allocator, agent_role: ?[]const u8) ![]PipelineStageRoleRow { + const sql = if (agent_role == null) + "SELECT pipeline_id, stage, agent_role FROM pipeline_stage_roles ORDER BY pipeline_id, stage;" + else + "SELECT pipeline_id, stage, agent_role FROM pipeline_stage_roles WHERE agent_role = ? ORDER BY pipeline_id, stage;"; + const stmt = try self.prepare(sql); + defer _ = c.sqlite3_finalize(stmt); + if (agent_role) |role| self.bindText(stmt, 1, role); + + var rows: std.ArrayListUnmanaged(PipelineStageRoleRow) = .empty; + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + try rows.append(alloc, .{ + .pipeline_id = try alloc.dupe(u8, self.colTextView(stmt, 0)), + .stage = try alloc.dupe(u8, self.colTextView(stmt, 1)), + .agent_role = try alloc.dupe(u8, self.colTextView(stmt, 2)), + }); + } + return rows.toOwnedSlice(alloc); + } + pub fn getIdempotency(self: *Self, key: []const u8, method: []const u8, path: []const u8) !?IdempotencyRow { const stmt = try self.prepare("SELECT request_hash, response_status, response_body, created_at_ms FROM idempotency_keys WHERE key = ? AND method = ? AND path = ?;"); defer _ = c.sqlite3_finalize(stmt); @@ -1802,76 +1876,197 @@ pub const Store = struct { var roles: std.ArrayListUnmanaged(QueueRoleStats) = .empty; const now_ms = ids.nowMs(); - - const pipelines_stmt = try self.prepare("SELECT id, definition_json FROM pipelines;"); - defer _ = c.sqlite3_finalize(pipelines_stmt); - while (c.sqlite3_step(pipelines_stmt) == c.SQLITE_ROW) { - const pipeline_id = self.colTextView(pipelines_stmt, 0); - const definition_json = self.colTextView(pipelines_stmt, 1); - var parsed = domain.parseAndValidate(temp_alloc, definition_json) catch continue; - defer parsed.deinit(); - - var states_it = parsed.value.states.map.iterator(); - while (states_it.next()) |entry| { - const role = entry.value_ptr.agent_role orelse continue; - const stage = entry.key_ptr.*; - const idx = try self.ensureRoleStatsIndex(&roles, role); - - const claimable_stmt = try self.prepare( - "SELECT t.id, t.created_at_ms FROM tasks t WHERE t.pipeline_id = ? AND t.stage = ? AND t.dead_letter_reason IS NULL AND t.next_eligible_at_ms <= ? AND NOT EXISTS (SELECT 1 FROM leases l JOIN runs r ON l.run_id = r.id WHERE r.task_id = t.id AND l.expires_at_ms > ?) ORDER BY t.created_at_ms ASC;", - ); - defer _ = c.sqlite3_finalize(claimable_stmt); - self.bindText(claimable_stmt, 1, pipeline_id); - self.bindText(claimable_stmt, 2, stage); - _ = c.sqlite3_bind_int64(claimable_stmt, 3, now_ms); - _ = c.sqlite3_bind_int64(claimable_stmt, 4, now_ms); - while (c.sqlite3_step(claimable_stmt) == c.SQLITE_ROW) { - const task_id = self.colTextView(claimable_stmt, 0); - const created_at_ms = c.sqlite3_column_int64(claimable_stmt, 1); - if (!(try self.isTaskDependenciesSatisfied(task_id))) continue; - roles.items[idx].claimable_count += 1; - const age = now_ms - created_at_ms; - if (roles.items[idx].oldest_claimable_age_ms == null or age > roles.items[idx].oldest_claimable_age_ms.?) { - roles.items[idx].oldest_claimable_age_ms = age; - } + const role_stages = try self.listPipelineStageRoles(temp_alloc, null); + for (role_stages) |role_stage| { + const idx = try self.ensureRoleStatsIndex(&roles, role_stage.agent_role); + + const claimable_stmt = try self.prepare( + "SELECT t.id, t.created_at_ms FROM tasks t WHERE t.pipeline_id = ? AND t.stage = ? AND t.dead_letter_reason IS NULL AND t.next_eligible_at_ms <= ? AND NOT EXISTS (SELECT 1 FROM leases l JOIN runs r ON l.run_id = r.id WHERE r.task_id = t.id AND l.expires_at_ms > ?) ORDER BY t.created_at_ms ASC;", + ); + defer _ = c.sqlite3_finalize(claimable_stmt); + self.bindText(claimable_stmt, 1, role_stage.pipeline_id); + self.bindText(claimable_stmt, 2, role_stage.stage); + _ = c.sqlite3_bind_int64(claimable_stmt, 3, now_ms); + _ = c.sqlite3_bind_int64(claimable_stmt, 4, now_ms); + while (c.sqlite3_step(claimable_stmt) == c.SQLITE_ROW) { + const task_id = self.colTextView(claimable_stmt, 0); + const created_at_ms = c.sqlite3_column_int64(claimable_stmt, 1); + if (!(try self.isTaskDependenciesSatisfied(task_id))) continue; + roles.items[idx].claimable_count += 1; + const age = now_ms - created_at_ms; + if (roles.items[idx].oldest_claimable_age_ms == null or age > roles.items[idx].oldest_claimable_age_ms.?) { + roles.items[idx].oldest_claimable_age_ms = age; } + } - const failed_stmt = try self.prepare("SELECT COUNT(*) FROM runs r JOIN tasks t ON t.id = r.task_id WHERE t.pipeline_id = ? AND t.stage = ? AND r.status = 'failed';"); - defer _ = c.sqlite3_finalize(failed_stmt); - self.bindText(failed_stmt, 1, pipeline_id); - self.bindText(failed_stmt, 2, stage); - if (c.sqlite3_step(failed_stmt) == c.SQLITE_ROW) { - roles.items[idx].failed_count += c.sqlite3_column_int64(failed_stmt, 0); - } + const failed_stmt = try self.prepare("SELECT COUNT(*) FROM runs r JOIN tasks t ON t.id = r.task_id WHERE t.pipeline_id = ? AND t.stage = ? AND r.status = 'failed';"); + defer _ = c.sqlite3_finalize(failed_stmt); + self.bindText(failed_stmt, 1, role_stage.pipeline_id); + self.bindText(failed_stmt, 2, role_stage.stage); + if (c.sqlite3_step(failed_stmt) == c.SQLITE_ROW) { + roles.items[idx].failed_count += c.sqlite3_column_int64(failed_stmt, 0); + } - const stuck_stmt = try self.prepare( - "SELECT COUNT(*) FROM runs r JOIN tasks t ON t.id = r.task_id WHERE t.pipeline_id = ? AND t.stage = ? AND r.status = 'running' AND r.started_at_ms <= ?;", - ); - defer _ = c.sqlite3_finalize(stuck_stmt); - self.bindText(stuck_stmt, 1, pipeline_id); - self.bindText(stuck_stmt, 2, stage); - _ = c.sqlite3_bind_int64(stuck_stmt, 3, now_ms - stuck_window_ms); - if (c.sqlite3_step(stuck_stmt) == c.SQLITE_ROW) { - roles.items[idx].stuck_count += c.sqlite3_column_int64(stuck_stmt, 0); - } + const stuck_stmt = try self.prepare( + "SELECT COUNT(*) FROM runs r JOIN tasks t ON t.id = r.task_id WHERE t.pipeline_id = ? AND t.stage = ? AND r.status = 'running' AND r.started_at_ms <= ?;", + ); + defer _ = c.sqlite3_finalize(stuck_stmt); + self.bindText(stuck_stmt, 1, role_stage.pipeline_id); + self.bindText(stuck_stmt, 2, role_stage.stage); + _ = c.sqlite3_bind_int64(stuck_stmt, 3, now_ms - stuck_window_ms); + if (c.sqlite3_step(stuck_stmt) == c.SQLITE_ROW) { + roles.items[idx].stuck_count += c.sqlite3_column_int64(stuck_stmt, 0); + } - const lease_stmt = try self.prepare( - "SELECT COUNT(*) FROM leases l JOIN runs r ON r.id = l.run_id JOIN tasks t ON t.id = r.task_id WHERE t.pipeline_id = ? AND t.stage = ? AND l.expires_at_ms > ? AND l.expires_at_ms <= ?;", - ); - defer _ = c.sqlite3_finalize(lease_stmt); - self.bindText(lease_stmt, 1, pipeline_id); - self.bindText(lease_stmt, 2, stage); - _ = c.sqlite3_bind_int64(lease_stmt, 3, now_ms); - _ = c.sqlite3_bind_int64(lease_stmt, 4, now_ms + near_expiry_window_ms); - if (c.sqlite3_step(lease_stmt) == c.SQLITE_ROW) { - roles.items[idx].near_expiry_leases += c.sqlite3_column_int64(lease_stmt, 0); - } + const lease_stmt = try self.prepare( + "SELECT COUNT(*) FROM leases l JOIN runs r ON r.id = l.run_id JOIN tasks t ON t.id = r.task_id WHERE t.pipeline_id = ? AND t.stage = ? AND l.expires_at_ms > ? AND l.expires_at_ms <= ?;", + ); + defer _ = c.sqlite3_finalize(lease_stmt); + self.bindText(lease_stmt, 1, role_stage.pipeline_id); + self.bindText(lease_stmt, 2, role_stage.stage); + _ = c.sqlite3_bind_int64(lease_stmt, 3, now_ms); + _ = c.sqlite3_bind_int64(lease_stmt, 4, now_ms + near_expiry_window_ms); + if (c.sqlite3_step(lease_stmt) == c.SQLITE_ROW) { + roles.items[idx].near_expiry_leases += c.sqlite3_column_int64(lease_stmt, 0); } } return roles.toOwnedSlice(self.allocator); } + // ===== Store (KV) ===== + + pub fn storePut(self: *Self, namespace: []const u8, key: []const u8, value_json: []const u8) !void { + const now_ms = ids.nowMs(); + const stmt = try self.prepare( + "INSERT INTO store (namespace, key, value_json, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?) ON CONFLICT(namespace, key) DO UPDATE SET value_json = excluded.value_json, updated_at_ms = excluded.updated_at_ms;", + ); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, namespace); + self.bindText(stmt, 2, key); + self.bindText(stmt, 3, value_json); + _ = c.sqlite3_bind_int64(stmt, 4, now_ms); + _ = c.sqlite3_bind_int64(stmt, 5, now_ms); + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.InsertFailed; + } + + pub fn storeGet(self: *Self, alloc: std.mem.Allocator, namespace: []const u8, key: []const u8) !?StoreEntry { + const stmt = try self.prepare("SELECT namespace, key, value_json, created_at_ms, updated_at_ms FROM store WHERE namespace = ? AND key = ?;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, namespace); + self.bindText(stmt, 2, key); + + if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return null; + return .{ + .namespace = try alloc.dupe(u8, self.colTextView(stmt, 0)), + .key = try alloc.dupe(u8, self.colTextView(stmt, 1)), + .value_json = try alloc.dupe(u8, self.colTextView(stmt, 2)), + .created_at_ms = c.sqlite3_column_int64(stmt, 3), + .updated_at_ms = c.sqlite3_column_int64(stmt, 4), + }; + } + + pub fn storeList(self: *Self, alloc: std.mem.Allocator, namespace: []const u8) ![]StoreEntry { + const stmt = try self.prepare("SELECT namespace, key, value_json, created_at_ms, updated_at_ms FROM store WHERE namespace = ? ORDER BY key;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, namespace); + + var results: std.ArrayListUnmanaged(StoreEntry) = .empty; + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + try results.append(alloc, .{ + .namespace = try alloc.dupe(u8, self.colTextView(stmt, 0)), + .key = try alloc.dupe(u8, self.colTextView(stmt, 1)), + .value_json = try alloc.dupe(u8, self.colTextView(stmt, 2)), + .created_at_ms = c.sqlite3_column_int64(stmt, 3), + .updated_at_ms = c.sqlite3_column_int64(stmt, 4), + }); + } + return results.toOwnedSlice(alloc); + } + + pub fn storeDelete(self: *Self, namespace: []const u8, key: []const u8) !void { + const stmt = try self.prepare("DELETE FROM store WHERE namespace = ? AND key = ?;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, namespace); + self.bindText(stmt, 2, key); + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.DeleteFailed; + } + + pub fn storeSearch( + self: *Self, + alloc: std.mem.Allocator, + namespace: ?[]const u8, + query: []const u8, + limit: usize, + filter_path: ?[]const u8, + filter_value: ?[]const u8, + ) ![]StoreEntry { + const sql = + "SELECT s.namespace, s.key, s.value_json, s.created_at_ms, s.updated_at_ms " ++ + "FROM store s " ++ + "JOIN store_fts f ON s.rowid = f.rowid " ++ + "WHERE store_fts MATCH ? " ++ + "AND (? IS NULL OR s.namespace = ?) " ++ + "AND (? IS NULL OR json_extract(s.value_json, ?) = ?) " ++ + "ORDER BY rank " ++ + "LIMIT ?;"; + const stmt = try self.prepare(sql); + defer _ = c.sqlite3_finalize(stmt); + + self.bindText(stmt, 1, query); + if (namespace) |ns| { + self.bindText(stmt, 2, ns); + self.bindText(stmt, 3, ns); + } else { + _ = c.sqlite3_bind_null(stmt, 2); + _ = c.sqlite3_bind_null(stmt, 3); + } + if (filter_path) |fp| { + self.bindText(stmt, 4, fp); + self.bindText(stmt, 5, fp); + if (filter_value) |fv| { + self.bindText(stmt, 6, fv); + } else { + _ = c.sqlite3_bind_null(stmt, 6); + } + } else { + _ = c.sqlite3_bind_null(stmt, 4); + _ = c.sqlite3_bind_null(stmt, 5); + _ = c.sqlite3_bind_null(stmt, 6); + } + _ = c.sqlite3_bind_int64(stmt, 7, @intCast(limit)); + + var results: std.ArrayListUnmanaged(StoreEntry) = .empty; + while (c.sqlite3_step(stmt) == c.SQLITE_ROW) { + try results.append(alloc, .{ + .namespace = try alloc.dupe(u8, self.colTextView(stmt, 0)), + .key = try alloc.dupe(u8, self.colTextView(stmt, 1)), + .value_json = try alloc.dupe(u8, self.colTextView(stmt, 2)), + .created_at_ms = c.sqlite3_column_int64(stmt, 3), + .updated_at_ms = c.sqlite3_column_int64(stmt, 4), + }); + } + return results.toOwnedSlice(alloc); + } + + pub fn storeDeleteNamespace(self: *Self, namespace: []const u8) !void { + const stmt = try self.prepare("DELETE FROM store WHERE namespace = ?;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, namespace); + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.DeleteFailed; + } + + pub fn freeStoreEntry(self: *Self, entry: StoreEntry) void { + self.allocator.free(entry.namespace); + self.allocator.free(entry.key); + self.allocator.free(entry.value_json); + } + + pub fn freeStoreEntries(self: *Self, entries: []StoreEntry) void { + for (entries) |entry| self.freeStoreEntry(entry); + self.allocator.free(entries); + } + // ===== Helpers ===== pub fn execSimple(self: *Self, sql: [*:0]const u8) !void { @@ -1975,8 +2170,360 @@ pub const Store = struct { }; } + fn dupeTaskTransition(self: *Self, transition: domain.TransitionDef) !TaskTransition { + return .{ + .trigger = try self.allocator.dupe(u8, transition.trigger), + .to = try self.allocator.dupe(u8, transition.to), + .required_gates = if (transition.required_gates) |gates| + try self.dupeStringSlice(gates) + else + null, + }; + } + + fn dupeStringSlice(self: *Self, values: []const []const u8) ![]const []const u8 { + const duped = try self.allocator.alloc([]const u8, values.len); + var filled: usize = 0; + errdefer { + for (duped[0..filled]) |value| self.allocator.free(value); + self.allocator.free(duped); + } + + for (values, 0..) |value, i| { + duped[i] = try self.allocator.dupe(u8, value); + filled = i + 1; + } + return duped; + } + fn colInt64Nullable(_: *Self, stmt: *c.sqlite3_stmt, col: c_int) ?i64 { if (c.sqlite3_column_type(stmt, col) == c.SQLITE_NULL) return null; return c.sqlite3_column_int64(stmt, col); } + + // ===== Orchestration ===== + + pub fn updateTaskRunId(self: *Self, task_id: []const u8, run_id: []const u8) !void { + const stmt = try self.prepare("UPDATE tasks SET run_id = ? WHERE id = ?;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, run_id); + self.bindText(stmt, 2, task_id); + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.UpdateFailed; + } + + pub fn updateTaskWorkflowState(self: *Self, task_id: []const u8, state_json: []const u8) !void { + const stmt = try self.prepare("UPDATE tasks SET workflow_state_json = ? WHERE id = ?;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, state_json); + self.bindText(stmt, 2, task_id); + if (c.sqlite3_step(stmt) != c.SQLITE_DONE) return error.UpdateFailed; + } + + pub fn getTaskRunId(self: *Self, task_id: []const u8) !?[]const u8 { + const stmt = try self.prepare("SELECT run_id FROM tasks WHERE id = ?;"); + defer _ = c.sqlite3_finalize(stmt); + self.bindText(stmt, 1, task_id); + if (c.sqlite3_step(stmt) != c.SQLITE_ROW) return null; + return self.colTextNullable(stmt, 0); + } }; + +test "store CRUD" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + // Put + Get + try store.storePut("ns1", "key1", "{\"x\":1}"); + const entry = (try store.storeGet(alloc, "ns1", "key1")).?; + defer alloc.free(entry.namespace); + defer alloc.free(entry.key); + defer alloc.free(entry.value_json); + try std.testing.expectEqualStrings("{\"x\":1}", entry.value_json); + + // Update (upsert preserves created_at_ms) + try store.storePut("ns1", "key1", "{\"x\":2}"); + const entry2 = (try store.storeGet(alloc, "ns1", "key1")).?; + defer alloc.free(entry2.namespace); + defer alloc.free(entry2.key); + defer alloc.free(entry2.value_json); + try std.testing.expectEqualStrings("{\"x\":2}", entry2.value_json); + try std.testing.expectEqual(entry.created_at_ms, entry2.created_at_ms); + try std.testing.expect(entry2.updated_at_ms >= entry.updated_at_ms); + + // List + const list = try store.storeList(alloc, "ns1"); + defer { + for (list) |e| { + alloc.free(e.namespace); + alloc.free(e.key); + alloc.free(e.value_json); + } + alloc.free(list); + } + try std.testing.expectEqual(@as(usize, 1), list.len); + + // Delete + try store.storeDelete("ns1", "key1"); + const gone = try store.storeGet(alloc, "ns1", "key1"); + try std.testing.expect(gone == null); + + // Delete namespace + try store.storePut("ns2", "a", "1"); + try store.storePut("ns2", "b", "2"); + try store.storeDeleteNamespace("ns2"); + const ns2_list = try store.storeList(alloc, "ns2"); + defer alloc.free(ns2_list); + try std.testing.expectEqual(@as(usize, 0), ns2_list.len); +} + +test "store search" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + try store.storePut("docs", "readme", "{\"title\":\"Getting Started\",\"body\":\"Welcome to the project\"}"); + try store.storePut("docs", "api", "{\"title\":\"API Reference\",\"body\":\"Endpoints and methods\"}"); + try store.storePut("notes", "todo", "{\"title\":\"Todo List\",\"body\":\"Fix bugs and add features\"}"); + + // Search across all namespaces + const results = try store.storeSearch(alloc, null, "endpoints methods", 10, null, null); + defer { + for (results) |e| { + alloc.free(e.namespace); + alloc.free(e.key); + alloc.free(e.value_json); + } + alloc.free(results); + } + try std.testing.expectEqual(@as(usize, 1), results.len); + try std.testing.expectEqualStrings("api", results[0].key); + + // Search with namespace filter + const ns_results = try store.storeSearch(alloc, "notes", "bugs features", 10, null, null); + defer { + for (ns_results) |e| { + alloc.free(e.namespace); + alloc.free(e.key); + alloc.free(e.value_json); + } + alloc.free(ns_results); + } + try std.testing.expectEqual(@as(usize, 1), ns_results.len); + try std.testing.expectEqualStrings("todo", ns_results[0].key); + + // Search with no results + const empty_results = try store.storeSearch(alloc, null, "nonexistent_xyz", 10, null, null); + defer alloc.free(empty_results); + try std.testing.expectEqual(@as(usize, 0), empty_results.len); +} + +test "task lifecycle: create, claim, event, transition" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + const pipeline_def = + \\{"initial":"todo","states":{"todo":{"agent_role":"worker"},"done":{"terminal":true}},"transitions":[{"from":"todo","to":"done","trigger":"complete"}]} + ; + + // Create pipeline + const pipeline_id = try store.createPipeline("test-pipeline", pipeline_def); + defer store.freeOwnedString(pipeline_id); + + // Create task + const task_id = try store.createTask(pipeline_id, "Test Task", "A test task", 5, "{}", null, 0, null); + defer store.freeOwnedString(task_id); + + // Verify task is in initial stage + const task = (try store.getTask(task_id)).?; + defer store.freeTaskRow(task); + try std.testing.expectEqualStrings("todo", task.stage); + try std.testing.expectEqual(@as(i64, 5), task.priority); + + // Claim task + const claim = (try store.claimTask("agent-1", "worker", 300_000, null)).?; + defer store.freeClaimResult(claim); + try std.testing.expectEqualStrings(task_id, claim.task.id); + try std.testing.expectEqualStrings("running", claim.run.status); + try std.testing.expect(claim.lease_token.len > 0); + + // Add event + const event_id = try store.addEvent(claim.run.id, "progress", "{\"step\":1}"); + try std.testing.expect(event_id > 0); + + // Transition + const transition = try store.transitionRun(claim.run.id, "complete", null, null, "todo", null); + defer store.freeTransitionResult(transition); + try std.testing.expectEqualStrings("todo", transition.previous_stage); + try std.testing.expectEqualStrings("done", transition.new_stage); + + // Verify task moved to terminal stage + const task_after = (try store.getTask(task_id)).?; + defer store.freeTaskRow(task_after); + try std.testing.expectEqualStrings("done", task_after.stage); + try std.testing.expectEqual(@as(i64, 2), task_after.task_version); + + // No more claimable work + const no_claim = try store.claimTask("agent-1", "worker", 300_000, null); + try std.testing.expect(no_claim == null); +} + +test "claim result owns run fields independently from inputs and task row" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + const pipeline_def = + \\{"initial":"todo","states":{"todo":{"agent_role":"worker"},"done":{"terminal":true}},"transitions":[{"from":"todo","to":"done","trigger":"complete"}]} + ; + + const pipeline_id = try store.createPipeline("claim-owned-run", pipeline_def); + defer store.freeOwnedString(pipeline_id); + + const task_id = try store.createTask(pipeline_id, "Owned Run", "desc", 1, "{}", null, 0, null); + defer store.freeOwnedString(task_id); + + const agent_id = try alloc.dupe(u8, "agent-owned"); + defer alloc.free(agent_id); + const agent_role = try alloc.dupe(u8, "worker"); + defer alloc.free(agent_role); + + const claim = (try store.claimTask(agent_id, agent_role, 300_000, null)).?; + defer store.freeClaimResult(claim); + + try std.testing.expect(claim.run.task_id.ptr != claim.task.id.ptr); + try std.testing.expect(claim.run.agent_id != null); + try std.testing.expect(claim.run.agent_id.?.ptr != agent_id.ptr); + try std.testing.expect(claim.run.agent_role != null); + try std.testing.expect(claim.run.agent_role.?.ptr != agent_role.ptr); +} + +test "claim respects per-state concurrency limits" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + const pipeline_def = + \\{"initial":"review","states":{"review":{"agent_role":"reviewer"},"done":{"terminal":true}},"transitions":[{"from":"review","to":"done","trigger":"approve"}]} + ; + + const pipeline_id = try store.createPipeline("concurrency-test", pipeline_def); + defer store.freeOwnedString(pipeline_id); + + // Create 3 tasks + const t1 = try store.createTask(pipeline_id, "Task 1", "desc", 0, "{}", null, 0, null); + defer store.freeOwnedString(t1); + const t2 = try store.createTask(pipeline_id, "Task 2", "desc", 0, "{}", null, 0, null); + defer store.freeOwnedString(t2); + const t3 = try store.createTask(pipeline_id, "Task 3", "desc", 0, "{}", null, 0, null); + defer store.freeOwnedString(t3); + + // Set per-state concurrency limit of 2 for "review" + var concurrency_map = std.json.ObjectMap.init(alloc); + defer concurrency_map.deinit(); + try concurrency_map.put("review", .{ .integer = 2 }); + const per_state: std.json.Value = .{ .object = concurrency_map }; + + // Claim first two tasks — should succeed + const c1 = (try store.claimTask("a1", "reviewer", 300_000, per_state)).?; + defer store.freeClaimResult(c1); + const c2 = (try store.claimTask("a2", "reviewer", 300_000, per_state)).?; + defer store.freeClaimResult(c2); + + // Third claim should be blocked by concurrency limit + const c3 = try store.claimTask("a3", "reviewer", 300_000, per_state); + try std.testing.expect(c3 == null); + + // Complete one task, freeing a slot + const transition = try store.transitionRun(c1.run.id, "approve", null, null, null, null); + defer store.freeTransitionResult(transition); + + // Now third claim should succeed + const c3_retry = (try store.claimTask("a3", "reviewer", 300_000, per_state)).?; + defer store.freeClaimResult(c3_retry); + try std.testing.expectEqualStrings(t3, c3_retry.task.id); +} + +test "claim: no work when no matching role" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + const pipeline_def = + \\{"initial":"coding","states":{"coding":{"agent_role":"coder"},"done":{"terminal":true}},"transitions":[{"from":"coding","to":"done","trigger":"complete"}]} + ; + + const pipeline_id = try store.createPipeline("role-test", pipeline_def); + defer store.freeOwnedString(pipeline_id); + + const task_id = try store.createTask(pipeline_id, "Code Task", "desc", 0, "{}", null, 0, null); + defer store.freeOwnedString(task_id); + + // Claim with wrong role returns null + const result = try store.claimTask("agent-1", "reviewer", 300_000, null); + try std.testing.expect(result == null); + + // Claim with correct role returns task + const claim = (try store.claimTask("agent-1", "coder", 300_000, null)).?; + defer store.freeClaimResult(claim); + try std.testing.expectEqualStrings(task_id, claim.task.id); +} + +test "claim isolates shared stage names by pipeline role mapping" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + const reviewer_pipeline = + \\{"initial":"shared","states":{"shared":{"agent_role":"reviewer"},"done":{"terminal":true}},"transitions":[{"from":"shared","to":"done","trigger":"approve"}]} + ; + const coder_pipeline = + \\{"initial":"shared","states":{"shared":{"agent_role":"coder"},"done":{"terminal":true}},"transitions":[{"from":"shared","to":"done","trigger":"complete"}]} + ; + + const reviewer_pipeline_id = try store.createPipeline("shared-stage-reviewer", reviewer_pipeline); + defer store.freeOwnedString(reviewer_pipeline_id); + const coder_pipeline_id = try store.createPipeline("shared-stage-coder", coder_pipeline); + defer store.freeOwnedString(coder_pipeline_id); + + const reviewer_task = try store.createTask(reviewer_pipeline_id, "Review Task", "desc", 0, "{}", null, 0, null); + defer store.freeOwnedString(reviewer_task); + const coder_task = try store.createTask(coder_pipeline_id, "Code Task", "desc", 0, "{}", null, 0, null); + defer store.freeOwnedString(coder_task); + + const reviewer_claim = (try store.claimTask("agent-r", "reviewer", 300_000, null)).?; + defer store.freeClaimResult(reviewer_claim); + try std.testing.expectEqualStrings(reviewer_task, reviewer_claim.task.id); + + const coder_claim = (try store.claimTask("agent-c", "coder", 300_000, null)).?; + defer store.freeClaimResult(coder_claim); + try std.testing.expectEqualStrings(coder_task, coder_claim.task.id); +} + +test "fail run with retry policy" { + const alloc = std.testing.allocator; + var store = try Store.init(alloc, ":memory:"); + defer store.deinit(); + + const pipeline_def = + \\{"initial":"process","states":{"process":{"agent_role":"worker"},"dead":{"terminal":true}},"transitions":[{"from":"process","to":"dead","trigger":"complete"}]} + ; + + const pipeline_id = try store.createPipeline("retry-test", pipeline_def); + defer store.freeOwnedString(pipeline_id); + + const task_id = try store.createTask(pipeline_id, "Retry Task", "desc", 0, "{}", 2, 1000, "dead"); + defer store.freeOwnedString(task_id); + + // First claim and fail + const c1 = (try store.claimTask("agent-1", "worker", 300_000, null)).?; + defer store.freeClaimResult(c1); + try store.failRun(c1.run.id, "error 1", null); + + // Task should have next_eligible_at_ms set (retry delay) + const task_after_1 = (try store.getTask(task_id)).?; + defer store.freeTaskRow(task_after_1); + try std.testing.expect(task_after_1.next_eligible_at_ms > 0); + try std.testing.expect(task_after_1.dead_letter_reason == null); +} diff --git a/src/types.zig b/src/types.zig deleted file mode 100644 index 3f50b19..0000000 --- a/src/types.zig +++ /dev/null @@ -1,93 +0,0 @@ -/// Shared response and data types for JSON serialization. -/// Zig structs used with std.json.stringify. - -pub const HealthResponse = struct { - status: []const u8, - version: []const u8, - tasks_by_stage: []const StageCount, - active_leases: i64, -}; - -pub const StageCount = struct { - stage: []const u8, - count: i64, -}; - -pub const ErrorResponse = struct { - @"error": ErrorDetail, -}; - -pub const ErrorDetail = struct { - code: []const u8, - message: []const u8, -}; - -pub const PipelineResponse = struct { - id: []const u8, - name: []const u8, - definition: []const u8, // raw JSON string - created_at_ms: i64, -}; - -pub const TaskResponse = struct { - id: []const u8, - pipeline_id: []const u8, - stage: []const u8, - title: []const u8, - description: []const u8, - priority: i64, - metadata: []const u8, // raw JSON string - created_at_ms: i64, - updated_at_ms: i64, -}; - -pub const RunResponse = struct { - id: []const u8, - task_id: []const u8, - attempt: i64, - status: []const u8, - agent_id: ?[]const u8, - agent_role: ?[]const u8, - started_at_ms: ?i64, - ended_at_ms: ?i64, -}; - -pub const LeaseResponse = struct { - lease_id: []const u8, - lease_token: []const u8, - expires_at_ms: i64, -}; - -pub const ClaimResponse = struct { - task: TaskResponse, - run: RunResponse, - lease_id: []const u8, - lease_token: []const u8, - expires_at_ms: i64, -}; - -pub const EventResponse = struct { - id: i64, - run_id: []const u8, - ts_ms: i64, - kind: []const u8, - data: []const u8, // raw JSON string -}; - -pub const TransitionResponse = struct { - previous_stage: []const u8, - new_stage: []const u8, - trigger: []const u8, -}; - -pub const ArtifactResponse = struct { - id: []const u8, - task_id: ?[]const u8, - run_id: ?[]const u8, - created_at_ms: i64, - kind: []const u8, - uri: []const u8, - sha256_hex: ?[]const u8, - size_bytes: ?i64, - meta: []const u8, // raw JSON string -}; diff --git a/tests/test_e2e.sh b/tests/test_e2e.sh index 1c9a8d7..58f7f5d 100755 --- a/tests/test_e2e.sh +++ b/tests/test_e2e.sh @@ -343,29 +343,7 @@ assert_status 200 "$CODE" "POST /leases/claim (coder)" RUN2_ID=$(echo "$BODY" | python3 -c "import sys,json; print(json.load(sys.stdin)['run']['id'])") LEASE2_TOKEN=$(echo "$BODY" | python3 -c "import sys,json; print(json.load(sys.stdin)['lease_token'])") -# Transition without gate should fail -RESP=$(curl -s -w "\n%{http_code}" -X POST -H "Content-Type: application/json" \ - -H "Authorization: Bearer $LEASE2_TOKEN" \ - -d '{"trigger":"complete"}' \ - "$BASE/runs/$RUN2_ID/transition") -CODE=$(echo "$RESP" | tail -1) -assert_status 409 "$CODE" "POST /runs/{id}/transition blocked by required gates" - -# Add gate result -RESP=$(curl -s -w "\n%{http_code}" -X POST -H "Content-Type: application/json" \ - -H "Authorization: Bearer $LEASE2_TOKEN" \ - -d '{"gate":"tests_passed","status":"pass","evidence":{"tests":"ok"},"actor":"review-bot"}' \ - "$BASE/runs/$RUN2_ID/gates") -CODE=$(echo "$RESP" | tail -1) -assert_status 201 "$CODE" "POST /runs/{id}/gates" - -RESP=$(curl -s -w "\n%{http_code}" "$BASE/runs/$RUN2_ID/gates") -CODE=$(echo "$RESP" | tail -1) -BODY=$(echo "$RESP" | sed '$d') -assert_status 200 "$CODE" "GET /runs/{id}/gates" -assert_json "$BODY" "str(len(data))" "1" "gate result persisted" - -# Transition: coding → review +# Transition: coding → review (required_gates are informational only; not enforced server-side) RESP=$(curl -s -w "\n%{http_code}" -X POST -H "Content-Type: application/json" \ -H "Authorization: Bearer $LEASE2_TOKEN" \ -d '{"trigger":"complete","expected_stage":"coding","expected_task_version":2}' \ @@ -430,13 +408,6 @@ assert_status 200 "$CODE" "Task re-claimable after failure" RUN5_ID=$(echo "$RESP" | sed '$d' | python3 -c "import sys,json; print(json.load(sys.stdin)['run']['id'])") LEASE5_TOKEN=$(echo "$RESP" | sed '$d' | python3 -c "import sys,json; print(json.load(sys.stdin)['lease_token'])") -RESP=$(curl -s -w "\n%{http_code}" -X POST -H "Content-Type: application/json" \ - -H "Authorization: Bearer $LEASE5_TOKEN" \ - -d '{"gate":"tests_passed","status":"pass","evidence":{"tests":"ok"}}' \ - "$BASE/runs/$RUN5_ID/gates") -CODE=$(echo "$RESP" | tail -1) -assert_status 201 "$CODE" "POST /runs/{id}/gates before retry completion" - RESP=$(curl -s -w "\n%{http_code}" -X POST -H "Content-Type: application/json" \ -H "Authorization: Bearer $LEASE5_TOKEN" \ -d '{"trigger":"complete"}' \ @@ -588,6 +559,78 @@ BODY=$(echo "$RESP" | sed '$d') assert_status 200 "$CODE" "GET /ops/queue" assert_json "$BODY" "str(len(data['roles']) > 0)" "True" "queue roles present" +# ===== 8.4 Store API (KV) ===== +echo "" +echo "=== 8.4 Store API ===" + +# Put +RESP=$(curl -s -w "\n%{http_code}" -X PUT -H "Content-Type: application/json" \ + -d '{"value":{"title":"Getting Started","body":"Welcome"}}' \ + "$BASE/store/docs/readme") +CODE=$(echo "$RESP" | tail -1) +assert_status 204 "$CODE" "PUT /store/{ns}/{key}" + +# Get +RESP=$(curl -s -w "\n%{http_code}" "$BASE/store/docs/readme") +CODE=$(echo "$RESP" | tail -1) +BODY=$(echo "$RESP" | sed '$d') +assert_status 200 "$CODE" "GET /store/{ns}/{key}" +assert_json "$BODY" "data['value']['title']" "Getting Started" "store get value" + +# Upsert +RESP=$(curl -s -w "\n%{http_code}" -X PUT -H "Content-Type: application/json" \ + -d '{"value":{"title":"Updated","body":"New content"}}' \ + "$BASE/store/docs/readme") +CODE=$(echo "$RESP" | tail -1) +assert_status 204 "$CODE" "PUT /store/{ns}/{key} (upsert)" + +RESP=$(curl -s -w "\n%{http_code}" "$BASE/store/docs/readme") +CODE=$(echo "$RESP" | tail -1) +BODY=$(echo "$RESP" | sed '$d') +assert_json "$BODY" "data['value']['title']" "Updated" "store upsert value" + +# Put second key +RESP=$(curl -s -w "\n%{http_code}" -X PUT -H "Content-Type: application/json" \ + -d '{"value":{"title":"API Docs","body":"Endpoints and methods"}}' \ + "$BASE/store/docs/api") +CODE=$(echo "$RESP" | tail -1) +assert_status 204 "$CODE" "PUT /store/{ns}/{key} (second key)" + +# List namespace +RESP=$(curl -s -w "\n%{http_code}" "$BASE/store/docs") +CODE=$(echo "$RESP" | tail -1) +BODY=$(echo "$RESP" | sed '$d') +assert_status 200 "$CODE" "GET /store/{ns}" +assert_json "$BODY" "str(len(data))" "2" "store list namespace count" + +# Search +RESP=$(curl -s -w "\n%{http_code}" "$BASE/store/search?q=endpoints+methods") +CODE=$(echo "$RESP" | tail -1) +BODY=$(echo "$RESP" | sed '$d') +assert_status 200 "$CODE" "GET /store/search" +assert_json "$BODY" "str(len(data))" "1" "store search result count" +assert_json "$BODY" "data[0]['key']" "api" "store search found correct key" + +# Delete key +RESP=$(curl -s -w "\n%{http_code}" -X DELETE "$BASE/store/docs/readme") +CODE=$(echo "$RESP" | tail -1) +assert_status 204 "$CODE" "DELETE /store/{ns}/{key}" + +RESP=$(curl -s -w "\n%{http_code}" "$BASE/store/docs/readme") +CODE=$(echo "$RESP" | tail -1) +assert_status 404 "$CODE" "GET deleted store key returns 404" + +# Delete namespace +RESP=$(curl -s -w "\n%{http_code}" -X DELETE "$BASE/store/docs") +CODE=$(echo "$RESP" | tail -1) +assert_status 204 "$CODE" "DELETE /store/{ns}" + +RESP=$(curl -s -w "\n%{http_code}" "$BASE/store/docs") +CODE=$(echo "$RESP" | tail -1) +BODY=$(echo "$RESP" | sed '$d') +assert_status 200 "$CODE" "GET /store/{ns} after delete" +assert_json "$BODY" "str(len(data))" "0" "store namespace empty after delete" + # ===== 9. No tasks for non-existent role ===== echo "" echo "=== 9. Edge Cases ==="