diff --git a/src/runtime/process-io.ts b/src/runtime/process-io.ts index 659bcec..5303877 100644 --- a/src/runtime/process-io.ts +++ b/src/runtime/process-io.ts @@ -33,6 +33,9 @@ const DEFAULT_MAX_LINE_LENGTH = 100 * 1024 * 1024; /** Maximum stderr bytes to retain for diagnostics: 8KB */ const MAX_STDERR_BYTES = 8 * 1024; +/** Default write queue timeout: 30 seconds */ +const DEFAULT_WRITE_QUEUE_TIMEOUT_MS = 30_000; + /** Regex for ANSI escape sequences */ const ANSI_ESCAPE_RE = /\u001b\[[0-9;]*[A-Za-z]/g; @@ -64,6 +67,9 @@ export interface ProcessIOOptions { /** Restart process after N requests (0 = never). Default: 0 */ restartAfterRequests?: number; + + /** Write queue timeout in milliseconds. Default: 30000ms */ + writeQueueTimeoutMs?: number; } /** @@ -82,6 +88,10 @@ interface QueuedWrite { data: string; resolve: () => void; reject: (error: Error) => void; + /** Timestamp when the write was queued */ + queuedAt: number; + /** Timeout handle for write queue timeout */ + timeoutHandle?: NodeJS.Timeout; } // ============================================================================= @@ -142,6 +152,7 @@ export class ProcessIO extends BoundedContext implements Transport { private readonly cwd: string | undefined; private readonly maxLineLength: number; private readonly restartAfterRequests: number; + private readonly writeQueueTimeoutMs: number; // Process state private process: ChildProcess | null = null; @@ -155,6 +166,7 @@ export class ProcessIO extends BoundedContext implements Transport { // Request tracking private readonly pending = new Map(); private requestCount = 0; + private needsRestart = false; // Write queue for backpressure private readonly writeQueue: QueuedWrite[] = []; @@ -174,6 +186,7 @@ export class ProcessIO extends BoundedContext implements Transport { this.cwd = options.cwd; this.maxLineLength = options.maxLineLength ?? DEFAULT_MAX_LINE_LENGTH; this.restartAfterRequests = options.restartAfterRequests ?? 0; + this.writeQueueTimeoutMs = options.writeQueueTimeoutMs ?? DEFAULT_WRITE_QUEUE_TIMEOUT_MS; } // =========================================================================== @@ -218,8 +231,8 @@ export class ProcessIO extends BoundedContext implements Transport { throw new BridgeProtocolError('Message must contain an "id" field'); } - // Check for restart condition - if (this.restartAfterRequests > 0 && this.requestCount >= this.restartAfterRequests) { + // Check for restart condition (either scheduled restart or forced by stream error) + if (this.needsRestart || (this.restartAfterRequests > 0 && this.requestCount >= this.restartAfterRequests)) { await this.restartProcess(); } @@ -320,6 +333,7 @@ export class ProcessIO extends BoundedContext implements Transport { // Clear write queue for (const queued of this.writeQueue) { + this.clearQueuedWriteTimeout(queued); queued.reject(error); } this.writeQueue.length = 0; @@ -417,10 +431,12 @@ export class ProcessIO extends BoundedContext implements Transport { if (this.process.stdout) { this.process.stdout.on('data', this.handleStdoutData.bind(this)); + this.process.stdout.on('error', this.handleStdoutError.bind(this)); } if (this.process.stderr) { this.process.stderr.on('data', this.handleStderrData.bind(this)); + this.process.stderr.on('error', this.handleStderrError.bind(this)); } if (this.process.stdin) { @@ -493,15 +509,25 @@ export class ProcessIO extends BoundedContext implements Transport { // Kill existing process await this.killProcess(); - // Clear buffers + // Clear buffers and restart flags this.stdoutBuffer = ''; this.stderrBuffer = ''; this.requestCount = 0; + this.needsRestart = false; // Spawn new process await this.spawnProcess(); } + /** + * Mark the process for restart on the next send. + * This is called after stream errors to ensure the next request uses a fresh process. + * Works independently of restartAfterRequests setting. + */ + private markForRestart(): void { + this.needsRestart = true; + } + // =========================================================================== // STREAM HANDLERS // =========================================================================== @@ -654,18 +680,83 @@ export class ProcessIO extends BoundedContext implements Transport { // Reject all pending writes for (const queued of this.writeQueue) { + this.clearQueuedWriteTimeout(queued); queued.reject(error); } this.writeQueue.length = 0; // Reject all pending requests this.rejectAllPending(error); + + // Mark for restart on next send + this.markForRestart(); + } + + /** + * Handle stdout error event. + * This can occur during pipe errors or when the process crashes. + */ + private handleStdoutError(err: Error): void { + const error = new BridgeProtocolError(`stdout error: ${err.message}`); + this.rejectAllPending(error); + this.markForRestart(); + } + + /** + * Handle stderr error event. + * This can occur during pipe errors or when the process crashes. + */ + private handleStderrError(err: Error): void { + // Stderr errors are less critical but still indicate process health issues + const error = new BridgeProtocolError(`stderr error: ${err.message}`); + this.rejectAllPending(error); + this.markForRestart(); } // =========================================================================== // WRITE MANAGEMENT // =========================================================================== + /** + * Create a queued write entry with a timeout timer. + * The timer fires if the drain event never comes. + */ + private createQueuedWrite( + data: string, + resolve: () => void, + reject: (error: Error) => void + ): QueuedWrite { + const queuedAt = Date.now(); + const entry: QueuedWrite = { data, resolve, reject, queuedAt }; + + // Set up timeout timer that fires if drain never happens + entry.timeoutHandle = setTimeout(() => { + // Remove this entry from the queue + const index = this.writeQueue.indexOf(entry); + if (index !== -1) { + this.writeQueue.splice(index, 1); + reject(new BridgeTimeoutError( + `Write queue timeout: entry waited ${this.writeQueueTimeoutMs}ms without drain` + )); + } + }, this.writeQueueTimeoutMs); + + // Unref the timer so it doesn't keep the process alive + entry.timeoutHandle.unref(); + + return entry; + } + + /** + * Clear the timeout for a queued write entry. + */ + private clearQueuedWriteTimeout(entry: QueuedWrite): void { + if (entry.timeoutHandle) { + clearTimeout(entry.timeoutHandle); + entry.timeoutHandle = undefined; + } + } + /** * Write data to stdin with backpressure handling. */ @@ -677,8 +768,8 @@ export class ProcessIO extends BoundedContext implements Transport { } if (this.draining || this.writeQueue.length > 0) { - // Queue the write - this.writeQueue.push({ data, resolve, reject }); + // Queue the write with timestamp and timeout timer + this.writeQueue.push(this.createQueuedWrite(data, resolve, reject)); return; } @@ -691,10 +782,11 @@ export class ProcessIO extends BoundedContext implements Transport { } else { // Backpressure - queue this write and set draining flag this.draining = true; - this.writeQueue.push({ data, resolve, reject }); + this.writeQueue.push(this.createQueuedWrite(data, resolve, reject)); } } catch (err) { // Synchronous write error (e.g., EPIPE) + this.markForRestart(); reject(new BridgeProtocolError(`Write error: ${err instanceof Error ? err.message : 'unknown'}`)); } }); @@ -704,13 +796,17 @@ export class ProcessIO extends BoundedContext implements Transport { * Flush queued writes when backpressure clears. */ private flushWriteQueue(): void { + const now = Date.now(); + while (this.writeQueue.length > 0 && !this.draining) { if (!this.process?.stdin || this.processExited) { // Process died - reject all queued writes for (const q of this.writeQueue) { + this.clearQueuedWriteTimeout(q); q.reject(new BridgeProtocolError('Process stdin not available')); } this.writeQueue.length = 0; + this.markForRestart(); return; } @@ -719,14 +815,29 @@ export class ProcessIO extends BoundedContext implements Transport { return; } + // Clear the timeout since we're processing this entry now + this.clearQueuedWriteTimeout(queued); + + // Check for write queue timeout (fallback check, timer should have handled this) + if (now - queued.queuedAt > this.writeQueueTimeoutMs) { + queued.reject(new BridgeTimeoutError( + `Write queue timeout: entry waited ${now - queued.queuedAt}ms (limit: ${this.writeQueueTimeoutMs}ms)` + )); + continue; // Process next entry + } + try { const canWrite = this.process.stdin.write(queued.data); if (canWrite) { queued.resolve(); } else { - // Still under pressure - put it back - this.writeQueue.unshift(queued); + // Still under pressure - put it back with a new timeout + this.writeQueue.unshift(this.createQueuedWrite( + queued.data, + queued.resolve, + queued.reject + )); this.draining = true; return; } @@ -737,9 +848,11 @@ export class ProcessIO extends BoundedContext implements Transport { ); queued.reject(error); for (const q of this.writeQueue) { + this.clearQueuedWriteTimeout(q); q.reject(error); } this.writeQueue.length = 0; + this.markForRestart(); return; } }