diff --git a/.changeset/unified-job-stream-hooks.md b/.changeset/unified-job-stream-hooks.md new file mode 100644 index 00000000..79f75b22 --- /dev/null +++ b/.changeset/unified-job-stream-hooks.md @@ -0,0 +1,5 @@ +--- +"@perstack/react": patch +--- + +Add unified job stream hooks (useJobStream, useJobStreams) with StreamConnector interface diff --git a/packages/react/src/hooks/index.ts b/packages/react/src/hooks/index.ts index 9d934e4d..8391efc6 100644 --- a/packages/react/src/hooks/index.ts +++ b/packages/react/src/hooks/index.ts @@ -1 +1,7 @@ +export { + type JobStreamState, + type StreamConnector, + useJobStream, +} from "./use-job-stream.js" +export { type JobStreamSummary, useJobStreams } from "./use-job-streams.js" export { type ActivityProcessState, type RunResult, useRun } from "./use-run.js" diff --git a/packages/react/src/hooks/use-job-stream.test.ts b/packages/react/src/hooks/use-job-stream.test.ts new file mode 100644 index 00000000..87a042da --- /dev/null +++ b/packages/react/src/hooks/use-job-stream.test.ts @@ -0,0 +1,333 @@ +import type { + Checkpoint, + ExpertMessage, + PerstackEvent, + RunEvent, + Step, + ToolCall, + ToolResult, +} from "@perstack/core" +import { createEmptyUsage } from "@perstack/core" +import { act, renderHook } from "@testing-library/react" +import { describe, expect, it } from "vitest" +import type { StreamConnector } from "./use-job-stream.js" +import { useJobStream } from "./use-job-stream.js" + +function createToolCall(overrides: Partial = {}): ToolCall { + return { + id: "tc-1", + skillName: "@perstack/base", + toolName: "readTextFile", + args: { path: "/test.txt" }, + ...overrides, + } +} + +function createToolResult(overrides: Partial = {}): ToolResult { + return { + id: "tc-1", + skillName: "@perstack/base", + toolName: "readTextFile", + result: [{ type: "textPart", id: "tp-1", text: '{"content": "file content"}' }], + ...overrides, + } +} + +function createBaseEvent(overrides: Partial = {}): RunEvent { + return { + id: "e-1", + runId: "run-1", + expertKey: "test-expert@1.0.0", + jobId: "job-1", + stepNumber: 1, + timestamp: Date.now(), + type: "startRun", + ...overrides, + } as RunEvent +} + +function createDeferredStream(): { + emit: (event: PerstackEvent) => void + end: () => void + error: (err: Error) => void + connector: StreamConnector + connectCalls: Array<{ jobId: string; signal: AbortSignal }> +} { + const connectCalls: Array<{ jobId: string; signal: AbortSignal }> = [] + let resolveNext: ((value: IteratorResult) => void) | null = null + let rejectNext: ((err: Error) => void) | null = null + let done = false + const pending: PerstackEvent[] = [] + + const iterable: AsyncIterable = { + [Symbol.asyncIterator]() { + return { + next(): Promise> { + if (pending.length > 0) { + return Promise.resolve({ value: pending.shift()!, done: false }) + } + if (done) { + return Promise.resolve({ value: undefined, done: true }) + } + return new Promise((resolve, reject) => { + resolveNext = resolve + rejectNext = reject + }) + }, + } + }, + } + + const connector: StreamConnector = async (jobId, signal) => { + connectCalls.push({ jobId, signal }) + return iterable + } + + return { + emit(event: PerstackEvent) { + if (resolveNext) { + const resolve = resolveNext + resolveNext = null + rejectNext = null + resolve({ value: event, done: false }) + } else { + pending.push(event) + } + }, + end() { + done = true + if (resolveNext) { + const resolve = resolveNext + resolveNext = null + rejectNext = null + resolve({ value: undefined, done: true }) + } + }, + error(err: Error) { + if (rejectNext) { + const reject = rejectNext + resolveNext = null + rejectNext = null + reject(err) + } + }, + connector, + connectCalls, + } +} + +describe("useJobStream", () => { + it("initializes with empty state when disabled", () => { + const stream = createDeferredStream() + const { result } = renderHook(() => useJobStream({ jobId: null, connect: stream.connector })) + + expect(result.current.activities).toEqual([]) + expect(result.current.streaming.runs).toEqual({}) + expect(result.current.isConnected).toBe(false) + expect(result.current.error).toBeNull() + expect(result.current.latestActivity).toBeNull() + expect(stream.connectCalls).toHaveLength(0) + }) + + it("does not connect when enabled is false", () => { + const stream = createDeferredStream() + const { result } = renderHook(() => + useJobStream({ jobId: "job-1", connect: stream.connector, enabled: false }), + ) + + expect(result.current.isConnected).toBe(false) + expect(stream.connectCalls).toHaveLength(0) + }) + + it("connects when jobId and enabled are set", async () => { + const stream = createDeferredStream() + const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector })) + + // Wait for the async connect to complete + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(stream.connectCalls).toHaveLength(1) + expect(stream.connectCalls[0].jobId).toBe("job-1") + expect(result.current.isConnected).toBe(true) + }) + + it("processes events into activities", async () => { + const stream = createDeferredStream() + const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + await act(async () => { + stream.emit( + createBaseEvent({ + type: "callTools", + toolCalls: [createToolCall()], + newMessage: {} as ExpertMessage, + usage: createEmptyUsage(), + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + await act(async () => { + stream.emit( + createBaseEvent({ + id: "e-2", + type: "resolveToolResults", + toolResults: [createToolResult()], + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.activities).toHaveLength(1) + expect(result.current.activities[0].type).toBe("readTextFile") + expect(result.current.latestActivity).toBe(result.current.activities[0]) + }) + + it("sets isConnected to false when stream ends", async () => { + const stream = createDeferredStream() + const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + expect(result.current.isConnected).toBe(true) + + await act(async () => { + stream.end() + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.isConnected).toBe(false) + }) + + it("stores non-abort errors", async () => { + const stream = createDeferredStream() + const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + await act(async () => { + stream.error(new Error("connection failed")) + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.error).toBeInstanceOf(Error) + expect(result.current.error?.message).toBe("connection failed") + expect(result.current.isConnected).toBe(false) + }) + + it("ignores abort errors", async () => { + const stream = createDeferredStream() + const { result, unmount } = renderHook(() => + useJobStream({ jobId: "job-1", connect: stream.connector }), + ) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + expect(result.current.isConnected).toBe(true) + + act(() => { + unmount() + }) + + expect(result.current.error).toBeNull() + }) + + it("disconnects when enabled changes to false", async () => { + const stream = createDeferredStream() + const { result, rerender } = renderHook( + ({ enabled }: { enabled: boolean }) => + useJobStream({ jobId: "job-1", connect: stream.connector, enabled }), + { initialProps: { enabled: true } }, + ) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + expect(result.current.isConnected).toBe(true) + expect(stream.connectCalls[0].signal.aborted).toBe(false) + + rerender({ enabled: false }) + + expect(stream.connectCalls[0].signal.aborted).toBe(true) + expect(result.current.isConnected).toBe(false) + }) + + it("handles connector that throws immediately", async () => { + const failingConnector: StreamConnector = async () => { + throw new Error("failed to connect") + } + + const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: failingConnector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.error?.message).toBe("failed to connect") + expect(result.current.isConnected).toBe(false) + }) + + it("derives latestActivity from activities array", async () => { + const stream = createDeferredStream() + const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + // Emit call + result to generate first activity + await act(async () => { + stream.emit( + createBaseEvent({ + type: "callTools", + toolCalls: [createToolCall()], + newMessage: {} as ExpertMessage, + usage: createEmptyUsage(), + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + await act(async () => { + stream.emit( + createBaseEvent({ + id: "e-2", + type: "resolveToolResults", + toolResults: [createToolResult()], + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + // Emit completeRun to generate second activity + await act(async () => { + stream.emit( + createBaseEvent({ + id: "e-3", + type: "completeRun", + text: "Task completed", + checkpoint: {} as Checkpoint, + step: {} as Step, + usage: createEmptyUsage(), + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.activities).toHaveLength(2) + expect(result.current.latestActivity).toBe( + result.current.activities[result.current.activities.length - 1], + ) + expect(result.current.latestActivity?.type).toBe("complete") + }) +}) diff --git a/packages/react/src/hooks/use-job-stream.ts b/packages/react/src/hooks/use-job-stream.ts new file mode 100644 index 00000000..6819ffcb --- /dev/null +++ b/packages/react/src/hooks/use-job-stream.ts @@ -0,0 +1,67 @@ +import type { ActivityOrGroup } from "@perstack/core" +import { useEffect, useMemo, useRef, useState } from "react" +import type { StreamingState } from "../types/index.js" +import { consumeStream, isAbortError, type StreamConnector } from "../utils/stream.js" +import { useRun } from "./use-run.js" + +export type { StreamConnector } + +export type JobStreamState = { + activities: ActivityOrGroup[] + streaming: StreamingState + latestActivity: ActivityOrGroup | null + isConnected: boolean + error: Error | null +} + +export function useJobStream(options: { + jobId: string | null + connect: StreamConnector + enabled?: boolean +}): JobStreamState { + const { jobId, connect, enabled = true } = options + const shouldConnect = Boolean(jobId && enabled) + const { activities, streaming, addEvent } = useRun() + const [isConnected, setIsConnected] = useState(false) + const [error, setError] = useState(null) + const addEventRef = useRef(addEvent) + addEventRef.current = addEvent + const connectRef = useRef(connect) + connectRef.current = connect + + useEffect(() => { + if (!shouldConnect || !jobId) { + setIsConnected(false) + return + } + + const controller = new AbortController() + + async function run() { + setError(null) + setIsConnected(true) + try { + await consumeStream(connectRef.current, jobId!, controller.signal, (event) => { + addEventRef.current(event) + }) + } catch (err) { + if (isAbortError(err)) return + setError(err instanceof Error ? err : new Error(String(err))) + } finally { + setIsConnected(false) + } + } + + run() + return () => { + controller.abort() + } + }, [shouldConnect, jobId]) + + const latestActivity = useMemo( + () => (activities.length > 0 ? activities[activities.length - 1] : null), + [activities], + ) + + return { activities, streaming, latestActivity, isConnected, error } +} diff --git a/packages/react/src/hooks/use-job-streams.test.ts b/packages/react/src/hooks/use-job-streams.test.ts new file mode 100644 index 00000000..ebcfeedb --- /dev/null +++ b/packages/react/src/hooks/use-job-streams.test.ts @@ -0,0 +1,371 @@ +import type { ExpertMessage, PerstackEvent, RunEvent, ToolCall, ToolResult } from "@perstack/core" +import { createEmptyUsage } from "@perstack/core" +import { act, renderHook } from "@testing-library/react" +import { describe, expect, it, vi } from "vitest" +import type { StreamConnector } from "./use-job-stream.js" +import { useJobStreams } from "./use-job-streams.js" + +function createToolCall(overrides: Partial = {}): ToolCall { + return { + id: "tc-1", + skillName: "@perstack/base", + toolName: "readTextFile", + args: { path: "/test.txt" }, + ...overrides, + } +} + +function createToolResult(overrides: Partial = {}): ToolResult { + return { + id: "tc-1", + skillName: "@perstack/base", + toolName: "readTextFile", + result: [{ type: "textPart", id: "tp-1", text: '{"content": "file content"}' }], + ...overrides, + } +} + +function createBaseEvent(overrides: Partial = {}): RunEvent { + return { + id: "e-1", + runId: "run-1", + expertKey: "test-expert@1.0.0", + jobId: "job-1", + stepNumber: 1, + timestamp: Date.now(), + type: "startRun", + ...overrides, + } as RunEvent +} + +type DeferredStream = { + emit: (event: PerstackEvent) => void + end: () => void + error: (err: Error) => void +} + +function createMultiStreamConnector(): { + connector: StreamConnector + getStream: (jobId: string) => DeferredStream | undefined + connectCalls: Array<{ jobId: string; signal: AbortSignal }> +} { + const connectCalls: Array<{ jobId: string; signal: AbortSignal }> = [] + const streams = new Map< + string, + { + resolveNext: ((value: IteratorResult) => void) | null + rejectNext: ((err: Error) => void) | null + done: boolean + pending: PerstackEvent[] + } + >() + + const connector: StreamConnector = async (jobId, signal) => { + connectCalls.push({ jobId, signal }) + + const streamState = { + resolveNext: null as ((value: IteratorResult) => void) | null, + rejectNext: null as ((err: Error) => void) | null, + done: false, + pending: [] as PerstackEvent[], + } + streams.set(jobId, streamState) + + return { + [Symbol.asyncIterator]() { + return { + next(): Promise> { + if (streamState.pending.length > 0) { + return Promise.resolve({ value: streamState.pending.shift()!, done: false }) + } + if (streamState.done) { + return Promise.resolve({ value: undefined, done: true }) + } + return new Promise((resolve, reject) => { + streamState.resolveNext = resolve + streamState.rejectNext = reject + }) + }, + } + }, + } + } + + function getStream(jobId: string): DeferredStream | undefined { + const streamState = streams.get(jobId) + if (!streamState) return undefined + return { + emit(event: PerstackEvent) { + if (streamState.resolveNext) { + const resolve = streamState.resolveNext + streamState.resolveNext = null + streamState.rejectNext = null + resolve({ value: event, done: false }) + } else { + streamState.pending.push(event) + } + }, + end() { + streamState.done = true + if (streamState.resolveNext) { + const resolve = streamState.resolveNext + streamState.resolveNext = null + streamState.rejectNext = null + resolve({ value: undefined, done: true }) + } + }, + error(err: Error) { + if (streamState.rejectNext) { + const reject = streamState.rejectNext + streamState.resolveNext = null + streamState.rejectNext = null + reject(err) + } + }, + } + } + + return { connector, getStream, connectCalls } +} + +describe("useJobStreams", () => { + it("initializes with empty state when no jobs provided", () => { + const { connector } = createMultiStreamConnector() + const { result } = renderHook(() => useJobStreams({ jobs: [], connect: connector })) + + expect(result.current.size).toBe(0) + }) + + it("connects to enabled jobs", async () => { + const { connector, connectCalls } = createMultiStreamConnector() + const jobs = [ + { id: "job-1", enabled: true }, + { id: "job-2", enabled: true }, + ] + + renderHook(() => useJobStreams({ jobs, connect: connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(2) + expect(connectCalls.map((c) => c.jobId).sort()).toEqual(["job-1", "job-2"]) + }) + + it("skips disabled jobs", async () => { + const { connector, connectCalls } = createMultiStreamConnector() + const jobs = [ + { id: "job-1", enabled: true }, + { id: "job-2", enabled: false }, + ] + + renderHook(() => useJobStreams({ jobs, connect: connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(1) + expect(connectCalls[0].jobId).toBe("job-1") + }) + + it("tracks per-job latest activity", async () => { + const { connector, getStream } = createMultiStreamConnector() + const jobs = [{ id: "job-1", enabled: true }] + + const { result } = renderHook(() => useJobStreams({ jobs, connect: connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + const stream = getStream("job-1")! + + // Emit callTools + resolveToolResults to generate an activity + await act(async () => { + stream.emit( + createBaseEvent({ + type: "callTools", + toolCalls: [createToolCall()], + newMessage: {} as ExpertMessage, + usage: createEmptyUsage(), + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + await act(async () => { + stream.emit( + createBaseEvent({ + id: "e-2", + type: "resolveToolResults", + toolResults: [createToolResult()], + } as Partial) as RunEvent, + ) + await new Promise((r) => setTimeout(r, 0)) + }) + + const state = result.current.get("job-1") + expect(state).toBeDefined() + expect(state?.latestActivity?.type).toBe("readTextFile") + expect(state?.isConnected).toBe(true) + }) + + it("sets isConnected to false when stream ends", async () => { + const { connector, getStream } = createMultiStreamConnector() + const jobs = [{ id: "job-1", enabled: true }] + + const { result } = renderHook(() => useJobStreams({ jobs, connect: connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.get("job-1")?.isConnected).toBe(true) + + await act(async () => { + getStream("job-1")!.end() + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(result.current.get("job-1")?.isConnected).toBe(false) + }) + + it("disconnects jobs when removed from list", async () => { + const { connector, connectCalls } = createMultiStreamConnector() + const initialJobs = [ + { id: "job-1", enabled: true }, + { id: "job-2", enabled: true }, + ] + + const { result, rerender } = renderHook( + ({ jobs }: { jobs: Array<{ id: string; enabled: boolean }> }) => + useJobStreams({ jobs, connect: connector }), + { initialProps: { jobs: initialJobs } }, + ) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(2) + + // Remove job-2 + const updatedJobs = [{ id: "job-1", enabled: true }] + rerender({ jobs: updatedJobs }) + + // The abort controller for job-2 should be aborted + const job2Call = connectCalls.find((c) => c.jobId === "job-2") + expect(job2Call?.signal.aborted).toBe(true) + + // job-2 should be removed from states + expect(result.current.has("job-2")).toBe(false) + }) + + it("disconnects jobs when disabled", async () => { + const { connector, connectCalls } = createMultiStreamConnector() + const initialJobs = [{ id: "job-1", enabled: true }] + + const { result, rerender } = renderHook( + ({ jobs }: { jobs: Array<{ id: string; enabled: boolean }> }) => + useJobStreams({ jobs, connect: connector }), + { initialProps: { jobs: initialJobs } }, + ) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(1) + + // Disable job-1 + rerender({ jobs: [{ id: "job-1", enabled: false }] }) + + expect(connectCalls[0].signal.aborted).toBe(true) + expect(result.current.has("job-1")).toBe(false) + }) + + it("cleans up all connections on unmount", async () => { + const { connector, connectCalls } = createMultiStreamConnector() + const jobs = [ + { id: "job-1", enabled: true }, + { id: "job-2", enabled: true }, + ] + + const { unmount } = renderHook(() => useJobStreams({ jobs, connect: connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(2) + + act(() => { + unmount() + }) + + for (const call of connectCalls) { + expect(call.signal.aborted).toBe(true) + } + }) + + it("isolates errors between jobs", async () => { + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}) + const { connector, getStream } = createMultiStreamConnector() + const jobs = [ + { id: "job-1", enabled: true }, + { id: "job-2", enabled: true }, + ] + + const { result } = renderHook(() => useJobStreams({ jobs, connect: connector })) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + // Error on job-1 + await act(async () => { + getStream("job-1")!.error(new Error("connection lost")) + await new Promise((r) => setTimeout(r, 0)) + }) + + // job-1 disconnected, job-2 still connected + expect(result.current.get("job-1")?.isConnected).toBe(false) + expect(result.current.get("job-2")?.isConnected).toBe(true) + + consoleSpy.mockRestore() + }) + + it("connects new jobs added dynamically", async () => { + const { connector, connectCalls } = createMultiStreamConnector() + const initialJobs = [{ id: "job-1", enabled: true }] + + const { rerender } = renderHook( + ({ jobs }: { jobs: Array<{ id: string; enabled: boolean }> }) => + useJobStreams({ jobs, connect: connector }), + { initialProps: { jobs: initialJobs } }, + ) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(1) + + // Add job-2 + rerender({ + jobs: [ + { id: "job-1", enabled: true }, + { id: "job-2", enabled: true }, + ], + }) + + await act(async () => { + await new Promise((r) => setTimeout(r, 0)) + }) + + expect(connectCalls).toHaveLength(2) + expect(connectCalls[1].jobId).toBe("job-2") + }) +}) diff --git a/packages/react/src/hooks/use-job-streams.ts b/packages/react/src/hooks/use-job-streams.ts new file mode 100644 index 00000000..b8eb36c1 --- /dev/null +++ b/packages/react/src/hooks/use-job-streams.ts @@ -0,0 +1,106 @@ +import type { ActivityOrGroup } from "@perstack/core" +import { useCallback, useEffect, useRef, useState } from "react" +import { + type ActivityProcessState, + createInitialActivityProcessState, + processRunEventToActivity, +} from "../utils/event-to-activity.js" +import { consumeStream, isAbortError, type StreamConnector } from "../utils/stream.js" + +export type JobStreamSummary = { + latestActivity: ActivityOrGroup | null + isConnected: boolean +} + +export function useJobStreams(options: { + jobs: Array<{ id: string; enabled: boolean }> + connect: StreamConnector +}): Map { + const { jobs, connect } = options + const [states, setStates] = useState>(new Map()) + const controllersRef = useRef>(new Map()) + const activityStateRef = useRef>(new Map()) + const connectRef = useRef(connect) + connectRef.current = connect + + const connectToJob = useCallback(async (jobId: string, signal: AbortSignal) => { + setStates((prev) => { + const next = new Map(prev) + next.set(jobId, { latestActivity: null, isConnected: true }) + return next + }) + + let state = activityStateRef.current.get(jobId) + if (!state) { + state = createInitialActivityProcessState() + activityStateRef.current.set(jobId, state) + } + + try { + await consumeStream(connectRef.current, jobId, signal, (event) => { + let latestActivity: ActivityOrGroup | null = null + processRunEventToActivity(state!, event, (activity) => { + latestActivity = activity + }) + if (latestActivity) { + setStates((prev) => { + const next = new Map(prev) + next.set(jobId, { latestActivity, isConnected: true }) + return next + }) + } + }) + } catch (err) { + if (isAbortError(err)) return + console.error(`Stream connection failed for job ${jobId}:`, err) + } finally { + setStates((prev) => { + const current = prev.get(jobId) + if (!current) return prev + const next = new Map(prev) + next.set(jobId, { ...current, isConnected: false }) + return next + }) + } + }, []) + + useEffect(() => { + const enabledIds = new Set(jobs.filter((j) => j.enabled).map((j) => j.id)) + + // Disconnect removed/disabled jobs + for (const [jobId, controller] of controllersRef.current) { + if (!enabledIds.has(jobId)) { + controller.abort() + controllersRef.current.delete(jobId) + activityStateRef.current.delete(jobId) + setStates((prev) => { + const next = new Map(prev) + next.delete(jobId) + return next + }) + } + } + + // Connect new jobs + for (const jobId of enabledIds) { + if (!controllersRef.current.has(jobId)) { + const controller = new AbortController() + controllersRef.current.set(jobId, controller) + connectToJob(jobId, controller.signal) + } + } + }, [jobs, connectToJob]) + + // Cleanup on unmount only + useEffect(() => { + return () => { + for (const controller of controllersRef.current.values()) { + controller.abort() + } + controllersRef.current.clear() + activityStateRef.current.clear() + } + }, []) + + return states +} diff --git a/packages/react/src/index.ts b/packages/react/src/index.ts index 08cc3564..8c30a9ff 100644 --- a/packages/react/src/index.ts +++ b/packages/react/src/index.ts @@ -1,7 +1,12 @@ // Hooks export { type ActivityProcessState, + type JobStreamState, + type JobStreamSummary, type RunResult, + type StreamConnector, + useJobStream, + useJobStreams, useRun, } from "./hooks/index.js" diff --git a/packages/react/src/utils/index.ts b/packages/react/src/utils/index.ts index 4907e6dd..f2c19806 100644 --- a/packages/react/src/utils/index.ts +++ b/packages/react/src/utils/index.ts @@ -5,3 +5,4 @@ export { toolToActivity, } from "./event-to-activity.js" export { groupActivitiesByRun, type RunGroup } from "./group-by-run.js" +export { consumeStream, isAbortError, type StreamConnector } from "./stream.js" diff --git a/packages/react/src/utils/stream.test.ts b/packages/react/src/utils/stream.test.ts new file mode 100644 index 00000000..65ecf5c3 --- /dev/null +++ b/packages/react/src/utils/stream.test.ts @@ -0,0 +1,152 @@ +import type { PerstackEvent, RunEvent } from "@perstack/core" +import { describe, expect, it } from "vitest" +import { consumeStream, isAbortError, type StreamConnector } from "./stream.js" + +function createBaseEvent(overrides: Partial = {}): PerstackEvent { + return { + id: "e-1", + runId: "run-1", + expertKey: "test-expert@1.0.0", + jobId: "job-1", + stepNumber: 1, + timestamp: Date.now(), + type: "startRun", + ...overrides, + } as PerstackEvent +} + +function createDeferredStream(): { + emit: (event: PerstackEvent) => void + end: () => void + error: (err: Error) => void + connector: StreamConnector +} { + let resolveNext: ((value: IteratorResult) => void) | null = null + let rejectNext: ((err: Error) => void) | null = null + let done = false + const pending: PerstackEvent[] = [] + + const iterable: AsyncIterable = { + [Symbol.asyncIterator]() { + return { + next(): Promise> { + if (pending.length > 0) { + return Promise.resolve({ value: pending.shift()!, done: false }) + } + if (done) { + return Promise.resolve({ value: undefined, done: true }) + } + return new Promise((resolve, reject) => { + resolveNext = resolve + rejectNext = reject + }) + }, + } + }, + } + + const connector: StreamConnector = async () => iterable + + return { + emit(event: PerstackEvent) { + if (resolveNext) { + const resolve = resolveNext + resolveNext = null + rejectNext = null + resolve({ value: event, done: false }) + } else { + pending.push(event) + } + }, + end() { + done = true + if (resolveNext) { + const resolve = resolveNext + resolveNext = null + rejectNext = null + resolve({ value: undefined, done: true }) + } + }, + error(err: Error) { + if (rejectNext) { + const reject = rejectNext + resolveNext = null + rejectNext = null + reject(err) + } + }, + connector, + } +} + +describe("consumeStream", () => { + it("delivers events to onEvent callback", async () => { + const stream = createDeferredStream() + const received: PerstackEvent[] = [] + + const event1 = createBaseEvent({ id: "e-1" }) + const event2 = createBaseEvent({ id: "e-2" }) + stream.emit(event1) + stream.emit(event2) + stream.end() + + const controller = new AbortController() + await consumeStream(stream.connector, "job-1", controller.signal, (event) => { + received.push(event) + }) + + expect(received).toHaveLength(2) + expect(received[0]).toBe(event1) + expect(received[1]).toBe(event2) + }) + + it("stops iterating when signal is aborted", async () => { + const stream = createDeferredStream() + const received: PerstackEvent[] = [] + const controller = new AbortController() + + stream.emit(createBaseEvent({ id: "e-1" })) + controller.abort() + stream.emit(createBaseEvent({ id: "e-2" })) + stream.end() + + await consumeStream(stream.connector, "job-1", controller.signal, (event) => { + received.push(event) + }) + + expect(received.length).toBeLessThanOrEqual(1) + }) + + it("propagates non-abort errors", async () => { + const failingConnector: StreamConnector = async () => { + throw new Error("connection refused") + } + + const controller = new AbortController() + await expect( + consumeStream(failingConnector, "job-1", controller.signal, () => {}), + ).rejects.toThrow("connection refused") + }) +}) + +describe("isAbortError", () => { + it("returns true for AbortError DOMException", () => { + const err = new DOMException("The operation was aborted", "AbortError") + expect(isAbortError(err)).toBe(true) + }) + + it("returns false for other DOMException types", () => { + const err = new DOMException("Not found", "NotFoundError") + expect(isAbortError(err)).toBe(false) + }) + + it("returns false for regular Error", () => { + expect(isAbortError(new Error("something"))).toBe(false) + }) + + it("returns false for non-Error values", () => { + expect(isAbortError("string")).toBe(false) + expect(isAbortError(null)).toBe(false) + expect(isAbortError(undefined)).toBe(false) + }) +}) diff --git a/packages/react/src/utils/stream.ts b/packages/react/src/utils/stream.ts new file mode 100644 index 00000000..eb38a8ff --- /dev/null +++ b/packages/react/src/utils/stream.ts @@ -0,0 +1,30 @@ +import type { PerstackEvent } from "@perstack/core" + +export type StreamConnector = ( + jobId: string, + signal: AbortSignal, +) => Promise> + +export const isAbortError = (err: unknown): boolean => + err instanceof DOMException && err.name === "AbortError" + +/** + * Consumes an SSE stream with abort-safe iteration and error normalization. + * + * Handles the connection lifecycle shared by all streaming hooks: + * 1. Calls `connect` to obtain an async iterable + * 2. Iterates events, stopping on signal abort + * 3. Filters out AbortError (normal cleanup), surfaces all others + */ +export async function consumeStream( + connect: StreamConnector, + jobId: string, + signal: AbortSignal, + onEvent: (event: PerstackEvent) => void, +): Promise { + const events = await connect(jobId, signal) + for await (const event of events) { + if (signal.aborted) break + onEvent(event) + } +}