Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/unified-job-stream-hooks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@perstack/react": patch
---

Add unified job stream hooks (useJobStream, useJobStreams) with StreamConnector interface
6 changes: 6 additions & 0 deletions packages/react/src/hooks/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
export {
type JobStreamState,
type StreamConnector,
useJobStream,
} from "./use-job-stream.js"
export { type JobStreamSummary, useJobStreams } from "./use-job-streams.js"
export { type ActivityProcessState, type RunResult, useRun } from "./use-run.js"
333 changes: 333 additions & 0 deletions packages/react/src/hooks/use-job-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
import type {
Checkpoint,
ExpertMessage,
PerstackEvent,
RunEvent,
Step,
ToolCall,
ToolResult,
} from "@perstack/core"
import { createEmptyUsage } from "@perstack/core"
import { act, renderHook } from "@testing-library/react"
import { describe, expect, it } from "vitest"
import type { StreamConnector } from "./use-job-stream.js"
import { useJobStream } from "./use-job-stream.js"

function createToolCall(overrides: Partial<ToolCall> = {}): ToolCall {
return {
id: "tc-1",
skillName: "@perstack/base",
toolName: "readTextFile",
args: { path: "/test.txt" },
...overrides,
}
}

function createToolResult(overrides: Partial<ToolResult> = {}): ToolResult {
return {
id: "tc-1",
skillName: "@perstack/base",
toolName: "readTextFile",
result: [{ type: "textPart", id: "tp-1", text: '{"content": "file content"}' }],
...overrides,
}
}

function createBaseEvent(overrides: Partial<RunEvent> = {}): RunEvent {
return {
id: "e-1",
runId: "run-1",
expertKey: "test-expert@1.0.0",
jobId: "job-1",
stepNumber: 1,
timestamp: Date.now(),
type: "startRun",
...overrides,
} as RunEvent
}

function createDeferredStream(): {
emit: (event: PerstackEvent) => void
end: () => void
error: (err: Error) => void
connector: StreamConnector
connectCalls: Array<{ jobId: string; signal: AbortSignal }>
} {
const connectCalls: Array<{ jobId: string; signal: AbortSignal }> = []
let resolveNext: ((value: IteratorResult<PerstackEvent>) => void) | null = null
let rejectNext: ((err: Error) => void) | null = null
let done = false
const pending: PerstackEvent[] = []

const iterable: AsyncIterable<PerstackEvent> = {
[Symbol.asyncIterator]() {
return {
next(): Promise<IteratorResult<PerstackEvent>> {
if (pending.length > 0) {
return Promise.resolve({ value: pending.shift()!, done: false })
}
if (done) {
return Promise.resolve({ value: undefined, done: true })
}
return new Promise((resolve, reject) => {
resolveNext = resolve
rejectNext = reject
})
},
}
},
}

const connector: StreamConnector = async (jobId, signal) => {
connectCalls.push({ jobId, signal })
return iterable
}

return {
emit(event: PerstackEvent) {
if (resolveNext) {
const resolve = resolveNext
resolveNext = null
rejectNext = null
resolve({ value: event, done: false })
} else {
pending.push(event)
}
},
end() {
done = true
if (resolveNext) {
const resolve = resolveNext
resolveNext = null
rejectNext = null
resolve({ value: undefined, done: true })
}
},
error(err: Error) {
if (rejectNext) {
const reject = rejectNext
resolveNext = null
rejectNext = null
reject(err)
}
},
connector,
connectCalls,
}
}

describe("useJobStream", () => {
it("initializes with empty state when disabled", () => {
const stream = createDeferredStream()
const { result } = renderHook(() => useJobStream({ jobId: null, connect: stream.connector }))

expect(result.current.activities).toEqual([])
expect(result.current.streaming.runs).toEqual({})
expect(result.current.isConnected).toBe(false)
expect(result.current.error).toBeNull()
expect(result.current.latestActivity).toBeNull()
expect(stream.connectCalls).toHaveLength(0)
})

it("does not connect when enabled is false", () => {
const stream = createDeferredStream()
const { result } = renderHook(() =>
useJobStream({ jobId: "job-1", connect: stream.connector, enabled: false }),
)

expect(result.current.isConnected).toBe(false)
expect(stream.connectCalls).toHaveLength(0)
})

it("connects when jobId and enabled are set", async () => {
const stream = createDeferredStream()
const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector }))

// Wait for the async connect to complete
await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})

expect(stream.connectCalls).toHaveLength(1)
expect(stream.connectCalls[0].jobId).toBe("job-1")
expect(result.current.isConnected).toBe(true)
})

it("processes events into activities", async () => {
const stream = createDeferredStream()
const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector }))

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})

await act(async () => {
stream.emit(
createBaseEvent({
type: "callTools",
toolCalls: [createToolCall()],
newMessage: {} as ExpertMessage,
usage: createEmptyUsage(),
} as Partial<RunEvent>) as RunEvent,
)
await new Promise((r) => setTimeout(r, 0))
})

await act(async () => {
stream.emit(
createBaseEvent({
id: "e-2",
type: "resolveToolResults",
toolResults: [createToolResult()],
} as Partial<RunEvent>) as RunEvent,
)
await new Promise((r) => setTimeout(r, 0))
})

expect(result.current.activities).toHaveLength(1)
expect(result.current.activities[0].type).toBe("readTextFile")
expect(result.current.latestActivity).toBe(result.current.activities[0])
})

it("sets isConnected to false when stream ends", async () => {
const stream = createDeferredStream()
const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector }))

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})
expect(result.current.isConnected).toBe(true)

await act(async () => {
stream.end()
await new Promise((r) => setTimeout(r, 0))
})

expect(result.current.isConnected).toBe(false)
})

it("stores non-abort errors", async () => {
const stream = createDeferredStream()
const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector }))

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})

await act(async () => {
stream.error(new Error("connection failed"))
await new Promise((r) => setTimeout(r, 0))
})

expect(result.current.error).toBeInstanceOf(Error)
expect(result.current.error?.message).toBe("connection failed")
expect(result.current.isConnected).toBe(false)
})

it("ignores abort errors", async () => {
const stream = createDeferredStream()
const { result, unmount } = renderHook(() =>
useJobStream({ jobId: "job-1", connect: stream.connector }),
)

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})
expect(result.current.isConnected).toBe(true)

act(() => {
unmount()
})

expect(result.current.error).toBeNull()
})

it("disconnects when enabled changes to false", async () => {
const stream = createDeferredStream()
const { result, rerender } = renderHook(
({ enabled }: { enabled: boolean }) =>
useJobStream({ jobId: "job-1", connect: stream.connector, enabled }),
{ initialProps: { enabled: true } },
)

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})
expect(result.current.isConnected).toBe(true)
expect(stream.connectCalls[0].signal.aborted).toBe(false)

rerender({ enabled: false })

expect(stream.connectCalls[0].signal.aborted).toBe(true)
expect(result.current.isConnected).toBe(false)
})

it("handles connector that throws immediately", async () => {
const failingConnector: StreamConnector = async () => {
throw new Error("failed to connect")
}

const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: failingConnector }))

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})

expect(result.current.error?.message).toBe("failed to connect")
expect(result.current.isConnected).toBe(false)
})

it("derives latestActivity from activities array", async () => {
const stream = createDeferredStream()
const { result } = renderHook(() => useJobStream({ jobId: "job-1", connect: stream.connector }))

await act(async () => {
await new Promise((r) => setTimeout(r, 0))
})

// Emit call + result to generate first activity
await act(async () => {
stream.emit(
createBaseEvent({
type: "callTools",
toolCalls: [createToolCall()],
newMessage: {} as ExpertMessage,
usage: createEmptyUsage(),
} as Partial<RunEvent>) as RunEvent,
)
await new Promise((r) => setTimeout(r, 0))
})

await act(async () => {
stream.emit(
createBaseEvent({
id: "e-2",
type: "resolveToolResults",
toolResults: [createToolResult()],
} as Partial<RunEvent>) as RunEvent,
)
await new Promise((r) => setTimeout(r, 0))
})

// Emit completeRun to generate second activity
await act(async () => {
stream.emit(
createBaseEvent({
id: "e-3",
type: "completeRun",
text: "Task completed",
checkpoint: {} as Checkpoint,
step: {} as Step,
usage: createEmptyUsage(),
} as Partial<RunEvent>) as RunEvent,
)
await new Promise((r) => setTimeout(r, 0))
})

expect(result.current.activities).toHaveLength(2)
expect(result.current.latestActivity).toBe(
result.current.activities[result.current.activities.length - 1],
)
expect(result.current.latestActivity?.type).toBe("complete")
})
})
Loading