41 changes: 41 additions & 0 deletions direction.md
@@ -0,0 +1,41 @@
# Project Direction: HiveMind Protocol (memU)

## 1. Assessment of Current State
The project has a solid architectural foundation following the "LLM-Vector-LLM Sandwich":
- **Layer 1 (Router):** Local SLM (Qwen 2.5) for intent and routing.
- **Layer 2 (Storage):** EMUs (Encapsulated Memory Units) using LanceDB.
- **Layer 3 (Reasoning):** Cloud LLMs for final synthesis.

**Identified Gaps:**
- **Retrieval Bottleneck:** While EMUs are built with LanceDB vector tables, the current `EmuMemoryLayer` service primarily uses basic keyword matching on text chunks loaded into memory. This limits accuracy and performance as the memory grows.
- **API Maturity:** The API is tailored to the built-in Chat UI. To serve as a robust memory layer for external applications, it needs direct endpoints for memory queries that bypass the full agentic loop.
- **Modularity:** The hot-swapping logic is in place, but resource management (e.g., closing active DB handles on unmount) needs hardening.

## 2. Strategic Roadmap for Performance

### Vector-First Retrieval
- **On-the-fly Embedding:** Integrate `@xenova/transformers` into the `EmuMemoryLayer` to embed user queries locally.
- **Native LanceDB Search:** Shift from keyword-based scoring to vector similarity search directly against the `vectors.lance` files within EMUs.
- **Hybrid Search:** Combine vector similarity with BM25-style keyword matching for better handling of specific technical terms.
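A hybrid ranker could blend the two signals with a simple weighted sum. The sketch below is illustrative only — `ScoredBlock` and the `alpha` weight are assumptions, not the current `EmuMemoryLayer` API:

```typescript
// Sketch: blend vector similarity with keyword overlap.
// `alpha` weights the vector signal; tune it empirically.
interface ScoredBlock {
  id: string;
  vectorScore: number;   // cosine similarity in [0, 1]
  keywordScore: number;  // e.g. normalized BM25 or token-overlap score
}

function hybridScore(b: ScoredBlock, alpha = 0.7): number {
  return alpha * b.vectorScore + (1 - alpha) * b.keywordScore;
}

function rankHybrid(blocks: ScoredBlock[], limit = 4): ScoredBlock[] {
  return [...blocks]
    .sort((a, b) => hybridScore(b) - hybridScore(a))
    .slice(0, limit);
}
```

Weighting toward the vector score keeps semantic matches on top while still letting exact technical terms (which embeddings often blur) pull a block up.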

### Scalability
- **Connection Pooling:** Manage LanceDB connections efficiently during mount/unmount operations.
- **Background Indexing:** Ensure that `/learn` operations index new data in the background without blocking the main chat thread.
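One way to bound connection growth is an LRU cache over DB handles. This is a hypothetical shape (the current `EmuMemoryLayer` keeps a plain `Map`), relying on the fact that a JavaScript `Map` iterates in insertion order:

```typescript
// Sketch: a tiny LRU cache for LanceDB connections, capped so a
// long-running server doesn't accumulate handles across mount/unmount cycles.
class ConnectionPool<T> {
  private entries = new Map<string, T>();
  constructor(private capacity: number) {}

  get(id: string): T | undefined {
    const conn = this.entries.get(id);
    if (conn !== undefined) {
      // Re-insert to mark as most recently used.
      this.entries.delete(id);
      this.entries.set(id, conn);
    }
    return conn;
  }

  set(id: string, conn: T): void {
    this.entries.delete(id);
    this.entries.set(id, conn);
    if (this.entries.size > this.capacity) {
      // Map iterates in insertion order, so the first key is least recent.
      const oldest = this.entries.keys().next().value as string;
      this.entries.delete(oldest);
    }
  }

  has(id: string): boolean {
    return this.entries.has(id);
  }
}
```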

## 3. Robustness for Chat and API

### Unified Memory API
- **Direct Query Endpoint:** Implement `/api/query` to allow external tools to retrieve relevant memory blocks without triggering a full LLM response.
- **Session-Scoped Memory:** Enhance session management to allow different API consumers to have isolated "working memories" or specific sets of mounted EMUs.
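Session isolation could be as simple as a per-session set of mounted EMU ids consulted at query time. Names below are illustrative, not the current API:

```typescript
// Sketch: scope "working memory" per API consumer by tracking which
// EMU ids each session has mounted.
class SessionScopes {
  private scopes = new Map<string, Set<string>>();

  mount(sessionId: string, emuId: string): void {
    const scope = this.scopes.get(sessionId) ?? new Set<string>();
    scope.add(emuId);
    this.scopes.set(sessionId, scope);
  }

  unmount(sessionId: string, emuId: string): void {
    this.scopes.get(sessionId)?.delete(emuId);
  }

  // EMUs visible to a session; an unknown session sees nothing.
  visibleEmus(sessionId: string): string[] {
    return [...(this.scopes.get(sessionId) ?? [])];
  }
}
```

At retrieval time, `findRelevantBlocks` would then filter candidate mounts down to `visibleEmus(sessionId)` before searching.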

### Reliable Local Routing
- **Failover Mechanisms:** Improve robustness when local models (Ollama) are slow or unavailable.
- **Strict JSON Parsing:** Ensure the local SLM output is consistently parsed and handled.
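Small local models frequently wrap their JSON in prose or code fences, so strict parsing needs a defensive extraction step with a safe fallback. The `RouteDecision` shape here is an assumption about the router's contract:

```typescript
// Sketch: defensively parse a local SLM's routing output, falling back
// to a default route whenever the JSON is missing or malformed.
interface RouteDecision {
  intent: string;
  useRemote: boolean;
}

const DEFAULT_ROUTE: RouteDecision = { intent: "chat", useRemote: false };

function parseRouteDecision(raw: string): RouteDecision {
  // Grab the first {...} span, tolerating fences and surrounding prose.
  const match = raw.match(/\{[\s\S]*\}/);
  if (!match) return DEFAULT_ROUTE;
  try {
    const parsed = JSON.parse(match[0]);
    if (typeof parsed.intent !== "string") return DEFAULT_ROUTE;
    return { intent: parsed.intent, useRemote: Boolean(parsed.useRemote) };
  } catch {
    return DEFAULT_ROUTE;
  }
}
```

Falling back to a local chat route doubles as the failover path: a slow or unreachable Ollama instance degrades to a default answer instead of a 500.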

## 4. Modular Memory Ecosystem (EMUs)
- **Standardized Learning:** Define a clear pipeline distinguishing "Training" (building a new EMU) from "Learning" (appending to an active EMU).
- **Portable Artifacts:** Refine the export/import functionality to ensure EMUs can be easily shared via Git, IPFS, or Email, as envisioned.
- **Version Control:** Implement simple metadata-based versioning for EMUs to track knowledge updates.
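Metadata-based versioning could live in the EMU manifest as a monotonically increasing revision plus a timestamp. The manifest fields below are assumptions about the `.emu` format, not a documented schema:

```typescript
// Sketch: bump an EMU's knowledge revision on every learn/train operation,
// returning a new manifest rather than mutating the old one.
interface EmuManifest {
  name: string;
  version: number;   // monotonically increasing knowledge revision
  updatedAt: string; // ISO timestamp of the last learn/train operation
}

function bumpVersion(manifest: EmuManifest): EmuManifest {
  return {
    ...manifest,
    version: manifest.version + 1,
    updatedAt: new Date().toISOString(),
  };
}
```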

---
*Signed, Jules (Software Engineer)*
51 changes: 47 additions & 4 deletions server/src/routes/router.ts
@@ -511,8 +511,12 @@ router.post('/emus/upload', emuUpload.single('file'), (req, res) => {
}

try {
const mount = memoryLayer.importEmuArchive(file.buffer, file.originalname);
res.json(mount);
memoryLayer.importEmuArchive(file.buffer, file.originalname).then((mount) => {
res.json(mount);
}).catch((error) => {
console.error('Failed to ingest EMU archive', error);
res.status(400).json({ error: 'Unable to import EMU archive' });
});
} catch (error) {
console.error('Failed to ingest EMU archive', error);
res.status(400).json({ error: 'Unable to import EMU archive' });
@@ -560,7 +564,7 @@ router.post('/chat', async (req, res) => {
}

const searchQuery = body.transformedQuery || body.message;
const relevantBlocks = memoryLayer.findRelevantBlocks(searchQuery, {
const relevantBlocks = await memoryLayer.findRelevantBlocks(searchQuery, {
intents: body.intent ? [body.intent] : undefined,
tags: body.tags
});
@@ -586,6 +590,45 @@ router.post('/chat', async (req, res) => {
}
});

router.post('/query', async (req, res) => {
const body = req.body as {
message?: string;
sessionId?: string;
limit?: number;
tags?: string[];
intent?: string;
};

if (!body?.message) {
return res.status(400).json({ error: 'Message (query) is required' });
}

try {
const limit = Math.max(1, Math.min(20, Number(body.limit) || 4));
const blocks = await memoryLayer.findRelevantBlocks(body.message, {
intents: body.intent ? [body.intent] : undefined,
tags: body.tags,
limit
});

res.json({
query: body.message,
sessionId: body.sessionId || 'anonymous',
blocks: blocks.map((b) => ({
id: b.id,
title: b.title,
content: b.content,
source: b.source,
score: b.score,
tags: b.tags
}))
});
} catch (error) {
console.error('Query error', error);
res.status(500).json({ error: 'Failed to perform memory query' });
}
});

router.post('/chat/remote', async (req, res) => {
const body = req.body as RouterRequestBody & {
transformedQuery?: string;
@@ -600,7 +643,7 @@ router.post('/chat/remote', async (req, res) => {
const sessionId = body.sessionId || 'default';

const searchQuery = body.transformedQuery || body.message;
const relevantBlocks = memoryLayer.findRelevantBlocks(searchQuery, {
const relevantBlocks = await memoryLayer.findRelevantBlocks(searchQuery, {
intents: body.intent ? [body.intent] : undefined,
tags: body.tags
});
117 changes: 108 additions & 9 deletions server/src/services/emuMemoryLayer.ts
@@ -64,6 +64,10 @@ interface MemoryLayerOptions {

type MetadataOverride = MemoryBlockUpdatePayload & { updatedAt?: string };

interface LanceDBModule {
connect: (uri: string) => Promise<any>;
}

export class EmuMemoryLayer {
private storePath: string;
private emuBasePath: string;
@@ -73,6 +77,9 @@ export class EmuMemoryLayer {
private index: MemoryIndex = { byIntent: {}, byTag: {} };
private hiddenBlockIds = new Set<string>();
private metadataOverrides: Record<string, MetadataOverride> = {};
private embeddingPipeline: Promise<any> | null = null;
private lanceModule: Promise<LanceDBModule> | null = null;
private activeConnections = new Map<string, any>();

constructor(options?: MemoryLayerOptions) {
this.storePath =
@@ -87,7 +94,8 @@ export class EmuMemoryLayer {
return [...this.emuMounts].sort((a, b) => a.name.localeCompare(b.name));
}

refreshEmuMounts() {
async refreshEmuMounts() {
this.activeConnections.clear();
this.loadEmuMounts();
this.rebuildIndex();
}
@@ -193,6 +201,41 @@
};
}

private async getEmbeddingPipeline() {
if (!this.embeddingPipeline) {
this.embeddingPipeline = import('@xenova/transformers').then(({ pipeline }) =>
pipeline('feature-extraction', 'Xenova/all-MiniLM-L6-v2')
);
}
return this.embeddingPipeline;
}

private async loadLance(): Promise<LanceDBModule> {
if (!this.lanceModule) {
this.lanceModule = import('@lancedb/lancedb') as unknown as Promise<LanceDBModule>;
}
return this.lanceModule;
}

private async getEmuConnection(emuId: string, emuPath: string) {
if (this.activeConnections.has(emuId)) {
return this.activeConnections.get(emuId);
}

const lancePath = path.join(emuPath, 'lancedb');
if (!fs.existsSync(lancePath)) return null;

try {
const lance = await this.loadLance();
const db = await lance.connect(lancePath);
this.activeConnections.set(emuId, db);
return db;
} catch (error) {
console.warn(`Failed to connect to LanceDB for EMU ${emuId}`, error);
return null;
}
}

addBlock(payload: NewMemoryBlockPayload): MemoryBlock {
const content = payload.content.trim();
if (!content) {
@@ -231,15 +274,60 @@
return block;
}

findRelevantBlocks(
async findRelevantBlocks(
query: string,
options?: { intents?: string[]; tags?: string[]; limit?: number }
): MemoryBlock[] {
): Promise<MemoryBlock[]> {
const limit = options?.limit || 4;
const tokens = this.tokenize(query);
const tagSet = new Set((options?.tags || []).map((tag) => tag.toLowerCase()));
const intentSet = new Set((options?.intents || []).map((intent) => intent.toLowerCase()));

const scored = this.getAllBlocks()
let vectorResults: MemoryBlock[] = [];

try {
const embeddingPipeline = await this.getEmbeddingPipeline();
const output = await embeddingPipeline(query, { pooling: 'mean', normalize: true });
const vector = Array.from(output.data as number[]);

const emuSearches = this.emuMounts.map(async (mount) => {
const db = await this.getEmuConnection(mount.id, mount.path);
if (!db) return [];

try {
const table = await db.openTable('chunks');
const results = await table.search(vector).limit(limit).toArray();

return results.map((row: any) => ({
id: row.id,
title: mount.name,
content: row.text,
intent: 'memory',
tags: mount.tags,
source: `emu:${mount.id}`,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
summary: row.text.slice(0, 140),
score: 1 - (row._distance || 0),
size: Buffer.byteLength(row.text, 'utf-8')
} as MemoryBlock));
} catch (err) {
console.warn(`Vector search failed for EMU ${mount.id}`, err);
return [];
}
});

const emuBlocks = await Promise.all(emuSearches);
vectorResults = emuBlocks.flat();
} catch (error) {
console.error('Vector search initialization failed, falling back to keyword search', error);
}

const allBlocks = [...this.userBlocks, ...this.emuBlocks]
.filter((block) => !this.hiddenBlockIds.has(block.id))
.map((block) => this.applyOverrides(block));

const scored = allBlocks
.map((block) => {
const blockTags = block.tags.map((tag) => tag.toLowerCase());
const tagMatches = blockTags.filter((tag) => tokens.has(tag) || tagSet.has(tag)).length;
@@ -250,12 +338,23 @@

return { block, score };
})
.filter((entry) => entry.score > 0)
.filter((entry) => entry.score > 0);

// Boost vector hits, then rank; the dedupe loop below enforces `limit`,
// so slicing here would undercount whenever duplicates get dropped.
const combined = [...vectorResults.map((b) => ({ block: b, score: (b.score ?? 0) * 2 })), ...scored]
.sort((a, b) => b.score - a.score)
.map((entry) => entry.block);

return scored;
const seenIds = new Set<string>();
const unique: MemoryBlock[] = [];
for (const b of combined) {
if (!seenIds.has(b.id)) {
seenIds.add(b.id);
unique.push(b);
}
if (unique.length >= limit) break;
}

return unique;
}

exportEmuArchive(id: string): { filename: string; buffer: Buffer } {
Expand All @@ -278,7 +377,7 @@ export class EmuMemoryLayer {
return { filename: `${folderName}.zip`, buffer: zip.toBuffer() };
}

importEmuArchive(buffer: Buffer, originalName?: string): EmuMount {
async importEmuArchive(buffer: Buffer, originalName?: string): Promise<EmuMount> {
const zip = new AdmZip(buffer);
const entries = zip.getEntries();

@@ -330,7 +429,7 @@
fs.writeFileSync(destPath, entry.getData());
}

this.refreshEmuMounts();
await this.refreshEmuMounts();
const folderId = targetFolder.replace(/\.emu$/, '');
const mounted =
this.emuMounts.find((mount) => mount.id === folderId) ||