From dc47b0b3d784324752f6502f89bf2ee6db70e5d9 Mon Sep 17 00:00:00 2001 From: Jacob Zwang <59858341+JacobZwang@users.noreply.github.com> Date: Sat, 9 Aug 2025 01:21:04 -0700 Subject: [PATCH 1/2] improved durability --- src/app/api/chat/route.ts | 59 +---- src/app/api/health/route.ts | 42 ++++ src/components/chat.tsx | 292 ++++++++++++---------- src/components/use-chat.tsx | 56 ++++- src/hooks/use-stream-recovery.ts | 83 +++++++ src/lib/README.md | 162 ------------ src/lib/index.ts | 51 +--- src/lib/internal/chat-service.ts | 162 ++++++++++++ src/lib/internal/error-handler.ts | 158 ++++++++++++ src/lib/internal/monitoring.ts | 179 ++++++++++++++ src/lib/internal/resilience.ts | 226 +++++++++++++++++ src/lib/internal/stream-manager.ts | 383 +++++++++++++++++++++++++---- 12 files changed, 1415 insertions(+), 438 deletions(-) create mode 100644 src/app/api/health/route.ts create mode 100644 src/hooks/use-stream-recovery.ts delete mode 100644 src/lib/README.md create mode 100644 src/lib/internal/chat-service.ts create mode 100644 src/lib/internal/error-handler.ts create mode 100644 src/lib/internal/monitoring.ts create mode 100644 src/lib/internal/resilience.ts diff --git a/src/app/api/chat/route.ts b/src/app/api/chat/route.ts index c9323b70..65d6adb5 100644 --- a/src/app/api/chat/route.ts +++ b/src/app/api/chat/route.ts @@ -1,63 +1,26 @@ -import { getApp } from "@/actions/get-app"; -import { freestyle } from "@/lib/freestyle"; +import { NextRequest } from "next/server"; import { getAppIdFromHeaders } from "@/lib/utils"; -import { UIMessage } from "ai"; -import { builderAgent } from "@/mastra/agents/builder"; +import { ChatService } from "@/lib/internal/chat-service"; // "fix" mastra mcp bug import { EventEmitter } from "events"; -import { - isStreamRunning, - stopStream, - waitForStreamToStop, - clearStreamState, - sendMessageWithStreaming, -} from "@/lib/internal/stream-manager"; EventEmitter.defaultMaxListeners = 1000; -import { NextRequest } from "next/server"; - export async function POST(req: NextRequest) { - console.log("creating new chat stream"); - const appId = getAppIdFromHeaders(req); + console.log("Creating new chat stream"); + // Get app ID from headers + const appId = getAppIdFromHeaders(req); if (!appId) { return new Response("Missing App Id header", { status: 400 }); } - const app = await getApp(appId); - if (!app) { - return new Response("App not found", { status: 404 }); + // Validate request + const validationError = ChatService.validateChatRequest(req); + if (validationError) { + return validationError; } - // Check if a stream is already running and stop it if necessary - if (await isStreamRunning(appId)) { - console.log("Stopping previous stream for appId:", appId); - await stopStream(appId); - - // Wait until stream state is cleared - const stopped = await waitForStreamToStop(appId); - if (!stopped) { - await clearStreamState(appId); - return new Response( - "Previous stream is still shutting down, please try again", - { status: 429 } - ); - } - } - - const { messages }: { messages: UIMessage[] } = await req.json(); - - const { mcpEphemeralUrl } = await freestyle.requestDevServer({ - repoId: app.info.gitRepo, - }); - - const resumableStream = await sendMessageWithStreaming( - builderAgent, - appId, - mcpEphemeralUrl, - messages.at(-1)! - ); - - return resumableStream.response(); + // Process chat request with all the durability features + return await ChatService.processChatRequest(req, appId); } diff --git a/src/app/api/health/route.ts b/src/app/api/health/route.ts new file mode 100644 index 00000000..c6d4c746 --- /dev/null +++ b/src/app/api/health/route.ts @@ -0,0 +1,42 @@ +import { NextRequest } from "next/server"; +import { Monitoring } from "@/lib/internal/monitoring"; + +export async function GET(req: NextRequest) { + try { + const healthSummary = Monitoring.getHealthSummary(); + const metrics = Monitoring.getMetrics(); + + return new Response( + JSON.stringify({ + status: healthSummary.status, + timestamp: new Date().toISOString(), + issues: healthSummary.issues, + summary: { + successRate: `${metrics.performance.successRate.toFixed(1)}%`, + totalRequests: metrics.performance.totalRequests, + averageResponseTime: `${metrics.performance.averageResponseTime.toFixed(0)}ms`, + circuitBreakers: Object.keys(metrics.circuitBreakers).length, + }, + }), + { + status: healthSummary.status === "healthy" ? 200 : 503, + headers: { + "content-type": "application/json", + "cache-control": "no-cache", + }, + } + ); + } catch (error) { + return new Response( + JSON.stringify({ + status: "error", + message: "Failed to get health status", + timestamp: new Date().toISOString(), + }), + { + status: 500, + headers: { "content-type": "application/json" }, + } + ); + } +} diff --git a/src/components/chat.tsx b/src/components/chat.tsx index fd635d39..f0b9d011 100644 --- a/src/components/chat.tsx +++ b/src/components/chat.tsx @@ -1,85 +1,62 @@ "use client"; -import Image from "next/image"; - +import { useQuery } from "@tanstack/react-query"; +import { chatState } from "@/actions/chat-streaming"; import { PromptInputBasic } from "./chatinput"; -import { Markdown } from "./ui/markdown"; import { useState } from "react"; import { ChatContainer } from "./ui/chat-container"; import { UIMessage } from "ai"; -import { ToolMessage } from "./tools"; -import { useQuery } from "@tanstack/react-query"; -import { chatState } from "@/actions/chat-streaming"; -import { CompressedImage } from "@/lib/image-compression"; import { useChatSafe } from "./use-chat"; +import Image from "next/image"; +import { Markdown } from "./ui/markdown"; +import { ToolMessage } from "./tools"; +import { useStreamRecovery } from "@/hooks/use-stream-recovery"; export default function Chat(props: { appId: string; initialMessages: UIMessage[]; - isLoading?: boolean; - topBar?: React.ReactNode; running: boolean; }) { const { data: chat } = useQuery({ - queryKey: ["stream", props.appId], - queryFn: async () => { - return chatState(props.appId); - }, + queryKey: ["chat", props.appId], + queryFn: () => chatState(props.appId), refetchInterval: 1000, - refetchOnWindowFocus: true, }); - const { messages, sendMessage } = useChatSafe({ + const { messages, sendMessage, reload, stop } = useChatSafe({ messages: props.initialMessages, id: props.appId, resume: props.running && chat?.state === "running", + onError: (error: Error | unknown) => { + console.error("Chat error:", error); + const errorMessage = + error instanceof Error ? error.message : "Stream error occurred"; + setLastError(errorMessage); + setTimeout(() => { + reload(); + }, 2000); + }, }); const [input, setInput] = useState(""); + const [lastError, setLastError] = useState(null); + + const { retryCount, isRecovering, resetErrorState, forceRecovery } = + useStreamRecovery({ + onError: setLastError, + messageCount: messages.length, + }); const onSubmit = (e?: React.FormEvent) => { if (e?.preventDefault) { e.preventDefault(); } + resetErrorState(); sendMessage( { - parts: [ - { - type: "text", - text: input, - }, - ], - }, - { - headers: { - "Adorable-App-Id": props.appId, - }, - } - ); - setInput(""); - }; - - const onSubmitWithImages = (text: string, images: CompressedImage[]) => { - const parts: Parameters[0]["parts"] = []; - - if (text.trim()) { - parts.push({ - type: "text", - text: text, - }); - } - - images.forEach((image) => { - parts.push({ - type: "file", - mediaType: image.mimeType, - url: image.data, - }); - }); - - sendMessage( - { - parts, + id: crypto.randomUUID(), + parts: [{ text: input, type: "text" }], + role: "user", }, { headers: { @@ -90,74 +67,160 @@ export default function Chat(props: { setInput(""); }; - async function handleStop() { + const handleStop = async () => { await fetch("/api/chat/" + props.appId + "/stream", { method: "DELETE", headers: { "Adorable-App-Id": props.appId, }, }); - } + }; return ( -
- {props.topBar} -
- - {messages.map((message: any) => ( - - ))} - -
-
+
+ {/* Error display */} + {lastError && ( +
+
+
+
+ {isRecovering + ? "Recovering..." + : `Connection issue detected. ${retryCount > 0 && `Retry attempt ${retryCount}/3`}`} +
+
+
+ {!isRecovering && ( + + )} + +
+
+
+ )} + + {/* Recovery status */} + {isRecovering && ( +
+
+
+ Recovering stream... Please wait. +
+
+
+ )} + + + {messages.map((message: UIMessage) => ( + + ))} + + +
{ - setInput(value); - }} + onValueChange={setInput} onSubmit={onSubmit} - onSubmitWithImages={onSubmitWithImages} - isGenerating={props.isLoading || chat?.state === "running"} + onSubmitWithImages={(text, images) => { + if (text.trim()) { + sendMessage( + { + id: crypto.randomUUID(), + parts: [ + { text, type: "text" }, + ...images.map((img) => ({ + type: "file" as const, + url: img.data, + mediaType: img.mimeType, + })), + ], + role: "user", + }, + { + headers: { + "Adorable-App-Id": props.appId, + }, + } + ); + } else if (images.length > 0) { + sendMessage( + { + id: crypto.randomUUID(), + parts: images.map((img) => ({ + type: "file" as const, + url: img.data, + mediaType: img.mimeType, + })), + role: "user", + }, + { + headers: { + "Adorable-App-Id": props.appId, + }, + } + ); + } + setInput(""); + }} + stop={handleStop} + isGenerating={chat?.state === "running"} />
); } -function MessageBody({ message }: { message: any }) { +function MessageBody({ message }: { message: UIMessage }) { if (message.role === "user") { return (
- {message.parts.map((part: any, index: number) => { - if (part.type === "text") { - return
{part.text}
; - } else if ( - part.type === "file" && - part.mediaType?.startsWith("image/") - ) { - return ( -
- User uploaded image -
- ); + {message.parts.map( + ( + part: { + type: string; + text?: string; + url?: string; + mediaType?: string; + }, + index: number + ) => { + if (part.type === "text") { + return
{part.text}
; + } else if ( + part.type === "file" && + part.mediaType?.startsWith("image/") + ) { + return ( +
+ User uploaded image +
+ ); + } + return
unexpected message
; } - return
unexpected message
; - })} + )}
); @@ -166,7 +229,7 @@ function MessageBody({ message }: { message: any }) { if (Array.isArray(message.parts) && message.parts.length !== 0) { return (
- {message.parts.map((part: any, index: any) => { + {message.parts.map((part, index) => { if (part.type === "text") { return (
@@ -178,35 +241,10 @@ function MessageBody({ message }: { message: any }) { } if (part.type.startsWith("tool-")) { - // if ( - // part.toolInvocation.state === "result" && - // part.toolInvocation.result.isError - // ) { - // return ( - //
- // {part.toolInvocation.result?.content?.map( - // (content: { type: "text"; text: string }, i: number) => ( - //
{content.text}
- // ) - // )} - // {/* Unexpectedly failed while using tool{" "} - // {part.toolInvocation.toolName}. Please try again. again. */} - //
- // ); - // } - - // if ( - // message.parts!.length - 1 == index && - // part.toolInvocation.state !== "result" - // ) { return ; - // } else { - // return undefined; - // } } + + return null; })}
); @@ -216,7 +254,7 @@ function MessageBody({ message }: { message: any }) { return ( {message.parts - .map((part: any) => + .map((part: { type: string; text?: string }) => part.type === "text" ? part.text : "[something went wrong]" ) .join("")} diff --git a/src/components/use-chat.tsx b/src/components/use-chat.tsx index 76b382ef..4178719a 100644 --- a/src/components/use-chat.tsx +++ b/src/components/use-chat.tsx @@ -1,5 +1,5 @@ import { useChat } from "@ai-sdk/react"; -import { useEffect } from "react"; +import { useEffect, useCallback, useRef } from "react"; // For some reason, if the chat is resumed during a router page navigation, it // will try to resume the stream multiple times and result in some sort of leak @@ -9,25 +9,60 @@ import { useEffect } from "react"; // times. const runningChats = new Set(); export function useChatSafe( - options: Parameters[0] & { id: string; onFinish?: () => void } + options: Parameters[0] & { + id: string; + onFinish?: () => void; + onError?: (error: Error | unknown) => void; + } ) { const id = options.id; const resume = options?.resume; + const onError = options?.onError; + const isResumingRef = useRef(false); options.resume = undefined; + options.onError = undefined; const onFinish = options.onFinish; - options.onFinish = () => { + options.onFinish = useCallback(() => { runningChats.delete(id); - if (onFinish) { - onFinish(); - } - }; + isResumingRef.current = false; + onFinish?.(); + }, [id, onFinish]); const chat = useChat(options); + // Enhanced error handling with retry logic + useEffect(() => { + if (chat.error && onError) { + onError(chat.error); + } + }, [chat.error, onError]); + + // Add reload functionality + const reload = useCallback(() => { + if (runningChats.has(id)) { + runningChats.delete(id); + } + isResumingRef.current = false; + + return chat.stop().then(() => { + // Force a fresh start + if (resume) { + setTimeout(() => { + if (!runningChats.has(id) && !isResumingRef.current) { + isResumingRef.current = true; + chat.resumeStream(); + runningChats.add(id); + } + }, 100); + } + }); + }, [chat, id, resume]); + useEffect(() => { - if (!runningChats.has(id) && resume) { + if (!runningChats.has(id) && resume && !isResumingRef.current) { + isResumingRef.current = true; chat.resumeStream(); runningChats.add(id); } @@ -36,10 +71,11 @@ export function useChatSafe( if (runningChats.has(id)) { chat.stop().then(() => { runningChats.delete(id); + isResumingRef.current = false; }); } }; - }, [resume, id]); + }, [resume, id, chat]); - return chat; + return { ...chat, reload }; } diff --git a/src/hooks/use-stream-recovery.ts b/src/hooks/use-stream-recovery.ts new file mode 100644 index 00000000..bef907da --- /dev/null +++ b/src/hooks/use-stream-recovery.ts @@ -0,0 +1,83 @@ +import { useState, useCallback, useEffect } from "react"; + +interface UseStreamRecoveryOptions { + onError?: (error: string) => void; + messageCount?: number; +} + +export function useStreamRecovery({ + onError, + messageCount, +}: UseStreamRecoveryOptions) { + const [retryCount, setRetryCount] = useState(0); + const [lastError, setLastError] = useState(null); + const [isRecovering, setIsRecovering] = useState(false); + + // Clear error state when messages arrive successfully + useEffect(() => { + if (messageCount && messageCount > 0) { + setLastError(null); + setRetryCount(0); + setIsRecovering(false); + } + }, [messageCount]); + + const handleError = useCallback( + (error: Error | unknown) => { + console.error("Chat error:", error); + const errorMessage = + error instanceof Error ? error.message : "Stream error occurred"; + setLastError(errorMessage); + onError?.(errorMessage); + + // Auto-retry on error + setTimeout( + () => { + if (retryCount < 3) { + console.log( + `Auto-retrying chat stream (attempt ${retryCount + 1})` + ); + setRetryCount((prev) => prev + 1); + setIsRecovering(true); + // The reload will be called by the parent component + } + }, + 2000 * (retryCount + 1) + ); + }, + [retryCount, onError] + ); + + const resetErrorState = useCallback(() => { + setLastError(null); + setRetryCount(0); + setIsRecovering(false); + }, []); + + const forceRecovery = useCallback( + async (stop: () => Promise, reload: () => void) => { + setIsRecovering(true); + setLastError("Forcing stream recovery..."); + try { + await stop(); + setTimeout(() => { + reload(); + setIsRecovering(false); + }, 1000); + } catch (error) { + console.error("Force recovery failed:", error); + setIsRecovering(false); + } + }, + [] + ); + + return { + retryCount, + lastError, + isRecovering, + handleError, + resetErrorState, + forceRecovery, + }; +} diff --git a/src/lib/README.md b/src/lib/README.md deleted file mode 100644 index 81840d53..00000000 --- a/src/lib/README.md +++ /dev/null @@ -1,162 +0,0 @@ -# πŸš€ AI Builder Library - -This library provides a flexible foundation for building AI-powered applications using Mastra agents with Freestyle MCP tools. - -## 🎯 Core Concept - -The library is designed around **agents** - you can use the pre-configured `builderAgent` or create your own custom agents. All the streaming, durability, and MCP integration features work with any Mastra agent you provide. - -## πŸš€ Quick Start - -### Using the Default Agent - -```typescript -import { builderAgent, sendMessageWithStreaming } from "@/lib"; - -const response = await sendMessageWithStreaming( - builderAgent, - appId, - mcpUrl, - userMessage -); -``` - -### Creating Your Own Agent - -```typescript -import { Agent } from "@mastra/core/agent"; -import { anthropic } from "@ai-sdk/anthropic"; -import { Memory } from "@mastra/memory"; -import { PostgresStore, PgVector } from "@mastra/pg"; -import { sendMessageWithStreaming } from "@/lib"; - -// Create your custom agent -const myCustomAgent = new Agent({ - name: "MyCustomAgent", - model: anthropic("claude-3-5-sonnet-20241022"), - instructions: "Your custom instructions here", - memory: new Memory({ - options: { lastMessages: 1000 }, - vector: new PgVector({ connectionString: process.env.DATABASE_URL! }), - storage: new PostgresStore({ connectionString: process.env.DATABASE_URL! }), - }), - tools: { - your_custom_tool: yourTool, - }, -}); - -// Use your custom agent with all the streaming and durability features -const response = await sendMessageWithStreaming( - myCustomAgent, - appId, - mcpUrl, - userMessage -); -``` - -## πŸ› οΈ Available Services - -### Core Services - -- **`sendMessageWithStreaming`** - Main function for AI interactions with streaming -- **`AIService.sendMessage`** - Lower-level AI service for custom implementations -- **`builderAgent`** - Pre-configured agent with todo tool - -### Stream Management - -- **`getStreamState`** - Check current stream status -- **`isStreamRunning`** - Check if stream is active -- **`stopStream`** - Stop a running stream -- **`waitForStreamToStop`** - Wait for stream to finish -- **`clearStreamState`** - Clear stream state - -### Memory & Persistence - -- **`todoTool`** - Built-in task management tool -- **Memory integration** - Automatic conversation persistence -- **Redis streams** - Durable streaming with resumability - -## πŸ”§ Advanced Usage - -### Custom Tool Integration - -```typescript -import { createTool } from "@mastra/core/tools"; -import { z } from "zod"; - -const myCustomTool = createTool({ - id: "my_custom_tool", - description: "What my tool does", - inputSchema: z.object({ - input: z.string().describe("Input for the tool"), - }), - execute: async ({ input }) => { - // Your tool logic here - return { result: `Processed: ${input}` }; - }, -}); - -const myAgent = new Agent({ - // ... other config - tools: { my_custom_tool: myCustomTool }, -}); -``` - -### Direct AIService Usage - -```typescript -import { AIService } from "@/lib"; - -const response = await AIService.sendMessage(myAgent, appId, mcpUrl, message, { - maxSteps: 50, - maxOutputTokens: 32000, - onStepFinish: (step) => console.log("Step finished:", step), - onFinish: () => console.log("AI finished!"), -}); -``` - -## πŸ—οΈ Architecture - -``` -β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ Your Agent │───▢│ AIService │───▢│ MCP Client β”‚ -β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ -β”‚ - Custom model β”‚ β”‚ - Streaming β”‚ β”‚ - Freestyle β”‚ -β”‚ - Custom tools β”‚ β”‚ - Memory mgmt β”‚ β”‚ - Git ops β”‚ -β”‚ - Custom memory β”‚ β”‚ - Error handling β”‚ β”‚ - File ops β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ β”‚ β”‚ - β–Ό β–Ό β–Ό -β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ Stream Manager β”‚ β”‚ Redis Streams β”‚ β”‚ Freestyle MCP β”‚ -β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ -β”‚ - Durability β”‚ β”‚ - Persistence β”‚ β”‚ - State mgmt β”‚ -β”‚ - Resumability β”‚ β”‚ - Keep-alive β”‚ β”‚ - Execution β”‚ -β”‚ - Lifecycle β”‚ β”‚ - Deployment β”‚ β”‚ β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ -``` - -## 🎨 Key Benefits - -1. **Flexibility** - Use any Mastra agent you want -2. **Reusability** - All streaming and durability features work with any agent -3. **Simplicity** - Clean, focused API without unnecessary abstractions -4. **Power** - Full access to Freestyle MCP tools and Mastra capabilities -5. **Durability** - Redis-backed streaming with resumability - -## 🚫 What You Can Ignore - -- `internal/` folder - All the complex plumbing (but you can use the exported services) -- `redis.ts` - Database connections -- `stream-manager.ts` - Stream handling (but you can use the exported functions) - -**Focus on your agent logic, not the infrastructure!** - -## ❓ Need Help? - -- Check the examples above -- Look at `src/mastra/agents/builder.ts` for the default agent pattern -- Use the pre-configured `builderAgent` to get started quickly -- Create your own agent when you need custom behavior - -**Happy building! πŸŽ‰** diff --git a/src/lib/index.ts b/src/lib/index.ts index 4aef058a..e52ab9f3 100644 --- a/src/lib/index.ts +++ b/src/lib/index.ts @@ -5,10 +5,8 @@ * Import what you need to customize your AI app! */ -// Agent configuration - customize your AI's behavior -export { SYSTEM_MESSAGE } from "./system"; - -// Internal services - for advanced usage with custom agents +// Backward compatibility exports +export { builderAgent } from "@/mastra/agents/builder"; export { AIService } from "./internal/ai-service"; export { sendMessageWithStreaming, @@ -17,18 +15,17 @@ export { stopStream, waitForStreamToStop, clearStreamState, - getStream, - setStream, - setupAbortCallback, - updateKeepAlive, - handleStreamLifecycle, } from "./internal/stream-manager"; -// Pre-configured agent (uses the default todo tool) -export { builderAgent } from "@/mastra/agents/builder"; +// Internal utilities (for advanced use cases) +export { Resilience } from "./internal/resilience"; +export { ErrorHandler } from "./internal/error-handler"; +export { Monitoring } from "./internal/monitoring"; -// Todo tool for task management -export { todoTool } from "./todo-tool"; +// Legacy exports for existing code +export { freestyle } from "./freestyle"; +export { todoTool } from "@/tools/todo-tool"; +export { morphTool } from "@/tools/morph-tool"; // Type exports for TypeScript export type { UIMessage } from "ai"; @@ -46,32 +43,4 @@ export type { Agent } from "@mastra/core/agent"; * mcpUrl, * userMessage * ); - * - * // Create your own custom agent - * import { Agent } from "@mastra/core/agent"; - * import { anthropic } from "@ai-sdk/anthropic"; - * import { Memory } from "@mastra/memory"; - * import { PostgresStore, PgVector } from "@mastra/pg"; - * - * const myCustomAgent = new Agent({ - * name: "MyCustomAgent", - * model: anthropic("claude-3-5-sonnet-20241022"), - * instructions: "Your custom instructions here", - * memory: new Memory({ - * options: { lastMessages: 1000 }, - * vector: new PgVector({ connectionString: process.env.DATABASE_URL! }), - * storage: new PostgresStore({ connectionString: process.env.DATABASE_URL! }), - * }), - * tools: { - * your_custom_tool: yourTool, - * }, - * }); - * - * // Use your custom agent with all the streaming and durability features - * const response = await sendMessageWithStreaming( - * myCustomAgent, - * appId, - * mcpUrl, - * userMessage - * ); */ diff --git a/src/lib/internal/chat-service.ts b/src/lib/internal/chat-service.ts new file mode 100644 index 00000000..13f4dda0 --- /dev/null +++ b/src/lib/internal/chat-service.ts @@ -0,0 +1,162 @@ +import { UIMessage } from "ai"; +import { NextRequest } from "next/server"; +import { getApp } from "@/actions/get-app"; +import { freestyle } from "@/lib/freestyle"; +import { builderAgent } from "@/mastra/agents/builder"; +import { AIService } from "./ai-service"; +import { ErrorHandler } from "./error-handler"; +import { Resilience } from "./resilience"; +import { Monitoring } from "./monitoring"; +import { + isStreamRunning, + stopStream, + waitForStreamToStop, + clearStreamState, + sendMessageWithStreaming, +} from "./stream-manager"; + +export interface ChatRequest { + messages: UIMessage[]; +} + +export class ChatService { + /** + * Process a chat request with full error handling and fallbacks + * This method handles all the complexity while keeping the interface simple + */ + static async processChatRequest( + req: NextRequest, + appId: string + ): Promise { + const startTime = Date.now(); + + try { + // Parse request body + const { messages }: ChatRequest = await req.json(); + if (!messages || messages.length === 0) { + throw new Error("No messages provided"); + } + + // Get the app + const app = await getApp(appId); + if (!app) { + throw new Error("App not found"); + } + + // Handle stream lifecycle + await this.manageStreamLifecycle(appId); + + // Get MCP server with resilience + const { mcpEphemeralUrl } = await Resilience.withResilience( + () => freestyle.requestDevServer({ repoId: app.info.gitRepo }), + "mcp-server-request", + { maxAttempts: 2, baseDelay: 500 }, + { failureThreshold: 3, recoveryTimeout: 15000 } + ); + + // Try streaming first with resilience + try { + const resumableStream = await Resilience.withResilience( + () => + sendMessageWithStreaming( + builderAgent, + appId, + mcpEphemeralUrl, + messages.at(-1)! + ), + "streaming", + { maxAttempts: 2, baseDelay: 1000 }, + { failureThreshold: 2, recoveryTimeout: 10000 } + ); + + // Record success + Monitoring.recordSuccess(Date.now() - startTime); + return resumableStream.response(); + } catch (streamError) { + console.log("Streaming failed, falling back to memory:", streamError); + const fallbackResponse = await this.handleStreamingFallback(appId); + + // Record success for fallback + Monitoring.recordSuccess(Date.now() - startTime); + return fallbackResponse; + } + } catch (error) { + // Record failure + Monitoring.recordFailure( + Date.now() - startTime, + "chat-processing", + String(error) + ); + return ErrorHandler.handleError(error, "chat-processing"); + } + } + + /** + * Manage stream lifecycle - stop existing streams and ensure clean state + */ + private static async manageStreamLifecycle(appId: string): Promise { + if (await isStreamRunning(appId)) { + console.log("Stopping previous stream for appId:", appId); + await stopStream(appId); + + // Wait for stream to stop with timeout + const stopped = await waitForStreamToStop(appId); + if (!stopped) { + await clearStreamState(appId); + throw new Error( + "Previous stream is still shutting down, please try again" + ); + } + } + } + + /** + * Handle fallback when streaming fails - save messages to memory + */ + private static async handleStreamingFallback( + appId: string + ): Promise { + try { + // Try to save any unsaved messages to memory + const unsavedMessages = await AIService.getUnsavedMessages(appId); + if (unsavedMessages.length > 0) { + await AIService.saveMessagesToMemory( + builderAgent, + appId, + unsavedMessages + ); + } + + return ErrorHandler.createStreamSuccessResponse( + "Message processed and saved to memory. Streaming will resume on next request." + ); + } catch (fallbackError) { + console.error("Fallback also failed:", fallbackError); + throw new Error("Failed to process message. Please try again."); + } + } + + /** + * Validate that the request has all required components + */ + static validateChatRequest(req: NextRequest): Response | null { + // Check for required headers + const validationError = ErrorHandler.validateRequest(req, [ + "Adorable-App-Id", + ]); + if (validationError) { + return validationError; + } + + // Check content type + const contentType = req.headers.get("content-type"); + if (!contentType?.includes("application/json")) { + return ErrorHandler.handleError( + new Error("Invalid content type. Expected application/json"), + "request-validation" + ); + } + + return null; + } +} diff --git a/src/lib/internal/error-handler.ts b/src/lib/internal/error-handler.ts new file mode 100644 index 00000000..da0cc123 --- /dev/null +++ b/src/lib/internal/error-handler.ts @@ -0,0 +1,158 @@ +import { NextRequest } from "next/server"; + +export interface ErrorResponse { + message: string; + code?: string; + retryable?: boolean; +} + +export interface StreamErrorResponse { + type: "error"; + data: ErrorResponse; +} + +export class ErrorHandler { + /** + * Handle errors gracefully and return appropriate responses + * This keeps the main route handlers clean and focused + */ + static handleError(error: unknown, context: string): Response { + console.error(`${context}:`, error); + + // Determine if this is a retryable error + const isRetryable = this.isRetryableError(error); + const statusCode = this.getStatusCode(error); + const message = this.getErrorMessage(error); + + // For streaming endpoints, return SSE error format + if (context.includes("chat") || context.includes("stream")) { + const errorResponse: StreamErrorResponse = { + type: "error", + data: { + message, + retryable: isRetryable, + }, + }; + + return new Response(`data: ${JSON.stringify(errorResponse)}\n\n`, { + status: statusCode, + headers: { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + "x-vercel-ai-ui-message-stream": "v1", + "x-accel-buffering": "no", + }, + }); + } + + // For regular endpoints, return JSON error + return new Response( + JSON.stringify({ + error: message, + retryable: isRetryable, + }), + { + status: statusCode, + headers: { "content-type": "application/json" }, + } + ); + } + + /** + * Create a simple success response for streaming endpoints + */ + static createStreamSuccessResponse(message: string, data?: any): Response { + const response = { + type: "message", + data: { + id: crypto.randomUUID(), + role: "assistant", + content: message, + ...data, + }, + }; + + return new Response(`data: ${JSON.stringify(response)}\n\n`, { + headers: { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + "x-vercel-ai-ui-message-stream": "v1", + "x-accel-buffering": "no", + }, + }); + } + + /** + * Validate request and return early error response if invalid + */ + static validateRequest( + req: NextRequest, + requiredHeaders: string[] + ): Response | null { + for (const header of requiredHeaders) { + if (!req.headers.get(header)) { + return this.handleError( + new Error(`Missing required header: ${header}`), + "request-validation" + ); + } + } + return null; + } + + /** + * Determine if an error is retryable + */ + private static isRetryableError(error: unknown): boolean { + if (error instanceof Error) { + const retryableErrors = [ + "ECONNRESET", + "ETIMEDOUT", + "ENOTFOUND", + "ECONNREFUSED", + "NETWORK_ERROR", + "TIMEOUT", + ]; + return retryableErrors.some((retryableError) => + error.message.includes(retryableError) + ); + } + return false; + } + + /** + * Get appropriate HTTP status code for an error + */ + private static getStatusCode(error: unknown): number { + if (error instanceof Error) { + if (error.message.includes("not found")) return 404; + if (error.message.includes("unauthorized")) return 401; + if (error.message.includes("forbidden")) return 403; + if (error.message.includes("validation")) return 400; + if (error.message.includes("timeout")) return 408; + } + return 500; + } + + /** + * Get user-friendly error message + */ + private static getErrorMessage(error: unknown): string { + if (error instanceof Error) { + // Hide internal error details from users + if ( + error.message.includes("ECONN") || + error.message.includes("ENOTFOUND") + ) { + return "Connection issue. Please try again."; + } + if (error.message.includes("timeout")) { + return "Request timed out. Please try again."; + } + return error.message; + } + return "An unexpected error occurred. Please try again."; + } +} diff --git a/src/lib/internal/monitoring.ts b/src/lib/internal/monitoring.ts new file mode 100644 index 00000000..05bdf457 --- /dev/null +++ b/src/lib/internal/monitoring.ts @@ -0,0 +1,179 @@ +import { Resilience } from "./resilience"; + +export interface SystemMetrics { + timestamp: number; + circuitBreakers: Record; + performance: { + averageResponseTime: number; + totalRequests: number; + successfulRequests: number; + failedRequests: number; + successRate: number; + }; + errors: { + recent: Array<{ timestamp: number; context: string; message: string }>; + count: number; + }; +} + +export class Monitoring { + private static metrics: SystemMetrics = { + timestamp: Date.now(), + circuitBreakers: {}, + performance: { + averageResponseTime: 0, + totalRequests: 0, + successfulRequests: 0, + failedRequests: 0, + successRate: 100, + }, + errors: { + recent: [], + count: 0, + }, + }; + + private static requestTimes: number[] = []; + private static readonly MAX_ERRORS = 100; + private static readonly MAX_REQUEST_TIMES = 1000; + + /** + * Record a successful request + */ + static recordSuccess(duration: number): void { + this.metrics.performance.totalRequests++; + this.metrics.performance.successfulRequests++; + this.updateSuccessRate(); + this.recordRequestTime(duration); + } + + /** + * Record a failed request + */ + static recordFailure(duration: number, context: string, error: string): void { + this.metrics.performance.totalRequests++; + this.metrics.performance.failedRequests++; + this.updateSuccessRate(); + this.recordRequestTime(duration); + this.recordError(context, error); + } + + /** + * Get current system metrics + */ + static getMetrics(): SystemMetrics { + // Update circuit breaker status + this.metrics.circuitBreakers = Resilience.getHealthStatus(); + this.metrics.timestamp = Date.now(); + + return { ...this.metrics }; + } + + /** + * Get a summary of system health + */ + static getHealthSummary(): { + status: "healthy" | "degraded" | "unhealthy"; + issues: string[]; + } { + const issues: string[] = []; + const metrics = this.getMetrics(); + + // Check success rate + if (metrics.performance.successRate < 95) { + issues.push( + `Low success rate: ${metrics.performance.successRate.toFixed(1)}%` + ); + } + + // Check circuit breakers + for (const [context, status] of Object.entries(metrics.circuitBreakers)) { + if (status.state === "open") { + issues.push(`Circuit breaker open for ${context}`); + } + } + + // Check error rate + if (metrics.errors.count > 50) { + issues.push(`High error count: ${metrics.errors.count}`); + } + + // Determine overall status + let status: "healthy" | "degraded" | "unhealthy" = "healthy"; + if (issues.length > 0) { + status = issues.length > 2 ? "unhealthy" : "degraded"; + } + + return { status, issues }; + } + + /** + * Reset all metrics + */ + static resetMetrics(): void { + this.metrics = { + timestamp: Date.now(), + circuitBreakers: {}, + performance: { + averageResponseTime: 0, + totalRequests: 0, + successfulRequests: 0, + failedRequests: 0, + successRate: 100, + }, + errors: { + recent: [], + count: 0, + }, + }; + this.requestTimes = []; + } + + /** + * Export metrics for external monitoring systems + */ + static exportMetrics(): string { + return JSON.stringify(this.getMetrics(), null, 2); + } + + private static updateSuccessRate(): void { + const { totalRequests, successfulRequests } = this.metrics.performance; + if (totalRequests > 0) { + this.metrics.performance.successRate = + (successfulRequests / totalRequests) * 100; + } + } + + private static recordRequestTime(duration: number): void { + this.requestTimes.push(duration); + + // Keep only the last N request times + if (this.requestTimes.length > this.MAX_REQUEST_TIMES) { + this.requestTimes = this.requestTimes.slice(-this.MAX_REQUEST_TIMES); + } + + // Update average response time + const sum = this.requestTimes.reduce((acc, time) => acc + time, 0); + this.metrics.performance.averageResponseTime = + sum / this.requestTimes.length; + } + + private static recordError(context: string, error: string): void { + const errorEntry = { + timestamp: Date.now(), + context, + message: error, + }; + + this.metrics.errors.recent.unshift(errorEntry); + this.metrics.errors.count++; + + // Keep only the last N errors + if (this.metrics.errors.recent.length > this.MAX_ERRORS) { + this.metrics.errors.recent = this.metrics.errors.recent.slice( + 0, + this.MAX_ERRORS + ); + } + } +} diff --git a/src/lib/internal/resilience.ts b/src/lib/internal/resilience.ts new file mode 100644 index 00000000..64ef0d19 --- /dev/null +++ b/src/lib/internal/resilience.ts @@ -0,0 +1,226 @@ +import { ErrorHandler } from "./error-handler"; + +export interface RetryOptions { + maxAttempts: number; + baseDelay: number; + maxDelay: number; + backoffMultiplier: number; +} + +export interface CircuitBreakerOptions { + failureThreshold: number; + recoveryTimeout: number; + monitoringWindow: number; +} + +export class Resilience { + private static circuitBreakers = new Map(); + private static defaultRetryOptions: RetryOptions = { + maxAttempts: 3, + baseDelay: 1000, + maxDelay: 10000, + backoffMultiplier: 2, + }; + + private static defaultCircuitBreakerOptions: CircuitBreakerOptions = { + failureThreshold: 5, + recoveryTimeout: 30000, + monitoringWindow: 60000, + }; + + /** + * Execute a function with automatic retry logic + * Developers just call this and get resilience automatically + */ + static async withRetry( + operation: () => Promise, + context: string, + options?: Partial + ): Promise { + const config = { ...this.defaultRetryOptions, ...options }; + let lastError: unknown; + + for (let attempt = 1; attempt <= config.maxAttempts; attempt++) { + try { + return await operation(); + } catch (error) { + lastError = error; + + if (attempt === config.maxAttempts) { + console.error( + `${context}: All retry attempts failed after ${config.maxAttempts} attempts` + ); + throw error; + } + + if (!this.isRetryableError(error)) { + console.log(`${context}: Non-retryable error, not retrying:`, error); + throw error; + } + + const delay = this.calculateBackoffDelay(attempt, config); + console.log( + `${context}: Attempt ${attempt} failed, retrying in ${delay}ms...` + ); + + await this.sleep(delay); + } + } + + throw lastError; + } + + /** + * Execute a function with circuit breaker protection + * Automatically opens circuit on repeated failures + */ + static async withCircuitBreaker( + operation: () => Promise, + context: string, + options?: Partial + ): Promise { + const config = { ...this.defaultCircuitBreakerOptions, ...options }; + const circuitBreaker = this.getOrCreateCircuitBreaker(context, config); + + if (circuitBreaker.state === "open") { + if ( + Date.now() - circuitBreaker.lastFailureTime < + config.recoveryTimeout + ) { + throw new Error( + `Circuit breaker is open for ${context}. Please try again later.` + ); + } else { + // Try to close the circuit + circuitBreaker.state = "half-open"; + circuitBreaker.failureCount = 0; + } + } + + try { + const result = await operation(); + + // Success - close circuit if it was half-open + if (circuitBreaker.state === "half-open") { + circuitBreaker.state = "closed"; + circuitBreaker.failureCount = 0; + } + + return result; + } catch (error) { + // Failure - update circuit breaker state + circuitBreaker.failureCount++; + circuitBreaker.lastFailureTime = Date.now(); + + if (circuitBreaker.failureCount >= config.failureThreshold) { + circuitBreaker.state = "open"; + console.warn( + `Circuit breaker opened for ${context} after ${config.failureThreshold} failures` + ); + } + + throw error; + } + } + + /** + * Execute with both retry and circuit breaker protection + * This is the main method developers should use for critical operations + */ + static async withResilience( + operation: () => Promise, + context: string, + retryOptions?: Partial, + circuitBreakerOptions?: Partial + ): Promise { + return this.withCircuitBreaker( + () => this.withRetry(operation, context, retryOptions), + context, + circuitBreakerOptions + ); + } + + /** + * Health check for resilience systems + */ + static getHealthStatus(): Record { + const status: Record = {}; + + for (const [context, circuitBreaker] of this.circuitBreakers) { + status[context] = { + state: circuitBreaker.state, + failureCount: circuitBreaker.failureCount, + lastFailureTime: circuitBreaker.lastFailureTime, + isHealthy: circuitBreaker.state === "closed", + }; + } + + return status; + } + + /** + * Reset circuit breaker for a specific context + */ + static resetCircuitBreaker(context: string): void { + this.circuitBreakers.delete(context); + } + + /** + * Reset all circuit breakers + */ + static resetAllCircuitBreakers(): void { + this.circuitBreakers.clear(); + } + + private static getOrCreateCircuitBreaker( + context: string, + options: CircuitBreakerOptions + ): CircuitBreakerState { + if (!this.circuitBreakers.has(context)) { + this.circuitBreakers.set(context, { + state: "closed", + failureCount: 0, + lastFailureTime: 0, + }); + } + return this.circuitBreakers.get(context)!; + } + + private static isRetryableError(error: unknown): boolean { + if (error instanceof Error) { + const retryableErrors = [ + "ECONNRESET", + "ETIMEDOUT", + "ENOTFOUND", + "ECONNREFUSED", + "NETWORK_ERROR", + "TIMEOUT", + "RATE_LIMIT", + "SERVICE_UNAVAILABLE", + ]; + return retryableErrors.some((retryableError) => + error.message.includes(retryableError) + ); + } + return false; + } + + private static calculateBackoffDelay( + attempt: number, + config: RetryOptions + ): number { + const delay = + config.baseDelay * Math.pow(config.backoffMultiplier, attempt - 1); + return Math.min(delay, config.maxDelay); + } + + private static sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} + +interface CircuitBreakerState { + state: "closed" | "open" | "half-open"; + failureCount: number; + lastFailureTime: number; +} diff --git a/src/lib/internal/stream-manager.ts b/src/lib/internal/stream-manager.ts index 42b9ecbf..10e25977 100644 --- a/src/lib/internal/stream-manager.ts +++ b/src/lib/internal/stream-manager.ts @@ -215,6 +215,167 @@ export async function handleStreamLifecycle( } } +// Stream health monitoring +const streamHealth = new Map< + string, + { lastActivity: number; startTime: number; messageCount: number } +>(); +const STREAM_TIMEOUT = 5 * 60 * 1000; // 5 minutes +const STREAM_HEALTH_CHECK_INTERVAL = 30 * 1000; // 30 seconds + +// Health check timer +setInterval(async () => { + const now = Date.now(); + for (const [appId, health] of streamHealth.entries()) { + // Check for stuck streams (no activity for too long) + if (now - health.lastActivity > STREAM_TIMEOUT) { + console.warn(`Stream ${appId} appears stuck, forcing cleanup`); + try { + await forceCleanupStream(appId); + } catch (error) { + console.error(`Failed to cleanup stuck stream ${appId}:`, error); + } + } + + // Check for streams running too long + if (now - health.startTime > 10 * 60 * 1000) { + // 10 minutes + console.warn(`Stream ${appId} running too long, forcing cleanup`); + try { + await forceCleanupStream(appId); + } catch (error) { + console.error(`Failed to cleanup long-running stream ${appId}:`, error); + } + } + } +}, STREAM_HEALTH_CHECK_INTERVAL); + +/** + * Force cleanup a stream that's stuck or problematic + */ +async function forceCleanupStream(appId: string) { + try { + // Clear all stream state + await clearStreamState(appId); + + // Remove from health monitoring + streamHealth.delete(appId); + + // Release stream slot + releaseStreamSlot(appId); + + // Clear any Redis keys + await redis.del(`stream:${appId}`); + await redis.del(`stream:${appId}:keepalive`); + await redis.del(`stream:${appId}:events`); + + console.log(`Forced cleanup completed for stream ${appId}`); + } catch (error) { + console.error(`Error during forced cleanup of stream ${appId}:`, error); + } +} + +/** + * Check if a stream is actually healthy and not stuck + */ +async function isStreamHealthy(appId: string): Promise { + try { + const health = streamHealth.get(appId); + if (!health) return false; + + const now = Date.now(); + const timeSinceLastActivity = now - health.lastActivity; + const streamAge = now - health.startTime; + + // Stream is healthy if it's had recent activity and isn't too old + return timeSinceLastActivity < STREAM_TIMEOUT && streamAge < 10 * 60 * 1000; + } catch (error) { + console.error(`Error checking stream health for ${appId}:`, error); + return false; + } +} + +/** + * Prevent multiple streams from running simultaneously for the same app + */ +async function ensureSingleStream(appId: string): Promise { + try { + // Check if there's already a healthy stream running + if (await isStreamRunning(appId)) { + const isHealthy = await isStreamHealthy(appId); + if (isHealthy) { + console.log( + `Stream ${appId} is already running and healthy, stopping previous` + ); + await stopStream(appId); + await waitForStreamToStop(appId); + } else { + console.log(`Stream ${appId} appears stuck, forcing cleanup`); + await forceCleanupStream(appId); + } + } + + // Clear any stale state + await clearStreamState(appId); + + // Initialize health monitoring + streamHealth.set(appId, { + lastActivity: Date.now(), + startTime: Date.now(), + messageCount: 0, + }); + } catch (error) { + console.error(`Error ensuring single stream for ${appId}:`, error); + // If we can't ensure single stream, force cleanup and continue + await forceCleanupStream(appId); + } +} + +/** + * Update stream health with activity + */ +function updateStreamHealth(appId: string, messageCount?: number) { + const health = streamHealth.get(appId); + if (health) { + health.lastActivity = Date.now(); + if (messageCount !== undefined) { + health.messageCount = messageCount; + } + } +} + +// Connection pool to prevent too many simultaneous streams +const MAX_CONCURRENT_STREAMS = 10; +const activeStreams = new Set(); + +/** + * Check if we can start a new stream + */ +function canStartNewStream(): boolean { + return activeStreams.size < MAX_CONCURRENT_STREAMS; +} + +/** + * Reserve a stream slot + */ +function reserveStreamSlot(appId: string): boolean { + if (canStartNewStream()) { + activeStreams.add(appId); + return true; + } + return false; +} + +/** + * Release a stream slot + */ +function releaseStreamSlot(appId: string): void { + activeStreams.delete(appId); +} + +// Global lock to prevent multiple streams for the same app +const streamLocks = new Map>(); + /** * Send a message to the AI and handle all stream plumbing internally * This is the main interface that developers should use @@ -225,62 +386,184 @@ export async function sendMessageWithStreaming( mcpUrl: string, message: UIMessage ) { - const controller = new AbortController(); - let shouldAbort = false; + // Check if there's already a stream operation in progress for this app + if (streamLocks.has(appId)) { + console.log( + `Stream operation already in progress for ${appId}, waiting...` + ); + await streamLocks.get(appId); + // After waiting, check if we still need to send the message + if (await isStreamRunning(appId)) { + console.log(`Stream ${appId} is now running, returning existing stream`); + const existingStream = await getStream(appId); + if (existingStream) { + return existingStream; + } + } + } - // Set up abort callback - await setupAbortCallback(appId, () => { - shouldAbort = true; - }); + // Create a lock for this app + const streamPromise = (async (): Promise => { + try { + // Check connection pool + if (!canStartNewStream()) { + console.warn( + `Connection pool full (${activeStreams.size}/${MAX_CONCURRENT_STREAMS}), forcing cleanup of oldest streams` + ); + + // Force cleanup of oldest streams to make room + const oldestStreams = Array.from(activeStreams).slice(0, 2); + for (const oldAppId of oldestStreams) { + await forceCleanupStream(oldAppId); + } + } - let lastKeepAlive = Date.now(); + // Reserve stream slot + if (!reserveStreamSlot(appId)) { + throw new Error( + "Unable to reserve stream slot - connection pool exhausted" + ); + } - // Use the AI service to handle the AI interaction - const aiResponse = await AIService.sendMessage( - agent, - appId, - mcpUrl, - message, - { - threadId: appId, - resourceId: appId, - maxSteps: 100, - maxRetries: 0, - maxOutputTokens: 64000, - async onChunk() { - if (Date.now() - lastKeepAlive > 5000) { - lastKeepAlive = Date.now(); - await updateKeepAlive(appId); + // Ensure only one stream per app + await ensureSingleStream(appId); + + const controller = new AbortController(); + let shouldAbort = false; + let fallbackToMemory = false; + + // Set up abort callback + await setupAbortCallback(appId, () => { + shouldAbort = true; + }); + + let lastKeepAlive = Date.now(); + + try { + // Use the AI service to handle the AI interaction + const aiResponse = await AIService.sendMessage( + agent, + appId, + mcpUrl, + message, + { + threadId: appId, + resourceId: appId, + maxSteps: 100, + maxRetries: 0, + maxOutputTokens: 64000, + async onChunk() { + const now = Date.now(); + if (now - lastKeepAlive > 5000) { + lastKeepAlive = now; + await updateKeepAlive(appId); + updateStreamHealth(appId); + } + }, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async onStepFinish(_step: { response: { messages: unknown[] } }) { + updateStreamHealth(appId); + if (shouldAbort) { + await handleStreamLifecycle(appId, "error"); + controller.abort("Aborted stream after step finish"); + const messages = await AIService.getUnsavedMessages(appId); + console.log(messages); + await AIService.saveMessagesToMemory(agent, appId, messages); + } + }, + onError: async (error: { error: unknown }) => { + console.error("Stream error in manager:", error); + fallbackToMemory = true; + await handleStreamLifecycle(appId, "error"); + // Clean up health monitoring + streamHealth.delete(appId); + // Release stream slot + releaseStreamSlot(appId); + }, + onFinish: async () => { + await handleStreamLifecycle(appId, "finish"); + // Clean up health monitoring + streamHealth.delete(appId); + // Release stream slot + releaseStreamSlot(appId); + }, + abortSignal: controller.signal, + } + ); + + // Ensure the stream has the proper method + if (!aiResponse.stream.toUIMessageStreamResponse) { + console.error("Stream missing toUIMessageStreamResponse method!"); + throw new Error( + "Invalid stream format - missing toUIMessageStreamResponse method" + ); } - }, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - async onStepFinish(_step: { response: { messages: unknown[] } }) { - if (shouldAbort) { - await handleStreamLifecycle(appId, "error"); - controller.abort("Aborted stream after step finish"); - const messages = await AIService.getUnsavedMessages(appId); - console.log(messages); - await AIService.saveMessagesToMemory(agent, appId, messages); + + // Set up the resumable stream + const resumableStream = await setStream( + appId, + message, + aiResponse.stream + ); + + // Set up the abort callback + await setupAbortCallback(appId, () => { + controller.abort("Stream aborted by user"); + }); + + // Handle the stream lifecycle + await handleStreamLifecycle(appId, "start"); + + return resumableStream; + } catch (error) { + console.error("Stream setup failed, falling back to memory:", error); + fallbackToMemory = true; + streamHealth.delete(appId); // New: Clean up health monitoring on setup failure + releaseStreamSlot(appId); // New: Release slot on setup failure + } + + // Fallback logic (if fallbackToMemory is true) + if (fallbackToMemory) { + console.log("Falling back to memory for app:", appId); + try { + // Try to get any unsaved messages and save them to memory + const unsavedMessages = await AIService.getUnsavedMessages(appId); + if (unsavedMessages.length > 0) { + await AIService.saveMessagesToMemory(agent, appId, unsavedMessages); + } + + // Return a simple response that indicates the message was processed + return { + response() { + return new Response( + `data: {"type": "message", "data": {"id": "${crypto.randomUUID()}", "role": "assistant", "content": "Message processed and saved to memory"}}\n\n`, + { + headers: { + "content-type": "text/event-stream", + "cache-control": "no-cache", + connection: "keep-alive", + }, + } + ); + }, + }; + } catch (fallbackError) { + console.error("Fallback to memory also failed:", fallbackError); + throw new Error("Failed to process message and fallback failed"); } - }, - onError: async (error: { error: unknown }) => { - console.error("Stream error in manager:", error); - await handleStreamLifecycle(appId, "error"); - }, - onFinish: async () => { - await handleStreamLifecycle(appId, "finish"); - }, - abortSignal: controller.signal, + } + + // This should never be reached, but TypeScript requires it + throw new Error("Stream setup failed and no fallback available"); + } finally { + // Always release the lock + streamLocks.delete(appId); } - ); + })(); - // Ensure the stream has the proper method - if (!aiResponse.stream.toUIMessageStreamResponse) { - console.error("Stream missing toUIMessageStreamResponse method!"); - throw new Error( - "Invalid stream format - missing toUIMessageStreamResponse method" - ); - } + // Store the lock promise + streamLocks.set(appId, streamPromise); - return await setStream(appId, message, aiResponse.stream); + // Return the result + return streamPromise; } From 36f37042c7a52a78cb3158672a490dcd85824df7 Mon Sep 17 00:00:00 2001 From: Ben the greatest Date: Sat, 9 Aug 2025 01:27:51 -0700 Subject: [PATCH 2/2] add fs --- src/lib/internal/chat-service.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib/internal/chat-service.ts b/src/lib/internal/chat-service.ts index 13f4dda0..d09d4145 100644 --- a/src/lib/internal/chat-service.ts +++ b/src/lib/internal/chat-service.ts @@ -47,7 +47,7 @@ export class ChatService { await this.manageStreamLifecycle(appId); // Get MCP server with resilience - const { mcpEphemeralUrl } = await Resilience.withResilience( + const { mcpEphemeralUrl, fs } = await Resilience.withResilience( () => freestyle.requestDevServer({ repoId: app.info.gitRepo }), "mcp-server-request", { maxAttempts: 2, baseDelay: 500 }, @@ -62,6 +62,7 @@ export class ChatService { builderAgent, appId, mcpEphemeralUrl, + fs, messages.at(-1)! ), "streaming",