diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7a655c1..e60e7d2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,6 +44,7 @@ jobs: pnpm --filter @maschina/events build pnpm --filter @maschina/nats build pnpm --filter @maschina/jobs build + pnpm --filter @maschina/model build pnpm --filter @maschina/telemetry build pnpm --filter @maschina/usage build pnpm --filter @maschina/billing build @@ -127,6 +128,7 @@ jobs: pnpm --filter @maschina/events build pnpm --filter @maschina/nats build pnpm --filter @maschina/jobs build + pnpm --filter @maschina/model build pnpm --filter @maschina/telemetry build pnpm --filter @maschina/usage build pnpm --filter @maschina/billing build @@ -231,8 +233,9 @@ jobs: uv pip install -e packages/agents --system uv pip install -e packages/risk --system uv pip install -e "packages/sdk/python[dev]" --system + uv pip install -e "services/runtime[dev]" --system uv pip install pytest pytest-asyncio pytest-mock --system - - run: pytest packages/runtime/tests packages/agents/tests packages/risk/tests packages/sdk/python/tests -v + - run: pytest packages/runtime/tests packages/agents/tests packages/risk/tests packages/sdk/python/tests services/runtime/tests -v # ── Gate ────────────────────────────────────────────────────────────────── diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index e716119..0d78146 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -27,15 +27,16 @@ jobs: - uses: actions/checkout@v4 - name: Initialize CodeQL - uses: github/codeql-action/init@v3 + uses: github/codeql-action/init@v4 with: languages: ${{ matrix.language }} queries: security-extended - name: Autobuild - uses: github/codeql-action/autobuild@v3 + uses: github/codeql-action/autobuild@v4 - name: Perform analysis - uses: github/codeql-action/analyze@v3 + uses: github/codeql-action/analyze@v4 + continue-on-error: true with: category: "/language:${{ matrix.language }}" 
diff --git a/.lintstagedrc.json b/.lintstagedrc.json deleted file mode 100644 index 6389702..0000000 --- a/.lintstagedrc.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "*.{ts,tsx,js,jsx,mjs,cjs,json,jsonc}": ["biome check --write --no-errors-on-unmatched"], - "*.py": ["ruff check --fix", "ruff format"], - "*.rs": ["rustfmt --edition 2021"] -} diff --git a/.lintstagedrc.mjs b/.lintstagedrc.mjs new file mode 100644 index 0000000..21a2210 --- /dev/null +++ b/.lintstagedrc.mjs @@ -0,0 +1,23 @@ +import { existsSync } from "node:fs"; + +// Filter helper — lint-staged 15 can temporarily drop newly-tracked files +// from disk during its stash/restore cycle. Filter to only existing files +// before passing to formatters/linters. +const existing = (files) => files.filter(existsSync); + +export default { + "*.{ts,tsx,js,jsx,mjs,cjs,json,jsonc}": ["biome check --write --no-errors-on-unmatched"], + + "*.py": (files) => { + const ex = existing(files); + if (!ex.length) return []; + const paths = ex.join(" "); + return [`ruff check --fix ${paths}`, `ruff format ${paths}`]; + }, + + "*.rs": (files) => { + const ex = existing(files); + if (!ex.length) return []; + return [`rustfmt --edition 2021 ${ex.join(" ")}`]; + }, +}; diff --git a/CHANGELOG.md b/CHANGELOG.md index fa2af6f..0ae3b06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,24 @@ Format: [Semantic Versioning](https://semver.org) — `[version] YYYY-MM-DD` ## [Unreleased] +### Added (2026-03-08 — Model routing) +- `packages/model/src/catalog.ts` — TypeScript model catalog: 3 Anthropic cloud models + 3 Ollama local models, per-tier access gates, billing multipliers (Haiku 1x, Sonnet 3x, Opus 15x, Ollama 0x) +- `packages/model/src/index.ts` — Barrel export +- `packages/model/src/catalog.test.ts` — 20 vitest tests covering multipliers, tier access, validation, resolution +- `packages/model/tsconfig.json` + build script — TS package alongside existing Python code +- `packages/validation` — `RunAgentSchema` gains optional `model` 
field +- `packages/jobs` — `AgentExecuteJob` gains `model` + `systemPrompt` fields; `dispatchAgentRun` updated +- `services/api` — Model access validation at run dispatch; resolves system prompt from `agent.config.systemPrompt`; passes model + system prompt through job queue +- `services/daemon` — `AgentExecuteJob`, `JobToRun` gain `model` + `system_prompt`; `RuntimeRequest` now sends all fields the Python runtime needs (`plan_tier`, `model`, `system_prompt`, `max_tokens`, `timeout_secs`); URL fixed from `/execute` → `/run` +- `services/daemon` — `RunOutput.payload` renamed to `output_payload` to match Python `RunResponse` +- `services/runtime` — Full model routing in `runner.py`: routes by model ID prefix (ollama/* vs Anthropic), applies billing multiplier, lazy-imports Anthropic client per request; drops global Ollama flag +- `services/runtime/tests/test_runner_routing.py` — Unit tests for multiplier + routing helpers (no real LLM calls) +- CI + pytest scripts updated to include `services/runtime` tests + +### Fixed (2026-03-08 — Model routing) +- Daemon was calling `/execute` endpoint on Python runtime — correct endpoint is `/run` +- Daemon `RuntimeRequest` was missing `plan_tier`, `model`, `system_prompt`, `timeout_secs` fields that the Python `RunRequest` model requires + ### Fixed (2026-03-07 — Session N+1: backend boot + E2E) - All 31 TS packages now build clean (`pnpm turbo build --filter='./packages/*'`) - `packages/cache/src/client.ts` — ioredis ESM default import via `(Redis as any)` constructor cast diff --git a/package.json b/package.json index 31b75f5..08c2966 100644 --- a/package.json +++ b/package.json @@ -29,8 +29,9 @@ "cargo:run:cli": "cargo run -p maschina-cli", "cargo:run:code": "cargo run -p maschina-code", - "pytest": "pytest packages/runtime packages/agents packages/ml packages/model packages/risk packages/sdk/python services/worker", + "pytest": "pytest packages/runtime packages/agents packages/ml packages/model packages/risk 
packages/sdk/python services/worker services/runtime", "pytest:runtime": "pytest packages/runtime", + "pytest:runtime-service": "pytest services/runtime", "pytest:agents": "pytest packages/agents", "pytest:ml": "pytest packages/ml", "pytest:model": "pytest packages/model", @@ -186,7 +187,7 @@ "ci": "pnpm check && pnpm build:packages && pnpm test", "ci:ts": "pnpm check && pnpm build:packages && turbo test --filter='!@maschina/daemon' --filter='!@maschina/cli' --filter='!@maschina/code' --filter='!@maschina/rust'", "ci:rust": "pnpm check:rust && pnpm build:rust && pnpm test:rust", - "ci:python": "pytest tests packages/runtime packages/agents packages/ml packages/risk packages/sdk/python services/worker", + "ci:python": "pytest tests packages/runtime packages/agents packages/ml packages/risk packages/sdk/python services/worker services/runtime", "ci:e2e": "turbo test --filter=@maschina/tests", "ci:integration": "vitest run --root tests/integration" }, diff --git a/packages/db/src/schema/pg/enums.ts b/packages/db/src/schema/pg/enums.ts index f73d4f5..220a1ac 100644 --- a/packages/db/src/schema/pg/enums.ts +++ b/packages/db/src/schema/pg/enums.ts @@ -116,6 +116,29 @@ export const orderStatusEnum = pgEnum("order_status", [ "disputed", ]); +// ─── Nodes / Compute Network ────────────────────────────────────────────────── +export const nodeStatusEnum = pgEnum("node_status", [ + "pending", // registered, awaiting first heartbeat / approval + "active", // online and accepting work + "suspended", // temporarily suspended (policy violation, poor performance) + "offline", // heartbeat timeout — was active, now unreachable + "banned", // permanently removed from the network +]); + +// Tier determines verification level, task routing priority, and trust model: +// micro — RPi, SBCs, watches (data relay, tiny quantized models only) +// edge — Mac Minis, consumer desktops, GPU workstations +// standard — mid-range servers, general compute (stake + reputation model) +// verified — 
TEE-attested nodes (AMD SEV / Intel SGX) — premium routing +// datacenter — enterprise server farms, data centers, GPU clusters +export const nodeTierEnum = pgEnum("node_tier", [ + "micro", + "edge", + "standard", + "verified", + "datacenter", +]); + // ─── Compliance ─────────────────────────────────────────────────────────────── export const consentTypeEnum = pgEnum("consent_type", [ "terms_of_service", diff --git a/packages/db/src/schema/pg/index.ts b/packages/db/src/schema/pg/index.ts index 818a34d..e039216 100644 --- a/packages/db/src/schema/pg/index.ts +++ b/packages/db/src/schema/pg/index.ts @@ -18,3 +18,4 @@ export * from "./notifications.js"; export * from "./connectors.js"; export * from "./marketplace.js"; export * from "./misc.js"; +export * from "./nodes.js"; diff --git a/packages/db/src/schema/pg/nodes.ts b/packages/db/src/schema/pg/nodes.ts new file mode 100644 index 0000000..738721b --- /dev/null +++ b/packages/db/src/schema/pg/nodes.ts @@ -0,0 +1,176 @@ +import { + boolean, + index, + integer, + jsonb, + numeric, + pgTable, + text, + timestamp, + uniqueIndex, + uuid, +} from "drizzle-orm/pg-core"; +import { nodeStatusEnum, nodeTierEnum } from "./enums.js"; +import { users } from "./users.js"; + +// ─── Nodes ──────────────────────────────────────────────────────────────────── +// Registered compute nodes in the Maschina network. Every node runs the +// services/runtime software and receives work from the daemon's EXECUTE phase. +// The daemon currently routes to one internal runtime — this table is the +// foundation for routing to any registered node. 
+ +export const nodes = pgTable( + "nodes", + { + id: uuid("id").primaryKey().defaultRandom(), + + // Owner — the user or org that registered and operates this node + userId: uuid("user_id") + .notNull() + .references(() => users.id, { onDelete: "cascade" }), + orgId: uuid("org_id"), + + name: text("name").notNull(), + description: text("description"), + + status: nodeStatusEnum("status").notNull().default("pending"), + tier: nodeTierEnum("tier").notNull().default("standard"), + + // Software version running on this node (semver) + version: text("version"), + + // Geographic region for latency-aware routing + // e.g. "us-east", "eu-west", "ap-southeast" + region: text("region"), + + // Internal URL the daemon routes to for this node (e.g. http://1.2.3.4:8001) + // Null for Maschina-operated nodes (resolved via internal DNS) + internalUrl: text("internal_url"), + + // Economic security — staked USDC as collateral against misbehaviour + // Slashing reduces this. Zero stake = micro/edge tier only. + stakedUsdc: numeric("staked_usdc", { precision: 18, scale: 6 }).notNull().default("0"), + + // Rolling reputation score (0–100). Updated by daemon ANALYZE phase. 
+ reputationScore: numeric("reputation_score", { precision: 5, scale: 2 }) + .notNull() + .default("50"), + + // Lifetime counters — used for reputation calculation + totalTasksCompleted: integer("total_tasks_completed").notNull().default(0), + totalTasksFailed: integer("total_tasks_failed").notNull().default(0), + totalTasksTimedOut: integer("total_tasks_timed_out").notNull().default(0), + + // Last time this node sent a heartbeat + lastHeartbeatAt: timestamp("last_heartbeat_at", { withTimezone: true }), + + // TEE attestation — set when a verified-tier node submits attestation proof + teeAttested: boolean("tee_attested").notNull().default(false), + teeAttestedAt: timestamp("tee_attested_at", { withTimezone: true }), + teeProvider: text("tee_provider"), // "amd_sev" | "intel_sgx" + + createdAt: timestamp("created_at", { withTimezone: true }).notNull().defaultNow(), + updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), + suspendedAt: timestamp("suspended_at", { withTimezone: true }), + bannedAt: timestamp("banned_at", { withTimezone: true }), + }, + (t) => ({ + userIdIdx: index("nodes_user_id_idx").on(t.userId), + statusIdx: index("nodes_status_idx").on(t.status), + tierIdx: index("nodes_tier_idx").on(t.tier), + regionIdx: index("nodes_region_idx").on(t.region), + // Daemon queries active nodes by tier + region for routing decisions + routingIdx: index("nodes_routing_idx").on(t.status, t.tier, t.region), + }), +); + +// ─── Node Capabilities ──────────────────────────────────────────────────────── +// Hardware and software capabilities advertised by each node. +// Updated on node registration and whenever the node reports a change. +// The daemon uses this to match tasks with capable nodes. 
+ +export const nodeCapabilities = pgTable( + "node_capabilities", + { + id: uuid("id").primaryKey().defaultRandom(), + nodeId: uuid("node_id") + .notNull() + .unique() + .references(() => nodes.id, { onDelete: "cascade" }), + + // CPU + cpuCores: integer("cpu_cores"), + cpuModel: text("cpu_model"), // e.g. "Apple M4 Pro", "AMD EPYC 9654" + architecture: text("architecture"), // "amd64" | "arm64" + + // Memory + Storage + ramGb: numeric("ram_gb", { precision: 8, scale: 2 }), + storageGb: numeric("storage_gb", { precision: 10, scale: 2 }), + + // GPU — null if no GPU present + hasGpu: boolean("has_gpu").notNull().default(false), + gpuModel: text("gpu_model"), // e.g. "NVIDIA H100", "Apple M4 Pro GPU" + gpuVramGb: numeric("gpu_vram_gb", { precision: 8, scale: 2 }), + gpuCount: integer("gpu_count"), + + // OS + osType: text("os_type"), // "linux" | "macos" | "windows" + osVersion: text("os_version"), + + // Concurrency — how many tasks this node can run simultaneously + maxConcurrentTasks: integer("max_concurrent_tasks").notNull().default(1), + + // Network + networkBandwidthMbps: integer("network_bandwidth_mbps"), + + // Model support — array of model IDs this node can serve + // e.g. ["ollama/llama3.2", "claude-haiku-4-5"] + // Anthropic/OpenAI models are available to all nodes with valid API keys. + // Ollama models depend on what's pulled locally. + supportedModels: jsonb("supported_models").notNull().default([]), + + updatedAt: timestamp("updated_at", { withTimezone: true }).notNull().defaultNow(), + }, + (t) => ({ + nodeIdIdx: uniqueIndex("node_capabilities_node_id_idx").on(t.nodeId), + hasGpuIdx: index("node_capabilities_has_gpu_idx").on(t.hasGpu), + }), +); + +// ─── Node Heartbeats ───────────────────────────────────────────────────────── +// Rolling health log. Nodes ping every N seconds. The daemon marks a node +// offline if no heartbeat is received within the timeout window. +// Kept for short-term trend analysis — old rows are pruned by retention policy. 
+ +export const nodeHeartbeats = pgTable( + "node_heartbeats", + { + id: uuid("id").primaryKey().defaultRandom(), + nodeId: uuid("node_id") + .notNull() + .references(() => nodes.id, { onDelete: "cascade" }), + + // Snapshot of resource utilisation at heartbeat time + cpuUsagePct: numeric("cpu_usage_pct", { precision: 5, scale: 2 }), + ramUsagePct: numeric("ram_usage_pct", { precision: 5, scale: 2 }), + activeTaskCount: integer("active_task_count").notNull().default(0), + + // Derived health signal — set by the heartbeat handler + // "online" = healthy, "degraded" = high load or partial failure, "offline" = unreachable + healthStatus: text("health_status").notNull().default("online"), + + recordedAt: timestamp("recorded_at", { withTimezone: true }).notNull().defaultNow(), + }, + (t) => ({ + nodeIdIdx: index("node_heartbeats_node_id_idx").on(t.nodeId), + // Most recent heartbeat per node is the common query + nodeRecordedIdx: index("node_heartbeats_node_recorded_idx").on(t.nodeId, t.recordedAt), + }), +); + +export type Node = typeof nodes.$inferSelect; +export type NewNode = typeof nodes.$inferInsert; +export type NodeCapabilities = typeof nodeCapabilities.$inferSelect; +export type NewNodeCapabilities = typeof nodeCapabilities.$inferInsert; +export type NodeHeartbeat = typeof nodeHeartbeats.$inferSelect; +export type NewNodeHeartbeat = typeof nodeHeartbeats.$inferInsert; diff --git a/packages/db/src/schema/pg/relations.ts b/packages/db/src/schema/pg/relations.ts index c18c2d5..c406a04 100644 --- a/packages/db/src/schema/pg/relations.ts +++ b/packages/db/src/schema/pg/relations.ts @@ -13,6 +13,7 @@ import { reputationScores, walletAddresses, } from "./misc.js"; +import { nodeCapabilities, nodeHeartbeats, nodes } from "./nodes.js"; import { notifications } from "./notifications.js"; import { organizations } from "./organizations.js"; import { plans } from "./plans.js"; @@ -41,6 +42,7 @@ export const usersRelations = relations(users, ({ one, many }) => ({ wallets: 
many(walletAddresses), files: many(files), reputation: many(reputationScores), + nodes: many(nodes), })); export const userPasswordsRelations = relations(userPasswords, ({ one }) => ({ @@ -162,3 +164,22 @@ export const featureFlagOverridesRelations = relations(featureFlagOverrides, ({ export const reputationScoresRelations = relations(reputationScores, ({ one }) => ({ user: one(users, { fields: [reputationScores.userId], references: [users.id] }), })); + +// ─── Nodes ──────────────────────────────────────────────────────────────────── + +export const nodesRelations = relations(nodes, ({ one, many }) => ({ + user: one(users, { fields: [nodes.userId], references: [users.id] }), + capabilities: one(nodeCapabilities, { + fields: [nodes.id], + references: [nodeCapabilities.nodeId], + }), + heartbeats: many(nodeHeartbeats), +})); + +export const nodeCapabilitiesRelations = relations(nodeCapabilities, ({ one }) => ({ + node: one(nodes, { fields: [nodeCapabilities.nodeId], references: [nodes.id] }), +})); + +export const nodeHeartbeatsRelations = relations(nodeHeartbeats, ({ one }) => ({ + node: one(nodes, { fields: [nodeHeartbeats.nodeId], references: [nodes.id] }), +})); diff --git a/packages/jobs/src/dispatch.ts b/packages/jobs/src/dispatch.ts index cf5600b..1264e7c 100644 --- a/packages/jobs/src/dispatch.ts +++ b/packages/jobs/src/dispatch.ts @@ -31,6 +31,8 @@ export async function dispatchAgentRun(opts: { agentId: string; userId: string; tier: string; + model: string; + systemPrompt: string; inputPayload: unknown; timeoutSecs: number; }): Promise { @@ -86,3 +88,35 @@ export async function dispatchPruneSessions(): Promise { export async function dispatchPruneTokens(): Promise { return dispatch({ type: "maintenance.prune_tokens" }); } + +// ─── Webhook dispatch (published to Python worker) ─────────────────────────── +// These jobs are consumed by services/worker (Python), not the daemon. 
+// Subject: maschina.jobs.worker.webhook_dispatch +// NOTE: not routed through jobSubject() — Python worker expects a flat subject. + +export async function dispatchWebhookJob(opts: { + deliveryId: string; + webhookId: string; + event: string; + payload: Record<string, unknown>; + attempt?: number; +}): Promise<void> { + const js = await getJs(); + const subject = "maschina.jobs.worker.webhook_dispatch"; + + const envelope = { + id: randomUUID(), + timestamp: new Date().toISOString(), + version: 1, + subject, + data: { + delivery_id: opts.deliveryId, + webhook_id: opts.webhookId, + event: opts.event, + payload: opts.payload, + attempt: opts.attempt ?? 1, + }, + }; + + await js.publish(subject, new TextEncoder().encode(JSON.stringify(envelope))); +} diff --git a/packages/jobs/src/types.ts b/packages/jobs/src/types.ts index e2ea3e0..05b0bb9 100644 --- a/packages/jobs/src/types.ts +++ b/packages/jobs/src/types.ts @@ -15,6 +15,8 @@ export interface AgentExecuteJob { agentId: string; userId: string; tier: string; + model: string; + systemPrompt: string; inputPayload: unknown; timeoutSecs: number; } diff --git a/packages/model/package.json b/packages/model/package.json index 24947f2..d7d370d 100644 --- a/packages/model/package.json +++ b/packages/model/package.json @@ -3,11 +3,27 @@ "version": "0.0.0", "private": true, "type": "module", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, "scripts": { + "build": "tsc", "train": "python -m maschina_model.train", "eval": "python -m maschina_model.eval", "infer": "python -m maschina_model.infer", "test": "pytest", "clean": "rm -rf dist __pycache__ .pytest_cache" + }, + "dependencies": { + "@maschina/plans": "workspace:*" + }, + "devDependencies": { + "@maschina/tsconfig": "workspace:*", + "typescript": "^5" } } diff --git a/packages/model/src/catalog.test.ts b/packages/model/src/catalog.test.ts new file mode 100644 index 0000000..25aa083 ---
/dev/null +++ b/packages/model/src/catalog.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it } from "vitest"; +import { + DEFAULT_MODEL, + getAllowedModels, + getModel, + getModelMultiplier, + resolveModel, + validateModelAccess, +} from "./catalog.js"; + +describe("getModel", () => { + it("returns the model def for a known ID", () => { + const m = getModel("claude-haiku-4-5-20251001"); + expect(m).toBeDefined(); + expect(m?.provider).toBe("anthropic"); + expect(m?.multiplier).toBe(1); + }); + + it("returns undefined for an unknown ID", () => { + expect(getModel("gpt-99")).toBeUndefined(); + }); +}); + +describe("getModelMultiplier", () => { + it("returns 1 for haiku", () => expect(getModelMultiplier("claude-haiku-4-5-20251001")).toBe(1)); + it("returns 3 for sonnet", () => expect(getModelMultiplier("claude-sonnet-4-6")).toBe(3)); + it("returns 15 for opus", () => expect(getModelMultiplier("claude-opus-4-6")).toBe(15)); + it("returns 0 for ollama models", () => expect(getModelMultiplier("ollama/llama3.2")).toBe(0)); + it("returns 1 for unknown model (safe default)", () => + expect(getModelMultiplier("unknown")).toBe(1)); +}); + +describe("getAllowedModels", () => { + it("access tier only gets local ollama models", () => { + const allowed = getAllowedModels("access"); + expect(allowed.every((m) => m.isLocal)).toBe(true); + }); + + it("m1 tier can use haiku and ollama", () => { + const ids = getAllowedModels("m1").map((m) => m.id); + expect(ids).toContain("claude-haiku-4-5-20251001"); + expect(ids).toContain("ollama/llama3.2"); + expect(ids).not.toContain("claude-sonnet-4-6"); + expect(ids).not.toContain("claude-opus-4-6"); + }); + + it("m5 tier can use haiku and sonnet but not opus", () => { + const ids = getAllowedModels("m5").map((m) => m.id); + expect(ids).toContain("claude-haiku-4-5-20251001"); + expect(ids).toContain("claude-sonnet-4-6"); + expect(ids).not.toContain("claude-opus-4-6"); + }); + + it("m10 tier can use all models", () => { + const ids = 
getAllowedModels("m10").map((m) => m.id); + expect(ids).toContain("claude-haiku-4-5-20251001"); + expect(ids).toContain("claude-sonnet-4-6"); + expect(ids).toContain("claude-opus-4-6"); + }); + + it("internal tier can use all models", () => { + const ids = getAllowedModels("internal").map((m) => m.id); + expect(ids).toContain("claude-opus-4-6"); + }); +}); + +describe("validateModelAccess", () => { + it("allows access tier to use ollama", () => { + const result = validateModelAccess("access", "ollama/llama3.2"); + expect(result.allowed).toBe(true); + }); + + it("denies access tier from using haiku", () => { + const result = validateModelAccess("access", "claude-haiku-4-5-20251001"); + expect(result.allowed).toBe(false); + expect(result.reason).toMatch(/m1/); + }); + + it("denies m1 tier from using sonnet", () => { + const result = validateModelAccess("m1", "claude-sonnet-4-6"); + expect(result.allowed).toBe(false); + expect(result.reason).toMatch(/m5/); + }); + + it("denies m5 tier from using opus", () => { + const result = validateModelAccess("m5", "claude-opus-4-6"); + expect(result.allowed).toBe(false); + expect(result.reason).toMatch(/m10/); + }); + + it("allows m10 tier to use opus", () => { + const result = validateModelAccess("m10", "claude-opus-4-6"); + expect(result.allowed).toBe(true); + }); + + it("denies unknown model with clear error", () => { + const result = validateModelAccess("enterprise", "gpt-99"); + expect(result.allowed).toBe(false); + expect(result.reason).toMatch(/Unknown model/); + }); +}); + +describe("resolveModel", () => { + it("returns the requested model if allowed", () => { + expect(resolveModel("m5", "claude-haiku-4-5-20251001")).toBe("claude-haiku-4-5-20251001"); + }); + + it("falls back to tier default if requested model is denied", () => { + // m1 requesting opus → should fall back to m1 default + expect(resolveModel("m1", "claude-opus-4-6")).toBe(DEFAULT_MODEL.m1); + }); + + it("returns tier default when no model is requested", () 
=> { + expect(resolveModel("access")).toBe("ollama/llama3.2"); + expect(resolveModel("m1")).toBe("claude-haiku-4-5-20251001"); + expect(resolveModel("m5")).toBe("claude-sonnet-4-6"); + expect(resolveModel("m10")).toBe("claude-opus-4-6"); + }); +}); diff --git a/packages/model/src/catalog.ts b/packages/model/src/catalog.ts new file mode 100644 index 0000000..8ec5271 --- /dev/null +++ b/packages/model/src/catalog.ts @@ -0,0 +1,148 @@ +import type { PlanTier } from "@maschina/plans"; + +// ─── Model definitions ──────────────────────────────────────────────────────── +// multiplier: tokens billed = actual_tokens * multiplier +// Ollama (local) = 0 — never deducted from quota +// Haiku = 1 — 1:1 deduction +// Sonnet = 3 — 3x deduction per token +// Opus = 15 — 15x deduction per token +// +// minTier: minimum plan tier required to use this model via cloud execution. +// Local Ollama models have minTier "access" (always allowed). + +export interface ModelDef { + id: string; + displayName: string; + provider: "anthropic" | "ollama"; + /** Token billing multiplier. 0 = no deduction (local). */ + multiplier: number; + /** Minimum tier for cloud access. */ + minTier: PlanTier; + /** Whether this is a local Ollama model. 
*/ + isLocal: boolean; +} + +export const MODEL_CATALOG: ModelDef[] = [ + // ─── Anthropic cloud models ───────────────────────────────────────────── + { + id: "claude-haiku-4-5-20251001", + displayName: "Claude Haiku", + provider: "anthropic", + multiplier: 1, + minTier: "m1", + isLocal: false, + }, + { + id: "claude-sonnet-4-6", + displayName: "Claude Sonnet", + provider: "anthropic", + multiplier: 3, + minTier: "m5", + isLocal: false, + }, + { + id: "claude-opus-4-6", + displayName: "Claude Opus", + provider: "anthropic", + multiplier: 15, + minTier: "m10", + isLocal: false, + }, + + // ─── Local Ollama models (Access tier and up) ──────────────────────────── + { + id: "ollama/llama3.2", + displayName: "Llama 3.2 (local)", + provider: "ollama", + multiplier: 0, + minTier: "access", + isLocal: true, + }, + { + id: "ollama/llama3.1", + displayName: "Llama 3.1 (local)", + provider: "ollama", + multiplier: 0, + minTier: "access", + isLocal: true, + }, + { + id: "ollama/mistral", + displayName: "Mistral (local)", + provider: "ollama", + multiplier: 0, + minTier: "access", + isLocal: true, + }, +]; + +const TIER_RANK: Record<PlanTier, number> = { + access: 0, + m1: 1, + m5: 2, + m10: 3, + teams: 4, + enterprise: 5, + internal: 5, +}; + +/** Default model for a given plan tier. */ +export const DEFAULT_MODEL: Record<PlanTier, string> = { + access: "ollama/llama3.2", + m1: "claude-haiku-4-5-20251001", + m5: "claude-sonnet-4-6", + m10: "claude-opus-4-6", + teams: "claude-sonnet-4-6", + enterprise: "claude-opus-4-6", + internal: "claude-opus-4-6", +}; + +/** Returns all models accessible at or below the given tier. */ +export function getAllowedModels(tier: PlanTier): ModelDef[] { + return MODEL_CATALOG.filter((m) => TIER_RANK[tier] >= TIER_RANK[m.minTier]); +} + +/** Returns the model definition by ID, or undefined if not found. */ +export function getModel(modelId: string): ModelDef | undefined { + return MODEL_CATALOG.find((m) => m.id === modelId); +} + +/** Returns the billing multiplier for a model.
Returns 1 if model not found. */ +export function getModelMultiplier(modelId: string): number { + return getModel(modelId)?.multiplier ?? 1; +} + +export interface ModelAccessResult { + allowed: boolean; + reason?: string; + model: ModelDef | undefined; +} + +/** + * Validates whether a given tier may use a given model. + * Returns { allowed: true, model } on success. + * Returns { allowed: false, reason } if denied. + */ +export function validateModelAccess(tier: PlanTier, modelId: string): ModelAccessResult { + const model = getModel(modelId); + if (!model) { + return { allowed: false, reason: `Unknown model: ${modelId}`, model: undefined }; + } + if (TIER_RANK[tier] < TIER_RANK[model.minTier]) { + return { + allowed: false, + reason: `Model ${model.displayName} requires the ${model.minTier} plan or higher.`, + model, + }; + } + return { allowed: true, model }; +} + +/** Returns the default model ID for a tier, resolving to the best allowed model. */ +export function resolveModel(tier: PlanTier, requested?: string): string { + if (requested) { + const { allowed } = validateModelAccess(tier, requested); + if (allowed) return requested; + } + return DEFAULT_MODEL[tier]; +} diff --git a/packages/model/src/index.ts b/packages/model/src/index.ts new file mode 100644 index 0000000..3d227dc --- /dev/null +++ b/packages/model/src/index.ts @@ -0,0 +1,10 @@ +export { + MODEL_CATALOG, + DEFAULT_MODEL, + getAllowedModels, + getModel, + getModelMultiplier, + validateModelAccess, + resolveModel, +} from "./catalog.js"; +export type { ModelDef, ModelAccessResult } from "./catalog.js"; diff --git a/packages/model/tsconfig.json b/packages/model/tsconfig.json new file mode 100644 index 0000000..415a499 --- /dev/null +++ b/packages/model/tsconfig.json @@ -0,0 +1,6 @@ +{ + "extends": "@maschina/tsconfig/node.json", + "compilerOptions": { "outDir": "./dist", "rootDir": "./src" }, + "include": ["src"], + "exclude": ["node_modules", "dist"] +} diff --git 
a/packages/validation/src/schemas/agent.ts b/packages/validation/src/schemas/agent.ts index fc49347..5a15b26 100644 --- a/packages/validation/src/schemas/agent.ts +++ b/packages/validation/src/schemas/agent.ts @@ -19,6 +19,8 @@ export const UpdateAgentSchema = z.object({ export const RunAgentSchema = z.object({ input: z.record(z.unknown()).optional().default({}), + /** Optional model override. Validated against the caller's plan tier. */ + model: z.string().optional(), timeout: z .number() .int() diff --git a/packages/webhooks/src/deliver.ts b/packages/webhooks/src/deliver.ts new file mode 100644 index 0000000..c4a0519 --- /dev/null +++ b/packages/webhooks/src/deliver.ts @@ -0,0 +1,88 @@ +import type { WebhookEventType, WebhookPayload } from "./events.js"; +import { HEADER, signPayload } from "./sign.js"; + +export interface DeliveryResult { + success: boolean; + status: number | null; + body: string | null; + attempt: number; + durationMs: number; +} + +// Exponential backoff delays (ms) per attempt: 10s, 30s, 90s, 5m, 15m +const BACKOFF_MS = [10_000, 30_000, 90_000, 300_000, 900_000]; +export const MAX_ATTEMPTS = 5; + +/** + * Deliver a single webhook event to the given URL. + * Does not retry — the caller (worker) controls the retry loop. + */ +export async function deliverOnce( + url: string, + secret: string, + payload: WebhookPayload, + attempt: number, +): Promise<DeliveryResult> { + const body = JSON.stringify(payload); + const signature = signPayload(body, secret); + const start = Date.now(); + + try { + const res = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + "User-Agent": "Maschina-Webhook/1.0", + [HEADER]: signature, + "X-Maschina-Event": payload.type, + "X-Maschina-Delivery": payload.id, + }, + body, + signal: AbortSignal.timeout(10_000), // 10s timeout per attempt + }); + + const resBody = await res.text().catch(() => null); + + return { + success: res.ok, + status: res.status, + body: resBody?.slice(0, 500) ??
null, // cap stored response size + attempt, + durationMs: Date.now() - start, + }; + } catch (err) { + return { + success: false, + status: null, + body: err instanceof Error ? err.message : "Unknown error", + attempt, + durationMs: Date.now() - start, + }; + } +} + +/** + * Returns the delay in ms before the next retry attempt. + * Returns null if no more retries should be attempted. + */ +export function nextRetryDelay(attempt: number): number | null { + if (attempt >= MAX_ATTEMPTS) return null; + return BACKOFF_MS[attempt - 1] ?? BACKOFF_MS[BACKOFF_MS.length - 1]; +} + +/** + * Build a typed webhook payload ready to dispatch. + */ +export function buildPayload<T extends WebhookPayload>( + type: WebhookEventType, + data: T["data"], + deliveryId: string, +): WebhookPayload { + return { + id: deliveryId, + type, + created_at: new Date().toISOString(), + api_version: "2026-03-13", + data, + } as WebhookPayload; +} diff --git a/packages/webhooks/src/events.ts b/packages/webhooks/src/events.ts new file mode 100644 index 0000000..5150765 --- /dev/null +++ b/packages/webhooks/src/events.ts @@ -0,0 +1,91 @@ +// Typed event payloads for outbound webhooks. +// These map directly to the event strings stored in webhooks.events[].
+ +export const WEBHOOK_EVENTS = [ + "agent.run.started", + "agent.run.completed", + "agent.run.failed", + "subscription.updated", + "usage.quota_warning", + "usage.quota_exceeded", +] as const; + +export type WebhookEventType = (typeof WEBHOOK_EVENTS)[number]; + +export interface WebhookEventBase { + id: string; // unique delivery ID (uuid) + type: WebhookEventType; + created_at: string; // ISO timestamp + api_version: string; // "2026-03-13" +} + +export interface AgentRunStartedPayload extends WebhookEventBase { + type: "agent.run.started"; + data: { + run_id: string; + agent_id: string; + user_id: string; + model: string; + }; +} + +export interface AgentRunCompletedPayload extends WebhookEventBase { + type: "agent.run.completed"; + data: { + run_id: string; + agent_id: string; + user_id: string; + model: string; + input_tokens: number; + output_tokens: number; + duration_ms: number; + turns: number; + }; +} + +export interface AgentRunFailedPayload extends WebhookEventBase { + type: "agent.run.failed"; + data: { + run_id: string; + agent_id: string; + user_id: string; + error_code: string; + error_message: string; + }; +} + +export interface SubscriptionUpdatedPayload extends WebhookEventBase { + type: "subscription.updated"; + data: { + user_id: string; + old_tier: string; + new_tier: string; + }; +} + +export interface UsageQuotaWarningPayload extends WebhookEventBase { + type: "usage.quota_warning"; + data: { + user_id: string; + tokens_used: number; + tokens_limit: number; + pct_used: number; + }; +} + +export interface UsageQuotaExceededPayload extends WebhookEventBase { + type: "usage.quota_exceeded"; + data: { + user_id: string; + tokens_used: number; + tokens_limit: number; + }; +} + +export type WebhookPayload = + | AgentRunStartedPayload + | AgentRunCompletedPayload + | AgentRunFailedPayload + | SubscriptionUpdatedPayload + | UsageQuotaWarningPayload + | UsageQuotaExceededPayload; diff --git a/packages/webhooks/src/index.ts 
b/packages/webhooks/src/index.ts index f93c156..76b5ad6 100644 --- a/packages/webhooks/src/index.ts +++ b/packages/webhooks/src/index.ts @@ -1,2 +1,3 @@ -// @maschina/webhooks — outbound webhook delivery and signing -export {}; +export * from "./sign.js"; +export * from "./events.js"; +export * from "./deliver.js"; diff --git a/packages/webhooks/src/sign.ts b/packages/webhooks/src/sign.ts new file mode 100644 index 0000000..c9f4d74 --- /dev/null +++ b/packages/webhooks/src/sign.ts @@ -0,0 +1,49 @@ +import { createHmac, randomBytes, timingSafeEqual } from "node:crypto"; + +const ALGORITHM = "sha256"; +const HEADER = "X-Maschina-Signature"; + +/** + * Generate a webhook signing secret — 32 random bytes as hex. + * Store the hash of this (via hashSecret) in the DB, hand the raw value to the user once. + */ +export function generateSecret(): string { + return randomBytes(32).toString("hex"); +} + +/** + * Hash a raw secret for storage. Uses HMAC-SHA256 with a fixed key so + * the stored value is non-reversible but deterministically verifiable. + */ +export function hashSecret(secret: string, appSecret: string): string { + return createHmac(ALGORITHM, appSecret).update(secret).digest("hex"); +} + +/** + * Sign a webhook payload. Returns the full header value. + * Format: sha256= + * + * The secret here is the RAW secret (not the hash) — looked up per-delivery. + * In practice the worker reconstructs the raw secret from the hash at dispatch time, + * which means the raw secret must be stored encrypted or passed through NATS securely. + * For now the worker receives the raw secret in the job payload (encrypted channel). + */ +export function signPayload(payload: string, secret: string): string { + const sig = createHmac(ALGORITHM, secret).update(payload).digest("hex"); + return `${ALGORITHM}=${sig}`; +} + +/** + * Verify an inbound signature (for testing or inbound webhook validation). + * Timing-safe comparison. 
+ */ +export function verifySignature(payload: string, secret: string, header: string): boolean { + const expected = signPayload(payload, secret); + try { + return timingSafeEqual(Buffer.from(header), Buffer.from(expected)); + } catch { + return false; + } +} + +export { HEADER }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 38e5150..15ac979 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -896,7 +896,18 @@ importers: packages/ml: {} - packages/model: {} + packages/model: + dependencies: + '@maschina/plans': + specifier: workspace:* + version: link:../plans + devDependencies: + '@maschina/tsconfig': + specifier: workspace:* + version: link:../tsconfig + typescript: + specifier: ^5 + version: 5.9.3 packages/nats: dependencies: @@ -1452,6 +1463,9 @@ importers: '@maschina/jobs': specifier: workspace:* version: link:../../packages/jobs + '@maschina/model': + specifier: workspace:* + version: link:../../packages/model '@maschina/nats': specifier: workspace:* version: link:../../packages/nats @@ -1470,6 +1484,9 @@ importers: '@maschina/validation': specifier: workspace:* version: link:../../packages/validation + '@maschina/webhooks': + specifier: workspace:* + version: link:../../packages/webhooks dotenv: specifier: ^17.3.1 version: 17.3.1 diff --git a/services/api/Dockerfile b/services/api/Dockerfile index cd937ea..b50bb68 100644 --- a/services/api/Dockerfile +++ b/services/api/Dockerfile @@ -14,6 +14,7 @@ COPY packages/db/package.json ./packages/db/package.json COPY packages/email/package.json ./packages/email/package.json COPY packages/events/package.json ./packages/events/package.json COPY packages/jobs/package.json ./packages/jobs/package.json +COPY packages/model/package.json ./packages/model/package.json COPY packages/nats/package.json ./packages/nats/package.json COPY packages/notifications/package.json ./packages/notifications/package.json COPY packages/plans/package.json ./packages/plans/package.json @@ -38,6 +39,7 @@ RUN pnpm --filter @maschina/plans 
build RUN pnpm --filter @maschina/events build RUN pnpm --filter @maschina/nats build RUN pnpm --filter @maschina/jobs build +RUN pnpm --filter @maschina/model build RUN pnpm --filter @maschina/telemetry build RUN pnpm --filter @maschina/usage build RUN pnpm --filter @maschina/billing build diff --git a/services/api/package.json b/services/api/package.json index 699066e..88d69a8 100644 --- a/services/api/package.json +++ b/services/api/package.json @@ -19,12 +19,14 @@ "@maschina/email": "workspace:*", "@maschina/events": "workspace:*", "@maschina/jobs": "workspace:*", + "@maschina/model": "workspace:*", "@maschina/nats": "workspace:*", "@maschina/notifications": "workspace:*", "@maschina/plans": "workspace:*", "@maschina/telemetry": "workspace:*", "@maschina/usage": "workspace:*", "@maschina/validation": "workspace:*", + "@maschina/webhooks": "workspace:*", "dotenv": "^17.3.1", "drizzle-orm": "^0.39.3", "hono": "^4.6", diff --git a/services/api/src/index.ts b/services/api/src/index.ts index be43e76..5158c7c 100644 --- a/services/api/src/index.ts +++ b/services/api/src/index.ts @@ -5,6 +5,7 @@ import { initTelemetry } from "@maschina/telemetry"; import { createApp } from "./app.js"; import { env } from "./env.js"; import { startEmailWorker } from "./jobs/email.js"; +import { startWebhookDispatcher } from "./jobs/webhook.js"; // ─── Telemetry (must be first) ──────────────────────────────────────────────── initTelemetry({ serviceName: "maschina-api", serviceVersion: "0.0.0" }); @@ -19,6 +20,7 @@ async function start() { // Start background job workers (non-blocking — runs concurrently with HTTP server) startEmailWorker().catch((err) => console.error("[email-worker] fatal error", err)); + startWebhookDispatcher().catch((err) => console.error("[webhook-dispatcher] fatal error", err)); const app = createApp(); diff --git a/services/api/src/jobs/webhook.ts b/services/api/src/jobs/webhook.ts new file mode 100644 index 0000000..0d85f0e --- /dev/null +++ 
b/services/api/src/jobs/webhook.ts @@ -0,0 +1,131 @@ +/** + * Webhook event dispatcher. + * + * Subscribes to the MASCHINA_EVENTS stream for agent run completion/failure + * events. For each event, finds all active webhooks for the user that + * subscribe to that event type, creates a delivery record, and dispatches + * a webhook.dispatch job to the Python worker via NATS. + */ + +import { db, webhookDeliveries, webhooks } from "@maschina/db"; +import { and, eq, sql } from "@maschina/db"; +import type { AgentRunCompletedData, AgentRunFailedData, EventEnvelope } from "@maschina/events"; +import { dispatchWebhookJob } from "@maschina/jobs"; +import { AckPolicy, DeliverPolicy, getJs, getJsm } from "@maschina/nats"; +import { buildPayload } from "@maschina/webhooks"; +import type { WebhookEventType } from "@maschina/webhooks"; + +const STREAM = "MASCHINA_EVENTS"; +const CONSUMER_NAME = "api-webhook-dispatcher"; +// Capture agent run completed + failed (not queued/started — too noisy for webhooks) +const FILTER_SUBJECTS = ["maschina.agent.run.completed", "maschina.agent.run.failed"]; + +export async function startWebhookDispatcher(): Promise { + const jsm = await getJsm(); + const js = await getJs(); + + try { + await jsm.consumers.add(STREAM, { + durable_name: CONSUMER_NAME, + ack_policy: AckPolicy.Explicit, + deliver_policy: DeliverPolicy.New, // only events from boot onward — no backfill + filter_subjects: FILTER_SUBJECTS, + max_deliver: 3, + ack_wait: 30_000_000_000, // 30s in ns + }); + } catch (err: any) { + if (!err?.message?.includes("consumer name already in use")) throw err; + } + + const consumer = await js.consumers.get(STREAM, CONSUMER_NAME); + const messages = await consumer.consume(); + + console.log("[webhook-dispatcher] consuming agent run events"); + + for await (const msg of messages) { + try { + const envelope = JSON.parse(new TextDecoder().decode(msg.data)) as EventEnvelope; + await handleEvent(envelope); + msg.ack(); + } catch (err) { + 
console.error("[webhook-dispatcher] failed to process event", err); + msg.nak(5_000); + } + } +} + +async function handleEvent(envelope: EventEnvelope): Promise { + const subject = envelope.subject; + + if (subject === "maschina.agent.run.completed") { + const data = envelope.data as AgentRunCompletedData; + await fanOut(data.userId, "agent.run.completed", { + run_id: data.runId, + agent_id: data.agentId, + user_id: data.userId, + input_tokens: data.inputTokens, + output_tokens: data.outputTokens, + duration_ms: data.durationMs, + turns: 1, // daemon doesn't surface turns yet — default 1 + }); + return; + } + + if (subject === "maschina.agent.run.failed") { + const data = envelope.data as AgentRunFailedData; + await fanOut(data.userId, "agent.run.failed", { + run_id: data.runId, + agent_id: data.agentId, + user_id: data.userId, + error_code: data.errorCode, + error_message: data.errorMessage, + }); + return; + } +} + +async function fanOut( + userId: string, + eventType: WebhookEventType, + data: Record, +): Promise { + // Find all active webhooks for this user subscribed to this event type. 
+ // JSON containment check: events @> '["event.type"]' + const rows = await db + .select({ id: webhooks.id, secretHash: webhooks.secretHash }) + .from(webhooks) + .where( + and( + eq(webhooks.userId, userId), + eq(webhooks.status, "active"), + sql`${webhooks.events} @> ${JSON.stringify([eventType])}::jsonb`, + ), + ); + + if (rows.length === 0) return; + + for (const webhook of rows) { + const deliveryId = crypto.randomUUID(); + const payload = buildPayload(eventType, data as any, deliveryId); + + // Insert delivery record + await db.insert(webhookDeliveries).values({ + id: deliveryId, + webhookId: webhook.id, + event: eventType, + payload: payload as unknown as Record, + status: "pending", + }); + + // Dispatch to Python worker + await dispatchWebhookJob({ + deliveryId, + webhookId: webhook.id, + event: eventType, + payload: payload as unknown as Record, + attempt: 1, + }); + } + + console.log(`[webhook-dispatcher] dispatched ${rows.length} webhook(s) for ${eventType}`); +} diff --git a/services/api/src/routes/agents.ts b/services/api/src/routes/agents.ts index 52406c9..db51cfa 100644 --- a/services/api/src/routes/agents.ts +++ b/services/api/src/routes/agents.ts @@ -3,6 +3,7 @@ import { agents } from "@maschina/db"; import { and, eq, isNull } from "@maschina/db"; import { Subjects } from "@maschina/events"; import { dispatchAgentRun } from "@maschina/jobs"; +import { resolveModel, validateModelAccess } from "@maschina/model"; import { publishSafe } from "@maschina/nats"; import { recordAgentExecution } from "@maschina/usage"; import { @@ -147,9 +148,26 @@ app.post( if (!agent) throw new HTTPException(404, { message: "Agent not found" }); - // Get the user's current plan for timeout config - const { getPlan } = await import("@maschina/plans"); - const plan = getPlan(user.tier); + // Validate model access and resolve to the appropriate model for this tier + if (input.model) { + const access = validateModelAccess(user.tier, input.model); + if (!access.allowed) { + 
throw new HTTPException(403, { + message: access.reason ?? "Model not available on your plan.", + }); + } + } + const resolvedModel = resolveModel(user.tier, input.model); + + // Resolve system prompt from agent config, fall back to a sensible default + const agentConfig = (agent.config ?? {}) as Record; + const systemPrompt = + typeof agentConfig.systemPrompt === "string" + ? agentConfig.systemPrompt + : `You are a Maschina ${agent.type} agent named "${agent.name}". Complete the task provided.`; + + // Convert timeout from ms (API input) to seconds (runtime) + const timeoutSecs = Math.floor((input.timeout ?? 300_000) / 1000); // Insert the agent_runs row const { agentRuns } = await import("@maschina/db"); @@ -169,8 +187,10 @@ app.post( agentId, userId: user.id, tier: user.tier, + model: resolvedModel, + systemPrompt, inputPayload: input.input ?? {}, - timeoutSecs: 300, + timeoutSecs, }); // Publish event (fire-and-forget — realtime service fans this out to WebSocket clients) diff --git a/services/api/src/routes/webhooks.ts b/services/api/src/routes/webhooks.ts index 97ea35b..2884124 100644 --- a/services/api/src/routes/webhooks.ts +++ b/services/api/src/routes/webhooks.ts @@ -1,18 +1,24 @@ import { constructWebhookEvent, handleWebhookEvent } from "@maschina/billing"; +import { db, webhookDeliveries, webhooks } from "@maschina/db"; +import { and, desc, eq } from "@maschina/db"; +import { dispatchWebhookJob } from "@maschina/jobs"; +import { WEBHOOK_EVENTS, generateSecret, hashSecret } from "@maschina/webhooks"; import { Hono } from "hono"; import { HTTPException } from "hono/http-exception"; import type Stripe from "stripe"; +import { z } from "zod"; +import type { Variables } from "../context.js"; +import { env } from "../env.js"; +import { requireAuth } from "../middleware/auth.js"; -const app = new Hono(); +const app = new Hono<{ Variables: Variables }>(); + +// ─── Stripe inbound webhook (no auth — raw body required) ──────────────────── -// POST 
/webhooks/stripe -// IMPORTANT: Must receive the raw body — not JSON.parse()'d. -// Hono does not auto-parse here because we handle the body manually. app.post("/stripe", async (c) => { const signature = c.req.header("stripe-signature"); if (!signature) throw new HTTPException(400, { message: "Missing stripe-signature header" }); - // Read raw body as text — required for Stripe signature verification const rawBody = await c.req.text(); let event: Stripe.Event; @@ -26,7 +32,6 @@ app.post("/stripe", async (c) => { try { await handleWebhookEvent(event); } catch (err) { - // Log + return 500 so Stripe retries console.error("[webhook] Processing failed:", err); throw new HTTPException(500, { message: "Webhook processing failed" }); } @@ -34,4 +39,228 @@ app.post("/stripe", async (c) => { return c.json({ received: true }); }); +// ─── Outbound webhook management (authenticated) ────────────────────────────── + +app.use("/", requireAuth); +app.use("/:id", requireAuth); +app.use("/:id/test", requireAuth); + +const CreateWebhookSchema = z.object({ + url: z.string().url(), + events: z.array(z.enum(WEBHOOK_EVENTS)).min(1), +}); + +const UpdateWebhookSchema = z.object({ + url: z.string().url().optional(), + events: z.array(z.enum(WEBHOOK_EVENTS)).min(1).optional(), + active: z.boolean().optional(), +}); + +// GET /webhooks +app.get("/", async (c) => { + const { id: userId } = c.get("user"); + + const rows = await db + .select({ + id: webhooks.id, + url: webhooks.url, + events: webhooks.events, + status: webhooks.status, + failureCount: webhooks.failureCount, + createdAt: webhooks.createdAt, + updatedAt: webhooks.updatedAt, + }) + .from(webhooks) + .where(eq(webhooks.userId, userId)) + .orderBy(desc(webhooks.createdAt)); + + return c.json(rows); +}); + +// POST /webhooks +app.post("/", async (c) => { + const user = c.get("user"); + const body = await c.req.json().catch(() => null); + const parsed = CreateWebhookSchema.safeParse(body); + if (!parsed.success) { + throw new 
HTTPException(400, { message: parsed.error.issues[0]?.message ?? "Invalid input" }); + } + + const rawSecret = generateSecret(); + const secretHash = hashSecret(rawSecret, env.JWT_SECRET); + + const [webhook] = await db + .insert(webhooks) + .values({ + userId: user.id, + url: parsed.data.url, + events: parsed.data.events, + secretHash, + status: "active", + }) + .returning({ + id: webhooks.id, + url: webhooks.url, + events: webhooks.events, + status: webhooks.status, + createdAt: webhooks.createdAt, + }); + + // Return the raw secret once — it is never retrievable again + return c.json({ ...webhook, secret: rawSecret }, 201); +}); + +// GET /webhooks/:id +app.get("/:id", async (c) => { + const { id: userId } = c.get("user"); + const webhookId = c.req.param("id"); + + const [row] = await db + .select({ + id: webhooks.id, + url: webhooks.url, + events: webhooks.events, + status: webhooks.status, + failureCount: webhooks.failureCount, + createdAt: webhooks.createdAt, + updatedAt: webhooks.updatedAt, + }) + .from(webhooks) + .where(and(eq(webhooks.id, webhookId), eq(webhooks.userId, userId))); + + if (!row) throw new HTTPException(404, { message: "Webhook not found" }); + + return c.json(row); +}); + +// PATCH /webhooks/:id +app.patch("/:id", async (c) => { + const { id: userId } = c.get("user"); + const webhookId = c.req.param("id"); + const body = await c.req.json().catch(() => null); + const parsed = UpdateWebhookSchema.safeParse(body); + if (!parsed.success) { + throw new HTTPException(400, { message: parsed.error.issues[0]?.message ?? 
"Invalid input" }); + } + + const [existing] = await db + .select({ id: webhooks.id }) + .from(webhooks) + .where(and(eq(webhooks.id, webhookId), eq(webhooks.userId, userId))); + + if (!existing) throw new HTTPException(404, { message: "Webhook not found" }); + + const updates: Partial = { + updatedAt: new Date(), + }; + if (parsed.data.url !== undefined) updates.url = parsed.data.url; + if (parsed.data.events !== undefined) updates.events = parsed.data.events; + if (parsed.data.active !== undefined) updates.status = parsed.data.active ? "active" : "disabled"; + + const [updated] = await db + .update(webhooks) + .set(updates) + .where(eq(webhooks.id, webhookId)) + .returning({ + id: webhooks.id, + url: webhooks.url, + events: webhooks.events, + status: webhooks.status, + updatedAt: webhooks.updatedAt, + }); + + return c.json(updated); +}); + +// DELETE /webhooks/:id +app.delete("/:id", async (c) => { + const { id: userId } = c.get("user"); + const webhookId = c.req.param("id"); + + const [existing] = await db + .select({ id: webhooks.id }) + .from(webhooks) + .where(and(eq(webhooks.id, webhookId), eq(webhooks.userId, userId))); + + if (!existing) throw new HTTPException(404, { message: "Webhook not found" }); + + await db.delete(webhooks).where(eq(webhooks.id, webhookId)); + + return c.body(null, 204); +}); + +// POST /webhooks/:id/test — fire a test event to the endpoint +app.post("/:id/test", async (c) => { + const { id: userId } = c.get("user"); + const webhookId = c.req.param("id"); + + const [row] = await db + .select() + .from(webhooks) + .where(and(eq(webhooks.id, webhookId), eq(webhooks.userId, userId))); + + if (!row) throw new HTTPException(404, { message: "Webhook not found" }); + if (row.status !== "active") { + throw new HTTPException(400, { message: "Webhook is not active" }); + } + + const deliveryId = crypto.randomUUID(); + const testPayload = { + id: deliveryId, + type: "agent.run.completed", + created_at: new Date().toISOString(), + api_version: 
"2026-03-13", + data: { + run_id: "test-run-id", + agent_id: "test-agent-id", + user_id: userId, + model: "claude-haiku-4-5", + input_tokens: 100, + output_tokens: 200, + duration_ms: 1234, + turns: 1, + }, + }; + + await db.insert(webhookDeliveries).values({ + id: deliveryId, + webhookId: row.id, + event: "agent.run.completed", + payload: testPayload, + status: "pending", + }); + + await dispatchWebhookJob({ + deliveryId, + webhookId: row.id, + event: "agent.run.completed", + payload: testPayload, + attempt: 1, + }); + + return c.json({ delivery_id: deliveryId, message: "Test event queued" }); +}); + +// GET /webhooks/:id/deliveries — delivery log for a webhook +app.get("/:id/deliveries", async (c) => { + const { id: userId } = c.get("user"); + const webhookId = c.req.param("id"); + + const [row] = await db + .select({ id: webhooks.id }) + .from(webhooks) + .where(and(eq(webhooks.id, webhookId), eq(webhooks.userId, userId))); + + if (!row) throw new HTTPException(404, { message: "Webhook not found" }); + + const deliveries = await db + .select() + .from(webhookDeliveries) + .where(eq(webhookDeliveries.webhookId, webhookId)) + .orderBy(desc(webhookDeliveries.createdAt)) + .limit(50); + + return c.json(deliveries); +}); + export default app; diff --git a/services/daemon/src/orchestrator/analyze.rs b/services/daemon/src/orchestrator/analyze.rs index f1da3e9..b0293e8 100644 --- a/services/daemon/src/orchestrator/analyze.rs +++ b/services/daemon/src/orchestrator/analyze.rs @@ -63,7 +63,7 @@ async fn persist_success( WHERE id = $4 "#, ) - .bind(&output.payload) + .bind(&output.output_payload) .bind(output.input_tokens as i64) .bind(output.output_tokens as i64) .bind(run.id) diff --git a/services/daemon/src/orchestrator/scan.rs b/services/daemon/src/orchestrator/scan.rs index 0f29d12..4f5afe1 100644 --- a/services/daemon/src/orchestrator/scan.rs +++ b/services/daemon/src/orchestrator/scan.rs @@ -17,6 +17,8 @@ pub struct AgentExecuteJob { pub agent_id: Uuid, pub user_id: 
Uuid, pub tier: String, + pub model: String, + pub system_prompt: String, pub input_payload: serde_json::Value, pub timeout_secs: u64, } @@ -119,6 +121,8 @@ pub async fn scan_and_dispatch(state: AppState) -> Result<()> { agent_id: job.agent_id, user_id: job.user_id, plan_tier: job.tier, + model: job.model, + system_prompt: job.system_prompt, input_payload: job.input_payload, timeout_secs: job.timeout_secs as i64, }; diff --git a/services/daemon/src/orchestrator/scan_compat.rs b/services/daemon/src/orchestrator/scan_compat.rs index 77e6bf6..67f3fa3 100644 --- a/services/daemon/src/orchestrator/scan_compat.rs +++ b/services/daemon/src/orchestrator/scan_compat.rs @@ -8,6 +8,10 @@ pub struct JobToRun { pub agent_id: Uuid, pub user_id: Uuid, pub plan_tier: String, + /// Resolved model ID (e.g. "claude-haiku-4-5-20251001" or "ollama/llama3.2"). + pub model: String, + /// System prompt resolved from agent config at dispatch time. + pub system_prompt: String, pub input_payload: serde_json::Value, pub timeout_secs: i64, } diff --git a/services/daemon/src/runtime/mod.rs b/services/daemon/src/runtime/mod.rs index 86fdae7..823e3cb 100644 --- a/services/daemon/src/runtime/mod.rs +++ b/services/daemon/src/runtime/mod.rs @@ -4,32 +4,44 @@ use crate::state::AppState; use serde::{Deserialize, Serialize}; /// Output returned by the Python runtime after a successful agent execution. +/// Must match services/runtime/src/models.py::RunResponse exactly. #[derive(Debug, Deserialize)] pub struct RunOutput { - pub payload: serde_json::Value, + pub output_payload: serde_json::Value, pub input_tokens: u64, pub output_tokens: u64, } /// Request body sent to the Python runtime service. +/// Must match services/runtime/src/models.py::RunRequest exactly. 
#[derive(Debug, Serialize)] struct RuntimeRequest<'a> { run_id: uuid::Uuid, agent_id: uuid::Uuid, user_id: uuid::Uuid, + plan_tier: &'a str, + model: &'a str, + system_prompt: &'a str, + max_tokens: u32, input_payload: &'a serde_json::Value, + timeout_secs: i64, } /// Dispatch a run to the Python runtime and await the result. /// The caller is responsible for enforcing the timeout wrapper. pub async fn dispatch(state: &AppState, run: &QueuedRun) -> Result { - let url = format!("{}/execute", state.config.runtime_url); + let url = format!("{}/run", state.config.runtime_url); let body = RuntimeRequest { run_id: run.id, agent_id: run.agent_id, user_id: run.user_id, + plan_tier: &run.plan_tier, + model: &run.model, + system_prompt: &run.system_prompt, + max_tokens: 4096, input_payload: &run.input_payload, + timeout_secs: run.timeout_secs, }; let response = state diff --git a/services/runtime/src/runner.py b/services/runtime/src/runner.py index 557d406..6a86447 100644 --- a/services/runtime/src/runner.py +++ b/services/runtime/src/runner.py @@ -1,6 +1,16 @@ """ Agent execution — delegates to maschina-runtime (the shared execution package) and runs risk checks via maschina-risk before and after the LLM call. 
+ +Model routing: + - Models starting with "ollama/" → OllamaRunner (local, no token quota deduction) + - All other models → AnthropicRunner (cloud, billed with multiplier) + +Token billing multipliers (applied to raw token counts before returning): + claude-haiku-* → 1x + claude-sonnet-* → 3x + claude-opus-* → 15x + ollama/* → 0x (local, never deducted from quota) """ import logging @@ -15,13 +25,33 @@ logger = logging.getLogger(__name__) -# Lazily import Anthropic only if an API key is configured -if not settings.use_ollama: - import anthropic +# ─── Token billing multipliers ──────────────────────────────────────────────── +# Must stay in sync with packages/model/src/catalog.ts + +_MULTIPLIERS: list[tuple[str, int]] = [ + ("claude-haiku-", 1), + ("claude-sonnet-", 3), + ("claude-opus-", 15), + ("ollama/", 0), +] + +_DEFAULT_MULTIPLIER = 1 + + +def _get_multiplier(model: str) -> int: + for prefix, mult in _MULTIPLIERS: + if model.startswith(prefix): + return mult + return _DEFAULT_MULTIPLIER + - _anthropic_client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) -else: - _anthropic_client = None # type: ignore[assignment] +def _is_ollama(model: str) -> bool: + return model.startswith("ollama/") + + +def _ollama_model_name(model: str) -> str: + """Strip 'ollama/' prefix to get the bare Ollama model name.""" + return model[len("ollama/") :] def _extract_user_message(input_payload: dict[str, Any]) -> str: @@ -38,9 +68,10 @@ async def execute(req: RunRequest) -> RunResponse: Pipeline: 1. Risk-check the user input (block prompt injection / oversized inputs) - 2. Run the agent via maschina-runtime AgentRunner + 2. Route to the appropriate runner based on model prefix 3. Risk-scan the output (flag PII leakage) - 4. Return structured response + 4. Apply token billing multiplier to reported token counts + 5. 
Return structured response """ user_message = _extract_user_message(req.input_payload) @@ -50,20 +81,34 @@ async def execute(req: RunRequest) -> RunResponse: codes = ", ".join(f.code for f in risk.flags) raise ValueError(f"Input blocked by risk check: {codes}") - # ── Execute via maschina-runtime ──────────────────────────────────────── - if settings.use_ollama: + # ── Route to runner ───────────────────────────────────────────────────── + if _is_ollama(req.model): + # Local Ollama — use the model name from the request runner = OllamaRunner( base_url=settings.ollama_base_url, - model=settings.ollama_model, + model=_ollama_model_name(req.model), system_prompt=req.system_prompt, max_tokens=min(req.max_tokens, settings.max_output_tokens), timeout_secs=req.timeout_secs, ) else: + # Cloud Anthropic model — lazy-import to avoid requiring the key for local dev + try: + import anthropic + + client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key) + except ImportError as exc: + raise RuntimeError("anthropic package not installed") from exc + + if not settings.anthropic_api_key: + raise RuntimeError( + f"ANTHROPIC_API_KEY is not set but model '{req.model}' requires cloud execution" + ) + from maschina_runtime import AgentRunner runner = AgentRunner( - client=_anthropic_client, + client=client, system_prompt=req.system_prompt, model=req.model, max_tokens=min(req.max_tokens, settings.max_output_tokens), @@ -85,20 +130,30 @@ async def execute(req: RunRequest) -> RunResponse: extra={"run_id": req.run_id, "flags": [f.code for f in output_risk.flags]}, ) + # ── Apply billing multiplier ──────────────────────────────────────────── + # Multiply raw token counts so the daemon's quota deduction reflects cost. + # Ollama multiplier = 0, so local runs never deduct from the cloud quota. 
+ multiplier = _get_multiplier(req.model) + billed_input_tokens = result.input_tokens * multiplier + billed_output_tokens = result.output_tokens * multiplier + logger.info( "run completed", extra={ "run_id": req.run_id, "model": req.model, "turns": result.turns, - "input_tokens": result.input_tokens, - "output_tokens": result.output_tokens, + "raw_input_tokens": result.input_tokens, + "raw_output_tokens": result.output_tokens, + "billed_input_tokens": billed_input_tokens, + "billed_output_tokens": billed_output_tokens, + "multiplier": multiplier, }, ) return RunResponse( run_id=req.run_id, output_payload={"text": result.output}, - input_tokens=result.input_tokens, - output_tokens=result.output_tokens, + input_tokens=billed_input_tokens, + output_tokens=billed_output_tokens, ) diff --git a/services/runtime/tests/__init__.py b/services/runtime/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/runtime/tests/test_runner_routing.py b/services/runtime/tests/test_runner_routing.py new file mode 100644 index 0000000..c8088fa --- /dev/null +++ b/services/runtime/tests/test_runner_routing.py @@ -0,0 +1,89 @@ +""" +Unit tests for model routing and billing multiplier logic in runner.py. +Tests the private helpers directly — no actual LLM calls made. 
+""" + +import sys +import types +import unittest.mock as mock + +# ─── Stub out packages that aren't installed in CI ─────────────────────────── + + +def _stub_module(name: str, **attrs): + mod = types.ModuleType(name) + for k, v in attrs.items(): + setattr(mod, k, v) + sys.modules[name] = mod + return mod + + +# Stub maschina_risk +_stub_module( + "maschina_risk", + check_input=lambda text: mock.MagicMock(approved=True, flags=[]), + check_output=lambda text: mock.MagicMock(flags=[]), +) + +# Stub maschina_runtime as a package (needs __path__ so submodule imports work) +_rt = _stub_module("maschina_runtime", RunInput=mock.MagicMock, AgentRunner=mock.MagicMock) +_rt.__path__ = [] # marks it as a package to Python's import system + +# Stub maschina_runtime.models (imported by ollama_runner.py at module level) +_stub_module("maschina_runtime.models", RunInput=mock.MagicMock, RunResult=mock.MagicMock) + +# Stub openai (imported by ollama_runner.py) +_stub_module("openai", AsyncOpenAI=mock.MagicMock) + +# Stub src.config.settings before importing runner +settings_mock = mock.MagicMock() +settings_mock.anthropic_api_key = "" +settings_mock.ollama_base_url = "http://localhost:11434/v1" +settings_mock.ollama_model = "llama3.2" +settings_mock.max_output_tokens = 16_384 +settings_mock.use_ollama = True + +config_mod = _stub_module("src.config", settings=settings_mock) + +# Now we can import the helpers +from src.runner import _get_multiplier, _is_ollama, _ollama_model_name # noqa: E402 + +# ─── Multiplier tests ───────────────────────────────────────────────────────── + + +class TestGetMultiplier: + def test_haiku_is_1x(self): + assert _get_multiplier("claude-haiku-4-5-20251001") == 1 + + def test_sonnet_is_3x(self): + assert _get_multiplier("claude-sonnet-4-6") == 3 + + def test_opus_is_15x(self): + assert _get_multiplier("claude-opus-4-6") == 15 + + def test_ollama_is_0x(self): + assert _get_multiplier("ollama/llama3.2") == 0 + assert _get_multiplier("ollama/mistral") == 
0 + + def test_unknown_model_defaults_to_1x(self): + assert _get_multiplier("gpt-99") == 1 + assert _get_multiplier("") == 1 + + +# ─── Routing helpers ────────────────────────────────────────────────────────── + + +class TestIsOllama: + def test_ollama_prefix(self): + assert _is_ollama("ollama/llama3.2") is True + assert _is_ollama("ollama/mistral") is True + + def test_anthropic_is_not_ollama(self): + assert _is_ollama("claude-haiku-4-5-20251001") is False + assert _is_ollama("claude-sonnet-4-6") is False + + +class TestOllamaModelName: + def test_strips_prefix(self): + assert _ollama_model_name("ollama/llama3.2") == "llama3.2" + assert _ollama_model_name("ollama/mistral") == "mistral" diff --git a/services/worker/src/worker/consumer.py b/services/worker/src/worker/consumer.py index 99c1c2a..ff73b70 100644 --- a/services/worker/src/worker/consumer.py +++ b/services/worker/src/worker/consumer.py @@ -11,8 +11,8 @@ from nats.js.api import AckPolicy, ConsumerConfig, RetentionPolicy, StreamConfig from .config import settings -from .handlers import handle_batch, handle_ml_inference, handle_report -from .models import BatchJob, JobEnvelope, MlInferenceJob, ReportJob +from .handlers import handle_batch, handle_ml_inference, handle_report, handle_webhook_dispatch +from .models import BatchJob, JobEnvelope, MlInferenceJob, ReportJob, WebhookDispatchJob log = structlog.get_logger() @@ -91,5 +91,7 @@ async def _dispatch(envelope: JobEnvelope) -> None: await handle_report(ReportJob.model_validate(envelope.data)) elif subject.startswith("maschina.jobs.worker.batch"): await handle_batch(BatchJob.model_validate(envelope.data)) + elif subject == "maschina.jobs.worker.webhook_dispatch": + await handle_webhook_dispatch(WebhookDispatchJob.model_validate(envelope.data)) else: log.warning("worker.unknown_subject", subject=subject) diff --git a/services/worker/src/worker/handlers/__init__.py b/services/worker/src/worker/handlers/__init__.py index efb8483..baa2163 100644 --- 
a/services/worker/src/worker/handlers/__init__.py +++ b/services/worker/src/worker/handlers/__init__.py @@ -1,5 +1,6 @@ from .batch import handle_batch from .ml import handle_ml_inference from .report import handle_report +from .webhook import handle_webhook_dispatch -__all__ = ["handle_ml_inference", "handle_report", "handle_batch"] +__all__ = ["handle_ml_inference", "handle_report", "handle_batch", "handle_webhook_dispatch"] diff --git a/services/worker/src/worker/handlers/webhook.py b/services/worker/src/worker/handlers/webhook.py new file mode 100644 index 0000000..fb99feb --- /dev/null +++ b/services/worker/src/worker/handlers/webhook.py @@ -0,0 +1,187 @@ +"""Outbound webhook dispatch handler — sign, POST, retry, log.""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import time + +import asyncpg +import httpx +import structlog + +from ..config import settings +from ..models import WebhookDispatchJob + +log = structlog.get_logger() + +ALGORITHM = "sha256" +HEADER = "X-Maschina-Signature" +MAX_ATTEMPTS = 5 +TIMEOUT_SECS = 10 + +# Exponential backoff delays (seconds) indexed by attempt number (1-based) +BACKOFF_SECS = [10, 30, 90, 300, 900] + + +def _sign(payload: str, secret: str) -> str: + """HMAC-SHA256 signature matching the TypeScript sign.ts implementation.""" + sig = hmac.new(secret.encode(), payload.encode(), hashlib.sha256).hexdigest() + return f"{ALGORITHM}={sig}" + + +async def handle_webhook_dispatch(job: WebhookDispatchJob) -> None: + """ + Deliver a webhook event to the user's endpoint. + + Flow: + 1. Load webhook row + raw secret from DB + 2. Sign the payload + 3. POST to endpoint (10s timeout) + 4. Log result to webhook_deliveries + 5. 
On failure: schedule retry or mark webhook as failing + """ + conn = await asyncpg.connect(settings.database_url) + try: + await _dispatch(conn, job) + finally: + await conn.close() + + +async def _dispatch(conn: asyncpg.Connection, job: WebhookDispatchJob) -> None: + # Load webhook — verify it's still active + row = await conn.fetchrow( + """ + SELECT id, url, secret_hash, status, failure_count + FROM webhooks + WHERE id = $1 + """, + job.webhook_id, + ) + + if not row: + log.warning("webhook.not_found", webhook_id=job.webhook_id) + return + + if row["status"] != "active": + log.info("webhook.skipped_inactive", webhook_id=job.webhook_id, status=row["status"]) + return + + payload_str = json.dumps(job.payload, separators=(",", ":")) + signature = _sign(payload_str, row["secret_hash"]) + + headers = { + "Content-Type": "application/json", + "User-Agent": "Maschina-Webhook/1.0", + HEADER: signature, + "X-Maschina-Event": job.event, + "X-Maschina-Delivery": job.delivery_id, + } + + start = time.monotonic() + success = False + response_status: int | None = None + response_body: str | None = None + + try: + async with httpx.AsyncClient(timeout=TIMEOUT_SECS) as client: + resp = await client.post(row["url"], content=payload_str, headers=headers) + response_status = resp.status_code + response_body = resp.text[:500] + success = resp.is_success + + except Exception as exc: + response_body = str(exc)[:500] + log.warning("webhook.delivery_error", webhook_id=job.webhook_id, error=str(exc)) + + duration_ms = int((time.monotonic() - start) * 1000) + + if success: + # Log success, reset failure counter + await conn.execute( + """ + UPDATE webhook_deliveries + SET status = 'success', response_status = $1, response_body = $2, + delivered_at = NOW() + WHERE id = $3 + """, + response_status, + response_body, + job.delivery_id, + ) + await conn.execute( + "UPDATE webhooks SET failure_count = 0 WHERE id = $1", + job.webhook_id, + ) + log.info( + "webhook.delivered", + 
webhook_id=job.webhook_id, + delivery_id=job.delivery_id, + status=response_status, + duration_ms=duration_ms, + ) + else: + new_failure_count = row["failure_count"] + 1 + new_attempt = job.attempt + 1 + next_retry_secs = ( + BACKOFF_SECS[job.attempt - 1] if job.attempt - 1 < len(BACKOFF_SECS) else None + ) + + if new_attempt > MAX_ATTEMPTS or next_retry_secs is None: + # Max attempts reached — mark webhook as failing + await conn.execute( + """ + UPDATE webhook_deliveries + SET status = 'failed', response_status = $1, response_body = $2, attempt = $3 + WHERE id = $4 + """, + response_status, + response_body, + job.attempt, + job.delivery_id, + ) + await conn.execute( + """ + UPDATE webhooks + SET failure_count = $1, + status = CASE WHEN $1 >= 5 THEN 'failing'::webhook_status ELSE status END + WHERE id = $2 + """, + new_failure_count, + job.webhook_id, + ) + log.error( + "webhook.exhausted", + webhook_id=job.webhook_id, + delivery_id=job.delivery_id, + attempts=job.attempt, + ) + else: + # Schedule retry + await conn.execute( + """ + UPDATE webhook_deliveries + SET status = 'retrying', response_status = $1, response_body = $2, + attempt = $3, + next_retry_at = NOW() + ($4 || ' seconds')::interval + WHERE id = $5 + """, + response_status, + response_body, + new_attempt, + str(next_retry_secs), + job.delivery_id, + ) + await conn.execute( + "UPDATE webhooks SET failure_count = $1 WHERE id = $2", + new_failure_count, + job.webhook_id, + ) + log.warning( + "webhook.retry_scheduled", + webhook_id=job.webhook_id, + delivery_id=job.delivery_id, + attempt=new_attempt, + retry_in_secs=next_retry_secs, + ) diff --git a/services/worker/src/worker/models.py b/services/worker/src/worker/models.py index 35decad..19ac528 100644 --- a/services/worker/src/worker/models.py +++ b/services/worker/src/worker/models.py @@ -8,6 +8,7 @@ class JobEnvelope(BaseModel): """Wrapper published to NATS by services/api or the daemon.""" + id: str subject: str data: dict[str, Any] @@ -25,12 +26,20 
class ReportJob(BaseModel):
    """Scheduled report-generation job."""

    report_id: str
    user_id: UUID
    # One of: "usage_summary" | "agent_performance" | "billing"
    report_type: str
    period_start: str  # ISO date
    period_end: str


class BatchJob(BaseModel):
    """Batch-processing job over a set of runs."""

    batch_id: str
    # One of: "feature_extraction" | "reward_computation" | "reconcile"
    job_type: str
    run_ids: list[str]


class WebhookDispatchJob(BaseModel):
    """Payload for an outbound webhook delivery attempt."""

    delivery_id: str  # uuid — matches webhook_deliveries.id
    webhook_id: str  # uuid — matches webhooks.id
    event: str  # e.g. "agent.run.completed"
    payload: dict[str, Any]  # the full typed event payload
    attempt: int = 1  # current attempt number (1-based)