Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/query-activity-generation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@perstack/core": patch
---

Add QueryActivity generation to getActivities() for parity with processRunEventToActivity()
229 changes: 229 additions & 0 deletions packages/core/src/utils/activity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1640,4 +1640,233 @@ describe("getActivities", () => {
}
})
})

describe("query activity", () => {
it("generates QueryActivity for stepNumber === 1 with UserMessage", () => {
const checkpoint = createBaseCheckpoint({ stepNumber: 1 })
const step = createBaseStep({
stepNumber: 1,
inputMessages: [
{
id: "m-1",
type: "userMessage",
contents: [{ type: "textPart", id: "tp-1", text: "Hello, what can you do?" }],
},
],
newMessages: [
{
id: "m-2",
type: "expertMessage",
contents: [{ type: "textPart", id: "tp-2", text: "I can help you..." }],
},
],
toolCalls: [createToolCall({ toolName: "readTextFile", args: { path: "/test.txt" } })],
toolResults: [
createToolResult({
toolName: "readTextFile",
result: [{ type: "textPart", id: "tp-1", text: '{"content": "file content"}' }],
}),
],
})

const activities = getActivities({ checkpoint, step })

expect(activities.length).toBeGreaterThanOrEqual(2)
expect(activities[0].type).toBe("query")
if (activities[0].type === "query") {
expect(activities[0].text).toBe("Hello, what can you do?")
expect(activities[0].expertKey).toBe("test@1.0.0")
expect(activities[0].runId).toBe("run-1")
}
})

it("does not generate QueryActivity for stepNumber > 1", () => {
const checkpoint = createBaseCheckpoint({ stepNumber: 2 })
const step = createBaseStep({
stepNumber: 2,
inputMessages: [
{
id: "m-1",
type: "userMessage",
contents: [{ type: "textPart", id: "tp-1", text: "Continue..." }],
},
],
newMessages: [
{
id: "m-2",
type: "expertMessage",
contents: [{ type: "textPart", id: "tp-2", text: "Continuing..." }],
},
],
toolCalls: [createToolCall({ toolName: "readTextFile", args: { path: "/test.txt" } })],
toolResults: [
createToolResult({
toolName: "readTextFile",
result: [{ type: "textPart", id: "tp-1", text: '{"content": "file content"}' }],
}),
],
})

const activities = getActivities({ checkpoint, step })

expect(activities.every((a) => a.type !== "query")).toBe(true)
})

it("does not generate QueryActivity when inputMessages is missing", () => {
const checkpoint = createBaseCheckpoint({ stepNumber: 1, status: "completed" })
const step = createBaseStep({
stepNumber: 1,
newMessages: [
{
id: "m-1",
type: "expertMessage",
contents: [{ type: "textPart", id: "tp-1", text: "Done!" }],
},
],
})

const activities = getActivities({ checkpoint, step })

expect(activities.every((a) => a.type !== "query")).toBe(true)
})

it("does not generate QueryActivity when no UserMessage exists", () => {
const checkpoint = createBaseCheckpoint({ stepNumber: 1, status: "completed" })
const step = createBaseStep({
stepNumber: 1,
inputMessages: [
{
id: "m-1",
type: "instructionMessage",
contents: [{ type: "textPart", id: "tp-1", text: "System instruction" }],
},
],
newMessages: [
{
id: "m-2",
type: "expertMessage",
contents: [{ type: "textPart", id: "tp-2", text: "Done!" }],
},
],
})

const activities = getActivities({ checkpoint, step })

expect(activities.every((a) => a.type !== "query")).toBe(true)
})

it("does not generate QueryActivity when UserMessage has no textPart", () => {
const checkpoint = createBaseCheckpoint({ stepNumber: 1, status: "completed" })
const step = createBaseStep({
stepNumber: 1,
inputMessages: [
{
id: "m-1",
type: "userMessage",
contents: [
{
type: "imageUrlPart",
id: "ip-1",
url: "https://example.com/image.png",
mimeType: "image/png",
},
],
},
],
newMessages: [
{
id: "m-2",
type: "expertMessage",
contents: [{ type: "textPart", id: "tp-2", text: "Done!" }],
},
],
})

const activities = getActivities({ checkpoint, step })

expect(activities.every((a) => a.type !== "query")).toBe(true)
})

it("prepends QueryActivity to error activity", () => {
const checkpoint = createBaseCheckpoint({
stepNumber: 1,
status: "stoppedByError",
error: { name: "TestError", message: "Something went wrong", isRetryable: false },
})
const step = createBaseStep({
stepNumber: 1,
inputMessages: [
{
id: "m-1",
type: "userMessage",
contents: [{ type: "textPart", id: "tp-1", text: "Do something" }],
},
],
})

const activities = getActivities({ checkpoint, step })

expect(activities).toHaveLength(2)
expect(activities[0].type).toBe("query")
expect(activities[1].type).toBe("error")
})

it("prepends QueryActivity to delegate activity", () => {
const checkpoint = createBaseCheckpoint({
stepNumber: 1,
status: "stoppedByDelegate",
delegateTo: [
{
expert: { key: "child@1.0.0", name: "child", version: "1.0.0" },
toolCallId: "tc-1",
toolName: "delegateToChild",
query: "do something",
},
],
})
const step = createBaseStep({
stepNumber: 1,
inputMessages: [
{
id: "m-1",
type: "userMessage",
contents: [{ type: "textPart", id: "tp-1", text: "Delegate this task" }],
},
],
})

const activities = getActivities({ checkpoint, step })

expect(activities).toHaveLength(2)
expect(activities[0].type).toBe("query")
expect(activities[1].type).toBe("delegate")
})

it("prepends QueryActivity to complete activity", () => {
const checkpoint = createBaseCheckpoint({ stepNumber: 1, status: "completed" })
const step = createBaseStep({
stepNumber: 1,
inputMessages: [
{
id: "m-1",
type: "userMessage",
contents: [{ type: "textPart", id: "tp-1", text: "Simple task" }],
},
],
newMessages: [
{
id: "m-2",
type: "expertMessage",
contents: [{ type: "textPart", id: "tp-2", text: "Task completed!" }],
},
],
})

const activities = getActivities({ checkpoint, step })

expect(activities).toHaveLength(2)
expect(activities[0].type).toBe("query")
expect(activities[1].type).toBe("complete")
})
})
})
61 changes: 43 additions & 18 deletions packages/core/src/utils/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,36 +76,61 @@ export function getActivities(params: GetActivitiesParams): ActivityOrGroup[] {
const expertKey = checkpoint.expert.key
const reasoning = extractReasoning(step.newMessages)

// Generate QueryActivity for first step (parity with processRunEventToActivity)
let queryActivity: ActivityOrGroup | undefined
if (stepNumber === 1 && step.inputMessages) {
const userMessage = step.inputMessages.find((m) => m.type === "userMessage")
if (userMessage) {
const textPart = userMessage.contents.find((c) => c.type === "textPart")
if (textPart && "text" in textPart) {
queryActivity = {
type: "query",
id: "",
expertKey,
runId,
text: textPart.text,
}
}
}
}

const prependQuery = (result: ActivityOrGroup[]): ActivityOrGroup[] =>
queryActivity ? [queryActivity, ...result] : result

// Error status - use checkpoint error information
if (status === "stoppedByError") {
return [createErrorActivity(checkpoint, reasoning)]
return prependQuery([createErrorActivity(checkpoint, reasoning)])
}

// Parallel delegate activities - each delegation becomes a separate activity
if (status === "stoppedByDelegate") {
if (!delegateTo || delegateTo.length === 0) {
return [
return prependQuery([
createRetryActivity(
step.newMessages,
reasoning,
"Delegate status but no delegation targets",
),
]
])
}
const activities = delegateTo.map((d) => createDelegateActivity(d, reasoning))
return wrapInGroupIfParallel(activities, reasoning, expertKey, runId, stepNumber)
const delegateActivities = delegateTo.map((d) => createDelegateActivity(d, reasoning))
return prependQuery(
wrapInGroupIfParallel(delegateActivities, reasoning, expertKey, runId, stepNumber),
)
}

// Interactive tool activities - may be parallel
if (status === "stoppedByInteractiveTool") {
const toolCalls = step.toolCalls ?? []
if (toolCalls.length === 0) {
return [createRetryActivity(step.newMessages, reasoning)]
return prependQuery([createRetryActivity(step.newMessages, reasoning)])
}
const activities = toolCalls.map((tc) =>
const interactiveActivities = toolCalls.map((tc) =>
createInteractiveToolActivity(tc.skillName, tc.toolName, tc, reasoning),
)
return wrapInGroupIfParallel(activities, reasoning, expertKey, runId, stepNumber)
return prependQuery(
wrapInGroupIfParallel(interactiveActivities, reasoning, expertKey, runId, stepNumber),
)
}

// Normal tool activities - may be parallel
Expand All @@ -115,12 +140,12 @@ export function getActivities(params: GetActivitiesParams): ActivityOrGroup[] {
// For completed status with no tool calls, return CompleteActivity only
if (toolCalls.length === 0) {
if (status === "completed") {
return [createCompleteActivity(step.newMessages, reasoning)]
return prependQuery([createCompleteActivity(step.newMessages, reasoning)])
}
return [createRetryActivity(step.newMessages, reasoning)]
return prependQuery([createRetryActivity(step.newMessages, reasoning)])
}

const activities: Activity[] = []
const toolActivities: Activity[] = []
for (const toolCall of toolCalls) {
const toolResult = toolResults.find((tr) => tr.id === toolCall.id)
if (!toolResult) {
Expand All @@ -129,29 +154,29 @@ export function getActivities(params: GetActivitiesParams): ActivityOrGroup[] {
}
const { skillName, toolName } = toolCall
if (skillName.startsWith(BASE_SKILL_PREFIX)) {
activities.push(createBaseToolActivity(toolName, toolCall, toolResult, reasoning))
toolActivities.push(createBaseToolActivity(toolName, toolCall, toolResult, reasoning))
} else {
activities.push(
toolActivities.push(
createGeneralToolActivity(skillName, toolName, toolCall, toolResult, reasoning),
)
}
}

if (activities.length === 0) {
if (toolActivities.length === 0) {
if (status === "completed") {
return [createCompleteActivity(step.newMessages, reasoning)]
return prependQuery([createCompleteActivity(step.newMessages, reasoning)])
}
return [createRetryActivity(step.newMessages, reasoning)]
return prependQuery([createRetryActivity(step.newMessages, reasoning)])
}

const result = wrapInGroupIfParallel(activities, reasoning, expertKey, runId, stepNumber)
const result = wrapInGroupIfParallel(toolActivities, reasoning, expertKey, runId, stepNumber)

// Append CompleteActivity for completed status
if (status === "completed") {
result.push(createCompleteActivity(step.newMessages, undefined))
}

return result
return prependQuery(result)
}

function createCompleteActivity(newMessages: Message[], reasoning: string | undefined): Activity {
Expand Down