Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/debug/jtag/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ Cargo.lock
# Rust backup files
**/*.rs.bk
**/*.rs.bk.*
.fastembed_cache/
Binary file modified src/debug/jtag/docs/screenshots/livewidget-voice-call.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion src/debug/jtag/generated-command-schemas.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"generated": "2026-01-30T05:40:56.725Z",
"generated": "2026-01-30T23:05:41.816Z",
"version": "1.0.0",
"commands": [
{
Expand Down
4 changes: 2 additions & 2 deletions src/debug/jtag/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/debug/jtag/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@continuum/jtag",
"version": "1.0.7467",
"version": "1.0.7478",
"description": "Global CLI debugging system for any Node.js project. Install once globally, use anywhere: npm install -g @continuum/jtag",
"config": {
"active_example": "widget-ui",
Expand Down
7 changes: 7 additions & 0 deletions src/debug/jtag/shared/generated/persona/ActivityDomain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

/**
* Activity domain for channel routing.
* Each domain has one ChannelQueue. Items route to their domain's queue.
*/
export type ActivityDomain = "AUDIO" | "CHAT" | "BACKGROUND";
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

/**
* IPC request to enqueue any item type. Discriminated by `item_type` field.
*/
export type ChannelEnqueueRequest = { "item_type": "voice", id: string, room_id: string, content: string, sender_id: string, sender_name: string, sender_type: string, voice_session_id: string, timestamp: number, priority: number, } | { "item_type": "chat", id: string, room_id: string, content: string, sender_id: string, sender_name: string, sender_type: string, mentions: boolean, timestamp: number, priority: number, } | { "item_type": "task", id: string, task_id: string, assignee_id: string, created_by: string, task_domain: string, task_type: string, context_id: string, description: string, priority: number, status: string, timestamp: number, due_date: bigint | null, estimated_duration: bigint | null, depends_on: Array<string>, blocked_by: Array<string>, };
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ChannelStatus } from "./ChannelStatus";

/**
* Full channel registry status
*/
export type ChannelRegistryStatus = { channels: Array<ChannelStatus>, total_size: number, has_urgent_work: boolean, has_work: boolean, };
7 changes: 7 additions & 0 deletions src/debug/jtag/shared/generated/persona/ChannelStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ActivityDomain } from "./ActivityDomain";

/**
* Per-channel status snapshot
*/
export type ChannelStatus = { domain: ActivityDomain, size: number, has_urgent: boolean, has_work: boolean, };
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

/**
* Context from a prior message consolidated into this chat item.
*/
export type ConsolidatedContext = { sender_id: string, sender_name: string, content: string, timestamp: bigint, };
28 changes: 28 additions & 0 deletions src/debug/jtag/shared/generated/persona/ServiceCycleResult.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ActivityDomain } from "./ActivityDomain";
import type { ChannelRegistryStatus } from "./ChannelRegistryStatus";

/**
* Result from service_cycle() — what the TS loop should do next
*/
export type ServiceCycleResult = {
/**
* Should TS process an item?
*/
should_process: boolean,
/**
* The item to process (serialized). Null if should_process is false.
*/
item?: any,
/**
* Which domain the item came from
*/
channel?: ActivityDomain,
/**
* How long TS should sleep if no work (adaptive cadence from PersonaState)
*/
wait_ms: bigint,
/**
* Current channel sizes for monitoring
*/
stats: ChannelRegistryStatus, };
8 changes: 8 additions & 0 deletions src/debug/jtag/shared/generated/persona/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,11 @@ export type { PersonaState } from './PersonaState';
export type { CognitionDecision } from './CognitionDecision';
export type { PriorityScore } from './PriorityScore';
export type { PriorityFactors } from './PriorityFactors';

// Channel system types
export type { ActivityDomain } from './ActivityDomain';
export type { ChannelStatus } from './ChannelStatus';
export type { ChannelRegistryStatus } from './ChannelRegistryStatus';
export type { ChannelEnqueueRequest } from './ChannelEnqueueRequest';
export type { ServiceCycleResult } from './ServiceCycleResult';
export type { ConsolidatedContext } from './ConsolidatedContext';
2 changes: 1 addition & 1 deletion src/debug/jtag/shared/version.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
* DO NOT EDIT MANUALLY
*/

export const VERSION = '1.0.7467';
export const VERSION = '1.0.7478';
export const PACKAGE_NAME = '@continuum/jtag';
201 changes: 57 additions & 144 deletions src/debug/jtag/system/rag/sources/GlobalAwarenessSource.ts
Original file line number Diff line number Diff line change
@@ -1,98 +1,55 @@
/**
* GlobalAwarenessSource - Injects cross-context awareness into RAG
* GlobalAwarenessSource - Injects cross-context awareness into RAG via Rust IPC
*
* This is the bridge between UnifiedConsciousness and the RAG pipeline.
* It provides the persona with:
* - Temporal continuity (what was I doing before?)
* - Cross-context knowledge (relevant info from other rooms)
* - Active intentions/goals
* - Peripheral awareness (what's happening elsewhere)
* Delegates to Rust's consciousness context builder which runs:
* - Temporal continuity queries (what was I doing before?)
* - Cross-context event aggregation (what happened in other rooms?)
* - Active intention detection (interrupted tasks)
* - Peripheral activity check
*
* Priority 85 - After identity (95), before conversation history (80).
* This ensures the persona knows WHO they are first, then WHERE they've been,
* then WHAT's been said in this room.
* All queries run concurrently in Rust with separate SQLite read connections,
* bypassing the Node.js event loop entirely.
*
* Previous implementation used TS UnifiedConsciousness through the event loop
* (3-60s under load, frequently timing out).
*
* PERFORMANCE: Caches consciousness context per persona+room for 30 seconds
* to reduce DB query load when multiple personas process messages concurrently.
* Priority 85 - After identity (95), before conversation history (80).
*/

import type { RAGSource, RAGSourceContext, RAGSection } from '../shared/RAGSource';
import { Logger } from '../../core/logging/Logger';
import {
UnifiedConsciousness,
formatConsciousnessForPrompt,
type ConsciousnessContext
} from '../../user/server/modules/consciousness/UnifiedConsciousness';

const log = Logger.create('GlobalAwarenessSource', 'rag');

/**
* Cache entry for consciousness context
*/
interface CachedContext {
context: ConsciousnessContext;
formattedPrompt: string | undefined;
cachedAt: number;
}

/**
* Cache TTL in milliseconds (30 seconds)
* Cross-context awareness doesn't change rapidly, so caching is safe
*/
const CACHE_TTL_MS = 30_000;

/**
* Cache for consciousness contexts by persona+room key
* Key format: `${personaId}:${roomId}`
*/
const contextCache = new Map<string, CachedContext>();

/**
* Registry to store UnifiedConsciousness instances by personaId
* This allows the RAG source to access the consciousness for any persona
* Registry for consciousness instances — kept for backward compatibility.
* The actual consciousness context is now built in Rust via IPC,
* but we still need the registry to check if a persona has been initialized.
*/
const consciousnessRegistry = new Map<string, UnifiedConsciousness>();
const initializedPersonas = new Set<string>();

/**
* Clear expired cache entries
* Register a persona as having consciousness initialized.
* Called during PersonaUser startup after memory/init succeeds.
*/
function clearExpiredCache(): void {
const now = Date.now();
for (const [key, entry] of contextCache) {
if (now - entry.cachedAt > CACHE_TTL_MS) {
contextCache.delete(key);
}
}
}

/**
* Get cache key for persona+room
*/
function getCacheKey(personaId: string, roomId: string): string {
return `${personaId}:${roomId}`;
}

/**
* Register a persona's consciousness for RAG access
*/
export function registerConsciousness(personaId: string, consciousness: UnifiedConsciousness): void {
consciousnessRegistry.set(personaId, consciousness);
export function registerConsciousness(personaId: string, _consciousness?: any): void {
initializedPersonas.add(personaId);
log.debug(`Registered consciousness for persona ${personaId}`);
}

/**
* Unregister a persona's consciousness
*/
export function unregisterConsciousness(personaId: string): void {
consciousnessRegistry.delete(personaId);
initializedPersonas.delete(personaId);
log.debug(`Unregistered consciousness for persona ${personaId}`);
}

/**
* Get a persona's consciousness
* Check if a persona has consciousness registered
*/
export function getConsciousness(personaId: string): UnifiedConsciousness | undefined {
return consciousnessRegistry.get(personaId);
export function getConsciousness(personaId: string): boolean {
return initializedPersonas.has(personaId);
}

export class GlobalAwarenessSource implements RAGSource {
Expand All @@ -101,104 +58,71 @@ export class GlobalAwarenessSource implements RAGSource {
readonly defaultBudgetPercent = 10;

isApplicable(context: RAGSourceContext): boolean {
// Applicable if we have consciousness registered for this persona
const hasConsciousness = consciousnessRegistry.has(context.personaId);
console.log(`[GlobalAwarenessSource] isApplicable: personaId=${context.personaId}, has=${hasConsciousness}, registrySize=${consciousnessRegistry.size}`);
return hasConsciousness;
return initializedPersonas.has(context.personaId);
}

async load(context: RAGSourceContext, _allocatedBudget: number): Promise<RAGSection> {
const startTime = performance.now();

try {
const consciousness = consciousnessRegistry.get(context.personaId);
// Get PersonaUser to access Rust bridge
const { UserDaemonServer } = await import('../../../daemons/user-daemon/server/UserDaemonServer');
const userDaemon = UserDaemonServer.getInstance();

if (!consciousness) {
log.debug(`No consciousness found for persona ${context.personaId}`);
if (!userDaemon) {
log.debug('UserDaemon not available, skipping awareness');
return this.emptySection(startTime);
}

// Check cache first (reduces DB queries from ~4 per request to ~4 per 30s)
const cacheKey = getCacheKey(context.personaId, context.roomId);
const cached = contextCache.get(cacheKey);
const now = Date.now();

if (cached && now - cached.cachedAt < CACHE_TTL_MS) {
// Cache hit - return cached result immediately (no logging to avoid overhead)
const loadTimeMs = performance.now() - startTime;

if (!cached.formattedPrompt) {
return this.emptySection(startTime);
}

return {
sourceName: this.name,
tokenCount: this.estimateTokens(cached.formattedPrompt),
loadTimeMs,
systemPromptSection: cached.formattedPrompt,
metadata: { ...this.buildMetadata(cached.context), cached: true }
};
const personaUser = userDaemon.getPersonaUser(context.personaId);
if (!personaUser) {
return this.emptySection(startTime);
}

// Clear expired entries periodically
if (contextCache.size > 50) {
clearExpiredCache();
// Access the Rust cognition bridge (nullable getter — no throw)
const bridge = (personaUser as any).rustCognitionBridge;
if (!bridge) {
log.debug('Rust cognition bridge not available, skipping awareness');
return this.emptySection(startTime);
}

// Cache miss - fetch consciousness context
const currentMessage = context.options.currentMessage?.content;

// Detect voice mode - skip expensive semantic search for faster response
// Detect voice mode — skip expensive semantic search for faster response
const voiceSessionId = (context.options as any)?.voiceSessionId;
const isVoiceMode = !!voiceSessionId;
if (isVoiceMode) {
log.debug(`VOICE MODE detected - skipping semantic search for faster response`);
}

// TIMEOUT: GlobalAwarenessSource was taking 60+ seconds without this!
// The consciousness.getContext() calls multiple DB queries that can hang
// under lock contention when multiple personas respond concurrently.
const CONSCIOUSNESS_TIMEOUT_MS = 3000; // 3 second hard limit
const currentMessage = context.options.currentMessage?.content;

const contextPromise = consciousness.getContext(
// Single IPC call → Rust builds consciousness context with concurrent SQLite reads
// Rust handles its own 30s TTL cache internally
const result = await bridge.memoryConsciousnessContext(
context.roomId,
currentMessage,
{ skipSemanticSearch: isVoiceMode } // Skip slow embedding search for voice
isVoiceMode // skipSemanticSearch
);

const timeoutPromise = new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Consciousness context timeout')), CONSCIOUSNESS_TIMEOUT_MS)
);

// Build consciousness context with timeout (fast path for voice mode)
const consciousnessContext = await Promise.race([contextPromise, timeoutPromise]);

// Format for prompt injection
const systemPromptSection = formatConsciousnessForPrompt(consciousnessContext);

// Cache the result
contextCache.set(cacheKey, {
context: consciousnessContext,
formattedPrompt: systemPromptSection,
cachedAt: now
});

if (!systemPromptSection) {
log.debug(`Cache miss, no cross-context content for room ${context.roomId}`);
if (!result.formatted_prompt) {
log.debug(`No cross-context content for room ${context.roomId}`);
return this.emptySection(startTime);
}

const loadTimeMs = performance.now() - startTime;
const tokenCount = this.estimateTokens(systemPromptSection);
const tokenCount = this.estimateTokens(result.formatted_prompt);

log.debug(`Loaded global awareness in ${loadTimeMs.toFixed(1)}ms (${tokenCount} tokens)`);
log.debug(`Loaded global awareness in ${loadTimeMs.toFixed(1)}ms (${tokenCount} tokens) rust=${result.build_time_ms.toFixed(1)}ms`);

return {
sourceName: this.name,
tokenCount,
loadTimeMs,
systemPromptSection,
metadata: this.buildMetadata(consciousnessContext)
systemPromptSection: result.formatted_prompt,
metadata: {
crossContextEventCount: result.cross_context_event_count,
activeIntentionCount: result.active_intention_count,
hasPeripheralActivity: result.has_peripheral_activity,
wasInterrupted: result.temporal.was_interrupted,
lastActiveContext: result.temporal.last_active_context_name,
rustBuildMs: result.build_time_ms
}
};

} catch (error: any) {
Expand All @@ -225,17 +149,6 @@ export class GlobalAwarenessSource implements RAGSource {
};
}

private buildMetadata(ctx: ConsciousnessContext): Record<string, unknown> {
return {
crossContextEventCount: ctx.crossContext.relevantEvents.length,
activeIntentionCount: ctx.intentions.active.length,
relevantIntentionCount: ctx.intentions.relevantHere.length,
hasPeripheralActivity: ctx.crossContext.peripheralSummary !== 'Other contexts: Quiet',
wasInterrupted: ctx.temporal.wasInterrupted,
lastActiveContext: ctx.temporal.lastActiveContextName
};
}

private estimateTokens(text: string): number {
return Math.ceil(text.length / 4);
}
Expand Down
Loading
Loading