From 6b6dfc0e46317bf398e63283f5cd8662c19a4f0e Mon Sep 17 00:00:00 2001 From: bbopen Date: Wed, 21 Jan 2026 10:12:22 -0800 Subject: [PATCH 1/2] feat(process-io): harden stream error handling and add write queue timeout Improve ProcessIO transport resilience with: 1. Auto-restart on write failures (#107) - Add markForRestart() method to trigger process restart on next send - Call markForRestart() after stdin write errors, EPIPE, and queue failures 2. Add stdout/stderr error handlers (#91) - Add handleStdoutError() and handleStderrError() methods - Register error handlers in spawnProcess() to prevent unhandled crashes - Reject pending requests and mark for restart on stream errors 3. Add write queue timeout (#59) - Add writeQueueTimeoutMs option (default: 30000ms) - Track queuedAt timestamp for each queued write - Reject timed-out writes in flushWriteQueue() with BridgeTimeoutError Fixes #107, #91, #59 Co-Authored-By: Claude Opus 4.5 --- src/runtime/process-io.ts | 66 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/src/runtime/process-io.ts b/src/runtime/process-io.ts index 659bcec..de06c64 100644 --- a/src/runtime/process-io.ts +++ b/src/runtime/process-io.ts @@ -33,6 +33,9 @@ const DEFAULT_MAX_LINE_LENGTH = 100 * 1024 * 1024; /** Maximum stderr bytes to retain for diagnostics: 8KB */ const MAX_STDERR_BYTES = 8 * 1024; +/** Default write queue timeout: 30 seconds */ +const DEFAULT_WRITE_QUEUE_TIMEOUT_MS = 30_000; + /** Regex for ANSI escape sequences */ const ANSI_ESCAPE_RE = /\u001b\[[0-9;]*[A-Za-z]/g; @@ -64,6 +67,9 @@ export interface ProcessIOOptions { /** Restart process after N requests (0 = never). Default: 0 */ restartAfterRequests?: number; + + /** Write queue timeout in milliseconds. Default: 30000ms */ + writeQueueTimeoutMs?: number; } /** @@ -82,6 +88,8 @@ interface QueuedWrite { data: string; resolve: () => void; reject: (error: Error) => void; + /** Timestamp when the write was queued */ + queuedAt: number; } // ============================================================================= @@ -142,6 +150,7 @@ export class ProcessIO extends BoundedContext implements Transport { private readonly cwd: string | undefined; private readonly maxLineLength: number; private readonly restartAfterRequests: number; + private readonly writeQueueTimeoutMs: number; // Process state private process: ChildProcess | null = null; @@ -174,6 +183,7 @@ export class ProcessIO extends BoundedContext implements Transport { this.cwd = options.cwd; this.maxLineLength = options.maxLineLength ?? DEFAULT_MAX_LINE_LENGTH; this.restartAfterRequests = options.restartAfterRequests ?? 0; + this.writeQueueTimeoutMs = options.writeQueueTimeoutMs ?? DEFAULT_WRITE_QUEUE_TIMEOUT_MS; } // =========================================================================== @@ -417,10 +427,12 @@ export class ProcessIO extends BoundedContext implements Transport { if (this.process.stdout) { this.process.stdout.on('data', this.handleStdoutData.bind(this)); + this.process.stdout.on('error', this.handleStdoutError.bind(this)); } if (this.process.stderr) { this.process.stderr.on('data', this.handleStderrData.bind(this)); + this.process.stderr.on('error', this.handleStderrError.bind(this)); } if (this.process.stdin) { @@ -502,6 +514,17 @@ export class ProcessIO extends BoundedContext implements Transport { await this.spawnProcess(); } + /** + * Mark the process for restart on the next send. + * This is called after stream errors to ensure the next request uses a fresh process. + */ + private markForRestart(): void { + if (this.restartAfterRequests > 0) { + // Set requestCount to trigger restart on next send + this.requestCount = this.restartAfterRequests; + } + } + // =========================================================================== // STREAM HANDLERS // =========================================================================== @@ -660,6 +683,30 @@ export class ProcessIO extends BoundedContext implements Transport { // Reject all pending requests this.rejectAllPending(error); + + // Mark for restart on next send + this.markForRestart(); + } + + /** + * Handle stdout error event. + * This can occur during pipe errors or when the process crashes. + */ + private handleStdoutError(err: Error): void { + const error = new BridgeProtocolError(`stdout error: ${err.message}`); + this.rejectAllPending(error); + this.markForRestart(); + } + + /** + * Handle stderr error event. + * This can occur during pipe errors or when the process crashes. + */ + private handleStderrError(err: Error): void { + // Stderr errors are less critical but still indicate process health issues + const error = new BridgeProtocolError(`stderr error: ${err.message}`); + this.rejectAllPending(error); + this.markForRestart(); } // =========================================================================== @@ -677,8 +724,8 @@ export class ProcessIO extends BoundedContext implements Transport { } if (this.draining || this.writeQueue.length > 0) { - // Queue the write - this.writeQueue.push({ data, resolve, reject }); + // Queue the write with timestamp + this.writeQueue.push({ data, resolve, reject, queuedAt: Date.now() }); return; } @@ -691,10 +738,11 @@ export class ProcessIO extends BoundedContext implements Transport { } else { // Backpressure - queue this write and set draining flag this.draining = true; - this.writeQueue.push({ data, resolve, reject }); + this.writeQueue.push({ data, resolve, reject, queuedAt: Date.now() }); } } catch (err) { // Synchronous write error (e.g., EPIPE) + this.markForRestart(); reject(new BridgeProtocolError(`Write error: ${err instanceof Error ? err.message : 'unknown'}`)); } }); @@ -704,6 +752,8 @@ export class ProcessIO extends BoundedContext implements Transport { * Flush queued writes when backpressure clears. */ private flushWriteQueue(): void { + const now = Date.now(); + while (this.writeQueue.length > 0 && !this.draining) { if (!this.process?.stdin || this.processExited) { // Process died - reject all queued writes @@ -711,6 +761,7 @@ export class ProcessIO extends BoundedContext implements Transport { q.reject(new BridgeProtocolError('Process stdin not available')); } this.writeQueue.length = 0; + this.markForRestart(); return; } @@ -719,6 +770,14 @@ export class ProcessIO extends BoundedContext implements Transport { return; } + // Check for write queue timeout + if (now - queued.queuedAt > this.writeQueueTimeoutMs) { + queued.reject(new BridgeTimeoutError( + `Write queue timeout: entry waited ${now - queued.queuedAt}ms (limit: ${this.writeQueueTimeoutMs}ms)` + )); + continue; // Process next entry + } + try { const canWrite = this.process.stdin.write(queued.data); @@ -740,6 +799,7 @@ export class ProcessIO extends BoundedContext implements Transport { q.reject(error); } this.writeQueue.length = 0; + this.markForRestart(); return; } } From 600dfac739152b2ba0ddacfc7dc39145727349b8 Mon Sep 17 00:00:00 2001 From: bbopen Date: Wed, 21 Jan 2026 10:26:16 -0800 Subject: [PATCH 2/2] fix: address Codex review feedback for ProcessIO hardening 1. Fix markForRestart() no-op when restartAfterRequests=0 - Added needsRestart flag that works independently - Restart check now checks: needsRestart || scheduled restart 2. Fix write queue timeout not firing without drain event - Added createQueuedWrite() helper with timeout timer - Timer fires after writeQueueTimeoutMs even if drain never happens - Timer is unref'd so it doesn't keep process alive - Timeouts cleared properly on resolve/reject/dispose Co-Authored-By: Claude Opus 4.5 --- src/runtime/process-io.ts | 79 ++++++++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/src/runtime/process-io.ts b/src/runtime/process-io.ts index de06c64..5303877 100644 --- a/src/runtime/process-io.ts +++ b/src/runtime/process-io.ts @@ -90,6 +90,8 @@ interface QueuedWrite { reject: (error: Error) => void; /** Timestamp when the write was queued */ queuedAt: number; + /** Timeout handle for write queue timeout */ + timeoutHandle?: NodeJS.Timeout; } // ============================================================================= @@ -164,6 +166,7 @@ export class ProcessIO extends BoundedContext implements Transport { // Request tracking private readonly pending = new Map(); private requestCount = 0; + private needsRestart = false; // Write queue for backpressure private readonly writeQueue: QueuedWrite[] = []; @@ -228,8 +231,8 @@ export class ProcessIO extends BoundedContext implements Transport { throw new BridgeProtocolError('Message must contain an "id" field'); } - // Check for restart condition - if (this.restartAfterRequests > 0 && this.requestCount >= this.restartAfterRequests) { + // Check for restart condition (either scheduled restart or forced by stream error) + if (this.needsRestart || (this.restartAfterRequests > 0 && this.requestCount >= this.restartAfterRequests)) { await this.restartProcess(); } @@ -330,6 +333,7 @@ export class ProcessIO extends BoundedContext implements Transport { // Clear write queue for (const queued of this.writeQueue) { + this.clearQueuedWriteTimeout(queued); queued.reject(error); } this.writeQueue.length = 0; @@ -505,10 +509,11 @@ export class ProcessIO extends BoundedContext implements Transport { // Kill existing process await this.killProcess(); - // Clear buffers + // Clear buffers and restart flags this.stdoutBuffer = ''; this.stderrBuffer = ''; this.requestCount = 0; + this.needsRestart = false; // Spawn new process await this.spawnProcess(); @@ -517,12 +522,10 @@ export class ProcessIO extends BoundedContext implements Transport { /** * Mark the process for restart on the next send. * This is called after stream errors to ensure the next request uses a fresh process. + * Works independently of restartAfterRequests setting. */ private markForRestart(): void { - if (this.restartAfterRequests > 0) { - // Set requestCount to trigger restart on next send - this.requestCount = this.restartAfterRequests; - } + this.needsRestart = true; } // =========================================================================== @@ -677,6 +680,7 @@ export class ProcessIO extends BoundedContext implements Transport { // Reject all pending writes for (const queued of this.writeQueue) { + this.clearQueuedWriteTimeout(queued); queued.reject(error); } this.writeQueue.length = 0; @@ -713,6 +717,46 @@ export class ProcessIO extends BoundedContext implements Transport { // WRITE MANAGEMENT // =========================================================================== + /** + * Create a queued write entry with a timeout timer. + * The timer fires if the drain event never comes. + */ + private createQueuedWrite( + data: string, + resolve: () => void, + reject: (error: Error) => void + ): QueuedWrite { + const queuedAt = Date.now(); + const entry: QueuedWrite = { data, resolve, reject, queuedAt }; + + // Set up timeout timer that fires if drain never happens + entry.timeoutHandle = setTimeout(() => { + // Remove this entry from the queue + const index = this.writeQueue.indexOf(entry); + if (index !== -1) { + this.writeQueue.splice(index, 1); + reject(new BridgeTimeoutError( + `Write queue timeout: entry waited ${this.writeQueueTimeoutMs}ms without drain` + )); + } + }, this.writeQueueTimeoutMs); + + // Unref the timer so it doesn't keep the process alive + entry.timeoutHandle.unref(); + + return entry; + } + + /** + * Clear the timeout for a queued write entry. + */ + private clearQueuedWriteTimeout(entry: QueuedWrite): void { + if (entry.timeoutHandle) { + clearTimeout(entry.timeoutHandle); + entry.timeoutHandle = undefined; + } + } + /** * Write data to stdin with backpressure handling. */ @@ -724,8 +768,8 @@ export class ProcessIO extends BoundedContext implements Transport { } if (this.draining || this.writeQueue.length > 0) { - // Queue the write with timestamp - this.writeQueue.push({ data, resolve, reject, queuedAt: Date.now() }); + // Queue the write with timestamp and timeout timer + this.writeQueue.push(this.createQueuedWrite(data, resolve, reject)); return; } @@ -738,7 +782,7 @@ export class ProcessIO extends BoundedContext implements Transport { } else { // Backpressure - queue this write and set draining flag this.draining = true; - this.writeQueue.push({ data, resolve, reject, queuedAt: Date.now() }); + this.writeQueue.push(this.createQueuedWrite(data, resolve, reject)); } } catch (err) { // Synchronous write error (e.g., EPIPE) @@ -758,6 +802,7 @@ export class ProcessIO extends BoundedContext implements Transport { if (!this.process?.stdin || this.processExited) { // Process died - reject all queued writes for (const q of this.writeQueue) { + this.clearQueuedWriteTimeout(q); q.reject(new BridgeProtocolError('Process stdin not available')); } this.writeQueue.length = 0; @@ -770,7 +815,10 @@ export class ProcessIO extends BoundedContext implements Transport { return; } - // Check for write queue timeout + // Clear the timeout since we're processing this entry now + this.clearQueuedWriteTimeout(queued); + + // Check for write queue timeout (fallback check, timer should have handled this) if (now - queued.queuedAt > this.writeQueueTimeoutMs) { queued.reject(new BridgeTimeoutError( `Write queue timeout: entry waited ${now - queued.queuedAt}ms (limit: ${this.writeQueueTimeoutMs}ms)` @@ -784,8 +832,12 @@ export class ProcessIO extends BoundedContext implements Transport { if (canWrite) { queued.resolve(); } else { - // Still under pressure - put it back - this.writeQueue.unshift(queued); + // Still under pressure - put it back with a new timeout + this.writeQueue.unshift(this.createQueuedWrite( + queued.data, + queued.resolve, + queued.reject + )); this.draining = true; return; } @@ -796,6 +848,7 @@ export class ProcessIO extends BoundedContext implements Transport { ); queued.reject(error); for (const q of this.writeQueue) { + this.clearQueuedWriteTimeout(q); q.reject(error); } this.writeQueue.length = 0;