Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 136 additions & 58 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
import packageJson from "../package.json" with { type: "json" };
import { maybeNotifyNewVersion } from "./update-check.js";

// Module-level rate-limit bookkeeping: maps an `${apiBase}|${model}` key to the
// epoch-millisecond time before which new requests for that key should wait.
const rateLimitState = new Map<string, number>();

function trimTrailingZeros(num: string): string {
if (!num.includes(".")) return num;
const trimmed = num.replace(/(?:\.0+|(\.\d*?)0+)$/, "$1");
Expand Down Expand Up @@ -327,51 +329,97 @@ export async function callOpenRouter(
const reasoningPref =
reasoningEffortPref ? { effort: reasoningEffortPref } : undefined;

/** Returns a promise that resolves (with no value) after roughly `ms` milliseconds. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((wake) => setTimeout(wake, ms));
const maxRetries = 5;
const rateKey = `${apiBase}|${model}`;
/**
 * Converts a rate-limit reset header value into an epoch timestamp in milliseconds.
 *
 * Values below 1e12 are treated as epoch seconds and scaled to milliseconds;
 * larger values are taken to already be in milliseconds.
 *
 * @param value Raw header text, or null when the header is absent.
 * @returns The reset time in epoch milliseconds, or null when the value is
 *   missing, blank, non-numeric, non-finite, or not a positive number.
 */
const parseResetMs = (value: string | null): number | null => {
  const text = value?.trim();
  // Number("") and Number("   ") both evaluate to 0, so blank input has to be
  // rejected before conversion instead of being reported as timestamp 0.
  if (!text) return null;
  const num = Number(text);
  // A zero or negative reset time can never be a real deadline; treat it as
  // "no usable value" so it is never stored as rate-limit state.
  if (!Number.isFinite(num) || num <= 0) return null;
  return num < 1_000_000_000_000 ? num * 1000 : num;
};
/**
 * Records a new "wait until" time for the current rate key.
 *
 * A 250 ms margin is added to the reset time, and the stored value is only
 * ever moved later, never earlier. Falsy input (null, 0, NaN) is ignored.
 */
const updateRateLimit = (resetMs: number | null) => {
  if (resetMs) {
    const candidate = resetMs + 250;
    const existing = rateLimitState.get(rateKey);
    const shouldAdvance = existing === undefined ? true : candidate > existing;
    if (shouldAdvance) {
      rateLimitState.set(rateKey, candidate);
    }
  }
};
/**
 * Pauses until the recorded "wait until" time for the current rate key has
 * passed. Resolves immediately when nothing is recorded or the time is
 * already in the past.
 */
const waitForRateLimit = async () => {
  const startedAt = Date.now();
  const deadline = rateLimitState.get(rateKey);
  if (!deadline || deadline <= startedAt) return;
  await sleep(deadline - startedAt);
};

const controller = typeof timeout === "number" && timeout > 0 ? new AbortController() : undefined;
const timeoutId =
controller && typeof timeout === "number" && timeout > 0
? setTimeout(() => controller.abort(), timeout)
: undefined;

try {
const res = await fetch(url, {
method: "POST",
headers: {
Authorization: `Bearer ${apiKey}`,
"Content-Type": "application/json"
},
body: JSON.stringify({
model,
messages,
...(reasoningPref ? { reasoning: reasoningPref } : {}),
...(providerPref ? { provider: providerPref } : {})
}),
signal: controller?.signal
});
let lastError: unknown;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
await waitForRateLimit();
const res = await fetch(url, {
method: "POST",
headers: {
Authorization: `Bearer ${apiKey}`,
"Content-Type": "application/json"
},
body: JSON.stringify({
model,
messages,
...(reasoningPref ? { reasoning: reasoningPref } : {}),
...(providerPref ? { provider: providerPref } : {})
}),
signal: controller?.signal
});

if (timeoutId !== undefined) clearTimeout(timeoutId);

if (!res.ok) {
if (res.status === 429 && attempt < maxRetries) {
const resetMs = parseResetMs(res.headers?.get?.("x-ratelimit-reset") ?? null);
updateRateLimit(resetMs);
const nowMs = Date.now();
const waitMs = resetMs && resetMs > nowMs ? resetMs - nowMs + 250 : 1000 * (attempt + 1);
await sleep(waitMs);
continue;
}
const text = await res.text().catch(() => "");
throw new Error(`OpenRouter error ${res.status}: ${text}`);
}

if (timeoutId !== undefined) clearTimeout(timeoutId);
const remainingRaw = res.headers?.get?.("x-ratelimit-remaining") ?? null;
const remaining = remainingRaw ? Number(remainingRaw) : NaN;
if (Number.isFinite(remaining) && remaining <= 0) {
const resetMs = parseResetMs(res.headers?.get?.("x-ratelimit-reset") ?? null);
updateRateLimit(resetMs);
}

if (!res.ok) {
const text = await res.text().catch(() => "");
throw new Error(`OpenRouter error ${res.status}: ${text}`);
}
const data = (await res.json()) as any;
const choice = data?.choices?.[0];
const message = choice?.message ?? choice?.delta ?? {};
const content = message?.content ?? "";
const reasoning =
message?.reasoning ??
choice?.reasoning ??
data?.reasoning ??
undefined;
const usage = data?.usage as OpenRouterUsage | undefined;

if (typeof content !== "string" || !content.length) {
throw new Error("No assistant content returned from OpenRouter.");
}

const data = (await res.json()) as any;
const choice = data?.choices?.[0];
const message = choice?.message ?? choice?.delta ?? {};
const content = message?.content ?? "";
const reasoning =
message?.reasoning ??
choice?.reasoning ??
data?.reasoning ??
undefined;
const usage = data?.usage as OpenRouterUsage | undefined;

if (typeof content !== "string" || !content.length) {
throw new Error("No assistant content returned from OpenRouter.");
return { content, reasoning, usage };
}

return { content, reasoning, usage };
throw lastError ?? new Error("OpenRouter rate limit retries exhausted.");
} catch (err: any) {
if (timeoutId !== undefined) clearTimeout(timeoutId);
if (err?.name === "AbortError") {
Expand Down Expand Up @@ -484,6 +532,7 @@ export async function main(argv = process.argv.slice(2)) {
}
let requestKeys = [apiKey];
let spawnedKeyHashes: string[] = [];
let stopScheduling = false;
if (useFreeKeys) {
const baseName = `datagen-${Date.now()}`;
try {
Expand Down Expand Up @@ -526,7 +575,6 @@ export async function main(argv = process.argv.slice(2)) {
}`;
writeLine(msg);
};
let cleanupStarted = false;
const cleanupKeys = async () => {
if (!useFreeKeys) return;
const hashes = new Set(spawnedKeyHashes);
Expand All @@ -535,17 +583,7 @@ export async function main(argv = process.argv.slice(2)) {
await deleteKeyWithRetry(hash);
}
};
const handleTermination = () => {
if (cleanupStarted) return;
cleanupStarted = true;
writeLine("Cleaning up processes and exiting.");
void cleanupKeys().finally(() => {
process.exit(1);
});
};
process.once("SIGINT", handleTermination);
process.once("SIGTERM", handleTermination);
process.once("SIGQUIT", handleTermination);
let rl: ReturnType<typeof createInterface> | null = null;
const pricingKey = requestKeys[0];
const providerPref =
isOpenRouter && (openrouterProviderOrder || openrouterProviderSort)
Expand Down Expand Up @@ -597,12 +635,19 @@ export async function main(argv = process.argv.slice(2)) {
}
}

const rl = createInterface({
rl = createInterface({
input: createReadStream(absPromptsPath),
crlfDelay: Infinity
});
const promptReader = rl;

const out = createWriteStream(absOutPath, { flags: "w" });
// Set once the output stream has been ended, so the stream is ended at most once.
let outputClosed = false;
/** Ends the output stream the first time it is called; later calls do nothing. */
const closeOutput = () => {
  if (!outputClosed) {
    out.end();
    outputClosed = true;
  }
};

let lineNum = 0;
let completed = 0;
Expand All @@ -620,6 +665,31 @@ export async function main(argv = process.argv.slice(2)) {
});

const inFlight = new Set<Promise<void>>();
/**
 * Resolves once the in-flight set is empty, re-checking its size each time
 * one of the tracked promises settles.
 */
const waitForActiveRequests = async () => {
  for (;;) {
    if (!(inFlight.size > 0)) return;
    await Promise.race(inFlight);
  }
};

// Set on the first termination signal so the shutdown sequence runs only once.
let shuttingDown = false;
/**
 * Termination-signal handler.
 *
 * Stops scheduling new prompts, closes the prompt reader, waits for in-flight
 * requests to settle, deletes spawned keys, closes the output stream, and
 * exits with status 1. The sequence runs in a detached async task; failures
 * inside it are reported and never prevent the final exit.
 */
const initiateShutdown = () => {
  if (shuttingDown) return;
  shuttingDown = true;
  stopScheduling = true;
  writeLine("Received termination signal. Waiting for in-flight requests to finish before cleanup.");
  rl?.close();
  void (async () => {
    try {
      try {
        await waitForActiveRequests();
        writeLine("All in-flight requests have been safely exited, cleaning up keys...");
      } finally {
        // Key cleanup must still be attempted when draining in-flight work rejects.
        await cleanupKeys();
      }
    } catch (err) {
      // Without this, a rejection here would be unhandled and the close/exit
      // steps below would be skipped.
      writeLine(`Shutdown cleanup failed: ${err instanceof Error ? err.message : String(err)}`);
    } finally {
      closeOutput();
      process.exit(1);
    }
  })();
};

// Route SIGINT, SIGTERM and SIGQUIT to the same shutdown handler. Each is
// registered with `once`, and the handler itself ignores repeat invocations.
process.once("SIGINT", initiateShutdown);
process.once("SIGTERM", initiateShutdown);
process.once("SIGQUIT", initiateShutdown);

let writeQueue = Promise.resolve();
const writeJsonlLine = (line: string) => {
Expand Down Expand Up @@ -710,28 +780,36 @@ export async function main(argv = process.argv.slice(2)) {
};

/**
 * Waits until the number of in-flight requests is below `maxConcurrent`.
 * Returns early, without waiting further, as soon as scheduling has been stopped.
 */
const waitForSlot = async () => {
  while (!stopScheduling && inFlight.size >= maxConcurrent) {
    await Promise.race(inFlight);
  }
};

let promptIndex = 0;
for await (const line of rl) {
lineNum++;
const prompt = line.trim();
if (!prompt) continue;

await waitForSlot();
schedule(promptIndex, lineNum, prompt);
promptIndex++;
}
try {
for await (const line of promptReader) {
lineNum++;
const prompt = line.trim();
if (!prompt) continue;

if (stopScheduling) break;

while (inFlight.size > 0) {
await Promise.race(inFlight);
await waitForSlot();
if (stopScheduling) break;
schedule(promptIndex, lineNum, prompt);
promptIndex++;
}
} catch (err) {
if (!shuttingDown) throw err;
}

await waitForActiveRequests();
if (shuttingDown) return;

await cleanupKeys();
out.end();
closeOutput();
if (bar) {
bar.finish(completed, {
ok: okCount,
Expand Down