Add act stream support #108
Conversation
🦋 Changeset detected. Latest commit: e3b4cc6. The changes in this PR will be included in the next version bump; this PR includes changesets to release 7 packages.
Pull request overview
This PR adds streaming support to the agent framework through a new actStream method that yields response chunks as they arrive from the LLM.
Summary: The PR introduces streaming capabilities by adding new progress types (text_delta, stream_start, stream_end), a StreamChunk interface for typed streaming responses, and an actStream implementation in the configurable agent factory. However, the implementation has several critical gaps compared to the non-streaming act method.
Key Changes:
- Added streaming-related progress types to `ProgressMessageTypeEnum`
- Introduced a `StreamChunk` interface and an optional `actStream` method on the `Agent` interface
- Implemented `actStream` in `createConfigurableAgent` with LLM streaming support and fallback behavior (a consumption sketch follows below)
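
For orientation, here is a minimal sketch of how the new surface might be consumed. The shapes and signatures below are inferred from this summary rather than copied from the diff, so treat them as assumptions:

```ts
// Hypothetical minimal shapes for illustration only; the real definitions
// live in packages/core/src/types/agent.types.ts and may differ.
interface AgentInput {
  messages: Array<{ role: string; content: string }>;
  context?: Record<string, unknown>;
}

interface StreamChunk {
  type: "stream_start" | "text_delta" | "stream_end";
  content: string;
  metadata?: Record<string, unknown>;
}

interface Agent {
  act(input: AgentInput): Promise<{ role: string; content: string }>;
  // actStream is optional, so callers must check for it before iterating.
  actStream?(input: AgentInput): AsyncGenerator<StreamChunk>;
}

async function printStreamed(agent: Agent, input: AgentInput): Promise<void> {
  if (!agent.actStream) {
    // Fall back to the non-streaming path when streaming is unavailable.
    const response = await agent.act(input);
    console.log(response.content);
    return;
  }
  for await (const chunk of agent.actStream(input)) {
    if (chunk.type === "text_delta") process.stdout.write(chunk.content);
  }
}
```

This mirrors the usual async-generator consumption pattern; the review below focuses on how the generator is implemented inside `createConfigurableAgent`.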
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 17 comments.
| File | Description |
|---|---|
| packages/core/src/types/progress.types.ts | Adds three new enum values for streaming progress events |
| packages/core/src/types/agent.types.ts | Defines StreamChunk interface and adds optional actStream method to Agent interface |
| packages/core/src/factories/configurable-agent.factory.ts | Implements the actStream method with streaming LLM calls, Langfuse tracing, and non-streaming fallback |
| .changeset/many-facts-grin.md | Documents the change as a minor version bump |
Quoted context (`packages/core/src/factories/configurable-agent.factory.ts`):

```ts
// Call streaming LLM with tools
// Tool events are sent via sendUpdate callback in onStepFinish
const { textStream, generation } =
  await base.llmService.runStreamedLLMWithTools({
    messages: workingMessages,
    tools: availableTools,
    sendUpdate,
    context: processedInput.context,
    model: modelToUse,
    provider: providerToUse,
    traceContext: {
      sessionId: processedInput.context?.sessionId,
      metadata: {
        ...processedInput.context?.metadata,
        modelUsed: modelToUse,
        providerUsed: providerToUse,
        streaming: true,
      },
    },
    ...mergedLlmOptions,
  });

// Stream text chunks
for await (const chunk of textStream) {
  fullText += chunk;

  // Yield chunk to caller
  yield {
    type: "text_delta",
    content: chunk,
  };

  // Also send via sendUpdate for IPC
  await sendUpdate({
    type: "text_delta",
    content: chunk,
  });
```
Copilot AI commented on Nov 26, 2025:
The actStream implementation doesn't handle tool calls during streaming. The non-streaming act method has a while (toolRound < maxToolRounds) loop (lines 317-439) to handle multiple rounds of tool execution, but actStream completely omits this logic. This means agents using actStream won't be able to execute tools, which is a significant functional gap. The comment on line 627 claims "Tool calls are automatically handled during streaming via onStepFinish" but there's no evidence of this in the implementation - no tool execution loop, no tool response handling, and no calls to toolExecutor.executeToolCalls.
Suggested change (wrapping the streaming call in a tool-execution loop; it replaces the snippet quoted above):

```ts
// Tool streaming loop
let toolRound = 0;
let currentMessages = workingMessages;
let currentContext = processedInput.context;
let continueToolLoop = true;
let lastToolResults: any = null;
const maxToolRounds = config.customConfig?.maxToolRounds ?? 3;

while (toolRound < maxToolRounds && continueToolLoop) {
  // Call streaming LLM with tools
  const { textStream, generation } =
    await base.llmService.runStreamedLLMWithTools({
      messages: currentMessages,
      tools: availableTools,
      sendUpdate,
      context: currentContext,
      model: modelToUse,
      provider: providerToUse,
      traceContext: {
        sessionId: currentContext?.sessionId,
        metadata: {
          ...currentContext?.metadata,
          modelUsed: modelToUse,
          providerUsed: providerToUse,
          streaming: true,
        },
      },
      ...mergedLlmOptions,
    });

  // Stream text chunks and collect for tool call detection
  let streamedText = "";
  for await (const chunk of textStream) {
    streamedText += chunk;
    fullText += chunk;

    // Yield chunk to caller
    yield {
      type: "text_delta",
      content: chunk,
    };

    // Also send via sendUpdate for IPC
    await sendUpdate({
      type: "text_delta",
      content: chunk,
    });
  }

  // Detect tool calls in streamedText
  const toolCalls =
    base.toolService?.detectToolCalls?.(streamedText, availableTools) || [];
  if (toolCalls.length > 0) {
    // Execute tool calls
    const toolResults = await base.toolExecutor.executeToolCalls(
      toolCalls,
      currentContext
    );
    // Stream tool results
    for (const result of toolResults) {
      yield {
        type: "tool_result",
        content: result,
      };
      await sendUpdate({
        type: "tool_result",
        content: result,
      });
    }
    // Prepare for next round: add tool results to messages/context
    currentMessages = [
      ...currentMessages,
      ...(base.toolService?.toolResultsToMessages?.(toolResults) || []),
    ];
    lastToolResults = toolResults;
    toolRound += 1;
    continueToolLoop = true;
  } else {
    // No tool calls detected, finish loop
    continueToolLoop = false;
  }
```
Quoted context:

```ts
// Call streaming LLM with tools
// Tool events are sent via sendUpdate callback in onStepFinish
```
Copilot AI commented on Nov 26, 2025:
The comment on line 777 states "Tool events are sent via sendUpdate callback in onStepFinish" but this mechanism isn't shown in the implementation. If tools are being handled by the LLM service's runStreamedLLMWithTools method, this should be clearly documented with an explanation of how it differs from the non-streaming approach.
Suggested change (replacing the two comment lines above with a documentation block):

```ts
/**
 * Call streaming LLM with tools.
 *
 * Tool events (such as tool calls, tool results, etc.) are sent via the `sendUpdate` callback.
 * In the streaming approach, the LLM service's `runStreamedLLMWithTools` method is responsible for
 * invoking `sendUpdate` whenever a tool event occurs during the generation process.
 *
 * This differs from the non-streaming approach, where tool events may be handled synchronously
 * or via a different callback mechanism, and may not be interleaved with text streaming.
 *
 * For details on how tool events are dispatched, see the implementation of `runStreamedLLMWithTools`
 * in the LLM service. All tool-related updates are sent to the caller via the provided `sendUpdate`
 * function, ensuring that both text and tool events are streamed in real time.
 */
```
Quoted context (the `actStream` implementation):

```ts
try {
  // Hook: transformInput
  if (customHandlers.transformInput) {
    processedInput = await customHandlers.transformInput({
      input: processedInput,
      sendUpdate,
    });
  }

  // Hook: beforeAct
  if (customHandlers.beforeAct) {
    processedInput = await customHandlers.beforeAct({
      input: processedInput,
      config,
      sendUpdate,
    });
  }

  // Prepare messages with system prompt and cache control
  const shouldCacheSystemPrompt =
    config.prompts.systemCache && providerToUse === "anthropic";

  const workingMessages = [
    {
      role: "system" as const,
      content: config.prompts.system,
      ...(shouldCacheSystemPrompt
        ? {
            experimental_providerMetadata: {
              anthropic: {
                cacheControl: { type: "ephemeral" as const },
              },
            },
          }
        : {}),
    },
    ...processedInput.messages.map((msg: any) => {
      if (msg.providerOptions?.anthropic?.cacheControl) {
        return {
          ...msg,
          experimental_providerMetadata: {
            anthropic: {
              cacheControl: msg.providerOptions.anthropic.cacheControl,
            },
          },
        };
      }
      return msg;
    }),
  ];

  // Merge LLM options
  const mergedLlmOptions = {
    ...(config.customConfig?.llmOptions || {}),
    ...((processedInput.context as any)?.llmOptions || {}),
  } as any;

  // Collect full text for final response
  let fullText = "";

  // Call streaming LLM with tools
  // Tool events are sent via sendUpdate callback in onStepFinish
  const { textStream, generation } =
    await base.llmService.runStreamedLLMWithTools({
      messages: workingMessages,
      tools: availableTools,
      sendUpdate,
      context: processedInput.context,
      model: modelToUse,
      provider: providerToUse,
      traceContext: {
        sessionId: processedInput.context?.sessionId,
        metadata: {
          ...processedInput.context?.metadata,
          modelUsed: modelToUse,
          providerUsed: providerToUse,
          streaming: true,
        },
      },
      ...mergedLlmOptions,
    });

  // Stream text chunks
  for await (const chunk of textStream) {
    fullText += chunk;

    // Yield chunk to caller
    yield {
      type: "text_delta",
      content: chunk,
    };

    // Also send via sendUpdate for IPC
    await sendUpdate({
      type: "text_delta",
      content: chunk,
    });
  }

  // Build final response
  const response: AgentResponse = {
    role: "assistant",
    content: fullText,
  };

  // Hook: afterResponse
  let finalResponse = response;
  if (customHandlers.afterResponse) {
    finalResponse = await customHandlers.afterResponse({
      response,
      input: processedInput,
      sendUpdate,
    });
  }

  // Hook: transformOutput
  if (customHandlers.transformOutput) {
    finalResponse = await customHandlers.transformOutput({
      output: finalResponse,
      sendUpdate,
    });
  }

  // Signal stream end
  yield { type: "stream_end", content: "" };

  await sendUpdate({
    type: "stream_end",
    content: fullText,
  });

  // Close trace on success
  try {
    langfuseService.addEventToSession(sessionToken, "agent-stream-end", {});
    langfuseService.endSpanForSession(sessionToken, "agent-act-stream");
    langfuseService.endExecutionTrace(sessionToken, {
      contentPreview: fullText.slice(0, 500),
      streaming: true,
    });
  } catch {}

  return finalResponse;
} catch (error) {
  // Send error via stream
  yield {
    type: "stream_end",
    content: "",
    metadata: { error: (error as Error).message },
  };

  await sendUpdate({
    type: "error",
    content: (error as Error).message,
  });

  // End trace on error
  try {
    langfuseService.addEventToSession(sessionToken, "agent-stream-error", {
      message: (error as Error)?.message,
    });
    langfuseService.endSpanForSession(
      sessionToken,
      "agent-act-stream",
      undefined,
      error as Error
    );
    langfuseService.endExecutionTrace(
      sessionToken,
      undefined,
      error as Error
    );
  } catch {}

  throw error;
}
```
Copilot AI commented on Nov 26, 2025:
The actStream implementation doesn't support retry logic that exists in the non-streaming act method. The act method has a while (attempt < maxRetries) loop and validation with error correction (lines 219-620), but actStream has no retry mechanism. This inconsistency means that streaming requests won't benefit from the same error recovery capabilities as non-streaming requests.
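
To illustrate what parity could look like, a rough sketch of a bounded retry wrapper is shown below. `maxRetries` and the error-surfacing update are assumptions modelled on the non-streaming behavior described in this comment, not code from the PR; note also that retrying after chunks have already been yielded would duplicate output, so in practice a retry is only safe before the first `text_delta` is emitted.

```ts
// Sketch only: bounded retries around the streaming call inside actStream.
// Variable names (config, base, workingMessages, ...) refer to the surrounding
// factory code quoted above; maxRetries is a hypothetical config field.
const maxRetries = config.customConfig?.maxRetries ?? 2;
let attempt = 0;

while (attempt < maxRetries) {
  try {
    fullText = "";
    const { textStream } = await base.llmService.runStreamedLLMWithTools({
      messages: workingMessages,
      tools: availableTools,
      sendUpdate,
      context: processedInput.context,
      model: modelToUse,
      provider: providerToUse,
      ...mergedLlmOptions,
    });

    for await (const chunk of textStream) {
      fullText += chunk;
      yield { type: "text_delta", content: chunk };
      await sendUpdate({ type: "text_delta", content: chunk });
    }
    break; // success: leave the retry loop
  } catch (error) {
    attempt += 1;
    if (attempt >= maxRetries) throw error;
    // Surface the retry to the caller before trying again.
    await sendUpdate({
      type: "error",
      content: `Stream attempt ${attempt} failed, retrying: ${(error as Error).message}`,
    });
  }
}
```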
Quoted context: the `catch (error)` block from the `actStream` implementation shown above.
Copilot AI commented on Nov 26, 2025:
The actStream implementation doesn't call the onError custom handler in the error catch block. The non-streaming act method calls customHandlers.onError in multiple places (lines 362, 472, 527) to allow custom error handling and retry logic, but this is completely missing from actStream. This means custom error handling won't work for streaming requests.
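
A minimal sketch of what wiring the handler in might look like is below. The `onError` argument shape is an assumption by analogy with the other hooks in this factory; the PR itself does not define it for the streaming path.

```ts
// Inside actStream's catch block, before the existing error reporting:
if (customHandlers.onError) {
  // Hypothetical argument shape, modelled on the other handler hooks above.
  await customHandlers.onError({
    error: error as Error,
    input: processedInput,
    sendUpdate,
  });
}
```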
| "@mrck-labs/grid-core": minor | ||
| --- | ||
|
|
||
| add act streamed function |
Copilot AI commented on Nov 26, 2025:
The changeset description "add act streamed function" has inconsistent capitalization and should be a complete sentence. Consider: "Add actStream method for streaming agent responses".
Suggested change:

```diff
-add act streamed function
+Add actStream method for streaming agent responses.
```
Quoted context:

```ts
// Use the non-streaming runLLM as fallback
const fallbackMessages = [
  { role: "system" as const, content: config.prompts.system },
  ...input.messages,
];
const response = await base.llmService.runLLM({
  messages: fallbackMessages,
  tools: availableTools,
  sendUpdate,
  context: input.context,
});
yield {
  type: "text_delta",
  content: typeof response.content === "string" ? response.content : "",
};
yield { type: "stream_end", content: "" };
return response;
```
Copilot AI commented on Nov 26, 2025:
The fallback doesn't include any Langfuse tracing, so streaming requests that fall back to non-streaming won't be traced. This creates an observability gap where some requests are invisible to monitoring.
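
A possible shape for closing that gap is sketched below. The `agent-stream-fallback` event name is invented for illustration, and it is an assumption that `sessionToken` is in scope at this point in the fallback path; the `endSpanForSession`/`endExecutionTrace` calls mirror the streaming success path quoted earlier.

```ts
// Sketch: give the non-streaming fallback the same Langfuse bookkeeping as
// the streaming path, so fallback requests remain visible in tracing.
langfuseService.addEventToSession(sessionToken, "agent-stream-fallback", {
  reason: "runStreamedLLMWithTools not available",
});

const response = await base.llmService.runLLM({
  messages: fallbackMessages,
  tools: availableTools,
  sendUpdate,
  context: input.context,
});

langfuseService.endSpanForSession(sessionToken, "agent-act-stream");
langfuseService.endExecutionTrace(sessionToken, {
  contentPreview:
    typeof response.content === "string" ? response.content.slice(0, 500) : "",
  streaming: false,
});
```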
Quoted context: the Langfuse error-reporting calls inside the `catch` block shown earlier.
Copilot AI commented on Nov 26, 2025:
In the error handling block, sessionToken is used without verifying it was successfully extracted. If the error occurs before sessionToken is set (e.g., during the context initialization), the langfuse service calls will fail with another error, masking the original error.
Suggested change (guarding the Langfuse calls when `sessionToken` is missing):

```ts
if (sessionToken) {
  langfuseService.addEventToSession(sessionToken, "agent-stream-error", {
    message: (error as Error)?.message,
  });
  langfuseService.endSpanForSession(
    sessionToken,
    "agent-act-stream",
    undefined,
    error as Error
  );
  langfuseService.endExecutionTrace(
    sessionToken,
    undefined,
    error as Error
  );
} else {
  // Optionally log or handle the missing sessionToken case
  // console.warn("sessionToken is undefined; skipping langfuseService error reporting.");
}
```
Quoted context:

```ts
if (modelToUse) {
  console.log(
    `🤖 [${config.id}] Streaming with model: ${modelToUse}${
```
Copilot AI commented on Nov 26, 2025:
[nitpick] The console message uses "Streaming" but other log messages in the non-streaming act method use "Using". For consistency, consider changing to `` `🤖 [${config.id}] Using model for streaming: ${modelToUse}...` `` to match the pattern on line 208.
Suggested change:

```diff
-    `🤖 [${config.id}] Streaming with model: ${modelToUse}${
+    `🤖 [${config.id}] Using model for streaming: ${modelToUse}${
```
Quoted context: the non-streaming fallback shown above.
Copilot AI commented on Nov 26, 2025:
The fallback implementation when streaming is not supported doesn't call the custom handlers (transformInput, beforeAct, afterResponse, transformOutput) that are called in the main streaming path. This inconsistency means the fallback behavior will differ from the actual streaming behavior, potentially causing unexpected results.
Suggested change (applying the handler hooks in the fallback path; it replaces the snippet quoted above):

```ts
let processedInput = input;
// Apply transformInput if defined
if (config.handlers?.transformInput) {
  processedInput = await config.handlers.transformInput(processedInput);
}
// Apply beforeAct if defined
if (config.handlers?.beforeAct) {
  processedInput = await config.handlers.beforeAct(processedInput);
}
const fallbackMessages = [
  { role: "system" as const, content: config.prompts.system },
  ...processedInput.messages,
];
let response = await base.llmService.runLLM({
  messages: fallbackMessages,
  tools: availableTools,
  sendUpdate,
  context: processedInput.context,
});
// Apply afterResponse if defined
if (config.handlers?.afterResponse) {
  response = await config.handlers.afterResponse(response, processedInput);
}
// Apply transformOutput if defined
let outputContent =
  typeof response.content === "string" ? response.content : "";
if (config.handlers?.transformOutput) {
  outputContent = await config.handlers.transformOutput(
    outputContent,
    response,
    processedInput
  );
}
yield {
  type: "text_delta",
  content: outputContent,
```
Quoted context:

```ts
// Call streaming LLM with tools
// Tool events are sent via sendUpdate callback in onStepFinish
const { textStream, generation } =
```
Copilot AI commented on Nov 26, 2025:
Unused variable `generation`.
Suggested change:

```diff
-const { textStream, generation } =
+const { textStream } =
```
No description provided.