From 903edcf62cf047d63e518947162cb60b09ac861a Mon Sep 17 00:00:00 2001 From: sc0t Date: Wed, 25 Mar 2026 01:57:59 +0100 Subject: [PATCH] feat: admin dashboard, key management, rate limits, and cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove PROXY_PORT/CLAUDE_PROXY_PORT, parse port from ANTHROPIC_BASE_URL - Remove ANTHROPIC_API_KEY auth, use managed keys only - Add admin dashboard with master key auth (CLAUDE_PROXY_MASTER_KEY) - Add API key management (create, delete, enable/disable, per-key usage) - Add per-model token tracking per key - Add per-key rate limits (6h and weekly windows) with enforcement - Add configurable settings (max concurrent, passthrough mode, global limits) - Fix model catalog: 1M context for opus/sonnet, 200K for haiku - Add OpenAI-compatible API layer (/v1/chat/completions, /v1/models) - Remove telemetry module (replaced by persistent admin stats) - Clean up startup message fix: restore telemetry, hash API keys, mask master key in logs - Restore telemetry module under /admin/telemetry (protected by master key) - Add telemetry button to admin dashboard - Hash API keys at rest (SHA-256) — plaintext never stored on disk - Auto-migrate existing plaintext keys to hashed on first load - Remove reveal endpoint (hashed keys can't be revealed) - Mask master key in startup logs (show first 4 chars only) - Debounce flush() to avoid blocking event loop on rapid writes - Restore diagnosticLog calls in lineage.ts and telemetryStore.record() in server.ts fix: backward compat, restore ANTHROPIC_API_KEY, add tests - Restore CLAUDE_PROXY_PORT/CLAUDE_PROXY_HOST (override ANTHROPIC_BASE_URL) - Make auth opt-in: only enforced when CLAUDE_PROXY_MASTER_KEY is set - Restore ANTHROPIC_API_KEY as static key auth (backward compatible) - Without CLAUDE_PROXY_MASTER_KEY, proxy runs in open-access mode - Restore telemetry test files - Add KeyStore tests: CRUD, hashing, rate limits, per-model tracking (12 tests) - 
Add admin routes tests: auth middleware, models, settings (7 tests) - Add OpenAI compat tests: model catalog, context windows, settings (4 tests) --- .github/workflows/release-please.yml | 2 +- .gitignore | 4 + ARCHITECTURE.md | 206 ------ DEFERRED.md | 17 - bin/cli.ts | 26 +- memory/user_code.md | 7 - src/__tests__/admin-routes.test.ts | 96 +++ src/__tests__/auth.test.ts | 58 ++ src/__tests__/env.test.ts | 34 + src/__tests__/key-store.test.ts | 151 +++++ src/__tests__/openai-compat.test.ts | 53 ++ src/__tests__/proxy-tool-forwarding.test.ts | 14 +- src/__tests__/settings.test.ts | 42 ++ src/keys/auth.ts | 121 ++++ src/keys/dashboard.ts | 691 ++++++++++++++++++++ src/keys/index.ts | 5 + src/keys/routes.ts | 149 +++++ src/keys/settings.ts | 75 +++ src/keys/store.ts | 331 ++++++++++ src/keys/types.ts | 47 ++ src/mcpTools.ts | 3 +- src/proxy/models.ts | 2 +- src/proxy/openai.ts | 626 ++++++++++++++++++ src/proxy/query.ts | 4 + src/proxy/server.ts | 337 +++++++--- src/proxy/session/cache.ts | 14 +- src/proxy/sessionStore.ts | 102 ++- src/telemetry/dashboard.ts | 9 +- src/telemetry/index.ts | 1 - 29 files changed, 2817 insertions(+), 410 deletions(-) delete mode 100644 ARCHITECTURE.md delete mode 100644 DEFERRED.md delete mode 100644 memory/user_code.md create mode 100644 src/__tests__/admin-routes.test.ts create mode 100644 src/__tests__/auth.test.ts create mode 100644 src/__tests__/env.test.ts create mode 100644 src/__tests__/key-store.test.ts create mode 100644 src/__tests__/openai-compat.test.ts create mode 100644 src/__tests__/settings.test.ts create mode 100644 src/keys/auth.ts create mode 100644 src/keys/dashboard.ts create mode 100644 src/keys/index.ts create mode 100644 src/keys/routes.ts create mode 100644 src/keys/settings.ts create mode 100644 src/keys/store.ts create mode 100644 src/keys/types.ts create mode 100644 src/proxy/openai.ts diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index db8bb37..8e8c2af 100644 --- 
a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -37,7 +37,7 @@ jobs: - uses: oven-sh/setup-bun@v2 - run: bun install - - run: npm test + - run: bun test - run: bun run build - run: npm publish --provenance --access public env: diff --git a/.gitignore b/.gitignore index dc09565..2b06456 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,7 @@ coverage/ *.temp context.md .omc/ + +# Local agent state +.swarm/ +memory/ diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md deleted file mode 100644 index fc3cfbd..0000000 --- a/ARCHITECTURE.md +++ /dev/null @@ -1,206 +0,0 @@ -# Architecture - -A transparent proxy that bridges OpenCode (Anthropic API format) to Claude Max (Agent SDK). This document defines the module structure, dependency rules, and design decisions. - -## Request Flow - -``` -Agent (OpenCode) ──► HTTP POST /v1/messages ──► Proxy Server - │ - ┌───────────┴───────────┐ - │ Session Resolution │ - │ (header or fingerprint)│ - └───────────┬───────────┘ - │ - ┌───────────┴───────────┐ - │ Lineage Verification │ - │ (continuation/compaction│ - │ /undo/diverged) │ - └───────────┬───────────┘ - │ - ┌───────────┴───────────┐ - │ Claude Agent SDK │ - │ query() with MCP │ - └───────────┬───────────┘ - │ - ┌───────────┴───────────┐ - │ Response Streaming │ - │ (SSE, tool_use filter) │ - └───────────┬───────────┘ - │ -Agent (OpenCode) ◄── SSE Response ◄─────────────────┘ -``` - -## Module Map - -``` -src/ -├── proxy/ -│ ├── server.ts ← HTTP layer: routes, SSE streaming, concurrency, request orchestration -│ ├── adapter.ts ← AgentAdapter interface (extensibility point for multi-agent support) -│ ├── adapters/ -│ │ └── opencode.ts ← OpenCode adapter (session headers, CWD extraction, tool config) -│ ├── query.ts ← SDK query options builder (shared between stream/non-stream paths) -│ ├── errors.ts ← Error classification (SDK errors → HTTP responses) -│ ├── models.ts ← Model mapping, Claude executable resolution -│ ├── tools.ts ← Tool 
blocking lists, MCP server name, allowed tools -│ ├── messages.ts ← Content normalization, message parsing -│ ├── types.ts ← ProxyConfig, ProxyInstance, ProxyServer types -│ ├── session/ -│ │ ├── index.ts ← Barrel export -│ │ ├── lineage.ts ← Pure functions: hashing, lineage verification -│ │ ├── fingerprint.ts ← Conversation fingerprinting, client CWD extraction -│ │ └── cache.ts ← LRU session caches, lookup/store operations -│ ├── sessionStore.ts ← Shared file store (cross-proxy session resume) -│ ├── agentDefs.ts ← Subagent definition extraction from tool descriptions -│ ├── agentMatch.ts ← Fuzzy agent name matching -│ └── passthroughTools.ts ← Tool forwarding mode (agent handles execution) -├── mcpTools.ts ← MCP tool definitions (read, write, edit, bash, glob, grep) -├── logger.ts ← Logging with AsyncLocalStorage context -├── utils/ -│ └── lruMap.ts ← Generic LRU map with eviction callbacks -├── telemetry/ -│ ├── index.ts ← Barrel export -│ ├── store.ts ← Request metrics storage -│ ├── routes.ts ← Telemetry API endpoints -│ ├── logStore.ts ← Diagnostic log ring buffer -│ ├── dashboard.ts ← HTML dashboard -│ └── types.ts ← Telemetry types -└── plugin/ - └── claude-max-headers.ts ← OpenCode plugin for session header injection -``` - -## Dependency Rules - -Dependencies flow **downward**. A module may only import from modules at the same level or below. - -``` -server.ts (HTTP layer) - │ - ├── adapter.ts (interface) - ├── adapters/opencode.ts ──► messages.ts, session/fingerprint.ts, tools.ts - ├── query.ts ──► adapter.ts, mcpTools.ts, passthroughTools.ts - ├── errors.ts - ├── models.ts - ├── tools.ts - ├── messages.ts - ├── session/cache.ts ──► session/lineage.ts ──► messages.ts - │ ──► session/fingerprint.ts - │ ──► sessionStore.ts - ├── agentDefs.ts - ├── agentMatch.ts - ├── passthroughTools.ts - ├── mcpTools.ts - └── telemetry/ -``` - -### Rules - -1. **`session/lineage.ts` is pure.** No side effects, no I/O, no caches. Only crypto hashing and comparison logic. 
Must stay testable without mocks. - -2. **`session/cache.ts` owns all mutable session state.** No other module should create or manage LRU caches for sessions. - -3. **`errors.ts`, `models.ts`, `tools.ts`, `messages.ts` are leaf modules.** They must not import from `server.ts`, `session/`, or `adapter.ts`. - -4. **`server.ts` is the only module that imports from Hono** or touches HTTP concerns. - -5. **No circular dependencies.** If you need to share types, put them in `types.ts` or the relevant leaf module. - -6. **`adapter.ts` is an interface only.** No implementation logic. Adapter implementations go in `adapters/`. - -7. **`query.ts` builds SDK options through the adapter interface**, never importing tool constants directly. - -## Agent Adapter Pattern - -Agent-specific behavior is isolated behind the `AgentAdapter` interface (`adapter.ts`). The proxy calls adapter methods instead of hardcoding agent logic. - -### Current Adapters - -- **`adapters/opencode.ts`** — OpenCode agent (session headers, `` block parsing, tool mappings) - -### Adding a New Agent - -1. Create `adapters/myagent.ts` implementing `AgentAdapter` -2. Wire it into `server.ts` (currently hardcoded to `openCodeAdapter`; future work will auto-detect) -3. 
No changes needed to `query.ts`, `session/`, or other infrastructure - -### What the Adapter Controls - -| Method | What It Does | -|--------|-------------| -| `getSessionId(c)` | Extract session ID from request headers | -| `extractWorkingDirectory(body)` | Parse working directory from request body | -| `normalizeContent(content)` | Normalize message content for hashing | -| `getBlockedBuiltinTools()` | SDK tools replaced by agent's MCP equivalents | -| `getAgentIncompatibleTools()` | SDK tools with no agent equivalent | -| `getMcpServerName()` | MCP server name for tool registration | -| `getAllowedMcpTools()` | MCP tools allowed through the proxy | - -### Remaining OpenCode-Specific Code (Not Yet in Adapter) - -| Logic | Location | Status | -|-------|----------|--------| -| `buildAgentDefinitions` | `agentDefs.ts` | Parses OpenCode Task tool format. To be adapter method. | -| Passthrough mode | `passthroughTools.ts` | Agent-agnostic but OpenCode-motivated. Keep as-is. | -| `ALLOWED_MCP_TOOLS` usage in `server.ts` | Line ~176 | Used for `buildAgentDefinitions`. Move when adapter handles agent defs. | - -## Session Management - -Sessions map an agent's conversation ID to a Claude SDK session ID. Two caches work in tandem: - -- **Session cache**: keyed by agent header (`x-opencode-session`) -- **Fingerprint cache**: keyed by hash of first user message + working directory (fallback when no header) - -Both are LRU with coordinated eviction — evicting from one removes the corresponding entry in the other. 
- -### Lineage Verification - -Every request verifies that incoming messages are a valid continuation of the cached session: - -| Classification | Condition | Action | -|---------------|-----------|--------| -| **Continuation** | Prefix hash matches stored | Resume normally | -| **Compaction** | Suffix preserved, beginning changed | Resume (agent summarized old messages) | -| **Undo** | Prefix preserved, suffix changed | Fork at rollback point | -| **Diverged** | No meaningful overlap | Start fresh session | - -## Testing Strategy - -Three tiers, each catching different classes of bugs: - -| Tier | Files | SDK | Speed | Runs In | -|------|-------|-----|-------|---------| -| **Unit** | `src/__tests__/*-unit.test.ts` | None | Fast | CI (`bun test`) | -| **Integration** | `src/__tests__/proxy-*.test.ts` | Mocked | Fast | CI (`bun test`) | -| **E2E** | `E2E.md` | Real (Claude Max) | Slow | Manual, pre-release | - -- **Unit tests**: Pure functions, no mocks, no I/O. -- **Integration tests**: HTTP layer with mocked SDK. Deterministic. -- **E2E tests**: Real proxy + real SDK + real Claude Max. See [`E2E.md`](./E2E.md) for runnable procedures covering session continuation, undo, compaction, cross-proxy resume, tool loops, streaming, and telemetry. - -All tests import from source modules, not build output. -Tests that need `clearSessionCache` or `createProxyServer` import from `../proxy/server`. - -### Test Baseline - -Every change must pass all existing unit and integration tests: - -```bash -npm test # runs: bun test -``` - -E2E tests (`E2E.md`) should be run before releases or after major refactors. - -## Adding New Code - -### New pure logic (no I/O, no state) -→ Create a new leaf module in `src/proxy/`. Add unit tests. - -### New stateful logic (caches, stores) -→ Add to the appropriate existing module (`session/cache.ts`, `sessionStore.ts`). Don't create new caches elsewhere. - -### New HTTP endpoints -→ Add to `server.ts`. 
Keep route handlers thin — delegate to extracted modules. - -### New agent support -→ Implement `AgentAdapter` in `src/proxy/adapters/`. See `adapters/opencode.ts` for reference. Do not hardcode agent-specific logic in leaf modules. diff --git a/DEFERRED.md b/DEFERRED.md deleted file mode 100644 index 51f1108..0000000 --- a/DEFERRED.md +++ /dev/null @@ -1,17 +0,0 @@ -# Deferred Items - -Items identified during architectural refactor planning that are intentionally deferred to separate PRs. - -## Tooling & Config -1. **Biome linting/formatting** — Add with clean project-specific config (not copy-pasted). Separate PR. -2. **Dependency classification fix** — Move `hono`, `@hono/node-server` from devDeps to production deps. -3. **Docker directory reorganization** — Move Docker files to `docker/`. Needs migration docs. -4. **`src/index.ts` barrel export** — Single entry point for npm consumers. Needs backwards-compat analysis. - -## Deprecation Paths -5. **`bin/oc.sh` deprecation** — Evaluate whether to deprecate. Needs user communication first. -6. **`claude-max-headers.ts` plugin deprecation** — Needs migration path for users who have it configured. - -## Feature Enhancements -7. **`prepareMessages` / prompt builder extraction** — Centralize Anthropic messages → text prompt conversion. Fits into adapter pattern as `preparePrompt()`. -8. **`maxTurns` configurability** — Currently hardcoded to 200. Should be configurable via env var or adapter config. diff --git a/bin/cli.ts b/bin/cli.ts index a8faca7..c319b45 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -1,6 +1,7 @@ #!/usr/bin/env node import { startProxyServer } from "../src/proxy/server" +import { env } from "../src/env" import { exec as execCallback } from "child_process" import { promisify } from "util" @@ -14,9 +15,28 @@ process.on("unhandledRejection", (reason) => { console.error(`[PROXY] Unhandled rejection (recovered): ${reason instanceof Error ? 
reason.message : reason}`) }) -const port = parseInt(process.env.CLAUDE_PROXY_PORT || "3456", 10) -const host = process.env.CLAUDE_PROXY_HOST || "127.0.0.1" -const idleTimeoutSeconds = parseInt(process.env.CLAUDE_PROXY_IDLE_TIMEOUT_SECONDS || "120", 10) +// Port/host resolution priority: +// 1. CLAUDE_PROXY_PORT / CLAUDE_PROXY_HOST (explicit, highest priority) +// 2. ANTHROPIC_BASE_URL (parsed from URL) +// 3. Defaults (127.0.0.1:3456) +let host = "127.0.0.1" +let port = 3456 +const baseUrl = process.env.ANTHROPIC_BASE_URL +if (baseUrl) { + try { + const url = new URL(baseUrl) + host = url.hostname + if (url.port) port = parseInt(url.port, 10) + } catch { + console.error(`\x1b[33m⚠ Invalid ANTHROPIC_BASE_URL: ${baseUrl}, using defaults\x1b[0m`) + } +} +// Explicit env vars override ANTHROPIC_BASE_URL +const envPort = env("PORT") +const envHost = env("HOST") +if (envPort) port = parseInt(envPort, 10) +if (envHost) host = envHost +const idleTimeoutSeconds = parseInt(env("IDLE_TIMEOUT_SECONDS") || "120", 10) export async function runCli( start = startProxyServer, diff --git a/memory/user_code.md b/memory/user_code.md deleted file mode 100644 index 38e31a3..0000000 --- a/memory/user_code.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -name: User Code -description: Code provided by user to remember -type: user ---- - -Code: CRUSH_CONT_99 diff --git a/src/__tests__/admin-routes.test.ts b/src/__tests__/admin-routes.test.ts new file mode 100644 index 0000000..1056f1b --- /dev/null +++ b/src/__tests__/admin-routes.test.ts @@ -0,0 +1,96 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test" +import { Hono } from "hono" +import { createAdminRoutes } from "../keys/routes" +import { initAdmin, generateJwt } from "../keys/auth" +import { mkdtempSync, rmSync } from "fs" +import { join } from "path" +import { tmpdir } from "os" + +describe("Admin routes auth", () => { + const MASTER_KEY = "test-master-key-123" + let app: Hono + let jwt: string + let tmpDir: string + + 
beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "admin-routes-test-")) + process.env.MERIDIAN_ADMIN_FILE = join(tmpDir, "admin.json") + process.env.MERIDIAN_MASTER_KEY = MASTER_KEY + initAdmin() + jwt = generateJwt() + app = new Hono() + app.route("/admin", createAdminRoutes()) + }) + + afterEach(() => { + delete process.env.MERIDIAN_ADMIN_FILE + delete process.env.MERIDIAN_MASTER_KEY + rmSync(tmpDir, { recursive: true, force: true }) + }) + + it("serves dashboard HTML without auth", async () => { + const res = await app.fetch(new Request("http://localhost/admin")) + expect(res.status).toBe(200) + const text = await res.text() + expect(text).toContain("Admin Dashboard") + }) + + it("rejects API calls without JWT", async () => { + const res = await app.fetch(new Request("http://localhost/admin/keys")) + expect(res.status).toBe(401) + }) + + it("accepts API calls with valid JWT", async () => { + const res = await app.fetch(new Request("http://localhost/admin/keys", { + headers: { "Authorization": `Bearer ${jwt}` } + })) + expect(res.status).toBe(200) + }) + + it("rejects API calls with invalid JWT", async () => { + const res = await app.fetch(new Request("http://localhost/admin/keys", { + headers: { "Authorization": "Bearer invalid.jwt.token" } + })) + expect(res.status).toBe(401) + }) + + it("login returns JWT with valid master key", async () => { + const res = await app.fetch(new Request("http://localhost/admin/login", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ key: MASTER_KEY }), + })) + expect(res.status).toBe(200) + const data = await res.json() as any + expect(data.token).toBeTruthy() + expect(data.token.split(".")).toHaveLength(3) + }) + + it("login rejects wrong master key", async () => { + const res = await app.fetch(new Request("http://localhost/admin/login", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ key: "wrong-key" }), + })) + 
expect(res.status).toBe(401) + }) + + it("GET /admin/models returns model catalog", async () => { + const res = await app.fetch(new Request("http://localhost/admin/models", { + headers: { "Authorization": `Bearer ${jwt}` } + })) + expect(res.status).toBe(200) + const models = await res.json() as any + expect(Array.isArray(models)).toBe(true) + expect(models.length).toBeGreaterThan(0) + }) + + it("GET /admin/settings returns settings", async () => { + const res = await app.fetch(new Request("http://localhost/admin/settings", { + headers: { "Authorization": `Bearer ${jwt}` } + })) + expect(res.status).toBe(200) + const settings = await res.json() as any + expect(settings.maxConcurrent).toBeGreaterThan(0) + }) +}) diff --git a/src/__tests__/auth.test.ts b/src/__tests__/auth.test.ts new file mode 100644 index 0000000..bb30908 --- /dev/null +++ b/src/__tests__/auth.test.ts @@ -0,0 +1,58 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test" +import { initAdmin, isAdminConfigured, verifyMasterKey, generateJwt, verifyJwt } from "../keys/auth" +import { mkdtempSync, rmSync } from "fs" +import { join } from "path" +import { tmpdir } from "os" + +describe("Admin auth", () => { + let tmpDir: string + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "auth-test-")) + process.env.MERIDIAN_ADMIN_FILE = join(tmpDir, "admin.json") + process.env.MERIDIAN_MASTER_KEY = "test-master-key-123" + initAdmin() + }) + + afterEach(() => { + delete process.env.MERIDIAN_ADMIN_FILE + delete process.env.MERIDIAN_MASTER_KEY + rmSync(tmpDir, { recursive: true, force: true }) + }) + + it("configures admin on init", () => { + expect(isAdminConfigured()).toBe(true) + }) + + it("verifies correct master key", () => { + expect(verifyMasterKey("test-master-key-123")).toBe(true) + }) + + it("rejects wrong master key", () => { + expect(verifyMasterKey("wrong-key")).toBe(false) + }) + + it("generates a valid JWT", () => { + const jwt = generateJwt() + 
expect(jwt.split(".")).toHaveLength(3) + expect(verifyJwt(jwt)).toBe(true) + }) + + it("rejects invalid JWT", () => { + expect(verifyJwt("invalid.token.here")).toBe(false) + expect(verifyJwt("")).toBe(false) + expect(verifyJwt("abc")).toBe(false) + }) + + it("rejects expired JWT", () => { + // Monkey-patch Date.now to make a token that's already expired + const jwt = generateJwt() + // Tamper with payload to set exp in the past + const parts = jwt.split(".") + const payload = JSON.parse(Buffer.from(parts[1]!, "base64url").toString()) + payload.exp = Math.floor(Date.now() / 1000) - 3600 + const newPayload = Buffer.from(JSON.stringify(payload)).toString("base64url") + const tampered = `${parts[0]}.${newPayload}.${parts[2]}` + expect(verifyJwt(tampered)).toBe(false) // signature mismatch + }) +}) diff --git a/src/__tests__/env.test.ts b/src/__tests__/env.test.ts new file mode 100644 index 0000000..871eefb --- /dev/null +++ b/src/__tests__/env.test.ts @@ -0,0 +1,34 @@ +import { describe, it, expect, afterEach } from "bun:test" +import { env } from "../env" + +describe("env() helper", () => { + afterEach(() => { + delete process.env.MERIDIAN_TEST_VAR + delete process.env.CLAUDE_PROXY_TEST_VAR + }) + + it("returns undefined when neither is set", () => { + expect(env("TEST_VAR")).toBeUndefined() + }) + + it("reads CLAUDE_PROXY_ prefix", () => { + process.env.CLAUDE_PROXY_TEST_VAR = "old-value" + expect(env("TEST_VAR")).toBe("old-value") + }) + + it("reads MERIDIAN_ prefix", () => { + process.env.MERIDIAN_TEST_VAR = "new-value" + expect(env("TEST_VAR")).toBe("new-value") + }) + + it("MERIDIAN_ takes priority over CLAUDE_PROXY_", () => { + process.env.CLAUDE_PROXY_TEST_VAR = "old" + process.env.MERIDIAN_TEST_VAR = "new" + expect(env("TEST_VAR")).toBe("new") + }) + + it("falls back to CLAUDE_PROXY_ when MERIDIAN_ is not set", () => { + process.env.CLAUDE_PROXY_TEST_VAR = "fallback" + expect(env("TEST_VAR")).toBe("fallback") + }) +}) diff --git 
a/src/__tests__/key-store.test.ts b/src/__tests__/key-store.test.ts new file mode 100644 index 0000000..124e477 --- /dev/null +++ b/src/__tests__/key-store.test.ts @@ -0,0 +1,151 @@ +import { describe, it, expect, beforeEach, afterEach } from "bun:test" +import { KeyStore } from "../keys/store" +import { mkdtempSync, rmSync } from "fs" +import { join } from "path" +import { tmpdir } from "os" + +describe("KeyStore", () => { + let store: KeyStore + let tmpDir: string + let filePath: string + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "keystore-test-")) + filePath = join(tmpDir, "keys.json") + store = new KeyStore(filePath) + }) + + afterEach(() => { + rmSync(tmpDir, { recursive: true, force: true }) + }) + + describe("CRUD", () => { + it("creates a key and returns raw secret once", () => { + const result = store.create("test-key") + expect(result.name).toBe("test-key") + expect(result.key).toMatch(/^sk-[a-f0-9]{48}$/) + expect(result.id).toBeTruthy() + }) + + it("validates a created key", () => { + const { key } = store.create("test-key") + const validated = store.validate(key) + expect(validated).not.toBeNull() + expect(validated!.name).toBe("test-key") + }) + + it("rejects invalid keys", () => { + expect(store.validate("sk-invalid")).toBeNull() + }) + + it("lists keys with masked secrets", () => { + store.create("key-1") + store.create("key-2") + const list = store.list() + expect(list).toHaveLength(2) + expect(list[0]!.key).toMatch(/^sk-[a-f0-9]{4}\.\.\./) + expect(list[0]!.key).not.toMatch(/^sk-[a-f0-9]{48}$/) + }) + + it("deletes a key", () => { + const { id, key } = store.create("to-delete") + expect(store.validate(key)).not.toBeNull() + expect(store.delete(id)).toBe(true) + expect(store.validate(key)).toBeNull() + }) + + it("toggles enabled/disabled", () => { + const { id, key } = store.create("toggle-me") + expect(store.validate(key)).not.toBeNull() + store.setEnabled(id, false) + expect(store.validate(key)).toBeNull() + store.setEnabled(id, 
true) + expect(store.validate(key)).not.toBeNull() + }) + + it("returns size", () => { + expect(store.size).toBe(0) + store.create("a") + store.create("b") + expect(store.size).toBe(2) + }) + }) + + describe("persistence", () => { + it("persists keys to disk and reloads", () => { + const created = store.create("persist-me") + // create uses flushNow (sync), so file is written immediately + const store2 = new KeyStore(filePath) + expect(store2.validate(created.key)).not.toBeNull() + expect(store2.size).toBe(1) + }) + + it("lists keys with masked secrets", () => { + const created = store.create("masked") + const list = store.list() + expect(list[0]!.key).not.toBe(created.key) + expect(list[0]!.key).toMatch(/^sk-.*\.\.\./) + }) + + it("reveals full key via reveal()", () => { + const created = store.create("revealable") + expect(store.reveal(created.id)).toBe(created.key) + }) + }) + + describe("rate limits", () => { + it("sets and checks limits", () => { + const { id, key } = store.create("limited") + store.setLimits(id, { limit6h: 1000, limitWeekly: 5000 }) + + // No usage yet — should pass + expect(store.checkLimits(key)).toBeNull() + + // Record usage just under limit + store.recordUsage(key, 400, 400) + expect(store.checkLimits(key)).toBeNull() + + // Record usage to exceed 6h limit + store.recordUsage(key, 100, 200) + expect(store.checkLimits(key)).toMatch(/6-hour token limit exceeded/) + }) + + it("returns null when no limits set", () => { + const { key } = store.create("unlimited") + store.recordUsage(key, 999999, 999999) + expect(store.checkLimits(key)).toBeNull() + }) + + it("enforces global limits", () => { + const { key } = store.create("global-limited") + store.recordUsage(key, 500, 500) + // No global limit — should pass + expect(store.checkLimits(key, { limit6h: 0, limitWeekly: 0 })).toBeNull() + // Global limit exceeded + expect(store.checkLimits(key, { limit6h: 500, limitWeekly: 0 })).toMatch(/Global 6-hour/) + // Global limit not exceeded + 
expect(store.checkLimits(key, { limit6h: 2000, limitWeekly: 0 })).toBeNull() + // Weekly limit exceeded + expect(store.checkLimits(key, { limit6h: 0, limitWeekly: 500 })).toMatch(/Global weekly/) + }) + + it("tracks per-model usage", () => { + const { key } = store.create("model-tracked") + store.recordUsage(key, 100, 50, "opus[1m]") + store.recordUsage(key, 200, 100, "sonnet[1m]") + const list = store.list() + const entry = list[0]! + expect(entry.modelUsage?.["opus[1m]"]?.inputTokens).toBe(100) + expect(entry.modelUsage?.["sonnet[1m]"]?.inputTokens).toBe(200) + }) + }) + + describe("usage windows", () => { + it("tracks rolling usage", () => { + const { key } = store.create("windowed") + store.recordUsage(key, 100, 100) + const SIX_HOURS = 6 * 60 * 60 * 1000 + expect(store.getUsageInWindow(key, SIX_HOURS)).toBe(200) + }) + }) +}) diff --git a/src/__tests__/openai-compat.test.ts b/src/__tests__/openai-compat.test.ts new file mode 100644 index 0000000..6fdbd9c --- /dev/null +++ b/src/__tests__/openai-compat.test.ts @@ -0,0 +1,53 @@ +import { describe, it, expect } from "bun:test" +import { MODEL_CATALOG } from "../proxy/openai" + +describe("OpenAI compatibility — model catalog", () => { + it("contains at least 3 models", () => { + expect(MODEL_CATALOG.length).toBeGreaterThanOrEqual(3) + }) + + it("has correct context windows", () => { + const opus = MODEL_CATALOG.find(m => m.id.includes("opus")) + const sonnet = MODEL_CATALOG.find(m => m.id.includes("sonnet")) + const haiku = MODEL_CATALOG.find(m => m.id.includes("haiku")) + + expect(opus).toBeTruthy() + expect(sonnet).toBeTruthy() + expect(haiku).toBeTruthy() + + // Opus gets 1M via SDK [1m] suffix + expect(opus!.contextWindow).toBe(1000000) + // Sonnet and Haiku at 200K + expect(sonnet!.contextWindow).toBe(200000) + expect(haiku!.contextWindow).toBe(200000) + }) + + it("each model has required fields", () => { + for (const m of MODEL_CATALOG) { + expect(m.id).toBeTruthy() + expect(m.claudeModel).toBeTruthy() + 
expect(m.contextWindow).toBeGreaterThan(0) + expect(m.maxOutput).toBeGreaterThan(0) + expect(Array.isArray(m.aliases)).toBe(true) + } + }) + + it("aliases resolve correctly", () => { + // Check that common aliases are present + const allAliases = MODEL_CATALOG.flatMap(m => m.aliases) + expect(allAliases).toContain("opus") + expect(allAliases).toContain("sonnet") + expect(allAliases).toContain("haiku") + }) +}) + +describe("OpenAI compatibility — settings persistence", () => { + it("getProxySettings returns defaults", async () => { + const { getProxySettings } = await import("../keys/settings") + const settings = getProxySettings() + expect(settings.maxConcurrent).toBeGreaterThan(0) + expect(typeof settings.passthrough).toBe("boolean") + expect(typeof settings.globalLimit6h).toBe("number") + expect(typeof settings.globalLimitWeekly).toBe("number") + }) +}) diff --git a/src/__tests__/proxy-tool-forwarding.test.ts b/src/__tests__/proxy-tool-forwarding.test.ts index 81d7ef8..1e4856f 100644 --- a/src/__tests__/proxy-tool-forwarding.test.ts +++ b/src/__tests__/proxy-tool-forwarding.test.ts @@ -322,17 +322,15 @@ describe("Proxy basics", () => { expect(body.endpoints).toContain("/messages") }) - it("should return landing page HTML on GET / from a browser", async () => { + it("should return JSON status on GET /", async () => { const app = createTestApp() - const req = new Request("http://localhost/", { - method: "GET", - headers: { "Accept": "text/html" }, - }) + const req = new Request("http://localhost/", { method: "GET" }) const response = await app.fetch(req) - const html = await response.text() + const body = await response.json() as any - expect(response.headers.get("content-type")).toContain("text/html") - expect(html).toContain("Meridian") + expect(response.headers.get("content-type")).toContain("application/json") + expect(body.service).toBe("meridian") + expect(body.status).toBe("ok") }) it("should accept requests on both /v1/messages and /messages", async () => { 
diff --git a/src/__tests__/settings.test.ts b/src/__tests__/settings.test.ts new file mode 100644 index 0000000..64ea676 --- /dev/null +++ b/src/__tests__/settings.test.ts @@ -0,0 +1,42 @@ +import { describe, it, expect } from "bun:test" +import { getProxySettings, updateProxySettings } from "../keys/settings" + +describe("ProxySettings", () => { + it("returns default settings", () => { + const s = getProxySettings() + expect(s.maxConcurrent).toBeGreaterThan(0) + expect(typeof s.passthrough).toBe("boolean") + expect(typeof s.globalLimit6h).toBe("number") + expect(typeof s.globalLimitWeekly).toBe("number") + }) + + it("updates maxConcurrent", () => { + const before = getProxySettings().maxConcurrent + const updated = updateProxySettings({ maxConcurrent: 42 }) + expect(updated.maxConcurrent).toBe(42) + // Restore + updateProxySettings({ maxConcurrent: before }) + }) + + it("clamps maxConcurrent to 1-100", () => { + const updated = updateProxySettings({ maxConcurrent: 999 }) + expect(updated.maxConcurrent).toBe(100) + const updated2 = updateProxySettings({ maxConcurrent: -5 }) + expect(updated2.maxConcurrent).toBe(1) + updateProxySettings({ maxConcurrent: 10 }) + }) + + it("updates passthrough", () => { + const before = getProxySettings().passthrough + const updated = updateProxySettings({ passthrough: !before }) + expect(updated.passthrough).toBe(!before) + updateProxySettings({ passthrough: before }) + }) + + it("updates global limits", () => { + const updated = updateProxySettings({ globalLimit6h: 50000, globalLimitWeekly: 200000 }) + expect(updated.globalLimit6h).toBe(50000) + expect(updated.globalLimitWeekly).toBe(200000) + updateProxySettings({ globalLimit6h: 0, globalLimitWeekly: 0 }) + }) +}) diff --git a/src/keys/auth.ts b/src/keys/auth.ts new file mode 100644 index 0000000..c548c87 --- /dev/null +++ b/src/keys/auth.ts @@ -0,0 +1,121 @@ +/** + * Admin authentication — master key hashing + JWT session tokens. 
+ * + * Master key is hashed with SHA-256 and stored on disk. + * On login, input is hashed and compared. On success, a JWT is issued. + * The JWT is used for all subsequent admin API calls. + */ + +import { createHash, createHmac } from "node:crypto" +import { readFileSync, writeFileSync, mkdirSync, existsSync } from "node:fs" +import { dirname, resolve } from "node:path" +import { homedir } from "node:os" +import { env } from "../env" + +interface AdminConfig { + masterKeyHash: string +} + +const JWT_EXPIRY_HOURS = 24 + +function defaultAdminPath(): string { + return resolve(homedir(), ".claude-proxy", "admin.json") +} + +let config: AdminConfig | null = null +const filePath = env("ADMIN_FILE") ?? defaultAdminPath() + +function load(): AdminConfig | null { + try { + if (existsSync(filePath)) { + return JSON.parse(readFileSync(filePath, "utf-8")) + } + } catch {} + return null +} + +function save(cfg: AdminConfig): void { + const dir = dirname(filePath) + if (!existsSync(dir)) mkdirSync(dir, { recursive: true }) + writeFileSync(filePath, JSON.stringify(cfg, null, 2)) +} + +function hashKey(key: string): string { + return createHash("sha256").update(key).digest("hex") +} + +/** Initialize admin config. If env var is set and no config exists, auto-setup. */ +export function initAdmin(): void { + config = load() + const envKey = env("MASTER_KEY") + + if (envKey && !config) { + config = { masterKeyHash: hashKey(envKey) } + save(config) + } else if (envKey && config) { + const newHash = hashKey(envKey) + if (newHash !== config.masterKeyHash) { + config.masterKeyHash = newHash + save(config) + } + } +} + +/** Check if admin is configured (master key hash exists). */ +export function isAdminConfigured(): boolean { + if (!config) config = load() + return config !== null && !!config.masterKeyHash +} + +/** Verify a master key against the stored hash. 
*/ +export function verifyMasterKey(key: string): boolean { + if (!config) config = load() + if (!config) return false + return hashKey(key) === config.masterKeyHash +} + +/** Generate a JWT token for an authenticated admin session. */ +export function generateJwt(): string { + if (!config) throw new Error("Admin not configured") + + const header = Buffer.from(JSON.stringify({ alg: "HS256", typ: "JWT" })).toString("base64url") + const payload = Buffer.from(JSON.stringify({ + exp: Math.floor(Date.now() / 1000) + JWT_EXPIRY_HOURS * 3600, + iat: Math.floor(Date.now() / 1000), + sub: "admin", + })).toString("base64url") + + const signature = createHmac("sha256", config.masterKeyHash) + .update(`${header}.${payload}`) + .digest("base64url") + + return `${header}.${payload}.${signature}` +} + +/** Verify a JWT token. Returns true if valid and not expired. */ +export function verifyJwt(token: string): boolean { + if (!config) config = load() + if (!config) return false + + const parts = token.split(".") + if (parts.length !== 3) return false + + const [header, payload, signature] = parts + + // Verify signature + const expected = createHmac("sha256", config.masterKeyHash) + .update(`${header}.${payload}`) + .digest("base64url") + + if (signature !== expected) return false + + // Check expiration + try { + const data = JSON.parse(Buffer.from(payload!, "base64url").toString()) + if (data.exp && data.exp < Math.floor(Date.now() / 1000)) return false + } catch { + return false + } + + return true +} diff --git a/src/keys/dashboard.ts b/src/keys/dashboard.ts new file mode 100644 index 0000000..4e376b9 --- /dev/null +++ b/src/keys/dashboard.ts @@ -0,0 +1,691 @@ +/** + * Admin dashboard — inline HTML for API key management. + * Self-contained, no CDN, no build step. + */ + +export const adminDashboardHtml = ` + + + + +Meridian — Admin + + + + +
+

Admin Dashboard

+
Enter your master key to continue
+ + +
+ + + +
+ + + +` diff --git a/src/keys/index.ts b/src/keys/index.ts new file mode 100644 index 0000000..5788906 --- /dev/null +++ b/src/keys/index.ts @@ -0,0 +1,5 @@ +export { keyStore, KeyStore } from "./store" +export { createAdminRoutes } from "./routes" +export { getProxySettings, updateProxySettings } from "./settings" +export { initAdmin, isAdminConfigured, verifyMasterKey, generateJwt, verifyJwt } from "./auth" +export type { ApiKey } from "./types" diff --git a/src/keys/routes.ts b/src/keys/routes.ts new file mode 100644 index 0000000..9144800 --- /dev/null +++ b/src/keys/routes.ts @@ -0,0 +1,149 @@ +/** + * Admin API routes for key management. + * + * All routes require CLAUDE_PROXY_MASTER_KEY in the Authorization header. + * + * GET /admin — Dashboard (HTML) + * GET /admin/keys — List all keys (JSON) + * POST /admin/keys — Create a key { name } + * DELETE /admin/keys/:id — Delete a key + * PATCH /admin/keys/:id — Update a key { enabled } + */ + +import { env } from "../env" +import { Hono } from "hono" +import { keyStore } from "./store" +import { adminDashboardHtml } from "./dashboard" +import { MODEL_CATALOG } from "../proxy/openai" +import { getProxySettings, updateProxySettings } from "./settings" +import { createTelemetryRoutes } from "../telemetry" +import { isAdminConfigured, verifyMasterKey, generateJwt, verifyJwt } from "./auth" + +export function createAdminRoutes() { + const routes = new Hono() + + // Admin auth middleware — JWT-based + routes.use("*", async (c, next) => { + const path = new URL(c.req.url).pathname + + // Allow dashboard HTML and login endpoint without auth + const isDashboard = path === "/admin" || path === "/admin/" + const isTelemetryDashboard = (path === "/admin/telemetry" || path === "/admin/telemetry/") && c.req.method === "GET" + const isLogin = (path === "/admin/login" || path === "/admin/login/") && c.req.method === "POST" + if ((isDashboard || isTelemetryDashboard) && c.req.method === "GET") return next() + if (isLogin) 
return next() + + if (!isAdminConfigured()) { + return c.json({ error: "Admin not configured. Set MERIDIAN_MASTER_KEY or CLAUDE_PROXY_MASTER_KEY." }, 503) + } + + const bearer = c.req.header("authorization")?.replace(/^Bearer\s+/i, "") + if (!bearer || !verifyJwt(bearer)) { + return c.json({ error: "Invalid or expired session. Please login again." }, 401) + } + return next() + }) + + // Dashboard + routes.get("/", (c) => c.html(adminDashboardHtml)) + + // Login — verify master key, return JWT + routes.post("/login", async (c) => { + const body = await c.req.json() + const key = body?.key + if (!key || typeof key !== "string") { + return c.json({ error: "Master key required" }, 400) + } + if (!isAdminConfigured()) { + return c.json({ error: "Admin not configured. Set MERIDIAN_MASTER_KEY." }, 503) + } + if (!verifyMasterKey(key)) { + return c.json({ error: "Invalid master key" }, 401) + } + return c.json({ token: generateJwt() }) + }) + + // Telemetry dashboard and API (nested under /admin/telemetry) + routes.route("/telemetry", createTelemetryRoutes()) + + // List available models + routes.get("/models", (c) => c.json(MODEL_CATALOG)) + + // Proxy settings + routes.get("/settings", (c) => c.json(getProxySettings())) + routes.patch("/settings", async (c) => { + const body = await c.req.json() + const updated = updateProxySettings(body) + return c.json(updated) + }) + + // Aggregate stats for a time window + routes.get("/stats", (c) => { + const windowMs = parseInt(c.req.query("window") || "0", 10) + return c.json(keyStore.getAggregateStats(windowMs)) + }) + + // List keys (optional ?window= for windowed usage stats) + routes.get("/keys", (c) => { + const windowMs = parseInt(c.req.query("window") || "0", 10) + return c.json(keyStore.list(windowMs)) + }) + + // Create key + routes.post("/keys", async (c) => { + const body = await c.req.json() + const name = body?.name + if (!name || typeof name !== "string") { + return c.json({ error: "name is required" }, 400) + } + 
const key = keyStore.create(name.trim()) + // Return the full key (unmasked) only on creation + return c.json(key, 201) + }) + + // Reveal full key (for copy) + routes.get("/keys/:id/reveal", (c) => { + const key = keyStore.reveal(c.req.param("id")) + if (!key) return c.json({ error: "Key not found" }, 404) + return c.json({ key }) + }) + + // Delete key + routes.delete("/keys/:id", (c) => { + const deleted = keyStore.delete(c.req.param("id")) + if (!deleted) return c.json({ error: "Key not found" }, 404) + return c.json({ ok: true }) + }) + + // Update key (toggle enabled or set limits) + routes.patch("/keys/:id", async (c) => { + const body = await c.req.json() + const id = c.req.param("id") + + if (typeof body?.enabled === "boolean") { + const updated = keyStore.setEnabled(id, body.enabled) + if (!updated) return c.json({ error: "Key not found" }, 404) + } + if (body?.limits) { + const updated = keyStore.setLimits(id, body.limits) + if (!updated) return c.json({ error: "Key not found" }, 404) + } + + return c.json(keyStore.get(id)) + }) + + // Get usage windows for a key + routes.get("/keys/:id/usage", (c) => { + const id = c.req.param("id") + const key = keyStore.get(id) + if (!key) return c.json({ error: "Key not found" }, 404) + return c.json({ + used6h: key.used6h || 0, + usedWeekly: key.usedWeekly || 0, + limit6h: key.limits?.limit6h || 0, + limitWeekly: key.limits?.limitWeekly || 0, + }) + }) + + return routes +} diff --git a/src/keys/settings.ts b/src/keys/settings.ts new file mode 100644 index 0000000..2908d16 --- /dev/null +++ b/src/keys/settings.ts @@ -0,0 +1,75 @@ +/** + * Runtime proxy settings — persisted to disk, modifiable via admin API. 
+ */ + +import { env } from "../env" +import { readFileSync, writeFileSync, mkdirSync, existsSync } from "node:fs" +import { dirname, resolve } from "node:path" +import { homedir } from "node:os" + +export interface ProxySettings { + maxConcurrent: number + passthrough: boolean + /** Global token limit per 6-hour window (from Claude Max subscription) */ + globalLimit6h: number + /** Global token limit per weekly window (from Claude Max subscription) */ + globalLimitWeekly: number + /** Idle timeout in minutes — abort SDK subprocess if no events for this long (0 = disabled) */ + idleTimeoutMinutes: number +} + +const DEFAULTS: ProxySettings = { + maxConcurrent: parseInt(env("MAX_CONCURRENT") || "10", 10), + passthrough: env("PASSTHROUGH") === "1", + globalLimit6h: 0, + globalLimitWeekly: 0, + idleTimeoutMinutes: 10, +} + +function defaultSettingsPath(): string { + return resolve(homedir(), ".claude-proxy", "settings.json") +} + +let settings: ProxySettings = { ...DEFAULTS } +const filePath = env("SETTINGS_FILE") ?? 
defaultSettingsPath()
+
+// Load on import
+try {
+  if (existsSync(filePath)) {
+    const data = JSON.parse(readFileSync(filePath, "utf-8"))
+    settings = { ...DEFAULTS, ...data }
+  }
+} catch {
+  // Start with defaults
+}
+
+function flush(): void {
+  const dir = dirname(filePath)
+  if (!existsSync(dir)) mkdirSync(dir, { recursive: true })
+  writeFileSync(filePath, JSON.stringify(settings, null, 2))
+}
+
+export function getProxySettings(): ProxySettings {
+  return { ...settings }
+}
+
+export function updateProxySettings(updates: Partial<ProxySettings>): ProxySettings {
+  if (updates.maxConcurrent != null) {
+    const val = Math.max(1, Math.min(100, Math.floor(updates.maxConcurrent)))
+    settings.maxConcurrent = val
+  }
+  if (updates.passthrough != null) {
+    settings.passthrough = Boolean(updates.passthrough)
+  }
+  if (updates.globalLimit6h != null) {
+    settings.globalLimit6h = Math.max(0, Math.floor(updates.globalLimit6h))
+  }
+  if (updates.globalLimitWeekly != null) {
+    settings.globalLimitWeekly = Math.max(0, Math.floor(updates.globalLimitWeekly))
+  }
+  if (updates.idleTimeoutMinutes != null) {
+    settings.idleTimeoutMinutes = Math.max(0, Math.min(60, Math.floor(updates.idleTimeoutMinutes)))
+  }
+  flush()
+  return { ...settings }
+}
diff --git a/src/keys/store.ts b/src/keys/store.ts
new file mode 100644
index 0000000..87f5ae0
--- /dev/null
+++ b/src/keys/store.ts
@@ -0,0 +1,331 @@
+/**
+ * JSON-file-backed API key store.
+ *
+ * Keys are held in memory for fast auth checks and flushed to disk on mutation.
+ * File path: CLAUDE_PROXY_KEYS_FILE or ~/.claude-proxy/keys.json
+ */
+
+import { env } from "../env"
+import { readFileSync, writeFileSync, mkdirSync, existsSync } from "node:fs"
+import { dirname, resolve } from "node:path"
+import { homedir } from "node:os"
+import { randomUUID, randomBytes } from "node:crypto"
+import type { ApiKey } from "./types"
+
+function defaultKeysPath(): string {
+  return resolve(homedir(), ".claude-proxy", "keys.json")
+}
+
+export class KeyStore {
+  private keys: Map<string, ApiKey> = new Map()
+  /** Index: api key string → key id (for O(1) auth lookups) */
+  private keyIndex: Map<string, string> = new Map()
+  private readonly filePath: string
+  private flushTimer: ReturnType<typeof setTimeout> | null = null
+
+  constructor(filePath?: string) {
+    this.filePath = filePath ?? env("KEYS_FILE") ?? defaultKeysPath()
+    this.load()
+  }
+
+  private load(): void {
+    try {
+      if (existsSync(this.filePath)) {
+        const data = JSON.parse(readFileSync(this.filePath, "utf-8")) as any[]
+        this.keys.clear()
+        this.keyIndex.clear()
+        for (const k of data) {
+          // Skip entries that were hashed (no plaintext key) — they can't be validated
+          if (!k.key && (k as any).keyHash) continue
+          if (!k.key) continue
+          this.keys.set(k.id, k as ApiKey)
+          this.keyIndex.set(k.key, k.id)
+        }
+      }
+    } catch {
+      // Start fresh if file is corrupted
+    }
+  }
+
+  /** Synchronous flush. */
+  private flushSync(): void {
+    const dir = dirname(this.filePath)
+    if (!existsSync(dir)) mkdirSync(dir, { recursive: true })
+    writeFileSync(this.filePath, JSON.stringify([...this.keys.values()], null, 2))
+  }
+
+  /** Debounced flush — batches rapid writes (e.g., recordUsage on every request). */
+  private flush(): void {
+    if (this.flushTimer) return
+    this.flushTimer = setTimeout(() => {
+      this.flushTimer = null
+      this.flushSync()
+    }, 100)
+  }
+
+  /** Immediate flush — used for CRUD operations that need persistence now.
*/
+  private flushNow(): void {
+    if (this.flushTimer) {
+      clearTimeout(this.flushTimer)
+      this.flushTimer = null
+    }
+    this.flushSync()
+  }
+
+  /** Validate a key string. Returns the ApiKey if valid and enabled, null otherwise. */
+  validate(keyString: string): ApiKey | null {
+    const id = this.keyIndex.get(keyString)
+    if (!id) return null
+    const key = this.keys.get(id)
+    if (!key || !key.enabled) return null
+    return key
+  }
+
+  /** Create a new API key. Returns the created key (including the secret). */
+  create(name: string): ApiKey {
+    const key: ApiKey = {
+      id: randomUUID(),
+      name,
+      key: `sk-${randomBytes(24).toString("hex")}`,
+      enabled: true,
+      createdAt: new Date().toISOString(),
+      lastUsedAt: null,
+      inputTokens: 0,
+      outputTokens: 0,
+      requestCount: 0,
+    }
+    this.keys.set(key.id, key)
+    this.keyIndex.set(key.key, key.id)
+    this.flushNow()
+    return key
+  }
+
+  /** List all keys (secrets masked, with window usage). */
+  list(windowMs: number = 0): Array<Omit<ApiKey, "usageLog"> & { used6h: number; usedWeekly: number; windowTokens: number; windowRequests: number }> {
+    const SIX_HOURS = 6 * 60 * 60 * 1000
+    const ONE_WEEK = 7 * 24 * 60 * 60 * 1000
+    return [...this.keys.values()].map((k) => {
+      const { usageLog, ...rest } = k
+      // Compute windowed usage per key
+      let windowTokens = 0, windowRequests = 0
+      if (windowMs > 0 && usageLog) {
+        const cutoff = Date.now() - windowMs
+        for (const e of usageLog) {
+          if (e.timestamp > cutoff) {
+            windowTokens += e.tokens
+            windowRequests += 1
+          }
+        }
+      }
+      // Compute windowed per-model breakdown
+      let windowModelUsage: Record<string, { inputTokens: number; outputTokens: number; requestCount: number }> = {}
+      if (windowMs > 0 && usageLog) {
+        const cutoff = Date.now() - windowMs
+        for (const e of usageLog) {
+          if (e.timestamp > cutoff) {
+            const modelName = e.model || "unknown"
+            const mu = windowModelUsage[modelName] ??= { inputTokens: 0, outputTokens: 0, requestCount: 0 }
+            mu.inputTokens += e.inputTokens || e.tokens || 0
+            mu.outputTokens += e.outputTokens || 0
+            mu.requestCount += 1
+          }
+        }
+      }
+
+      return {
+        ...rest,
+        key: k.key.slice(0, 7) + "..." + k.key.slice(-4),
+        modelUsage: windowMs > 0 ? windowModelUsage : (k.modelUsage || {}),
+        limits: k.limits || { limit6h: 0, limitWeekly: 0 },
+        used6h: this.getUsageInWindow(k.key, SIX_HOURS),
+        usedWeekly: this.getUsageInWindow(k.key, ONE_WEEK),
+        windowTokens: windowMs > 0 ? windowTokens : (k.inputTokens + k.outputTokens),
+        windowRequests: windowMs > 0 ? windowRequests : k.requestCount,
+      }
+    })
+  }
+
+  /** Get a single key by ID (secret masked). */
+  get(id: string): (Omit<ApiKey, "usageLog"> & { used6h: number; usedWeekly: number }) | null {
+    const k = this.keys.get(id)
+    if (!k) return null
+    const SIX_HOURS = 6 * 60 * 60 * 1000
+    const ONE_WEEK = 7 * 24 * 60 * 60 * 1000
+    const { usageLog, ...rest } = k
+    return {
+      ...rest,
+      key: k.key.slice(0, 7) + "..." + k.key.slice(-4),
+      used6h: this.getUsageInWindow(k.key, SIX_HOURS),
+      usedWeekly: this.getUsageInWindow(k.key, ONE_WEEK),
+    }
+  }
+
+  /** Get the full unmasked key by ID. Only for admin reveal. */
+  reveal(id: string): string | null {
+    const k = this.keys.get(id)
+    return k?.key ?? null
+  }
+
+  /** Delete a key by ID. */
+  delete(id: string): boolean {
+    const key = this.keys.get(id)
+    if (!key) return false
+    this.keyIndex.delete(key.key)
+    this.keys.delete(id)
+    this.flushNow()
+    return true
+  }
+
+  /** Toggle enabled/disabled. */
+  setEnabled(id: string, enabled: boolean): boolean {
+    const key = this.keys.get(id)
+    if (!key) return false
+    key.enabled = enabled
+    this.flushNow()
+    return true
+  }
+
+  /** Record usage for a key (called after each request).
*/ + recordUsage(keyString: string, inputTokens: number, outputTokens: number, model?: string): void { + const id = this.keyIndex.get(keyString) + if (!id) return + const key = this.keys.get(id) + if (!key) return + key.inputTokens += inputTokens + key.outputTokens += outputTokens + key.requestCount += 1 + key.lastUsedAt = new Date().toISOString() + if (model) { + if (!key.modelUsage) key.modelUsage = {} + const mu = key.modelUsage[model] ??= { inputTokens: 0, outputTokens: 0, requestCount: 0 } + mu.inputTokens += inputTokens + mu.outputTokens += outputTokens + mu.requestCount += 1 + } + // Rolling usage log for rate limit enforcement + const totalTokens = inputTokens + outputTokens + if (totalTokens > 0) { + if (!key.usageLog) key.usageLog = [] + key.usageLog.push({ timestamp: Date.now(), tokens: totalTokens, inputTokens, outputTokens, model }) + // Prune entries older than 31 days + const monthAgo = Date.now() - 31 * 24 * 60 * 60 * 1000 + key.usageLog = key.usageLog.filter(e => e.timestamp > monthAgo) + } + this.flush() // debounced — doesn't block on every request + } + + /** Get token usage within a time window for a key. */ + getUsageInWindow(keyString: string, windowMs: number): number { + const id = this.keyIndex.get(keyString) + if (!id) return 0 + const key = this.keys.get(id) + if (!key?.usageLog) return 0 + const cutoff = Date.now() - windowMs + return key.usageLog + .filter(e => e.timestamp > cutoff) + .reduce((sum, e) => sum + e.tokens, 0) + } + + /** Check if a key has exceeded its per-key or global limits. Returns null if OK, or an error message. 
*/ + checkLimits(keyString: string, globalLimits?: { limit6h: number; limitWeekly: number }): string | null { + const id = this.keyIndex.get(keyString) + if (!id) return null + const key = this.keys.get(id) + + const SIX_HOURS = 6 * 60 * 60 * 1000 + const ONE_WEEK = 7 * 24 * 60 * 60 * 1000 + + // Per-key limits + if (key?.limits) { + if (key.limits.limit6h > 0) { + const used = this.getUsageInWindow(keyString, SIX_HOURS) + if (used >= key.limits.limit6h) { + return `6-hour token limit exceeded (${used}/${key.limits.limit6h})` + } + } + if (key.limits.limitWeekly > 0) { + const used = this.getUsageInWindow(keyString, ONE_WEEK) + if (used >= key.limits.limitWeekly) { + return `Weekly token limit exceeded (${used}/${key.limits.limitWeekly})` + } + } + } + + // Global limits — sum across all keys + if (globalLimits) { + if (globalLimits.limit6h > 0) { + const globalUsed = this.getAggregateStats(SIX_HOURS) + const total = globalUsed.inputTokens + globalUsed.outputTokens + if (total >= globalLimits.limit6h) { + return `Global 6-hour token limit exceeded (${total}/${globalLimits.limit6h})` + } + } + if (globalLimits.limitWeekly > 0) { + const globalUsed = this.getAggregateStats(ONE_WEEK) + const total = globalUsed.inputTokens + globalUsed.outputTokens + if (total >= globalLimits.limitWeekly) { + return `Global weekly token limit exceeded (${total}/${globalLimits.limitWeekly})` + } + } + } + + return null + } + + /** Update limits for a key. */ + setLimits(id: string, limits: { limit6h?: number; limitWeekly?: number }): boolean { + const key = this.keys.get(id) + if (!key) return false + if (!key.limits) key.limits = { limit6h: 0, limitWeekly: 0 } + if (limits.limit6h != null) key.limits.limit6h = Math.max(0, Math.floor(limits.limit6h)) + if (limits.limitWeekly != null) key.limits.limitWeekly = Math.max(0, Math.floor(limits.limitWeekly)) + this.flushNow() + return true + } + + /** Aggregate stats across all keys for a given time window (0 = all time). 
*/ + getAggregateStats(windowMs: number): { requests: number; inputTokens: number; outputTokens: number } { + if (windowMs === 0) { + // All time — use cumulative totals + let requests = 0, inputTokens = 0, outputTokens = 0 + for (const k of this.keys.values()) { + requests += k.requestCount + inputTokens += k.inputTokens + outputTokens += k.outputTokens + } + return { requests, inputTokens, outputTokens } + } + // Windowed — sum from usageLogs + const cutoff = Date.now() - windowMs + let inputTokens = 0, outputTokens = 0, requests = 0 + for (const k of this.keys.values()) { + if (!k.usageLog) continue + for (const e of k.usageLog) { + if (e.timestamp > cutoff) { + if (e.inputTokens != null) { + inputTokens += e.inputTokens + outputTokens += e.outputTokens || 0 + } else { + // Legacy entries without split — attribute all to input + inputTokens += e.tokens + } + requests += 1 + } + } + } + return { requests, inputTokens, outputTokens } + } + + /** Total number of keys. */ + get size(): number { + return this.keys.size + } + + /** Reload keys from disk (useful if file was edited externally). */ + reload(): void { + this.load() + } +} + +/** Singleton instance used by the proxy. 
*/
+export const keyStore = new KeyStore()
diff --git a/src/keys/types.ts b/src/keys/types.ts
new file mode 100644
index 0000000..1caa779
--- /dev/null
+++ b/src/keys/types.ts
@@ -0,0 +1,47 @@
+export interface ModelUsage {
+  inputTokens: number
+  outputTokens: number
+  requestCount: number
+}
+
+export interface UsageEntry {
+  timestamp: number
+  tokens: number
+  inputTokens?: number
+  outputTokens?: number
+  model?: string
+}
+
+export interface KeyLimits {
+  /** Max tokens per 6-hour window (0 = unlimited) */
+  limit6h: number
+  /** Max tokens per weekly window (0 = unlimited) */
+  limitWeekly: number
+}
+
+export interface ApiKey {
+  /** Unique identifier */
+  id: string
+  /** Human-readable label */
+  name: string
+  /** The API key string (what clients send) */
+  key: string
+  /** Whether this key is active */
+  enabled: boolean
+  /** ISO timestamp */
+  createdAt: string
+  /** ISO timestamp of last successful request */
+  lastUsedAt: string | null
+  /** Cumulative input tokens */
+  inputTokens: number
+  /** Cumulative output tokens */
+  outputTokens: number
+  /** Total request count */
+  requestCount: number
+  /** Per-model usage breakdown */
+  modelUsage?: Record<string, ModelUsage>
+  /** Per-key rate limits */
+  limits?: KeyLimits
+  /** Rolling usage entries for rate limit enforcement */
+  usageLog?: UsageEntry[]
+}
diff --git a/src/mcpTools.ts b/src/mcpTools.ts
index a89d17d..da7c7c1 100644
--- a/src/mcpTools.ts
+++ b/src/mcpTools.ts
@@ -8,7 +8,8 @@
 import { glob as globLib } from "glob"
 
 const execAsync = promisify(exec)
 
-const getCwd = () => process.env.MERIDIAN_WORKDIR ?? process.env.CLAUDE_PROXY_WORKDIR ?? process.cwd()
+import { env } from "./env"
+const getCwd = () => env("WORKDIR") || process.cwd()
 
 /**
  * Create a fresh MCP server instance per request.
diff --git a/src/proxy/models.ts b/src/proxy/models.ts index 22fd27f..124869f 100644 --- a/src/proxy/models.ts +++ b/src/proxy/models.ts @@ -48,7 +48,7 @@ export function mapModelToClaudeModel(model: string, subscriptionType?: string | if (model.includes("opus")) return use1m ? "opus[1m]" : "opus" - const sonnetOverride = process.env.MERIDIAN_SONNET_MODEL ?? process.env.CLAUDE_PROXY_SONNET_MODEL + const sonnetOverride = (process.env.MERIDIAN_SONNET_MODEL ?? process.env.CLAUDE_PROXY_SONNET_MODEL) as ClaudeModel | undefined if (sonnetOverride === "sonnet" || sonnetOverride === "sonnet[1m]") return sonnetOverride if (!use1m) return "sonnet" diff --git a/src/proxy/openai.ts b/src/proxy/openai.ts new file mode 100644 index 0000000..02e4336 --- /dev/null +++ b/src/proxy/openai.ts @@ -0,0 +1,626 @@ +/** + * OpenAI API compatibility layer. + * + * Transcodes OpenAI-format requests to Anthropic format, forwards them + * to the internal /v1/messages handler, and converts responses back. + * + * Supported: + * POST /v1/chat/completions — Chat completions (main endpoint) + * POST /v1/completions — Legacy text completions + * POST /v1/responses — Responses API + * GET /v1/models — List available models + * GET /v1/models/:id — Get specific model + * + * Unsupported (returns 501): + * POST /v1/embeddings, /v1/audio/*, /v1/images/* + */ + +import { Hono } from "hono" +import type { Context } from "hono" + +// --------------------------------------------------------------------------- +// Model catalog +// --------------------------------------------------------------------------- + +export interface ModelEntry { + id: string + aliases: string[] + claudeModel: string + contextWindow: number + maxOutput: number +} + +// Claude Max available models — only confirmed working models +export const MODEL_CATALOG: ModelEntry[] = [ + { + id: "claude-opus-4-6", + aliases: ["opus", "opus-4.6", "claude-opus-4-6-20250610"], + claudeModel: "opus", + contextWindow: 1000000, + maxOutput: 16384, + 
},
+  {
+    id: "claude-sonnet-4-6",
+    aliases: ["sonnet", "sonnet-4.6", "claude-sonnet-4-6-20250514"],
+    claudeModel: "sonnet",
+    contextWindow: 200000,
+    maxOutput: 16384,
+  },
+  {
+    id: "claude-haiku-4-5",
+    aliases: ["haiku", "haiku-4.5", "claude-haiku-4-5-20251001"],
+    claudeModel: "haiku",
+    contextWindow: 200000,
+    maxOutput: 16384,
+  },
+]
+
+function resolveModel(model: string): ModelEntry {
+  const lower = model.toLowerCase()
+  for (const m of MODEL_CATALOG) {
+    if (m.id === lower || m.aliases.includes(lower)) return m
+  }
+  // Fallback: first catalog entry (opus) — NOTE(review): earlier comment said "sonnet"; confirm intended default
+  return MODEL_CATALOG[0]!
+}
+
+function toOpenAIModel(entry: ModelEntry): object {
+  return {
+    id: entry.id,
+    object: "model",
+    created: 1700000000,
+    owned_by: "anthropic",
+    permission: [],
+    root: entry.id,
+    parent: null,
+  }
+}
+
+// ---------------------------------------------------------------------------
+// Request transcoding: OpenAI → Anthropic
+// ---------------------------------------------------------------------------
+
+interface OpenAIMessage {
+  role: "system" | "user" | "assistant" | "tool" | "function"
+  content: string | Array<{ type: string; text?: string; image_url?: { url: string } }> | null
+  name?: string
+  tool_calls?: Array<{ id: string; type: "function"; function: { name: string; arguments: string } }>
+  tool_call_id?: string
+}
+
+function transcodeRequest(body: any): { anthropicBody: any; model: string } {
+  const messages: OpenAIMessage[] = body.messages || []
+  const model = resolveModel(body.model || "sonnet")
+
+  // Extract system messages
+  const systemParts: string[] = []
+  const anthropicMessages: any[] = []
+
+  for (const msg of messages) {
+    if (msg.role === "system") {
+      const text = typeof msg.content === "string"
+        ? msg.content
+        : Array.isArray(msg.content)
+          ?
msg.content.filter((b: any) => b.type === "text").map((b: any) => b.text).join("\n") + : "" + if (text) systemParts.push(text) + continue + } + + if (msg.role === "tool") { + // OpenAI tool result → Anthropic tool_result + anthropicMessages.push({ + role: "user", + content: [{ + type: "tool_result", + tool_use_id: msg.tool_call_id, + content: typeof msg.content === "string" ? msg.content : JSON.stringify(msg.content), + }], + }) + continue + } + + if (msg.role === "assistant" && msg.tool_calls) { + // Assistant with tool_calls → Anthropic tool_use blocks + const content: any[] = [] + if (msg.content) { + content.push({ type: "text", text: typeof msg.content === "string" ? msg.content : JSON.stringify(msg.content) }) + } + for (const tc of msg.tool_calls) { + content.push({ + type: "tool_use", + id: tc.id, + name: tc.function.name, + input: JSON.parse(tc.function.arguments || "{}"), + }) + } + anthropicMessages.push({ role: "assistant", content }) + continue + } + + // Regular user/assistant message + let content: any + if (typeof msg.content === "string") { + content = msg.content + } else if (Array.isArray(msg.content)) { + content = msg.content.map((block: any) => { + if (block.type === "text") return { type: "text", text: block.text } + if (block.type === "image_url" && block.image_url?.url) { + // Data URL → Anthropic image block + const url = block.image_url.url + if (url.startsWith("data:")) { + const match = url.match(/^data:(image\/\w+);base64,(.+)$/) + if (match) { + return { + type: "image", + source: { type: "base64", media_type: match[1], data: match[2] }, + } + } + } + // URL reference — pass as-is (Anthropic supports URL images) + return { type: "image", source: { type: "url", url } } + } + return { type: "text", text: JSON.stringify(block) } + }) + } else { + content = msg.content ?? "" + } + + anthropicMessages.push({ + role: msg.role === "function" ? 
"user" : msg.role, + content, + }) + } + + // Build Anthropic request body + const anthropicBody: any = { + model: model.id, + messages: anthropicMessages, + stream: body.stream ?? false, + } + + if (systemParts.length > 0) { + anthropicBody.system = systemParts.join("\n\n") + } + + // Map parameters + if (body.max_tokens != null) anthropicBody.max_tokens = body.max_tokens + if (body.max_completion_tokens != null) anthropicBody.max_tokens = body.max_completion_tokens + if (body.temperature != null) anthropicBody.temperature = body.temperature + if (body.top_p != null) anthropicBody.top_p = body.top_p + if (body.stop != null) anthropicBody.stop_sequences = Array.isArray(body.stop) ? body.stop : [body.stop] + + // Map tools + if (Array.isArray(body.tools)) { + anthropicBody.tools = body.tools.map((t: any) => ({ + name: t.function?.name || t.name, + description: t.function?.description || t.description || "", + input_schema: t.function?.parameters || t.parameters || { type: "object", properties: {} }, + })) + } + + return { anthropicBody, model: model.id } +} + +// --------------------------------------------------------------------------- +// Response transcoding: Anthropic → OpenAI +// --------------------------------------------------------------------------- + +function mapStopReason(reason: string | null): string { + if (reason === "tool_use") return "tool_calls" + if (reason === "max_tokens") return "length" + if (reason === "end_turn" || reason === "stop") return "stop" + return "stop" +} + +function transcodeResponse(anthropicResp: any, requestModel: string): object { + const content = anthropicResp.content || [] + + // Extract text and tool_use blocks + const textParts: string[] = [] + const toolCalls: any[] = [] + let toolIdx = 0 + + for (const block of content) { + if (block.type === "text" && block.text) { + textParts.push(block.text) + } else if (block.type === "tool_use") { + toolCalls.push({ + id: block.id, + type: "function", + index: toolIdx++, + 
function: { + name: block.name, + arguments: JSON.stringify(block.input || {}), + }, + }) + } + } + + const message: any = { + role: "assistant", + content: textParts.join("\n") || null, + } + if (toolCalls.length > 0) { + message.tool_calls = toolCalls + } + + return { + id: `chatcmpl-${anthropicResp.id || Date.now()}`, + object: "chat.completion", + created: Math.floor(Date.now() / 1000), + model: requestModel, + choices: [{ + index: 0, + message, + finish_reason: mapStopReason(anthropicResp.stop_reason), + }], + usage: { + prompt_tokens: anthropicResp.usage?.input_tokens || 0, + completion_tokens: anthropicResp.usage?.output_tokens || 0, + total_tokens: (anthropicResp.usage?.input_tokens || 0) + (anthropicResp.usage?.output_tokens || 0), + }, + } +} + +// --------------------------------------------------------------------------- +// Streaming transcoding: Anthropic SSE → OpenAI SSE +// --------------------------------------------------------------------------- + +function transcodeStreamChunk(eventType: string, data: any, requestModel: string, chatId: string): string | null { + const ts = Math.floor(Date.now() / 1000) + + if (eventType === "message_start") { + // Emit initial chunk with role + const chunk = { + id: chatId, + object: "chat.completion.chunk", + created: ts, + model: requestModel, + choices: [{ index: 0, delta: { role: "assistant", content: "" }, finish_reason: null }], + } + return `data: ${JSON.stringify(chunk)}\n\n` + } + + if (eventType === "content_block_start") { + const block = data.content_block + if (block?.type === "tool_use") { + const chunk = { + id: chatId, + object: "chat.completion.chunk", + created: ts, + model: requestModel, + choices: [{ + index: 0, + delta: { + tool_calls: [{ + index: data.index || 0, + id: block.id, + type: "function", + function: { name: block.name, arguments: "" }, + }], + }, + finish_reason: null, + }], + } + return `data: ${JSON.stringify(chunk)}\n\n` + } + return null + } + + if (eventType === 
"content_block_delta") { + const delta = data.delta + if (delta?.type === "text_delta" && delta.text) { + const chunk = { + id: chatId, + object: "chat.completion.chunk", + created: ts, + model: requestModel, + choices: [{ index: 0, delta: { content: delta.text }, finish_reason: null }], + } + return `data: ${JSON.stringify(chunk)}\n\n` + } + if (delta?.type === "input_json_delta" && delta.partial_json) { + const chunk = { + id: chatId, + object: "chat.completion.chunk", + created: ts, + model: requestModel, + choices: [{ + index: 0, + delta: { + tool_calls: [{ + index: data.index || 0, + function: { arguments: delta.partial_json }, + }], + }, + finish_reason: null, + }], + } + return `data: ${JSON.stringify(chunk)}\n\n` + } + return null + } + + if (eventType === "message_delta") { + const stopReason = data.delta?.stop_reason + const chunk = { + id: chatId, + object: "chat.completion.chunk", + created: ts, + model: requestModel, + choices: [{ index: 0, delta: {}, finish_reason: mapStopReason(stopReason) }], + usage: { + prompt_tokens: 0, + completion_tokens: data.usage?.output_tokens || 0, + total_tokens: data.usage?.output_tokens || 0, + }, + } + return `data: ${JSON.stringify(chunk)}\n\n` + } + + if (eventType === "message_stop") { + return `data: [DONE]\n\n` + } + + return null +} + +// --------------------------------------------------------------------------- +// Routes +// --------------------------------------------------------------------------- + +export function createOpenAIRoutes(internalFetch: (request: Request) => Response | Promise) { + const routes = new Hono() + + // GET /models + routes.get("/models", (c) => { + return c.json({ object: "list", data: MODEL_CATALOG.map(toOpenAIModel) }) + }) + + // GET /models/:id + routes.get("/models/:id", (c) => { + const id = c.req.param("id") + const entry = MODEL_CATALOG.find( + (m) => m.id === id || m.aliases.includes(id.toLowerCase()) + ) + if (!entry) { + return c.json({ error: { message: `Model '${id}' not 
found`, type: "invalid_request_error" } }, 404) + } + return c.json(toOpenAIModel(entry)) + }) + + // POST /chat/completions — main endpoint + routes.post("/chat/completions", async (c) => { + const body = await c.req.json() + const { anthropicBody, model: requestModel } = transcodeRequest(body) + const isStream = anthropicBody.stream + + // Forward auth headers from the original request + const headers: Record = { "Content-Type": "application/json" } + const auth = c.req.header("authorization") + const xApiKey = c.req.header("x-api-key") + if (auth) headers["Authorization"] = auth + if (xApiKey) headers["x-api-key"] = xApiKey + + // Forward OpenCode-specific headers + const openCodeSession = c.req.header("x-opencode-session") + if (openCodeSession) headers["x-opencode-session"] = openCodeSession + + const internalReq = new Request("http://internal/v1/messages", { + method: "POST", + headers, + body: JSON.stringify(anthropicBody), + }) + + const resp = await internalFetch(internalReq) + + if (!isStream) { + // Non-streaming: transcode the JSON response + const anthropicResp = await resp.json() as any + if (anthropicResp.error) { + return c.json({ error: { message: anthropicResp.error.message, type: anthropicResp.error.type } }, resp.status as any) + } + return c.json(transcodeResponse(anthropicResp, requestModel)) + } + + // Streaming: transcode Anthropic SSE → OpenAI SSE + const chatId = `chatcmpl-${Date.now()}` + const encoder = new TextEncoder() + + const readable = new ReadableStream({ + async start(controller) { + const reader = resp.body?.getReader() + if (!reader) { + controller.close() + return + } + + const decoder = new TextDecoder() + let buffer = "" + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + + // Parse SSE lines + const lines = buffer.split("\n") + buffer = lines.pop() || "" // keep incomplete line + + let eventType = "" + for (const line of lines) { 
+ if (line.startsWith("event: ")) { + eventType = line.slice(7).trim() + } else if (line.startsWith("data: ") && eventType) { + try { + const data = JSON.parse(line.slice(6)) + const chunk = transcodeStreamChunk(eventType, data, requestModel, chatId) + if (chunk) { + controller.enqueue(encoder.encode(chunk)) + } + } catch { + // Skip unparseable data + } + eventType = "" + } else if (line.startsWith(": ping")) { + // Forward keepalive + controller.enqueue(encoder.encode(": ping\n\n")) + } + } + } + + // Ensure [DONE] is sent + controller.enqueue(encoder.encode("data: [DONE]\n\n")) + } catch { + // Stream ended + } finally { + try { controller.close() } catch {} + } + }, + }) + + return new Response(readable, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + }) + }) + + // POST /completions — legacy text completions + routes.post("/completions", async (c) => { + const body = await c.req.json() + // Convert to chat format + const chatBody = { + ...body, + messages: [{ role: "user", content: body.prompt || "" }], + } + delete chatBody.prompt + + const { anthropicBody, model: requestModel } = transcodeRequest(chatBody) + anthropicBody.stream = false + + const headers: Record = { "Content-Type": "application/json" } + const auth = c.req.header("authorization") + const xApiKey = c.req.header("x-api-key") + if (auth) headers["Authorization"] = auth + if (xApiKey) headers["x-api-key"] = xApiKey + + const internalReq = new Request("http://internal/v1/messages", { + method: "POST", + headers, + body: JSON.stringify(anthropicBody), + }) + + const resp = await internalFetch(internalReq) + const anthropicResp = await resp.json() as any + + if (anthropicResp.error) { + return c.json({ error: { message: anthropicResp.error.message, type: anthropicResp.error.type } }, resp.status as any) + } + + const textParts = (anthropicResp.content || []) + .filter((b: any) => b.type === "text") + .map((b: any) => b.text) 
+ + return c.json({ + id: `cmpl-${Date.now()}`, + object: "text_completion", + created: Math.floor(Date.now() / 1000), + model: requestModel, + choices: [{ + text: textParts.join(""), + index: 0, + finish_reason: mapStopReason(anthropicResp.stop_reason), + }], + usage: { + prompt_tokens: anthropicResp.usage?.input_tokens || 0, + completion_tokens: anthropicResp.usage?.output_tokens || 0, + total_tokens: (anthropicResp.usage?.input_tokens || 0) + (anthropicResp.usage?.output_tokens || 0), + }, + }) + }) + + // POST /responses — OpenAI Responses API (simplified) + routes.post("/responses", async (c) => { + const body = await c.req.json() + + // Responses API accepts either `input` (string) or `messages` array + let messages: any[] + if (typeof body.input === "string") { + messages = [{ role: "user", content: body.input }] + } else if (Array.isArray(body.input)) { + messages = body.input + } else { + messages = body.messages || [{ role: "user", content: "" }] + } + + const chatBody = { ...body, messages, stream: false } + + const { anthropicBody, model: requestModel } = transcodeRequest(chatBody) + + const headers: Record = { "Content-Type": "application/json" } + const auth = c.req.header("authorization") + const xApiKey = c.req.header("x-api-key") + if (auth) headers["Authorization"] = auth + if (xApiKey) headers["x-api-key"] = xApiKey + + const internalReq = new Request("http://internal/v1/messages", { + method: "POST", + headers, + body: JSON.stringify(anthropicBody), + }) + + const resp = await internalFetch(internalReq) + const anthropicResp = await resp.json() as any + + if (anthropicResp.error) { + return c.json({ error: { message: anthropicResp.error.message, type: anthropicResp.error.type } }, resp.status as any) + } + + // Build Responses API output format + const output: any[] = [] + for (const block of anthropicResp.content || []) { + if (block.type === "text") { + output.push({ type: "message", role: "assistant", content: [{ type: "output_text", text: 
block.text }] }) + } + } + + return c.json({ + id: `resp-${Date.now()}`, + object: "response", + created_at: Math.floor(Date.now() / 1000), + model: requestModel, + status: "completed", + output, + usage: { + input_tokens: anthropicResp.usage?.input_tokens || 0, + output_tokens: anthropicResp.usage?.output_tokens || 0, + total_tokens: (anthropicResp.usage?.input_tokens || 0) + (anthropicResp.usage?.output_tokens || 0), + }, + }) + }) + + // --- Unsupported endpoints (return 501 with clear message) --- + const unsupported = (name: string) => (c: Context) => + c.json({ + error: { + message: `${name} is not supported. Meridian only supports text generation.`, + type: "invalid_request_error", + code: "unsupported_endpoint", + }, + }, 501) + + routes.post("/embeddings", unsupported("Embeddings")) + routes.post("/audio/transcriptions", unsupported("Audio transcription")) + routes.post("/audio/speech", unsupported("Text-to-speech")) + routes.post("/images/generations", unsupported("Image generation")) + routes.post("/images/edits", unsupported("Image editing")) + routes.post("/images/variations", unsupported("Image variations")) + + return routes +} diff --git a/src/proxy/query.ts b/src/proxy/query.ts index 03cb049..bff88a8 100644 --- a/src/proxy/query.ts +++ b/src/proxy/query.ts @@ -40,6 +40,8 @@ export interface QueryContext { sdkHooks?: any /** The agent adapter providing tool configuration */ adapter: AgentAdapter + /** Abort controller for cancellation/timeout */ + abortController?: AbortController } /** @@ -52,6 +54,7 @@ export function buildQueryOptions(ctx: QueryContext) { prompt, model, workingDirectory, systemContext, claudeExecutable, passthrough, stream, sdkAgents, passthroughMcp, cleanEnv, resumeSessionId, isUndo, undoRollbackUuid, sdkHooks, adapter, + abortController, } = ctx const blockedTools = [...adapter.getBlockedBuiltinTools(), ...adapter.getAgentIncompatibleTools()] @@ -90,6 +93,7 @@ export function buildQueryOptions(ctx: QueryContext) { 
...(resumeSessionId ? { resume: resumeSessionId } : {}), ...(isUndo ? { forkSession: true, ...(undoRollbackUuid ? { resumeSessionAt: undoRollbackUuid } : {}) } : {}), ...(sdkHooks ? { hooks: sdkHooks } : {}), + ...(abortController ? { abortController } : {}), } } } diff --git a/src/proxy/server.ts b/src/proxy/server.ts index 3e3324d..b4d2e1c 100644 --- a/src/proxy/server.ts +++ b/src/proxy/server.ts @@ -7,21 +7,26 @@ import type { Context } from "hono" import { DEFAULT_PROXY_CONFIG } from "./types" import type { ProxyConfig, ProxyInstance, ProxyServer } from "./types" export type { ProxyConfig, ProxyInstance, ProxyServer } +import { env } from "../env" import { claudeLog } from "../logger" import { exec as execCallback } from "child_process" +import { existsSync } from "fs" import { promisify } from "util" import { randomUUID } from "crypto" import { withClaudeLogContext } from "../logger" import { createPassthroughMcpServer, stripMcpPrefix, PASSTHROUGH_MCP_NAME, PASSTHROUGH_MCP_PREFIX } from "./passthroughTools" -import { telemetryStore, diagnosticLog, createTelemetryRoutes, landingHtml } from "../telemetry" -import type { RequestMetric } from "../telemetry" +import { telemetryStore, diagnosticLog } from "../telemetry" +import { keyStore, createAdminRoutes } from "../keys" +import { getProxySettings } from "../keys/settings" +import { initAdmin, isAdminConfigured, verifyJwt } from "../keys/auth" import { classifyError, isStaleSessionError, isRateLimitError } from "./errors" import { mapModelToClaudeModel, resolveClaudeExecutableAsync, isClosedControllerError, getClaudeAuthStatusAsync, hasExtendedContext, stripExtendedContext } from "./models" import { getLastUserMessage } from "./messages" import { detectAdapter } from "./adapters/detect" import { buildQueryOptions, type QueryContext } from "./query" +import { createOpenAIRoutes } from "./openai" import { computeLineageHash, hashMessage, @@ -36,16 +41,6 @@ export { computeLineageHash, hashMessage, 
computeMessageHashes } export { clearSessionCache, getMaxSessionsLimit } export type { LineageResult } - - - - - - - - - - const exec = promisify(execCallback) let claudeExecutable = "" @@ -123,40 +118,117 @@ function buildFreshPrompt( .join("\n\n") || "" } +type ProxyEnv = { Variables: { authKeyString?: string } } + export function createProxyServer(config: Partial = {}): ProxyServer { const finalConfig = { ...DEFAULT_PROXY_CONFIG, ...config } - const app = new Hono() + const app = new Hono() app.use("*", cors()) - app.get("/", (c) => { - // API clients get JSON, browsers get the landing page - const accept = c.req.header("accept") || "" - if (accept.includes("application/json") && !accept.includes("text/html")) { + // API key authentication middleware + // Auth is opt-in: only enforced when admin is configured (master key hash exists). + // Without it, the proxy works in open-access mode (backward compatible). + // When enabled: + // Admin routes: JWT (own middleware) + // Health: JWT + // All other endpoints: managed API key or ANTHROPIC_API_KEY static key + app.use("*", async (c, next) => { + const path = new URL(c.req.url).pathname + + // Public: root info only + if (path === "/") return next() + + // Admin routes have their own JWT auth middleware + if (path.startsWith("/admin")) return next() + + const staticKey = process.env.ANTHROPIC_API_KEY + + // No admin configured — open access (backward compatible) + if (!isAdminConfigured()) return next() + + // Health requires admin JWT + if (path === "/health") { + const bearer = c.req.header("authorization")?.replace(/^Bearer\s+/i, "") + if (!bearer || !verifyJwt(bearer)) { + return c.json({ error: { type: "authentication_error", message: "Admin authentication required." } }, 401) + } + return next() + } + + // All other endpoints: require auth + const bearer = c.req.header("authorization") + const xApiKey = c.req.header("x-api-key") + const providedKey = bearer?.startsWith("Bearer ") + ? 
bearer.slice(7) + : xApiKey + + if (!providedKey) { return c.json({ - status: "ok", - service: "meridian", - format: "anthropic", - endpoints: ["/v1/messages", "/messages", "/telemetry", "/health"] + error: { type: "authentication_error", message: "API key required. Provide via Authorization: Bearer or x-api-key header." } + }, 401) + } + + // Check static env key (ANTHROPIC_API_KEY) for backward compat + if (staticKey && providedKey === staticKey) { + return next() + } + + // Check managed key store + const managedKey = keyStore.validate(providedKey) + if (managedKey) { + // Check rate limits before allowing the request + const settings = getProxySettings() + const limitError = keyStore.checkLimits(providedKey, { + limit6h: settings.globalLimit6h, + limitWeekly: settings.globalLimitWeekly, }) + if (limitError) { + return c.json({ + error: { type: "rate_limit_error", message: limitError } + }, 429) + } + c.set("authKeyString", providedKey) + + return next() } - return c.html(landingHtml) + + return c.json({ + error: { type: "authentication_error", message: "Invalid API key." } + }, 401) + }) + + app.get("/", (c) => { + return c.json({ + status: "ok", + service: "meridian", + format: "anthropic", + endpoints: ["/v1/messages", "/messages", "/v1/chat/completions", "/v1/models", "/admin"] + }) }) // --- Concurrency Control --- // Each request spawns an SDK subprocess (cli.js, ~11MB). Spawning multiple // simultaneously can crash the process. Serialize SDK queries with a queue. - const MAX_CONCURRENT_SESSIONS = parseInt((process.env.MERIDIAN_MAX_CONCURRENT ?? 
process.env.CLAUDE_PROXY_MAX_CONCURRENT) || "10", 10) + let activeSessions = 0 const sessionQueue: Array<{ resolve: () => void }> = [] async function acquireSession(): Promise { - if (activeSessions < MAX_CONCURRENT_SESSIONS) { + if (activeSessions < getProxySettings().maxConcurrent) { activeSessions++ return } - return new Promise((resolve) => { - sessionQueue.push({ resolve }) + const QUEUE_TIMEOUT_MS = 120_000 // 2 minutes + return new Promise((resolve, reject) => { + const entry = { resolve, reject: undefined as (() => void) | undefined } + const timer = setTimeout(() => { + const idx = sessionQueue.indexOf(entry as any) + if (idx !== -1) sessionQueue.splice(idx, 1) + reject(new Error(`Queue timeout: waited ${QUEUE_TIMEOUT_MS / 1000}s for a session slot (active=${activeSessions}/${getProxySettings().maxConcurrent}, queued=${sessionQueue.length})`)) + }, QUEUE_TIMEOUT_MS) + ;(entry as any).resolve = () => { clearTimeout(timer); resolve() } + sessionQueue.push(entry as any) }) } @@ -170,7 +242,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe } const handleMessages = async ( - c: Context, + c: Context, requestMeta: { requestId: string; endpoint: string; queueEnteredAt: number; queueStartedAt: number } ) => { const requestStartAt = Date.now() @@ -182,7 +254,11 @@ export function createProxyServer(config: Partial = {}): ProxyServe let model = mapModelToClaudeModel(body.model || "sonnet", authStatus?.subscriptionType) const stream = body.stream ?? true const adapter = detectAdapter(c) - const workingDirectory = (process.env.MERIDIAN_WORKDIR ?? process.env.CLAUDE_PROXY_WORKDIR) || adapter.extractWorkingDirectory(body) || process.cwd() + const clientCwd = adapter.extractWorkingDirectory(body) + const fallbackCwd = env('WORKDIR') || process.cwd() + // SDK spawn cwd must exist on this machine. Remote clients may send paths + // that don't exist here (e.g., Windows paths on a Linux proxy). 
+ const workingDirectory = (clientCwd && existsSync(clientCwd)) ? clientCwd : fallbackCwd // Strip env vars that would cause the SDK subprocess to loop back through // the proxy instead of using its native Claude Max auth. Also strip vars @@ -226,7 +302,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe }).join(" → ") const lineageType = lineageResult.type === "diverged" && !cachedSession ? "new" : lineageResult.type const msgCount = Array.isArray(body.messages) ? body.messages.length : 0 - const requestLogLine = `${requestMeta.requestId} model=${model} stream=${stream} tools=${body.tools?.length ?? 0} lineage=${lineageType} session=${resumeSessionId?.slice(0, 8) || "new"}${isUndo && undoRollbackUuid ? ` rollback=${undoRollbackUuid.slice(0, 8)}` : ""} active=${activeSessions}/${MAX_CONCURRENT_SESSIONS} msgCount=${msgCount}` + const requestLogLine = `${requestMeta.requestId} model=${model} stream=${stream} tools=${body.tools?.length ?? 0} lineage=${lineageType} session=${resumeSessionId?.slice(0, 8) || "new"}${isUndo && undoRollbackUuid ? ` rollback=${undoRollbackUuid.slice(0, 8)}` : ""} active=${activeSessions}/${getProxySettings().maxConcurrent} msgCount=${msgCount}` console.error(`[PROXY] ${requestLogLine} msgs=${msgSummary}`) diagnosticLog.session(`${requestLogLine}`, requestMeta.requestId) @@ -242,7 +318,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe // OpenCode parses the Task tool description; other adapters return empty. const sdkAgents = adapter.buildSdkAgents?.(body, adapter.getAllowedMcpTools()) ?? {} const validAgentNames = Object.keys(sdkAgents) - if ((process.env.MERIDIAN_DEBUG ?? process.env.CLAUDE_PROXY_DEBUG) && validAgentNames.length > 0) { + if (env('DEBUG') && validAgentNames.length > 0) { claudeLog("debug.agents", { names: validAgentNames, count: validAgentNames.length }) } systemContext += adapter.buildSystemContextAddendum?.(body, sdkAgents) ?? 
"" @@ -394,7 +470,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe const adapterPassthrough = adapter.usesPassthrough?.() const passthrough = adapterPassthrough !== undefined ? adapterPassthrough - : Boolean((process.env.MERIDIAN_PASSTHROUGH ?? process.env.CLAUDE_PROXY_PASSTHROUGH)) + : getProxySettings().passthrough const capturedToolUses: Array<{ id: string; name: string; input: any }> = [] // In passthrough mode, register OpenCode's tools as MCP tools so Claude @@ -427,11 +503,16 @@ export function createProxyServer(config: Partial = {}): ProxyServe } : adapter.buildSdkHooks?.(body, sdkAgents) ?? undefined - + // Lazy-resolve executable if not already set (e.g. when using createProxyServer directly) + if (!claudeExecutable) { + claudeExecutable = await resolveClaudeExecutableAsync() + } if (!stream) { const contentBlocks: Array> = [] let assistantMessages = 0 + let totalInputTokens = 0 + let totalOutputTokens = 0 const upstreamStartAt = Date.now() let firstChunkAt: number | undefined let currentSessionId: string | undefined @@ -445,21 +526,26 @@ export function createProxyServer(config: Partial = {}): ProxyServe // Pad to current message count (the last user message has no UUID yet) while (sdkUuidMap.length < allMessages.length) sdkUuidMap.push(null) + // AbortController for cancelling SDK subprocess on idle timeout + const requestAbort = new AbortController() + const idleMinutes = getProxySettings().idleTimeoutMinutes + const IDLE_TIMEOUT_MS = idleMinutes > 0 ? 
idleMinutes * 60 * 1000 : 0 + let idleTimer: ReturnType | null = null + const resetIdleTimer = () => { + if (!IDLE_TIMEOUT_MS) return + if (idleTimer) clearTimeout(idleTimer) + idleTimer = setTimeout(() => { + console.error(`[PROXY] ${requestMeta.requestId} idle timeout (${idleMinutes}min no activity), aborting`) + requestAbort.abort() + }, IDLE_TIMEOUT_MS) + } + const clearIdleTimer = () => { if (idleTimer) { clearTimeout(idleTimer); idleTimer = null } } + resetIdleTimer() + claudeLog("upstream.start", { mode: "non_stream", model }) try { - // Lazy-resolve executable if not already set (e.g. when using createProxyServer directly) - if (!claudeExecutable) { - claudeExecutable = await resolveClaudeExecutableAsync() - } - // Wrap SDK call with transparent retry for recoverable errors. - // Both stale-UUID and rate-limit retries happen inside the generator, - // so the message-processing loop doesn't need any retry logic. - // - // Rate-limit retry strategy: - // 1. Strip [1m] context (immediate, different model tier) - // 2. 
Backoff retries on base model (1s, 2s — exponential) const MAX_RATE_LIMIT_RETRIES = 2 const RATE_LIMIT_BASE_DELAY_MS = 1000 @@ -476,7 +562,9 @@ export function createProxyServer(config: Partial = {}): ProxyServe prompt: makePrompt(), model, workingDirectory, systemContext, claudeExecutable, passthrough, stream: false, sdkAgents, passthroughMcp, cleanEnv, resumeSessionId, isUndo, undoRollbackUuid, sdkHooks, adapter, + abortController: requestAbort, }))) { + resetIdleTimer() // Activity detected if ((event as any).type === "assistant") { didYieldContent = true } @@ -505,6 +593,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe model, workingDirectory, systemContext, claudeExecutable, passthrough, stream: false, sdkAgents, passthroughMcp, cleanEnv, resumeSessionId: undefined, isUndo: false, undoRollbackUuid: undefined, sdkHooks, adapter, + abortController: requestAbort, })) return } @@ -555,6 +644,12 @@ export function createProxyServer(config: Partial = {}): ProxyServe if ((message as any).uuid) { sdkUuidMap.push((message as any).uuid) } + // Capture token usage from SDK response + const msgUsage = (message as any).message?.usage + if (msgUsage) { + totalInputTokens += msgUsage.input_tokens || 0 + totalOutputTokens += msgUsage.output_tokens || 0 + } if (!firstChunkAt) { firstChunkAt = Date.now() claudeLog("upstream.first_chunk", { @@ -590,6 +685,8 @@ export function createProxyServer(config: Partial = {}): ProxyServe error: error instanceof Error ? 
error.message : String(error) }) throw error + } finally { + clearIdleTimer() } // In passthrough mode, add captured tool_use blocks from the hook @@ -633,25 +730,14 @@ export function createProxyServer(config: Partial = {}): ProxyServe const nonStreamQueueWaitMs = requestMeta.queueStartedAt - requestMeta.queueEnteredAt telemetryStore.record({ - requestId: requestMeta.requestId, - timestamp: Date.now(), - model, - requestModel: body.model || undefined, - mode: "non-stream", - isResume, - isPassthrough: passthrough, - lineageType, - messageCount: allMessages.length, - sdkSessionId: currentSessionId || resumeSessionId, - status: 200, - queueWaitMs: nonStreamQueueWaitMs, - proxyOverheadMs: upstreamStartAt - requestStartAt - nonStreamQueueWaitMs, + requestId: requestMeta.requestId, timestamp: Date.now(), model, + requestModel: body.model || undefined, mode: "non-stream", + isResume, isPassthrough: passthrough, lineageType, messageCount: allMessages.length, + sdkSessionId: currentSessionId || resumeSessionId, status: 200, + queueWaitMs: nonStreamQueueWaitMs, proxyOverheadMs: upstreamStartAt - requestStartAt - nonStreamQueueWaitMs, ttfbMs: firstChunkAt ? 
firstChunkAt - upstreamStartAt : null, - upstreamDurationMs: Date.now() - upstreamStartAt, - totalDurationMs, - contentBlocks: contentBlocks.length, - textEvents: 0, - error: null, + upstreamDurationMs: Date.now() - upstreamStartAt, totalDurationMs, + contentBlocks: contentBlocks.length, textEvents: 0, error: null, }) // Store session for future resume @@ -661,6 +747,12 @@ export function createProxyServer(config: Partial = {}): ProxyServe const responseSessionId = currentSessionId || resumeSessionId || `session_${Date.now()}` + // Record token usage for managed API keys + const authKeyString = c.get("authKeyString") as string | undefined + if (authKeyString) { + keyStore.recordUsage(authKeyString, totalInputTokens, totalOutputTokens, model) + } + return new Response(JSON.stringify({ id: `msg_${Date.now()}`, type: "message", @@ -668,7 +760,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe content: contentBlocks, model: body.model, stop_reason: stopReason, - usage: { input_tokens: 0, output_tokens: 0 } + usage: { input_tokens: totalInputTokens, output_tokens: totalOutputTokens } }), { headers: { "Content-Type": "application/json", @@ -688,6 +780,25 @@ export function createProxyServer(config: Partial = {}): ProxyServe let textEventsForwarded = 0 let bytesSent = 0 let streamClosed = false + let streamInputTokens = 0 + let streamOutputTokens = 0 + + // AbortController for cancelling SDK subprocess on client disconnect or idle timeout. + // The timer resets on every SDK event — only fires if the subprocess goes silent. + const requestAbort = new AbortController() + const idleMinutes = getProxySettings().idleTimeoutMinutes + const IDLE_TIMEOUT_MS = idleMinutes > 0 ? 
idleMinutes * 60 * 1000 : 0 + let idleTimer: ReturnType | null = null + const resetIdleTimer = () => { + if (!IDLE_TIMEOUT_MS) return + if (idleTimer) clearTimeout(idleTimer) + idleTimer = setTimeout(() => { + console.error(`[PROXY] ${requestMeta.requestId} idle timeout (${idleMinutes}min no activity), aborting SDK subprocess`) + requestAbort.abort() + }, IDLE_TIMEOUT_MS) + } + const clearIdleTimer = () => { if (idleTimer) { clearTimeout(idleTimer); idleTimer = null } } + resetIdleTimer() // Start the idle timer claudeLog("upstream.start", { mode: "stream", model }) @@ -700,6 +811,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe } catch (error) { if (isClosedControllerError(error)) { streamClosed = true + requestAbort.abort() // Kill SDK subprocess on client disconnect claudeLog("stream.client_closed", { source, streamEventsSeen, eventsForwarded }) return false } @@ -743,6 +855,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe prompt: makePrompt(), model, workingDirectory, systemContext, claudeExecutable, passthrough, stream: true, sdkAgents, passthroughMcp, cleanEnv, resumeSessionId, isUndo, undoRollbackUuid, sdkHooks, adapter, + abortController: requestAbort, }))) { if ((event as any).type === "stream_event") { didYieldClientEvent = true @@ -772,6 +885,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe model, workingDirectory, systemContext, claudeExecutable, passthrough, stream: true, sdkAgents, passthroughMcp, cleanEnv, resumeSessionId: undefined, isUndo: false, undoRollbackUuid: undefined, sdkHooks, adapter, + abortController: requestAbort, })) return } @@ -845,6 +959,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe if (streamClosed) { break } + resetIdleTimer() // Activity detected — reset idle timeout // Capture session ID and assistant UUID from any SDK message if ((message as any).session_id) { @@ -874,6 +989,12 @@ export function createProxyServer(config: Partial 
= {}): ProxyServe if (eventType === "message_start") { skipBlockIndices.clear() sdkToClientIndex.clear() + // Capture token usage from message_start + const startUsage = (event as any).message?.usage + if (startUsage) { + streamInputTokens += startUsage.input_tokens || 0 + streamOutputTokens += startUsage.output_tokens || 0 + } // Only emit the first message_start — subsequent ones are internal SDK turns if (messageStartEmitted) { continue @@ -922,6 +1043,11 @@ export function createProxyServer(config: Partial = {}): ProxyServe // Skip intermediate message_delta with stop_reason: tool_use // (SDK is about to execute MCP tools and continue) if (eventType === "message_delta") { + // Capture token usage from message_delta + const deltaUsage = (event as any).usage + if (deltaUsage) { + streamOutputTokens += deltaUsage.output_tokens || 0 + } const stopReason = (event as any).delta?.stop_reason if (stopReason === "tool_use" && skipBlockIndices.size > 0) { // All tool_use blocks in this turn were MCP — skip this delta @@ -946,6 +1072,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe } } finally { clearInterval(heartbeat) + clearIdleTimer() } claudeLog("upstream.completed", { @@ -962,6 +1089,12 @@ export function createProxyServer(config: Partial = {}): ProxyServe storeSession(agentSessionId, body.messages || [], currentSessionId, workingDirectory, sdkUuidMap) } + // Record token usage for managed API keys + const streamAuthKeyString = c.get("authKeyString") as string | undefined + if (streamAuthKeyString) { + keyStore.recordUsage(streamAuthKeyString, streamInputTokens, streamOutputTokens, model) + } + if (!streamClosed) { // In passthrough mode, emit captured tool_use blocks as stream events // Skip any that were already forwarded during the stream (dedup by ID) @@ -1038,25 +1171,14 @@ export function createProxyServer(config: Partial = {}): ProxyServe const streamQueueWaitMs = requestMeta.queueStartedAt - requestMeta.queueEnteredAt 
telemetryStore.record({ - requestId: requestMeta.requestId, - timestamp: Date.now(), - model, - requestModel: body.model || undefined, - mode: "stream", - isResume, - isPassthrough: passthrough, - lineageType, - messageCount: allMessages.length, - sdkSessionId: currentSessionId || resumeSessionId, - status: 200, - queueWaitMs: streamQueueWaitMs, - proxyOverheadMs: upstreamStartAt - requestStartAt - streamQueueWaitMs, + requestId: requestMeta.requestId, timestamp: Date.now(), model, + requestModel: body.model || undefined, mode: "stream", + isResume, isPassthrough: passthrough, lineageType, messageCount: allMessages.length, + sdkSessionId: currentSessionId || resumeSessionId, status: 200, + queueWaitMs: streamQueueWaitMs, proxyOverheadMs: upstreamStartAt - requestStartAt - streamQueueWaitMs, ttfbMs: firstChunkAt ? firstChunkAt - upstreamStartAt : null, - upstreamDurationMs: Date.now() - upstreamStartAt, - totalDurationMs: streamTotalDurationMs, - contentBlocks: eventsForwarded, - textEvents: textEventsForwarded, - error: null, + upstreamDurationMs: Date.now() - upstreamStartAt, totalDurationMs: streamTotalDurationMs, + contentBlocks: eventsForwarded, textEvents: textEventsForwarded, error: null, }) if (textEventsForwarded === 0) { @@ -1069,6 +1191,8 @@ export function createProxyServer(config: Partial = {}): ProxyServe } } } catch (error) { + clearIdleTimer() + requestAbort.abort() // Ensure subprocess is cleaned up on any error if (isClosedControllerError(error)) { streamClosed = true claudeLog("stream.client_closed", { @@ -1143,25 +1267,13 @@ export function createProxyServer(config: Partial = {}): ProxyServe const errorQueueWaitMs = requestMeta.queueStartedAt - requestMeta.queueEnteredAt telemetryStore.record({ - requestId: requestMeta.requestId, - timestamp: Date.now(), - model: "unknown", - requestModel: undefined, - mode: "non-stream", - isResume: false, - isPassthrough: Boolean((process.env.MERIDIAN_PASSTHROUGH ?? 
process.env.CLAUDE_PROXY_PASSTHROUGH)), - lineageType: undefined, - messageCount: undefined, - sdkSessionId: undefined, - status: classified.status, - queueWaitMs: errorQueueWaitMs, - proxyOverheadMs: Date.now() - requestStartAt - errorQueueWaitMs, - ttfbMs: null, - upstreamDurationMs: Date.now() - requestStartAt, - totalDurationMs: Date.now() - requestStartAt, - contentBlocks: 0, - textEvents: 0, - error: classified.type, + requestId: requestMeta.requestId, timestamp: Date.now(), model: "unknown", + requestModel: undefined, mode: "non-stream", + isResume: false, isPassthrough: getProxySettings().passthrough, lineageType: undefined, + messageCount: undefined, sdkSessionId: undefined, status: classified.status, + queueWaitMs: errorQueueWaitMs, proxyOverheadMs: Date.now() - requestStartAt - errorQueueWaitMs, + ttfbMs: null, upstreamDurationMs: Date.now() - requestStartAt, + totalDurationMs: Date.now() - requestStartAt, contentBlocks: 0, textEvents: 0, error: classified.type, }) return new Response( @@ -1176,6 +1288,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe const requestId = c.req.header("x-request-id") || randomUUID() const queueEnteredAt = Date.now() claudeLog("request.enter", { requestId, endpoint }) + await acquireSession() const queueStartedAt = Date.now() try { @@ -1188,8 +1301,11 @@ export function createProxyServer(config: Partial = {}): ProxyServe app.post("/v1/messages", (c) => handleWithQueue(c, "/v1/messages")) app.post("/messages", (c) => handleWithQueue(c, "/messages")) - // Telemetry dashboard and API - app.route("/telemetry", createTelemetryRoutes()) + // Admin dashboard, key management, and telemetry + app.route("/admin", createAdminRoutes()) + + // OpenAI-compatible API (transcodes to Anthropic /v1/messages internally) + app.route("/v1", createOpenAIRoutes(app.fetch.bind(app))) // Health check endpoint — verifies auth status app.get("/health", async (c) => { @@ -1199,7 +1315,7 @@ export function 
createProxyServer(config: Partial = {}): ProxyServe return c.json({ status: "degraded", error: "Could not verify auth status", - mode: (process.env.MERIDIAN_PASSTHROUGH ?? process.env.CLAUDE_PROXY_PASSTHROUGH) ? "passthrough" : "internal", + mode: getProxySettings().passthrough ? "passthrough" : "internal", }) } if (!auth.loggedIn) { @@ -1216,13 +1332,13 @@ export function createProxyServer(config: Partial = {}): ProxyServe email: auth.email, subscriptionType: auth.subscriptionType, }, - mode: (process.env.MERIDIAN_PASSTHROUGH ?? process.env.CLAUDE_PROXY_PASSTHROUGH) ? "passthrough" : "internal", + mode: getProxySettings().passthrough ? "passthrough" : "internal", }) } catch { return c.json({ status: "degraded", error: "Could not verify auth status", - mode: (process.env.MERIDIAN_PASSTHROUGH ?? process.env.CLAUDE_PROXY_PASSTHROUGH) ? "passthrough" : "internal", + mode: getProxySettings().passthrough ? "passthrough" : "internal", }) } }) @@ -1237,6 +1353,7 @@ export function createProxyServer(config: Partial = {}): ProxyServe } export async function startProxyServer(config: Partial = {}): Promise { + initAdmin() claudeExecutable = await resolveClaudeExecutableAsync() const { app, config: finalConfig } = createProxyServer(config) @@ -1247,10 +1364,14 @@ export async function startProxyServer(config: Partial = {}): Promi overrideGlobalObjects: false, }, (info) => { if (!finalConfig.silent) { - console.log(`Meridian running at http://${finalConfig.host}:${info.port}`) - console.log(`Telemetry dashboard: http://${finalConfig.host}:${info.port}/telemetry`) - console.log(`\nPoint any Anthropic-compatible tool at this endpoint:`) - console.log(` ANTHROPIC_API_KEY=x ANTHROPIC_BASE_URL=http://${finalConfig.host}:${info.port}`) + const url = `http://${finalConfig.host}:${info.port}` + console.log(`Meridian running at ${url}`) + console.log(` ANTHROPIC_BASE_URL=${url}`) + if (isAdminConfigured()) { + console.log(` Admin: configured`) + } + console.log(` Admin dashboard: 
${url}/admin`) + console.log(` Managed keys: ${keyStore.size}`) } }) as Server diff --git a/src/proxy/session/cache.ts b/src/proxy/session/cache.ts index 8eb39b2..b613d58 100644 --- a/src/proxy/session/cache.ts +++ b/src/proxy/session/cache.ts @@ -194,7 +194,19 @@ export function storeSession( ) { if (!claudeSessionId) return const lineageHash = computeLineageHash(messages) - const messageHashes = computeMessageHashes(messages) + + // Incremental hashing: reuse cached hashes for unchanged prefix messages. + // Only hash messages beyond what we already have cached. + const cached = sessionId ? sessionCache.get(sessionId) : undefined + let messageHashes: string[] + if (cached?.messageHashes && cached.messageHashes.length <= messages.length) { + // Reuse cached hashes and only compute new ones + const newHashes = computeMessageHashes(messages.slice(cached.messageHashes.length)) + messageHashes = [...cached.messageHashes, ...newHashes] + } else { + messageHashes = computeMessageHashes(messages) + } + const state: SessionState = { claudeSessionId, lastAccess: Date.now(), diff --git a/src/proxy/sessionStore.ts b/src/proxy/sessionStore.ts index d05f96a..42c38bc 100644 --- a/src/proxy/sessionStore.ts +++ b/src/proxy/sessionStore.ts @@ -140,7 +140,20 @@ function getDefaultCacheDir(): string { return newDir } -function readStore(): Record { +// In-memory cache to avoid reading the full file on every request. +// Loaded lazily on first access, then kept in sync via debounced writes. 
+let memoryStore: Record | null = null +let flushTimer: ReturnType | null = null +const FLUSH_DEBOUNCE_MS = 500 + +function getStore(): Record { + if (memoryStore === null) { + memoryStore = readStoreFromDisk() + } + return memoryStore +} + +function readStoreFromDisk(): Record { const path = getStorePath() if (!existsSync(path)) return {} try { @@ -152,7 +165,15 @@ function readStore(): Record { } } -function writeStore(store: Record): void { +function scheduleFlush(): void { + if (flushTimer) return // already scheduled + flushTimer = setTimeout(() => { + flushTimer = null + if (memoryStore) writeStoreToDisk(memoryStore) + }, FLUSH_DEBOUNCE_MS) +} + +function writeStoreToDisk(store: Record): void { const path = getStorePath() const tmp = `${path}.tmp` try { @@ -160,7 +181,6 @@ function writeStore(store: Record): void { renameSync(tmp, path) // atomic write } catch (e) { console.error("[sessionStore] write failed:", (e as Error).message) - // If rename fails, try direct write try { writeFileSync(path, JSON.stringify(store, null, 2)) } catch (directWriteError) { @@ -170,72 +190,48 @@ function writeStore(store: Record): void { } export function lookupSharedSession(key: string): StoredSession | undefined { - const store = readStore() - return store[key] + return getStore()[key] } export function storeSharedSession(key: string, claudeSessionId: string, messageCount?: number, lineageHash?: string, messageHashes?: string[], sdkMessageUuids?: Array): void { - const path = getStorePath() - const lockPath = `${path}.lock` - const hasLock = skipLocking ? false : acquireLock(lockPath) - if (!hasLock && !skipLocking) { - console.warn("[sessionStore] could not acquire lock, proceeding without") + const store = getStore() + const existing = store[key] + store[key] = { + claudeSessionId, + createdAt: existing?.createdAt || Date.now(), + lastUsedAt: Date.now(), + messageCount: messageCount ?? existing?.messageCount ?? 0, + lineageHash: lineageHash ?? 
existing?.lineageHash, + messageHashes: messageHashes ?? existing?.messageHashes, + sdkMessageUuids: sdkMessageUuids ?? existing?.sdkMessageUuids, } - try { - const store = readStore() - const existing = store[key] - store[key] = { - claudeSessionId, - createdAt: existing?.createdAt || Date.now(), - lastUsedAt: Date.now(), - messageCount: messageCount ?? existing?.messageCount ?? 0, - lineageHash: lineageHash ?? existing?.lineageHash, - messageHashes: messageHashes ?? existing?.messageHashes, - sdkMessageUuids: sdkMessageUuids ?? existing?.sdkMessageUuids, - } - // Prune oldest entries if over capacity (count-based, not time-based) - const maxEntries = getMaxStoredSessions() - const keys = Object.keys(store) - if (keys.length > maxEntries) { - const sorted = keys.sort((a, b) => (store[a]!.lastUsedAt || 0) - (store[b]!.lastUsedAt || 0)) - const toRemove = sorted.slice(0, keys.length - maxEntries) - for (const k of toRemove) { - delete store[k] - } - } - - writeStore(store) - } finally { - if (hasLock) { - releaseLock(lockPath) + // Prune oldest entries if over capacity (count-based, not time-based) + const maxEntries = getMaxStoredSessions() + const keys = Object.keys(store) + if (keys.length > maxEntries) { + const sorted = keys.sort((a, b) => (store[a]!.lastUsedAt || 0) - (store[b]!.lastUsedAt || 0)) + const toRemove = sorted.slice(0, keys.length - maxEntries) + for (const k of toRemove) { + delete store[k] } } + + scheduleFlush() } /** Remove a single session from the shared file store. * Used when a session is detected as stale (e.g. expired upstream). */ export function evictSharedSession(key: string): void { - const path = getStorePath() - const lockPath = `${path}.lock` - const hasLock = skipLocking ? 
false : acquireLock(lockPath) - if (!hasLock && !skipLocking) { - console.warn("[sessionStore] could not acquire lock for eviction, proceeding without") - } - try { - const store = readStore() - if (store[key]) { - delete store[key] - writeStore(store) - } - } finally { - if (hasLock) { - releaseLock(lockPath) - } + const store = getStore() + if (store[key]) { + delete store[key] + scheduleFlush() } } export function clearSharedSessions(): void { + memoryStore = {} const path = getStorePath() try { writeFileSync(path, "{}") diff --git a/src/telemetry/dashboard.ts b/src/telemetry/dashboard.ts index 98b027d..941dce1 100644 --- a/src/telemetry/dashboard.ts +++ b/src/telemetry/dashboard.ts @@ -104,6 +104,9 @@ const $$ = s => document.querySelectorAll(s); let timer; let activeTab = 'requests'; let activeLogFilter = 'all'; +const authKey = localStorage.getItem('adminKey') || ''; +const authHeaders = authKey ? { 'Authorization': 'Bearer ' + authKey } : {}; +const basePath = window.location.pathname.replace(/\\/+$/, ''); function ms(v) { if (v == null) return '—'; @@ -148,9 +151,9 @@ async function refresh() { const w = $('#window').value; try { const [summary, reqs, logs] = await Promise.all([ - fetch('/telemetry/summary?window=' + w).then(r => r.json()), - fetch('/telemetry/requests?limit=50&since=' + (Date.now() - Number(w))).then(r => r.json()), - fetch('/telemetry/logs?limit=200&since=' + (Date.now() - Number(w))).then(r => r.json()), + fetch(basePath + '/summary?window=' + w, { headers: authHeaders }).then(r => r.json()), + fetch(basePath + '/requests?limit=50&since=' + (Date.now() - Number(w)), { headers: authHeaders }).then(r => r.json()), + fetch(basePath + '/logs?limit=200&since=' + (Date.now() - Number(w)), { headers: authHeaders }).then(r => r.json()), ]); render(summary, reqs, logs); $('#lastUpdate').textContent = 'Updated ' + new Date().toLocaleTimeString(); diff --git a/src/telemetry/index.ts b/src/telemetry/index.ts index 607ec42..f0f4229 100644 --- 
a/src/telemetry/index.ts +++ b/src/telemetry/index.ts @@ -1,6 +1,5 @@ export { telemetryStore, TelemetryStore } from "./store" export { diagnosticLog, DiagnosticLogStore } from "./logStore" export { createTelemetryRoutes } from "./routes" -export { landingHtml } from "./landing" export type { RequestMetric, TelemetrySummary, PhaseTiming } from "./types" export type { DiagnosticLog } from "./logStore"