From ff539d592c5ab2b26ae2eeb9bf84a5dfc82010d9 Mon Sep 17 00:00:00 2001 From: Jason Tang Date: Thu, 5 Mar 2026 19:57:12 -0600 Subject: [PATCH] fix: decouple stream listeners from stream entry lifecycle Stream event listeners were stored inside ActiveStream objects, which get garbage collected 5 minutes after completion. When a new stream started after GC (especially during Fast Refresh/HMR), startStream created a fresh entry with an empty listener set, causing the UI to freeze with no updates delivered. Move listeners to a separate globalThis-persisted registry (__streamSessionListeners__) that survives stream entry GC. The subscribe and emit functions now read/write from this independent map instead of stream.listeners. Co-Authored-By: Claude Opus 4.6 --- src/lib/stream-session-manager.ts | 70 +++++++++++-------------------- 1 file changed, 25 insertions(+), 45 deletions(-) diff --git a/src/lib/stream-session-manager.ts b/src/lib/stream-session-manager.ts index e0548aa7..fa4c4d1e 100644 --- a/src/lib/stream-session-manager.ts +++ b/src/lib/stream-session-manager.ts @@ -31,7 +31,6 @@ interface ActiveStream { sessionId: string; abortController: AbortController; snapshot: SessionStreamSnapshot; - listeners: Set; idleCheckTimer: ReturnType | null; lastEventTime: number; gcTimer: ReturnType | null; @@ -65,6 +64,7 @@ export interface StartStreamParams { // ========================================== const GLOBAL_KEY = '__streamSessionManager__' as const; +const LISTENERS_KEY = '__streamSessionListeners__' as const; const STREAM_IDLE_TIMEOUT_MS = 330_000; const GC_DELAY_MS = 5 * 60 * 1000; // 5 minutes @@ -75,6 +75,14 @@ function getStreamsMap(): Map { return (globalThis as Record)[GLOBAL_KEY] as Map; } +/** Listener registry — persists independently of stream entries so GC doesn't orphan listeners */ +function getListenersMap(): Map> { + if (!(globalThis as Record)[LISTENERS_KEY]) { + (globalThis as Record)[LISTENERS_KEY] = new Map>(); + } + return (globalThis as Record)[LISTENERS_KEY] as Map>; +} + // ========================================== // Helpers // ========================================== @@ -102,8 +110,11 @@ function emit(stream: ActiveStream, type: StreamEvent['type']) { const snapshot = buildSnapshot(stream); stream.snapshot = snapshot; // store latest const event: StreamEvent = { type, sessionId: stream.sessionId, snapshot }; - for (const listener of stream.listeners) { - try { listener(event); } catch { /* listener error */ } + const listeners = getListenersMap().get(stream.sessionId); + if (listeners) { + for (const listener of listeners) { + try { listener(event); } catch { /* listener error */ } + } } // Also dispatch window event for AppShell if (typeof window !== 'undefined') { @@ -164,7 +175,6 @@ export function startStream(params: StartStreamParams): void { error: null, finalMessageContent: null, }, - listeners: existing?.listeners ?? new Set(), idleCheckTimer: null, lastEventTime: Date.now(), gcTimer: null, @@ -487,49 +497,19 @@ export function stopStream(sessionId: string): void { // ========================================== export function subscribe(sessionId: string, listener: StreamEventListener): () => void { - const map = getStreamsMap(); - let stream = map.get(sessionId); - - if (!stream) { - // Create a placeholder entry to hold listeners even when no stream is active - stream = { - sessionId, - abortController: new AbortController(), - snapshot: { - sessionId, - phase: 'completed' as const, - streamingContent: '', - toolUses: [], - toolResults: [], - streamingToolOutput: '', - statusText: undefined, - pendingPermission: null, - permissionResolved: null, - tokenUsage: null, - startedAt: 0, - completedAt: null, - error: null, - finalMessageContent: null, - }, - listeners: new Set(), - idleCheckTimer: null, - lastEventTime: 0, - gcTimer: null, - accumulatedText: '', - toolUsesArray: [], - toolResultsArray: [], - toolOutputAccumulated: '', - toolTimeoutInfo: null, - isIdleTimeout: false, - sendMessageFn: null, - }; - map.set(sessionId, stream); + const listenersMap = getListenersMap(); + let listeners = listenersMap.get(sessionId); + if (!listeners) { + listeners = new Set(); + listenersMap.set(sessionId, listeners); } - - stream.listeners.add(listener); + listeners.add(listener); return () => { - stream!.listeners.delete(listener); + listeners!.delete(listener); + if (listeners!.size === 0) { + listenersMap.delete(sessionId); + } }; } @@ -627,7 +607,7 @@ export function clearSnapshot(sessionId: string): void { const stream = getStreamsMap().get(sessionId); if (stream && stream.snapshot.phase !== 'active') { if (stream.gcTimer) clearTimeout(stream.gcTimer); - // Keep the listeners entry but reset the snapshot + // Reset the snapshot (listeners are in a separate registry) stream.snapshot = { ...stream.snapshot, startedAt: 0,