diff --git a/direction.md b/direction.md new file mode 100644 index 0000000..7706134 --- /dev/null +++ b/direction.md @@ -0,0 +1,41 @@ +# Project Direction: HiveMind Protocol (memU) + +## 1. Assessment of Current State +The project has a solid architectural foundation following the "LLM-Vector-LLM Sandwich": +- **Layer 1 (Router):** Local SLM (Qwen 2.5) for intent and routing. +- **Layer 2 (Storage):** EMUs (Encapsulated Memory Units) using LanceDB. +- **Layer 3 (Reasoning):** Cloud LLMs for final synthesis. + +**Identified Gaps:** +- **Retrieval Bottleneck:** While EMUs are built with LanceDB vector tables, the current `EmuMemoryLayer` service primarily uses basic keyword matching on text chunks loaded into memory. This limits accuracy and performance as the memory grows. +- **API Maturity:** The API is tailored for the built-in Chat UI. To serve as a robust memory layer for external applications, it needs more direct access points to memory queries without the full agentic loop. +- **Modularity:** The hot-swapping logic is present but can be more robust in terms of resource management (e.g., active DB handles). + +## 2. Strategic Roadmap for Performance + +### Vector-First Retrieval +- **On-the-fly Embedding:** Integrate `@xenova/transformers` into the `EmuMemoryLayer` to embed user queries locally. +- **Native LanceDB Search:** Shift from keyword-based scoring to vector similarity search directly against the `vectors.lance` files within EMUs. +- **Hybrid Search:** Combine vector similarity with BM25-style keyword matching for better handling of specific technical terms. + +### Scalability +- **Connection Pooling:** Manage LanceDB connections efficiently during mount/unmount operations. +- **Background Indexing:** Ensure that `/learn` operations index new data in the background without blocking the main chat thread. + +## 3. Robustness for Chat and API + +### Unified Memory API +- **Direct Query Endpoint:** Implement `/api/query` to allow external tools to retrieve relevant memory blocks without triggering a full LLM response. +- **Session-Scoped Memory:** Enhance session management to allow different API consumers to have isolated "working memories" or specific sets of mounted EMUs. + +### Reliable Local Routing +- **Failover Mechanisms:** Improve robustness when local models (Ollama) are slow or unavailable. +- **Strict JSON Parsing:** Ensure the local SLM output is consistently parsed and handled. + +## 4. Modular Memory Ecosystem (EMUs) +- **Standardized Learning:** Create a clear pipeline for "Training" (building an EMU) vs "Learning" (appending to an active EMU). +- **Portable Artifacts:** Refine the export/import functionality to ensure EMUs can be easily shared via Git, IPFS, or Email, as envisioned. +- **Version Control:** Implement simple metadata-based versioning for EMUs to track knowledge updates. + +--- +*Signed, Jules (Software Engineer)* diff --git a/server/src/routes/router.ts b/server/src/routes/router.ts index 7a7013c..0d00b86 100644 --- a/server/src/routes/router.ts +++ b/server/src/routes/router.ts @@ -511,8 +511,12 @@ router.post('/emus/upload', emuUpload.single('file'), (req, res) => { } try { - const mount = memoryLayer.importEmuArchive(file.buffer, file.originalname); - res.json(mount); + memoryLayer.importEmuArchive(file.buffer, file.originalname).then((mount) => { + res.json(mount); + }).catch((error) => { + console.error('Failed to ingest EMU archive', error); + res.status(400).json({ error: 'Unable to import EMU archive' }); + }); } catch (error) { console.error('Failed to ingest EMU archive', error); res.status(400).json({ error: 'Unable to import EMU archive' }); @@ -560,7 +564,7 @@ router.post('/chat', async (req, res) => { } const searchQuery = body.transformedQuery || body.message; - const relevantBlocks = memoryLayer.findRelevantBlocks(searchQuery, { + const relevantBlocks = await memoryLayer.findRelevantBlocks(searchQuery, { intents: body.intent ? [body.intent] : undefined, tags: body.tags }); @@ -586,6 +590,45 @@ router.post('/chat', async (req, res) => { } }); +router.post('/query', async (req, res) => { + const body = req.body as { + message?: string; + sessionId?: string; + limit?: number; + tags?: string[]; + intent?: string; + }; + + if (!body?.message) { + return res.status(400).json({ error: 'Message (query) is required' }); + } + + try { + const limit = Math.max(1, Math.min(20, Number(body.limit) || 4)); + const blocks = await memoryLayer.findRelevantBlocks(body.message, { + intents: body.intent ? [body.intent] : undefined, + tags: body.tags, + limit + }); + + res.json({ + query: body.message, + sessionId: body.sessionId || 'anonymous', + blocks: blocks.map((b) => ({ + id: b.id, + title: b.title, + content: b.content, + source: b.source, + score: b.score, + tags: b.tags + })) + }); + } catch (error) { + console.error('Query error', error); + res.status(500).json({ error: 'Failed to perform memory query' }); + } +}); + router.post('/chat/remote', async (req, res) => { const body = req.body as RouterRequestBody & { transformedQuery?: string; @@ -600,7 +643,7 @@ router.post('/chat/remote', async (req, res) => { const sessionId = body.sessionId || 'default'; const searchQuery = body.transformedQuery || body.message; - const relevantBlocks = memoryLayer.findRelevantBlocks(searchQuery, { + const relevantBlocks = await memoryLayer.findRelevantBlocks(searchQuery, { intents: body.intent ? [body.intent] : undefined, tags: body.tags }); diff --git a/server/src/services/emuMemoryLayer.ts b/server/src/services/emuMemoryLayer.ts index f9a8a0e..f824836 100644 --- a/server/src/services/emuMemoryLayer.ts +++ b/server/src/services/emuMemoryLayer.ts @@ -64,6 +64,10 @@ interface MemoryLayerOptions { type MetadataOverride = MemoryBlockUpdatePayload & { updatedAt?: string }; +interface LanceDBModule { + connect: (uri: string) => Promise; +} + export class EmuMemoryLayer { private storePath: string; private emuBasePath: string; @@ -73,6 +77,9 @@ export class EmuMemoryLayer { private index: MemoryIndex = { byIntent: {}, byTag: {} }; private hiddenBlockIds = new Set(); private metadataOverrides: Record = {}; + private embeddingPipeline: Promise | null = null; + private lanceModule: Promise | null = null; + private activeConnections = new Map(); constructor(options?: MemoryLayerOptions) { this.storePath = @@ -87,7 +94,8 @@ export class EmuMemoryLayer { return [...this.emuMounts].sort((a, b) => a.name.localeCompare(b.name)); } - refreshEmuMounts() { + async refreshEmuMounts() { + this.activeConnections.clear(); this.loadEmuMounts(); this.rebuildIndex(); } @@ -193,6 +201,41 @@ export class EmuMemoryLayer { }; } + private async getEmbeddingPipeline() { + if (!this.embeddingPipeline) { + this.embeddingPipeline = import('@xenova/transformers').then(({ pipeline }) => + pipeline('feature-extraction', 'Xenova/all-MiniLM-L6-v2') + ); + } + return this.embeddingPipeline; + } + + private async loadLance(): Promise { + if (!this.lanceModule) { + this.lanceModule = import('@lancedb/lancedb') as unknown as Promise; + } + return this.lanceModule; + } + + private async getEmuConnection(emuId: string, emuPath: string) { + if (this.activeConnections.has(emuId)) { + return this.activeConnections.get(emuId); + } + + const lancePath = path.join(emuPath, 'lancedb'); + if (!fs.existsSync(lancePath)) return null; + + try { + const lance = await this.loadLance(); + const db = await lance.connect(lancePath); + this.activeConnections.set(emuId, db); + return db; + } catch (error) { + console.warn(`Failed to connect to LanceDB for EMU ${emuId}`, error); + return null; + } + } + addBlock(payload: NewMemoryBlockPayload): MemoryBlock { const content = payload.content.trim(); if (!content) { @@ -231,15 +274,60 @@ export class EmuMemoryLayer { return block; } - findRelevantBlocks( + async findRelevantBlocks( query: string, options?: { intents?: string[]; tags?: string[]; limit?: number } - ): MemoryBlock[] { + ): Promise { + const limit = options?.limit || 4; const tokens = this.tokenize(query); const tagSet = new Set((options?.tags || []).map((tag) => tag.toLowerCase())); const intentSet = new Set((options?.intents || []).map((intent) => intent.toLowerCase())); - const scored = this.getAllBlocks() + let vectorResults: MemoryBlock[] = []; + + try { + const embeddingPipeline = await this.getEmbeddingPipeline(); + const output = await embeddingPipeline(query, { pooling: 'mean', normalize: true }); + const vector = Array.from(output.data as number[]); + + const emuSearches = this.emuMounts.map(async (mount) => { + const db = await this.getEmuConnection(mount.id, mount.path); + if (!db) return []; + + try { + const table = await db.openTable('chunks'); + const results = await table.search(vector).limit(limit).execute(); + + return results.map((row: any) => ({ + id: row.id, + title: mount.name, + content: row.text, + intent: 'memory', + tags: mount.tags, + source: `emu:${mount.id}`, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + summary: row.text.slice(0, 140), + score: 1 - (row._distance || 0), + size: Buffer.byteLength(row.text, 'utf-8') + } as MemoryBlock)); + } catch (err) { + console.warn(`Vector search failed for EMU ${mount.id}`, err); + return []; + } + }); + + const emuBlocks = await Promise.all(emuSearches); + vectorResults = emuBlocks.flat(); + } catch (error) { + console.error('Vector search initialization failed, falling back to keyword search', error); + } + + const allBlocks = [...this.userBlocks, ...this.emuBlocks] + .filter((block) => !this.hiddenBlockIds.has(block.id)) + .map((block) => this.applyOverrides(block)); + + const scored = allBlocks .map((block) => { const blockTags = block.tags.map((tag) => tag.toLowerCase()); const tagMatches = blockTags.filter((tag) => tokens.has(tag) || tagSet.has(tag)).length; @@ -250,12 +338,23 @@ export class EmuMemoryLayer { return { block, score }; }) - .filter((entry) => entry.score > 0) + .filter((entry) => entry.score > 0); + + const combined = [...vectorResults.map(b => ({ block: b, score: b.score * 2 })), ...scored] .sort((a, b) => b.score - a.score) - .slice(0, options?.limit || 4) .map((entry) => entry.block); - return scored; + const seenIds = new Set(); + const unique = []; + for (const b of combined) { + if (!seenIds.has(b.id)) { + seenIds.add(b.id); + unique.push(b); + } + if (unique.length >= limit) break; + } + + return unique; } exportEmuArchive(id: string): { filename: string; buffer: Buffer } { @@ -278,7 +377,7 @@ export class EmuMemoryLayer { return { filename: `${folderName}.zip`, buffer: zip.toBuffer() }; } - importEmuArchive(buffer: Buffer, originalName?: string): EmuMount { + async importEmuArchive(buffer: Buffer, originalName?: string): Promise { const zip = new AdmZip(buffer); const entries = zip.getEntries(); @@ -330,7 +429,7 @@ export class EmuMemoryLayer { fs.writeFileSync(destPath, entry.getData()); } - this.refreshEmuMounts(); + await this.refreshEmuMounts(); const folderId = targetFolder.replace(/\.emu$/, ''); const mounted = this.emuMounts.find((mount) => mount.id === folderId) ||