diff --git a/README.md b/README.md index bf50932..1d40166 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ NullTickets). - **One-click updates** -- download, migrate config, rollback on failure - **Multi-instance** -- run multiple instances of the same component side by side - **Web UI + CLI** -- browser dashboard for humans, CLI for automation +- **Orchestration UI** -- workflow editor, poll-based run monitoring, checkpoint forking, encoded workflow/run/store links, and key-value store browser (proxied to NullTickets through NullHub) ## Quick Start @@ -103,6 +104,12 @@ UI modules. NullHub is a generic engine that interprets manifests. **Storage** -- all state lives under `~/.nullhub/` (config, instances, binaries, logs, cached manifests). +**Orchestration proxy** -- requests to `/api/orchestration/*` are reverse-proxied +to the local orchestration stack. Most routes go to NullBoiler's REST API via +`NULLBOILER_URL` (e.g. `http://localhost:8080`) and optional `NULLBOILER_TOKEN`. +`/api/orchestration/store/*` is proxied to NullTickets via `NULLTICKETS_URL` and +optional `NULLTICKETS_TOKEN`. + ## Development Backend: @@ -127,7 +134,9 @@ End-to-end: - Zig 0.15.2 - Svelte 5 + SvelteKit (static adapter) -- JSON over HTTP/1.1, SSE for streaming +- JSON over HTTP/1.1 +- SSE for instance log streaming +- Poll-based orchestration run updates over the `/orchestration/runs/{id}/stream` API ## Project Layout @@ -138,15 +147,17 @@ src/ server.zig # HTTP server (API + static UI) auth.zig # Optional bearer token auth api/ # REST endpoints (components, instances, wizard, ...) + orchestration.zig # Reverse proxy to NullBoiler orchestration API core/ # Manifest parser, state, platform, paths installer/ # Download, build, UI module fetching supervisor/ # Process spawn, health checks, manager - wizard/ # Manifest wizard engine, config writer ui/src/ - routes/ # SvelteKit pages (dashboard, install, instances, settings) + routes/ # SvelteKit pages + orchestration/ # Orchestration pages (dashboard, workflows, runs, store) lib/components/ # Reusable Svelte components + orchestration/ # GraphViewer, StateInspector, RunEventLog, InterruptPanel, + # CheckpointTimeline, WorkflowJsonEditor, NodeCard, SendProgressBar lib/api/ # Typed API client - lib/stores/ # Reactive state (instances, hub config) tests/ test_e2e.sh # End-to-end test script ``` diff --git a/src/api/orchestration.zig b/src/api/orchestration.zig new file mode 100644 index 0000000..eac4857 --- /dev/null +++ b/src/api/orchestration.zig @@ -0,0 +1,194 @@ +const std = @import("std"); +const Allocator = std.mem.Allocator; + +const Response = struct { + status: []const u8, + content_type: []const u8, + body: []const u8, +}; + +const prefix = "/api/orchestration"; +const store_prefix = "/api/orchestration/store"; + +pub const Config = struct { + boiler_url: ?[]const u8 = null, + boiler_token: ?[]const u8 = null, + tickets_url: ?[]const u8 = null, + tickets_token: ?[]const u8 = null, +}; + +const Backend = enum { + boiler, + tickets, + + fn notConfiguredBody(self: Backend) []const u8 { + return switch (self) { + .boiler => "{\"error\":\"NullBoiler not configured\"}", + .tickets => "{\"error\":\"NullTickets not configured\"}", + }; + } + + fn unreachableBody(self: Backend) []const u8 { + return switch (self) { + .boiler => "{\"error\":\"NullBoiler unreachable\"}", + .tickets => "{\"error\":\"NullTickets unreachable\"}", + }; + } +}; + +pub fn isProxyPath(target: []const u8) bool { + return std.mem.eql(u8, target, prefix) or std.mem.startsWith(u8, target, prefix ++ "/"); +} + +fn isStorePath(target: []const u8) bool { + return std.mem.eql(u8, target, store_prefix) or std.mem.startsWith(u8, target, store_prefix ++ "/"); +} + +const ProxyTarget = struct { + backend: Backend, + base_url: []const u8, + token: ?[]const u8, +}; + +fn backendForPath(target: []const u8) ?Backend { + if (!isProxyPath(target)) return null; + return if (isStorePath(target)) .tickets else .boiler; +} + +fn resolveProxyTarget(target: []const u8, cfg: Config) ?ProxyTarget { + const backend = backendForPath(target) orelse return null; + return switch (backend) { + .tickets => blk: { + const base_url = cfg.tickets_url orelse return null; + break :blk .{ + .backend = .tickets, + .base_url = base_url, + .token = cfg.tickets_token, + }; + }, + .boiler => blk: { + const base_url = cfg.boiler_url orelse return null; + break :blk .{ + .backend = .boiler, + .base_url = base_url, + .token = cfg.boiler_token, + }; + }, + }; +} + +/// Proxies orchestration API requests to the local orchestration stack. +/// `/api/orchestration/store/*` goes to NullTickets; all other orchestration +/// routes go to NullBoiler. The shared prefix is stripped before forwarding. +pub fn handle(allocator: Allocator, method: []const u8, target: []const u8, body: []const u8, cfg: Config) Response { + if (!isProxyPath(target)) { + return .{ .status = "404 Not Found", .content_type = "application/json", .body = "{\"error\":\"not found\"}" }; + } + const backend = backendForPath(target) orelse + return .{ .status = "404 Not Found", .content_type = "application/json", .body = "{\"error\":\"not found\"}" }; + const resolved = resolveProxyTarget(target, cfg) orelse + return .{ .status = "503 Service Unavailable", .content_type = "application/json", .body = backend.notConfiguredBody() }; + + const proxied_path = target[prefix.len..]; + const path = if (proxied_path.len == 0) "/" else proxied_path; + + const url = std.fmt.allocPrint(allocator, "{s}{s}", .{ resolved.base_url, path }) catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + + const http_method = parseMethod(method) orelse + return .{ .status = "405 Method Not Allowed", .content_type = "application/json", .body = "{\"error\":\"method not allowed\"}" }; + + var auth_header: ?[]const u8 = null; + defer if (auth_header) |value| allocator.free(value); + var header_buf: [1]std.http.Header = undefined; + const extra_headers: []const std.http.Header = if (resolved.token) |token| blk: { + auth_header = std.fmt.allocPrint(allocator, "Bearer {s}", .{token}) catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + header_buf[0] = .{ .name = "Authorization", .value = auth_header.? }; + break :blk header_buf[0..1]; + } else &.{}; + + var client: std.http.Client = .{ .allocator = allocator }; + defer client.deinit(); + + var response_body: std.io.Writer.Allocating = .init(allocator); + defer response_body.deinit(); + + const result = client.fetch(.{ + .location = .{ .url = url }, + .method = http_method, + .payload = if (body.len > 0) body else null, + .response_writer = &response_body.writer, + .extra_headers = extra_headers, + }) catch { + return .{ .status = "502 Bad Gateway", .content_type = "application/json", .body = resolved.backend.unreachableBody() }; + }; + + const status_code: u10 = @intFromEnum(result.status); + const resp_body = response_body.toOwnedSlice() catch + return .{ .status = "500 Internal Server Error", .content_type = "application/json", .body = "{\"error\":\"internal error\"}" }; + + const status = mapStatus(status_code); + + return .{ + .status = status, + .content_type = "application/json", + .body = resp_body, + }; +} + +fn parseMethod(method: []const u8) ?std.http.Method { + if (std.mem.eql(u8, method, "GET")) return .GET; + if (std.mem.eql(u8, method, "POST")) return .POST; + if (std.mem.eql(u8, method, "PUT")) return .PUT; + if (std.mem.eql(u8, method, "DELETE")) return .DELETE; + if (std.mem.eql(u8, method, "PATCH")) return .PATCH; + return null; +} + +fn mapStatus(code: u10) []const u8 { + return switch (code) { + 200 => "200 OK", + 201 => "201 Created", + 204 => "204 No Content", + 400 => "400 Bad Request", + 401 => "401 Unauthorized", + 403 => "403 Forbidden", + 404 => "404 Not Found", + 405 => "405 Method Not Allowed", + 409 => "409 Conflict", + 422 => "422 Unprocessable Entity", + 500 => "500 Internal Server Error", + 502 => "502 Bad Gateway", + 503 => "503 Service Unavailable", + else => if (code >= 200 and code < 300) "200 OK" else if (code >= 400 and code < 500) "400 Bad Request" else "500 Internal Server Error", + }; +} + +test "isProxyPath matches orchestration namespace" { + try std.testing.expect(isProxyPath("/api/orchestration")); + try std.testing.expect(isProxyPath("/api/orchestration/runs")); + try std.testing.expect(isProxyPath("/api/orchestration/store/search")); + try std.testing.expect(!isProxyPath("/api/instances")); +} + +test "backendForPath routes store requests to tickets backend" { + try std.testing.expectEqual(Backend.tickets, backendForPath("/api/orchestration/store/search").?); + try std.testing.expectEqual(Backend.boiler, backendForPath("/api/orchestration/runs").?); +} + +test "handle routes store paths to NullTickets config" { + const resp = handle(std.testing.allocator, "GET", "/api/orchestration/store/search", "", .{ + .boiler_url = "http://127.0.0.1:8080", + }); + try std.testing.expectEqualStrings("503 Service Unavailable", resp.status); + try std.testing.expectEqualStrings("{\"error\":\"NullTickets not configured\"}", resp.body); +} + +test "handle routes non-store paths to NullBoiler config" { + const resp = handle(std.testing.allocator, "GET", "/api/orchestration/runs", "", .{ + .tickets_url = "http://127.0.0.1:7711", + }); + try std.testing.expectEqualStrings("503 Service Unavailable", resp.status); + try std.testing.expectEqualStrings("{\"error\":\"NullBoiler not configured\"}", resp.body); +} diff --git a/src/server.zig b/src/server.zig index cc2dc9c..5f30e85 100644 --- a/src/server.zig +++ b/src/server.zig @@ -17,6 +17,7 @@ const wizard_api = @import("api/wizard.zig"); const providers_api = @import("api/providers.zig"); const channels_api = @import("api/channels.zig"); const usage_api = @import("api/usage.zig"); +const orchestration_api = @import("api/orchestration.zig"); const ui_modules = @import("installer/ui_modules.zig"); const orchestrator = @import("installer/orchestrator.zig"); const registry = @import("installer/registry.zig"); @@ -348,11 +349,20 @@ pub const Server = struct { if (std.mem.eql(u8, method, "GET") or std.mem.eql(u8, method, "HEAD")) { if (try self.redirectLocationForAliasHost(alloc, raw, target)) |location| { defer alloc.free(location); - try sendRedirect(conn.stream, location); + try sendRedirect(conn.stream, location, raw, self.host, self.port); return; } } + if (!requestOriginAllowed(raw, target, self.host, self.port)) { + try sendResponse(conn.stream, .{ + .status = "403 Forbidden", + .content_type = "application/json", + .body = "{\"error\":\"forbidden origin\"}", + }, raw, self.host, self.port); + return; + } + // Read remaining body if Content-Length indicates more data const body = readBody(raw, n, conn.stream, alloc) catch return; @@ -362,7 +372,7 @@ pub const Server = struct { .status = "204 No Content", .content_type = "text/plain", .body = "", - }); + }, raw, self.host, self.port); return; } @@ -373,20 +383,20 @@ pub const Server = struct { .status = "401 Unauthorized", .content_type = "application/json", .body = "{\"error\":\"unauthorized\"}", - }); + }, raw, self.host, self.port); return; } } // Route dispatch (lock mutex so supervisor thread doesn't race) - const response = if (instances_api.isIntegrationPath(target)) + const response = if (routeWithoutServerMutex(target)) self.route(alloc, method, target, body) else blk: { self.mutex.lock(); defer self.mutex.unlock(); break :blk self.route(alloc, method, target, body); }; - try sendResponse(conn.stream, response); + try sendResponse(conn.stream, response, raw, self.host, self.port); } fn redirectLocationForAliasHost(self: *const Server, allocator: std.mem.Allocator, raw: []const u8, target: []const u8) !?[]u8 { @@ -403,6 +413,38 @@ pub const Server = struct { }); } + // std.posix.getenv is unavailable on Windows (WTF-16 encoding). + // Orchestration proxy requires Unix — returns null on Windows. + fn getEnv(name: []const u8) ?[]const u8 { + const native = @import("builtin").os.tag; + if (native == .windows) return null; + return std.posix.getenv(name); + } + + fn getBoilerUrl(self: *Server) ?[]const u8 { + _ = self; + return getEnv("NULLBOILER_URL"); + } + + fn getBoilerToken(self: *Server) ?[]const u8 { + _ = self; + return getEnv("NULLBOILER_TOKEN"); + } + + fn getTicketsUrl(self: *Server) ?[]const u8 { + _ = self; + return getEnv("NULLTICKETS_URL"); + } + + fn getTicketsToken(self: *Server) ?[]const u8 { + _ = self; + return getEnv("NULLTICKETS_TOKEN"); + } + + fn routeWithoutServerMutex(target: []const u8) bool { + return instances_api.isIntegrationPath(target) or orchestration_api.isProxyPath(target); + } + fn route(self: *Server, allocator: std.mem.Allocator, method: []const u8, target: []const u8, body: []const u8) Response { if (std.mem.eql(u8, method, "GET")) { if (std.mem.eql(u8, target, "/health")) { @@ -932,6 +974,16 @@ pub const Server = struct { } } + if (orchestration_api.isProxyPath(target)) { + const resp = orchestration_api.handle(allocator, method, target, body, .{ + .boiler_url = self.getBoilerUrl(), + .boiler_token = self.getBoilerToken(), + .tickets_url = self.getTicketsUrl(), + .tickets_token = self.getTicketsToken(), + }); + return .{ .status = resp.status, .content_type = resp.content_type, .body = resp.body }; + } + // Serve UI module files from data directory (~/.nullhub/ui/{name}@{version}/...) if (!std.mem.startsWith(u8, target, "/api/") and std.mem.startsWith(u8, target, "/ui/")) { // Check if this looks like a module path: /ui/{name}@{version}/... @@ -993,39 +1045,35 @@ fn readBody(raw: []const u8, n: usize, stream: std.net.Stream, alloc: std.mem.Al return extractBody(raw); } -fn sendResponse(stream: std.net.Stream, response: Response) !void { +fn sendResponse(stream: std.net.Stream, response: Response, raw_request: []const u8, bind_host: []const u8, port: u16) !void { var buf: [4096]u8 = undefined; - const header = try std.fmt.bufPrint( - &buf, - "HTTP/1.1 {s}\r\n" ++ - "Content-Type: {s}\r\n" ++ - "Content-Length: {d}\r\n" ++ - "Access-Control-Allow-Origin: *\r\n" ++ - "Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS\r\n" ++ - "Access-Control-Allow-Headers: Content-Type, Authorization\r\n" ++ - "Connection: close\r\n\r\n", - .{ response.status, response.content_type, response.body.len }, - ); - _ = try stream.write(header); + var header_stream = std.io.fixedBufferStream(&buf); + const writer = header_stream.writer(); + + try writer.print("HTTP/1.1 {s}\r\n", .{response.status}); + try writer.print("Content-Type: {s}\r\n", .{response.content_type}); + try writer.print("Content-Length: {d}\r\n", .{response.body.len}); + try appendCorsHeaders(writer, raw_request, bind_host, port); + try writer.writeAll("Connection: close\r\n\r\n"); + + _ = try stream.write(header_stream.getWritten()); if (response.body.len > 0) { _ = try stream.write(response.body); } } -fn sendRedirect(stream: std.net.Stream, location: []const u8) !void { +fn sendRedirect(stream: std.net.Stream, location: []const u8, raw_request: []const u8, bind_host: []const u8, port: u16) !void { var buf: [4096]u8 = undefined; - const header = try std.fmt.bufPrint( - &buf, - "HTTP/1.1 308 Permanent Redirect\r\n" ++ - "Location: {s}\r\n" ++ - "Content-Length: 0\r\n" ++ - "Access-Control-Allow-Origin: *\r\n" ++ - "Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS\r\n" ++ - "Access-Control-Allow-Headers: Content-Type, Authorization\r\n" ++ - "Connection: close\r\n\r\n", - .{location}, - ); - _ = try stream.write(header); + var header_stream = std.io.fixedBufferStream(&buf); + const writer = header_stream.writer(); + + try writer.writeAll("HTTP/1.1 308 Permanent Redirect\r\n"); + try writer.print("Location: {s}\r\n", .{location}); + try writer.writeAll("Content-Length: 0\r\n"); + try appendCorsHeaders(writer, raw_request, bind_host, port); + try writer.writeAll("Connection: close\r\n\r\n"); + + _ = try stream.write(header_stream.getWritten()); } pub fn extractBody(raw: []const u8) []const u8 { @@ -1055,6 +1103,48 @@ pub fn extractHeader(raw: []const u8, name: []const u8) ?[]const u8 { return null; } +fn requestOriginAllowed(raw_request: []const u8, target: []const u8, bind_host: []const u8, port: u16) bool { + if (!std.mem.startsWith(u8, target, "/api/")) return true; + const origin = extractHeader(raw_request, "Origin") orelse return true; + return isAllowedCorsOrigin(origin, bind_host, port); +} + +fn appendCorsHeaders(writer: anytype, raw_request: []const u8, bind_host: []const u8, port: u16) !void { + const origin = allowedCorsOrigin(raw_request, bind_host, port) orelse return; + try writer.print("Access-Control-Allow-Origin: {s}\r\n", .{origin}); + try writer.writeAll("Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS\r\n"); + try writer.writeAll("Access-Control-Allow-Headers: Content-Type, Authorization\r\n"); + try writer.writeAll("Vary: Origin\r\n"); +} + +fn allowedCorsOrigin(raw_request: []const u8, bind_host: []const u8, port: u16) ?[]const u8 { + const origin = extractHeader(raw_request, "Origin") orelse return null; + if (!isAllowedCorsOrigin(origin, bind_host, port)) return null; + return origin; +} + +fn isAllowedCorsOrigin(origin: []const u8, bind_host: []const u8, port: u16) bool { + if (originMatchesHost(origin, bind_host, port)) return true; + if (!access.isLocalBindHost(bind_host)) return false; + + inline for (&[_][]const u8{ + "127.0.0.1", + "localhost", + "[::1]", + access.canonical_local_host, + access.public_alias_host, + }) |host| { + if (originMatchesHost(origin, host, port)) return true; + } + return false; +} + +fn originMatchesHost(origin: []const u8, host: []const u8, port: u16) bool { + var buf: [256]u8 = undefined; + const expected = std.fmt.bufPrint(&buf, "http://{s}:{d}", .{ host, port }) catch return false; + return std.ascii.eqlIgnoreCase(origin, expected); +} + fn hostMatchesAliasHost(host_header: []const u8, alias_host: []const u8) bool { const trimmed = std.mem.trim(u8, host_header, " \t"); if (std.ascii.eqlIgnoreCase(trimmed, alias_host)) return true; @@ -1271,6 +1361,39 @@ test "hostMatchesAliasHost matches bare host and host with port" { try std.testing.expect(!hostMatchesAliasHost("nullhub.localhost:19800", "nullhub.local")); } +test "isAllowedCorsOrigin allows local aliases for loopback binds" { + try std.testing.expect(isAllowedCorsOrigin("http://127.0.0.1:19800", "127.0.0.1", 19800)); + try std.testing.expect(isAllowedCorsOrigin("http://nullhub.localhost:19800", "127.0.0.1", 19800)); + try std.testing.expect(isAllowedCorsOrigin("http://nullhub.local:19800", "127.0.0.1", 19800)); +} + +test "isAllowedCorsOrigin rejects foreign or mismatched origins" { + try std.testing.expect(!isAllowedCorsOrigin("http://evil.example:19800", "127.0.0.1", 19800)); + try std.testing.expect(!isAllowedCorsOrigin("http://127.0.0.1:19801", "127.0.0.1", 19800)); +} + +test "requestOriginAllowed rejects foreign API origins" { + const evil_raw = + "GET /api/status HTTP/1.1\r\n" ++ + "Host: 127.0.0.1:19800\r\n" ++ + "Origin: http://evil.example:19800\r\n\r\n"; + try std.testing.expect(!requestOriginAllowed(evil_raw, "/api/status", "127.0.0.1", 19800)); + + const local_raw = + "GET /api/status HTTP/1.1\r\n" ++ + "Host: 127.0.0.1:19800\r\n" ++ + "Origin: http://nullhub.localhost:19800\r\n\r\n"; + try std.testing.expect(requestOriginAllowed(local_raw, "/api/status", "127.0.0.1", 19800)); +} + +test "routeWithoutServerMutex keeps orchestration proxy requests off global lock" { + try std.testing.expect(Server.routeWithoutServerMutex("/api/orchestration")); + try std.testing.expect(Server.routeWithoutServerMutex("/api/orchestration/runs")); + try std.testing.expect(Server.routeWithoutServerMutex("/api/orchestration/store/search")); + try std.testing.expect(Server.routeWithoutServerMutex("/api/instances/nullclaw/demo/logs")); + try std.testing.expect(!Server.routeWithoutServerMutex("/api/components")); +} + test "extractBody returns body after headers" { const raw = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\nhello world"; try std.testing.expectEqualStrings("hello world", extractBody(raw)); diff --git a/ui/src/lib/api/client.ts b/ui/src/lib/api/client.ts index 9b027b5..a663ba9 100644 --- a/ui/src/lib/api/client.ts +++ b/ui/src/lib/api/client.ts @@ -1,3 +1,5 @@ +import { createOrchestrationApi } from '$lib/api/orchestration'; + const BASE = '/api'; function withQuery(path: string, params: Record): string { @@ -10,6 +12,8 @@ function withQuery(path: string, params: Record(path: string, options?: RequestInit): Promise { }); if (!res.ok) { const body = await res.json().catch(() => null); - throw new Error(body?.error || `HTTP ${res.status}`); + const errMsg = typeof body?.error === 'string' ? body.error : body?.error?.message || `HTTP ${res.status}`; + throw new Error(errMsg); } - return res.json(); + if (res.status === 204) return undefined as T; + const text = await res.text(); + if (!text) return undefined as T; + return JSON.parse(text); } export const api = { @@ -174,4 +182,5 @@ export const api = { request(`/channels/${id.replace('sc_', '')}`, { method: 'DELETE' }), revalidateSavedChannel: (id: string) => request(`/channels/${id.replace('sc_', '')}/validate`, { method: 'POST' }), + ...createOrchestrationApi(request, withQuery), }; diff --git a/ui/src/lib/api/orchestration.ts b/ui/src/lib/api/orchestration.ts new file mode 100644 index 0000000..bc8139c --- /dev/null +++ b/ui/src/lib/api/orchestration.ts @@ -0,0 +1,175 @@ +import { orchestrationApiPaths } from '$lib/orchestration/routes'; + +type RequestFn = (path: string, options?: RequestInit) => Promise; +type WithQueryFn = ( + path: string, + params: Record, +) => string; + +function msToIso(ms: number | undefined | null): string | undefined { + if (ms == null) return undefined; + return new Date(ms).toISOString(); +} + +function tryParseJson(val: string | undefined | null): any { + if (!val) return undefined; + try { return JSON.parse(val); } catch { return val; } +} + +function normalizeWorkflow(raw: any): any { + if (!raw) return raw; + const def = raw.definition ? tryParseJson(raw.definition) : null; + return { + ...raw, + nodes: raw.nodes ?? def?.nodes ?? {}, + edges: raw.edges ?? def?.edges ?? [], + state_schema: raw.state_schema ?? def?.state_schema, + created_at: raw.created_at ?? msToIso(raw.created_at_ms), + updated_at: raw.updated_at ?? msToIso(raw.updated_at_ms), + }; +} + +function normalizeStep(step: any): any { + if (!step) return step; + return { + ...step, + node_id: step.node_id ?? step.def_step_id ?? step.step, + }; +} + +function normalizeRun(raw: any): any { + if (!raw) return raw; + const steps = raw.steps ? raw.steps.map(normalizeStep) : raw.steps; + return { + ...raw, + steps, + state: raw.state ?? tryParseJson(raw.state_json), + workflow: raw.workflow ?? tryParseJson(raw.workflow_json), + input: raw.input ?? tryParseJson(raw.input_json), + config: raw.config ?? tryParseJson(raw.config_json), + created_at: raw.created_at ?? msToIso(raw.created_at_ms), + completed_at: raw.completed_at ?? raw.ended_at ?? msToIso(raw.ended_at_ms), + updated_at: raw.updated_at ?? msToIso(raw.updated_at_ms), + started_at: raw.started_at ?? msToIso(raw.started_at_ms), + interrupt_message: raw.interrupt_message ?? raw.error_text, + }; +} + +function normalizeCheckpoint(raw: any): any { + if (!raw) return raw; + return { + ...raw, + state: raw.state ?? tryParseJson(raw.state_json), + completed_nodes: raw.completed_nodes ?? tryParseJson(raw.completed_nodes_json), + metadata: raw.metadata ?? tryParseJson(raw.metadata_json), + created_at: raw.created_at ?? msToIso(raw.created_at_ms), + step_name: raw.step_name ?? raw.step_id, + after_step: raw.after_step ?? raw.step_id, + }; +} + +function normalizeValidation(raw: any): any { + if (!raw) return raw; + if (raw.errors && Array.isArray(raw.errors) && raw.errors.length > 0 && typeof raw.errors[0] === 'object') { + return { ...raw, errors: raw.errors.map((e: any) => e.message || `${e.type || e.err_type}: ${e.key || e.node || 'unknown'}`) }; + } + return raw; +} + +function normalizeEventType(type: string | undefined): string { + if (!type) return 'message'; + if (type === 'run.interrupted') return 'interrupted'; + return type.replaceAll('.', '_'); +} + +function normalizeStreamEvent(raw: any): { type: string; data: any; timestamp?: number } { + const timestampMs = typeof raw?.ts_ms === 'number' + ? raw.ts_ms + : typeof raw?.timestamp_ms === 'number' + ? raw.timestamp_ms + : undefined; + + return { + type: normalizeEventType(raw?.event || raw?.type || raw?.kind), + data: raw?.data ?? raw, + timestamp: timestampMs != null ? timestampMs / 1000 : undefined, + }; +} + +export function createOrchestrationApi(request: RequestFn, withQuery: WithQueryFn) { + return { + listWorkflows: async () => { + const raw = await request(orchestrationApiPaths.workflows()); + const list = Array.isArray(raw) ? raw : raw?.items ?? []; + return list.map(normalizeWorkflow); + }, + getWorkflow: async (id: string) => normalizeWorkflow(await request(orchestrationApiPaths.workflow(id))), + createWorkflow: (data: any) => request(orchestrationApiPaths.workflows(), { method: 'POST', body: JSON.stringify(data) }), + updateWorkflow: (id: string, data: any) => request(orchestrationApiPaths.workflow(id), { method: 'PUT', body: JSON.stringify(data) }), + deleteWorkflow: (id: string) => request(orchestrationApiPaths.workflow(id), { method: 'DELETE' }), + validateWorkflow: async (id: string) => normalizeValidation(await request(orchestrationApiPaths.workflowValidate(id), { method: 'POST' })), + runWorkflow: (id: string, input: any) => request(orchestrationApiPaths.workflowRun(id), { method: 'POST', body: JSON.stringify(input) }), + listRuns: async (params?: { status?: string; workflow_id?: string }) => { + const raw = await request(withQuery(orchestrationApiPaths.runs(), params ?? {})); + const list = Array.isArray(raw) ? raw : raw?.items ?? []; + return list.map(normalizeRun); + }, + getRun: async (id: string) => normalizeRun(await request(orchestrationApiPaths.run(id))), + cancelRun: (id: string) => request(orchestrationApiPaths.runCancel(id), { method: 'POST' }), + resumeRun: (id: string, updates: any) => request(orchestrationApiPaths.runResume(id), { method: 'POST', body: JSON.stringify({ state_updates: updates }) }), + forkRun: (checkpointId: string, overrides?: any) => request(orchestrationApiPaths.runsFork(), { method: 'POST', body: JSON.stringify({ checkpoint_id: checkpointId, state_overrides: overrides }) }), + replayRun: (id: string, checkpointId: string) => request(orchestrationApiPaths.runReplay(id), { method: 'POST', body: JSON.stringify({ from_checkpoint_id: checkpointId }) }), + injectState: (id: string, updates: any, afterStep?: string) => request(orchestrationApiPaths.runState(id), { method: 'POST', body: JSON.stringify({ updates, apply_after_step: afterStep }) }), + listCheckpoints: async (runId: string) => { + const cps = await request(orchestrationApiPaths.runCheckpoints(runId)); + return (cps || []).map(normalizeCheckpoint); + }, + getCheckpoint: async (runId: string, cpId: string) => normalizeCheckpoint(await request(orchestrationApiPaths.runCheckpoint(runId, cpId))), + storeList: (namespace: string) => request(orchestrationApiPaths.storeNamespace(namespace)), + storeGet: (namespace: string, key: string) => request(orchestrationApiPaths.storeEntry(namespace, key)), + storePut: (namespace: string, key: string, value: any) => request(orchestrationApiPaths.storeEntry(namespace, key), { method: 'PUT', body: JSON.stringify({ value }) }), + storeDelete: (namespace: string, key: string) => request(orchestrationApiPaths.storeEntry(namespace, key), { method: 'DELETE' }), + streamRun: (runId: string, onEvent: (event: { type: string; data: any; timestamp?: number }) => void) => { + let active = true; + let deliveredInitialSnapshot = false; + let afterSeq = 0; + + const emitEvent = (ev: any) => { + if (!active) return; + onEvent(normalizeStreamEvent(ev)); + }; + + const poll = async () => { + while (active) { + try { + const res = await request(withQuery(orchestrationApiPaths.runStream(runId), { + after_seq: afterSeq > 0 ? afterSeq : undefined, + })); + if (!active) break; + if (res?.stream_events) { + for (const ev of res.stream_events) emitEvent(ev); + } + if (!deliveredInitialSnapshot && res?.events) { + for (const ev of res.events) emitEvent(ev); + deliveredInitialSnapshot = true; + } + if (typeof res?.next_stream_seq === 'number') { + afterSeq = Math.max(afterSeq, res.next_stream_seq); + } + if (res?.status && ['completed', 'failed', 'cancelled'].includes(res.status)) { + break; + } + } catch { + if (!active) break; + // Ignore poll errors, will retry. + } + if (!active) break; + await new Promise(r => setTimeout(r, 1000)); + } + }; + + void poll(); + return { close: () => { active = false; } } as EventSource; + }, + }; +} diff --git a/ui/src/lib/components/Sidebar.svelte b/ui/src/lib/components/Sidebar.svelte index e5c0225..e66b77d 100644 --- a/ui/src/lib/components/Sidebar.svelte +++ b/ui/src/lib/components/Sidebar.svelte @@ -2,6 +2,7 @@ import { page } from "$app/stores"; import { onMount } from "svelte"; import { api } from "$lib/api/client"; + import { orchestrationUiRoutes } from "$lib/orchestration/routes"; let instances = $state>({}); let currentPath = $derived($page.url.pathname); @@ -52,6 +53,14 @@ {/each} + + diff --git a/ui/src/lib/components/orchestration/CheckpointTimeline.svelte b/ui/src/lib/components/orchestration/CheckpointTimeline.svelte new file mode 100644 index 0000000..9b50f33 --- /dev/null +++ b/ui/src/lib/components/orchestration/CheckpointTimeline.svelte @@ -0,0 +1,122 @@ + + +
+ {#if checkpoints.length === 0} +
No checkpoints
+ {/if} + {#each checkpoints as cp, i} + + {/each} +
+ + diff --git a/ui/src/lib/components/orchestration/GraphViewer.svelte b/ui/src/lib/components/orchestration/GraphViewer.svelte new file mode 100644 index 0000000..97180a4 --- /dev/null +++ b/ui/src/lib/components/orchestration/GraphViewer.svelte @@ -0,0 +1,258 @@ + + +
+ + + + + + + + {#each graph.ledges as edge} + + {/each} + + {#each graph.lnodes as node} + {#if node.isTerminal} + + {node.label} + {:else} + {@const color = statusColors[nodeStatus[node.id] || 'pending']} + + {typeLabels[node.type] || '?'} + {node.label.length > 12 ? node.label.slice(0, 11) + '...' : node.label} + {/if} + {/each} + +
+ + diff --git a/ui/src/lib/components/orchestration/InterruptPanel.svelte b/ui/src/lib/components/orchestration/InterruptPanel.svelte new file mode 100644 index 0000000..136bcd5 --- /dev/null +++ b/ui/src/lib/components/orchestration/InterruptPanel.svelte @@ -0,0 +1,206 @@ + + + +
+ +
+ + diff --git a/ui/src/lib/components/orchestration/NodeCard.svelte b/ui/src/lib/components/orchestration/NodeCard.svelte new file mode 100644 index 0000000..84656b5 --- /dev/null +++ b/ui/src/lib/components/orchestration/NodeCard.svelte @@ -0,0 +1,66 @@ + + +
+ {typeLabels[type] || '?'} + {name} +
+ + diff --git a/ui/src/lib/components/orchestration/RunEventLog.svelte b/ui/src/lib/components/orchestration/RunEventLog.svelte new file mode 100644 index 0000000..715cc03 --- /dev/null +++ b/ui/src/lib/components/orchestration/RunEventLog.svelte @@ -0,0 +1,192 @@ + + +
+
+ Event Log + +
+
+ {#if events.length === 0} +
No events yet
+ {/if} + {#each events as ev} +
+ {formatTime(ev.timestamp)} + {ev.type} + {summarize(ev.data)} +
+ {/each} +
+
+ + diff --git a/ui/src/lib/components/orchestration/SendProgressBar.svelte b/ui/src/lib/components/orchestration/SendProgressBar.svelte new file mode 100644 index 0000000..d4d24a4 --- /dev/null +++ b/ui/src/lib/components/orchestration/SendProgressBar.svelte @@ -0,0 +1,51 @@ + + +
+ {#if label} + {label} + {/if} +
+
+
+ {current}/{total} +
+ + diff --git a/ui/src/lib/components/orchestration/StateInspector.svelte b/ui/src/lib/components/orchestration/StateInspector.svelte new file mode 100644 index 0000000..e2c4a44 --- /dev/null +++ b/ui/src/lib/components/orchestration/StateInspector.svelte @@ -0,0 +1,205 @@ + + +
+
+ State + {#if previousState} + + {/if} +
+
+ {#if diffMode && previousState} +
+ {#if diff.added.size > 0} +
+ Added + {#each [...diff.added] as key} +
+ {key}: {JSON.stringify(currentState[key])}
+ {/each} +
+ {/if} + {#if diff.changed.size > 0} +
+ Changed + {#each [...diff.changed] as key} +
- {key}: {JSON.stringify(previousState[key])}
+
+ {key}: {JSON.stringify(currentState[key])}
+ {/each} +
+ {/if} + {#if diff.removed.size > 0} +
+ Removed + {#each [...diff.removed] as key} +
- {key}: {JSON.stringify(previousState[key])}
+ {/each} +
+ {/if} + {#if diff.added.size === 0 && diff.changed.size === 0 && diff.removed.size === 0} +
No changes
+ {/if} +
+ {:else} +
{@html highlighted}
+ {/if} +
+
+ + diff --git a/ui/src/lib/components/orchestration/WorkflowJsonEditor.svelte b/ui/src/lib/components/orchestration/WorkflowJsonEditor.svelte new file mode 100644 index 0000000..be3f86a --- /dev/null +++ b/ui/src/lib/components/orchestration/WorkflowJsonEditor.svelte @@ -0,0 +1,77 @@ + + +
+ + {#if !valid} +
{errorMsg}
+ {/if} +
+ + diff --git a/ui/src/lib/orchestration/routes.ts b/ui/src/lib/orchestration/routes.ts new file mode 100644 index 0000000..9816ef3 --- /dev/null +++ b/ui/src/lib/orchestration/routes.ts @@ -0,0 +1,39 @@ +export function encodePathSegment(value: string): string { + return encodeURIComponent(value); +} + +const uiRoot = '/orchestration'; +const apiRoot = '/orchestration'; +const workflowsBase = `${uiRoot}/workflows`; +const runsBase = `${uiRoot}/runs`; +const storeBase = `${apiRoot}/store`; + +export const orchestrationUiRoutes = { + dashboard: () => uiRoot, + workflows: () => workflowsBase, + newWorkflow: () => `${workflowsBase}/new`, + workflow: (id: string) => `${workflowsBase}/${encodePathSegment(id)}`, + runs: () => runsBase, + run: (id: string) => `${runsBase}/${encodePathSegment(id)}`, + runFork: (id: string) => `${runsBase}/${encodePathSegment(id)}/fork`, + store: () => `${uiRoot}/store`, +}; + +export const orchestrationApiPaths = { + workflows: () => `${apiRoot}/workflows`, + workflow: (id: string) => `${apiRoot}/workflows/${encodePathSegment(id)}`, + workflowValidate: (id: string) => `${apiRoot}/workflows/${encodePathSegment(id)}/validate`, + workflowRun: (id: string) => `${apiRoot}/workflows/${encodePathSegment(id)}/run`, + runs: () => `${apiRoot}/runs`, + run: (id: string) => `${apiRoot}/runs/${encodePathSegment(id)}`, + runCancel: (id: string) => `${apiRoot}/runs/${encodePathSegment(id)}/cancel`, + runResume: (id: string) => `${apiRoot}/runs/${encodePathSegment(id)}/resume`, + runReplay: (id: string) => `${apiRoot}/runs/${encodePathSegment(id)}/replay`, + runState: (id: string) => `${apiRoot}/runs/${encodePathSegment(id)}/state`, + runsFork: () => `${apiRoot}/runs/fork`, + runCheckpoints: (runId: string) => `${apiRoot}/runs/${encodePathSegment(runId)}/checkpoints`, + runCheckpoint: (runId: string, checkpointId: string) => `${apiRoot}/runs/${encodePathSegment(runId)}/checkpoints/${encodePathSegment(checkpointId)}`, + runStream: (runId: string) => `${apiRoot}/runs/${encodePathSegment(runId)}/stream`, + storeNamespace: (namespace: string) => `${storeBase}/${encodePathSegment(namespace)}`, + storeEntry: (namespace: string, key: string) => `${storeBase}/${encodePathSegment(namespace)}/${encodePathSegment(key)}`, +}; diff --git a/ui/src/routes/orchestration/+page.svelte b/ui/src/routes/orchestration/+page.svelte new file mode 100644 index 0000000..58b313f --- /dev/null +++ b/ui/src/routes/orchestration/+page.svelte @@ -0,0 +1,346 @@ + + +
+
+

Orchestration

+ New Run +
+ + {#if error} +
ERR: {error}
+ {/if} + +
+
+
Active
+
{stats.active}
+
+
+
Completed
+
{stats.completed}
+
+
+
Failed
+
{stats.failed}
+
+
+
Interrupted
+
{stats.interrupted}
+
+
+ + {#if loading && runs.length === 0} +
Loading runs...
+ {:else if runs.length === 0} +
+

> No orchestration runs yet.

+ Create a Workflow +
+ {:else} +
+

Recent Runs

+
+ + + + + + + + + + + + {#each runs.slice(0, 20) as run} + goto(runHref(run.id))} class="clickable"> + + + + + + + {/each} + +
IDWorkflowStatusDurationCreated
{(run.id || '').slice(0, 8)}{run.workflow_name || run.workflow_id || '-'} + {run.status} + {formatDuration(run)}{formatTime(run.created_at)}
+
+ {#if runs.length > 20} + + {/if} +
+ {/if} +
+ + diff --git a/ui/src/routes/orchestration/runs/+page.svelte b/ui/src/routes/orchestration/runs/+page.svelte new file mode 100644 index 0000000..e2c9863 --- /dev/null +++ b/ui/src/routes/orchestration/runs/+page.svelte @@ -0,0 +1,286 @@ + + +
+
+

Runs

+
+ + {#if error} +
ERR: {error}
+ {/if} + +
+
+ + +
+
+ + +
+
+ + {#if loading} +
Loading runs...
+ {:else if runs.length === 0} +
+

> No runs match the current filter.

+
+ {:else} +
+
+ + + + + + + + + + + + {#each runs as run} + goto(runHref(run.id))} class="clickable"> + + + + + + + {/each} + +
IDWorkflowStatusDurationCreated
{(run.id || '').slice(0, 8)}{run.workflow_name || run.workflow_id || '-'} + {run.status} + {formatDuration(run)}{formatTime(run.created_at)}
+
+
+ {/if} +
+ + diff --git a/ui/src/routes/orchestration/runs/[id]/+page.svelte b/ui/src/routes/orchestration/runs/[id]/+page.svelte new file mode 100644 index 0000000..5025523 --- /dev/null +++ b/ui/src/routes/orchestration/runs/[id]/+page.svelte @@ -0,0 +1,301 @@ + + +
+
+
+ Runs + / + {(id || '').slice(0, 8)} + {#if run} + {run.status} + {/if} +
+
+ {#if isActive} + + {/if} + Fork +
+
+ + {#if error} +
ERR: {error}
+ {/if} + + {#if loading} +
Loading run...
+ {:else if run} +
+
+ +
+
+ +
+
+
+ +
+ + {#if isInterrupted} + + {/if} + {/if} +
+ + diff --git a/ui/src/routes/orchestration/runs/[id]/fork/+page.svelte b/ui/src/routes/orchestration/runs/[id]/fork/+page.svelte new file mode 100644 index 0000000..b7d9f58 --- /dev/null +++ b/ui/src/routes/orchestration/runs/[id]/fork/+page.svelte @@ -0,0 +1,301 @@ + + +
+
+ +
+ +
+
+ + {#if error} +
ERR: {error}
+ {/if} + + {#if loading} +
Loading checkpoints...
+ {:else} +
+
+
Checkpoints
+ +
+
+
+ +
+
+ + + {#if !overridesValid} + Invalid JSON + {/if} +
+
+
+ {/if} +
+ + diff --git a/ui/src/routes/orchestration/store/+page.svelte b/ui/src/routes/orchestration/store/+page.svelte new file mode 100644 index 0000000..7ab7261 --- /dev/null +++ b/ui/src/routes/orchestration/store/+page.svelte @@ -0,0 +1,567 @@ + + +
+
+

Store

+
+ + {#if error} +
ERR: {error}
+ {/if} + +
+ +
+
+

Browse Namespace

+
+ + +
+
+ +
+

Add Entry

+
+ + +
+
+ + +
+
+ + +
+ {#if addError} +
{addError}
+ {/if} + {#if addSuccess} +
Saved.
+ {/if} + +
+
+ + +
+ {#if !browsedNamespace} +
+

> Enter a namespace and press Browse.

+
+ {:else if loading} +
Loading entries...
+ {:else if entries.length === 0} +
+

> No entries in namespace "{browsedNamespace}".

+
+ {:else} +
+
+ /{browsedNamespace} + {entries.length} entries +
+
+ + + + + + + + + + {#each entries as entry} + viewEntry(entry)}> + + + + + {/each} + +
KeyValue PreviewActions
{entry.key ?? entry} + {#if entry.value !== undefined} + {typeof entry.value === 'string' + ? entry.value.slice(0, 80) + : JSON.stringify(entry.value).slice(0, 80)} + {:else} + - + {/if} + e.stopPropagation()}> + +
+
+
+ {/if} +
+
+
+ + +{#if selectedEntry} + + +{/if} + + diff --git a/ui/src/routes/orchestration/workflows/+page.svelte b/ui/src/routes/orchestration/workflows/+page.svelte new file mode 100644 index 0000000..dd29aa7 --- /dev/null +++ b/ui/src/routes/orchestration/workflows/+page.svelte @@ -0,0 +1,294 @@ + + +
+
+

Workflows

+ + New Workflow +
+ + {#if error} +
ERR: {error}
+ {/if} + + {#if loading} +
Loading workflows...
+ {:else if workflows.length === 0} +
+

> No workflows defined yet.

+ Create Workflow +
+ {:else} +
+ {#each workflows as wf} +
+
+ {wf.name || wf.id} + {nodeCount(wf)} nodes +
+ {#if wf.id} +
{wf.id}
+ {/if} +
+ Edit + + {#if deleteConfirm === wf.id} + + + {:else} + + {/if} +
+
+ {/each} +
+ {/if} +
+ + diff --git a/ui/src/routes/orchestration/workflows/[id]/+page.svelte b/ui/src/routes/orchestration/workflows/[id]/+page.svelte new file mode 100644 index 0000000..82e83c2 --- /dev/null +++ b/ui/src/routes/orchestration/workflows/[id]/+page.svelte @@ -0,0 +1,297 @@ + + +
+
+
+ Workflows + / + {isNew ? 'New Workflow' : (parsedWorkflow?.name || id)} +
+
+ {#if !isNew} + + {/if} + + {#if !isNew} + + {/if} +
+
+ + {#if error} +
ERR: {error}
+ {/if} + + {#if validationResult} +
+ {#if validationResult.valid} + Workflow is valid. + {:else} + Validation errors: + {#each validationResult.errors || [] as err} +
{err}
+ {/each} + {/if} +
+ {/if} + + {#if loading} +
Loading workflow...
+ {:else} +
+
+ +
+
+ parseError = msg} /> +
+
+ {/if} +
+ +