diff --git a/.changeset/add-use-event-stream.md b/.changeset/add-use-event-stream.md
new file mode 100644
index 00000000..46d15e7d
--- /dev/null
+++ b/.changeset/add-use-event-stream.md
@@ -0,0 +1,14 @@
+---
+"@perstack/react": patch
+---
+
+feat(react): add useEventStream hook for consuming PerstackEvent streams
+
+Added a new `useEventStream` hook that provides automatic connection management for PerstackEvent streams. The hook is API-agnostic and accepts an `EventSourceFactory` function, allowing it to work with any backend that provides PerstackEvent streams.
+
+Features:
+- Automatic connection lifecycle management (connect/disconnect based on `enabled` flag)
+- Event processing through `useRun` hook internally
+- Connection state tracking (`isConnected`, `error`)
+- Proper cleanup on unmount or when disabled
+- AbortSignal support for stream cancellation
diff --git a/packages/react/README.md b/packages/react/README.md
index b3141085..eca12bb3 100644
--- a/packages/react/README.md
+++ b/packages/react/README.md
@@ -54,6 +54,41 @@ function MyComponent() {
}
```
+### useEventStream
+
+A hook for consuming PerstackEvent streams with automatic connection management. This hook is API-agnostic and accepts a factory function that creates the event source.
+
+```tsx
+import { useEventStream } from "@perstack/react"
+
+function JobActivityView({ jobId, isRunning }: { jobId: string; isRunning: boolean }) {
+ const { activities, streaming, isConnected, isComplete, error } = useEventStream({
+ enabled: isRunning,
+ createEventSource: async ({ signal }) => {
+ const response = await fetch(`/api/jobs/${jobId}/stream`, { signal })
+ // Return an async generator of PerstackEvent
+ return parseSSEStream(response.body)
+ },
+ })
+
+ return (
+
+ {isConnected &&
Live}
+ {activities.map((activity) => (
+
+ ))}
+ {Object.entries(streaming.runs).map(([runId, run]) => (
+
+ {run.isReasoningActive &&
Thinking: {run.reasoning}
}
+ {run.isRunResultActive &&
Generating: {run.runResult}
}
+
+ ))}
+ {error &&
Error: {error.message}
}
+
+ )
+}
+```
+
### useRuntimeState
A lower-level hook for managing RuntimeState separately.
@@ -107,6 +142,27 @@ Returns an object with:
**Note:** Logs are append-only and never cleared. This is required for compatibility with Ink's `` component.
+### useEventStream(options)
+
+Options:
+
+- `enabled`: Whether the stream should be active
+- `createEventSource`: Factory function that returns an async generator of `PerstackEvent`
+
+Returns an object with:
+
+- `activities`: Array of `ActivityOrGroup` from processed events
+- `streaming`: Current `StreamingState` for real-time display
+- `isConnected`: Whether currently connected to the event source
+- `isComplete`: Whether the run has completed
+- `error`: Last error encountered, if any
+
+The hook automatically:
+- Connects when `enabled` is `true` and `createEventSource` is provided
+- Disconnects and aborts when `enabled` becomes `false` or on unmount
+- Processes events through `useRun` internally
+- Clears error state on reconnection
+
### useRuntimeState()
Returns an object with:
diff --git a/packages/react/src/hooks/index.ts b/packages/react/src/hooks/index.ts
index 7e8522a5..371af867 100644
--- a/packages/react/src/hooks/index.ts
+++ b/packages/react/src/hooks/index.ts
@@ -1,2 +1,9 @@
+export {
+ type EventSourceFactory,
+ type EventStreamOptions,
+ type EventStreamState,
+ type UseEventStreamOptions,
+ useEventStream,
+} from "./use-event-stream.js"
export { type ActivityProcessState, type RunResult, useRun } from "./use-run.js"
export { type RuntimeResult, useRuntime } from "./use-runtime.js"
diff --git a/packages/react/src/hooks/use-event-stream.test.ts b/packages/react/src/hooks/use-event-stream.test.ts
new file mode 100644
index 00000000..38814aed
--- /dev/null
+++ b/packages/react/src/hooks/use-event-stream.test.ts
@@ -0,0 +1,403 @@
+import type { PerstackEvent, RunEvent } from "@perstack/core"
+import { act, renderHook, waitFor } from "@testing-library/react"
+import { describe, expect, it, vi } from "vitest"
+import { type EventSourceFactory, useEventStream } from "./use-event-stream.js"
+
+function createBaseEvent(overrides: Partial = {}): RunEvent {
+ return {
+ id: "e-1",
+ runId: "run-1",
+ expertKey: "test-expert@1.0.0",
+ jobId: "job-1",
+ stepNumber: 1,
+ timestamp: Date.now(),
+ type: "startRun",
+ ...overrides,
+ } as RunEvent
+}
+
+async function* createMockEventGenerator(
+ events: PerstackEvent[],
+ options?: { delayMs?: number; signal?: AbortSignal },
+): AsyncGenerator {
+ for (const event of events) {
+ if (options?.signal?.aborted) return
+ if (options?.delayMs) {
+ await new Promise((resolve) => setTimeout(resolve, options.delayMs))
+ }
+ yield event
+ }
+}
+
+describe("useEventStream", () => {
+ it("initializes with disconnected state when disabled", () => {
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: false,
+ createEventSource: null,
+ }),
+ )
+
+ expect(result.current.activities).toEqual([])
+ expect(result.current.streaming.runs).toEqual({})
+ expect(result.current.isConnected).toBe(false)
+ expect(result.current.isComplete).toBe(false)
+ expect(result.current.error).toBeNull()
+ })
+
+ it("does not connect when enabled is false", async () => {
+ const factory = vi.fn()
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: false,
+ createEventSource: factory,
+ }),
+ )
+
+ await new Promise((resolve) => setTimeout(resolve, 50))
+
+ expect(factory).not.toHaveBeenCalled()
+ expect(result.current.isConnected).toBe(false)
+ })
+
+ it("does not connect when createEventSource is null", async () => {
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: null,
+ }),
+ )
+
+ await new Promise((resolve) => setTimeout(resolve, 50))
+
+ expect(result.current.isConnected).toBe(false)
+ })
+
+ it("connects when enabled with createEventSource", async () => {
+ const events = [createBaseEvent({ type: "startRun" })]
+ const factory: EventSourceFactory = vi.fn(async () => createMockEventGenerator(events))
+
+ renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(factory).toHaveBeenCalledTimes(1)
+ })
+
+ expect(factory).toHaveBeenCalledWith(
+ expect.objectContaining({ signal: expect.any(AbortSignal) }),
+ )
+ })
+
+ it("processes events through useRun", async () => {
+ const events: PerstackEvent[] = [
+ {
+ id: "e-1",
+ runId: "run-1",
+ expertKey: "test-expert@1.0.0",
+ jobId: "job-1",
+ stepNumber: 1,
+ timestamp: Date.now(),
+ type: "startStreamingReasoning",
+ } as PerstackEvent,
+ {
+ id: "e-2",
+ runId: "run-1",
+ expertKey: "test-expert@1.0.0",
+ jobId: "job-1",
+ stepNumber: 1,
+ timestamp: Date.now(),
+ type: "streamReasoning",
+ delta: "Thinking...",
+ } as PerstackEvent,
+ ]
+
+ const factory: EventSourceFactory = async () => createMockEventGenerator(events)
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.streaming.runs["run-1"]).toBeDefined()
+ })
+
+ expect(result.current.streaming.runs["run-1"].reasoning).toBe("Thinking...")
+ })
+
+ it("sets isConnected to true during streaming", async () => {
+ let resolveGenerator: (() => void) | undefined
+ const generatorPromise = new Promise((resolve) => {
+ resolveGenerator = resolve
+ })
+
+ const factory: EventSourceFactory = async () => {
+ // biome-ignore lint/correctness/useYield: Test needs a generator that blocks
+ return (async function* () {
+ await generatorPromise
+ })()
+ }
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.isConnected).toBe(true)
+ })
+
+ act(() => {
+ resolveGenerator?.()
+ })
+
+ await waitFor(() => {
+ expect(result.current.isConnected).toBe(false)
+ })
+ })
+
+ it("sets isConnected to false after stream completes", async () => {
+ const events = [createBaseEvent()]
+ const factory: EventSourceFactory = async () => createMockEventGenerator(events)
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.isConnected).toBe(false)
+ })
+ })
+
+ it("handles errors from event source factory", async () => {
+ const factory: EventSourceFactory = async () => {
+ throw new Error("Connection failed")
+ }
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.error).not.toBeNull()
+ })
+
+ expect(result.current.error?.message).toBe("Connection failed")
+ expect(result.current.isConnected).toBe(false)
+ })
+
+ it("handles errors during iteration", async () => {
+ let yieldCount = 0
+ const factory: EventSourceFactory = async () => {
+ // biome-ignore lint/correctness/useYield: Test needs a generator that throws before yielding
+ return (async function* () {
+ yieldCount++
+ throw new Error("Stream error")
+ })()
+ }
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.error).not.toBeNull()
+ })
+
+ expect(yieldCount).toBe(1) // Generator was called
+ expect(result.current.error?.message).toBe("Stream error")
+ })
+
+ it("converts non-Error objects to Error", async () => {
+ const factory: EventSourceFactory = async () => {
+ throw "String error"
+ }
+
+ const { result } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.error).not.toBeNull()
+ })
+
+ expect(result.current.error?.message).toBe("Stream connection failed")
+ })
+
+ it("aborts stream on cleanup", async () => {
+ let abortSignal: AbortSignal | undefined
+ let resolveGenerator: (() => void) | undefined
+ const generatorPromise = new Promise((resolve) => {
+ resolveGenerator = resolve
+ })
+
+ const factory: EventSourceFactory = async ({ signal }) => {
+ abortSignal = signal
+ // biome-ignore lint/correctness/useYield: Test needs a generator that blocks
+ return (async function* () {
+ await generatorPromise
+ })()
+ }
+
+ const { unmount } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(abortSignal).toBeDefined()
+ })
+
+ expect(abortSignal?.aborted).toBe(false)
+
+ unmount()
+
+ expect(abortSignal?.aborted).toBe(true)
+
+ resolveGenerator?.()
+ })
+
+ it("does not set error on abort", async () => {
+ let resolveGenerator: (() => void) | undefined
+ const generatorPromise = new Promise((resolve) => {
+ resolveGenerator = resolve
+ })
+
+ const factory: EventSourceFactory = async ({ signal }) => {
+ // biome-ignore lint/correctness/useYield: Test needs a generator that waits then throws
+ return (async function* () {
+ await generatorPromise
+ if (signal.aborted) {
+ throw new DOMException("Aborted", "AbortError")
+ }
+ })()
+ }
+
+ const { result, unmount } = renderHook(() =>
+ useEventStream({
+ enabled: true,
+ createEventSource: factory,
+ }),
+ )
+
+ await waitFor(() => {
+ expect(result.current.isConnected).toBe(true)
+ })
+
+ unmount()
+ resolveGenerator?.()
+
+ expect(result.current.error).toBeNull()
+ })
+
+ it("reconnects when enabled changes from false to true", async () => {
+ const events = [createBaseEvent()]
+ const factory: EventSourceFactory = vi.fn(async () => createMockEventGenerator(events))
+
+ const { rerender } = renderHook(
+ ({ enabled }: { enabled: boolean }) =>
+ useEventStream({
+ enabled,
+ createEventSource: factory,
+ }),
+ { initialProps: { enabled: false } },
+ )
+
+ expect(factory).not.toHaveBeenCalled()
+
+ rerender({ enabled: true })
+
+ await waitFor(() => {
+ expect(factory).toHaveBeenCalledTimes(1)
+ })
+ })
+
+ it("disconnects when enabled changes from true to false", async () => {
+ let abortSignal: AbortSignal | undefined
+ let resolveGenerator: (() => void) | undefined
+ const generatorPromise = new Promise((resolve) => {
+ resolveGenerator = resolve
+ })
+
+ const factory: EventSourceFactory = async ({ signal }) => {
+ abortSignal = signal
+ // biome-ignore lint/correctness/useYield: Test needs a generator that blocks
+ return (async function* () {
+ await generatorPromise
+ })()
+ }
+
+ const { result, rerender } = renderHook(
+ ({ enabled }: { enabled: boolean }) =>
+ useEventStream({
+ enabled,
+ createEventSource: factory,
+ }),
+ { initialProps: { enabled: true } },
+ )
+
+ await waitFor(() => {
+ expect(result.current.isConnected).toBe(true)
+ })
+
+ expect(abortSignal?.aborted).toBe(false)
+
+ rerender({ enabled: false })
+
+ expect(abortSignal?.aborted).toBe(true)
+
+ resolveGenerator?.()
+ })
+
+ it("clears error when reconnecting", async () => {
+ const failingFactory: EventSourceFactory = async () => {
+ throw new Error("Connection failed")
+ }
+
+ const { result, rerender } = renderHook(
+ ({ createEventSource }: { createEventSource: EventSourceFactory }) =>
+ useEventStream({
+ enabled: true,
+ createEventSource,
+ }),
+ { initialProps: { createEventSource: failingFactory } },
+ )
+
+ await waitFor(() => {
+ expect(result.current.error).not.toBeNull()
+ })
+
+ const successFactory: EventSourceFactory = async () =>
+ createMockEventGenerator([createBaseEvent()])
+
+ rerender({ createEventSource: successFactory })
+
+ await waitFor(() => {
+ expect(result.current.error).toBeNull()
+ })
+ })
+})
diff --git a/packages/react/src/hooks/use-event-stream.ts b/packages/react/src/hooks/use-event-stream.ts
new file mode 100644
index 00000000..18b0318a
--- /dev/null
+++ b/packages/react/src/hooks/use-event-stream.ts
@@ -0,0 +1,117 @@
+import type { ActivityOrGroup, PerstackEvent } from "@perstack/core"
+import { useEffect, useRef, useState } from "react"
+import type { StreamingState } from "../types/index.js"
+import { useRun } from "./use-run.js"
+
+/**
+ * Options for creating an event stream connection.
+ */
+export type EventStreamOptions = {
+ /** AbortSignal to cancel the stream */
+ signal: AbortSignal
+}
+
+/**
+ * Factory function that creates an async generator of PerstackEvents.
+ * This abstraction allows the hook to work with any event source.
+ */
+export type EventSourceFactory = (
+ options: EventStreamOptions,
+) => Promise>
+
+export type EventStreamState = {
+ /** Accumulated activities from processed events */
+ activities: ActivityOrGroup[]
+ /** Current streaming state for real-time display */
+ streaming: StreamingState
+ /** Whether currently connected to the event source */
+ isConnected: boolean
+ /** Whether the run has completed */
+ isComplete: boolean
+ /** Last error encountered, if any */
+ error: Error | null
+}
+
+export type UseEventStreamOptions = {
+ /** Whether the stream should be active */
+ enabled: boolean
+ /** Factory to create the event source when enabled */
+ createEventSource: EventSourceFactory | null
+}
+
+/**
+ * Hook for consuming PerstackEvent streams with automatic connection management.
+ *
+ * This hook is API-agnostic - it accepts a factory function that creates
+ * the event source, allowing it to work with any backend.
+ *
+ * @example
+ * ```tsx
+ * const { activities, streaming, isConnected, error } = useEventStream({
+ * enabled: isRunning,
+ * createEventSource: async ({ signal }) => {
+ * const result = await apiClient.jobs.stream(jobId, { signal })
+ * if (!result.ok) throw new Error(result.error.message)
+ * return result.data.events
+ * },
+ * })
+ * ```
+ */
+export function useEventStream(options: UseEventStreamOptions): EventStreamState {
+ const { enabled, createEventSource } = options
+
+ const runState = useRun()
+ const [isConnected, setIsConnected] = useState(false)
+ const [error, setError] = useState(null)
+
+ const abortControllerRef = useRef(null)
+ // Store addEvent in a ref to avoid dependency changes triggering reconnection
+ const addEventRef = useRef(runState.addEvent)
+ addEventRef.current = runState.addEvent
+
+ useEffect(() => {
+ if (!enabled || !createEventSource) {
+ return
+ }
+
+ const abortController = new AbortController()
+ abortControllerRef.current = abortController
+ const { signal } = abortController
+
+ const connect = async () => {
+ setIsConnected(true)
+ setError(null)
+
+ try {
+ const events = await createEventSource({ signal })
+
+ for await (const event of events) {
+ if (signal.aborted) break
+ addEventRef.current(event)
+ }
+ } catch (err) {
+ if (err instanceof DOMException && err.name === "AbortError") {
+ return // Intentional cancellation
+ }
+ setError(err instanceof Error ? err : new Error("Stream connection failed"))
+ } finally {
+ setIsConnected(false)
+ }
+ }
+
+ connect()
+
+ return () => {
+ abortController.abort()
+ abortControllerRef.current = null
+ }
+ }, [enabled, createEventSource])
+
+ return {
+ activities: runState.activities,
+ streaming: runState.streaming,
+ isConnected,
+ isComplete: runState.isComplete,
+ error,
+ }
+}
diff --git a/packages/react/src/index.ts b/packages/react/src/index.ts
index 54417123..1c164be5 100644
--- a/packages/react/src/index.ts
+++ b/packages/react/src/index.ts
@@ -1,8 +1,13 @@
// Hooks
export {
type ActivityProcessState,
+ type EventSourceFactory,
+ type EventStreamOptions,
+ type EventStreamState,
type RunResult,
type RuntimeResult,
+ type UseEventStreamOptions,
+ useEventStream,
useRun,
useRuntime,
} from "./hooks/index.js"