Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 25 additions & 45 deletions src/lib/stream-session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ interface ActiveStream {
sessionId: string;
abortController: AbortController;
snapshot: SessionStreamSnapshot;
listeners: Set<StreamEventListener>;
idleCheckTimer: ReturnType<typeof setInterval> | null;
lastEventTime: number;
gcTimer: ReturnType<typeof setTimeout> | null;
Expand Down Expand Up @@ -65,6 +64,7 @@ export interface StartStreamParams {
// ==========================================

const GLOBAL_KEY = '__streamSessionManager__' as const;
const LISTENERS_KEY = '__streamSessionListeners__' as const;
const STREAM_IDLE_TIMEOUT_MS = 330_000;
const GC_DELAY_MS = 5 * 60 * 1000; // 5 minutes

Expand All @@ -75,6 +75,14 @@ function getStreamsMap(): Map<string, ActiveStream> {
return (globalThis as Record<string, unknown>)[GLOBAL_KEY] as Map<string, ActiveStream>;
}

/** Listener registry — persists independently of stream entries so GC doesn't orphan listeners */
function getListenersMap(): Map<string, Set<StreamEventListener>> {
if (!(globalThis as Record<string, unknown>)[LISTENERS_KEY]) {
(globalThis as Record<string, unknown>)[LISTENERS_KEY] = new Map<string, Set<StreamEventListener>>();
}
return (globalThis as Record<string, unknown>)[LISTENERS_KEY] as Map<string, Set<StreamEventListener>>;
}

// ==========================================
// Helpers
// ==========================================
Expand Down Expand Up @@ -102,8 +110,11 @@ function emit(stream: ActiveStream, type: StreamEvent['type']) {
const snapshot = buildSnapshot(stream);
stream.snapshot = snapshot; // store latest
const event: StreamEvent = { type, sessionId: stream.sessionId, snapshot };
for (const listener of stream.listeners) {
try { listener(event); } catch { /* listener error */ }
const listeners = getListenersMap().get(stream.sessionId);
if (listeners) {
for (const listener of listeners) {
try { listener(event); } catch { /* listener error */ }
}
}
// Also dispatch window event for AppShell
if (typeof window !== 'undefined') {
Expand Down Expand Up @@ -164,7 +175,6 @@ export function startStream(params: StartStreamParams): void {
error: null,
finalMessageContent: null,
},
listeners: existing?.listeners ?? new Set(),
idleCheckTimer: null,
lastEventTime: Date.now(),
gcTimer: null,
Expand Down Expand Up @@ -487,49 +497,19 @@ export function stopStream(sessionId: string): void {
// ==========================================

export function subscribe(sessionId: string, listener: StreamEventListener): () => void {
const map = getStreamsMap();
let stream = map.get(sessionId);

if (!stream) {
// Create a placeholder entry to hold listeners even when no stream is active
stream = {
sessionId,
abortController: new AbortController(),
snapshot: {
sessionId,
phase: 'completed' as const,
streamingContent: '',
toolUses: [],
toolResults: [],
streamingToolOutput: '',
statusText: undefined,
pendingPermission: null,
permissionResolved: null,
tokenUsage: null,
startedAt: 0,
completedAt: null,
error: null,
finalMessageContent: null,
},
listeners: new Set(),
idleCheckTimer: null,
lastEventTime: 0,
gcTimer: null,
accumulatedText: '',
toolUsesArray: [],
toolResultsArray: [],
toolOutputAccumulated: '',
toolTimeoutInfo: null,
isIdleTimeout: false,
sendMessageFn: null,
};
map.set(sessionId, stream);
const listenersMap = getListenersMap();
let listeners = listenersMap.get(sessionId);
if (!listeners) {
listeners = new Set();
listenersMap.set(sessionId, listeners);
}

stream.listeners.add(listener);
listeners.add(listener);

return () => {
stream!.listeners.delete(listener);
listeners!.delete(listener);
if (listeners!.size === 0) {
listenersMap.delete(sessionId);
}
};
}

Expand Down Expand Up @@ -627,7 +607,7 @@ export function clearSnapshot(sessionId: string): void {
const stream = getStreamsMap().get(sessionId);
if (stream && stream.snapshot.phase !== 'active') {
if (stream.gcTimer) clearTimeout(stream.gcTimer);
// Keep the listeners entry but reset the snapshot
// Reset the snapshot (listeners are in a separate registry)
stream.snapshot = {
...stream.snapshot,
startedAt: 0,
Expand Down
Loading