From b549acf1a1080fc808f068766e255936e08ba00e Mon Sep 17 00:00:00 2001 From: HiranoMasaaki Date: Wed, 28 Jan 2026 05:19:17 +0900 Subject: [PATCH] feat(react): add useEventStream hook for consuming PerstackEvent streams Add a new useEventStream hook that provides automatic connection management for PerstackEvent streams. The hook is API-agnostic and accepts an EventSourceFactory function, allowing it to work with any backend. Features: - Automatic connection lifecycle management - Event processing through useRun hook internally - Connection state tracking (isConnected, error) - Proper cleanup on unmount or when disabled - AbortSignal support for stream cancellation Co-Authored-By: Claude Opus 4.5 --- .changeset/add-use-event-stream.md | 14 + packages/react/README.md | 56 +++ packages/react/src/hooks/index.ts | 7 + .../react/src/hooks/use-event-stream.test.ts | 403 ++++++++++++++++++ packages/react/src/hooks/use-event-stream.ts | 117 +++++ packages/react/src/index.ts | 5 + 6 files changed, 602 insertions(+) create mode 100644 .changeset/add-use-event-stream.md create mode 100644 packages/react/src/hooks/use-event-stream.test.ts create mode 100644 packages/react/src/hooks/use-event-stream.ts diff --git a/.changeset/add-use-event-stream.md b/.changeset/add-use-event-stream.md new file mode 100644 index 00000000..46d15e7d --- /dev/null +++ b/.changeset/add-use-event-stream.md @@ -0,0 +1,14 @@ +--- +"@perstack/react": patch +--- + +feat(react): add useEventStream hook for consuming PerstackEvent streams + +Added a new `useEventStream` hook that provides automatic connection management for PerstackEvent streams. The hook is API-agnostic and accepts an `EventSourceFactory` function, allowing it to work with any backend that provides PerstackEvent streams. + +Features: +- Automatic connection lifecycle management (connect/disconnect based on `enabled` flag) +- Event processing through `useRun` hook internally +- Connection state tracking (`isConnected`, `error`) +- Proper cleanup on unmount or when disabled +- AbortSignal support for stream cancellation diff --git a/packages/react/README.md b/packages/react/README.md index b3141085..eca12bb3 100644 --- a/packages/react/README.md +++ b/packages/react/README.md @@ -54,6 +54,41 @@ function MyComponent() { } ``` +### useEventStream + +A hook for consuming PerstackEvent streams with automatic connection management. This hook is API-agnostic and accepts a factory function that creates the event source. + +```tsx +import { useEventStream } from "@perstack/react" + +function JobActivityView({ jobId, isRunning }: { jobId: string; isRunning: boolean }) { + const { activities, streaming, isConnected, isComplete, error } = useEventStream({ + enabled: isRunning, + createEventSource: async ({ signal }) => { + const response = await fetch(`/api/jobs/${jobId}/stream`, { signal }) + // Return an async generator of PerstackEvent + return parseSSEStream(response.body) + }, + }) + + return ( +
+ {isConnected && Live} + {activities.map((activity) => ( + + ))} + {Object.entries(streaming.runs).map(([runId, run]) => ( +
+ {run.isReasoningActive &&
Thinking: {run.reasoning}
} + {run.isRunResultActive &&
Generating: {run.runResult}
} +
+ ))} + {error &&
Error: {error.message}
} +
+ ) +} +``` + ### useRuntimeState A lower-level hook for managing RuntimeState separately. @@ -107,6 +142,27 @@ Returns an object with: **Note:** Logs are append-only and never cleared. This is required for compatibility with Ink's `` component. +### useEventStream(options) + +Options: + +- `enabled`: Whether the stream should be active +- `createEventSource`: Factory function that returns an async generator of `PerstackEvent` + +Returns an object with: + +- `activities`: Array of `ActivityOrGroup` from processed events +- `streaming`: Current `StreamingState` for real-time display +- `isConnected`: Whether currently connected to the event source +- `isComplete`: Whether the run has completed +- `error`: Last error encountered, if any + +The hook automatically: +- Connects when `enabled` is `true` and `createEventSource` is provided +- Disconnects and aborts when `enabled` becomes `false` or on unmount +- Processes events through `useRun` internally +- Clears error state on reconnection + ### useRuntimeState() Returns an object with: diff --git a/packages/react/src/hooks/index.ts b/packages/react/src/hooks/index.ts index 7e8522a5..371af867 100644 --- a/packages/react/src/hooks/index.ts +++ b/packages/react/src/hooks/index.ts @@ -1,2 +1,9 @@ +export { + type EventSourceFactory, + type EventStreamOptions, + type EventStreamState, + type UseEventStreamOptions, + useEventStream, +} from "./use-event-stream.js" export { type ActivityProcessState, type RunResult, useRun } from "./use-run.js" export { type RuntimeResult, useRuntime } from "./use-runtime.js" diff --git a/packages/react/src/hooks/use-event-stream.test.ts b/packages/react/src/hooks/use-event-stream.test.ts new file mode 100644 index 00000000..38814aed --- /dev/null +++ b/packages/react/src/hooks/use-event-stream.test.ts @@ -0,0 +1,403 @@ +import type { PerstackEvent, RunEvent } from "@perstack/core" +import { act, renderHook, waitFor } from "@testing-library/react" +import { describe, expect, it, vi } from "vitest" +import { type EventSourceFactory, useEventStream } from "./use-event-stream.js" + +function createBaseEvent(overrides: Partial = {}): RunEvent { + return { + id: "e-1", + runId: "run-1", + expertKey: "test-expert@1.0.0", + jobId: "job-1", + stepNumber: 1, + timestamp: Date.now(), + type: "startRun", + ...overrides, + } as RunEvent +} + +async function* createMockEventGenerator( + events: PerstackEvent[], + options?: { delayMs?: number; signal?: AbortSignal }, +): AsyncGenerator { + for (const event of events) { + if (options?.signal?.aborted) return + if (options?.delayMs) { + await new Promise((resolve) => setTimeout(resolve, options.delayMs)) + } + yield event + } +} + +describe("useEventStream", () => { + it("initializes with disconnected state when disabled", () => { + const { result } = renderHook(() => + useEventStream({ + enabled: false, + createEventSource: null, + }), + ) + + expect(result.current.activities).toEqual([]) + expect(result.current.streaming.runs).toEqual({}) + expect(result.current.isConnected).toBe(false) + expect(result.current.isComplete).toBe(false) + expect(result.current.error).toBeNull() + }) + + it("does not connect when enabled is false", async () => { + const factory = vi.fn() + + const { result } = renderHook(() => + useEventStream({ + enabled: false, + createEventSource: factory, + }), + ) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(factory).not.toHaveBeenCalled() + expect(result.current.isConnected).toBe(false) + }) + + it("does not connect when createEventSource is null", async () => { + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: null, + }), + ) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(result.current.isConnected).toBe(false) + }) + + it("connects when enabled with createEventSource", async () => { + const events = [createBaseEvent({ type: "startRun" })] + const factory: EventSourceFactory = vi.fn(async () => createMockEventGenerator(events)) + + renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(factory).toHaveBeenCalledTimes(1) + }) + + expect(factory).toHaveBeenCalledWith( + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ) + }) + + it("processes events through useRun", async () => { + const events: PerstackEvent[] = [ + { + id: "e-1", + runId: "run-1", + expertKey: "test-expert@1.0.0", + jobId: "job-1", + stepNumber: 1, + timestamp: Date.now(), + type: "startStreamingReasoning", + } as PerstackEvent, + { + id: "e-2", + runId: "run-1", + expertKey: "test-expert@1.0.0", + jobId: "job-1", + stepNumber: 1, + timestamp: Date.now(), + type: "streamReasoning", + delta: "Thinking...", + } as PerstackEvent, + ] + + const factory: EventSourceFactory = async () => createMockEventGenerator(events) + + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.streaming.runs["run-1"]).toBeDefined() + }) + + expect(result.current.streaming.runs["run-1"].reasoning).toBe("Thinking...") + }) + + it("sets isConnected to true during streaming", async () => { + let resolveGenerator: (() => void) | undefined + const generatorPromise = new Promise((resolve) => { + resolveGenerator = resolve + }) + + const factory: EventSourceFactory = async () => { + // biome-ignore lint/correctness/useYield: Test needs a generator that blocks + return (async function* () { + await generatorPromise + })() + } + + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.isConnected).toBe(true) + }) + + act(() => { + resolveGenerator?.() + }) + + await waitFor(() => { + expect(result.current.isConnected).toBe(false) + }) + }) + + it("sets isConnected to false after stream completes", async () => { + const events = [createBaseEvent()] + const factory: EventSourceFactory = async () => createMockEventGenerator(events) + + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.isConnected).toBe(false) + }) + }) + + it("handles errors from event source factory", async () => { + const factory: EventSourceFactory = async () => { + throw new Error("Connection failed") + } + + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.error).not.toBeNull() + }) + + expect(result.current.error?.message).toBe("Connection failed") + expect(result.current.isConnected).toBe(false) + }) + + it("handles errors during iteration", async () => { + let yieldCount = 0 + const factory: EventSourceFactory = async () => { + // biome-ignore lint/correctness/useYield: Test needs a generator that throws before yielding + return (async function* () { + yieldCount++ + throw new Error("Stream error") + })() + } + + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.error).not.toBeNull() + }) + + expect(yieldCount).toBe(1) // Generator was called + expect(result.current.error?.message).toBe("Stream error") + }) + + it("converts non-Error objects to Error", async () => { + const factory: EventSourceFactory = async () => { + throw "String error" + } + + const { result } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.error).not.toBeNull() + }) + + expect(result.current.error?.message).toBe("Stream connection failed") + }) + + it("aborts stream on cleanup", async () => { + let abortSignal: AbortSignal | undefined + let resolveGenerator: (() => void) | undefined + const generatorPromise = new Promise((resolve) => { + resolveGenerator = resolve + }) + + const factory: EventSourceFactory = async ({ signal }) => { + abortSignal = signal + // biome-ignore lint/correctness/useYield: Test needs a generator that blocks + return (async function* () { + await generatorPromise + })() + } + + const { unmount } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(abortSignal).toBeDefined() + }) + + expect(abortSignal?.aborted).toBe(false) + + unmount() + + expect(abortSignal?.aborted).toBe(true) + + resolveGenerator?.() + }) + + it("does not set error on abort", async () => { + let resolveGenerator: (() => void) | undefined + const generatorPromise = new Promise((resolve) => { + resolveGenerator = resolve + }) + + const factory: EventSourceFactory = async ({ signal }) => { + // biome-ignore lint/correctness/useYield: Test needs a generator that waits then throws + return (async function* () { + await generatorPromise + if (signal.aborted) { + throw new DOMException("Aborted", "AbortError") + } + })() + } + + const { result, unmount } = renderHook(() => + useEventStream({ + enabled: true, + createEventSource: factory, + }), + ) + + await waitFor(() => { + expect(result.current.isConnected).toBe(true) + }) + + unmount() + resolveGenerator?.() + + expect(result.current.error).toBeNull() + }) + + it("reconnects when enabled changes from false to true", async () => { + const events = [createBaseEvent()] + const factory: EventSourceFactory = vi.fn(async () => createMockEventGenerator(events)) + + const { rerender } = renderHook( + ({ enabled }: { enabled: boolean }) => + useEventStream({ + enabled, + createEventSource: factory, + }), + { initialProps: { enabled: false } }, + ) + + expect(factory).not.toHaveBeenCalled() + + rerender({ enabled: true }) + + await waitFor(() => { + expect(factory).toHaveBeenCalledTimes(1) + }) + }) + + it("disconnects when enabled changes from true to false", async () => { + let abortSignal: AbortSignal | undefined + let resolveGenerator: (() => void) | undefined + const generatorPromise = new Promise((resolve) => { + resolveGenerator = resolve + }) + + const factory: EventSourceFactory = async ({ signal }) => { + abortSignal = signal + // biome-ignore lint/correctness/useYield: Test needs a generator that blocks + return (async function* () { + await generatorPromise + })() + } + + const { result, rerender } = renderHook( + ({ enabled }: { enabled: boolean }) => + useEventStream({ + enabled, + createEventSource: factory, + }), + { initialProps: { enabled: true } }, + ) + + await waitFor(() => { + expect(result.current.isConnected).toBe(true) + }) + + expect(abortSignal?.aborted).toBe(false) + + rerender({ enabled: false }) + + expect(abortSignal?.aborted).toBe(true) + + resolveGenerator?.() + }) + + it("clears error when reconnecting", async () => { + const failingFactory: EventSourceFactory = async () => { + throw new Error("Connection failed") + } + + const { result, rerender } = renderHook( + ({ createEventSource }: { createEventSource: EventSourceFactory }) => + useEventStream({ + enabled: true, + createEventSource, + }), + { initialProps: { createEventSource: failingFactory } }, + ) + + await waitFor(() => { + expect(result.current.error).not.toBeNull() + }) + + const successFactory: EventSourceFactory = async () => + createMockEventGenerator([createBaseEvent()]) + + rerender({ createEventSource: successFactory }) + + await waitFor(() => { + expect(result.current.error).toBeNull() + }) + }) +}) diff --git a/packages/react/src/hooks/use-event-stream.ts b/packages/react/src/hooks/use-event-stream.ts new file mode 100644 index 00000000..18b0318a --- /dev/null +++ b/packages/react/src/hooks/use-event-stream.ts @@ -0,0 +1,117 @@ +import type { ActivityOrGroup, PerstackEvent } from "@perstack/core" +import { useEffect, useRef, useState } from "react" +import type { StreamingState } from "../types/index.js" +import { useRun } from "./use-run.js" + +/** + * Options for creating an event stream connection. + */ +export type EventStreamOptions = { + /** AbortSignal to cancel the stream */ + signal: AbortSignal +} + +/** + * Factory function that creates an async generator of PerstackEvents. + * This abstraction allows the hook to work with any event source. + */ +export type EventSourceFactory = ( + options: EventStreamOptions, +) => Promise> + +export type EventStreamState = { + /** Accumulated activities from processed events */ + activities: ActivityOrGroup[] + /** Current streaming state for real-time display */ + streaming: StreamingState + /** Whether currently connected to the event source */ + isConnected: boolean + /** Whether the run has completed */ + isComplete: boolean + /** Last error encountered, if any */ + error: Error | null +} + +export type UseEventStreamOptions = { + /** Whether the stream should be active */ + enabled: boolean + /** Factory to create the event source when enabled */ + createEventSource: EventSourceFactory | null +} + +/** + * Hook for consuming PerstackEvent streams with automatic connection management. + * + * This hook is API-agnostic - it accepts a factory function that creates + * the event source, allowing it to work with any backend. + * + * @example + * ```tsx + * const { activities, streaming, isConnected, error } = useEventStream({ + * enabled: isRunning, + * createEventSource: async ({ signal }) => { + * const result = await apiClient.jobs.stream(jobId, { signal }) + * if (!result.ok) throw new Error(result.error.message) + * return result.data.events + * }, + * }) + * ``` + */ +export function useEventStream(options: UseEventStreamOptions): EventStreamState { + const { enabled, createEventSource } = options + + const runState = useRun() + const [isConnected, setIsConnected] = useState(false) + const [error, setError] = useState(null) + + const abortControllerRef = useRef(null) + // Store addEvent in a ref to avoid dependency changes triggering reconnection + const addEventRef = useRef(runState.addEvent) + addEventRef.current = runState.addEvent + + useEffect(() => { + if (!enabled || !createEventSource) { + return + } + + const abortController = new AbortController() + abortControllerRef.current = abortController + const { signal } = abortController + + const connect = async () => { + setIsConnected(true) + setError(null) + + try { + const events = await createEventSource({ signal }) + + for await (const event of events) { + if (signal.aborted) break + addEventRef.current(event) + } + } catch (err) { + if (err instanceof DOMException && err.name === "AbortError") { + return // Intentional cancellation + } + setError(err instanceof Error ? err : new Error("Stream connection failed")) + } finally { + setIsConnected(false) + } + } + + connect() + + return () => { + abortController.abort() + abortControllerRef.current = null + } + }, [enabled, createEventSource]) + + return { + activities: runState.activities, + streaming: runState.streaming, + isConnected, + isComplete: runState.isComplete, + error, + } +} diff --git a/packages/react/src/index.ts b/packages/react/src/index.ts index 54417123..1c164be5 100644 --- a/packages/react/src/index.ts +++ b/packages/react/src/index.ts @@ -1,8 +1,13 @@ // Hooks export { type ActivityProcessState, + type EventSourceFactory, + type EventStreamOptions, + type EventStreamState, type RunResult, type RuntimeResult, + type UseEventStreamOptions, + useEventStream, useRun, useRuntime, } from "./hooks/index.js"