From 781b0de8b54a9c35195b4b3987639db986be9270 Mon Sep 17 00:00:00 2001 From: chilu18 Date: Thu, 5 Mar 2026 06:47:59 +0000 Subject: [PATCH 1/5] docs: add anyway integration support handoff readme --- .../ANYWAY_SUPPORT_README.md | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 opencto/opencto-cloudbot-worker/ANYWAY_SUPPORT_README.md diff --git a/opencto/opencto-cloudbot-worker/ANYWAY_SUPPORT_README.md b/opencto/opencto-cloudbot-worker/ANYWAY_SUPPORT_README.md new file mode 100644 index 0000000..0badc1f --- /dev/null +++ b/opencto/opencto-cloudbot-worker/ANYWAY_SUPPORT_README.md @@ -0,0 +1,154 @@ +# Anyway Integration Support Handoff (OpenCTO) + +Date: 2026-03-05 +Owner: OpenCTO Team +Service: `opencto-cloudbot-worker` + Python sidecar + +## Summary + +We implemented Anyway tracing in two paths: + +1. Direct Worker emit path (Cloudflare Worker -> Anyway ingest-style payload) +2. Python sidecar path (Worker -> sidecar -> `anyway-sdk` / OTLP exporter) + +Current blocker is **Anyway authorization/entitlement and/or endpoint availability**: + +- API requests to `https://api.anyway.sh/v1/*` return `403` +- Collector endpoint `collector.anyway.sh` is unreachable from this host (`connection refused`) + +## What We Built + +## 1) Cloudflare Worker integration + +- Worker service: `opencto-cloudbot-worker` +- Runtime channels: Telegram, Slack, Infobip WhatsApp/SMS +- Emits spans around: + - webhook handling + - RAG retrieval (lexical/semantic) + - OpenAI response call +- Fail-open telemetry behavior (no user-facing disruption on telemetry failures) + +Health endpoint confirms active config: + +- `GET https://opencto-cloudbot-worker.heysalad-o.workers.dev/health` +- includes: + - `anyway_enabled` + - `anyway_endpoint` + - `sidecar_enabled` + - `sidecar_url` + +## 2) Python sidecar integration + +- Public URL: `https://anyway-sdk-sidecar.opencto.works` +- Endpoint: + - `POST /trace/event` (token-protected with `x-opencto-sidecar-token`) + - `GET /health` +- Framework: FastAPI + `anyway-sdk` +- Sidecar currently running as user services: + - `opencto-anyway-sidecar.service` + - `opencto-anyway-tunnel.service` + +## Environment / Deployment + +## Sidecar systemd + +- Env file: `~/.config/opencto-anyway-sidecar.env` +- Services: + - `~/.config/systemd/user/opencto-anyway-sidecar.service` + - `~/.config/systemd/user/opencto-anyway-tunnel.service` +- Tunnel config: + - `opencto/opencto-cloudbot-worker/sidecar/tunnel.yml` + +## Worker vars/secrets used + +- Vars: + - `OPENCTO_ANYWAY_ENABLED=true` + - `OPENCTO_ANYWAY_ENDPOINT=https://api.anyway.sh/v1/ingest` + - `OPENCTO_ANYWAY_APP_NAME=opencto-cloudbot-worker` + - `OPENCTO_SIDECAR_ENABLED=true` + - `OPENCTO_SIDECAR_URL=https://anyway-sdk-sidecar.opencto.works/trace/event` +- Secret: + - `OPENCTO_SIDECAR_TOKEN` + +## Verification Results + +## A) Worker -> sidecar path + +- `POST /trace/event` to sidecar returns `200 OK` +- Sidecar logs confirm requests received + +Conclusion: **forwarding path is healthy**. + +## B) Sidecar SDK exporter + +Observed log patterns: + +- gRPC metadata error fixed (`Authorization` uppercase issue) +- After fix, exporter still fails: + - `StatusCode.UNAVAILABLE` (gRPC path) + - HTTP exporter retries with: + - `Failed to establish a new connection: [Errno 111] Connection refused` + +Example host-level network checks: + +- `collector.anyway.sh:4317` -> connection refused +- `collector.anyway.sh:443` -> connection refused +- `api.anyway.sh:443` -> reachable + +Conclusion: **collector endpoint not reachable from this host/network**. + +## C) Anyway REST API auth checks + +With provided key(s), all return `403`: + +- `POST https://api.anyway.sh/v1/ingest` +- `GET https://api.anyway.sh/v1/traces?limit=1` + +Conclusion: **key permissions/scope likely insufficient (or sandbox-only restrictions)**. + +## Commands We Ran (Repro) + +```bash +# Ingest check +curl -i -X POST https://api.anyway.sh/v1/ingest \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"traces":[],"metrics":[]}' + +# Trace query check +curl -i "https://api.anyway.sh/v1/traces?limit=1" \ + -H "Authorization: Bearer " + +# Host connectivity checks +curl -I https://collector.anyway.sh +curl -I https://api.anyway.sh + +# TCP checks +bash -lc 'cat < /dev/null > /dev/tcp/collector.anyway.sh/4317' +bash -lc 'cat < /dev/null > /dev/tcp/collector.anyway.sh/443' +bash -lc 'cat < /dev/null > /dev/tcp/api.anyway.sh/443' +``` + +## What We Need From Anyway Support + +1. Confirm required key type for server-side ingest/query: + - should include `write:traces` + `read:traces` +2. Confirm whether sandbox/test keys can call: + - `POST /v1/ingest` + - `GET /v1/traces` +3. Confirm if our keys/project are restricted (cause of `403`) +4. Confirm collector endpoint/port for our account/network + - current docs suggest `collector.anyway.sh` + - from this host, collector is connection refused +5. Provide recommended endpoint fallback strategy if collector unavailable + +## Security Notes + +- Multiple test keys were used during troubleshooting. +- All exposed test keys should be rotated/revoked. +- No production secrets are included in this document. + +## Current Status + +- OpenCTO implementation is complete and operational up to sidecar ingestion. +- Blocked only on Anyway-side authorization/collector access. From b4c4a3cfef79c13faedfe9d03dfdd3ab0668ba4f Mon Sep 17 00:00:00 2001 From: chilu18 Date: Thu, 5 Mar 2026 17:48:27 +0000 Subject: [PATCH 2/5] feat(opencto-sdk): add marketplace, agents, and tracing modules --- opencto/opencto-sdk/README.md | 61 +++++++++++ opencto/opencto-sdk/package-lock.json | 30 ++++++ opencto/opencto-sdk/package.json | 30 ++++++ opencto/opencto-sdk/src/client.ts | 68 ++++++++++++ opencto/opencto-sdk/src/index.ts | 24 +++++ opencto/opencto-sdk/src/modules/agents.ts | 40 +++++++ .../opencto-sdk/src/modules/marketplace.ts | 51 +++++++++ opencto/opencto-sdk/src/tracing.ts | 28 +++++ opencto/opencto-sdk/src/types.ts | 101 ++++++++++++++++++ opencto/opencto-sdk/tsconfig.json | 15 +++ 10 files changed, 448 insertions(+) create mode 100644 opencto/opencto-sdk/README.md create mode 100644 opencto/opencto-sdk/package-lock.json create mode 100644 opencto/opencto-sdk/package.json create mode 100644 opencto/opencto-sdk/src/client.ts create mode 100644 opencto/opencto-sdk/src/index.ts create mode 100644 opencto/opencto-sdk/src/modules/agents.ts create mode 100644 opencto/opencto-sdk/src/modules/marketplace.ts create mode 100644 opencto/opencto-sdk/src/tracing.ts create mode 100644 opencto/opencto-sdk/src/types.ts create mode 100644 opencto/opencto-sdk/tsconfig.json diff --git a/opencto/opencto-sdk/README.md b/opencto/opencto-sdk/README.md new file mode 100644 index 0000000..510f09d --- /dev/null +++ b/opencto/opencto-sdk/README.md @@ -0,0 +1,61 @@ +# @heysalad/opencto + +OpenCTO JavaScript SDK for auth, chats, runs, realtime, marketplace, agents, and trace context propagation. + +## Install + +```bash +npm install @heysalad/opencto +``` + +## Quick Start + +```ts +import { createOpenCTO } from '@heysalad/opencto' + +const sdk = createOpenCTO({ + baseUrl: 'https://api.opencto.works', + token: process.env.OPENCTO_TOKEN, +}) + +const account = await sdk.marketplace.createConnectedAccount({ + businessName: 'My Agent Studio', + country: 'US', +}) + +const onboarding = await sdk.marketplace.createConnectedAccountOnboardingLink(account.stripeAccountId) +console.log(onboarding.onboardingUrl) +``` + +## Marketplace APIs + +- `marketplace.createConnectedAccount(input)` +- `marketplace.createConnectedAccountOnboardingLink(accountId)` +- `marketplace.createRentalCheckoutSession(input)` +- `marketplace.listMyRentals()` + +## Agent Identity APIs + +- `agents.listIdentities()` +- `agents.createIdentity(input)` +- `agents.rotateIdentityKey(identityId)` +- `agents.revokeIdentity(identityId)` + +## Trace Helpers + +```ts +import { createTraceHeaders } from '@heysalad/opencto' + +const traceHeaders = createTraceHeaders({ + traceparent: '00-...-...-01', + traceId: 'trace-abc', + sessionId: 'session-123', +}) +``` + +## Release + +```bash +npm run build +npm publish --access public +``` diff --git a/opencto/opencto-sdk/package-lock.json b/opencto/opencto-sdk/package-lock.json new file mode 100644 index 0000000..5babdde --- /dev/null +++ b/opencto/opencto-sdk/package-lock.json @@ -0,0 +1,30 @@ +{ + "name": "@heysalad/opencto", + "version": "0.2.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "@heysalad/opencto", + "version": "0.2.0", + "license": "Apache-2.0", + "devDependencies": { + "typescript": "^5.9.2" + } + }, + "node_modules/typescript": { + "version": "5.9.3", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.9.3.tgz", + "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", + "dev": true, + "license": "Apache-2.0", + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=14.17" + } + } + } +} diff --git a/opencto/opencto-sdk/package.json b/opencto/opencto-sdk/package.json new file mode 100644 index 0000000..b69b3d2 --- /dev/null +++ b/opencto/opencto-sdk/package.json @@ -0,0 +1,30 @@ +{ + "name": "@heysalad/opencto", + "version": "0.2.0", + "description": "OpenCTO JavaScript SDK for auth, chats, runs, realtime, marketplace, agents, and tracing.", + "type": "module", + "main": "dist/index.js", + "types": "dist/index.d.ts", + "files": [ + "dist", + "README.md" + ], + "scripts": { + "build": "tsc -p tsconfig.json", + "clean": "rm -rf dist", + "prepublishOnly": "npm run clean && npm run build" + }, + "keywords": [ + "opencto", + "sdk", + "agents", + "stripe", + "marketplace", + "tracing" + ], + "author": "HeySalad", + "license": "Apache-2.0", + "devDependencies": { + "typescript": "^5.9.2" + } +} diff --git a/opencto/opencto-sdk/src/client.ts b/opencto/opencto-sdk/src/client.ts new file mode 100644 index 0000000..1323455 --- /dev/null +++ b/opencto/opencto-sdk/src/client.ts @@ -0,0 +1,68 @@ +import { OpenCTOError, type OpenCTOClientOptions, type OpenCTORequestOptions } from './types.js' + +export class OpenCTOClient { + private readonly baseUrl: string + private readonly token?: string + private readonly headers: Record + private readonly fetchImpl: typeof fetch + private readonly timeoutMs: number + + constructor(options: OpenCTOClientOptions) { + this.baseUrl = options.baseUrl.replace(/\/+$/, '') + this.token = options.token + this.headers = options.headers ?? {} + this.fetchImpl = options.fetchImpl ?? fetch + this.timeoutMs = options.timeoutMs ?? 20_000 + } + + async request(path: string, options: OpenCTORequestOptions = {}): Promise { + const url = `${this.baseUrl}${path.startsWith('/') ? path : `/${path}`}` + const method = options.method ?? 'GET' + const headers: Record = { + ...this.headers, + ...(options.headers ?? {}), + } + + if (this.token) { + headers.Authorization = `Bearer ${this.token}` + } + + if (options.traceId) { + headers['x-opencto-trace-id'] = options.traceId + } + + let body: BodyInit | undefined + if (options.body !== undefined) { + headers['content-type'] = headers['content-type'] ?? 'application/json' + body = JSON.stringify(options.body) + } + + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), this.timeoutMs) + + try { + const response = await this.fetchImpl(url, { + method, + headers, + body, + signal: controller.signal, + }) + + const contentType = response.headers.get('content-type') ?? '' + const payload = contentType.includes('application/json') + ? await response.json().catch(() => ({})) + : await response.text().catch(() => '') + + if (!response.ok) { + const errorPayload = (typeof payload === 'object' && payload) ? payload as Record : {} + const message = String(errorPayload.error ?? `Request failed (${response.status})`) + const code = String(errorPayload.code ?? 'OPENCTO_REQUEST_FAILED') + throw new OpenCTOError(message, response.status, code, errorPayload) + } + + return payload as T + } finally { + clearTimeout(timeout) + } + } +} diff --git a/opencto/opencto-sdk/src/index.ts b/opencto/opencto-sdk/src/index.ts new file mode 100644 index 0000000..2943467 --- /dev/null +++ b/opencto/opencto-sdk/src/index.ts @@ -0,0 +1,24 @@ +import { OpenCTOClient } from './client.js' +import { AgentsClient } from './modules/agents.js' +import { MarketplaceClient } from './modules/marketplace.js' + +export * from './types.js' +export * from './tracing.js' +export { OpenCTOClient } from './client.js' +export { MarketplaceClient } from './modules/marketplace.js' +export { AgentsClient } from './modules/agents.js' + +export interface OpenCTOSDK { + client: OpenCTOClient + marketplace: MarketplaceClient + agents: AgentsClient +} + +export function createOpenCTO(options: ConstructorParameters[0]): OpenCTOSDK { + const client = new OpenCTOClient(options) + return { + client, + marketplace: new MarketplaceClient(client), + agents: new AgentsClient(client), + } +} diff --git a/opencto/opencto-sdk/src/modules/agents.ts b/opencto/opencto-sdk/src/modules/agents.ts new file mode 100644 index 0000000..1e3c6e4 --- /dev/null +++ b/opencto/opencto-sdk/src/modules/agents.ts @@ -0,0 +1,40 @@ +import { OpenCTOClient } from '../client.js' +import type { AgentIdentity } from '../types.js' + +export interface CreateAgentIdentityInput { + name: string + role?: AgentIdentity['role'] + scopes?: string[] +} + +export interface CreateAgentIdentityResult { + identity: AgentIdentity + apiKey?: string +} + +export class AgentsClient { + constructor(private readonly client: OpenCTOClient) {} + + listIdentities(): Promise<{ identities: AgentIdentity[] }> { + return this.client.request('/api/v1/agents/identities') + } + + createIdentity(input: CreateAgentIdentityInput): Promise { + return this.client.request('/api/v1/agents/identities', { + method: 'POST', + body: input, + }) + } + + rotateIdentityKey(identityId: string): Promise<{ identityId: string; apiKey: string }> { + return this.client.request(`/api/v1/agents/identities/${encodeURIComponent(identityId)}/rotate-key`, { + method: 'POST', + }) + } + + revokeIdentity(identityId: string): Promise<{ identityId: string; status: 'revoked' }> { + return this.client.request(`/api/v1/agents/identities/${encodeURIComponent(identityId)}/revoke`, { + method: 'POST', + }) + } +} diff --git a/opencto/opencto-sdk/src/modules/marketplace.ts b/opencto/opencto-sdk/src/modules/marketplace.ts new file mode 100644 index 0000000..9540d24 --- /dev/null +++ b/opencto/opencto-sdk/src/modules/marketplace.ts @@ -0,0 +1,51 @@ +import { OpenCTOClient } from '../client.js' +import type { + AgentRentalContract, + ConnectedAccountResponse, + OnboardingLinkResponse, + RentalCheckoutResponse, +} from '../types.js' + +export interface CreateConnectedAccountInput { + businessName?: string + country?: string +} + +export interface CreateRentalCheckoutInput { + providerWorkspaceId: string + providerStripeAccountId: string + agentSlug: string + description?: string + amountUsd: number + currency?: string + platformFeePercent?: number +} + +export class MarketplaceClient { + constructor(private readonly client: OpenCTOClient) {} + + createConnectedAccount(input: CreateConnectedAccountInput = {}): Promise { + return this.client.request('/api/v1/marketplace/connect/accounts', { + method: 'POST', + body: input, + }) + } + + createConnectedAccountOnboardingLink(accountId: string): Promise { + return this.client.request(`/api/v1/marketplace/connect/accounts/${encodeURIComponent(accountId)}/onboarding-link`, { + method: 'POST', + }) + } + + createRentalCheckoutSession(input: CreateRentalCheckoutInput): Promise { + return this.client.request('/api/v1/marketplace/agent-rentals/checkout/session', { + method: 'POST', + body: input, + }) + } + + async listMyRentals(): Promise { + const response = await this.client.request<{ contracts: AgentRentalContract[] }>('/api/v1/marketplace/agent-rentals') + return response.contracts + } +} diff --git a/opencto/opencto-sdk/src/tracing.ts b/opencto/opencto-sdk/src/tracing.ts new file mode 100644 index 0000000..469641e --- /dev/null +++ b/opencto/opencto-sdk/src/tracing.ts @@ -0,0 +1,28 @@ +import type { TraceContext, TraceContextInput } from './types.js' + +export function createTraceContext(input: TraceContextInput = {}): TraceContext { + return { + traceparent: input.traceparent, + tracestate: input.tracestate, + traceId: input.traceId, + sessionId: input.sessionId, + } +} + +export function createTraceHeaders(context: TraceContextInput = {}): Record { + const headers: Record = {} + if (context.traceparent) headers.traceparent = context.traceparent + if (context.tracestate) headers.tracestate = context.tracestate + if (context.traceId) headers['x-opencto-trace-id'] = context.traceId + if (context.sessionId) headers['x-opencto-session-id'] = context.sessionId + return headers +} + +export function extractTraceContextFromHeaders(headers: Headers): TraceContext { + return { + traceparent: headers.get('traceparent') ?? undefined, + tracestate: headers.get('tracestate') ?? undefined, + traceId: headers.get('x-opencto-trace-id') ?? undefined, + sessionId: headers.get('x-opencto-session-id') ?? undefined, + } +} diff --git a/opencto/opencto-sdk/src/types.ts b/opencto/opencto-sdk/src/types.ts new file mode 100644 index 0000000..d43a2a0 --- /dev/null +++ b/opencto/opencto-sdk/src/types.ts @@ -0,0 +1,101 @@ +export type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE' + +export interface OpenCTOClientOptions { + baseUrl: string + token?: string + headers?: Record + fetchImpl?: typeof fetch + timeoutMs?: number +} + +export interface OpenCTORequestOptions { + method?: HttpMethod + body?: unknown + headers?: Record + traceId?: string +} + +export interface OpenCTOErrorPayload { + error?: string + code?: string + status?: number + details?: Record +} + +export class OpenCTOError extends Error { + status: number + code: string + payload: OpenCTOErrorPayload + + constructor(message: string, status: number, code: string, payload: OpenCTOErrorPayload) { + super(message) + this.name = 'OpenCTOError' + this.status = status + this.code = code + this.payload = payload + } +} + +export interface ConnectedAccountResponse { + workspaceId: string + stripeAccountId: string + onboardingComplete?: boolean + alreadyExists?: boolean +} + +export interface OnboardingLinkResponse { + stripeAccountId: string + onboardingUrl: string + expiresAt: number +} + +export interface RentalCheckoutResponse { + contractId: string + checkoutSessionId: string + checkoutUrl: string | null + amountCents: number + platformFeeCents: number + currency: string +} + +export interface AgentRentalContract { + id: string + renterWorkspaceId: string + providerWorkspaceId: string + providerStripeAccountId: string + agentSlug: string + description: string | null + amountCents: number + platformFeeCents: number + currency: string + status: string + checkoutSessionId: string | null + paymentIntentId: string | null + createdAt: string + updatedAt: string +} + +export interface AgentIdentity { + id: string + workspaceId: string + name: string + role: 'worker' | 'orchestrator' | 'reviewer' | 'custom' + scopes: string[] + status: 'active' | 'revoked' + createdAt: string + updatedAt: string +} + +export interface TraceContextInput { + traceparent?: string + tracestate?: string + traceId?: string + sessionId?: string +} + +export interface TraceContext { + traceparent?: string + tracestate?: string + traceId?: string + sessionId?: string +} diff --git a/opencto/opencto-sdk/tsconfig.json b/opencto/opencto-sdk/tsconfig.json new file mode 100644 index 0000000..2a7b2df --- /dev/null +++ b/opencto/opencto-sdk/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "declaration": true, + "outDir": "dist", + "rootDir": "src", + "strict": true, + "skipLibCheck": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true + }, + "include": ["src/**/*.ts"] +} From 4fd7a9f77211068abc201c6d4fb8fae03db1f46d Mon Sep 17 00:00:00 2001 From: chilu18 Date: Thu, 5 Mar 2026 23:34:53 +0000 Subject: [PATCH 3/5] feat(opencto): propagate and persist trace context across API and cloudbot --- .../src/__tests__/codebaseRuns.test.ts | 148 +++- opencto/opencto-api-worker/src/chats.ts | 26 +- .../opencto-api-worker/src/codebaseRuns.ts | 324 +++++++- opencto/opencto-api-worker/src/index.ts | 141 +++- opencto/opencto-api-worker/src/marketplace.ts | 411 ++++++++++ opencto/opencto-api-worker/src/tracing.ts | 81 ++ opencto/opencto-api-worker/src/types.ts | 22 + opencto/opencto-cloudbot-worker/src/index.ts | 770 ++++++++++++++++-- 8 files changed, 1779 insertions(+), 144 deletions(-) create mode 100644 opencto/opencto-api-worker/src/marketplace.ts create mode 100644 opencto/opencto-api-worker/src/tracing.ts diff --git a/opencto/opencto-api-worker/src/__tests__/codebaseRuns.test.ts b/opencto/opencto-api-worker/src/__tests__/codebaseRuns.test.ts index 5eeb46d..53f4cf1 100644 --- a/opencto/opencto-api-worker/src/__tests__/codebaseRuns.test.ts +++ b/opencto/opencto-api-worker/src/__tests__/codebaseRuns.test.ts @@ -8,6 +8,7 @@ type RunStatus = 'queued' | 'running' | 'succeeded' | 'failed' | 'canceled' | 't type RunRow = { id: string user_id: string + trace_id: string | null repo_url: string repo_full_name: string | null base_branch: string @@ -59,21 +60,22 @@ class MockD1Database { executeRun(sql: string, args: unknown[]): void { const normalized = normalizeSql(sql) - if (normalized.startsWith('create table') || normalized.startsWith('create index')) return + if (normalized.startsWith('create table') || normalized.startsWith('create index') || normalized.startsWith('alter table')) return if (normalized.startsWith('insert into codebase_runs')) { const row: RunRow = { id: String(args[0]), user_id: String(args[1]), - repo_url: String(args[2]), - repo_full_name: args[3] == null ? null : String(args[3]), - base_branch: String(args[4]), - target_branch: String(args[5]), - status: String(args[6]) as RunStatus, - requested_commands_json: String(args[7]), - command_allowlist_version: String(args[8]), - timeout_seconds: Number(args[9]), - created_at: String(args[10]), + trace_id: args[2] == null ? null : String(args[2]), + repo_url: String(args[3]), + repo_full_name: args[4] == null ? null : String(args[4]), + base_branch: String(args[5]), + target_branch: String(args[6]), + status: String(args[7]) as RunStatus, + requested_commands_json: String(args[8]), + command_allowlist_version: String(args[9]), + timeout_seconds: Number(args[10]), + created_at: String(args[11]), started_at: null, completed_at: null, canceled_at: null, @@ -98,6 +100,17 @@ class MockD1Database { return } + if (normalized.startsWith('update codebase_runs set status = ?, canceled_at = ?, completed_at = ?, error_message = ?')) { + const [status, canceledAt, completedAt, errorMessage, runId, userId] = args + const run = this.runs.get(String(runId)) + if (!run || run.user_id !== String(userId)) return + run.status = String(status) as RunStatus + run.canceled_at = String(canceledAt) + run.completed_at = String(completedAt) + run.error_message = String(errorMessage) + return + } + if (normalized.startsWith('update codebase_runs set status = ?')) { const [status, canceledAt, completedAt, runId, userId] = args const run = this.runs.get(String(runId)) @@ -134,7 +147,7 @@ class MockD1Database { executeFirst(sql: string, args: unknown[]): T | null { const normalized = normalizeSql(sql) - if (normalized.startsWith('select id, user_id, repo_url, repo_full_name')) { + if (normalized.startsWith('select id, user_id, trace_id, repo_url, repo_full_name')) { const [runId, userId] = args const row = this.runs.get(String(runId)) if (!row || row.user_id !== String(userId)) return null @@ -184,6 +197,23 @@ class MockD1Database { return { results: filtered.map((event) => structuredClone(event) as T) } } + if (normalized.startsWith("select event_type, payload_json, created_at from codebase_run_events where run_id = ? and event_type in ('run.approval_required', 'run.approval.approved', 'run.approval.denied')")) { + const [runId] = args + const filtered = this.events + .filter((event) => event.run_id === String(runId) && ( + event.event_type === 'run.approval_required' + || event.event_type === 'run.approval.approved' + || event.event_type === 'run.approval.denied' + )) + .sort((a, b) => a.seq - b.seq) + .map((event) => ({ + event_type: event.event_type, + payload_json: event.payload_json, + created_at: event.created_at, + }) as T) + return { results: filtered } + } + throw new Error(`Unhandled all SQL: ${sql}`) } } @@ -299,6 +329,21 @@ describe('Codebase run endpoints', () => { expect(body.error).toContain('Shell chaining') }) + it('POST /api/v1/codebase/runs blocks unsafe repo URLs', async () => { + const env = createMockEnv() + + const res = await createRun(env, { + repoUrl: 'http://localhost:3000/private.git', + commands: ['npm run build'], + }) + const body = await res.json() as { code?: string; status?: number; details?: { guardrailCodes?: string[] } } + + expect(res.status).toBe(403) + expect(body.code).toBe('FORBIDDEN') + expect(body.status).toBe(403) + expect(body.details?.guardrailCodes).toContain('UNSAFE_REPO_URL') + }) + it('POST /api/v1/codebase/runs rejects unauthorized requests', async () => { const env = createMockEnv({ ENVIRONMENT: 'production' }) @@ -354,6 +399,87 @@ describe('Codebase run endpoints', () => { expect(body.status).toBe(501) }) + it('POST /api/v1/codebase/runs creates pending human approval for high-risk runs', async () => { + const env = createMockEnv() + + const res = await createRun(env, { + repoUrl: 'https://github.com/Hey-Salad/CTO-AI.git', + commands: ['git push origin main'], + }) + const body = await res.json() as { run: { status: string; approval?: { state?: string; required?: boolean } } } + + expect(res.status).toBe(201) + expect(body.run.status).toBe('queued') + expect(body.run.approval?.required).toBe(true) + expect(body.run.approval?.state).toBe('pending') + }) + + it('POST /api/v1/codebase/runs/:id/approve executes pending run in container mode', async () => { + const db = new MockD1Database() + const env = createMockEnv( + { + CODEBASE_EXECUTION_MODE: 'container', + CODEBASE_EXECUTOR: {} as DurableObjectNamespace, + }, + db, + ) + + __setContainerDispatcherForTests(async () => ({ + status: 'succeeded', + logs: [{ level: 'info', message: 'approved run executed' }], + })) + + const created = await createRun(env, { + repoUrl: 'https://github.com/Hey-Salad/CTO-AI.git', + commands: ['git push origin main'], + }) + const createdBody = await created.json() as { run: { id: string } } + + const approveRes = await worker.fetch( + new Request(`https://api.opencto.works/api/v1/codebase/runs/${createdBody.run.id}/approve`, { + method: 'POST', + headers: { + Authorization: 'Bearer demo-token', + 'content-type': 'application/json', + }, + body: JSON.stringify({ note: 'approved in test' }), + }), + env, + ) + const approveBody = await approveRes.json() as { run: { status: string; approval?: { state?: string } } } + + expect(approveRes.status).toBe(200) + expect(approveBody.run.status).toBe('succeeded') + expect(approveBody.run.approval?.state).toBe('approved') + }) + + it('POST /api/v1/codebase/runs/:id/deny cancels pending run', async () => { + const env = createMockEnv() + + const created = await createRun(env, { + repoUrl: 'https://github.com/Hey-Salad/CTO-AI.git', + commands: ['git push origin main'], + }) + const createdBody = await created.json() as { run: { id: string } } + + const denyRes = await worker.fetch( + new Request(`https://api.opencto.works/api/v1/codebase/runs/${createdBody.run.id}/deny`, { + method: 'POST', + headers: { + Authorization: 'Bearer demo-token', + 'content-type': 'application/json', + }, + body: JSON.stringify({ note: 'deny in test' }), + }), + env, + ) + const denyBody = await denyRes.json() as { run: { status: string; approval?: { state?: string } } } + + expect(denyRes.status).toBe(200) + expect(denyBody.run.status).toBe('canceled') + expect(denyBody.run.approval?.state).toBe('denied') + }) + it('POST /api/v1/codebase/runs dispatches to container in container mode', async () => { const db = new MockD1Database() const env = createMockEnv({ diff --git a/opencto/opencto-api-worker/src/chats.ts b/opencto/opencto-api-worker/src/chats.ts index b84824e..7dedea4 100644 --- a/opencto/opencto-api-worker/src/chats.ts +++ b/opencto/opencto-api-worker/src/chats.ts @@ -1,9 +1,11 @@ import type { ChatMessageRecord, ChatSessionRecord, RequestContext } from './types' import { BadRequestException, InternalServerException, NotFoundException, jsonResponse } from './errors' +import { enforcePromptGuardrails } from './guardrails' type ChatRow = { id: string user_id: string + trace_id: string | null title: string content_json: string created_at: string @@ -21,12 +23,14 @@ async function ensureSchema(ctx: RequestContext): Promise { `CREATE TABLE IF NOT EXISTS chats ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, + trace_id TEXT, title TEXT NOT NULL, content_json TEXT NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL )`, ).run() + await ctx.env.DB.prepare('ALTER TABLE chats ADD COLUMN trace_id TEXT').run().catch(() => {}) await ctx.env.DB.prepare( 'CREATE INDEX IF NOT EXISTS idx_chats_user_updated ON chats (user_id, updated_at DESC)', ).run() @@ -58,10 +62,11 @@ function parseMessages(raw: string): ChatMessageRecord[] { export async function listChats(ctx: RequestContext): Promise { await ensureSchema(ctx) const result = await ctx.env.DB.prepare( - 'SELECT id, user_id, title, created_at, updated_at FROM chats WHERE user_id = ? ORDER BY updated_at DESC LIMIT 100', + 'SELECT id, user_id, trace_id, title, created_at, updated_at FROM chats WHERE user_id = ? ORDER BY updated_at DESC LIMIT 100', ).bind(ctx.userId).all<{ id: string user_id: string + trace_id: string | null title: string created_at: string updated_at: string @@ -71,6 +76,7 @@ export async function listChats(ctx: RequestContext): Promise { (result.results ?? []).map((row) => ({ id: row.id, userId: row.user_id, + traceId: row.trace_id, title: row.title, createdAt: row.created_at, updatedAt: row.updated_at, @@ -82,7 +88,7 @@ export async function listChats(ctx: RequestContext): Promise { export async function getChat(chatId: string, ctx: RequestContext): Promise { await ensureSchema(ctx) const row = await ctx.env.DB.prepare( - 'SELECT id, user_id, title, content_json, created_at, updated_at FROM chats WHERE id = ? AND user_id = ?', + 'SELECT id, user_id, trace_id, title, content_json, created_at, updated_at FROM chats WHERE id = ? AND user_id = ?', ).bind(chatId, ctx.userId).first() if (!row) throw new NotFoundException('Chat not found') @@ -90,6 +96,7 @@ export async function getChat(chatId: string, ctx: RequestContext): Promise message.role === 'USER') + .map((message) => message.text) + .join('\n') + if (userText) { + enforcePromptGuardrails(userText, 'chats.save') + } const id = (payload.id ?? '').trim() || crypto.randomUUID() const current = nowIso() @@ -114,18 +128,20 @@ export async function saveChat( const contentJson = JSON.stringify(messages) await ctx.env.DB.prepare( - `INSERT INTO chats (id, user_id, title, content_json, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?) + `INSERT INTO chats (id, user_id, trace_id, title, content_json, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET + trace_id = excluded.trace_id, title = excluded.title, content_json = excluded.content_json, updated_at = excluded.updated_at WHERE chats.user_id = excluded.user_id`, - ).bind(id, ctx.userId, title, contentJson, current, current).run() + ).bind(id, ctx.userId, ctx.traceContext.traceId, title, contentJson, current, current).run() const saved: ChatSessionRecord = { id, userId: ctx.userId, + traceId: ctx.traceContext.traceId, title, messages, createdAt: current, diff --git a/opencto/opencto-api-worker/src/codebaseRuns.ts b/opencto/opencto-api-worker/src/codebaseRuns.ts index 0160552..8b19fa6 100644 --- a/opencto/opencto-api-worker/src/codebaseRuns.ts +++ b/opencto/opencto-api-worker/src/codebaseRuns.ts @@ -2,6 +2,7 @@ import { getContainer } from '@cloudflare/containers' import type { RequestContext } from './types' import { BadRequestException, + ForbiddenException, InternalServerException, NotFoundException, NotImplementedException, @@ -9,6 +10,7 @@ import { jsonResponse, } from './errors' import { redactSecrets, redactUnknown } from './redaction' +import { enforcePromptGuardrails, enforceRepoUrlGuardrails } from './guardrails' type RunStatus = 'queued' | 'running' | 'succeeded' | 'failed' | 'canceled' | 'timed_out' type EventLevel = 'system' | 'info' | 'warn' | 'error' @@ -51,10 +53,12 @@ const MIN_TIMEOUT_SECONDS = 60 const MAX_TIMEOUT_SECONDS = 1800 const TERMINAL_RUN_STATUSES = new Set(['succeeded', 'failed', 'canceled', 'timed_out']) const BLOCKED_CHAINING_PATTERN = /(?:&&|;|\|\||\||`|\$\()/ +const PROTECTED_BRANCH_PATTERN = /^(main|master|production|prod|release.*)$/i interface CodebaseRunRow { id: string user_id: string + trace_id: string | null repo_url: string repo_full_name: string | null base_branch: string @@ -81,6 +85,12 @@ interface CodebaseRunEventRow { created_at: string } +type ApprovalEventRow = { + event_type: string + payload_json: string | null + created_at: string +} + let schemaReady = false let containerDispatcher: ContainerDispatchFn = defaultContainerDispatcher const getContainerUnsafe = getContainer as unknown as ( @@ -185,6 +195,7 @@ function mapRun(row: CodebaseRunRow) { return { id: row.id, userId: row.user_id, + traceId: row.trace_id, repoUrl: row.repo_url, repoFullName: row.repo_full_name, baseBranch: row.base_branch, @@ -201,6 +212,95 @@ function mapRun(row: CodebaseRunRow) { } } +function parseRequestedCommands(row: CodebaseRunRow): string[] { + try { + const parsed = JSON.parse(row.requested_commands_json) + return Array.isArray(parsed) ? parsed.filter((entry): entry is string => typeof entry === 'string') : [] + } catch { + return [] + } +} + +function runNeedsHumanApproval(commands: string[], targetBranch: string): { required: boolean; reason: string | null } { + if (commands.some((command) => command.startsWith('git push'))) { + return { required: true, reason: 'Run includes git push command' } + } + if (PROTECTED_BRANCH_PATTERN.test(targetBranch)) { + return { required: true, reason: 'Run targets protected branch' } + } + return { required: false, reason: null } +} + +function ensureApproverRole(ctx: RequestContext): void { + if (ctx.user.role === 'owner' || ctx.user.role === 'cto') return + throw new ForbiddenException('Only owner/cto can approve dangerous runs', { + role: ctx.user.role, + }) +} + +async function getRunApproval(runId: string, ctx: RequestContext) { + const rows = await ctx.env.DB.prepare( + `SELECT event_type, payload_json, created_at + FROM codebase_run_events + WHERE run_id = ? + AND event_type IN ('run.approval_required', 'run.approval.approved', 'run.approval.denied') + ORDER BY seq ASC`, + ).bind(runId).all() + + const events = rows.results ?? [] + const requiredEvent = events.find((event) => event.event_type === 'run.approval_required') + if (!requiredEvent) { + return { + required: false, + state: 'not_required' as const, + reason: null, + approvedByUserId: null, + decidedAt: null, + } + } + + const approvedEvent = events.find((event) => event.event_type === 'run.approval.approved') + if (approvedEvent) { + let approvedByUserId: string | null = null + try { + const payload = approvedEvent.payload_json ? JSON.parse(approvedEvent.payload_json) as Record : {} + approvedByUserId = typeof payload.approvedByUserId === 'string' ? payload.approvedByUserId : null + } catch {} + return { + required: true, + state: 'approved' as const, + reason: null, + approvedByUserId, + decidedAt: approvedEvent.created_at, + } + } + + const deniedEvent = events.find((event) => event.event_type === 'run.approval.denied') + if (deniedEvent) { + return { + required: true, + state: 'denied' as const, + reason: 'Denied by human approver', + approvedByUserId: null, + decidedAt: deniedEvent.created_at, + } + } + + let reason: string | null = null + try { + const payload = requiredEvent.payload_json ? JSON.parse(requiredEvent.payload_json) as Record : {} + reason = typeof payload.reason === 'string' ? payload.reason : null + } catch {} + + return { + required: true, + state: 'pending' as const, + reason, + approvedByUserId: null, + decidedAt: null, + } +} + function mapEvent(row: CodebaseRunEventRow) { return { id: row.id, @@ -224,6 +324,7 @@ async function ensureSchema(ctx: RequestContext): Promise { `CREATE TABLE IF NOT EXISTS codebase_runs ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, + trace_id TEXT, repo_url TEXT NOT NULL, repo_full_name TEXT, base_branch TEXT NOT NULL, @@ -239,6 +340,7 @@ async function ensureSchema(ctx: RequestContext): Promise { error_message TEXT )`, ).run() + await ctx.env.DB.prepare('ALTER TABLE codebase_runs ADD COLUMN trace_id TEXT').run().catch(() => {}) await ctx.env.DB.prepare( `CREATE TABLE IF NOT EXISTS codebase_run_events ( @@ -281,7 +383,7 @@ async function ensureSchema(ctx: RequestContext): Promise { async function getRunRow(runId: string, ctx: RequestContext): Promise { await ensureSchema(ctx) const row = await ctx.env.DB.prepare( - `SELECT id, user_id, repo_url, repo_full_name, base_branch, target_branch, status, requested_commands_json, + `SELECT id, user_id, trace_id, repo_url, repo_full_name, base_branch, target_branch, status, requested_commands_json, command_allowlist_version, timeout_seconds, created_at, started_at, completed_at, canceled_at, error_message FROM codebase_runs WHERE id = ? AND user_id = ?`, ).bind(runId, ctx.userId).first() @@ -339,6 +441,54 @@ async function setRunStatus(runId: string, ctx: RequestContext, input: { status: ).bind(input.status, timestamp, input.errorMessage ?? null, runId, ctx.userId).run() } +async function executeContainerRun( + runId: string, + input: { + repoUrl: string + baseBranch: string + targetBranch: string + commands: string[] + timeoutSeconds: number + }, + ctx: RequestContext, +): Promise { + await setRunStatus(runId, ctx, { status: 'running' }) + await appendEvent(runId, { + level: 'system', + eventType: 'run.dispatched', + message: 'Run dispatched to container executor.', + }, ctx) + + const result = await containerDispatcher(runId, input, ctx) + + for (const log of result.logs) { + await appendEvent(runId, { + level: log.level, + eventType: 'container.log', + message: log.message, + }, ctx) + } + + if (result.status === 'succeeded') { + await setRunStatus(runId, ctx, { status: 'succeeded' }) + await appendEvent(runId, { + level: 'system', + eventType: 'run.completed', + message: 'Container execution completed successfully.', + }, ctx) + } else { + await setRunStatus(runId, ctx, { + status: result.status, + errorMessage: result.errorMessage ?? 'Container execution failed', + }) + await appendEvent(runId, { + level: 'error', + eventType: 'run.failed', + message: result.errorMessage ?? 'Container execution failed', + }, ctx) + } +} + async function enforceRunLimits(ctx: RequestContext): Promise { const concurrentCap = getConcurrentRunCap(ctx) const concurrent = await ctx.env.DB.prepare( @@ -450,8 +600,13 @@ export async function createCodebaseRun( if (!repoUrl) { throw new BadRequestException('repoUrl is required') } + enforceRepoUrlGuardrails(repoUrl) const commands = normalizeAndValidateCommands(payload.commands) + enforcePromptGuardrails( + `${repoUrl}\n${commands.join('\n')}\n${payload.baseBranch ?? ''}\n${payload.targetBranch ?? ''}`, + 'codebase.runs.create', + ) const timeoutBounds = getTimeoutBounds(ctx) const timeoutSeconds = normalizeTimeoutSeconds(payload.timeoutSeconds, timeoutBounds) await enforceRunLimits(ctx) @@ -461,6 +616,7 @@ export async function createCodebaseRun( const baseBranch = (payload.baseBranch ?? 'main').trim() || 'main' const targetBranch = (payload.targetBranch ?? `opencto/${runId.slice(0, 8)}`).trim() || `opencto/${runId.slice(0, 8)}` const executionMode = getExecutionMode(ctx) + const approvalPolicy = runNeedsHumanApproval(commands, targetBranch) if (executionMode === 'container' && !ctx.env.CODEBASE_EXECUTOR) { throw new NotImplementedException('Container execution requested but CODEBASE_EXECUTOR binding is not configured') @@ -468,12 +624,13 @@ export async function createCodebaseRun( await ctx.env.DB.prepare( `INSERT INTO codebase_runs ( - id, user_id, repo_url, repo_full_name, base_branch, target_branch, status, + id, user_id, trace_id, repo_url, repo_full_name, base_branch, target_branch, status, requested_commands_json, command_allowlist_version, timeout_seconds, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, ).bind( runId, ctx.userId, + ctx.traceContext.traceId, repoUrl, payload.repoFullName?.trim() || null, baseBranch, @@ -502,53 +659,32 @@ export async function createCodebaseRun( message: `Requested commands: ${commands.join(' | ')}`, }, ctx) - if (executionMode === 'container') { - await setRunStatus(runId, ctx, { status: 'running' }) + if (approvalPolicy.required) { await appendEvent(runId, { - level: 'system', - eventType: 'run.dispatched', - message: 'Run dispatched to container executor.', + level: 'warn', + eventType: 'run.approval_required', + message: 'Human approval required before run execution.', + payload: { + reason: approvalPolicy.reason, + }, }, ctx) - - const result = await containerDispatcher(runId, { + } else if (executionMode === 'container') { + await executeContainerRun(runId, { repoUrl, baseBranch, targetBranch, commands, timeoutSeconds, }, ctx) - - for (const log of result.logs) { - await appendEvent(runId, { - level: log.level, - eventType: 'container.log', - message: log.message, - }, ctx) - } - - if (result.status === 'succeeded') { - await setRunStatus(runId, ctx, { status: 'succeeded' }) - await appendEvent(runId, { - level: 'system', - eventType: 'run.completed', - message: 'Container execution completed successfully.', - }, ctx) - } else { - await setRunStatus(runId, ctx, { - status: result.status, - errorMessage: result.errorMessage ?? 'Container execution failed', - }) - await appendEvent(runId, { - level: 'error', - eventType: 'run.failed', - message: result.errorMessage ?? 'Container execution failed', - }, ctx) - } } const row = await getRunRow(runId, ctx) + const approval = await getRunApproval(runId, ctx) return jsonResponse({ - run: mapRun(row), + run: { + ...mapRun(row), + approval, + }, allowlist: [...ALLOWED_COMMAND_TEMPLATES], }, 201) } @@ -556,6 +692,7 @@ export async function createCodebaseRun( // GET /api/v1/codebase/runs/:id export async function getCodebaseRun(runId: string, ctx: RequestContext): Promise { const row = await getRunRow(runId, ctx) + const approval = await getRunApproval(runId, ctx) const eventCount = await ctx.env.DB.prepare('SELECT COUNT(*) AS count FROM codebase_run_events WHERE run_id = ?') .bind(runId) .first<{ count: number }>() @@ -564,7 +701,10 @@ export async function getCodebaseRun(runId: string, ctx: RequestContext): Promis .first<{ count: number }>() return jsonResponse({ - run: mapRun(row), + run: { + ...mapRun(row), + approval, + }, metrics: { eventCount: eventCount?.count ?? 0, artifactCount: artifactCount?.count ?? 0, @@ -572,6 +712,112 @@ export async function getCodebaseRun(runId: string, ctx: RequestContext): Promis }) } +// POST /api/v1/codebase/runs/:id/approve +export async function approveCodebaseRun( + runId: string, + payload: { note?: string } | undefined, + ctx: RequestContext, +): Promise { + ensureApproverRole(ctx) + const row = await getRunRow(runId, ctx) + if (TERMINAL_RUN_STATUSES.has(row.status)) { + throw new BadRequestException('Cannot approve a completed run') + } + + const approval = await getRunApproval(runId, ctx) + if (!approval.required) { + throw new BadRequestException('Run does not require human approval') + } + if (approval.state !== 'pending') { + throw new BadRequestException('Approval decision already recorded') + } + + await appendEvent(runId, { + level: 'system', + eventType: 'run.approval.approved', + message: 'Run approved by human reviewer.', + payload: { + approvedByUserId: ctx.userId, + approverRole: ctx.user.role, + note: payload?.note ?? null, + }, + }, ctx) + + if (getExecutionMode(ctx) === 'container') { + if (!ctx.env.CODEBASE_EXECUTOR) { + throw new NotImplementedException('Container execution requested but CODEBASE_EXECUTOR binding is not configured') + } + await executeContainerRun(runId, { + repoUrl: row.repo_url, + baseBranch: row.base_branch, + targetBranch: row.target_branch, + commands: parseRequestedCommands(row), + timeoutSeconds: row.timeout_seconds, + }, ctx) + } + + const updated = await getRunRow(runId, ctx) + const updatedApproval = await getRunApproval(runId, ctx) + return jsonResponse({ + run: { + ...mapRun(updated), + approval: updatedApproval, + }, + }) +} + +// POST /api/v1/codebase/runs/:id/deny +export async function denyCodebaseRun( + runId: string, + payload: { note?: string } | undefined, + ctx: RequestContext, +): Promise { + ensureApproverRole(ctx) + const row = await getRunRow(runId, ctx) + if (TERMINAL_RUN_STATUSES.has(row.status)) { + throw new BadRequestException('Cannot deny a completed run') + } + + const approval = await getRunApproval(runId, ctx) + if (!approval.required) { + throw new BadRequestException('Run does not require human approval') + } + if (approval.state !== 'pending') { + throw new BadRequestException('Approval decision already recorded') + } + + await appendEvent(runId, { + level: 'warn', + eventType: 'run.approval.denied', + message: 'Run denied by human reviewer.', + payload: { + deniedByUserId: ctx.userId, + approverRole: ctx.user.role, + note: payload?.note ?? null, + }, + }, ctx) + + const canceledAt = nowIso() + await ctx.env.DB.prepare( + 'UPDATE codebase_runs SET status = ?, canceled_at = ?, completed_at = ?, error_message = ? WHERE id = ? AND user_id = ?', + ).bind('canceled', canceledAt, canceledAt, 'Denied by human approver', runId, ctx.userId).run() + + await appendEvent(runId, { + level: 'warn', + eventType: 'run.canceled', + message: 'Run canceled after denial.', + }, ctx) + + const updated = await getRunRow(runId, ctx) + const updatedApproval = await getRunApproval(runId, ctx) + return jsonResponse({ + run: { + ...mapRun(updated), + approval: updatedApproval, + }, + }) +} + // GET /api/v1/codebase/runs/:id/events export async function getCodebaseRunEvents(runId: string, request: Request, ctx: RequestContext): Promise { await getRunRow(runId, ctx) diff --git a/opencto/opencto-api-worker/src/index.ts b/opencto/opencto-api-worker/src/index.ts index 2b6b6b4..21fb05f 100644 --- a/opencto/opencto-api-worker/src/index.ts +++ b/opencto/opencto-api-worker/src/index.ts @@ -12,6 +12,8 @@ import * as chats from './chats' import * as onboarding from './onboarding' import * as github from './github' import * as codebaseRuns from './codebaseRuns' +import * as marketplace from './marketplace' +import { appendTraceHeaders, extractTraceContext, tracePropagationHeaders, withTraceResponseHeaders } from './tracing' export class CodebaseExecutorContainer extends Container { defaultPort = 4000 @@ -20,14 +22,18 @@ export class CodebaseExecutorContainer extends Container { export default { async fetch(request: Request, env: Env): Promise { + const traceContext = extractTraceContext(request) + // Handle CORS preflight if (request.method === 'OPTIONS') { + const headers = new Headers({ + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS', + 'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-Idempotency-Key, x-opencto-trace-id, traceparent, tracestate, x-opencto-session-id', + }) + appendTraceHeaders(headers, traceContext) return new Response(null, { - headers: { - 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS', - 'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-Idempotency-Key', - }, + headers, }) } @@ -37,35 +43,38 @@ export default { // Health check endpoint (no auth required) if (path === '/health' || path === '/api/v1/health') { - return jsonResponse({ status: 'healthy', timestamp: new Date().toISOString() }) + return withTraceResponseHeaders( + jsonResponse({ status: 'healthy', timestamp: new Date().toISOString() }), + traceContext, + ) } // Webhook endpoint (no auth required, uses signature verification) if (path === '/api/v1/billing/webhooks/stripe' && request.method === 'POST') { - return await webhooks.handleStripeWebhook(request, env) + return withTraceResponseHeaders(await webhooks.handleStripeWebhook(request, env), traceContext) } // OAuth endpoints (no prior auth required) if (path === '/api/v1/auth/oauth/github/start' && request.method === 'GET') { - return await auth.startGitHubOAuth(request, env) + return withTraceResponseHeaders(await auth.startGitHubOAuth(request, env), traceContext) } if (path === '/api/v1/auth/oauth/github/callback' && request.method === 'GET') { - return await auth.completeGitHubOAuth(request, env) + return withTraceResponseHeaders(await auth.completeGitHubOAuth(request, env), traceContext) } // All other endpoints require authentication - const ctx = await authenticate(request, env) + const ctx = await authenticate(request, env, traceContext) // Route to handlers - return await route(path, request, ctx) + return withTraceResponseHeaders(await route(path, request, ctx), traceContext) } catch (error) { - return toJsonResponse(error) + return withTraceResponseHeaders(toJsonResponse(error), traceContext) } }, } // Authentication middleware -async function authenticate(request: Request, env: Env): Promise { +async function authenticate(request: Request, env: Env, traceContext = extractTraceContext(request)): Promise { const authHeader = request.headers.get('Authorization') const accessEmail = request.headers.get('CF-Access-Authenticated-User-Email') const bearerToken = authHeader?.startsWith('Bearer ') ? authHeader.substring(7) : null @@ -78,6 +87,7 @@ async function authenticate(request: Request, env: Env): Promise userId: sessionUser.id, user: sessionUser, env, + traceContext, } } } @@ -101,6 +111,7 @@ async function authenticate(request: Request, env: Env): Promise userId: user.id, user, env, + traceContext, } } @@ -212,6 +223,18 @@ async function route(path: string, request: Request, ctx: RequestContext): Promi return await codebaseRuns.cancelCodebaseRun(runId, ctx) } + if (path.match(/^\/api\/v1\/codebase\/runs\/([^/]+)\/approve$/) && method === 'POST') { + const runId = path.split('/')[5] ?? '' + const body = await request.json().catch(() => ({})) as { note?: string } + return await codebaseRuns.approveCodebaseRun(runId, body, ctx) + } + + if (path.match(/^\/api\/v1\/codebase\/runs\/([^/]+)\/deny$/) && method === 'POST') { + const runId = path.split('/')[5] ?? '' + const body = await request.json().catch(() => ({})) as { note?: string } + return await codebaseRuns.denyCodebaseRun(runId, body, ctx) + } + // Onboarding endpoints if (path === '/api/v1/onboarding' && method === 'GET') { return await onboarding.getOnboarding(ctx) @@ -264,28 +287,56 @@ async function route(path: string, request: Request, ctx: RequestContext): Promi return await billing.getInvoices(ctx) } + // Marketplace + Connect endpoints + if (path === '/api/v1/marketplace/connect/accounts' && method === 'POST') { + const body = await request.json().catch(() => ({})) as { businessName?: string; country?: string } + return await marketplace.createConnectedAccount(body, ctx) + } + + if (path.match(/^\/api\/v1\/marketplace\/connect\/accounts\/([^/]+)\/onboarding-link$/) && method === 'POST') { + const accountId = path.split('/')[6] ?? '' + return await marketplace.createConnectedAccountOnboardingLink(accountId, ctx) + } + + if (path === '/api/v1/marketplace/agent-rentals/checkout/session' && method === 'POST') { + const body = await request.json().catch(() => ({})) as { + providerWorkspaceId?: string + providerStripeAccountId?: string + agentSlug?: string + description?: string + amountUsd?: number + currency?: string + platformFeePercent?: number + } + return await marketplace.createAgentRentalCheckoutSession(body, ctx) + } + + if (path === '/api/v1/marketplace/agent-rentals' && method === 'GET') { + return await marketplace.listMyAgentRentals(ctx) + } + // CTO agent proxy routes — forward to external APIs using server-side tokens if (path === '/api/v1/cto/vercel/projects' && method === 'GET') { - return await proxyVercel('/v9/projects?limit=20', ctx.env) + return await proxyVercel('/v9/projects?limit=20', ctx.env, ctx.traceContext) } if (path.match(/^\/api\/v1\/cto\/vercel\/projects\/([^/]+)\/deployments$/) && method === 'GET') { const projectId = path.split('/')[6] ?? '' - return await proxyVercel(`/v6/deployments?projectId=${encodeURIComponent(projectId)}&limit=10`, ctx.env) + return await proxyVercel(`/v6/deployments?projectId=${encodeURIComponent(projectId)}&limit=10`, ctx.env, ctx.traceContext) } if (path.match(/^\/api\/v1\/cto\/vercel\/deployments\/([^/]+)$/) && method === 'GET') { const deploymentId = path.split('/')[6] ?? '' - return await proxyVercel(`/v13/deployments/${encodeURIComponent(deploymentId)}`, ctx.env) + return await proxyVercel(`/v13/deployments/${encodeURIComponent(deploymentId)}`, ctx.env, ctx.traceContext) } if (path === '/api/v1/cto/cloudflare/workers' && method === 'GET') { - return await proxyCF(`/client/v4/accounts/${ctx.env.CF_ACCOUNT_ID}/workers/scripts`, ctx.env) + return await proxyCF(`/client/v4/accounts/${ctx.env.CF_ACCOUNT_ID}/workers/scripts`, ctx.env, ctx.traceContext) } if (path === '/api/v1/cto/cloudflare/pages' && method === 'GET') { - return await proxyCF(`/client/v4/accounts/${ctx.env.CF_ACCOUNT_ID}/pages/projects`, ctx.env) + return await proxyCF(`/client/v4/accounts/${ctx.env.CF_ACCOUNT_ID}/pages/projects`, ctx.env, ctx.traceContext) } if (path.match(/^\/api\/v1\/cto\/cloudflare\/workers\/([^/]+)\/usage$/) && method === 'GET') { @@ -295,27 +346,28 @@ async function route(path: string, request: Request, ctx: RequestContext): Promi return await proxyCF( `/client/v4/accounts/${ctx.env.CF_ACCOUNT_ID}/workers/scripts/${encodeURIComponent(scriptName)}/analytics/aggregate?since=${since}`, ctx.env, + ctx.traceContext, ) } if (path === '/api/v1/cto/openai/models' && method === 'GET') { - return await proxyOpenAI('/v1/models', ctx.env) + return await proxyOpenAI('/v1/models', ctx.env, ctx.traceContext) } if (path === '/api/v1/cto/openai/usage' && method === 'GET') { const url = new URL(request.url) const start = url.searchParams.get('start') ?? '' const end = url.searchParams.get('end') ?? '' - return await proxyOpenAI(`/v1/usage?start_time=${start}&end_time=${end}`, ctx.env) + return await proxyOpenAI(`/v1/usage?start_time=${start}&end_time=${end}`, ctx.env, ctx.traceContext) } if (path === '/api/v1/cto/github/orgs' && method === 'GET') { - return await proxyGitHub('/user/orgs?per_page=50', ctx.env) + return await proxyGitHub('/user/orgs?per_page=50', ctx.env, ctx.traceContext) } if (path.match(/^\/api\/v1\/cto\/github\/orgs\/([^/]+)\/repos$/) && method === 'GET') { const org = path.split('/')[6] ?? '' - return await proxyGitHub(`/orgs/${encodeURIComponent(org)}/repos?sort=updated&per_page=50`, ctx.env) + return await proxyGitHub(`/orgs/${encodeURIComponent(org)}/repos?sort=updated&per_page=50`, ctx.env, ctx.traceContext) } if (path.match(/^\/api\/v1\/cto\/github\/repos\/([^/]+)\/([^/]+)\/pulls$/) && method === 'GET') { @@ -324,6 +376,7 @@ async function route(path: string, request: Request, ctx: RequestContext): Promi return await proxyGitHub( `/repos/${encodeURIComponent(owner)}/${encodeURIComponent(repo)}/pulls?state=all&sort=updated&direction=desc&per_page=20`, ctx.env, + ctx.traceContext, ) } @@ -333,15 +386,16 @@ async function route(path: string, request: Request, ctx: RequestContext): Promi return await proxyGitHub( `/repos/${encodeURIComponent(owner)}/${encodeURIComponent(repo)}/actions/runs?per_page=20`, ctx.env, + ctx.traceContext, ) } if (path === '/api/v1/cto/github/chat/completions' && method === 'POST') { - return await proxyGitHubChatCompletions(request, ctx.env) + return await proxyGitHubChatCompletions(request, ctx.env, ctx.traceContext) } if (path === '/api/v1/agent/respond' && method === 'POST') { - return await proxySupervisorResponse(request, ctx.env) + return await proxySupervisorResponse(request, ctx.env, ctx.traceContext) } // 404 Not Found @@ -352,12 +406,15 @@ async function route(path: string, request: Request, ctx: RequestContext): Promi // CTO agent proxy helpers — keep external API tokens server-side // --------------------------------------------------------------------------- -async function proxyVercel(apiPath: string, env: Env): Promise { +async function proxyVercel(apiPath: string, env: Env, traceContext: RequestContext['traceContext']): Promise { if (!env.VERCEL_TOKEN) { return jsonResponse({ error: 'VERCEL_TOKEN is not configured', code: 'CONFIG_ERROR' }, 500) } const res = await fetch(`https://api.vercel.com${apiPath}`, { - headers: { Authorization: `Bearer ${env.VERCEL_TOKEN}` }, + headers: { + Authorization: `Bearer ${env.VERCEL_TOKEN}`, + ...tracePropagationHeaders(traceContext), + }, }) const body = await res.text() return new Response(body, { @@ -369,12 +426,15 @@ async function proxyVercel(apiPath: string, env: Env): Promise { }) } -async function proxyCF(apiPath: string, env: Env): Promise { +async function proxyCF(apiPath: string, env: Env, traceContext: RequestContext['traceContext']): Promise { if (!env.CF_API_TOKEN || !env.CF_ACCOUNT_ID) { return jsonResponse({ error: 'CF_API_TOKEN or CF_ACCOUNT_ID is not configured', code: 'CONFIG_ERROR' }, 500) } const res = await fetch(`https://api.cloudflare.com${apiPath}`, { - headers: { Authorization: `Bearer ${env.CF_API_TOKEN}` }, + headers: { + Authorization: `Bearer ${env.CF_API_TOKEN}`, + ...tracePropagationHeaders(traceContext), + }, }) const body = await res.text() return new Response(body, { @@ -386,12 +446,15 @@ async function proxyCF(apiPath: string, env: Env): Promise { }) } -async function proxyOpenAI(apiPath: string, env: Env): Promise { +async function proxyOpenAI(apiPath: string, env: Env, traceContext: RequestContext['traceContext']): Promise { if (!env.OPENAI_API_KEY) { return jsonResponse({ error: 'OPENAI_API_KEY is not configured', code: 'CONFIG_ERROR' }, 500) } const res = await fetch(`https://api.openai.com${apiPath}`, { - headers: { Authorization: `Bearer ${env.OPENAI_API_KEY}` }, + headers: { + Authorization: `Bearer ${env.OPENAI_API_KEY}`, + ...tracePropagationHeaders(traceContext), + }, }) const body = await res.text() return new Response(body, { @@ -403,7 +466,11 @@ async function proxyOpenAI(apiPath: string, env: Env): Promise { }) } -async function proxyGitHubChatCompletions(request: Request, env: Env): Promise { +async function proxyGitHubChatCompletions( + request: Request, + env: Env, + traceContext: RequestContext['traceContext'], +): Promise { if (!env.GITHUB_TOKEN) { return jsonResponse({ error: 'GITHUB_TOKEN is not configured', code: 'CONFIG_ERROR' }, 500) } @@ -434,6 +501,7 @@ async function proxyGitHubChatCompletions(request: Request, env: Env): Promise { +async function proxyGitHub(apiPath: string, env: Env, traceContext: RequestContext['traceContext']): Promise { if (!env.GITHUB_TOKEN) { return jsonResponse({ error: 'GITHUB_TOKEN is not configured', code: 'CONFIG_ERROR' }, 500) } @@ -459,6 +527,7 @@ async function proxyGitHub(apiPath: string, env: Env): Promise { Accept: 'application/vnd.github+json', 'User-Agent': 'opencto-api-worker', 'X-GitHub-Api-Version': '2022-11-28', + ...tracePropagationHeaders(traceContext), }, }) @@ -472,7 +541,11 @@ async function proxyGitHub(apiPath: string, env: Env): Promise { }) } -async function proxySupervisorResponse(request: Request, env: Env): Promise { +async function proxySupervisorResponse( + request: Request, + env: Env, + traceContext: RequestContext['traceContext'], +): Promise { if (!env.OPENCTO_AGENT_BASE_URL) { return jsonResponse({ error: 'OPENCTO_AGENT_BASE_URL is not configured', code: 'CONFIG_ERROR' }, 500) } @@ -486,6 +559,7 @@ async function proxySupervisorResponse(request: Request, env: Env): Promise 120) { + throw new BadRequestException(`${fieldName} must be between 1 and 120 chars`) + } + return cleaned +} + +function toCents(amount: number): number { + return Math.round(amount * 100) +} + +function defaultPlatformFeePercent(env: Env): number { + const raw = Number.parseFloat(env.OPENCTO_MARKETPLACE_PLATFORM_FEE_PERCENT || '') + if (!Number.isFinite(raw)) return 12 + return Math.min(Math.max(raw, 0), 90) +} + +async function ensureMarketplaceSchema(env: Env): Promise { + if (marketplaceSchemaReady) return + + await env.DB.prepare( + `CREATE TABLE IF NOT EXISTS marketplace_connected_accounts ( + workspace_id TEXT PRIMARY KEY, + stripe_account_id TEXT NOT NULL UNIQUE, + business_name TEXT, + country TEXT, + charges_enabled INTEGER NOT NULL DEFAULT 0, + payouts_enabled INTEGER NOT NULL DEFAULT 0, + details_submitted INTEGER NOT NULL DEFAULT 0, + onboarding_complete_at TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + )`, + ).run() + + await env.DB.prepare( + `CREATE TABLE IF NOT EXISTS agent_rental_contracts ( + id TEXT PRIMARY KEY, + trace_id TEXT, + renter_workspace_id TEXT NOT NULL, + provider_workspace_id TEXT NOT NULL, + provider_stripe_account_id TEXT NOT NULL, + agent_slug TEXT NOT NULL, + description TEXT, + amount_cents INTEGER NOT NULL, + platform_fee_cents INTEGER NOT NULL, + currency TEXT NOT NULL DEFAULT 'usd', + status TEXT NOT NULL, + stripe_checkout_session_id TEXT, + stripe_payment_intent_id TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP + )`, + ).run() + await env.DB.prepare('ALTER TABLE agent_rental_contracts ADD COLUMN trace_id TEXT').run().catch(() => {}) + + await env.DB.prepare( + 'CREATE INDEX IF NOT EXISTS idx_agent_rental_contracts_renter_created ON agent_rental_contracts (renter_workspace_id, created_at DESC)', + ).run() + + await env.DB.prepare( + 'CREATE INDEX IF NOT EXISTS idx_agent_rental_contracts_provider_created ON agent_rental_contracts (provider_workspace_id, created_at DESC)', + ).run() + + await env.DB.prepare( + 'CREATE INDEX IF NOT EXISTS idx_agent_rental_contracts_checkout_session ON agent_rental_contracts (stripe_checkout_session_id)', + ).run() + + marketplaceSchemaReady = true +} + +export async function createConnectedAccount( + body: { businessName?: string; country?: string }, + ctx: RequestContext, +): Promise { + const { env, user } = ctx + await ensureMarketplaceSchema(env) + const stripe = getStripeClient(env) + const workspaceId = getWorkspaceId(user.id) + + const existing = await env.DB.prepare( + `SELECT stripe_account_id FROM marketplace_connected_accounts WHERE workspace_id = ?`, + ) + .bind(workspaceId) + .first<{ stripe_account_id: string }>() + + if (existing?.stripe_account_id) { + return jsonResponse({ + workspaceId, + stripeAccountId: existing.stripe_account_id, + alreadyExists: true, + }) + } + + const account = await stripe.accounts.create({ + controller: { + losses: { payments: 'application' }, + fees: { payer: 'application' }, + stripe_dashboard: { type: 'express' }, + requirement_collection: 'stripe', + }, + capabilities: { + card_payments: { requested: true }, + transfers: { requested: true }, + }, + business_profile: { + name: body.businessName?.trim() || user.displayName, + }, + metadata: { + workspaceId, + ownerUserId: user.id, + }, + country: body.country?.toUpperCase() || 'US', + }) + + await env.DB.prepare( + `INSERT INTO marketplace_connected_accounts ( + workspace_id, stripe_account_id, business_name, country, charges_enabled, payouts_enabled, details_submitted, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))`, + ) + .bind( + workspaceId, + account.id, + body.businessName?.trim() || user.displayName, + account.country || body.country?.toUpperCase() || 'US', + account.charges_enabled ? 1 : 0, + account.payouts_enabled ? 1 : 0, + account.details_submitted ? 1 : 0, + ) + .run() + + return jsonResponse({ + workspaceId, + stripeAccountId: account.id, + onboardingComplete: account.details_submitted, + }) +} + +export async function createConnectedAccountOnboardingLink( + accountId: string, + ctx: RequestContext, +): Promise { + const { env, user } = ctx + await ensureMarketplaceSchema(env) + const workspaceId = getWorkspaceId(user.id) + const stripe = getStripeClient(env) + + const existing = await env.DB.prepare( + `SELECT stripe_account_id FROM marketplace_connected_accounts WHERE workspace_id = ? AND stripe_account_id = ?`, + ) + .bind(workspaceId, accountId) + .first<{ stripe_account_id: string }>() + + if (!existing) { + throw new BadRequestException('Connected account not found for this workspace') + } + + const link = await stripe.accountLinks.create({ + account: accountId, + type: 'account_onboarding', + refresh_url: appUrl(env, '/marketplace/connect/refresh'), + return_url: appUrl(env, '/marketplace/connect/complete'), + }) + + return jsonResponse({ + stripeAccountId: accountId, + onboardingUrl: link.url, + expiresAt: link.expires_at, + }) +} + +export async function createAgentRentalCheckoutSession( + body: { + providerWorkspaceId?: string + providerStripeAccountId?: string + agentSlug?: string + description?: string + amountUsd?: number + currency?: string + platformFeePercent?: number + }, + ctx: RequestContext, +): Promise { + const { env, user } = ctx + await ensureMarketplaceSchema(env) + + const renterWorkspaceId = getWorkspaceId(user.id) + const providerWorkspaceId = parseSlug(body.providerWorkspaceId, 'providerWorkspaceId') + const providerStripeAccountId = parseSlug(body.providerStripeAccountId, 'providerStripeAccountId') + const agentSlug = parseSlug(body.agentSlug, 'agentSlug') + const description = typeof body.description === 'string' ? body.description.trim().slice(0, 250) : '' + const amountUsd = parsePositiveUsd(body.amountUsd, 'amountUsd') + const currency = parseCurrency(body.currency) + + const platformFeePercent = + typeof body.platformFeePercent === 'number' && Number.isFinite(body.platformFeePercent) + ? Math.min(Math.max(body.platformFeePercent, 0), 90) + : defaultPlatformFeePercent(env) + + const amountCents = toCents(amountUsd) + const platformFeeCents = Math.round((amountCents * platformFeePercent) / 100) + const contractId = crypto.randomUUID() + + await env.DB.prepare( + `INSERT INTO agent_rental_contracts ( + id, trace_id, renter_workspace_id, provider_workspace_id, provider_stripe_account_id, agent_slug, + description, amount_cents, platform_fee_cents, currency, status, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'))`, + ) + .bind( + contractId, + ctx.traceContext.traceId, + renterWorkspaceId, + providerWorkspaceId, + providerStripeAccountId, + agentSlug, + description || null, + amountCents, + platformFeeCents, + currency, + 'pending_checkout', + ) + .run() + + const stripe = getStripeClient(env) + + const session = await stripe.checkout.sessions.create({ + mode: 'payment', + line_items: [ + { + quantity: 1, + price_data: { + currency, + product_data: { + name: `Agent rental: ${agentSlug}`, + description: description || `Rental contract ${contractId}`, + }, + unit_amount: amountCents, + }, + }, + ], + payment_intent_data: { + application_fee_amount: platformFeeCents, + transfer_data: { + destination: providerStripeAccountId, + }, + metadata: { + contractId, + agentSlug, + renterWorkspaceId, + providerWorkspaceId, + }, + }, + metadata: { + contractId, + agentSlug, + renterWorkspaceId, + providerWorkspaceId, + flow: 'agent_marketplace_rental', + }, + success_url: `${appUrl(env, '/marketplace/rentals/success')}?contract_id=${contractId}&session_id={CHECKOUT_SESSION_ID}`, + cancel_url: `${appUrl(env, '/marketplace/rentals/cancel')}?contract_id=${contractId}`, + }) + + await env.DB.prepare( + `UPDATE agent_rental_contracts + SET stripe_checkout_session_id = ?, status = 'checkout_created', updated_at = datetime('now') + WHERE id = ?`, + ) + .bind(session.id, contractId) + .run() + + return jsonResponse({ + contractId, + traceId: ctx.traceContext.traceId, + checkoutSessionId: session.id, + checkoutUrl: session.url, + amountCents, + platformFeeCents, + currency, + }) +} + +export async function listMyAgentRentals(ctx: RequestContext): Promise { + const { env, user } = ctx + await ensureMarketplaceSchema(env) + const workspaceId = getWorkspaceId(user.id) + + const rows = await env.DB.prepare( + `SELECT id, trace_id, renter_workspace_id, provider_workspace_id, provider_stripe_account_id, agent_slug, + description, amount_cents, platform_fee_cents, currency, status, + stripe_checkout_session_id, stripe_payment_intent_id, created_at, updated_at + FROM agent_rental_contracts + WHERE renter_workspace_id = ? OR provider_workspace_id = ? + ORDER BY created_at DESC + LIMIT 100`, + ) + .bind(workspaceId, workspaceId) + .all<{ + id: string + trace_id: string | null + renter_workspace_id: string + provider_workspace_id: string + provider_stripe_account_id: string + agent_slug: string + description: string | null + amount_cents: number + platform_fee_cents: number + currency: string + status: string + stripe_checkout_session_id: string | null + stripe_payment_intent_id: string | null + created_at: string + updated_at: string + }>() + + return jsonResponse({ + workspaceId, + contracts: (rows.results || []).map((item) => ({ + id: item.id, + traceId: item.trace_id, + renterWorkspaceId: item.renter_workspace_id, + providerWorkspaceId: item.provider_workspace_id, + providerStripeAccountId: item.provider_stripe_account_id, + agentSlug: item.agent_slug, + description: item.description, + amountCents: item.amount_cents, + platformFeeCents: item.platform_fee_cents, + currency: item.currency, + status: item.status, + checkoutSessionId: item.stripe_checkout_session_id, + paymentIntentId: item.stripe_payment_intent_id, + createdAt: item.created_at, + updatedAt: item.updated_at, + })), + }) +} + +export async function markRentalPaidFromCheckout( + checkoutSessionId: string, + paymentIntentId: string | null, + env: Env, +): Promise { + await ensureMarketplaceSchema(env) + await env.DB.prepare( + `UPDATE agent_rental_contracts + SET status = 'paid', stripe_payment_intent_id = COALESCE(?, stripe_payment_intent_id), updated_at = datetime('now') + WHERE stripe_checkout_session_id = ?`, + ) + .bind(paymentIntentId, checkoutSessionId) + .run() +} diff --git a/opencto/opencto-api-worker/src/tracing.ts b/opencto/opencto-api-worker/src/tracing.ts new file mode 100644 index 0000000..24f8ab4 --- /dev/null +++ b/opencto/opencto-api-worker/src/tracing.ts @@ -0,0 +1,81 @@ +import type { TraceContext } from './types' + +function randomHex(length: number): string { + const bytes = new Uint8Array(Math.ceil(length / 2)) + crypto.getRandomValues(bytes) + return Array.from(bytes) + .map((value) => value.toString(16).padStart(2, '0')) + .join('') + .slice(0, length) +} + +function normalizeTraceId(input: string | null): string | null { + if (!input) return null + const normalized = input.trim().toLowerCase() + if (!/^[0-9a-f]{32}$/.test(normalized) || /^0+$/.test(normalized)) return null + return normalized +} + +function normalizeSpanId(input: string | null): string | null { + if (!input) return null + const normalized = input.trim().toLowerCase() + if (!/^[0-9a-f]{16}$/.test(normalized) || /^0+$/.test(normalized)) return null + return normalized +} + +function extractTraceIdFromTraceparent(traceparent: string | null): string | null { + if (!traceparent) return null + const parts = traceparent.trim().split('-') + if (parts.length !== 4) return null + const traceId = normalizeTraceId(parts[1] ?? null) + const spanId = normalizeSpanId(parts[2] ?? null) + if (!traceId || !spanId) return null + return traceId +} + +export function extractTraceContext(request: Request): TraceContext { + const traceparentHeader = request.headers.get('traceparent') + const explicitTraceId = normalizeTraceId(request.headers.get('x-opencto-trace-id')) + const traceId = explicitTraceId ?? extractTraceIdFromTraceparent(traceparentHeader) ?? randomHex(32) + const traceparent = traceparentHeader?.trim() || `00-${traceId}-${randomHex(16)}-01` + const tracestate = request.headers.get('tracestate') ?? undefined + const sessionId = request.headers.get('x-opencto-session-id') ?? undefined + + return { + traceId, + traceparent, + tracestate, + sessionId, + } +} + +export function appendTraceHeaders(headers: Headers, traceContext: TraceContext): void { + headers.set('x-opencto-trace-id', traceContext.traceId) + headers.set('traceparent', traceContext.traceparent) + if (traceContext.tracestate) { + headers.set('tracestate', traceContext.tracestate) + } + if (traceContext.sessionId) { + headers.set('x-opencto-session-id', traceContext.sessionId) + } +} + +export function withTraceResponseHeaders(response: Response, traceContext: TraceContext): Response { + const headers = new Headers(response.headers) + appendTraceHeaders(headers, traceContext) + return new Response(response.body, { + status: response.status, + statusText: response.statusText, + headers, + }) +} + +export function tracePropagationHeaders(traceContext: TraceContext): Record { + const headers: Record = { + 'x-opencto-trace-id': traceContext.traceId, + traceparent: traceContext.traceparent, + } + if (traceContext.tracestate) headers.tracestate = traceContext.tracestate + if (traceContext.sessionId) headers['x-opencto-session-id'] = traceContext.sessionId + return headers +} diff --git a/opencto/opencto-api-worker/src/types.ts b/opencto/opencto-api-worker/src/types.ts index 7b5cf19..3c69dd8 100644 --- a/opencto/opencto-api-worker/src/types.ts +++ b/opencto/opencto-api-worker/src/types.ts @@ -19,6 +19,7 @@ export interface Env { API_BASE_URL: string OPENCTO_AGENT_BASE_URL: string APP_BASE_URL: string + OPENCTO_MARKETPLACE_PLATFORM_FEE_PERCENT?: string CODEBASE_EXECUTOR?: DurableObjectNamespace CODEBASE_EXECUTION_MODE?: 'stub' | 'container' CODEBASE_MAX_CONCURRENT_RUNS?: string @@ -166,6 +167,14 @@ export interface RequestContext { userId: string user: SessionUser env: Env + traceContext: TraceContext +} + +export interface TraceContext { + traceId: string + traceparent: string + tracestate?: string + sessionId?: string } export interface ChatMessageRecord { @@ -182,6 +191,7 @@ export interface ChatMessageRecord { export interface ChatSessionRecord { id: string userId: string + traceId?: string | null title: string messages: ChatMessageRecord[] createdAt: string @@ -194,6 +204,7 @@ export type CodebaseRunEventLevel = 'system' | 'info' | 'warn' | 'error' export interface CodebaseRun { id: string userId: string + traceId?: string | null repoUrl: string repoFullName: string | null baseBranch: string @@ -207,6 +218,17 @@ export interface CodebaseRun { completedAt: string | null canceledAt: string | null errorMessage: string | null + approval?: CodebaseRunApproval +} + +export type CodebaseRunApprovalState = 'not_required' | 'pending' | 'approved' | 'denied' + +export interface CodebaseRunApproval { + required: boolean + state: CodebaseRunApprovalState + reason: string | null + approvedByUserId: string | null + decidedAt: string | null } export interface CodebaseRunEvent { diff --git a/opencto/opencto-cloudbot-worker/src/index.ts b/opencto/opencto-cloudbot-worker/src/index.ts index 6aa7863..f4c8881 100644 --- a/opencto/opencto-cloudbot-worker/src/index.ts +++ b/opencto/opencto-cloudbot-worker/src/index.ts @@ -3,12 +3,18 @@ import OpenAI from "openai"; interface Env { OPENAI_API_KEY: string; TELEGRAM_BOT_TOKEN: string; + OPENCTO_TELEGRAM_CONSUMER_BOT_TOKEN?: string; OPENCTO_SLACK_BOT_TOKEN?: string; OPENCTO_SLACK_SIGNING_SECRET?: string; OPENCTO_SLACK_ALLOWED_CHANNELS?: string; + OPENCTO_DISCORD_PUBLIC_KEY?: string; + OPENCTO_DISCORD_ALLOWED_CHANNELS?: string; + OPENCTO_DISCORD_ALLOWED_GUILDS?: string; OPENCTO_INFOBIP_BASE_URL?: string; OPENCTO_INFOBIP_API_KEY?: string; OPENCTO_INFOBIP_WHATSAPP_FROM?: string; + OPENCTO_INFOBIP_WHATSAPP_TEMPLATE_NAME?: string; + OPENCTO_INFOBIP_WHATSAPP_TEMPLATE_LANGUAGE?: string; OPENCTO_INFOBIP_SMS_FROM?: string; OPENCTO_INFOBIP_WEBHOOK_TOKEN?: string; OPENCTO_VECTOR_RAG_ENABLED?: string; @@ -19,6 +25,10 @@ interface Env { OPENCTO_ANYWAY_ENABLED?: string; OPENCTO_ANYWAY_API_KEY?: string; OPENCTO_ANYWAY_ENDPOINT?: string; + OPENCTO_ANYWAY_APP_NAME?: string; + OPENCTO_SIDECAR_ENABLED?: string; + OPENCTO_SIDECAR_URL?: string; + OPENCTO_SIDECAR_TOKEN?: string; OPENCTO_KV: KVNamespace; OPENCTO_VECTOR_INDEX?: VectorizeIndex; } @@ -57,6 +67,42 @@ type InfobipInboundMessage = { messageId?: string; }; +type InfobipStatusEvent = { + from?: string; + to?: string; + messageId?: string; + status: string; + description?: string; +}; + +type DiscordInteractionOption = { + type?: number; + name?: string; + value?: string | number | boolean; + options?: DiscordInteractionOption[]; +}; + +type DiscordInteraction = { + type?: number; + id?: string; + token?: string; + application_id?: string; + guild_id?: string; + channel_id?: string; + member?: { + user?: { + id?: string; + }; + }; + user?: { + id?: string; + }; + data?: { + name?: string; + options?: DiscordInteractionOption[]; + }; +}; + const SYSTEM_PROMPT = [ "You are OpenCTO CloudBot.", "Behave like a coding/ops assistant with safe autonomous defaults.", @@ -70,6 +116,7 @@ const TASK_LIMIT = 300; const DAY_ACTIVITY_LIMIT = 500; const VECTOR_TOP_K = 6; const ANYWAY_INGEST_DEFAULT = "https://api.anyway.sh/v1/ingest"; +const WHATSAPP_FREEFORM_WINDOW_MS = 24 * 60 * 60 * 1000; type AnywaySpanStatus = { code: "OK" | "ERROR"; @@ -86,6 +133,17 @@ type AnywaySpan = { status?: AnywaySpanStatus; }; +type SidecarTraceEvent = { + channel: "telegram" | "slack" | "whatsapp" | "sms" | "discord"; + scope: string; + text: string; + direction: "user" | "assistant"; + model?: string; + attributes?: Record; +}; + +type SidecarEnqueue = (event: SidecarTraceEvent) => void; + function randomHex(length: number) { const bytes = new Uint8Array(Math.ceil(length / 2)); crypto.getRandomValues(bytes); @@ -100,14 +158,49 @@ function anywayEnabled(env: Env) { return flag === "true" && Boolean(env.OPENCTO_ANYWAY_API_KEY); } +function sidecarEnabled(env: Env) { + const flag = (env.OPENCTO_SIDECAR_ENABLED || "").toLowerCase(); + return ( + flag === "true" && + Boolean(env.OPENCTO_SIDECAR_URL) && + Boolean(env.OPENCTO_SIDECAR_TOKEN) + ); +} + +async function sendSidecarEvent(env: Env, event: SidecarTraceEvent) { + if (!sidecarEnabled(env) || !env.OPENCTO_SIDECAR_URL || !env.OPENCTO_SIDECAR_TOKEN) return; + try { + const response = await fetch(env.OPENCTO_SIDECAR_URL, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-opencto-sidecar-token": env.OPENCTO_SIDECAR_TOKEN, + }, + body: JSON.stringify(event), + }); + if (!response.ok) { + const body = (await response.text()).slice(0, 300); + console.error(`sidecar trace failed: ${response.status} ${body}`); + } + } catch { + console.error("sidecar trace failed: network error"); + } +} + class AnywayTraceBuffer { private traceId: string; private spans: AnywaySpan[] = []; private enabled: boolean; + private appName: string; constructor(private env: Env) { this.traceId = randomHex(32); this.enabled = anywayEnabled(env); + this.appName = env.OPENCTO_ANYWAY_APP_NAME || "opencto-cloudbot-worker"; + } + + getTraceId() { + return this.traceId; } start(name: string, attributes?: Record) { @@ -126,7 +219,12 @@ class AnywayTraceBuffer { name, start_time: start, end_time: nowIso(), - attributes: { ...(attributes || {}), ...(result?.attributes || {}) }, + attributes: { + "service.name": this.appName, + "app.name": this.appName, + ...(attributes || {}), + ...(result?.attributes || {}), + }, status: result?.status || { code: "OK" }, }); }, @@ -138,7 +236,7 @@ class AnywayTraceBuffer { const endpoint = this.env.OPENCTO_ANYWAY_ENDPOINT || ANYWAY_INGEST_DEFAULT; const traces = [{ trace_id: this.traceId, spans: this.spans }]; try { - await fetch(endpoint, { + const res = await fetch(endpoint, { method: "POST", headers: { Authorization: `Bearer ${this.env.OPENCTO_ANYWAY_API_KEY}`, @@ -146,22 +244,40 @@ class AnywayTraceBuffer { }, body: JSON.stringify({ traces, metrics: [] }), }); + if (!res.ok) { + const body = (await res.text()).slice(0, 300); + console.error(`anyway ingest failed: ${res.status} ${body}`); + } } catch { // Fail open: telemetry must not affect bot behavior. + console.error("anyway ingest failed: network error"); } } } +function openAITraceHeaders(trace?: AnywayTraceBuffer) { + if (!trace) return undefined; + const traceId = trace.getTraceId(); + return { + "x-opencto-trace-id": traceId, + traceparent: `00-${traceId}-${randomHex(16)}-01`, + }; +} + function tgSendUrl(token: string) { return `https://api.telegram.org/bot${token}/sendMessage`; } -async function sendTelegram(env: Env, chatId: number, text: string) { - await fetch(tgSendUrl(env.TELEGRAM_BOT_TOKEN), { +async function sendTelegram(chatId: number, text: string, token: string) { + const response = await fetch(tgSendUrl(token), { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ chat_id: chatId, text: text.slice(0, 4000) }), }); + const raw = await response.text(); + if (!response.ok) { + throw new Error(`Telegram send failed (${response.status}): ${raw.slice(0, 300)}`); + } } function sessionKey(chatId: ChatScope) { @@ -248,6 +364,13 @@ function sanitizeText(text: string, maxLen = 4000) { return text.trim().replace(/\s+/g, " ").slice(0, maxLen); } +function parseChatScopeParam(raw: string | null) { + if (!raw) return null; + const value = raw.trim(); + if (!value) return null; + return /^\d+$/.test(value) ? Number(value) : value; +} + function memoryIndexKey(chatId: ChatScope) { return `memory_index:${chatId}`; } @@ -272,6 +395,10 @@ function activityKey(chatId: ChatScope, yyyyMmDd: string) { return `activity:${chatId}:${yyyyMmDd}`; } +function infobipLastInboundKey(channel: "whatsapp" | "sms", contact: string) { + return `infobip_last_inbound:${channel}:${contact}`; +} + function makeId(prefix: string) { const rand = Math.random().toString(36).slice(2, 8); return `${prefix}_${Date.now().toString(36)}${rand}`; @@ -291,9 +418,12 @@ function vectorEnabled(env: Env) { return flag !== "false" && Boolean(env.OPENCTO_VECTOR_INDEX); } -async function embedText(env: Env, text: string) { +async function embedText(env: Env, text: string, trace?: AnywayTraceBuffer) { const model = env.OPENCTO_EMBED_MODEL || "text-embedding-3-small"; - const openai = new OpenAI({ apiKey: env.OPENAI_API_KEY }); + const openai = new OpenAI({ + apiKey: env.OPENAI_API_KEY, + defaultHeaders: openAITraceHeaders(trace), + }); const res = await openai.embeddings.create({ model, input: sanitizeText(text, 3000), @@ -534,7 +664,7 @@ async function retrieveSemanticMemories( return []; } try { - const embedding = await embedText(env, query); + const embedding = await embedText(env, query, trace); if (!embedding.length) { span?.end({ attributes: { "rag.semantic.count": 0 } }); return []; @@ -693,6 +823,23 @@ async function handleCommand(env: Env, chatId: ChatScope, text: string) { return null; } +async function runChatTurn( + env: Env, + scope: ChatScope, + text: string, + channel: "telegram" | "slack" | "whatsapp" | "sms" | "discord", + trace?: AnywayTraceBuffer, +) { + const commandReply = isCommand(text) ? await handleCommand(env, scope, text) : null; + const answer = commandReply || (await generateAssistantReply(env, scope, text, trace)); + return { + answer, + command: Boolean(commandReply), + userType: `${channel}.user`, + botType: `${channel}.bot`, + }; +} + function badRequest(message: string) { return Response.json({ ok: false, error: message }, { status: 400 }); } @@ -738,7 +885,7 @@ async function handleApi(request: Request, env: Env, url: URL) { } if (request.method === "GET" && url.pathname === "/api/tasks") { - const chatId = Number(url.searchParams.get("chatId") || "0"); + const chatId = parseChatScopeParam(url.searchParams.get("chatId")); const status = url.searchParams.get("status"); if (!chatId) return badRequest("chatId required"); const tasks = await listTasks( @@ -750,7 +897,7 @@ async function handleApi(request: Request, env: Env, url: URL) { } if (request.method === "GET" && url.pathname === "/api/activity/daily") { - const chatId = Number(url.searchParams.get("chatId") || "0"); + const chatId = parseChatScopeParam(url.searchParams.get("chatId")); const date = url.searchParams.get("date") || dayKey(); if (!chatId) return badRequest("chatId required"); const activities = await getDailyActivity(env, chatId, date, 100); @@ -763,41 +910,62 @@ async function handleApi(request: Request, env: Env, url: URL) { async function handleTelegramUpdate( env: Env, update: TelegramUpdate, + botToken: string, + botKind: "admin" | "consumer", trace?: AnywayTraceBuffer, + enqueueSidecar?: SidecarEnqueue, ): Promise { - const root = trace?.start("telegram.webhook.handle"); + const root = trace?.start("telegram.webhook.handle", { "telegram.bot_kind": botKind }); const chatId = update.message?.chat?.id; const text = update.message?.text?.trim(); + const userActivityType = botKind === "consumer" ? "telegram.consumer.user" : "telegram.user"; + const botActivityType = botKind === "consumer" ? "telegram.consumer.bot" : "telegram.bot"; + const errorActivityType = botKind === "consumer" ? "telegram.consumer.error" : "telegram.error"; if (!chatId || !text) { root?.end({ attributes: { "telegram.empty": true } }); return new Response("ok", { status: 200 }); } - await addActivity(env, chatId, "telegram.user", text.slice(0, 300)); - - const commandReply = isCommand(text) ? await handleCommand(env, chatId, text) : null; - if (commandReply) { - await sendTelegram(env, chatId, commandReply); - await addActivity(env, chatId, "telegram.bot", commandReply.slice(0, 300)); + enqueueSidecar?.({ + channel: "telegram", + scope: scopeToString(chatId), + text, + direction: "user", + attributes: { bot_kind: botKind }, + }); + await addActivity(env, chatId, userActivityType, text.slice(0, 300)); + try { + const turn = await runChatTurn(env, chatId, text, "telegram", trace); + const answer = turn.answer; + await sendTelegram(chatId, answer, botToken); + enqueueSidecar?.({ + channel: "telegram", + scope: scopeToString(chatId), + text: answer, + direction: "assistant", + model: env.OPENCTO_AGENT_MODEL || "gpt-4.1-mini", + attributes: { bot_kind: botKind, command: turn.command }, + }); + await addActivity(env, chatId, botActivityType, answer.slice(0, 300)); root?.end({ attributes: { "chat.scope": scopeToString(chatId), - "telegram.command": true, + "telegram.command": turn.command, + "telegram.bot_kind": botKind, + }, + }); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await addActivity(env, chatId, errorActivityType, message.slice(0, 350)); + root?.end({ + status: { code: "ERROR", message: "telegram reply failed" }, + attributes: { + "chat.scope": scopeToString(chatId), + "telegram.bot_kind": botKind, }, }); - return new Response("ok", { status: 200 }); } - const answer = await generateAssistantReply(env, chatId, text, trace); - await sendTelegram(env, chatId, answer); - await addActivity(env, chatId, "telegram.bot", answer.slice(0, 300)); - root?.end({ - attributes: { - "chat.scope": scopeToString(chatId), - "telegram.command": false, - }, - }); - return new Response("ok", { status: 200 }); } @@ -836,18 +1004,96 @@ function extractInfobipMessages(body: unknown): InfobipInboundMessage[] { ...(Array.isArray(payload.messages) ? payload.messages : []), ]; - return candidates - .map((raw) => { - if (!raw || typeof raw !== "object") return null; - const result = raw as Record; - const from = typeof result.from === "string" ? result.from : ""; - const to = typeof result.to === "string" ? result.to : undefined; - const text = parseInfobipText(result); - const messageId = typeof result.messageId === "string" ? result.messageId : undefined; - if (!from || !text) return null; - return { from, to, text, messageId }; - }) - .filter((x): x is InfobipInboundMessage => Boolean(x)); + const events: InfobipInboundMessage[] = []; + for (const raw of candidates) { + if (!raw || typeof raw !== "object") continue; + const result = raw as Record; + const from = typeof result.from === "string" ? result.from : ""; + const to = typeof result.to === "string" ? result.to : undefined; + const text = parseInfobipText(result); + const messageId = typeof result.messageId === "string" ? result.messageId : undefined; + if (!from || !text) continue; + events.push({ from, to, text, messageId }); + } + return events; +} + +function normalizeStatusToken(value: string) { + const token = value.toLowerCase().replace(/[^a-z0-9]+/g, "_").replace(/^_+|_+$/g, ""); + return token || "unknown"; +} + +function parseInfobipStatus(result: Record) { + const nestedStatus = result.status as Record | undefined; + const nestedMessage = result.message as Record | undefined; + const messageStatus = nestedMessage?.status as Record | undefined; + const groupName = + (typeof nestedStatus?.groupName === "string" && nestedStatus.groupName) || + (typeof messageStatus?.groupName === "string" && messageStatus.groupName) || + ""; + const name = + (typeof nestedStatus?.name === "string" && nestedStatus.name) || + (typeof messageStatus?.name === "string" && messageStatus.name) || + ""; + const fallback = + (typeof result.type === "string" && result.type) || + (typeof result.eventType === "string" && result.eventType) || + ""; + return groupName || name || fallback; +} + +function parseInfobipStatusDescription(result: Record) { + const nestedStatus = result.status as Record | undefined; + const nestedMessage = result.message as Record | undefined; + const messageStatus = nestedMessage?.status as Record | undefined; + const direct = + (typeof nestedStatus?.description === "string" && nestedStatus.description) || + (typeof messageStatus?.description === "string" && messageStatus.description) || + ""; + if (direct) return direct; + const nestedError = result.error as Record | undefined; + return typeof nestedError?.description === "string" ? nestedError.description : ""; +} + +function extractInfobipStatusEvents(body: unknown): InfobipStatusEvent[] { + if (!body || typeof body !== "object") return []; + const payload = body as Record; + const candidates = [ + ...(Array.isArray(payload.results) ? payload.results : []), + ...(Array.isArray(payload.messages) ? payload.messages : []), + ]; + + const events: InfobipStatusEvent[] = []; + for (const raw of candidates) { + if (!raw || typeof raw !== "object") continue; + const result = raw as Record; + const statusRaw = parseInfobipStatus(result); + if (!statusRaw) continue; + const status = normalizeStatusToken(statusRaw); + const from = typeof result.from === "string" ? result.from : undefined; + const to = typeof result.to === "string" ? result.to : undefined; + const messageId = typeof result.messageId === "string" ? result.messageId : undefined; + const description = parseInfobipStatusDescription(result); + events.push({ from, to, messageId, status, description }); + } + return events; +} + +function resolveInfobipContactScope( + env: Env, + channel: "whatsapp" | "sms", + from?: string, + to?: string, +) { + const configuredSender = normalizeInfobipAddress( + channel === "whatsapp" + ? env.OPENCTO_INFOBIP_WHATSAPP_FROM || "" + : env.OPENCTO_INFOBIP_SMS_FROM || "", + ); + const fromNorm = normalizeInfobipAddress(from || ""); + const toNorm = normalizeInfobipAddress(to || ""); + if (fromNorm && configuredSender && fromNorm === configuredSender && toNorm) return toNorm; + return fromNorm || toNorm || "unknown"; } function infobipWebhookAuthorized(request: Request, env: Env) { @@ -859,11 +1105,74 @@ function infobipWebhookAuthorized(request: Request, env: Env) { return token.trim() === expected; } +async function setInfobipLastInbound( + env: Env, + channel: "whatsapp" | "sms", + contact: string, + ts = nowIso(), +) { + const key = infobipLastInboundKey(channel, contact); + await putJson(env, key, { ts }); +} + +async function getInfobipLastInboundAgeMs( + env: Env, + channel: "whatsapp" | "sms", + contact: string, +) { + const key = infobipLastInboundKey(channel, contact); + const value = await getJson<{ ts?: string } | null>(env, key, null); + const rawTs = value?.ts || ""; + if (!rawTs) return null; + const then = Date.parse(rawTs); + if (!Number.isFinite(then)) return null; + return Date.now() - then; +} + +async function sendInfobipWhatsappTemplate( + env: Env, + destination: string, + text: string, + headers: Record, + baseUrl: string, +) { + const from = normalizeInfobipAddress(env.OPENCTO_INFOBIP_WHATSAPP_FROM || ""); + const templateName = (env.OPENCTO_INFOBIP_WHATSAPP_TEMPLATE_NAME || "").trim(); + if (!templateName) { + throw new Error( + "Infobip WhatsApp free-form window exceeded and OPENCTO_INFOBIP_WHATSAPP_TEMPLATE_NAME is not configured", + ); + } + const language = (env.OPENCTO_INFOBIP_WHATSAPP_TEMPLATE_LANGUAGE || "en").trim(); + const response = await fetch(`${baseUrl}/whatsapp/1/message/template`, { + method: "POST", + headers, + body: JSON.stringify({ + from, + to: destination, + content: { + templateName, + templateData: { + body: { + placeholders: [text.slice(0, 1024)], + }, + }, + language, + }, + }), + }); + const raw = await response.text(); + if (!response.ok) { + throw new Error(`Infobip WhatsApp template send failed (${response.status}): ${raw.slice(0, 300)}`); + } +} + async function sendInfobipMessage( env: Env, channel: "whatsapp" | "sms", to: string, text: string, + scopeContact?: string, ) { const destination = normalizeInfobipAddress(to); const baseUrl = normalizeInfobipBaseUrl(env); @@ -875,6 +1184,13 @@ async function sendInfobipMessage( if (channel === "whatsapp") { const from = normalizeInfobipAddress(env.OPENCTO_INFOBIP_WHATSAPP_FROM || ""); + const contact = normalizeInfobipAddress(scopeContact || to); + const ageMs = await getInfobipLastInboundAgeMs(env, channel, contact); + const withinWindow = ageMs !== null && ageMs <= WHATSAPP_FREEFORM_WINDOW_MS; + if (!withinWindow) { + await sendInfobipWhatsappTemplate(env, destination, text, headers, baseUrl); + return; + } const response = await fetch(`${baseUrl}/whatsapp/1/message/text`, { method: "POST", headers, @@ -917,6 +1233,7 @@ async function handleInfobipWebhook( env: Env, channel: "whatsapp" | "sms", trace?: AnywayTraceBuffer, + enqueueSidecar?: SidecarEnqueue, ): Promise { const root = trace?.start("infobip.webhook.handle", { "infobip.channel": channel }); if (!infobipConfigured(env, channel)) { @@ -938,26 +1255,64 @@ async function handleInfobipWebhook( const payload = (await request.json()) as unknown; const messages = extractInfobipMessages(payload); - if (!messages.length) { + const statusEvents = extractInfobipStatusEvents(payload); + if (!messages.length && !statusEvents.length) { root?.end({ attributes: { "infobip.messages": 0 } }); return new Response("ok", { status: 200 }); } + for (const event of statusEvents) { + const contact = resolveInfobipContactScope(env, channel, event.from, event.to); + const scope = `infobip:${channel}:${contact}`; + const parts = [`status=${event.status}`]; + if (event.messageId) parts.push(`messageId=${event.messageId}`); + if (event.description) parts.push(`detail=${event.description}`); + await addActivity( + env, + scope, + `infobip.${channel}.status.${event.status}`, + parts.join(" ").slice(0, 300), + ); + } + for (const msg of messages) { - const scope = `infobip:${channel}:${msg.from}`; + const contact = normalizeInfobipAddress(msg.from); + const scope = `infobip:${channel}:${contact}`; + enqueueSidecar?.({ + channel, + scope: scopeToString(scope), + text: msg.text, + direction: "user", + attributes: { contact }, + }); + await setInfobipLastInbound(env, channel, contact); await addActivity(env, scope, `infobip.${channel}.user`, msg.text.slice(0, 300)); try { - const commandReply = isCommand(msg.text) ? await handleCommand(env, scope, msg.text) : null; - const answer = commandReply || (await generateAssistantReply(env, scope, msg.text, trace)); - await sendInfobipMessage(env, channel, msg.from, answer); - await addActivity(env, scope, `infobip.${channel}.bot`, answer.slice(0, 300)); + const chatChannel = channel === "whatsapp" ? "whatsapp" : "sms"; + const turn = await runChatTurn(env, scope, msg.text, chatChannel, trace); + const answer = turn.answer; + await sendInfobipMessage(env, channel, msg.from, answer, contact); + enqueueSidecar?.({ + channel, + scope: scopeToString(scope), + text: answer, + direction: "assistant", + model: env.OPENCTO_AGENT_MODEL || "gpt-4.1-mini", + attributes: { contact, command: turn.command }, + }); + await addActivity(env, scope, turn.botType, answer.slice(0, 300)); } catch (error) { const message = error instanceof Error ? error.message : String(error); await addActivity(env, scope, `infobip.${channel}.error`, message.slice(0, 350)); } } - root?.end({ attributes: { "infobip.messages": messages.length } }); + root?.end({ + attributes: { + "infobip.messages": messages.length, + "infobip.status_events": statusEvents.length, + }, + }); return new Response("ok", { status: 200 }); } @@ -967,7 +1322,10 @@ async function generateAssistantReply( text: string, trace?: AnywayTraceBuffer, ) { - const openai = new OpenAI({ apiKey: env.OPENAI_API_KEY }); + const openai = new OpenAI({ + apiKey: env.OPENAI_API_KEY, + defaultHeaders: openAITraceHeaders(trace), + }); const session = await loadSession(env, chatId); session.push({ role: "user", content: text }); const ragContext = await buildContext(env, chatId, text, trace); @@ -1088,10 +1446,239 @@ function normalizeSlackText(text: string) { return text.replace(/<@[^>]+>\s*/g, "").trim(); } +function parseHex(input: string) { + const value = input.trim().toLowerCase(); + if (!/^[0-9a-f]+$/.test(value) || value.length % 2 !== 0) return null; + const out = new Uint8Array(value.length / 2); + for (let i = 0; i < value.length; i += 2) { + out[i / 2] = parseInt(value.slice(i, i + 2), 16); + } + return out; +} + +async function verifyDiscordRequest(request: Request, env: Env, rawBody: string) { + const publicKeyHex = (env.OPENCTO_DISCORD_PUBLIC_KEY || "").trim(); + if (!publicKeyHex) return false; + const signatureHex = (request.headers.get("x-signature-ed25519") || "").trim(); + const timestamp = (request.headers.get("x-signature-timestamp") || "").trim(); + if (!signatureHex || !timestamp) return false; + + const publicKey = parseHex(publicKeyHex); + const signature = parseHex(signatureHex); + if (!publicKey || !signature) return false; + + const key = await crypto.subtle.importKey( + "raw", + publicKey, + { name: "Ed25519" }, + false, + ["verify"], + ); + return crypto.subtle.verify("Ed25519", key, signature, utf8(`${timestamp}${rawBody}`)); +} + +function csvAllowed(raw: string | undefined, value: string | undefined) { + const v = (value || "").trim(); + if (!v) return true; + const cfg = (raw || "").trim(); + if (!cfg) return true; + const allowed = new Set(cfg.split(",").map((x) => x.trim()).filter(Boolean)); + return allowed.has(v); +} + +function discordUserId(interaction: DiscordInteraction) { + return interaction.member?.user?.id || interaction.user?.id || "unknown"; +} + +function firstStringOption(options: DiscordInteractionOption[] | undefined): string | null { + if (!options?.length) return null; + for (const option of options) { + if (typeof option.value === "string" && option.value.trim()) { + return option.value.trim(); + } + const nested = firstStringOption(option.options); + if (nested) return nested; + } + return null; +} + +function mapDiscordCommandToText(interaction: DiscordInteraction) { + const name = (interaction.data?.name || "").trim().toLowerCase(); + const arg = firstStringOption(interaction.data?.options); + if (!name) return null; + if (name === "help") return "/help"; + if (name === "tasks") return "/tasks"; + if (name === "daily") return "/daily"; + if (name === "remember") return arg ? `/remember ${arg}` : "/remember"; + if (name === "task") { + const action = (interaction.data?.options?.[0]?.name || "").trim().toLowerCase(); + if (action === "add" && arg) return `/task add ${arg}`; + if (action === "done" && arg) return `/task done ${arg}`; + return arg || "/task"; + } + return arg || null; +} + +function discordJson(payload: unknown) { + return new Response(JSON.stringify(payload), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); +} + +async function updateDiscordInteractionMessage( + applicationId: string, + interactionToken: string, + content: string, +) { + const response = await fetch( + `https://discord.com/api/v10/webhooks/${applicationId}/${interactionToken}/messages/@original`, + { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ content: content.slice(0, 1800) }), + }, + ); + if (!response.ok) { + const raw = await response.text(); + throw new Error(`discord message update failed (${response.status}): ${raw.slice(0, 300)}`); + } +} + +async function handleDiscordWebhook( + request: Request, + env: Env, + ctx: ExecutionContext, + trace?: AnywayTraceBuffer, + enqueueSidecar?: SidecarEnqueue, +): Promise { + const root = trace?.start("discord.webhook.handle"); + const raw = await request.text(); + const ok = await verifyDiscordRequest(request, env, raw); + if (!ok) { + root?.end({ + status: { code: "ERROR", message: "invalid signature" }, + attributes: { "discord.signature.valid": false }, + }); + return new Response("invalid signature", { status: 401 }); + } + + const interaction = JSON.parse(raw) as DiscordInteraction; + const interactionType = interaction.type || 0; + if (interactionType === 1) { + root?.end({ attributes: { "discord.ping": true } }); + return discordJson({ type: 1 }); + } + if (interactionType !== 2) { + root?.end({ attributes: { "discord.unsupported": interactionType } }); + return discordJson({ + type: 4, + data: { + content: "Unsupported Discord interaction type.", + flags: 64, + }, + }); + } + + if (!csvAllowed(env.OPENCTO_DISCORD_ALLOWED_GUILDS, interaction.guild_id)) { + root?.end({ attributes: { "discord.allowed_guild": false } }); + return discordJson({ + type: 4, + data: { + content: "This Discord server is not allowed for this bot.", + flags: 64, + }, + }); + } + if (!csvAllowed(env.OPENCTO_DISCORD_ALLOWED_CHANNELS, interaction.channel_id)) { + root?.end({ attributes: { "discord.allowed_channel": false } }); + return discordJson({ + type: 4, + data: { + content: "This Discord channel is not allowed for this bot.", + flags: 64, + }, + }); + } + + const text = mapDiscordCommandToText(interaction); + if (!text) { + root?.end({ attributes: { "discord.empty_text": true } }); + return discordJson({ + type: 4, + data: { + content: + "No prompt found. Use a command with text input, e.g. `/chat prompt:`.", + flags: 64, + }, + }); + } + + const guildId = interaction.guild_id || "dm"; + const channelId = interaction.channel_id || "unknown"; + const userId = discordUserId(interaction); + const scope = `discord:${guildId}:${channelId}:${userId}`; + const applicationId = interaction.application_id || ""; + const interactionToken = interaction.token || ""; + const model = env.OPENCTO_AGENT_MODEL || "gpt-4.1-mini"; + + enqueueSidecar?.({ + channel: "discord", + scope: scopeToString(scope), + text, + direction: "user", + attributes: { guild_id: guildId, channel_id: channelId, user_id: userId }, + }); + await addActivity(env, scope, "discord.user", text.slice(0, 300), nowIso()); + root?.end({ attributes: { "chat.scope": scopeToString(scope) } }); + + ctx.waitUntil( + (async () => { + try { + const turn = await runChatTurn(env, scope, text, "discord", trace); + await updateDiscordInteractionMessage(applicationId, interactionToken, turn.answer); + enqueueSidecar?.({ + channel: "discord", + scope: scopeToString(scope), + text: turn.answer, + direction: "assistant", + model, + attributes: { + guild_id: guildId, + channel_id: channelId, + user_id: userId, + command: turn.command, + }, + }); + await addActivity(env, scope, turn.botType, turn.answer.slice(0, 300), nowIso()); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + await addActivity(env, scope, "discord.error", message.slice(0, 350), nowIso()); + if (applicationId && interactionToken) { + try { + await updateDiscordInteractionMessage( + applicationId, + interactionToken, + "I hit an error while processing that request.", + ); + } catch { + // no-op + } + } + } finally { + await trace?.flush(); + } + })(), + ); + + return discordJson({ type: 5 }); +} + async function handleSlackWebhook( request: Request, env: Env, trace?: AnywayTraceBuffer, + enqueueSidecar?: SidecarEnqueue, ): Promise { const root = trace?.start("slack.webhook.handle"); const raw = await request.text(); @@ -1156,17 +1743,36 @@ async function handleSlackWebhook( } const scope = `slack:${team}:${event.channel}:${threadTs}`; + enqueueSidecar?.({ + channel: "slack", + scope: scopeToString(scope), + text, + direction: "user", + attributes: { channel_id: event.channel, thread_ts: threadTs }, + }); await addActivity(env, scope, "slack.user", text.slice(0, 300), nowIso()); - const commandReply = isCommand(text) ? await handleCommand(env, scope, text) : null; - const answer = commandReply || (await generateAssistantReply(env, scope, text, trace)); + const turn = await runChatTurn(env, scope, text, "slack", trace); + const answer = turn.answer; await sendSlackMessage(env, event.channel, answer, threadTs); + enqueueSidecar?.({ + channel: "slack", + scope: scopeToString(scope), + text: answer, + direction: "assistant", + model: env.OPENCTO_AGENT_MODEL || "gpt-4.1-mini", + attributes: { + channel_id: event.channel, + thread_ts: threadTs, + command: turn.command, + }, + }); await markSlackThreadActive(env, team, event.channel, threadTs); - await addActivity(env, scope, "slack.bot", answer.slice(0, 300), nowIso()); + await addActivity(env, scope, turn.botType, answer.slice(0, 300), nowIso()); root?.end({ attributes: { "chat.scope": scopeToString(scope), - "slack.command": Boolean(commandReply), + "slack.command": turn.command, }, }); return new Response("ok", { status: 200 }); @@ -1175,19 +1781,28 @@ async function handleSlackWebhook( export default { async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { const url = new URL(request.url); + const enqueueSidecar: SidecarEnqueue = (event) => { + ctx.waitUntil(sendSidecarEvent(env, event)); + }; if (request.method === "GET" && url.pathname === "/health") { return Response.json({ ok: true, service: "opencto-cloudbot-worker", + telegram_admin_configured: Boolean(env.TELEGRAM_BOT_TOKEN), + telegram_consumer_configured: Boolean(env.OPENCTO_TELEGRAM_CONSUMER_BOT_TOKEN), slack_configured: Boolean( env.OPENCTO_SLACK_BOT_TOKEN && env.OPENCTO_SLACK_SIGNING_SECRET, ), + discord_configured: Boolean(env.OPENCTO_DISCORD_PUBLIC_KEY), vector_rag_enabled: vectorEnabled(env), vector_bound: Boolean(env.OPENCTO_VECTOR_INDEX), embed_model: env.OPENCTO_EMBED_MODEL || "text-embedding-3-small", anyway_enabled: anywayEnabled(env), anyway_endpoint: env.OPENCTO_ANYWAY_ENDPOINT || ANYWAY_INGEST_DEFAULT, + anyway_app_name: env.OPENCTO_ANYWAY_APP_NAME || "opencto-cloudbot-worker", + sidecar_enabled: sidecarEnabled(env), + sidecar_url: env.OPENCTO_SIDECAR_URL || null, infobip_whatsapp_configured: infobipConfigured(env, "whatsapp"), infobip_sms_configured: infobipConfigured(env, "sms"), }); @@ -1196,25 +1811,68 @@ export default { if (request.method === "POST" && url.pathname === "/webhook/telegram") { const trace = new AnywayTraceBuffer(env); const payload = (await request.json()) as TelegramUpdate; - const response = await handleTelegramUpdate(env, payload, trace); + const response = await handleTelegramUpdate( + env, + payload, + env.TELEGRAM_BOT_TOKEN, + "admin", + trace, + enqueueSidecar, + ); + ctx.waitUntil(trace.flush()); + return response; + } + if (request.method === "POST" && url.pathname === "/webhook/telegram-consumer") { + if (!env.OPENCTO_TELEGRAM_CONSUMER_BOT_TOKEN) { + return Response.json( + { ok: false, error: "consumer telegram bot token not configured" }, + { status: 503 }, + ); + } + const trace = new AnywayTraceBuffer(env); + const payload = (await request.json()) as TelegramUpdate; + const response = await handleTelegramUpdate( + env, + payload, + env.OPENCTO_TELEGRAM_CONSUMER_BOT_TOKEN, + "consumer", + trace, + enqueueSidecar, + ); ctx.waitUntil(trace.flush()); return response; } if (request.method === "POST" && url.pathname === "/webhook/slack") { const trace = new AnywayTraceBuffer(env); - const response = await handleSlackWebhook(request, env, trace); + const response = await handleSlackWebhook(request, env, trace, enqueueSidecar); ctx.waitUntil(trace.flush()); return response; } + if (request.method === "POST" && url.pathname === "/webhook/discord") { + const trace = new AnywayTraceBuffer(env); + return handleDiscordWebhook(request, env, ctx, trace, enqueueSidecar); + } if (request.method === "POST" && url.pathname === "/webhook/infobip/whatsapp") { const trace = new AnywayTraceBuffer(env); - const response = await handleInfobipWebhook(request, env, "whatsapp", trace); + const response = await handleInfobipWebhook( + request, + env, + "whatsapp", + trace, + enqueueSidecar, + ); ctx.waitUntil(trace.flush()); return response; } if (request.method === "POST" && url.pathname === "/webhook/infobip/sms") { const trace = new AnywayTraceBuffer(env); - const response = await handleInfobipWebhook(request, env, "sms", trace); + const response = await handleInfobipWebhook( + request, + env, + "sms", + trace, + enqueueSidecar, + ); ctx.waitUntil(trace.flush()); return response; } From e9cb4f2f467327988d3ef7887ba44720ed360828 Mon Sep 17 00:00:00 2001 From: chilu18 Date: Thu, 5 Mar 2026 23:45:03 +0000 Subject: [PATCH 4/5] fix(codeql): replace regex normalizers with linear-time sanitizers --- opencto/opencto-cloudbot-worker/src/index.ts | 20 +++++++++++++++++++- opencto/opencto-sdk/src/client.ts | 10 +++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/opencto/opencto-cloudbot-worker/src/index.ts b/opencto/opencto-cloudbot-worker/src/index.ts index f4c8881..bba43d2 100644 --- a/opencto/opencto-cloudbot-worker/src/index.ts +++ b/opencto/opencto-cloudbot-worker/src/index.ts @@ -1019,7 +1019,25 @@ function extractInfobipMessages(body: unknown): InfobipInboundMessage[] { } function normalizeStatusToken(value: string) { - const token = value.toLowerCase().replace(/[^a-z0-9]+/g, "_").replace(/^_+|_+$/g, ""); + const lowered = value.toLowerCase(); + let token = ""; + let lastWasUnderscore = false; + for (let i = 0; i < lowered.length; i += 1) { + const ch = lowered.charCodeAt(i); + const isAlphaNum = (ch >= 97 && ch <= 122) || (ch >= 48 && ch <= 57); + if (isAlphaNum) { + token += lowered[i]; + lastWasUnderscore = false; + continue; + } + if (!lastWasUnderscore && token.length > 0) { + token += "_"; + lastWasUnderscore = true; + } + } + if (token.endsWith("_")) { + token = token.slice(0, -1); + } return token || "unknown"; } diff --git a/opencto/opencto-sdk/src/client.ts b/opencto/opencto-sdk/src/client.ts index 1323455..6a8aedd 100644 --- a/opencto/opencto-sdk/src/client.ts +++ b/opencto/opencto-sdk/src/client.ts @@ -8,7 +8,7 @@ export class OpenCTOClient { private readonly timeoutMs: number constructor(options: OpenCTOClientOptions) { - this.baseUrl = options.baseUrl.replace(/\/+$/, '') + this.baseUrl = trimTrailingSlashes(options.baseUrl) this.token = options.token this.headers = options.headers ?? {} this.fetchImpl = options.fetchImpl ?? fetch @@ -66,3 +66,11 @@ export class OpenCTOClient { } } } + +function trimTrailingSlashes(input: string): string { + let end = input.length + while (end > 0 && input.charCodeAt(end - 1) === 47) { + end -= 1 + } + return input.slice(0, end) +} From 01d35afd8112d05921ea75bfb80a334bd6e10f93 Mon Sep 17 00:00:00 2001 From: chilu18 Date: Fri, 6 Mar 2026 02:40:05 +0000 Subject: [PATCH 5/5] chore: switch anyway tracing to sandbox collector and disable fallback ingest --- opencto/opencto-cloudbot-worker/README.md | 16 +- .../sidecar/Dockerfile | 12 + .../opencto-cloudbot-worker/sidecar/README.md | 68 ++++++ .../opencto-cloudbot-worker/sidecar/app.py | 218 ++++++++++++++++++ .../sidecar/requirements.txt | 3 + opencto/opencto-cloudbot-worker/src/index.ts | 20 +- opencto/opencto-cloudbot-worker/wrangler.toml | 9 +- 7 files changed, 339 insertions(+), 7 deletions(-) create mode 100644 opencto/opencto-cloudbot-worker/sidecar/Dockerfile create mode 100644 opencto/opencto-cloudbot-worker/sidecar/README.md create mode 100644 opencto/opencto-cloudbot-worker/sidecar/app.py create mode 100644 opencto/opencto-cloudbot-worker/sidecar/requirements.txt diff --git a/opencto/opencto-cloudbot-worker/README.md b/opencto/opencto-cloudbot-worker/README.md index 02bfea7..fc1935a 100644 --- a/opencto/opencto-cloudbot-worker/README.md +++ b/opencto/opencto-cloudbot-worker/README.md @@ -1,6 +1,6 @@ # OpenCTO CloudBot Worker -Cloudflare Worker powering OpenCTO across Telegram, Slack, WhatsApp, and SMS. +Cloudflare Worker powering OpenCTO across Telegram, Slack, Discord, WhatsApp, and SMS. It runs a shared AI orchestration path with persistent memory, task management, daily activity logs, and optional semantic RAG. ## Architecture @@ -10,6 +10,7 @@ flowchart LR TG1[Telegram Admin Bot] --> W[OpenCTO CloudBot Worker] TG2[Telegram Consumer Bot] --> W SL[Slack Events API] --> W + DC[Discord Interactions API] --> W IB[Infobip Webhooks
WhatsApp/SMS + Status] --> W W --> OA[OpenAI Responses API] @@ -37,6 +38,7 @@ flowchart LR - Telegram: admin webhook at `/webhook/telegram`, consumer webhook at `/webhook/telegram-consumer`; each replies with its own bot token. - Slack: verifies signature/timestamp, handles mentions/DMs/active thread replies, and keeps thread context in KV. +- Discord: verifies Ed25519 request signature, accepts Interactions at `/webhook/discord`, and reuses shared command/chat logic. - Infobip WhatsApp/SMS: inbound message webhooks, delivery/read callback logging, 24h WhatsApp free-form window, and template fallback outside the window. ## Data Model (KV / Vectorize) @@ -65,7 +67,11 @@ OPENCTO_AGENT_MODEL = "gpt-4.1-mini" OPENCTO_VECTOR_RAG_ENABLED = "true" OPENCTO_EMBED_MODEL = "text-embedding-3-small" OPENCTO_ANYWAY_ENABLED = "true" -OPENCTO_ANYWAY_ENDPOINT = "https://api.anyway.sh/v1/ingest" +OPENCTO_ANYWAY_ENDPOINT = "https://trace-dev-collector.anyway.sh/" +OPENCTO_SIDECAR_ENABLED = "true" +OPENCTO_SIDECAR_URL = "https://anyway-sdk-sidecar.opencto.works/trace/event" +OPENCTO_DISCORD_ALLOWED_CHANNELS = "123456789012345678,234567890123456789" +OPENCTO_DISCORD_ALLOWED_GUILDS = "345678901234567890" OPENCTO_INFOBIP_BASE_URL = "https://.api.infobip.com" OPENCTO_INFOBIP_WHATSAPP_FROM = "" @@ -84,9 +90,11 @@ wrangler secret put TELEGRAM_BOT_TOKEN wrangler secret put OPENCTO_TELEGRAM_CONSUMER_BOT_TOKEN wrangler secret put OPENCTO_SLACK_BOT_TOKEN wrangler secret put OPENCTO_SLACK_SIGNING_SECRET +wrangler secret put OPENCTO_DISCORD_PUBLIC_KEY wrangler secret put OPENCTO_INFOBIP_API_KEY wrangler secret put OPENCTO_INFOBIP_WEBHOOK_TOKEN wrangler secret put OPENCTO_ANYWAY_API_KEY +wrangler secret put OPENCTO_SIDECAR_TOKEN wrangler secret put OPENCTO_ADMIN_TOKEN ``` @@ -125,6 +133,9 @@ curl -sS "https://api.telegram.org/bot/setWebhook?url=https:///webhook/slack` - Slack required events: `app_mention`, `message.channels`, `message.groups`, `message.im` - Slack bot scope: `chat:write` +- Discord Interactions Endpoint URL: `https:///webhook/discord` +- Discord secret required: `OPENCTO_DISCORD_PUBLIC_KEY` (from Discord Developer Portal) +- Optional hardening: set `OPENCTO_DISCORD_ALLOWED_GUILDS` and `OPENCTO_DISCORD_ALLOWED_CHANNELS` (comma-separated IDs) - Infobip WhatsApp webhook: `https:///webhook/infobip/whatsapp?token=` - Infobip SMS webhook: `https:///webhook/infobip/sms?token=` @@ -158,4 +169,5 @@ If `OPENCTO_ADMIN_TOKEN` is set, include `x-opencto-admin-token` on `/api/*`. ## Notes - Tracing is fail-open: telemetry failures never block channel replies. +- Sidecar forwarding is fail-open and uses `OPENCTO_SIDECAR_TOKEN` via `x-opencto-sidecar-token`. - Python tracing sidecar lives in `sidecar/`. diff --git a/opencto/opencto-cloudbot-worker/sidecar/Dockerfile b/opencto/opencto-cloudbot-worker/sidecar/Dockerfile new file mode 100644 index 0000000..71ebf2b --- /dev/null +++ b/opencto/opencto-cloudbot-worker/sidecar/Dockerfile @@ -0,0 +1,12 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt + +COPY app.py /app/app.py + +EXPOSE 8788 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8788"] diff --git a/opencto/opencto-cloudbot-worker/sidecar/README.md b/opencto/opencto-cloudbot-worker/sidecar/README.md new file mode 100644 index 0000000..3e1abfe --- /dev/null +++ b/opencto/opencto-cloudbot-worker/sidecar/README.md @@ -0,0 +1,68 @@ +# OpenCTO Anyway Python Sidecar + +FastAPI sidecar that initializes `anyway-sdk` (`Traceloop.init`) and exposes a simple HTTP endpoint to create workflow/task traces. + +## 1) Install + +```bash +cd sidecar +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +## 2) Configure + +Required: + +```bash +export ANYWAY_API_KEY="" +``` + +Optional: + +```bash +export OPENCTO_ANYWAY_SIDECAR_APP_NAME="opencto-cloudbot-sidecar" +export OPENCTO_ANYWAY_SIDECAR_ENDPOINT="https://trace-dev-collector.anyway.sh/" +export OPENCTO_SIDECAR_TOKEN="" +export OPENCTO_ANYWAY_FALLBACK_ENABLED="false" +export OPENCTO_ANYWAY_INGEST_ENDPOINT="https://trace-dev-collector.anyway.sh/v1/ingest" +``` + +Notes: +- For `trace-dev-collector.anyway.sh`, keep `OPENCTO_ANYWAY_FALLBACK_ENABLED=false`. +- The sidecar already defaults fallback off for this sandbox collector if the env var is unset. + +## 3) Run + +```bash +uvicorn app:app --host 0.0.0.0 --port 8788 +``` + +## 4) Test + +Health: + +```bash +curl -sS http://127.0.0.1:8788/health +``` + +Send event: + +```bash +curl -sS -X POST http://127.0.0.1:8788/trace/event \ + -H "Content-Type: application/json" \ + -H "x-opencto-sidecar-token: " \ + -d '{ + "channel":"telegram", + "scope":"telegram:1110137791", + "text":"hello from sidecar", + "direction":"user", + "model":"gpt-4.1-mini", + "attributes":{"source":"cloudbot-worker"} + }' +``` + +Auth note: +- Preferred: `x-opencto-sidecar-token: ` +- Also accepted: `Authorization: Bearer ` diff --git a/opencto/opencto-cloudbot-worker/sidecar/app.py b/opencto/opencto-cloudbot-worker/sidecar/app.py new file mode 100644 index 0000000..222197f --- /dev/null +++ b/opencto/opencto-cloudbot-worker/sidecar/app.py @@ -0,0 +1,218 @@ +import logging +import os +import secrets +import json +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen +from datetime import datetime, timezone +from typing import Any + +from fastapi import FastAPI, Header, HTTPException +from pydantic import BaseModel, Field + +from anyway.sdk import Traceloop +from anyway.sdk.decorators import task, workflow + + +APP_NAME = os.getenv("OPENCTO_ANYWAY_SIDECAR_APP_NAME", "opencto-cloudbot-sidecar") +COLLECTOR_ENDPOINT = os.getenv( + "OPENCTO_ANYWAY_SIDECAR_ENDPOINT", + "https://trace-dev-collector.anyway.sh/", +) +ANYWAY_API_KEY = os.getenv("ANYWAY_API_KEY") or os.getenv("TRACELOOP_API_KEY") +SIDECAR_TOKEN = os.getenv("OPENCTO_SIDECAR_TOKEN", "").strip() +INGEST_ENDPOINT = os.getenv("OPENCTO_ANYWAY_INGEST_ENDPOINT", "https://trace-dev-collector.anyway.sh/v1/ingest") + + +def _parse_bool(value: str | None) -> bool | None: + if value is None: + return None + normalized = value.strip().lower() + if normalized in {"1", "true", "yes", "on"}: + return True + if normalized in {"0", "false", "no", "off"}: + return False + return None + + +def _is_sandbox_collector(endpoint: str) -> bool: + return "trace-dev-collector.anyway.sh" in endpoint + + +fallback_override = _parse_bool(os.getenv("OPENCTO_ANYWAY_FALLBACK_ENABLED")) +if fallback_override is not None: + FALLBACK_ENABLED = fallback_override +else: + # Sandbox collector handles OTLP exporter directly; disable fallback ingest noise by default. + FALLBACK_ENABLED = not _is_sandbox_collector(COLLECTOR_ENDPOINT) + +# Reduce noisy exporter failures when collector endpoint is unreachable. +os.environ.setdefault("TRACELOOP_METRICS_ENABLED", "false") +os.environ.setdefault("TRACELOOP_LOGGING_ENABLED", "false") + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("opencto-anyway-sidecar") + +init_error: str | None = None + +if ANYWAY_API_KEY: + try: + Traceloop.init( + app_name=APP_NAME, + api_endpoint=COLLECTOR_ENDPOINT, + headers={"authorization": f"Bearer {ANYWAY_API_KEY}"}, + disable_batch=False, + ) + logger.info("Traceloop initialized for app_name=%s", APP_NAME) + except Exception as exc: # pragma: no cover + init_error = str(exc) + logger.exception("Failed to initialize Traceloop") +else: + init_error = "Missing ANYWAY_API_KEY (or TRACELOOP_API_KEY)" + logger.warning(init_error) + + +class TraceEvent(BaseModel): + channel: str = Field(..., examples=["telegram"]) + scope: str = Field(..., examples=["telegram:12345"]) + text: str + direction: str = Field(default="user") + model: str | None = None + attributes: dict[str, Any] = Field(default_factory=dict) + timestamp: str = Field( + default_factory=lambda: datetime.now(timezone.utc).isoformat(), + ) + + +@task(name="capture_message") +def capture_message(event: TraceEvent) -> dict[str, Any]: + return { + "channel": event.channel, + "scope": event.scope, + "direction": event.direction, + "model": event.model, + "text_len": len(event.text), + "attributes": event.attributes, + "timestamp": event.timestamp, + } + + +@workflow(name="opencto_cloudbot_message") +def create_trace(event: TraceEvent) -> dict[str, Any]: + return capture_message(event) + + +def ingest_fallback(event: TraceEvent) -> None: + if not FALLBACK_ENABLED or not ANYWAY_API_KEY: + return + ts = event.timestamp or datetime.now(timezone.utc).isoformat() + span_attributes: dict[str, Any] = { + "service.name": APP_NAME, + "app.name": APP_NAME, + "channel": event.channel, + "scope": event.scope, + "direction": event.direction, + "text_len": len(event.text), + } + if event.model: + span_attributes["llm.model"] = event.model + if event.attributes: + for k, v in event.attributes.items(): + span_attributes[f"app.attr.{k}"] = v + + payload = { + "traces": [ + { + "trace_id": secrets.token_hex(16), + "spans": [ + { + "span_id": secrets.token_hex(8), + "name": "opencto.sidecar.message", + "start_time": ts, + "end_time": ts, + "attributes": span_attributes, + "status": {"code": "OK"}, + } + ], + } + ], + "metrics": [], + } + body = json.dumps(payload).encode("utf-8") + auth_variants = [f"Bearer {ANYWAY_API_KEY}", ANYWAY_API_KEY] + last_http_error: int | None = None + for auth_value in auth_variants: + req = Request( + INGEST_ENDPOINT, + data=body, + headers={ + "Content-Type": "application/json", + "Authorization": auth_value, + }, + method="POST", + ) + try: + with urlopen(req, timeout=10) as resp: + if resp.status < 400: + return + last_http_error = resp.status + except HTTPError as exc: + last_http_error = exc.code + except URLError as exc: + logger.warning("Fallback ingest network error=%s", exc.reason) + return + logger.warning("Fallback ingest failed after auth variants, status=%s", last_http_error) + + +app = FastAPI(title="OpenCTO Anyway Sidecar", version="0.1.0") + + +def _extract_bearer_token(value: str | None) -> str | None: + if not value: + return None + v = value.strip() + if not v: + return None + if v.lower().startswith("bearer "): + return v[7:].strip() or None + return v + + +def check_auth(token: str | None, authorization: str | None = None) -> None: + if not SIDECAR_TOKEN: + return + provided = token or _extract_bearer_token(authorization) + if provided != SIDECAR_TOKEN: + raise HTTPException( + status_code=401, + detail="invalid sidecar token; use x-opencto-sidecar-token or Authorization: Bearer ", + ) + + +@app.get("/health") +def health() -> dict[str, Any]: + return { + "ok": init_error is None, + "service": "opencto-anyway-sidecar", + "app_name": APP_NAME, + "collector_endpoint": COLLECTOR_ENDPOINT, + "ingest_endpoint": INGEST_ENDPOINT, + "fallback_enabled": FALLBACK_ENABLED, + "sdk_initialized": init_error is None, + "error": init_error, + "auth_required": bool(SIDECAR_TOKEN), + } + + +@app.post("/trace/event") +def trace_event( + event: TraceEvent, + x_opencto_sidecar_token: str | None = Header(default=None), + authorization: str | None = Header(default=None), +) -> dict[str, Any]: + check_auth(x_opencto_sidecar_token, authorization) + if init_error: + raise HTTPException(status_code=503, detail=init_error) + result = create_trace(event) + ingest_fallback(event) + return {"ok": True, "trace": result} diff --git a/opencto/opencto-cloudbot-worker/sidecar/requirements.txt b/opencto/opencto-cloudbot-worker/sidecar/requirements.txt new file mode 100644 index 0000000..082e6fa --- /dev/null +++ b/opencto/opencto-cloudbot-worker/sidecar/requirements.txt @@ -0,0 +1,3 @@ +anyway-sdk==0.0.6 +fastapi==0.116.1 +uvicorn[standard]==0.35.0 diff --git a/opencto/opencto-cloudbot-worker/src/index.ts b/opencto/opencto-cloudbot-worker/src/index.ts index bba43d2..746a22a 100644 --- a/opencto/opencto-cloudbot-worker/src/index.ts +++ b/opencto/opencto-cloudbot-worker/src/index.ts @@ -115,7 +115,7 @@ const MEMORY_LIMIT = 300; const TASK_LIMIT = 300; const DAY_ACTIVITY_LIMIT = 500; const VECTOR_TOP_K = 6; -const ANYWAY_INGEST_DEFAULT = "https://api.anyway.sh/v1/ingest"; +const ANYWAY_INGEST_DEFAULT = "https://trace-dev-collector.anyway.sh/v1/ingest"; const WHATSAPP_FREEFORM_WINDOW_MS = 24 * 60 * 60 * 1000; type AnywaySpanStatus = { @@ -167,6 +167,20 @@ function sidecarEnabled(env: Env) { ); } +function resolveAnywayIngestEndpoint(raw: string | undefined): string { + const candidate = (raw || "").trim(); + if (!candidate) return ANYWAY_INGEST_DEFAULT; + try { + const url = new URL(candidate); + if (url.pathname === "/" || url.pathname === "") { + url.pathname = "/v1/ingest"; + } + return url.toString(); + } catch { + return candidate; + } +} + async function sendSidecarEvent(env: Env, event: SidecarTraceEvent) { if (!sidecarEnabled(env) || !env.OPENCTO_SIDECAR_URL || !env.OPENCTO_SIDECAR_TOKEN) return; try { @@ -233,7 +247,7 @@ class AnywayTraceBuffer { async flush() { if (!this.enabled || !this.spans.length || !this.env.OPENCTO_ANYWAY_API_KEY) return; - const endpoint = this.env.OPENCTO_ANYWAY_ENDPOINT || ANYWAY_INGEST_DEFAULT; + const endpoint = resolveAnywayIngestEndpoint(this.env.OPENCTO_ANYWAY_ENDPOINT); const traces = [{ trace_id: this.traceId, spans: this.spans }]; try { const res = await fetch(endpoint, { @@ -1817,7 +1831,7 @@ export default { vector_bound: Boolean(env.OPENCTO_VECTOR_INDEX), embed_model: env.OPENCTO_EMBED_MODEL || "text-embedding-3-small", anyway_enabled: anywayEnabled(env), - anyway_endpoint: env.OPENCTO_ANYWAY_ENDPOINT || ANYWAY_INGEST_DEFAULT, + anyway_endpoint: resolveAnywayIngestEndpoint(env.OPENCTO_ANYWAY_ENDPOINT), anyway_app_name: env.OPENCTO_ANYWAY_APP_NAME || "opencto-cloudbot-worker", sidecar_enabled: sidecarEnabled(env), sidecar_url: env.OPENCTO_SIDECAR_URL || null, diff --git a/opencto/opencto-cloudbot-worker/wrangler.toml b/opencto/opencto-cloudbot-worker/wrangler.toml index 6a53cd6..8f453c8 100644 --- a/opencto/opencto-cloudbot-worker/wrangler.toml +++ b/opencto/opencto-cloudbot-worker/wrangler.toml @@ -8,8 +8,13 @@ OPENCTO_AGENT_MODEL = "gpt-4.1-mini" OPENCTO_APPROVAL_REQUIRED = "true" OPENCTO_VECTOR_RAG_ENABLED = "true" OPENCTO_EMBED_MODEL = "text-embedding-3-small" -OPENCTO_ANYWAY_ENABLED = "false" -OPENCTO_ANYWAY_ENDPOINT = "https://api.anyway.sh/v1/ingest" +OPENCTO_ANYWAY_ENABLED = "true" +OPENCTO_ANYWAY_ENDPOINT = "https://trace-dev-collector.anyway.sh/" +OPENCTO_ANYWAY_APP_NAME = "opencto-cloudbot-worker" +OPENCTO_SIDECAR_ENABLED = "true" +OPENCTO_SIDECAR_URL = "https://anyway-sdk-sidecar.opencto.works/trace/event" +OPENCTO_DISCORD_ALLOWED_CHANNELS = "" +OPENCTO_DISCORD_ALLOWED_GUILDS = "1477259119588016241" OPENCTO_INFOBIP_BASE_URL = "https://gr6pre.api.infobip.com" OPENCTO_INFOBIP_WHATSAPP_FROM = "+447307810053" OPENCTO_INFOBIP_SMS_FROM = "+447307810053"