diff --git a/.gitignore b/.gitignore
index 577a4f19..717b2186 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,9 @@ node_modules/
 # aider
 .aider*
 
+# claude
+.claude/
+
 # eslint cache
 .eslintcache
 
diff --git a/bun.lock b/bun.lock
index 20e895e7..9ece8757 100644
--- a/bun.lock
+++ b/bun.lock
@@ -1,5 +1,6 @@
 {
   "lockfileVersion": 1,
+  "configVersion": 0,
   "workspaces": {
     "": {
       "name": "copilot-api",
diff --git a/src/lib/error.ts b/src/lib/error.ts
index c39c2259..61cc293c 100644
--- a/src/lib/error.ts
+++ b/src/lib/error.ts
@@ -3,20 +3,57 @@ import type { ContentfulStatusCode } from "hono/utils/http-status"
 
 import consola from "consola"
 
+import { RateLimitError } from "./retry"
+
 export class HTTPError extends Error {
   response: Response
+  errorBody?: string
 
-  constructor(message: string, response: Response) {
+  constructor(message: string, response: Response, errorBody?: string) {
     super(message)
     this.response = response
+    this.errorBody = errorBody
   }
 }
 
 export async function forwardError(c: Context, error: unknown) {
   consola.error("Error occurred:", error)
 
+  // Handle rate limit errors with detailed retry information
+  if (error instanceof RateLimitError) {
+    const retryAfter = error.retryInfo.retryAfter
+    const message =
+      error.retryInfo.exceeded ?
+        `Rate limit exceeded: ${error.retryInfo.exceeded}. Retry after ${retryAfter} seconds.`
+      : `Rate limit exceeded. Retry after ${retryAfter} seconds.`
+
+    return c.json(
+      {
+        error: {
+          message,
+          type: "rate_limit_error",
+          retry_after: retryAfter,
+          exceeded: error.retryInfo.exceeded,
+        },
+      },
+      429,
+      {
+        "Retry-After": retryAfter.toString(),
+      },
+    )
+  }
+
   if (error instanceof HTTPError) {
-    const errorText = await error.response.text()
+    // Use the cached error body if available, otherwise try to read it
+    let errorText = error.errorBody
+    if (!errorText) {
+      try {
+        errorText = await error.response.text()
+      } catch {
+        errorText = "Failed to read error body"
+      }
+    }
+
     let errorJson: unknown
     try {
       errorJson = JSON.parse(errorText)
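A minimal sketch of how a route can surface these errors (not part of the patch; the app instance and route path are assumptions). forwardError turns a RateLimitError into a 429 JSON body with a Retry-After header, and falls back to the cached errorBody for an HTTPError:

app.post("/v1/chat/completions", async (c) => {
  try {
    return await handleCompletion(c)
  } catch (error) {
    // Converts RateLimitError / HTTPError into a JSON error response
    return await forwardError(c, error)
  }
})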
diff --git a/src/lib/queue.ts b/src/lib/queue.ts
new file mode 100644
index 00000000..c8988254
--- /dev/null
+++ b/src/lib/queue.ts
@@ -0,0 +1,331 @@
+import consola from "consola"
+
+import { addJitter, isRetryableError, RateLimitError } from "./retry"
+
+interface QueueItem<T> {
+  execute: () => Promise<T>
+  resolve: (value: T) => void
+  reject: (error: unknown) => void
+  timestamp: number
+  retryCount: number
+}
+
+export class RequestQueue {
+  private queue: Array<QueueItem<unknown>> = []
+  private processing = false
+  private rateLimitMs: number
+  private lastProcessedTime = 0
+  private maxRetries = 5 // Maximum number of retries for rate limit errors
+  private requestTimeout = 60000 // 60s timeout per request
+
+  // Adaptive rate limiting
+  private successfulRequestsInRow = 0
+  private rateLimitHitsInWindow = 0
+  private rateLimitWindowStart = Date.now()
+  private readonly rateLimitWindowMs = 60000 // 1 minute window
+  private readonly successThresholdToDecrease = 20 // Decrease after 20 successful requests (conservative)
+  private readonly minRateLimitMs = 100 // Minimum 100ms between requests
+  private readonly decreaseFactor = 0.95 // Decrease by 5% when successful (conservative)
+  private readonly maxRateLimitMs = 60000 // Maximum 60s between requests
+
+  constructor(rateLimitSeconds?: number) {
+    // Smart default: 1 second if not specified (adaptive rate limiting enabled)
+    // Use 0 to explicitly disable rate limiting
+    this.rateLimitMs =
+      rateLimitSeconds !== undefined ? rateLimitSeconds * 1000 : 1000 // 1 second default
+  }
+
+  async enqueue<T>(execute: () => Promise<T>): Promise<T> {
+    // Log a warning if the queue is getting large, but never reject
+    if (this.queue.length > 100) {
+      consola.warn(
+        `Queue depth high: ${this.queue.length} requests waiting. Consider rate limiting.`,
+      )
+    }
+
+    // If no rate limit is set, execute immediately with retry handling
+    if (this.rateLimitMs === 0) {
+      return this.executeWithRetry(execute, 0)
+    }
+
+    return new Promise<T>((resolve, reject) => {
+      this.queue.push({
+        execute: execute as () => Promise<unknown>,
+        resolve: resolve as (value: unknown) => void,
+        reject,
+        timestamp: Date.now(),
+        retryCount: 0,
+      })
+
+      consola.debug(`Request queued. Queue size: ${this.queue.length}`)
+
+      // Start processing if not already processing
+      if (!this.processing) {
+        void this.processQueue()
+      }
+    })
+  }
+
+  private async executeWithRetry<T>(
+    execute: () => Promise<T>,
+    retryCount: number,
+  ): Promise<T> {
+    try {
+      // Execute with timeout
+      return await this.executeWithTimeout(execute)
+    } catch (error) {
+      // Check if the error is retryable
+      if (!isRetryableError(error)) {
+        consola.debug("Non-retryable error, failing immediately")
+        throw error
+      }
+
+      if (retryCount >= this.maxRetries) {
+        consola.error(`Max retries (${this.maxRetries}) exceeded`)
+        throw error
+      }
+
+      // Handle rate limit errors with automatic retry
+      if (error instanceof RateLimitError) {
+        // Track rate limit hits for adaptive adjustment
+        this.trackRateLimitHit()
+
+        // Add jitter to prevent a thundering herd
+        const delayWithJitter = addJitter(error.retryInfo.retryAfter)
+        const waitTimeMs = delayWithJitter * 1000
+
+        consola.warn(
+          `Rate limit hit (attempt ${retryCount + 1}/${this.maxRetries}). Waiting ${delayWithJitter.toFixed(1)}s before retry...`,
+        )
+
+        // Adaptively adjust the rate limit based on 429 frequency
+        this.adjustRateLimitUp(error.retryInfo.retryAfter)
+
+        await new Promise((resolve) => setTimeout(resolve, waitTimeMs))
+
+        consola.info(`Retrying request after rate limit wait...`)
+        return this.executeWithRetry(execute, retryCount + 1)
+      }
+
+      // Handle other retryable errors (network errors, timeouts, 5xx)
+      // Use exponential backoff with jitter
+      const baseDelay = 1 // 1 second base
+      const exponentialDelay = baseDelay * 2 ** retryCount // 1s, 2s, 4s, 8s, 16s
+      const delayWithJitter = addJitter(exponentialDelay)
+      const waitTimeMs = delayWithJitter * 1000
+
+      const errorMessage =
+        error instanceof Error ? error.message : String(error)
+      consola.warn(
+        `Transient error (attempt ${retryCount + 1}/${this.maxRetries}): ${errorMessage}. Waiting ${delayWithJitter.toFixed(1)}s before retry...`,
+      )
+
+      await new Promise((resolve) => setTimeout(resolve, waitTimeMs))
+
+      consola.info(`Retrying request after transient error...`)
+      return this.executeWithRetry(execute, retryCount + 1)
+    }
+  }
+
+  private async executeWithTimeout<T>(execute: () => Promise<T>): Promise<T> {
+    return Promise.race([
+      execute(),
+      new Promise<never>((_, reject) =>
+        setTimeout(
+          () =>
+            reject(new Error(`Request timeout after ${this.requestTimeout}ms`)),
+          this.requestTimeout,
+        ),
+      ),
+    ])
+  }
+
+  private async processQueue(): Promise<void> {
+    if (this.processing) return
+    this.processing = true
+
+    while (this.queue.length > 0) {
+      const now = Date.now()
+      const timeSinceLastRequest = now - this.lastProcessedTime
+
+      // Wait if we need to respect the rate limit
+      if (
+        this.lastProcessedTime > 0
+        && timeSinceLastRequest < this.rateLimitMs
+      ) {
+        const waitTime = this.rateLimitMs - timeSinceLastRequest
+        consola.info(
+          `Rate limit: waiting ${Math.ceil(waitTime / 1000)}s before processing next request (${this.queue.length} in queue)`,
+        )
+        await new Promise((resolve) => setTimeout(resolve, waitTime))
+      }
+
+      const item = this.queue.shift()
+      if (!item) break
+
+      const queueTime = Date.now() - item.timestamp
+      if (queueTime > 1000) {
+        consola.debug(`Request waited ${Math.ceil(queueTime / 1000)}s in queue`)
+      }
+
+      try {
+        consola.debug(
+          `Processing request (${this.queue.length} remaining in queue)`,
+        )
+        const result = await this.executeWithRetry(
+          item.execute,
+          item.retryCount,
+        )
+        item.resolve(result)
+        this.lastProcessedTime = Date.now()
+
+        // Track the successful request for adaptive rate limit decrease
+        this.trackSuccessfulRequest()
+      } catch (error) {
+        // executeWithRetry already handles retries, so if we get here, all retries failed
+        consola.error("Request failed after all retries:", error)
+        item.reject(error)
+        this.lastProcessedTime = Date.now()
+
+        // Reset the success counter on failure
+        this.successfulRequestsInRow = 0
+      }
+    }
+
+    this.processing = false
+    consola.debug("Queue processing completed")
+  }
+
+  getQueueSize(): number {
+    return this.queue.length
+  }
+
+  getCurrentRateLimitSeconds(): number {
+    return this.rateLimitMs / 1000
+  }
+
+  getLastProcessedTime(): number {
+    return this.lastProcessedTime
+  }
+
+  updateRateLimit(rateLimitSeconds?: number): void {
+    this.rateLimitMs = rateLimitSeconds ? rateLimitSeconds * 1000 : 0
+    consola.info(
+      rateLimitSeconds ?
+        `Rate limit updated to ${rateLimitSeconds}s`
+      : "Rate limit disabled",
+    )
+  }
+
+  /**
+   * Track a rate limit hit (429 response) for adaptive adjustment
+   */
+  private trackRateLimitHit(): void {
+    const now = Date.now()
+
+    // Reset the window if expired
+    if (now - this.rateLimitWindowStart > this.rateLimitWindowMs) {
+      this.rateLimitHitsInWindow = 0
+      this.rateLimitWindowStart = now
+    }
+
+    this.rateLimitHitsInWindow++
+    this.successfulRequestsInRow = 0 // Reset the success counter
+  }
+
+  /**
+   * Adjust the rate limit UP (slow down) when hitting 429s.
+   * Always applies a buffer - more aggressive if we're hitting many 429s in a short time.
+   */
+  private adjustRateLimitUp(retryAfterSeconds: number): void {
+    const suggestedRateLimitMs = retryAfterSeconds * 1000
+
+    // Always add a buffer when hitting rate limits - be more conservative.
+    // Use a tiered approach: more hits = bigger buffer
+    let adjustedRateLimitMs: number
+    let bufferPercent: number
+
+    if (this.rateLimitHitsInWindow >= 3) {
+      // 3+ hits: be very conservative - add a 75% buffer
+      adjustedRateLimitMs = suggestedRateLimitMs * 1.75
+      bufferPercent = 75
+    } else if (this.rateLimitHitsInWindow >= 2) {
+      // 2 hits: be quite conservative - add a 50% buffer
+      adjustedRateLimitMs = suggestedRateLimitMs * 1.5
+      bufferPercent = 50
+    } else {
+      // First hit: add a 25% buffer immediately
+      adjustedRateLimitMs = suggestedRateLimitMs * 1.25
+      bufferPercent = 25
+    }
+
+    consola.debug(
+      `Rate limit hit ${this.rateLimitHitsInWindow} times in the last minute, adding ${bufferPercent}% buffer`,
+    )
+
+    // Only increase if the new limit is higher
+    if (adjustedRateLimitMs > this.rateLimitMs) {
+      const oldLimit = (this.rateLimitMs / 1000).toFixed(1)
+
+      // Cap at the maximum before logging so the reported value is accurate
+      this.rateLimitMs = Math.min(adjustedRateLimitMs, this.maxRateLimitMs)
+
+      const newLimit = (this.rateLimitMs / 1000).toFixed(1)
+
+      consola.info(
+        `Rate limit increased: ${oldLimit}s → ${newLimit}s (${this.rateLimitHitsInWindow} hits in last minute, ${bufferPercent}% buffer)`,
+      )
+    }
+  }
+
+  /**
+   * Get the adaptive decrease strategy based on distance from the rate limit.
+   * More aggressive when far from the limit, more cautious when close.
+   */
+  private getDecreaseStrategy(): { threshold: number; factor: number } {
+    if (this.rateLimitMs > 10000) {
+      // Far from the limit (>10s): be aggressive
+      return { threshold: 10, factor: 0.9 } // 10% decrease after 10 successes
+    }
+    if (this.rateLimitMs > 2000) {
+      // Medium distance (2-10s): be moderate
+      return { threshold: 15, factor: 0.93 } // 7% decrease after 15 successes
+    }
+    // Close to the limit (<2s): be very cautious (matches the class defaults)
+    return { threshold: 20, factor: 0.95 } // 5% decrease after 20 successes
+  }
+
+  /**
+   * Track a successful request and potentially decrease the rate limit (speed up).
+   * Uses an adaptive strategy based on distance from the rate limit.
+   */
+  private trackSuccessfulRequest(): void {
+    this.successfulRequestsInRow++
+
+    // Get the adaptive strategy based on the current rate limit
+    const strategy = this.getDecreaseStrategy()
+
+    // Only decrease if we have a rate limit set and we've had enough successes
+    if (
+      this.rateLimitMs > this.minRateLimitMs
+      && this.successfulRequestsInRow >= strategy.threshold
+    ) {
+      const oldLimit = (this.rateLimitMs / 1000).toFixed(1)
+
+      // Decrease the rate limit using the adaptive factor
+      this.rateLimitMs = Math.max(
+        this.minRateLimitMs,
+        this.rateLimitMs * strategy.factor,
+      )
+
+      const newLimit = (this.rateLimitMs / 1000).toFixed(1)
+      const decreasePercent = ((1 - strategy.factor) * 100).toFixed(0)
+
+      consola.info(
+        `Rate limit decreased: ${oldLimit}s → ${newLimit}s (${this.successfulRequestsInRow} consecutive successes, ${decreasePercent}% decrease)`,
+      )
+
+      // Reset the counter after adjustment
+      this.successfulRequestsInRow = 0
+    }
+  }
+}
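A usage sketch, not part of the patch (the URL is a placeholder). enqueue() resolves with the callback's return value once the queue dispatches it at the current rate; RateLimitError and transient failures (timeouts, 5xx, network errors) are retried internally up to maxRetries, while non-retryable errors reject immediately:

const queue = new RequestQueue(1) // start at one dispatch per second

const data = await queue.enqueue(async () => {
  const response = await fetch("https://api.example.com/completions")
  // HTTPError carries the Response, so the retry logic can inspect its status
  if (!response.ok) throw new HTTPError("Upstream request failed", response)
  return response.json()
})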
diff --git a/src/lib/rate-limit-headers.ts b/src/lib/rate-limit-headers.ts
new file mode 100644
index 00000000..c9f30331
--- /dev/null
+++ b/src/lib/rate-limit-headers.ts
@@ -0,0 +1,49 @@
+import type { Context } from "hono"
+
+import type { State } from "./state"
+
+/**
+ * Add rate limit headers to the response.
+ * These headers inform clients about rate limiting status.
+ */
+export function addRateLimitHeaders(c: Context, state: State): void {
+  const queue = state.requestQueue
+  const rateLimitSeconds = queue.getCurrentRateLimitSeconds()
+
+  // X-RateLimit-Limit: maximum requests per period.
+  // If a rate limit is set, it's 1 request per N seconds, so limit = 60/N per minute
+  if (rateLimitSeconds > 0) {
+    const limit = Math.floor(60 / rateLimitSeconds)
+    c.header("X-RateLimit-Limit", limit.toString())
+  }
+
+  // X-RateLimit-Remaining: requests remaining (based on queue depth).
+  // If the queue is empty, remaining = limit; otherwise it decreases
+  const queueSize = queue.getQueueSize()
+  if (rateLimitSeconds > 0) {
+    const limit = Math.floor(60 / rateLimitSeconds)
+    const remaining = Math.max(0, limit - queueSize)
+    c.header("X-RateLimit-Remaining", remaining.toString())
+  } else {
+    // No rate limit, always "unlimited"
+    c.header("X-RateLimit-Remaining", "1000")
+  }
+
+  // X-RateLimit-Reset: Unix timestamp when the rate limit resets.
+  // Calculated from the last processed time plus the rate limit interval
+  const lastProcessed = queue.getLastProcessedTime()
+  if (rateLimitSeconds > 0 && lastProcessed > 0) {
+    const resetTime = Math.floor(
+      (lastProcessed + rateLimitSeconds * 1000) / 1000,
+    )
+    c.header("X-RateLimit-Reset", resetTime.toString())
+  }
+
+  // X-Queue-Depth: custom header showing the current queue size
+  c.header("X-Queue-Depth", queueSize.toString())
+
+  // Retry-After: only set if the queue is large (suggests the client slow down)
+  if (queueSize > 50 && rateLimitSeconds > 0) {
+    c.header("Retry-After", Math.ceil(rateLimitSeconds).toString())
+  }
+}
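A worked example of the resulting headers, not part of the patch: with a 2-second rate limit and 5 requests queued, a response would carry values like these (the reset timestamp is illustrative):

X-RateLimit-Limit: 30          (floor(60 / 2) requests per minute)
X-RateLimit-Remaining: 25      (30 minus the 5 queued requests)
X-RateLimit-Reset: 1735689602  (lastProcessed / 1000 + 2)
X-Queue-Depth: 5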
diff --git a/src/lib/rate-limit.ts b/src/lib/rate-limit.ts
index e41f5829..f72a8354 100644
--- a/src/lib/rate-limit.ts
+++ b/src/lib/rate-limit.ts
@@ -5,6 +5,23 @@ import type { State } from "./state"
 
 import { HTTPError } from "./error"
 import { sleep } from "./utils"
 
+/**
+ * Execute a request with rate limiting using the request queue.
+ * Requests are automatically queued and processed at the configured rate limit.
+ * @param state - Application state containing the request queue
+ * @param execute - The async function to execute
+ * @returns The result of the executed function
+ */
+export async function executeWithRateLimit<T>(
+  state: State,
+  execute: () => Promise<T>,
+): Promise<T> {
+  return state.requestQueue.enqueue(execute)
+}
+
+/**
+ * @deprecated Use executeWithRateLimit instead for better queue-based rate limiting
+ */
 export async function checkRateLimit(state: State) {
   if (state.rateLimitSeconds === undefined) return
diff --git a/src/lib/request-cache.ts b/src/lib/request-cache.ts
new file mode 100644
index 00000000..37bcbd56
--- /dev/null
+++ b/src/lib/request-cache.ts
@@ -0,0 +1,117 @@
+import consola from "consola"
+import { createHash } from "node:crypto"
+
+interface CacheEntry<T> {
+  response: T
+  timestamp: number
+}
+
+/**
+ * Simple in-memory cache for request deduplication.
+ * Caches identical requests to reduce GitHub API calls.
+ */
+export class RequestCache {
+  private cache = new Map<string, CacheEntry<unknown>>()
+  private readonly ttlMs: number
+  private readonly maxSize: number
+
+  constructor(ttlSeconds = 30, maxSize = 1000) {
+    this.ttlMs = ttlSeconds * 1000
+    this.maxSize = maxSize
+  }
+
+  /**
+   * Generate a cache key from the request payload
+   */
+  private generateKey(payload: unknown): string {
+    const hash = createHash("sha256")
+    hash.update(JSON.stringify(payload))
+    return hash.digest("hex")
+  }
+
+  /**
+   * Check if a cache entry is expired
+   */
+  private isExpired(entry: CacheEntry<unknown>): boolean {
+    return Date.now() - entry.timestamp > this.ttlMs
+  }
+
+  /**
+   * Get the cached response if available and not expired
+   */
+  get(payload: unknown): unknown {
+    const key = this.generateKey(payload)
+    const entry = this.cache.get(key)
+
+    if (!entry) {
+      return null
+    }
+
+    if (this.isExpired(entry)) {
+      this.cache.delete(key)
+      consola.debug(`Cache expired for key: ${key.slice(0, 8)}...`)
+      return null
+    }
+
+    consola.debug(
+      `Cache hit for key: ${key.slice(0, 8)}... (age: ${Math.round((Date.now() - entry.timestamp) / 1000)}s)`,
+    )
+    return entry.response
+  }
+
+  /**
+   * Store a response in the cache
+   */
+  set(payload: unknown, response: unknown): void {
+    // Evict the oldest entry if the cache is full (Map preserves insertion order)
+    if (this.cache.size >= this.maxSize) {
+      const oldestKey = this.cache.keys().next().value
+      if (oldestKey) {
+        this.cache.delete(oldestKey)
+        consola.debug("Cache full, evicted oldest entry")
+      }
+    }
+
+    const key = this.generateKey(payload)
+    this.cache.set(key, {
+      response,
+      timestamp: Date.now(),
+    })
+    consola.debug(`Cached response for key: ${key.slice(0, 8)}...`)
+  }
+
+  /**
+   * Clear all expired entries
+   */
+  cleanup(): void {
+    let count = 0
+    for (const [key, entry] of this.cache.entries()) {
+      if (this.isExpired(entry)) {
+        this.cache.delete(key)
+        count++
+      }
+    }
+    if (count > 0) {
+      consola.debug(`Cleaned up ${count} expired cache entries`)
+    }
+  }
+
+  /**
+   * Get cache statistics
+   */
+  getStats(): { size: number; maxSize: number; ttlSeconds: number } {
+    return {
+      size: this.cache.size,
+      maxSize: this.maxSize,
+      ttlSeconds: this.ttlMs / 1000,
+    }
+  }
+
+  /**
+   * Clear the entire cache
+   */
+  clear(): void {
+    this.cache.clear()
+    consola.info("Cache cleared")
+  }
+}
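A round-trip sketch, not part of the patch; callUpstream stands in for a hypothetical fetch helper. Keys are SHA-256 hashes of JSON.stringify(payload), so two payloads only share an entry if they serialize identically, including property order:

const cache = new RequestCache(30, 1000) // 30s TTL, max 1000 entries

const payload = { model: "gpt-4o", messages: [{ role: "user", content: "hi" }] }
let response = cache.get(payload)
if (response === null) {
  response = await callUpstream(payload) // hypothetical upstream call
  cache.set(payload, response)
}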
diff --git a/src/lib/retry.ts b/src/lib/retry.ts
new file mode 100644
index 00000000..aaf0f89c
--- /dev/null
+++ b/src/lib/retry.ts
@@ -0,0 +1,155 @@
+import consola from "consola"
+
+export interface RetryInfo {
+  retryAfter: number // seconds to wait
+  exceeded?: string // which limit was exceeded
+}
+
+/**
+ * Add jitter to a delay to prevent a thundering herd.
+ * @param delaySeconds - Base delay in seconds
+ * @param jitterPercent - Jitter percentage (0.2 = ±20%)
+ * @returns Delay with jitter applied, in seconds
+ */
+export function addJitter(delaySeconds: number, jitterPercent = 0.2): number {
+  const jitter = delaySeconds * jitterPercent * (Math.random() - 0.5) * 2
+  return Math.max(0.1, delaySeconds + jitter)
+}
+
+/**
+ * Parse the Retry-After header from a response.
+ * The value can be either seconds (number) or an HTTP date (string).
+ */
+export function parseRetryAfter(response: Response): RetryInfo | null {
+  const retryAfter = response.headers.get("retry-after")
+  const exceeded = response.headers.get("x-ratelimit-exceeded")
+  const userRetryAfter = response.headers.get("x-ratelimit-user-retry-after")
+
+  // Prefer x-ratelimit-user-retry-after if available
+  const retryValue = userRetryAfter || retryAfter
+
+  if (!retryValue) {
+    return null
+  }
+
+  // Try parsing as a number (seconds)
+  const retrySeconds = Number.parseInt(retryValue, 10)
+  if (!Number.isNaN(retrySeconds)) {
+    return {
+      retryAfter: retrySeconds,
+      exceeded: exceeded || undefined,
+    }
+  }
+
+  // Try parsing as an HTTP date
+  const retryDate = new Date(retryValue)
+  if (!Number.isNaN(retryDate.getTime())) {
+    const secondsUntilRetry = Math.max(
+      0,
+      Math.ceil((retryDate.getTime() - Date.now()) / 1000),
+    )
+    return {
+      retryAfter: secondsUntilRetry,
+      exceeded: exceeded || undefined,
+    }
+  }
+
+  return null
+}
+
+/**
+ * Rate limit error with retry information
+ */
+export class RateLimitError extends Error {
+  retryInfo: RetryInfo
+
+  constructor(message: string, retryInfo: RetryInfo) {
+    super(message)
+    this.name = "RateLimitError"
+    this.retryInfo = retryInfo
+  }
+}
+
+/**
+ * Check if an HTTP status code indicates a transient error that should be retried
+ */
+export function isTransientError(statusCode: number): boolean {
+  // Retry on:
+  // - 429 (rate limit - handled specially)
+  // - 500 (internal server error)
+  // - 502 (bad gateway)
+  // - 503 (service unavailable)
+  // - 504 (gateway timeout)
+  return (
+    statusCode === 429
+    || statusCode === 500
+    || statusCode === 502
+    || statusCode === 503
+    || statusCode === 504
+  )
+}
+
+/**
+ * Check if an error is retryable (network errors, timeouts, transient errors)
+ */
+export function isRetryableError(error: unknown): boolean {
+  // Rate limit errors are always retryable
+  if (error instanceof RateLimitError) {
+    return true
+  }
+
+  // Check HTTPError status codes.
+  // Note: We would need to import HTTPError here, but to avoid circular deps,
+  // we check for the response property instead
+  if (
+    error
+    && typeof error === "object"
+    && "response" in error
+    && error.response instanceof Response
+  ) {
+    return isTransientError(error.response.status)
+  }
+
+  // Timeout errors are retryable
+  if (error instanceof Error && error.message.includes("timeout")) {
+    return true
+  }
+
+  // Network errors are retryable (ECONNRESET, ETIMEDOUT, etc.)
+  if (
+    error instanceof Error
+    && (error.message.includes("ECONNRESET")
+      || error.message.includes("ETIMEDOUT")
+      || error.message.includes("ENOTFOUND")
+      || error.message.includes("ECONNREFUSED")
+      || error.message.includes("fetch failed"))
+  ) {
+    return true
+  }
+
+  return false
+}
+
+/**
+ * Check if a response is a rate limit error and parse retry info
+ */
+export function checkRateLimitError(response: Response): RateLimitError | null {
+  if (response.status !== 429) {
+    return null
+  }
+
+  const retryInfo = parseRetryAfter(response)
+  if (!retryInfo) {
+    // 429 without retry info: fall back to a 60s wait
+    return new RateLimitError("Rate limit exceeded", { retryAfter: 60 })
+  }
+
+  let message = `Rate limit exceeded. Retry after ${retryInfo.retryAfter}s`
+  if (retryInfo.exceeded) {
+    message += ` (${retryInfo.exceeded})`
+  }
+
+  consola.warn(message)
+
+  return new RateLimitError(message, retryInfo)
+}
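A worked example of the backoff arithmetic the queue builds on these helpers: with the 1-second base delay and the default ±20% jitter band, the third attempt (retryCount = 2) waits 4s ± 0.8s:

const exponentialDelay = 1 * 2 ** 2 // 4s base delay for the third attempt
const delay = addJitter(exponentialDelay) // uniform in roughly [3.2, 4.8] seconds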
"~/lib/rate-limit-headers" import { state } from "~/lib/state" import { createChatCompletions, @@ -23,66 +24,88 @@ import { import { translateChunkToAnthropicEvents } from "./stream-translation" export async function handleCompletion(c: Context) { - await checkRateLimit(state) + return executeWithRateLimit(state, async () => { + const anthropicPayload = await c.req.json() + consola.debug( + "Anthropic request payload:", + JSON.stringify(anthropicPayload), + ) - const anthropicPayload = await c.req.json() - consola.debug("Anthropic request payload:", JSON.stringify(anthropicPayload)) + const openAIPayload = translateToOpenAI(anthropicPayload) + consola.debug( + "Translated OpenAI request payload:", + JSON.stringify(openAIPayload), + ) - const openAIPayload = translateToOpenAI(anthropicPayload) - consola.debug( - "Translated OpenAI request payload:", - JSON.stringify(openAIPayload), - ) + // Check cache for non-streaming requests + if (!openAIPayload.stream) { + const cachedResponse = state.requestCache.get( + openAIPayload, + ) as ChatCompletionResponse | null + if (cachedResponse) { + // Add rate limit headers even for cached responses + addRateLimitHeaders(c, state) + const anthropicResponse = translateToAnthropic(cachedResponse) + return c.json(anthropicResponse) + } + } - if (state.manualApprove) { - await awaitApproval() - } + if (state.manualApprove) { + await awaitApproval() + } - const response = await createChatCompletions(openAIPayload) + const response = await createChatCompletions(openAIPayload) - if (isNonStreaming(response)) { - consola.debug( - "Non-streaming response from Copilot:", - JSON.stringify(response).slice(-400), - ) - const anthropicResponse = translateToAnthropic(response) - consola.debug( - "Translated Anthropic response:", - JSON.stringify(anthropicResponse), - ) - return c.json(anthropicResponse) - } - - consola.debug("Streaming response from Copilot") - return streamSSE(c, async (stream) => { - const streamState: AnthropicStreamState = { - messageStartSent: false, - contentBlockIndex: 0, - contentBlockOpen: false, - toolCalls: {}, + // Add rate limit headers to response + addRateLimitHeaders(c, state) + + if (isNonStreaming(response)) { + // Cache non-streaming responses + state.requestCache.set(openAIPayload, response) + + consola.debug( + "Non-streaming response from Copilot:", + JSON.stringify(response).slice(-400), + ) + const anthropicResponse = translateToAnthropic(response) + consola.debug( + "Translated Anthropic response:", + JSON.stringify(anthropicResponse), + ) + return c.json(anthropicResponse) } - for await (const rawEvent of response) { - consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent)) - if (rawEvent.data === "[DONE]") { - break + consola.debug("Streaming response from Copilot") + return streamSSE(c, async (stream) => { + const streamState: AnthropicStreamState = { + messageStartSent: false, + contentBlockIndex: 0, + contentBlockOpen: false, + toolCalls: {}, } - if (!rawEvent.data) { - continue - } + for await (const rawEvent of response) { + consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent)) + if (rawEvent.data === "[DONE]") { + break + } - const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk - const events = translateChunkToAnthropicEvents(chunk, streamState) + if (!rawEvent.data) { + continue + } - for (const event of events) { - consola.debug("Translated Anthropic event:", JSON.stringify(event)) - await stream.writeSSE({ - event: event.type, - data: JSON.stringify(event), - }) + const chunk = 
diff --git a/src/routes/messages/handler.ts b/src/routes/messages/handler.ts
index 85dbf624..130a76cd 100644
--- a/src/routes/messages/handler.ts
+++ b/src/routes/messages/handler.ts
@@ -4,7 +4,8 @@ import consola from "consola"
 import { streamSSE } from "hono/streaming"
 
 import { awaitApproval } from "~/lib/approval"
-import { checkRateLimit } from "~/lib/rate-limit"
+import { executeWithRateLimit } from "~/lib/rate-limit"
+import { addRateLimitHeaders } from "~/lib/rate-limit-headers"
 import { state } from "~/lib/state"
 import {
   createChatCompletions,
@@ -23,66 +24,88 @@ import { translateChunkToAnthropicEvents } from "./stream-translation"
 
 export async function handleCompletion(c: Context) {
-  await checkRateLimit(state)
+  return executeWithRateLimit(state, async () => {
+    const anthropicPayload = await c.req.json()
+    consola.debug(
+      "Anthropic request payload:",
+      JSON.stringify(anthropicPayload),
+    )
 
-  const anthropicPayload = await c.req.json()
-  consola.debug("Anthropic request payload:", JSON.stringify(anthropicPayload))
+    const openAIPayload = translateToOpenAI(anthropicPayload)
+    consola.debug(
+      "Translated OpenAI request payload:",
+      JSON.stringify(openAIPayload),
+    )
 
-  const openAIPayload = translateToOpenAI(anthropicPayload)
-  consola.debug(
-    "Translated OpenAI request payload:",
-    JSON.stringify(openAIPayload),
-  )
+    // Check cache for non-streaming requests
+    if (!openAIPayload.stream) {
+      const cachedResponse = state.requestCache.get(
+        openAIPayload,
+      ) as ChatCompletionResponse | null
+      if (cachedResponse) {
+        // Add rate limit headers even for cached responses
+        addRateLimitHeaders(c, state)
+        const anthropicResponse = translateToAnthropic(cachedResponse)
+        return c.json(anthropicResponse)
+      }
+    }
 
-  if (state.manualApprove) {
-    await awaitApproval()
-  }
+    if (state.manualApprove) {
+      await awaitApproval()
+    }
 
-  const response = await createChatCompletions(openAIPayload)
+    const response = await createChatCompletions(openAIPayload)
 
-  if (isNonStreaming(response)) {
-    consola.debug(
-      "Non-streaming response from Copilot:",
-      JSON.stringify(response).slice(-400),
-    )
-    const anthropicResponse = translateToAnthropic(response)
-    consola.debug(
-      "Translated Anthropic response:",
-      JSON.stringify(anthropicResponse),
-    )
-    return c.json(anthropicResponse)
-  }
-
-  consola.debug("Streaming response from Copilot")
-  return streamSSE(c, async (stream) => {
-    const streamState: AnthropicStreamState = {
-      messageStartSent: false,
-      contentBlockIndex: 0,
-      contentBlockOpen: false,
-      toolCalls: {},
+    // Add rate limit headers to the response
+    addRateLimitHeaders(c, state)
+
+    if (isNonStreaming(response)) {
+      // Cache non-streaming responses
+      state.requestCache.set(openAIPayload, response)
+
+      consola.debug(
+        "Non-streaming response from Copilot:",
+        JSON.stringify(response).slice(-400),
+      )
+      const anthropicResponse = translateToAnthropic(response)
+      consola.debug(
+        "Translated Anthropic response:",
+        JSON.stringify(anthropicResponse),
+      )
+      return c.json(anthropicResponse)
     }
 
-    for await (const rawEvent of response) {
-      consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
-      if (rawEvent.data === "[DONE]") {
-        break
+    consola.debug("Streaming response from Copilot")
+    return streamSSE(c, async (stream) => {
+      const streamState: AnthropicStreamState = {
+        messageStartSent: false,
+        contentBlockIndex: 0,
+        contentBlockOpen: false,
+        toolCalls: {},
       }
 
-      if (!rawEvent.data) {
-        continue
-      }
+      for await (const rawEvent of response) {
+        consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
+        if (rawEvent.data === "[DONE]") {
+          break
+        }
 
-      const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
-      const events = translateChunkToAnthropicEvents(chunk, streamState)
+        if (!rawEvent.data) {
+          continue
+        }
 
-      for (const event of events) {
-        consola.debug("Translated Anthropic event:", JSON.stringify(event))
-        await stream.writeSSE({
-          event: event.type,
-          data: JSON.stringify(event),
-        })
+        const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
+        const events = translateChunkToAnthropicEvents(chunk, streamState)
+
+        for (const event of events) {
+          consola.debug("Translated Anthropic event:", JSON.stringify(event))
+          await stream.writeSSE({
+            event: event.type,
+            data: JSON.stringify(event),
+          })
+        }
       }
-    }
+    })
   })
 }
diff --git a/src/services/copilot/create-chat-completions.ts b/src/services/copilot/create-chat-completions.ts
index 8534151d..7d76755f 100644
--- a/src/services/copilot/create-chat-completions.ts
+++ b/src/services/copilot/create-chat-completions.ts
@@ -3,6 +3,7 @@ import { events } from "fetch-event-stream"
 
 import { copilotHeaders, copilotBaseUrl } from "~/lib/api-config"
 import { HTTPError } from "~/lib/error"
+import { checkRateLimitError } from "~/lib/retry"
 import { state } from "~/lib/state"
 
 export const createChatCompletions = async (
@@ -35,8 +36,45 @@
   })
 
   if (!response.ok) {
-    consola.error("Failed to create chat completions", response)
-    throw new HTTPError("Failed to create chat completions", response)
+    // Check if this is a rate limit error (429)
+    const rateLimitError = checkRateLimitError(response)
+    if (rateLimitError) {
+      throw rateLimitError
+    }
+
+    // Log detailed error information for other errors
+    consola.error(
+      `Failed to create chat completions: ${response.status} ${response.statusText}`,
+    )
+
+    // Log all response headers
+    const responseHeaders: Record<string, string> = {}
+    for (const [key, value] of response.headers.entries()) {
+      responseHeaders[key] = value
+    }
+    consola.error("Response headers:", JSON.stringify(responseHeaders, null, 2))
+
+    // Try to parse and log the error body, and store it for later use.
+    // The response is cloned so the body can still be read as text below
+    // if JSON parsing fails (a body can only be consumed once).
+    let errorBodyText: string | undefined
+    try {
+      const errorBody = await response.clone().json()
+      errorBodyText = JSON.stringify(errorBody)
+      consola.error("Error body:", errorBodyText)
+    } catch {
+      // Read as plain text if JSON parsing fails
+      try {
+        errorBodyText = await response.text()
+        consola.error("Error body:", errorBodyText || null)
+      } catch {
+        consola.error("Could not read error body")
+      }
+    }
+
+    throw new HTTPError(
+      "Failed to create chat completions",
+      response,
+      errorBodyText,
+    )
   }
 
   if (payload.stream) {
diff --git a/src/start.ts b/src/start.ts
index 14abbbdf..19208815 100644
--- a/src/start.ts
+++ b/src/start.ts
@@ -47,10 +47,16 @@ export async function runServer(options: RunServerOptions): Promise<void> {
   state.rateLimitWait = options.rateLimitWait
   state.showToken = options.showToken
 
+  // Initialize the request queue's rate limit (only if explicitly provided)
+  if (options.rateLimit !== undefined) {
+    state.requestQueue.updateRateLimit(options.rateLimit)
+  }
+
   await ensurePaths()
   await cacheVSCodeVersion()
 
   if (options.githubToken) {
+    // eslint-disable-next-line require-atomic-updates
     state.githubToken = options.githubToken
     consola.info("Using provided GitHub token")
   } else {
@@ -152,14 +158,15 @@
     "rate-limit": {
       alias: "r",
       type: "string",
-      description: "Rate limit in seconds between requests",
+      description:
+        "Rate limit in seconds between requests (default: 1s with adaptive adjustment, use 0 to disable)",
     },
     wait: {
       alias: "w",
      type: "boolean",
       default: false,
       description:
-        "Wait instead of error when rate limit is hit. Has no effect if rate limit is not set",
+        "Wait instead of erroring when the rate limit is hit. Only applies when --rate-limit is set",
     },
     "github-token": {
      alias: "g",
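Example invocations, not part of the patch, assuming the CLI entry point is run as copilot-api (flag semantics per the descriptions above):

copilot-api start                   # default: 1s between dispatches, adaptive adjustment
copilot-api start --rate-limit 2    # start at 2s between dispatches
copilot-api start --rate-limit 0    # disable queue-based rate limiting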