diff --git a/src/index.ts b/src/index.ts index 0a2b63f..b835267 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,8 @@ import { import packageJson from "../package.json" with { type: "json" }; import { maybeNotifyNewVersion } from "./update-check.js"; +const rateLimitState = new Map(); + function trimTrailingZeros(num: string): string { if (!num.includes(".")) return num; const trimmed = num.replace(/(?:\.0+|(\.\d*?)0+)$/, "$1"); @@ -327,6 +329,31 @@ export async function callOpenRouter( const reasoningPref = reasoningEffortPref ? { effort: reasoningEffortPref } : undefined; + const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const maxRetries = 5; + const rateKey = `${apiBase}|${model}`; + const parseResetMs = (value: string | null): number | null => { + if (!value) return null; + const num = Number(value); + if (!Number.isFinite(num)) return null; + return num < 1_000_000_000_000 ? num * 1000 : num; + }; + const updateRateLimit = (resetMs: number | null) => { + if (!resetMs) return; + const next = resetMs + 250; + const current = rateLimitState.get(rateKey); + if (current === undefined || next > current) { + rateLimitState.set(rateKey, next); + } + }; + const waitForRateLimit = async () => { + const now = Date.now(); + const waitUntil = rateLimitState.get(rateKey); + if (waitUntil && waitUntil > now) { + await sleep(waitUntil - now); + } + }; + const controller = typeof timeout === "number" && timeout > 0 ? new AbortController() : undefined; const timeoutId = controller && typeof timeout === "number" && timeout > 0 @@ -334,44 +361,65 @@ export async function callOpenRouter( : undefined; try { - const res = await fetch(url, { - method: "POST", - headers: { - Authorization: `Bearer ${apiKey}`, - "Content-Type": "application/json" - }, - body: JSON.stringify({ - model, - messages, - ...(reasoningPref ? { reasoning: reasoningPref } : {}), - ...(providerPref ? { provider: providerPref } : {}) - }), - signal: controller?.signal - }); + let lastError: unknown; + for (let attempt = 0; attempt <= maxRetries; attempt++) { + await waitForRateLimit(); + const res = await fetch(url, { + method: "POST", + headers: { + Authorization: `Bearer ${apiKey}`, + "Content-Type": "application/json" + }, + body: JSON.stringify({ + model, + messages, + ...(reasoningPref ? { reasoning: reasoningPref } : {}), + ...(providerPref ? { provider: providerPref } : {}) + }), + signal: controller?.signal + }); + + if (timeoutId !== undefined) clearTimeout(timeoutId); + + if (!res.ok) { + if (res.status === 429 && attempt < maxRetries) { + const resetMs = parseResetMs(res.headers?.get?.("x-ratelimit-reset") ?? null); + updateRateLimit(resetMs); + const nowMs = Date.now(); + const waitMs = resetMs && resetMs > nowMs ? resetMs - nowMs + 250 : 1000 * (attempt + 1); + await sleep(waitMs); + continue; + } + const text = await res.text().catch(() => ""); + throw new Error(`OpenRouter error ${res.status}: ${text}`); + } - if (timeoutId !== undefined) clearTimeout(timeoutId); + const remainingRaw = res.headers?.get?.("x-ratelimit-remaining") ?? null; + const remaining = remainingRaw ? Number(remainingRaw) : NaN; + if (Number.isFinite(remaining) && remaining <= 0) { + const resetMs = parseResetMs(res.headers?.get?.("x-ratelimit-reset") ?? null); + updateRateLimit(resetMs); + } - if (!res.ok) { - const text = await res.text().catch(() => ""); - throw new Error(`OpenRouter error ${res.status}: ${text}`); - } + const data = (await res.json()) as any; + const choice = data?.choices?.[0]; + const message = choice?.message ?? choice?.delta ?? {}; + const content = message?.content ?? ""; + const reasoning = + message?.reasoning ?? + choice?.reasoning ?? + data?.reasoning ?? + undefined; + const usage = data?.usage as OpenRouterUsage | undefined; + + if (typeof content !== "string" || !content.length) { + throw new Error("No assistant content returned from OpenRouter."); + } - const data = (await res.json()) as any; - const choice = data?.choices?.[0]; - const message = choice?.message ?? choice?.delta ?? {}; - const content = message?.content ?? ""; - const reasoning = - message?.reasoning ?? - choice?.reasoning ?? - data?.reasoning ?? - undefined; - const usage = data?.usage as OpenRouterUsage | undefined; - - if (typeof content !== "string" || !content.length) { - throw new Error("No assistant content returned from OpenRouter."); + return { content, reasoning, usage }; } - return { content, reasoning, usage }; + throw lastError ?? new Error("OpenRouter rate limit retries exhausted."); } catch (err: any) { if (timeoutId !== undefined) clearTimeout(timeoutId); if (err?.name === "AbortError") { @@ -484,6 +532,7 @@ export async function main(argv = process.argv.slice(2)) { } let requestKeys = [apiKey]; let spawnedKeyHashes: string[] = []; + let stopScheduling = false; if (useFreeKeys) { const baseName = `datagen-${Date.now()}`; try { @@ -526,7 +575,6 @@ export async function main(argv = process.argv.slice(2)) { }`; writeLine(msg); }; - let cleanupStarted = false; const cleanupKeys = async () => { if (!useFreeKeys) return; const hashes = new Set(spawnedKeyHashes); @@ -535,17 +583,7 @@ export async function main(argv = process.argv.slice(2)) { await deleteKeyWithRetry(hash); } }; - const handleTermination = () => { - if (cleanupStarted) return; - cleanupStarted = true; - writeLine("Cleaning up processes and exiting."); - void cleanupKeys().finally(() => { - process.exit(1); - }); - }; - process.once("SIGINT", handleTermination); - process.once("SIGTERM", handleTermination); - process.once("SIGQUIT", handleTermination); + let rl: ReturnType | null = null; const pricingKey = requestKeys[0]; const providerPref = isOpenRouter && (openrouterProviderOrder || openrouterProviderSort) @@ -597,12 +635,19 @@ export async function main(argv = process.argv.slice(2)) { } } - const rl = createInterface({ + rl = createInterface({ input: createReadStream(absPromptsPath), crlfDelay: Infinity }); + const promptReader = rl; const out = createWriteStream(absOutPath, { flags: "w" }); + let outputClosed = false; + const closeOutput = () => { + if (outputClosed) return; + out.end(); + outputClosed = true; + }; let lineNum = 0; let completed = 0; @@ -620,6 +665,31 @@ export async function main(argv = process.argv.slice(2)) { }); const inFlight = new Set>(); + const waitForActiveRequests = async () => { + while (inFlight.size > 0) { + await Promise.race(inFlight); + } + }; + + let shuttingDown = false; + const initiateShutdown = () => { + if (shuttingDown) return; + shuttingDown = true; + stopScheduling = true; + writeLine("Received termination signal. Waiting for in-flight requests to finish before cleanup."); + rl?.close(); + void (async () => { + await waitForActiveRequests(); + writeLine("All in-flight requests have been safely exited, cleaning up keys..."); + await cleanupKeys(); + closeOutput(); + process.exit(1); + })(); + }; + + process.once("SIGINT", initiateShutdown); + process.once("SIGTERM", initiateShutdown); + process.once("SIGQUIT", initiateShutdown); let writeQueue = Promise.resolve(); const writeJsonlLine = (line: string) => { @@ -710,28 +780,36 @@ export async function main(argv = process.argv.slice(2)) { }; const waitForSlot = async () => { + if (stopScheduling) return; while (inFlight.size >= maxConcurrent) { await Promise.race(inFlight); + if (stopScheduling) return; } }; let promptIndex = 0; - for await (const line of rl) { - lineNum++; - const prompt = line.trim(); - if (!prompt) continue; - - await waitForSlot(); - schedule(promptIndex, lineNum, prompt); - promptIndex++; - } + try { + for await (const line of promptReader) { + lineNum++; + const prompt = line.trim(); + if (!prompt) continue; + + if (stopScheduling) break; - while (inFlight.size > 0) { - await Promise.race(inFlight); + await waitForSlot(); + if (stopScheduling) break; + schedule(promptIndex, lineNum, prompt); + promptIndex++; + } + } catch (err) { + if (!shuttingDown) throw err; } + await waitForActiveRequests(); + if (shuttingDown) return; + await cleanupKeys(); - out.end(); + closeOutput(); if (bar) { bar.finish(completed, { ok: okCount,