diff --git a/src/queue-ipc.ts b/src/queue-ipc.ts index df69f0b..29a2cc7 100644 --- a/src/queue-ipc.ts +++ b/src/queue-ipc.ts @@ -39,6 +39,8 @@ import type { const QUEUE_CONNECT_ATTEMPTS = 40; export const QUEUE_CONNECT_RETRY_MS = 50; +export const SOCKET_CONNECTION_TIMEOUT_MS = 5000; +export const MAX_MESSAGE_BUFFER_SIZE = 10 * 1024 * 1024; export { isProcessAlive, releaseQueueOwnerLease, @@ -103,15 +105,35 @@ function shouldRetryQueueConnect(error: unknown): boolean { return code === "ENOENT" || code === "ECONNREFUSED"; } -async function connectToSocket(socketPath: string): Promise { +async function connectToSocket(socketPath: string, timeoutMs = SOCKET_CONNECTION_TIMEOUT_MS): Promise { return await new Promise((resolve, reject) => { const socket = net.createConnection(socketPath); + let settled = false; + const timeout = setTimeout(() => { + if (settled) { + return; + } + settled = true; + socket.destroy(); + reject(new Error(`Connection to ${socketPath} timed out after ${timeoutMs}ms`)); + }, timeoutMs); + const onConnect = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); socket.off("error", onError); resolve(socket); }; const onError = (error: Error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); socket.off("connect", onConnect); reject(error); }; @@ -337,6 +359,12 @@ async function runQueueOwnerRequest(options: { socket.on("data", (chunk: string) => { buffer += chunk; + if (buffer.length > MAX_MESSAGE_BUFFER_SIZE) { + socket.destroy(); + finishReject(new Error(`Message buffer exceeded ${MAX_MESSAGE_BUFFER_SIZE} bytes`)); + return; + } + let index = buffer.indexOf("\n"); while (index >= 0) { const line = buffer.slice(0, index).trim();