diff --git a/.gitignore b/.gitignore index 9fb5e4c62..4dee6fc93 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ dist/ # Config files config.toml +.claude/settings.local.json # Log files logs/ diff --git a/src/app/api/chat/route.ts b/src/app/api/chat/route.ts index 6362ebc7b..3d2a52160 100644 --- a/src/app/api/chat/route.ts +++ b/src/app/api/chat/route.ts @@ -101,6 +101,9 @@ const ensureChatExists = async (input: { }; export const POST = async (req: Request) => { + let safeClose: (() => void) | undefined; + let disconnect: (() => void) | undefined; + try { const reqBody = (await req.json()) as Body; @@ -155,57 +158,66 @@ export const POST = async (req: Request) => { const responseStream = new TransformStream(); const writer = responseStream.writable.getWriter(); const encoder = new TextEncoder(); + const keepAliveMs = 15_000; + let streamClosed = false; + let keepAliveInterval: ReturnType | undefined; + + const safeWrite = (payload: Record) => { + if (streamClosed) return; + + writer.write(encoder.encode(JSON.stringify(payload) + '\n')).catch((error) => { + console.warn('Failed to write chat stream payload:', error); + streamClosed = true; + if (keepAliveInterval) { + clearInterval(keepAliveInterval); + } + }); + }; + + safeClose = () => { + if (streamClosed) return; + + streamClosed = true; - const disconnect = session.subscribe((event: string, data: any) => { + if (keepAliveInterval) { + clearInterval(keepAliveInterval); + } + + writer.close().catch((error) => { + console.warn('Failed to close chat stream:', error); + }); + }; + + disconnect = session.subscribe((event: string, data: any) => { if (event === 'data') { if (data.type === 'block') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'block', - block: data.block, - }) + '\n', - ), - ); + safeWrite({ + type: 'block', + block: data.block, + }); } else if (data.type === 'updateBlock') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'updateBlock', - blockId: data.blockId, - patch: data.patch, - }) + '\n', - ), - ); + safeWrite({ + type: 'updateBlock', + blockId: data.blockId, + patch: data.patch, + }); } else if (data.type === 'researchComplete') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'researchComplete', - }) + '\n', - ), - ); + safeWrite({ + type: 'researchComplete', + }); } } else if (event === 'end') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'messageEnd', - }) + '\n', - ), - ); - writer.close(); + safeWrite({ + type: 'messageEnd', + }); + safeClose?.(); session.removeAllListeners(); } else if (event === 'error') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'error', - data: data.data, - }) + '\n', - ), - ); - writer.close(); + safeWrite({ + type: 'error', + data: data.data, + }); + safeClose?.(); session.removeAllListeners(); } }); @@ -233,10 +245,18 @@ export const POST = async (req: Request) => { }); req.signal.addEventListener('abort', () => { - disconnect(); - writer.close(); + disconnect?.(); + safeClose?.(); }); + // Start keepalives only after setup succeeds + if (!streamClosed) { + keepAliveInterval = setInterval(() => { + safeWrite({ type: 'keepAlive' }); + }, keepAliveMs); + safeWrite({ type: 'keepAlive' }); + } + return new Response(responseStream.readable, { headers: { 'Content-Type': 'text/event-stream', @@ -245,6 +265,8 @@ export const POST = async (req: Request) => { }, }); } catch (err) { + disconnect?.(); + safeClose?.(); console.error('An error occurred while processing chat request:', err); return Response.json( { message: 'An error occurred while processing chat request' }, diff --git a/src/app/api/reconnect/[id]/route.ts b/src/app/api/reconnect/[id]/route.ts index 08be11b16..ce7c55aa3 100644 --- a/src/app/api/reconnect/[id]/route.ts +++ b/src/app/api/reconnect/[id]/route.ts @@ -4,6 +4,9 @@ export const POST = async ( req: Request, { params }: { params: Promise<{ id: string }> }, ) => { + let safeClose: (() => void) | undefined; + let disconnect: (() => void) | undefined; + try { const { id } = await params; @@ -16,66 +19,91 @@ export const POST = async ( const responseStream = new TransformStream(); const writer = responseStream.writable.getWriter(); const encoder = new TextEncoder(); + const keepAliveMs = 15_000; + let streamClosed = false; + let keepAliveInterval: ReturnType | undefined; + + const safeWrite = (payload: Record) => { + if (streamClosed) return; + + writer.write(encoder.encode(JSON.stringify(payload) + '\n')).catch((error) => { + console.warn('Failed to write reconnect stream payload:', error); + streamClosed = true; + if (keepAliveInterval) { + clearInterval(keepAliveInterval); + } + }); + }; - const disconnect = session.subscribe((event, data) => { + safeClose = () => { + if (streamClosed) return; + + streamClosed = true; + + if (keepAliveInterval) { + clearInterval(keepAliveInterval); + } + + writer.close().catch((error) => { + console.warn('Failed to close reconnect stream:', error); + }); + }; + + disconnect = session.subscribe((event, data) => { if (event === 'data') { if (data.type === 'block') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'block', - block: data.block, - }) + '\n', - ), - ); + safeWrite({ + type: 'block', + block: data.block, + }); } else if (data.type === 'updateBlock') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'updateBlock', - blockId: data.blockId, - patch: data.patch, - }) + '\n', - ), - ); + safeWrite({ + type: 'updateBlock', + blockId: data.blockId, + patch: data.patch, + }); } else if (data.type === 'researchComplete') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'researchComplete', - }) + '\n', - ), - ); + safeWrite({ + type: 'researchComplete', + }); } } else if (event === 'end') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'messageEnd', - }) + '\n', - ), - ); - writer.close(); - disconnect(); + safeWrite({ + type: 'messageEnd', + }); + safeClose?.(); + if (disconnect) { + disconnect(); + } else { + queueMicrotask(() => disconnect?.()); + } } else if (event === 'error') { - writer.write( - encoder.encode( - JSON.stringify({ - type: 'error', - data: data.data, - }) + '\n', - ), - ); - writer.close(); - disconnect(); + safeWrite({ + type: 'error', + data: data.data, + }); + safeClose?.(); + if (disconnect) { + disconnect(); + } else { + queueMicrotask(() => disconnect?.()); + } } }); req.signal.addEventListener('abort', () => { - disconnect(); - writer.close(); + disconnect?.(); + safeClose?.(); }); + // Start keepalives only after setup succeeds + if (!streamClosed) { + keepAliveInterval = setInterval(() => { + safeWrite({ type: 'keepAlive' }); + }, keepAliveMs); + safeWrite({ type: 'keepAlive' }); + } + return new Response(responseStream.readable, { headers: { 'Content-Type': 'text/event-stream', @@ -84,6 +112,8 @@ export const POST = async ( }, }); } catch (err) { + disconnect?.(); + safeClose?.(); console.error('Error in reconnecting to session stream: ', err); return Response.json( { message: 'An error has occurred.' }, diff --git a/src/lib/agents/media/video.ts b/src/lib/agents/media/video.ts index c8f19b69a..9250a9fed 100644 --- a/src/lib/agents/media/video.ts +++ b/src/lib/agents/media/video.ts @@ -43,19 +43,31 @@ const searchVideos = async ( schema: schema, }); - const searchRes = await searchSearxng(res.query, { - engines: ['youtube'], - }); + let searchRes; + try { + searchRes = await searchSearxng(res.query, { + engines: ['youtube'], + }); + } catch (error) { + console.error('Video search failed:', error instanceof Error ? error.message : error); + return []; + } const videos: VideoSearchResult[] = []; searchRes.results.forEach((result) => { if (result.thumbnail && result.url && result.title && result.iframe_src) { + // Normalize embed URL - some SearXNG instances return + // youtube-nocookie.com which doesn't resolve on all networks + const embedUrl = result.iframe_src.replace( + 'www.youtube-nocookie.com/embed', + 'www.youtube.com/embed', + ); videos.push({ img_src: result.thumbnail, url: result.url, title: result.title, - iframe_src: result.iframe_src, + iframe_src: embedUrl, }); } }); diff --git a/src/lib/agents/search/api.ts b/src/lib/agents/search/api.ts index 94b70ae9b..f865b52b3 100644 --- a/src/lib/agents/search/api.ts +++ b/src/lib/agents/search/api.ts @@ -4,98 +4,102 @@ import { classify } from './classifier'; import Researcher from './researcher'; import { getWriterPrompt } from '@/lib/prompts/search/writer'; import { WidgetExecutor } from './widgets'; +import { buildSearchResultsContext } from './context'; class APISearchAgent { async searchAsync(session: SessionManager, input: SearchAgentInput) { - const classification = await classify({ - chatHistory: input.chatHistory, - enabledSources: input.config.sources, - query: input.followUp, - llm: input.config.llm, - }); - - const widgetPromise = WidgetExecutor.executeAll({ - classification, - chatHistory: input.chatHistory, - followUp: input.followUp, - llm: input.config.llm, - }).catch((err) => { - console.error(`Error executing widgets: ${err}`); - return []; - }); - - let searchPromise: Promise | null = null; + try { + const classification = await classify({ + chatHistory: input.chatHistory, + enabledSources: input.config.sources, + query: input.followUp, + llm: input.config.llm, + }); - if (!classification.classification.skipSearch) { - const researcher = new Researcher(); - searchPromise = researcher.research(SessionManager.createSession(), { + const widgetPromise = WidgetExecutor.executeAll({ + classification, chatHistory: input.chatHistory, followUp: input.followUp, - classification: classification, - config: input.config, + llm: input.config.llm, + }).catch((err) => { + console.error(`Error executing widgets: ${err}`); + return []; }); - } - const [widgetOutputs, searchResults] = await Promise.all([ - widgetPromise, - searchPromise, - ]); + let searchPromise: Promise | null = null; + + if (!classification.classification.skipSearch) { + const researcher = new Researcher(); + searchPromise = researcher.research(SessionManager.createSession(), { + chatHistory: input.chatHistory, + followUp: input.followUp, + classification: classification, + config: input.config, + }); + } + + const [widgetOutputs, searchResults] = await Promise.all([ + widgetPromise, + searchPromise ?? Promise.resolve(null), + ]); + + if (searchResults) { + session.emit('data', { + type: 'searchResults', + data: searchResults.searchFindings, + }); + } - if (searchResults) { session.emit('data', { - type: 'searchResults', - data: searchResults.searchFindings, + type: 'researchComplete', }); - } - session.emit('data', { - type: 'researchComplete', - }); + const finalContext = buildSearchResultsContext( + searchResults?.searchFindings || [], + ); - const finalContext = - searchResults?.searchFindings - .map( - (f, index) => - `${f.content}`, - ) - .join('\n') || ''; + const widgetContext = widgetOutputs + .map((o) => { + return `${o.llmContext}`; + }) + .join('\n-------------\n'); - const widgetContext = widgetOutputs - .map((o) => { - return `${o.llmContext}`; - }) - .join('\n-------------\n'); + const finalContextWithWidgets = `\n${finalContext}\n\n\n${widgetContext}\n`; - const finalContextWithWidgets = `\n${finalContext}\n\n\n${widgetContext}\n`; + const writerPrompt = getWriterPrompt( + finalContextWithWidgets, + input.config.systemInstructions, + input.config.mode, + ); - const writerPrompt = getWriterPrompt( - finalContextWithWidgets, - input.config.systemInstructions, - input.config.mode, - ); + const answerStream = input.config.llm.streamText({ + messages: [ + { + role: 'system', + content: writerPrompt, + }, + ...input.chatHistory, + { + role: 'user', + content: input.followUp, + }, + ], + }); - const answerStream = input.config.llm.streamText({ - messages: [ - { - role: 'system', - content: writerPrompt, - }, - ...input.chatHistory, - { - role: 'user', - content: input.followUp, - }, - ], - }); + for await (const chunk of answerStream) { + session.emit('data', { + type: 'response', + data: chunk.contentChunk, + }); + } - for await (const chunk of answerStream) { - session.emit('data', { - type: 'response', - data: chunk.contentChunk, + session.emit('end', {}); + } catch (error) { + console.error('Error while running API search:', error); + session.emit('error', { + data: error instanceof Error ? error.message : 'Search failed', }); } - - session.emit('end', {}); } } diff --git a/src/lib/agents/search/context.ts b/src/lib/agents/search/context.ts new file mode 100644 index 000000000..b03a2bcec --- /dev/null +++ b/src/lib/agents/search/context.ts @@ -0,0 +1,66 @@ +import { Chunk } from '@/lib/types'; +import { getTokenCount, truncateTextByTokens } from '@/lib/utils/splitText'; + +const MAX_TOTAL_SEARCH_CONTEXT_TOKENS = 20000; +const MAX_RESULT_CONTEXT_TOKENS = 2500; +const TRUNCATION_NOTE = + '\n[Result content truncated to fit the model context window.]'; + +const escapeXmlText = (value: string): string => + value + .replace(/&/g, '&') + .replace(//g, '>'); + +const escapeAttribute = (value: string): string => + escapeXmlText(value).replace(/"/g, '"'); + +export const buildSearchResultsContext = (searchFindings: Chunk[] = []) => { + let remainingTokens = MAX_TOTAL_SEARCH_CONTEXT_TOKENS; + const contextParts: string[] = []; + + for (const [index, finding] of searchFindings.entries()) { + if (remainingTokens <= 0) { + break; + } + + const title = escapeAttribute( + String(finding.metadata?.title || `Result ${index + 1}`), + ); + const prefix = ``; + const suffix = ``; + const wrapperTokens = getTokenCount(prefix) + getTokenCount(suffix); + const availableContentTokens = Math.min( + MAX_RESULT_CONTEXT_TOKENS, + remainingTokens - wrapperTokens, + ); + + if (availableContentTokens <= 0) { + break; + } + + const fullContent = String(finding.content || ''); + const fullContentTokens = getTokenCount(fullContent); + let content = truncateTextByTokens(fullContent, availableContentTokens); + + if (fullContentTokens > availableContentTokens) { + const noteBudget = Math.max( + 0, + availableContentTokens - getTokenCount(TRUNCATION_NOTE), + ); + content = `${truncateTextByTokens(fullContent, noteBudget)}${TRUNCATION_NOTE}`; + } + + const entry = `${prefix}${escapeXmlText(content)}${suffix}`; + const entryTokens = getTokenCount(entry); + + if (entryTokens > remainingTokens) { + break; + } + + contextParts.push(entry); + remainingTokens -= entryTokens; + } + + return contextParts.join('\n'); +}; diff --git a/src/lib/agents/search/index.ts b/src/lib/agents/search/index.ts index 859183293..b38e8089c 100644 --- a/src/lib/agents/search/index.ts +++ b/src/lib/agents/search/index.ts @@ -8,178 +8,202 @@ import db from '@/lib/db'; import { chats, messages } from '@/lib/db/schema'; import { and, eq, gt } from 'drizzle-orm'; import { TextBlock } from '@/lib/types'; +import { buildSearchResultsContext } from './context'; class SearchAgent { async searchAsync(session: SessionManager, input: SearchAgentInput) { - const exists = await db.query.messages.findFirst({ - where: and( - eq(messages.chatId, input.chatId), - eq(messages.messageId, input.messageId), - ), - }); - - if (!exists) { - await db.insert(messages).values({ - chatId: input.chatId, - messageId: input.messageId, - backendId: session.id, - query: input.followUp, - createdAt: new Date().toISOString(), - status: 'answering', - responseBlocks: [], + try { + const exists = await db.query.messages.findFirst({ + where: and( + eq(messages.chatId, input.chatId), + eq(messages.messageId, input.messageId), + ), }); - } else { - await db - .delete(messages) - .where( - and(eq(messages.chatId, input.chatId), gt(messages.id, exists.id)), - ) - .execute(); - await db - .update(messages) - .set({ - status: 'answering', + + if (!exists) { + await db.insert(messages).values({ + chatId: input.chatId, + messageId: input.messageId, backendId: session.id, + query: input.followUp, + createdAt: new Date().toISOString(), + status: 'answering', responseBlocks: [], - }) - .where( - and( - eq(messages.chatId, input.chatId), - eq(messages.messageId, input.messageId), - ), - ) - .execute(); - } - - const classification = await classify({ - chatHistory: input.chatHistory, - enabledSources: input.config.sources, - query: input.followUp, - llm: input.config.llm, - }); - - const widgetPromise = WidgetExecutor.executeAll({ - classification, - chatHistory: input.chatHistory, - followUp: input.followUp, - llm: input.config.llm, - }).then((widgetOutputs) => { - widgetOutputs.forEach((o) => { - session.emitBlock({ - id: crypto.randomUUID(), - type: 'widget', - data: { - widgetType: o.type, - params: o.data, - }, }); - }); - return widgetOutputs; - }); + } else { + await db.transaction(async (tx) => { + await tx + .delete(messages) + .where( + and(eq(messages.chatId, input.chatId), gt(messages.id, exists.id)), + ) + .execute(); + await tx + .update(messages) + .set({ + status: 'answering', + backendId: session.id, + responseBlocks: [], + }) + .where( + and( + eq(messages.chatId, input.chatId), + eq(messages.messageId, input.messageId), + ), + ) + .execute(); + }); + } - let searchPromise: Promise | null = null; + const classification = await classify({ + chatHistory: input.chatHistory, + enabledSources: input.config.sources, + query: input.followUp, + llm: input.config.llm, + }); - if (!classification.classification.skipSearch) { - const researcher = new Researcher(); - searchPromise = researcher.research(session, { + const widgetPromise = WidgetExecutor.executeAll({ + classification, chatHistory: input.chatHistory, followUp: input.followUp, - classification: classification, - config: input.config, + llm: input.config.llm, + }).then((widgetOutputs) => { + widgetOutputs.forEach((o) => { + session.emitBlock({ + id: crypto.randomUUID(), + type: 'widget', + data: { + widgetType: o.type, + params: o.data, + }, + }); + }); + return widgetOutputs; }); - } - const [widgetOutputs, searchResults] = await Promise.all([ - widgetPromise, - searchPromise, - ]); + let searchPromise: Promise | null = null; - session.emit('data', { - type: 'researchComplete', - }); + if (!classification.classification.skipSearch) { + const researcher = new Researcher(); + searchPromise = researcher.research(session, { + chatHistory: input.chatHistory, + followUp: input.followUp, + classification: classification, + config: input.config, + }); + } - const finalContext = - searchResults?.searchFindings - .map( - (f, index) => - `${f.content}`, - ) - .join('\n') || ''; - - const widgetContext = widgetOutputs - .map((o) => { - return `${o.llmContext}`; - }) - .join('\n-------------\n'); - - const finalContextWithWidgets = `\n${finalContext}\n\n\n${widgetContext}\n`; - - const writerPrompt = getWriterPrompt( - finalContextWithWidgets, - input.config.systemInstructions, - input.config.mode, - ); - const answerStream = input.config.llm.streamText({ - messages: [ - { - role: 'system', - content: writerPrompt, - }, - ...input.chatHistory, - { - role: 'user', - content: input.followUp, - }, - ], - }); - - let responseBlockId = ''; - - for await (const chunk of answerStream) { - if (!responseBlockId) { - const block: TextBlock = { - id: crypto.randomUUID(), - type: 'text', - data: chunk.contentChunk, - }; - - session.emitBlock(block); - - responseBlockId = block.id; - } else { - const block = session.getBlock(responseBlockId) as TextBlock | null; + const [widgetOutputs, searchResults] = await Promise.all([ + widgetPromise, + searchPromise ?? Promise.resolve(null), + ]); - if (!block) { - continue; - } + session.emit('data', { + type: 'researchComplete', + }); - block.data += chunk.contentChunk; + const finalContext = buildSearchResultsContext( + searchResults?.searchFindings || [], + ); + + const widgetContext = widgetOutputs + .map((o) => { + return `${o.llmContext}`; + }) + .join('\n-------------\n'); - session.updateBlock(block.id, [ + const finalContextWithWidgets = `\n${finalContext}\n\n\n${widgetContext}\n`; + + const writerPrompt = getWriterPrompt( + finalContextWithWidgets, + input.config.systemInstructions, + input.config.mode, + ); + const answerStream = input.config.llm.streamText({ + messages: [ + { + role: 'system', + content: writerPrompt, + }, + ...input.chatHistory, { - op: 'replace', - path: '/data', - value: block.data, + role: 'user', + content: input.followUp, }, - ]); + ], + }); + + let responseBlockId = ''; + + for await (const chunk of answerStream) { + if (!responseBlockId) { + const block: TextBlock = { + id: crypto.randomUUID(), + type: 'text', + data: chunk.contentChunk, + }; + + session.emitBlock(block); + + responseBlockId = block.id; + } else { + const block = session.getBlock(responseBlockId) as TextBlock | null; + + if (!block) { + continue; + } + + block.data += chunk.contentChunk; + + session.updateBlock(block.id, [ + { + op: 'replace', + path: '/data', + value: block.data, + }, + ]); + } } - } - session.emit('end', {}); + session.emit('end', {}); - await db - .update(messages) - .set({ - status: 'completed', - responseBlocks: session.getAllBlocks(), - }) - .where( - and( - eq(messages.chatId, input.chatId), - eq(messages.messageId, input.messageId), - ), - ) - .execute(); + await db + .update(messages) + .set({ + status: 'completed', + responseBlocks: session.getAllBlocks(), + }) + .where( + and( + eq(messages.chatId, input.chatId), + eq(messages.messageId, input.messageId), + ), + ) + .execute(); + } catch (error) { + console.error('Error while running search:', error); + session.emit('error', { + data: error instanceof Error ? error.message : 'Search failed', + }); + + try { + await db + .update(messages) + .set({ + status: 'error', + responseBlocks: session.getAllBlocks(), + }) + .where( + and( + eq(messages.chatId, input.chatId), + eq(messages.messageId, input.messageId), + ), + ) + .execute(); + } catch (dbError) { + console.error('Failed to persist errored search state:', dbError); + } + } } } diff --git a/src/lib/agents/search/researcher/actions/academicSearch.ts b/src/lib/agents/search/researcher/actions/academicSearch.ts index 72e1f4b14..c328a7eb4 100644 --- a/src/lib/agents/search/researcher/actions/academicSearch.ts +++ b/src/lib/agents/search/researcher/actions/academicSearch.ts @@ -30,7 +30,7 @@ const academicSearchAction: ResearchAction = { config.classification.classification.skipSearch === false && config.classification.classification.academicSearch === true, execute: async (input, additionalConfig) => { - input.queries = input.queries.slice(0, 3); + input.queries = (input.queries ?? []).slice(0, 3); const researchBlock = additionalConfig.session.getBlock( additionalConfig.researchBlockId, @@ -58,9 +58,17 @@ const academicSearchAction: ResearchAction = { let results: Chunk[] = []; const search = async (q: string) => { - const res = await searchSearxng(q, { - engines: ['arxiv', 'google scholar', 'pubmed'], - }); + let res; + try { + res = await searchSearxng(q, { + engines: ['arxiv', 'google scholar', 'pubmed'], + }); + } catch (error) { + console.error(`Academic search failed for query "${q}":`, error); + return; + } + + if (!res.results || res.results.length === 0) return; const resultChunks: Chunk[] = res.results.map((r) => ({ content: r.content || r.title, diff --git a/src/lib/agents/search/researcher/actions/scrapeURL.ts b/src/lib/agents/search/researcher/actions/scrapeURL.ts index c702a7014..e5d9e1cf2 100644 --- a/src/lib/agents/search/researcher/actions/scrapeURL.ts +++ b/src/lib/agents/search/researcher/actions/scrapeURL.ts @@ -1,10 +1,22 @@ -import z from 'zod'; -import { ResearchAction } from '../../types'; import { Chunk, ReadingResearchBlock } from '@/lib/types'; +import dns from 'node:dns/promises'; +import net from 'node:net'; import TurnDown from 'turndown'; -import path from 'path'; +import z from 'zod'; +import { ResearchAction } from '../../types'; +import { splitText } from '@/lib/utils/splitText'; const turndownService = new TurnDown(); +const MAX_REDIRECTS = 5; +const BLOCKED_HOSTNAMES = new Set(['localhost', '127.0.0.1', '0.0.0.0', '::1']); +const BLOCKED_HOST_SUFFIXES = [ + '.localhost', + '.local', + '.localdomain', + '.internal', + '.lan', + '.home.arpa', +]; const schema = z.object({ urls: z.array(z.string()).describe('A list of URLs to scrape content from.'), @@ -17,6 +29,152 @@ You should only call this tool when the user has specifically requested informat For example, if the user says "Please summarize the content of https://example.com/article", you can call this tool with that URL to get the content and then provide the summary or "What does X mean according to https://example.com/page", you can call this tool with that URL to get the content and provide the explanation. `; +const normalizeAddress = (address: string) => + address + .toLowerCase() + .replace(/^\[|\]$/g, '') + .split('%')[0]; + +const isBlockedIPv4 = (address: string): boolean => { + const octets = address.split('.').map((part) => Number.parseInt(part, 10)); + + if (octets.length !== 4 || octets.some((part) => Number.isNaN(part))) { + return false; + } + + const [a, b] = octets; + + return ( + a === 0 || + a === 10 || + a === 127 || + (a === 100 && b >= 64 && b <= 127) || + (a === 169 && b === 254) || + (a === 172 && b >= 16 && b <= 31) || + (a === 192 && b === 168) + ); +}; + +const isBlockedIPv6 = (address: string): boolean => { + const normalized = normalizeAddress(address); + + if (normalized === '::1' || normalized === '::') { + return true; + } + + if (normalized.startsWith('::ffff:')) { + return isBlockedIPAddress(normalized.slice('::ffff:'.length)); + } + + return ( + normalized.startsWith('fc') || + normalized.startsWith('fd') || + normalized.startsWith('fe8') || + normalized.startsWith('fe9') || + normalized.startsWith('fea') || + normalized.startsWith('feb') || + normalized.startsWith('ff') // multicast + ); +}; + +const isBlockedIPAddress = (address: string): boolean => { + const normalized = normalizeAddress(address); + const version = net.isIP(normalized); + + if (version === 4) { + return isBlockedIPv4(normalized); + } + + if (version === 6) { + return isBlockedIPv6(normalized); + } + + return false; +}; + +const isBlockedHostname = (hostname: string): boolean => { + const normalized = hostname.toLowerCase(); + + return ( + BLOCKED_HOSTNAMES.has(normalized) || + BLOCKED_HOST_SUFFIXES.some((suffix) => normalized.endsWith(suffix)) + ); +}; + +const assertSafeScrapeURL = async (rawURL: string): Promise => { + let parsed: URL; + + try { + parsed = new URL(rawURL); + } catch { + throw new Error(`Invalid URL: ${rawURL}`); + } + + if (!['http:', 'https:'].includes(parsed.protocol)) { + throw new Error( + `Unsupported URL protocol for scraping: ${parsed.protocol}`, + ); + } + + const hostname = parsed.hostname.toLowerCase(); + + if (isBlockedHostname(hostname) || isBlockedIPAddress(hostname)) { + throw new Error( + `Refusing to access local or private network URL: ${rawURL}`, + ); + } + + try { + const resolved = await dns.lookup(hostname, { all: true, verbatim: true }); + + if (resolved.some((entry) => isBlockedIPAddress(entry.address))) { + throw new Error( + `Refusing to access local or private network URL: ${rawURL}`, + ); + } + } catch (error: any) { + if ( + error instanceof Error && + error.message.startsWith( + 'Refusing to access local or private network URL:', + ) + ) { + throw error; + } + // Fail closed: if DNS resolution fails, refuse the scrape + throw new Error( + `DNS lookup failed for ${hostname}, refusing to scrape: ${(error as Error).message}`, + ); + } + + return parsed; +}; + +const fetchScrapeURL = async ( + rawURL: string, + redirectCount = 0, +): Promise => { + const safeURL = await assertSafeScrapeURL(rawURL); + const res = await fetch(safeURL, { redirect: 'manual' }); + + if (res.status >= 300 && res.status < 400) { + if (redirectCount >= MAX_REDIRECTS) { + throw new Error(`Too many redirects while scraping ${rawURL}`); + } + + const location = res.headers.get('location'); + + if (!location) { + throw new Error(`Redirect missing location while scraping ${rawURL}`); + } + + const redirectedURL = new URL(location, safeURL).toString(); + return fetchScrapeURL(redirectedURL, redirectCount + 1); + } + + return res; +}; + const scrapeURLAction: ResearchAction = { name: 'scrape_url', schema: schema, @@ -25,7 +183,7 @@ const scrapeURLAction: ResearchAction = { getDescription: () => actionDescription, enabled: (_) => true, execute: async (params, additionalConfig) => { - params.urls = params.urls.slice(0, 3); + params.urls = (params.urls ?? []).slice(0, 3); let readingBlockId = crypto.randomUUID(); let readingEmitted = false; @@ -39,12 +197,19 @@ const scrapeURLAction: ResearchAction = { await Promise.all( params.urls.map(async (url) => { try { - const res = await fetch(url); - const text = await res.text(); + const res = await fetchScrapeURL(url); + let text = await res.text(); const title = text.match(/(.*?)<\/title>/i)?.[1] || `Content from ${url}`; + // Cap raw HTML before Turndown so we don't spend CPU converting + // megabytes of markup we'll mostly throw away after tokenization. + const maxHtmlChars = 200_000; + if (text.length > maxHtmlChars) { + text = text.slice(0, maxHtmlChars); + } + if ( !readingEmitted && researchBlock && @@ -110,14 +275,21 @@ const scrapeURLAction: ResearchAction<typeof schema> = { const markdown = turndownService.turndown(text); + // Limit scraped content to avoid blowing up the context window. + // splitText chunks by token count — we only keep the first chunk. + const maxTokensPerPage = 6000; + const chunks = splitText(markdown, maxTokensPerPage, 0); + const content = chunks.length > 0 ? chunks[0] : markdown; + results.push({ - content: markdown, + content, metadata: { url, title: title, }, }); } catch (error) { + console.error(`Failed to scrape URL ${url}:`, error); results.push({ content: `Failed to fetch content from ${url}: ${error}`, metadata: { diff --git a/src/lib/agents/search/researcher/actions/socialSearch.ts b/src/lib/agents/search/researcher/actions/socialSearch.ts index 16468ab4c..c99bd9593 100644 --- a/src/lib/agents/search/researcher/actions/socialSearch.ts +++ b/src/lib/agents/search/researcher/actions/socialSearch.ts @@ -30,7 +30,7 @@ const socialSearchAction: ResearchAction<typeof schema> = { config.classification.classification.skipSearch === false && config.classification.classification.discussionSearch === true, execute: async (input, additionalConfig) => { - input.queries = input.queries.slice(0, 3); + input.queries = (input.queries ?? []).slice(0, 3); const researchBlock = additionalConfig.session.getBlock( additionalConfig.researchBlockId, @@ -58,9 +58,17 @@ const socialSearchAction: ResearchAction<typeof schema> = { let results: Chunk[] = []; const search = async (q: string) => { - const res = await searchSearxng(q, { - engines: ['reddit'], - }); + let res; + try { + res = await searchSearxng(q, { + engines: ['reddit'], + }); + } catch (error) { + console.error(`Social search failed for query "${q}":`, error); + return; + } + + if (!res.results || res.results.length === 0) return; const resultChunks: Chunk[] = res.results.map((r) => ({ content: r.content || r.title, diff --git a/src/lib/agents/search/researcher/actions/uploadsSearch.ts b/src/lib/agents/search/researcher/actions/uploadsSearch.ts index 819506395..3fd4ad3b4 100644 --- a/src/lib/agents/search/researcher/actions/uploadsSearch.ts +++ b/src/lib/agents/search/researcher/actions/uploadsSearch.ts @@ -27,7 +27,7 @@ const uploadsSearchAction: ResearchAction<typeof schema> = { Never use this tool to search the web or for information that is not contained within the user's uploaded files. `, execute: async (input, additionalConfig) => { - input.queries = input.queries.slice(0, 3); + input.queries = (input.queries ?? []).slice(0, 3); const researchBlock = additionalConfig.session.getBlock( additionalConfig.researchBlockId, diff --git a/src/lib/agents/search/researcher/actions/webSearch.ts b/src/lib/agents/search/researcher/actions/webSearch.ts index 4d60b79f2..d927143b1 100644 --- a/src/lib/agents/search/researcher/actions/webSearch.ts +++ b/src/lib/agents/search/researcher/actions/webSearch.ts @@ -4,7 +4,6 @@ import { searchSearxng } from '@/lib/searxng'; import { Chunk, SearchResultsResearchBlock } from '@/lib/types'; const actionSchema = z.object({ - type: z.literal('web_search'), queries: z .array(z.string()) .describe('An array of search queries to perform web searches for.'), @@ -40,7 +39,7 @@ For example if the user is asking about Tesla, your actions should be like: 6. done. You can search for 3 queries in one go, make sure to utilize all 3 queries to maximize the information you can gather. If a question is simple, then split your queries to cover different aspects or related topics to get a comprehensive understanding. -If this tool is present and no other tools are more relevant, you MUST use this tool to get the needed information. You can call this tools, multiple times as needed. +If this tool is present and no other tools are more relevant, you MUST use this tool to get the needed information. You can call this tool, multiple times as needed. `; const qualityModePrompt = ` @@ -53,7 +52,7 @@ Never stop before at least 5-6 iterations of searches unless the user question i Your queries shouldn't be sentences but rather keywords that are SEO friendly and can be used to search the web for information. You can search for 3 queries in one go, make sure to utilize all 3 queries to maximize the information you can gather. If a question is simple, then split your queries to cover different aspects or related topics to get a comprehensive understanding. -If this tool is present and no other tools are more relevant, you MUST use this tool to get the needed information. You can call this tools, multiple times as needed. +If this tool is present and no other tools are more relevant, you MUST use this tool to get the needed information. You can call this tool, multiple times as needed. `; const webSearchAction: ResearchAction<typeof actionSchema> = { @@ -85,6 +84,13 @@ const webSearchAction: ResearchAction<typeof actionSchema> = { config.sources.includes('web') && config.classification.classification.skipSearch === false, execute: async (input, additionalConfig) => { + // Guard against undefined or empty queries + if (!input.queries || !Array.isArray(input.queries) || input.queries.length === 0) { + return { + type: 'search_results', + results: [], + }; + } input.queries = input.queries.slice(0, 3); const researchBlock = additionalConfig.session.getBlock( @@ -111,9 +117,19 @@ const webSearchAction: ResearchAction<typeof actionSchema> = { let searchResultsEmitted = false; let results: Chunk[] = []; + let failedQueries = 0; const search = async (q: string) => { - const res = await searchSearxng(q); + let res; + try { + res = await searchSearxng(q); + } catch (error) { + failedQueries++; + console.error('SearXNG search failed:', error instanceof Error ? error.message : error); + return; + } + + if (!res.results || res.results.length === 0) return; const resultChunks: Chunk[] = res.results.map((r) => ({ content: r.content || r.title, @@ -172,6 +188,12 @@ const webSearchAction: ResearchAction<typeof actionSchema> = { await Promise.all(input.queries.map(search)); + if (results.length === 0 && failedQueries > 0) { + throw new Error( + `All ${failedQueries} search queries failed. SearXNG may be unavailable.`, + ); + } + return { type: 'search_results', results, diff --git a/src/lib/agents/search/researcher/index.ts b/src/lib/agents/search/researcher/index.ts index d6532819e..65db1bd73 100644 --- a/src/lib/agents/search/researcher/index.ts +++ b/src/lib/agents/search/researcher/index.ts @@ -44,12 +44,13 @@ class Researcher { }, }); + const chatHistory = input.chatHistory || []; const agentMessageHistory: Message[] = [ { role: 'user', content: ` <conversation> - ${formatChatHistoryAsString(input.chatHistory.slice(-10))} + ${formatChatHistoryAsString(chatHistory.slice(-10))} User: ${input.followUp} (Standalone question: ${input.classification.standaloneFollowUp}) </conversation> `, diff --git a/src/lib/config/index.ts b/src/lib/config/index.ts index 483053811..7458a6b9a 100644 --- a/src/lib/config/index.ts +++ b/src/lib/config/index.ts @@ -228,9 +228,29 @@ class ConfigManager { /* search section */ this.uiConfigSections.search.forEach((f) => { - if (f.env && !this.currentConfig.search[f.key]) { - this.currentConfig.search[f.key] = - process.env[f.env] ?? f.default ?? ''; + if (!f.env) return; + + const envValue = process.env[f.env]?.trim(); + if (envValue) { + if (f.key === 'searxngURL') { + try { + const parsed = new URL(envValue); + if (parsed.protocol !== 'http:' && parsed.protocol !== 'https:') { + this.currentConfig.search[f.key] = f.default ?? ''; + return; + } + } catch { + this.currentConfig.search[f.key] = f.default ?? ''; + return; + } + } + + this.currentConfig.search[f.key] = envValue; + return; + } + + if (!this.currentConfig.search[f.key]) { + this.currentConfig.search[f.key] = f.default ?? ''; } }); diff --git a/src/lib/hooks/useChat.tsx b/src/lib/hooks/useChat.tsx index 5ee6d9fbe..449343970 100644 --- a/src/lib/hooks/useChat.tsx +++ b/src/lib/hooks/useChat.tsx @@ -335,6 +335,14 @@ export const ChatProvider = ({ children }: { children: React.ReactNode }) => { const citationRegex = /\[([^\]]+)\]/g; const regex = /\[(\d+)\]/g; + // Handle missing opening <think> tag (deepseek-r1-671b only emits </think>) + if ( + !processedText.includes('<think>') && + processedText.includes('</think>') + ) { + processedText = '<think>' + processedText; + } + if (processedText.includes('<think>')) { const openThinkTag = processedText.match(/<think>/g)?.length || 0; const closeThinkTag = @@ -551,6 +559,10 @@ export const ChatProvider = ({ children }: { children: React.ReactNode }) => { const messageId = message.messageId; return async (data: any) => { + if (data.type === 'keepAlive') { + return; + } + if (data.type === 'error') { toast.error(data.data); setLoading(false); diff --git a/src/lib/models/providers/groq/groqLLM.ts b/src/lib/models/providers/groq/groqLLM.ts index dfcb2942c..df9babc18 100644 --- a/src/lib/models/providers/groq/groqLLM.ts +++ b/src/lib/models/providers/groq/groqLLM.ts @@ -1,5 +1,42 @@ +import { repairJson } from '@toolsycc/json-repair'; +import { GenerateObjectInput } from '../../types'; import OpenAILLM from '../openai/openaiLLM'; -class GroqLLM extends OpenAILLM {} +class GroqLLM extends OpenAILLM { + async generateObject<T>(input: GenerateObjectInput): Promise<T> { + const response = await this.openAIClient.chat.completions.create({ + messages: this.convertToOpenAIMessages(input.messages), + model: this.config.model, + temperature: + input.options?.temperature ?? this.config.options?.temperature ?? 1.0, + top_p: input.options?.topP ?? this.config.options?.topP, + max_completion_tokens: + input.options?.maxTokens ?? this.config.options?.maxTokens, + stop: input.options?.stopSequences ?? this.config.options?.stopSequences, + frequency_penalty: + input.options?.frequencyPenalty ?? + this.config.options?.frequencyPenalty, + presence_penalty: + input.options?.presencePenalty ?? this.config.options?.presencePenalty, + response_format: { type: 'json_object' }, + }); + + if (response.choices && response.choices.length > 0) { + try { + return input.schema.parse( + JSON.parse( + repairJson(response.choices[0].message.content || '', { + extractJson: true, + }) as string, + ), + ) as T; + } catch (err) { + throw new Error(`Error parsing response from Groq: ${err}`); + } + } + + throw new Error('No response from Groq'); + } +} export default GroqLLM; diff --git a/src/lib/models/providers/index.ts b/src/lib/models/providers/index.ts index cabfaa96f..26752f5e2 100644 --- a/src/lib/models/providers/index.ts +++ b/src/lib/models/providers/index.ts @@ -8,6 +8,7 @@ import GroqProvider from './groq'; import LemonadeProvider from './lemonade'; import AnthropicProvider from './anthropic'; import LMStudioProvider from './lmstudio'; +import VeniceProvider from './venice'; export const providers: Record<string, ProviderConstructor<any>> = { openai: OpenAIProvider, @@ -18,6 +19,7 @@ export const providers: Record<string, ProviderConstructor<any>> = { lemonade: LemonadeProvider, anthropic: AnthropicProvider, lmstudio: LMStudioProvider, + venice: VeniceProvider, }; export const getModelProvidersUIConfigSection = diff --git a/src/lib/models/providers/lmstudio/lmstudioLLM.ts b/src/lib/models/providers/lmstudio/lmstudioLLM.ts index 4e98dc840..2fad7d61d 100644 --- a/src/lib/models/providers/lmstudio/lmstudioLLM.ts +++ b/src/lib/models/providers/lmstudio/lmstudioLLM.ts @@ -1,5 +1,148 @@ import OpenAILLM from '../openai/openaiLLM'; +import { GenerateObjectInput } from '../../types'; +import z from 'zod'; +import { repairJson } from '@toolsycc/json-repair'; -class LMStudioLLM extends OpenAILLM {} +/** + * LM Studio LLM implementation. + * + * Extends OpenAILLM but overrides generateObject because LM Studio + * (and most OpenAI-compatible APIs) don't support OpenAI's structured + * output feature (zodResponseFormat / chat.completions.parse). + * + * Instead, we use the standard chat completion endpoint with JSON mode + * and parse the response manually. + * + * Contributed by The Noble House™ AI Lab (https://thenoblehouse.ai) + */ +class LMStudioLLM extends OpenAILLM { + /** + * Generate a structured object response from the LLM. + * + * Uses standard chat completion with JSON mode instead of OpenAI's + * structured output feature for compatibility with LM Studio. + */ + async generateObject<T>(input: GenerateObjectInput): Promise<T> { + if (!input.messages || input.messages.length === 0) { + throw new Error('LM Studio generateObject requires at least one message'); + } + + // Convert schema to JSON schema for the prompt + const jsonSchema = z.toJSONSchema(input.schema); + const lastMessage = input.messages[input.messages.length - 1]; + + // Build messages with JSON instruction + const messagesWithJsonInstruction = [ + ...input.messages.slice(0, -1), // All messages except the last + { + ...lastMessage, + content: `${lastMessage.content}\n\nRespond with a valid JSON object matching this schema:\n${JSON.stringify(jsonSchema, null, 2)}`, + }, + ]; + + try { + const response = await this.openAIClient.chat.completions.create({ + model: this.config.model, + messages: this.convertToOpenAIMessages(messagesWithJsonInstruction), + temperature: + input.options?.temperature ?? this.config.options?.temperature ?? 0.7, + top_p: input.options?.topP ?? this.config.options?.topP, + max_tokens: + input.options?.maxTokens ?? this.config.options?.maxTokens, + stop: input.options?.stopSequences ?? this.config.options?.stopSequences, + frequency_penalty: + input.options?.frequencyPenalty ?? + this.config.options?.frequencyPenalty, + presence_penalty: + input.options?.presencePenalty ?? this.config.options?.presencePenalty, + response_format: { type: 'json_object' }, + }); + + if (response.choices && response.choices.length > 0) { + const content = response.choices[0].message.content; + if (!content) { + throw new Error('Empty response from LM Studio'); + } + + try { + const parsed = JSON.parse( + repairJson(content, { extractJson: true }) as string + ); + return input.schema.parse(parsed) as T; + } catch (parseErr) { + throw new Error(`Error parsing JSON response from LM Studio: ${parseErr}`); + } + } + + throw new Error('No response from LM Studio'); + } catch (err: any) { + // If JSON mode isn't supported, try without it + const combinedMessage = [err?.message, err?.error?.message] + .filter((v): v is string => typeof v === 'string') + .join(' '); + if (combinedMessage.includes('response_format')) { + return this.generateObjectWithoutJsonMode<T>(input, jsonSchema); + } + throw err; + } + } + + /** + * Fallback method for models that don't support response_format. + * Uses prompt engineering to get JSON output. + */ + private async generateObjectWithoutJsonMode<T>( + input: GenerateObjectInput, + jsonSchema: object, + ): Promise<T> { + // Add stronger JSON instruction when JSON mode isn't available + const lastMessage = input.messages[input.messages.length - 1]; + const messagesWithJsonInstruction = [ + { + role: 'system' as const, + content: `You must respond with valid JSON only. No markdown, no explanations, just the JSON object.`, + }, + ...input.messages.slice(0, -1), + { + ...lastMessage, + content: `${lastMessage.content}\n\nRespond with ONLY a valid JSON object (no markdown code blocks) matching this schema:\n${JSON.stringify(jsonSchema, null, 2)}`, + }, + ]; + + const response = await this.openAIClient.chat.completions.create({ + model: this.config.model, + messages: this.convertToOpenAIMessages(messagesWithJsonInstruction), + temperature: + input.options?.temperature ?? this.config.options?.temperature ?? 0.7, + top_p: input.options?.topP ?? this.config.options?.topP, + max_tokens: + input.options?.maxTokens ?? this.config.options?.maxTokens, + stop: input.options?.stopSequences ?? this.config.options?.stopSequences, + frequency_penalty: + input.options?.frequencyPenalty ?? + this.config.options?.frequencyPenalty, + presence_penalty: + input.options?.presencePenalty ?? this.config.options?.presencePenalty, + }); + + if (response.choices && response.choices.length > 0) { + const content = response.choices[0].message.content; + if (!content) { + throw new Error('Empty response from LM Studio'); + } + + try { + const parsed = JSON.parse( + repairJson(content, { extractJson: true }) as string + ); + return input.schema.parse(parsed) as T; + } catch (parseErr) { + throw new Error(`Error parsing JSON response from LM Studio: ${parseErr}`); + } + } + + throw new Error('No response from LM Studio'); + } +} export default LMStudioLLM; diff --git a/src/lib/models/providers/ollama/ollamaLLM.ts b/src/lib/models/providers/ollama/ollamaLLM.ts index 3bcd3ccf1..be91cfe4d 100644 --- a/src/lib/models/providers/ollama/ollamaLLM.ts +++ b/src/lib/models/providers/ollama/ollamaLLM.ts @@ -12,6 +12,7 @@ import { parse } from 'partial-json'; import crypto from 'crypto'; import { Message } from '@/lib/types'; import { repairJson } from '@toolsycc/json-repair'; +import { stripMarkdownFences } from '@/lib/utils/parseJson'; type OllamaConfig = { baseURL: string; @@ -92,7 +93,9 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { temperature: input.options?.temperature ?? this.config.options?.temperature ?? 0.7, num_predict: input.options?.maxTokens ?? this.config.options?.maxTokens, - num_ctx: 32000, + num_ctx: + input.options?.contextWindowSize ?? + this.config.options?.contextWindowSize, frequency_penalty: input.options?.frequencyPenalty ?? this.config.options?.frequencyPenalty, @@ -105,15 +108,15 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { }); return { - content: res.message.content, + content: res.message?.content ?? '', toolCalls: - res.message.tool_calls?.map((tc) => ({ + res.message?.tool_calls?.map((tc) => ({ id: crypto.randomUUID(), name: tc.function.name, arguments: tc.function.arguments, })) || [], additionalInfo: { - reasoning: res.message.thinking, + reasoning: res.message?.thinking, }, }; } @@ -146,7 +149,9 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { top_p: input.options?.topP ?? this.config.options?.topP, temperature: input.options?.temperature ?? this.config.options?.temperature ?? 0.7, - num_ctx: 32000, + num_ctx: + input.options?.contextWindowSize ?? + this.config.options?.contextWindowSize, num_predict: input.options?.maxTokens ?? this.config.options?.maxTokens, frequency_penalty: input.options?.frequencyPenalty ?? @@ -161,9 +166,9 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { for await (const chunk of stream) { yield { - contentChunk: chunk.message.content, + contentChunk: chunk.message?.content ?? '', toolCallChunk: - chunk.message.tool_calls?.map((tc, i) => ({ + chunk.message?.tool_calls?.map((tc, i) => ({ id: crypto .createHash('sha256') .update( @@ -175,7 +180,7 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { })) || [], done: chunk.done, additionalInfo: { - reasoning: chunk.message.thinking, + reasoning: chunk.message?.thinking, }, }; } @@ -193,6 +198,9 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { top_p: input.options?.topP ?? this.config.options?.topP, temperature: input.options?.temperature ?? this.config.options?.temperature ?? 0.7, + num_ctx: + input.options?.contextWindowSize ?? + this.config.options?.contextWindowSize, num_predict: input.options?.maxTokens ?? this.config.options?.maxTokens, frequency_penalty: input.options?.frequencyPenalty ?? @@ -206,9 +214,13 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { }); try { + const content = stripMarkdownFences(response.message.content); + if (!content.trim()) { + throw new Error('Empty response from model'); + } return input.schema.parse( JSON.parse( - repairJson(response.message.content, { + repairJson(content, { extractJson: true, }) as string, ), @@ -233,6 +245,9 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { top_p: input.options?.topP ?? this.config.options?.topP, temperature: input.options?.temperature ?? this.config.options?.temperature ?? 0.7, + num_ctx: + input.options?.contextWindowSize ?? + this.config.options?.contextWindowSize, num_predict: input.options?.maxTokens ?? this.config.options?.maxTokens, frequency_penalty: input.options?.frequencyPenalty ?? @@ -246,13 +261,18 @@ class OllamaLLM extends BaseLLM<OllamaConfig> { }); for await (const chunk of stream) { - recievedObj += chunk.message.content; + const delta = chunk.message?.content ?? ''; + if (!delta) continue; + recievedObj += delta; + + // Strip markdown fences if present + const cleanedObj = stripMarkdownFences(recievedObj); try { - yield parse(recievedObj) as T; + yield parse(cleanedObj) as T; } catch (err) { - console.log('Error parsing partial object from Ollama:', err); - yield {} as T; + // Partial JSON may not be parseable yet, skip + continue; } } } diff --git a/src/lib/models/providers/openai/openaiLLM.ts b/src/lib/models/providers/openai/openaiLLM.ts index 5ae1538a0..cf60a0cbd 100644 --- a/src/lib/models/providers/openai/openaiLLM.ts +++ b/src/lib/models/providers/openai/openaiLLM.ts @@ -1,13 +1,11 @@ import OpenAI from 'openai'; import BaseLLM from '../../base/llm'; -import { zodTextFormat, zodResponseFormat } from 'openai/helpers/zod'; import { GenerateObjectInput, GenerateOptions, GenerateTextInput, GenerateTextOutput, StreamTextOutput, - ToolCall, } from '../../types'; import { parse } from 'partial-json'; import z from 'zod'; @@ -19,6 +17,7 @@ import { } from 'openai/resources/index.mjs'; import { Message } from '@/lib/types'; import { repairJson } from '@toolsycc/json-repair'; +import { safeParseJson, stripMarkdownFences } from '@/lib/utils/parseJson'; type OpenAIConfig = { apiKey: string; @@ -40,25 +39,34 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { } convertToOpenAIMessages(messages: Message[]): ChatCompletionMessageParam[] { - return messages.map((msg) => { + return messages.map((msg): ChatCompletionMessageParam | null => { if (msg.role === 'tool') { + if (!msg.id) { + return null; // Skip tool messages without a tool_call_id + } return { role: 'tool', tool_call_id: msg.id, - content: msg.content, + content: msg.content || '', } as ChatCompletionToolMessageParam; } else if (msg.role === 'assistant') { + const validToolCalls = msg.tool_calls?.filter( + (tc) => tc.id && tc.name, + ); return { role: 'assistant', - content: msg.content, - ...(msg.tool_calls && - msg.tool_calls.length > 0 && { - tool_calls: msg.tool_calls?.map((tc) => ({ + content: msg.content || '', + ...(validToolCalls && + validToolCalls.length > 0 && { + tool_calls: validToolCalls.map((tc) => ({ id: tc.id, type: 'function', function: { name: tc.name, - arguments: JSON.stringify(tc.arguments), + arguments: + typeof tc.arguments === 'string' + ? tc.arguments + : JSON.stringify(tc.arguments || {}), }, })), }), @@ -66,7 +74,7 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { } return msg; - }); + }).filter((msg): msg is ChatCompletionMessageParam => msg !== null); } async generateText(input: GenerateTextInput): Promise<GenerateTextOutput> { @@ -102,15 +110,15 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { if (response.choices && response.choices.length > 0) { return { - content: response.choices[0].message.content!, + content: response.choices[0].message?.content ?? '', toolCalls: - response.choices[0].message.tool_calls + response.choices[0].message?.tool_calls ?.map((tc) => { if (tc.type === 'function') { return { name: tc.function.name, id: tc.id, - arguments: JSON.parse(tc.function.arguments), + arguments: safeParseJson<Record<string, any>>(tc.function.arguments) ?? {}, }; } }) @@ -164,27 +172,45 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { for await (const chunk of stream) { if (chunk.choices && chunk.choices.length > 0) { const toolCalls = chunk.choices[0].delta.tool_calls; - yield { - contentChunk: chunk.choices[0].delta.content || '', - toolCallChunk: - toolCalls?.map((tc) => { + let parsedToolCalls: any[] = []; + + if (toolCalls) { + for (const tc of toolCalls) { + try { if (!recievedToolCalls[tc.index]) { - const call = { - name: tc.function?.name!, - id: tc.id!, + recievedToolCalls[tc.index] = { + name: tc.function?.name || '', + id: tc.id || '', arguments: tc.function?.arguments || '', }; - recievedToolCalls.push(call); - return { ...call, arguments: parse(call.arguments || '{}') }; } else { const existingCall = recievedToolCalls[tc.index]; + if (tc.function?.name) existingCall.name = tc.function.name; + if (tc.id) existingCall.id = tc.id; existingCall.arguments += tc.function?.arguments || ''; - return { - ...existingCall, - arguments: parse(existingCall.arguments), - }; } - }) || [], + + // Only emit parsed tool call when arguments parse successfully + const current = recievedToolCalls[tc.index]; + if (current.arguments) { + try { + parsedToolCalls.push({ + ...current, + arguments: parse(current.arguments), + }); + } catch { + // Arguments still incomplete — will retry on next chunk + } + } + } catch (parseErr) { + console.error('Error parsing tool call:', parseErr instanceof Error ? parseErr.message : parseErr, 'tool:', tc.function?.name, 'index:', tc.index); + } + } + } + + yield { + contentChunk: chunk.choices[0].delta.content || '', + toolCallChunk: parsedToolCalls, done: chunk.choices[0].finish_reason !== null, additionalInfo: { finishReason: chunk.choices[0].finish_reason, @@ -195,8 +221,23 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { } async generateObject<T>(input: GenerateObjectInput): Promise<T> { - const response = await this.openAIClient.chat.completions.parse({ - messages: this.convertToOpenAIMessages(input.messages), + // Use chat.completions.create instead of chat.completions.parse + // for compatibility with OpenAI-compatible providers (OpenRouter, etc.) + // that don't support the /chat/completions/parse endpoint. + const schemaInstruction = JSON.stringify( + z.toJSONSchema(input.schema), + null, + 2, + ); + + const response = await this.openAIClient.chat.completions.create({ + messages: [ + { + role: 'system', + content: `You must respond with valid JSON only. No markdown code blocks, no explanatory text.\n\nReturn an object matching this JSON Schema:\n${schemaInstruction}`, + }, + ...this.convertToOpenAIMessages(input.messages), + ], model: this.config.model, temperature: input.options?.temperature ?? this.config.options?.temperature ?? 1.0, @@ -209,18 +250,27 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { this.config.options?.frequencyPenalty, presence_penalty: input.options?.presencePenalty ?? this.config.options?.presencePenalty, - response_format: zodResponseFormat(input.schema, 'object'), + response_format: { type: 'json_object' }, }); if (response.choices && response.choices.length > 0) { try { - return input.schema.parse( - JSON.parse( - repairJson(response.choices[0].message.content!, { - extractJson: true, - }) as string, - ), - ) as T; + const content = stripMarkdownFences( + response.choices[0].message.content || '', + ); + if (!content.trim()) { + throw new Error('Empty response from model'); + } + let repairedJson: string; + try { + repairedJson = repairJson(content, { + extractJson: true, + }) as string; + } catch (repairErr) { + console.error('repairJson failed', { contentLength: content.length, error: repairErr instanceof Error ? repairErr.message : String(repairErr) }); + throw new Error(`Failed to repair JSON: ${repairErr}`); + } + return input.schema.parse(JSON.parse(repairedJson)) as T; } catch (err) { throw new Error(`Error parsing response from OpenAI: ${err}`); } @@ -230,11 +280,26 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { } async *streamObject<T>(input: GenerateObjectInput): AsyncGenerator<T> { - let recievedObj: string = ''; + let receivedObj: string = ''; + + // Use chat.completions.create with streaming instead of responses.stream + // for compatibility with OpenAI-compatible providers (OpenRouter, etc.) + // that don't support the OpenAI Responses API. + const schemaInstruction = JSON.stringify( + z.toJSONSchema(input.schema), + null, + 2, + ); - const stream = this.openAIClient.responses.stream({ + const stream = await this.openAIClient.chat.completions.create({ model: this.config.model, - input: input.messages, + messages: [ + { + role: 'system', + content: `You must respond with valid JSON only. No markdown code blocks, no explanatory text.\n\nReturn an object matching this JSON Schema:\n${schemaInstruction}`, + }, + ...this.convertToOpenAIMessages(input.messages), + ], temperature: input.options?.temperature ?? this.config.options?.temperature ?? 1.0, top_p: input.options?.topP ?? this.config.options?.topP, @@ -246,26 +311,23 @@ class OpenAILLM extends BaseLLM<OpenAIConfig> { this.config.options?.frequencyPenalty, presence_penalty: input.options?.presencePenalty ?? this.config.options?.presencePenalty, - text: { - format: zodTextFormat(input.schema, 'object'), - }, + response_format: { type: 'json_object' }, + stream: true, }); for await (const chunk of stream) { - if (chunk.type === 'response.output_text.delta' && chunk.delta) { - recievedObj += chunk.delta; + if (chunk.choices && chunk.choices.length > 0) { + const delta = chunk.choices[0].delta.content || ''; + receivedObj += delta; + + // Strip markdown fences if present + const cleanedObj = stripMarkdownFences(receivedObj); try { - yield parse(recievedObj) as T; - } catch (err) { - console.log('Error parsing partial object from OpenAI:', err); - yield {} as T; - } - } else if (chunk.type === 'response.output_text.done' && chunk.text) { - try { - yield parse(chunk.text) as T; + yield parse(cleanedObj) as T; } catch (err) { - throw new Error(`Error parsing response from OpenAI: ${err}`); + // Partial JSON may not be parseable yet, skip + continue; } } } diff --git a/src/lib/models/providers/venice/index.ts b/src/lib/models/providers/venice/index.ts new file mode 100644 index 000000000..e60c7e999 --- /dev/null +++ b/src/lib/models/providers/venice/index.ts @@ -0,0 +1,132 @@ +import { UIConfigField } from '@/lib/config/types'; +import { getConfiguredModelProviderById } from '@/lib/config/serverRegistry'; +import { Model, ModelList, ProviderMetadata } from '../../types'; +import BaseEmbedding from '../../base/embedding'; +import BaseModelProvider from '../../base/provider'; +import BaseLLM from '../../base/llm'; +import VeniceLLM from './veniceLLM'; + +interface VeniceConfig { + apiKey: string; +} + +const providerConfigFields: UIConfigField[] = [ + { + type: 'password', + name: 'API Key', + key: 'apiKey', + description: 'Your Venice.ai API key', + required: true, + placeholder: 'Venice API Key', + env: 'VENICE_API_KEY', + scope: 'server', + }, +]; + +class VeniceProvider extends BaseModelProvider<VeniceConfig> { + constructor(id: string, name: string, config: VeniceConfig) { + super(id, name, config); + } + + async getDefaultModels(): Promise<ModelList> { + try { + const res = await fetch('https://api.venice.ai/api/v1/models', { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.config.apiKey}`, + }, + }); + + if (!res.ok) { + console.error(`Venice API returned status ${res.status}`); + return { embedding: [], chat: [] }; + } + + const data = await res.json(); + const defaultChatModels: Model[] = []; + + if (data?.data && Array.isArray(data.data)) { + data.data.forEach((m: any) => { + if (m.type === 'text' || !m.type) { + defaultChatModels.push({ + key: m.id, + name: m.id, + }); + } + }); + } + + return { + embedding: [], + chat: defaultChatModels, + }; + } catch (error) { + console.error('Failed to fetch Venice models:', error); + return { embedding: [], chat: [] }; + } + } + + async getModelList(): Promise<ModelList> { + const defaultModels = await this.getDefaultModels(); + const configProvider = getConfiguredModelProviderById(this.id); + + if (!configProvider) { + return defaultModels; + } + + return { + embedding: [ + ...defaultModels.embedding, + ...configProvider.embeddingModels, + ], + chat: [...defaultModels.chat, ...configProvider.chatModels], + }; + } + + async loadChatModel(key: string): Promise<BaseLLM<any>> { + const modelList = await this.getModelList(); + + const exists = modelList.chat.find((m) => m.key === key); + + if (!exists) { + throw new Error( + 'Error Loading Venice Chat Model. Invalid Model Selected', + ); + } + + return new VeniceLLM({ + apiKey: this.config.apiKey, + model: key, + baseURL: 'https://api.venice.ai/api/v1', + }); + } + + async loadEmbeddingModel(key: string): Promise<BaseEmbedding<any>> { + throw new Error('Venice Provider does not support embedding models.'); + } + + static parseAndValidate(raw: any): VeniceConfig { + if (!raw || typeof raw !== 'object') + throw new Error('Invalid config provided. Expected object'); + if (!raw.apiKey) + throw new Error('Invalid config provided. API key must be provided'); + + return { + apiKey: String(raw.apiKey), + }; + } + + static getProviderConfigFields(): UIConfigField[] { + return providerConfigFields; + } + + static getProviderMetadata(): ProviderMetadata { + return { + key: 'venice', + name: 'Venice', + }; + } +} + +export default VeniceProvider; diff --git a/src/lib/models/providers/venice/veniceLLM.ts b/src/lib/models/providers/venice/veniceLLM.ts new file mode 100644 index 000000000..8ae3ceed7 --- /dev/null +++ b/src/lib/models/providers/venice/veniceLLM.ts @@ -0,0 +1,38 @@ +import OpenAI from 'openai'; +import OpenAILLM from '../openai/openaiLLM'; +import { GenerateOptions } from '../../types'; + +type VeniceLLMConfig = { + apiKey: string; + model: string; + baseURL?: string; + options?: GenerateOptions; +}; + +class VeniceLLM extends OpenAILLM { + constructor(config: VeniceLLMConfig) { + super(config); + + this.openAIClient = new OpenAI({ + apiKey: config.apiKey, + baseURL: config.baseURL || 'https://api.venice.ai/api/v1', + fetch: async (url: RequestInfo | URL, init?: RequestInit) => { + if (init?.body && typeof init.body === 'string') { + try { + const body = JSON.parse(init.body); + body.venice_parameters = { + enable_web_search: 'off', + include_venice_system_prompt: false, + }; + init = { ...init, body: JSON.stringify(body) }; + } catch { + /* body isn't JSON, pass through unchanged */ + } + } + return globalThis.fetch(url, init); + }, + }); + } +} + +export default VeniceLLM; diff --git a/src/lib/models/types.ts b/src/lib/models/types.ts index 8abefd70a..8b27b0be8 100644 --- a/src/lib/models/types.ts +++ b/src/lib/models/types.ts @@ -35,6 +35,7 @@ type GenerateOptions = { stopSequences?: string[]; frequencyPenalty?: number; presencePenalty?: number; + contextWindowSize?: number; }; type Tool = { diff --git a/src/lib/prompts/search/classifier.ts b/src/lib/prompts/search/classifier.ts index 770b86dae..41994c5f9 100644 --- a/src/lib/prompts/search/classifier.ts +++ b/src/lib/prompts/search/classifier.ts @@ -38,9 +38,9 @@ NOTE: BY GENERAL KNOWLEDGE WE MEAN INFORMATION THAT IS OBVIOUS, WIDELY KNOWN, OR </labels> <standalone_followup> -For the standalone follow up, you have to generate a self contained, context independant reformulation of the user's query. +For the standalone follow up, you have to generate a self contained, context independent reformulation of the user's query. You basically have to rephrase the user's query in a way that it can be understood without any prior context from the conversation history. -Say for example the converastion is about cars and the user says "How do they work" then the standalone follow up should be "How do cars work?" +Say for example the conversation is about cars and the user says "How do they work" then the standalone follow up should be "How do cars work?" Do not contain excess information or everything that has been discussed before, just reformulate the user's last query in a self contained manner. The standalone follow-up should be concise and to the point. @@ -56,7 +56,7 @@ You must respond in the following JSON format without any extra text, explanatio "discussionSearch": boolean, "showWeatherWidget": boolean, "showStockWidget": boolean, - "showCalculationWidget": boolean, + "showCalculationWidget": boolean }, "standaloneFollowUp": string } diff --git a/src/lib/searxng.ts b/src/lib/searxng.ts index 87767e098..07accdf08 100644 --- a/src/lib/searxng.ts +++ b/src/lib/searxng.ts @@ -44,16 +44,27 @@ export const searchSearxng = async ( try { const res = await fetch(url, { signal: controller.signal, + headers: { + 'X-Forwarded-For': '127.0.0.1', + 'X-Real-IP': '127.0.0.1', + }, }); if (!res.ok) { - throw new Error(`SearXNG error: ${res.statusText}`); + throw new Error( + `SearXNG returned status ${res.status} for query: ${query}`, + ); } - const data = await res.json(); + let data; + try { + data = await res.json(); + } catch { + throw new Error(`SearXNG returned non-JSON response for query: ${query}`); + } - const results: SearxngSearchResult[] = data.results; - const suggestions: string[] = data.suggestions; + const results: SearxngSearchResult[] = data.results ?? []; + const suggestions: string[] = data.suggestions ?? []; return { results, suggestions }; } catch (err: any) { diff --git a/src/lib/session.ts b/src/lib/session.ts index 4f7433038..4c226c97e 100644 --- a/src/lib/session.ts +++ b/src/lib/session.ts @@ -78,8 +78,6 @@ class SessionManager { } subscribe(listener: (event: string, data: any) => void): () => void { - const currentEventsLength = this.events.length; - const handler = (event: string) => (data: any) => listener(event, data); const dataHandler = handler('data'); const endHandler = handler('end'); @@ -89,9 +87,23 @@ class SessionManager { this.emitter.on('end', endHandler); this.emitter.on('error', errorHandler); - for (let i = 0; i < currentEventsLength; i++) { - const { event, data } = this.events[i]; - listener(event, data); + // Send the current state of each block as a snapshot rather than + // replaying every historical event. The old approach replayed all + // block creation + every incremental updateBlock patch, which caused + // reconnecting clients to visually rebuild (and effectively duplicate) + // content they had already received before the connection dropped. + for (const block of this.blocks.values()) { + listener('data', { type: 'block', block: structuredClone(block) }); + } + + // Replay any non-block milestone events (researchComplete, end, error) + // so reconnecting subscribers know if the session already finished. + for (const { event, data } of this.events) { + if (event === 'end' || event === 'error') { + listener(event, data); + } else if (event === 'data' && data.type === 'researchComplete') { + listener(event, data); + } } return () => { diff --git a/src/lib/utils/parseJson.ts b/src/lib/utils/parseJson.ts new file mode 100644 index 000000000..8346481ef --- /dev/null +++ b/src/lib/utils/parseJson.ts @@ -0,0 +1,43 @@ +/** + * Utilities for parsing JSON from LLM responses. + * + * Many LLMs (especially when accessed via OpenAI-compatible APIs like OpenRouter, + * LiteLLM, etc.) wrap JSON responses in markdown code fences even when + * response_format is set to json_object. These utilities help handle such cases. + */ + +/** + * Strip markdown code fences from a string. + * Handles both ```json and plain ``` fences. + * + * @example + * stripMarkdownFences('```json\n{"foo": "bar"}\n```') // '{"foo": "bar"}' + * stripMarkdownFences('{"foo": "bar"}') // '{"foo": "bar"}' + */ +export function stripMarkdownFences(text: string): string { + const trimmed = text.trim(); + if (trimmed.startsWith('```')) { + return trimmed + .replace(/^```(?:json)?\s*/i, '') + .replace(/```\s*$/, '') + .trim(); + } + return trimmed; +} + +/** + * Safely parse JSON, stripping markdown fences first. + * Returns undefined if parsing fails instead of throwing. + * + * @example + * safeParseJson('```json\n{"foo": "bar"}\n```') // { foo: 'bar' } + * safeParseJson('invalid') // undefined + */ +export function safeParseJson<T = unknown>(text: string): T | undefined { + try { + const cleaned = stripMarkdownFences(text); + return JSON.parse(cleaned) as T; + } catch { + return undefined; + } +} diff --git a/src/lib/utils/splitText.ts b/src/lib/utils/splitText.ts index 796bf4b4c..c6cae9cfc 100644 --- a/src/lib/utils/splitText.ts +++ b/src/lib/utils/splitText.ts @@ -4,7 +4,7 @@ const splitRegex = /(?<=\. |\n|! |\? |; |:\s|\d+\.\s|- |\* )/g; const enc = getEncoding('cl100k_base'); -const getTokenCount = (text: string): number => { +export const getTokenCount = (text: string): number => { try { return enc.encode(text).length; } catch { @@ -12,6 +12,37 @@ const getTokenCount = (text: string): number => { } }; +export const truncateTextByTokens = ( + text: string, + maxTokens: number, +): string => { + if (maxTokens <= 0 || text.length === 0) { + return ''; + } + + if (getTokenCount(text) <= maxTokens) { + return text; + } + + let low = 0; + let high = text.length; + let best = ''; + + while (low <= high) { + const mid = Math.floor((low + high) / 2); + const candidate = text.slice(0, mid); + + if (getTokenCount(candidate) <= maxTokens) { + best = candidate; + low = mid + 1; + } else { + high = mid - 1; + } + } + + return best.trimEnd(); +}; + export const splitText = ( text: string, maxTokens = 512,