From 4eb381ef0fbc585b82f1566c2c61d2230ffdfc7a Mon Sep 17 00:00:00 2001 From: Milo Antaeus Date: Tue, 10 Feb 2026 13:35:48 -0800 Subject: [PATCH 1/4] Fix: auto-retry gateway connection on disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the gateway is busy processing cron jobs (~60% of the time), the initial WebSocket handshake times out and the Studio shows "No agents available" with no recovery. Add exponential backoff retry (2s → 30s, up to 20 attempts) that automatically reconnects when the gateway becomes available. Auth errors are not retried. Manual disconnect suppresses auto-retry. Co-Authored-By: Claude Opus 4.6 --- src/lib/gateway/GatewayClient.ts | 60 ++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/lib/gateway/GatewayClient.ts b/src/lib/gateway/GatewayClient.ts index 05f1b695..bd5576b9 100644 --- a/src/lib/gateway/GatewayClient.ts +++ b/src/lib/gateway/GatewayClient.ts @@ -326,12 +326,31 @@ type StudioSettingsCoordinatorLike = { flushPending: () => Promise; }; +const isAuthError = (errorMessage: string | null): boolean => { + if (!errorMessage) return false; + const lower = errorMessage.toLowerCase(); + return ( + lower.includes("auth") || + lower.includes("unauthorized") || + lower.includes("forbidden") || + lower.includes("invalid token") || + lower.includes("token required") + ); +}; + +const MAX_AUTO_RETRY_ATTEMPTS = 20; +const INITIAL_RETRY_DELAY_MS = 2_000; +const MAX_RETRY_DELAY_MS = 30_000; + export const useGatewayConnection = ( settingsCoordinator: StudioSettingsCoordinatorLike ): GatewayConnectionState => { const [client] = useState(() => new GatewayClient()); const didAutoConnect = useRef(false); const loadedGatewaySettings = useRef<{ gatewayUrl: string; token: string } | null>(null); + const retryAttemptRef = useRef(0); + const retryTimerRef = useRef | null>(null); + const wasManualDisconnectRef = useRef(false); const [gatewayUrl, setGatewayUrl] = useState(DEFAULT_GATEWAY_URL); const [token, setToken] = useState(""); @@ -387,14 +406,20 @@ export const useGatewayConnection = ( useEffect(() => { return () => { + if (retryTimerRef.current) { + clearTimeout(retryTimerRef.current); + retryTimerRef.current = null; + } client.disconnect(); }; }, [client]); const connect = useCallback(async () => { setError(null); + wasManualDisconnectRef.current = false; try { await client.connect({ gatewayUrl, token }); + retryAttemptRef.current = 0; } catch (err) { setError(formatGatewayError(err)); } @@ -408,6 +433,40 @@ export const useGatewayConnection = ( void connect(); }, [connect, gatewayUrl, settingsLoaded]); + // Auto-retry on disconnect (gateway busy, network blip, etc.) + useEffect(() => { + if (status !== "disconnected") return; + if (!didAutoConnect.current) return; + if (wasManualDisconnectRef.current) return; + if (!gatewayUrl.trim()) return; + if (isAuthError(error)) return; + if (retryAttemptRef.current >= MAX_AUTO_RETRY_ATTEMPTS) return; + + const attempt = retryAttemptRef.current; + const delay = Math.min( + INITIAL_RETRY_DELAY_MS * Math.pow(1.5, attempt), + MAX_RETRY_DELAY_MS + ); + retryTimerRef.current = setTimeout(() => { + retryAttemptRef.current = attempt + 1; + void connect(); + }, delay); + + return () => { + if (retryTimerRef.current) { + clearTimeout(retryTimerRef.current); + retryTimerRef.current = null; + } + }; + }, [connect, error, gatewayUrl, status]); + + // Reset retry count on successful connection + useEffect(() => { + if (status === "connected") { + retryAttemptRef.current = 0; + } + }, [status]); + useEffect(() => { if (!settingsLoaded) return; const baseline = loadedGatewaySettings.current; @@ -429,6 +488,7 @@ export const useGatewayConnection = ( const disconnect = useCallback(() => { setError(null); + wasManualDisconnectRef.current = true; client.disconnect(); }, [client]); From 727321ff672c7f1243d9d074e6d3396821ea52e1 Mon Sep 17 00:00:00 2001 From: Milo Antaeus Date: Tue, 10 Feb 2026 17:03:40 -0800 Subject: [PATCH 2/4] Add real-time observability dashboard at /observe New standalone page that subscribes to ALL gateway events (including cron jobs and isolated sessions) without session-key filtering. Provides live activity feed, session status cards with origin badges, and intervention alerts for errors. Co-Authored-By: Claude Opus 4.6 --- src/app/observe/page.tsx | 197 ++++++++++++++++++ .../observe/components/ActivityFeed.tsx | 69 ++++++ .../observe/components/ActivityFeedEntry.tsx | 62 ++++++ .../observe/components/InterventionAlerts.tsx | 55 +++++ .../observe/components/ObserveHeaderBar.tsx | 87 ++++++++ .../observe/components/SessionCard.tsx | 105 ++++++++++ .../observe/components/SessionOverview.tsx | 57 +++++ .../observe/state/observeEventHandler.ts | 169 +++++++++++++++ src/features/observe/state/reducer.ts | 131 ++++++++++++ src/features/observe/state/types.ts | 44 ++++ 10 files changed, 976 insertions(+) create mode 100644 src/app/observe/page.tsx create mode 100644 src/features/observe/components/ActivityFeed.tsx create mode 100644 src/features/observe/components/ActivityFeedEntry.tsx create mode 100644 src/features/observe/components/InterventionAlerts.tsx create mode 100644 src/features/observe/components/ObserveHeaderBar.tsx create mode 100644 src/features/observe/components/SessionCard.tsx create mode 100644 src/features/observe/components/SessionOverview.tsx create mode 100644 src/features/observe/state/observeEventHandler.ts create mode 100644 src/features/observe/state/reducer.ts create mode 100644 src/features/observe/state/types.ts diff --git a/src/app/observe/page.tsx b/src/app/observe/page.tsx new file mode 100644 index 00000000..d7bbd327 --- /dev/null +++ b/src/app/observe/page.tsx @@ -0,0 +1,197 @@ +"use client"; + +import { useCallback, useEffect, useReducer, useRef, useState } from "react"; +import { createStudioSettingsCoordinator } from "@/lib/studio/coordinator"; +import { + useGatewayConnection, + parseAgentIdFromSessionKey, +} from "@/lib/gateway/GatewayClient"; +import type { EventFrame } from "@/lib/gateway/GatewayClient"; +import { createRafBatcher } from "@/lib/dom"; +import { mapEventFrameToEntry } from "@/features/observe/state/observeEventHandler"; +import { observeReducer, initialObserveState } from "@/features/observe/state/reducer"; +import type { SessionStatus } from "@/features/observe/state/types"; +import { ObserveHeaderBar } from "@/features/observe/components/ObserveHeaderBar"; +import { SessionOverview } from "@/features/observe/components/SessionOverview"; +import { ActivityFeed } from "@/features/observe/components/ActivityFeed"; +import { InterventionAlerts } from "@/features/observe/components/InterventionAlerts"; + +type SessionsListResult = { + sessions: Array<{ + key: string; + agentId?: string; + displayName?: string; + origin?: { label?: string }; + updatedAt?: number; + }>; +}; + +const inferOrigin = (label?: string): SessionStatus["origin"] => { + if (!label) return "unknown"; + const lower = label.toLowerCase(); + if (lower.includes("cron") || lower.includes("isolated")) return "cron"; + if (lower.includes("heartbeat")) return "heartbeat"; + if (lower.includes("interactive") || lower.includes("main")) return "interactive"; + return "unknown"; +}; + +export default function ObservePage() { + const [settingsCoordinator] = useState(() => createStudioSettingsCoordinator()); + const { client, status } = useGatewayConnection(settingsCoordinator); + const [state, dispatch] = useReducer(observeReducer, initialObserveState); + const [selectedSession, setSelectedSession] = useState(null); + + const pendingEntriesRef = useRef[]>([]); + + // Subscribe to ALL gateway events with RAF batching + useEffect(() => { + const batcher = createRafBatcher(() => { + const pending = pendingEntriesRef.current; + if (pending.length === 0) return; + pendingEntriesRef.current = []; + const valid = pending.filter( + (e): e is NonNullable => e !== null + ); + if (valid.length > 0) { + dispatch({ type: "pushEntries", entries: valid }); + } + }); + + const unsubscribe = client.onEvent((event: EventFrame) => { + const entry = mapEventFrameToEntry(event); + if (entry) { + pendingEntriesRef.current.push(entry); + batcher.schedule(); + } + }); + return () => { + unsubscribe(); + batcher.cancel(); + }; + }, [client]); + + // Discover sessions on connect + useEffect(() => { + if (status !== "connected") return; + let cancelled = false; + + const loadSessions = async () => { + try { + const result = await client.call("sessions.list", { + includeGlobal: true, + includeUnknown: true, + limit: 200, + }); + if (cancelled) return; + const sessions: SessionStatus[] = (result.sessions ?? []).map((s) => ({ + sessionKey: s.key, + agentId: s.agentId ?? parseAgentIdFromSessionKey(s.key), + displayName: s.displayName ?? s.agentId ?? null, + origin: inferOrigin(s.origin?.label), + status: "idle" as const, + lastActivityAt: s.updatedAt ?? null, + currentToolName: null, + lastError: null, + eventCount: 0, + })); + dispatch({ type: "hydrateSessions", sessions }); + } catch (err) { + console.warn("[observe] Failed to load sessions:", err); + } + }; + + void loadSessions(); + return () => { + cancelled = true; + }; + }, [client, status]); + + // Refresh sessions on presence events (throttled) + const refreshTimerRef = useRef | null>(null); + useEffect(() => { + if (status !== "connected") return; + + const unsubscribe = client.onEvent((event: EventFrame) => { + if (event.event !== "presence") return; + if (refreshTimerRef.current) return; + refreshTimerRef.current = setTimeout(async () => { + refreshTimerRef.current = null; + try { + const result = await client.call("sessions.list", { + includeGlobal: true, + includeUnknown: true, + limit: 200, + }); + const sessions: SessionStatus[] = (result.sessions ?? []).map((s) => ({ + sessionKey: s.key, + agentId: s.agentId ?? parseAgentIdFromSessionKey(s.key), + displayName: s.displayName ?? s.agentId ?? null, + origin: inferOrigin(s.origin?.label), + status: "idle" as const, + lastActivityAt: s.updatedAt ?? null, + currentToolName: null, + lastError: null, + eventCount: 0, + })); + dispatch({ type: "hydrateSessions", sessions }); + } catch { + // ignore refresh failures + } + }, 2000); + }); + + return () => { + unsubscribe(); + if (refreshTimerRef.current) { + clearTimeout(refreshTimerRef.current); + refreshTimerRef.current = null; + } + }; + }, [client, status]); + + const handleTogglePause = useCallback(() => { + dispatch({ type: "togglePause" }); + }, []); + + const handleClear = useCallback(() => { + dispatch({ type: "clearLog" }); + }, []); + + const handleSelectSession = useCallback((sessionKey: string | null) => { + setSelectedSession(sessionKey); + }, []); + + return ( +
+ + + + +
+ {/* Session sidebar */} +
+ +
+ + {/* Activity feed */} +
+ +
+
+
+ ); +} diff --git a/src/features/observe/components/ActivityFeed.tsx b/src/features/observe/components/ActivityFeed.tsx new file mode 100644 index 00000000..65082058 --- /dev/null +++ b/src/features/observe/components/ActivityFeed.tsx @@ -0,0 +1,69 @@ +"use client"; + +import { useCallback, useEffect, useRef } from "react"; +import { isNearBottom } from "@/lib/dom"; +import type { ObserveEntry } from "../state/types"; +import { ActivityFeedEntry } from "./ActivityFeedEntry"; + +type ActivityFeedProps = { + entries: ObserveEntry[]; + sessionFilter: string | null; +}; + +export const ActivityFeed = ({ entries, sessionFilter }: ActivityFeedProps) => { + const containerRef = useRef(null); + const shouldAutoScroll = useRef(true); + + const filtered = sessionFilter + ? entries.filter((e) => e.sessionKey === sessionFilter) + : entries; + + const handleScroll = useCallback(() => { + const el = containerRef.current; + if (!el) return; + shouldAutoScroll.current = isNearBottom({ + scrollTop: el.scrollTop, + scrollHeight: el.scrollHeight, + clientHeight: el.clientHeight, + }); + }, []); + + useEffect(() => { + const el = containerRef.current; + if (!el || !shouldAutoScroll.current) return; + el.scrollTop = el.scrollHeight; + }, [filtered.length]); + + return ( +
+
+

+ Activity Feed + {sessionFilter && ( + + ({sessionFilter.slice(0, 20)}) + + )} +

+ + {filtered.length} events + +
+
+ {filtered.length === 0 ? ( +
+ Waiting for events... +
+ ) : ( + filtered.map((entry) => ( + + )) + )} +
+
+ ); +}; diff --git a/src/features/observe/components/ActivityFeedEntry.tsx b/src/features/observe/components/ActivityFeedEntry.tsx new file mode 100644 index 00000000..c71d496c --- /dev/null +++ b/src/features/observe/components/ActivityFeedEntry.tsx @@ -0,0 +1,62 @@ +import type { ObserveEntry } from "../state/types"; + +const formatTime = (ts: number): string => { + const d = new Date(ts); + const h = String(d.getHours()).padStart(2, "0"); + const m = String(d.getMinutes()).padStart(2, "0"); + const s = String(d.getSeconds()).padStart(2, "0"); + const ms = String(d.getMilliseconds()).padStart(3, "0"); + return `${h}:${m}:${s}.${ms}`; +}; + +const severityClass: Record = { + info: "text-muted-foreground", + warn: "text-amber-400", + error: "text-red-400", +}; + +const streamLabel = (entry: ObserveEntry): string => { + if (entry.eventType === "heartbeat") return "heartbeat"; + if (entry.eventType === "presence") return "presence"; + if (entry.eventType === "chat") { + return entry.chatState ? `chat:${entry.chatState}` : "chat"; + } + if (entry.stream === "tool") { + const name = entry.toolName ?? "?"; + const phase = entry.toolPhase ? `:${entry.toolPhase}` : ""; + return `tool${phase} ${name}`; + } + if (entry.stream === "lifecycle") { + return `lifecycle:${entry.text ?? "?"}`; + } + if (entry.stream) return entry.stream; + return entry.eventType; +}; + +type ActivityFeedEntryProps = { + entry: ObserveEntry; +}; + +export const ActivityFeedEntry = ({ entry }: ActivityFeedEntryProps) => { + const agentLabel = entry.agentId ?? entry.sessionKey?.slice(0, 12) ?? "-"; + + return ( +
+ {formatTime(entry.timestamp)} + + {agentLabel} + + + {streamLabel(entry)} + + {entry.errorMessage && ( + {entry.errorMessage} + )} + {!entry.errorMessage && entry.text && ( + {entry.text} + )} +
+ ); +}; diff --git a/src/features/observe/components/InterventionAlerts.tsx b/src/features/observe/components/InterventionAlerts.tsx new file mode 100644 index 00000000..44c9c0ad --- /dev/null +++ b/src/features/observe/components/InterventionAlerts.tsx @@ -0,0 +1,55 @@ +import type { ObserveEntry } from "../state/types"; + +type InterventionAlertsProps = { + entries: ObserveEntry[]; +}; + +const MAX_ALERTS = 10; + +const formatTime = (ts: number): string => { + const d = new Date(ts); + const h = String(d.getHours()).padStart(2, "0"); + const m = String(d.getMinutes()).padStart(2, "0"); + const s = String(d.getSeconds()).padStart(2, "0"); + return `${h}:${m}:${s}`; +}; + +export const InterventionAlerts = ({ entries }: InterventionAlertsProps) => { + const errors = entries.filter((e) => e.severity === "error"); + if (errors.length === 0) return null; + + const recent = errors.slice(-MAX_ALERTS).reverse(); + const hiddenCount = errors.length > MAX_ALERTS ? errors.length - MAX_ALERTS : 0; + + return ( +
+
+

+ Intervention Needed +

+ + {errors.length} error{errors.length !== 1 ? "s" : ""} + {hiddenCount > 0 ? ` (+${hiddenCount} hidden)` : ""} + +
+
+ {recent.map((entry) => ( +
+ + {formatTime(entry.timestamp)} + + + {entry.agentId ?? entry.sessionKey?.slice(0, 12) ?? "-"} + + + {entry.errorMessage ?? entry.text ?? "Unknown error"} + +
+ ))} +
+
+ ); +}; diff --git a/src/features/observe/components/ObserveHeaderBar.tsx b/src/features/observe/components/ObserveHeaderBar.tsx new file mode 100644 index 00000000..21583a78 --- /dev/null +++ b/src/features/observe/components/ObserveHeaderBar.tsx @@ -0,0 +1,87 @@ +import Link from "next/link"; +import type { GatewayStatus } from "@/lib/gateway/GatewayClient"; + +type ObserveHeaderBarProps = { + status: GatewayStatus; + paused: boolean; + entryCount: number; + interventionCount: number; + onTogglePause: () => void; + onClear: () => void; +}; + +const statusStyles: Record = { + disconnected: { + label: "Disconnected", + className: "border border-border/70 bg-muted text-muted-foreground", + }, + connecting: { + label: "Connecting", + className: "border border-border/70 bg-secondary text-secondary-foreground", + }, + connected: { + label: "Connected", + className: "border border-primary/30 bg-primary/15 text-foreground", + }, +}; + +export const ObserveHeaderBar = ({ + status, + paused, + entryCount, + interventionCount, + onTogglePause, + onClear, +}: ObserveHeaderBarProps) => { + const statusConfig = statusStyles[status]; + + return ( +
+
+
+

+ Milo Observe +

+

+ Real-time gateway event monitor +

+
+ + {statusConfig.label} + + {interventionCount > 0 && ( + + {interventionCount} error{interventionCount !== 1 ? "s" : ""} + + )} +
+
+ + {entryCount} events + + + + + Studio + +
+
+ ); +}; diff --git a/src/features/observe/components/SessionCard.tsx b/src/features/observe/components/SessionCard.tsx new file mode 100644 index 00000000..0a4f6dff --- /dev/null +++ b/src/features/observe/components/SessionCard.tsx @@ -0,0 +1,105 @@ +import type { SessionStatus } from "../state/types"; + +type SessionCardProps = { + session: SessionStatus; + isSelected: boolean; + onSelect: (sessionKey: string | null) => void; +}; + +const statusStyles: Record = { + idle: { + label: "Idle", + className: "border-border/50 bg-muted/30", + }, + running: { + label: "Running", + className: "border-primary/40 bg-primary/5 shadow-[0_0_8px_rgba(var(--primary-rgb,100,100,255),0.15)]", + }, + error: { + label: "Error", + className: "border-red-500/40 bg-red-500/5", + }, +}; + +const originBadge: Record = { + interactive: { + label: "Interactive", + className: "bg-blue-500/15 text-blue-400", + }, + cron: { + label: "Cron", + className: "bg-amber-500/15 text-amber-400", + }, + heartbeat: { + label: "Heartbeat", + className: "bg-muted text-muted-foreground", + }, + unknown: { + label: "Unknown", + className: "bg-muted text-muted-foreground", + }, +}; + +const formatRelativeTime = (ts: number | null): string => { + if (!ts) return "-"; + const diff = Date.now() - ts; + if (diff < 1000) return "now"; + if (diff < 60_000) return `${Math.floor(diff / 1000)}s ago`; + if (diff < 3_600_000) return `${Math.floor(diff / 60_000)}m ago`; + return `${Math.floor(diff / 3_600_000)}h ago`; +}; + +export const SessionCard = ({ session, isSelected, onSelect }: SessionCardProps) => { + const statusConfig = statusStyles[session.status]; + const originConfig = originBadge[session.origin]; + const displayName = session.displayName ?? session.agentId ?? session.sessionKey.slice(0, 16); + + return ( + + ); +}; diff --git a/src/features/observe/components/SessionOverview.tsx b/src/features/observe/components/SessionOverview.tsx new file mode 100644 index 00000000..78caaa9e --- /dev/null +++ b/src/features/observe/components/SessionOverview.tsx @@ -0,0 +1,57 @@ +import type { SessionStatus } from "../state/types"; +import { SessionCard } from "./SessionCard"; + +type SessionOverviewProps = { + sessions: SessionStatus[]; + selectedSession: string | null; + onSelectSession: (sessionKey: string | null) => void; +}; + +const statusOrder: Record = { + error: 0, + running: 1, + idle: 2, +}; + +export const SessionOverview = ({ + sessions, + selectedSession, + onSelectSession, +}: SessionOverviewProps) => { + const sorted = [...sessions].sort((a, b) => { + const statusDiff = statusOrder[a.status] - statusOrder[b.status]; + if (statusDiff !== 0) return statusDiff; + return (b.lastActivityAt ?? 0) - (a.lastActivityAt ?? 0); + }); + + return ( +
+
+

+ Sessions +

+ + {sessions.length} + +
+
+ {sorted.length === 0 ? ( +
+ No sessions +
+ ) : ( +
+ {sorted.map((session) => ( + + ))} +
+ )} +
+
+ ); +}; diff --git a/src/features/observe/state/observeEventHandler.ts b/src/features/observe/state/observeEventHandler.ts new file mode 100644 index 00000000..6c6ed943 --- /dev/null +++ b/src/features/observe/state/observeEventHandler.ts @@ -0,0 +1,169 @@ +import type { EventFrame } from "@/lib/gateway/GatewayClient"; +import { parseAgentIdFromSessionKey } from "@/lib/gateway/GatewayClient"; +import { classifyGatewayEventKind } from "@/features/agents/state/runtimeEventBridge"; +import type { ChatEventPayload, AgentEventPayload } from "@/features/agents/state/runtimeEventBridge"; +import type { ObserveEntry } from "./types"; + +let entryCounter = 0; + +const nextId = (): string => { + entryCounter += 1; + return `obs-${entryCounter}`; +}; + +const truncate = (text: string | null | undefined, maxLen: number = 200): string | null => { + if (!text) return null; + if (text.length <= maxLen) return text; + return text.slice(0, maxLen) + "..."; +}; + +const extractText = (message: unknown): string | null => { + if (!message || typeof message !== "object") return null; + const record = message as Record; + if (typeof record.content === "string") return record.content; + if (typeof record.text === "string") return record.text; + return null; +}; + +const mapChatEvent = (payload: ChatEventPayload, timestamp: number): ObserveEntry => { + const agentId = payload.sessionKey + ? parseAgentIdFromSessionKey(payload.sessionKey) + : null; + + const isError = payload.state === "error" || payload.state === "aborted"; + const text = payload.errorMessage || extractText(payload.message); + + return { + id: nextId(), + timestamp, + eventType: "chat", + sessionKey: payload.sessionKey ?? null, + agentId, + runId: payload.runId ?? null, + stream: null, + toolName: null, + toolPhase: null, + chatState: payload.state ?? null, + errorMessage: isError ? (payload.errorMessage ?? "Chat error") : null, + text: truncate(text), + severity: isError ? "error" : "info", + }; +}; + +const mapAgentEvent = (payload: AgentEventPayload, timestamp: number): ObserveEntry => { + const sessionKey = payload.sessionKey ?? null; + const agentId = sessionKey ? parseAgentIdFromSessionKey(sessionKey) : null; + const stream = payload.stream ?? null; + const data = payload.data ?? {}; + + let toolName: string | null = null; + let toolPhase: string | null = null; + let text: string | null = null; + let errorMessage: string | null = null; + let severity: ObserveEntry["severity"] = "info"; + + if (stream === "lifecycle") { + const phase = typeof data.phase === "string" ? data.phase : null; + text = phase; + if (phase === "error") { + severity = "error"; + errorMessage = typeof data.error === "string" ? data.error : "Lifecycle error"; + } + } else if (stream === "tool") { + toolName = typeof data.name === "string" ? data.name : null; + toolPhase = typeof data.phase === "string" ? data.phase : null; + if (typeof data.error === "string") { + severity = "error"; + errorMessage = data.error; + } + if (typeof data.result === "string") { + text = truncate(data.result); + } else if (typeof data.args === "string") { + text = truncate(data.args); + } else if (data.args && typeof data.args === "object") { + try { + text = truncate(JSON.stringify(data.args)); + } catch { + text = null; + } + } + } else if (stream === "assistant") { + const raw = typeof data.text === "string" ? data.text : null; + text = truncate(raw); + } else { + // reasoning or other streams + const raw = typeof data.text === "string" ? data.text : null; + text = truncate(raw); + } + + return { + id: nextId(), + timestamp, + eventType: "agent", + sessionKey, + agentId, + runId: payload.runId ?? null, + stream, + toolName, + toolPhase, + chatState: null, + errorMessage, + text, + severity, + }; +}; + +export const mapEventFrameToEntry = (event: EventFrame): ObserveEntry | null => { + const timestamp = Date.now(); + const kind = classifyGatewayEventKind(event.event); + + if (kind === "runtime-chat") { + const payload = event.payload as ChatEventPayload | undefined; + if (!payload) return null; + return mapChatEvent(payload, timestamp); + } + + if (kind === "runtime-agent") { + const payload = event.payload as AgentEventPayload | undefined; + if (!payload) return null; + return mapAgentEvent(payload, timestamp); + } + + if (event.event === "heartbeat") { + return { + id: nextId(), + timestamp, + eventType: "heartbeat", + sessionKey: null, + agentId: null, + runId: null, + stream: null, + toolName: null, + toolPhase: null, + chatState: null, + errorMessage: null, + text: null, + severity: "info", + }; + } + + if (event.event === "presence") { + return { + id: nextId(), + timestamp, + eventType: "presence", + sessionKey: null, + agentId: null, + runId: null, + stream: null, + toolName: null, + toolPhase: null, + chatState: null, + errorMessage: null, + text: null, + severity: "info", + }; + } + + return null; +}; diff --git a/src/features/observe/state/reducer.ts b/src/features/observe/state/reducer.ts new file mode 100644 index 00000000..d432e92c --- /dev/null +++ b/src/features/observe/state/reducer.ts @@ -0,0 +1,131 @@ +import type { ObserveAction, ObserveEntry, ObserveState, SessionStatus } from "./types"; +import { MAX_ENTRIES } from "./types"; + +export const initialObserveState: ObserveState = { + entries: [], + sessions: [], + interventionCount: 0, + paused: false, +}; + +const updateSessionsFromEntries = ( + sessions: SessionStatus[], + entries: ObserveEntry[] +): SessionStatus[] => { + const map = new Map(); + for (const s of sessions) { + map.set(s.sessionKey, { ...s }); + } + + for (const entry of entries) { + if (!entry.sessionKey) continue; + let session = map.get(entry.sessionKey); + if (!session) { + session = { + sessionKey: entry.sessionKey, + agentId: entry.agentId, + displayName: entry.agentId, + origin: "unknown", + status: "idle", + lastActivityAt: null, + currentToolName: null, + lastError: null, + eventCount: 0, + }; + map.set(entry.sessionKey, session); + } + + session.eventCount += 1; + session.lastActivityAt = entry.timestamp; + + if (entry.stream === "lifecycle") { + if (entry.text === "start") { + session.status = "running"; + session.currentToolName = null; + session.lastError = null; + } else if (entry.text === "end") { + session.status = "idle"; + session.currentToolName = null; + } else if (entry.text === "error") { + session.status = "error"; + session.lastError = entry.errorMessage; + session.currentToolName = null; + } + } + + if (entry.stream === "tool" && entry.toolName) { + session.currentToolName = entry.toolName; + } + + if (entry.severity === "error" && entry.errorMessage) { + session.lastError = entry.errorMessage; + } + } + + return Array.from(map.values()); +}; + +const countInterventions = (entries: ObserveEntry[]): number => { + let count = 0; + for (const e of entries) { + if (e.severity === "error") count += 1; + } + return count; +}; + +export const observeReducer = ( + state: ObserveState, + action: ObserveAction +): ObserveState => { + switch (action.type) { + case "pushEntries": { + if (state.paused || action.entries.length === 0) return state; + const merged = [...state.entries, ...action.entries]; + const capped = + merged.length > MAX_ENTRIES + ? merged.slice(merged.length - MAX_ENTRIES) + : merged; + const sessions = updateSessionsFromEntries(state.sessions, action.entries); + return { + ...state, + entries: capped, + sessions, + interventionCount: countInterventions(capped), + }; + } + case "hydrateSessions": { + const existing = new Map(); + for (const s of state.sessions) { + existing.set(s.sessionKey, s); + } + const merged: SessionStatus[] = []; + for (const incoming of action.sessions) { + const current = existing.get(incoming.sessionKey); + if (current) { + merged.push({ + ...current, + displayName: incoming.displayName ?? current.displayName, + origin: incoming.origin !== "unknown" ? incoming.origin : current.origin, + }); + existing.delete(incoming.sessionKey); + } else { + merged.push(incoming); + } + } + for (const remaining of existing.values()) { + merged.push(remaining); + } + return { ...state, sessions: merged }; + } + case "togglePause": + return { ...state, paused: !state.paused }; + case "clearLog": + return { + ...state, + entries: [], + interventionCount: 0, + }; + default: + return state; + } +}; diff --git a/src/features/observe/state/types.ts b/src/features/observe/state/types.ts new file mode 100644 index 00000000..cb8e3a0f --- /dev/null +++ b/src/features/observe/state/types.ts @@ -0,0 +1,44 @@ +export type ObserveEntry = { + id: string; + timestamp: number; + eventType: "chat" | "agent" | "presence" | "heartbeat" | "unknown"; + sessionKey: string | null; + agentId: string | null; + runId: string | null; + stream: string | null; + toolName: string | null; + toolPhase: string | null; + chatState: string | null; + errorMessage: string | null; + text: string | null; + severity: "info" | "warn" | "error"; +}; + +export type SessionOrigin = "interactive" | "cron" | "heartbeat" | "unknown"; + +export type SessionStatus = { + sessionKey: string; + agentId: string | null; + displayName: string | null; + origin: SessionOrigin; + status: "idle" | "running" | "error"; + lastActivityAt: number | null; + currentToolName: string | null; + lastError: string | null; + eventCount: number; +}; + +export type ObserveState = { + entries: ObserveEntry[]; + sessions: SessionStatus[]; + interventionCount: number; + paused: boolean; +}; + +export type ObserveAction = + | { type: "pushEntries"; entries: ObserveEntry[] } + | { type: "hydrateSessions"; sessions: SessionStatus[] } + | { type: "togglePause" } + | { type: "clearLog" }; + +export const MAX_ENTRIES = 2000; From 24baa72197a3cb6ab03414330bd2926a6ff00827 Mon Sep 17 00:00:00 2001 From: Milo Antaeus Date: Tue, 10 Feb 2026 17:12:54 -0800 Subject: [PATCH 3/4] Improve observe dashboard: human-readable feed, live output, session context MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Activity feed now shows plain English descriptions instead of raw protocol events: "Reading LEARNINGS.md", "Running: git status", "Spawning subagent" - Added LiveOutputPanel showing streaming text from the active session - Session cards now display current activity, tool name, streaming preview - Header shows high-level summary of what's running across all sessions - Fixed UNKNOWN origin badges — infers cron/interactive from session keys - Studio link opens in new tab to avoid navigating away from /observe - Filtered out noisy heartbeat/presence/delta events from feed Co-Authored-By: Claude Opus 4.6 --- src/app/observe/page.tsx | 153 +++++++---- .../observe/components/ActivityFeedEntry.tsx | 63 +++-- .../observe/components/InterventionAlerts.tsx | 32 +-- .../observe/components/LiveOutputPanel.tsx | 56 ++++ .../observe/components/ObserveHeaderBar.tsx | 139 ++++++---- .../observe/components/SessionCard.tsx | 68 +++-- .../observe/state/observeEventHandler.ts | 260 +++++++++++++----- src/features/observe/state/reducer.ts | 62 ++++- src/features/observe/state/types.ts | 5 + 9 files changed, 597 insertions(+), 241 deletions(-) create mode 100644 src/features/observe/components/LiveOutputPanel.tsx diff --git a/src/app/observe/page.tsx b/src/app/observe/page.tsx index d7bbd327..8a50a286 100644 --- a/src/app/observe/page.tsx +++ b/src/app/observe/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useCallback, useEffect, useReducer, useRef, useState } from "react"; +import { useCallback, useEffect, useMemo, useReducer, useRef, useState } from "react"; import { createStudioSettingsCoordinator } from "@/lib/studio/coordinator"; import { useGatewayConnection, @@ -9,12 +9,16 @@ import { import type { EventFrame } from "@/lib/gateway/GatewayClient"; import { createRafBatcher } from "@/lib/dom"; import { mapEventFrameToEntry } from "@/features/observe/state/observeEventHandler"; -import { observeReducer, initialObserveState } from "@/features/observe/state/reducer"; +import { + observeReducer, + initialObserveState, +} from "@/features/observe/state/reducer"; import type { SessionStatus } from "@/features/observe/state/types"; import { ObserveHeaderBar } from "@/features/observe/components/ObserveHeaderBar"; import { SessionOverview } from "@/features/observe/components/SessionOverview"; import { ActivityFeed } from "@/features/observe/components/ActivityFeed"; import { InterventionAlerts } from "@/features/observe/components/InterventionAlerts"; +import { LiveOutputPanel } from "@/features/observe/components/LiveOutputPanel"; type SessionsListResult = { sessions: Array<{ @@ -26,22 +30,37 @@ type SessionsListResult = { }>; }; -const inferOrigin = (label?: string): SessionStatus["origin"] => { - if (!label) return "unknown"; - const lower = label.toLowerCase(); - if (lower.includes("cron") || lower.includes("isolated")) return "cron"; - if (lower.includes("heartbeat")) return "heartbeat"; - if (lower.includes("interactive") || lower.includes("main")) return "interactive"; +const inferOrigin = ( + label?: string, + key?: string +): SessionStatus["origin"] => { + if (label) { + const lower = label.toLowerCase(); + if (lower.includes("cron") || lower.includes("isolated")) return "cron"; + if (lower.includes("heartbeat")) return "heartbeat"; + if (lower.includes("interactive") || lower.includes("main")) + return "interactive"; + } + if (key) { + const lowerKey = key.toLowerCase(); + if (lowerKey.includes("cron:") || lowerKey.includes("isolated")) + return "cron"; + if (lowerKey.includes("heartbeat")) return "heartbeat"; + } return "unknown"; }; export default function ObservePage() { - const [settingsCoordinator] = useState(() => createStudioSettingsCoordinator()); + const [settingsCoordinator] = useState(() => + createStudioSettingsCoordinator() + ); const { client, status } = useGatewayConnection(settingsCoordinator); const [state, dispatch] = useReducer(observeReducer, initialObserveState); const [selectedSession, setSelectedSession] = useState(null); - const pendingEntriesRef = useRef[]>([]); + const pendingEntriesRef = useRef[]>( + [] + ); // Subscribe to ALL gateway events with RAF batching useEffect(() => { @@ -77,23 +96,31 @@ export default function ObservePage() { const loadSessions = async () => { try { - const result = await client.call("sessions.list", { - includeGlobal: true, - includeUnknown: true, - limit: 200, - }); + const result = await client.call( + "sessions.list", + { + includeGlobal: true, + includeUnknown: true, + limit: 200, + } + ); if (cancelled) return; - const sessions: SessionStatus[] = (result.sessions ?? []).map((s) => ({ - sessionKey: s.key, - agentId: s.agentId ?? parseAgentIdFromSessionKey(s.key), - displayName: s.displayName ?? s.agentId ?? null, - origin: inferOrigin(s.origin?.label), - status: "idle" as const, - lastActivityAt: s.updatedAt ?? null, - currentToolName: null, - lastError: null, - eventCount: 0, - })); + const sessions: SessionStatus[] = (result.sessions ?? []).map( + (s) => ({ + sessionKey: s.key, + agentId: s.agentId ?? parseAgentIdFromSessionKey(s.key), + displayName: s.displayName ?? s.agentId ?? null, + origin: inferOrigin(s.origin?.label, s.key), + status: "idle" as const, + lastActivityAt: s.updatedAt ?? null, + currentToolName: null, + currentToolArgs: null, + currentActivity: null, + streamingText: null, + lastError: null, + eventCount: 0, + }) + ); dispatch({ type: "hydrateSessions", sessions }); } catch (err) { console.warn("[observe] Failed to load sessions:", err); @@ -117,22 +144,30 @@ export default function ObservePage() { refreshTimerRef.current = setTimeout(async () => { refreshTimerRef.current = null; try { - const result = await client.call("sessions.list", { - includeGlobal: true, - includeUnknown: true, - limit: 200, - }); - const sessions: SessionStatus[] = (result.sessions ?? []).map((s) => ({ - sessionKey: s.key, - agentId: s.agentId ?? parseAgentIdFromSessionKey(s.key), - displayName: s.displayName ?? s.agentId ?? null, - origin: inferOrigin(s.origin?.label), - status: "idle" as const, - lastActivityAt: s.updatedAt ?? null, - currentToolName: null, - lastError: null, - eventCount: 0, - })); + const result = await client.call( + "sessions.list", + { + includeGlobal: true, + includeUnknown: true, + limit: 200, + } + ); + const sessions: SessionStatus[] = (result.sessions ?? []).map( + (s) => ({ + sessionKey: s.key, + agentId: s.agentId ?? parseAgentIdFromSessionKey(s.key), + displayName: s.displayName ?? s.agentId ?? null, + origin: inferOrigin(s.origin?.label, s.key), + status: "idle" as const, + lastActivityAt: s.updatedAt ?? null, + currentToolName: null, + currentToolArgs: null, + currentActivity: null, + streamingText: null, + lastError: null, + eventCount: 0, + }) + ); dispatch({ type: "hydrateSessions", sessions }); } catch { // ignore refresh failures @@ -161,12 +196,22 @@ export default function ObservePage() { setSelectedSession(sessionKey); }, []); + // Find the primary running session for the live output panel + const activeSession = useMemo(() => { + if (selectedSession) { + return state.sessions.find( + (s) => s.sessionKey === selectedSession && s.status === "running" + ); + } + return state.sessions.find((s) => s.status === "running"); + }, [state.sessions, selectedSession]); + return (
{/* Session sidebar */} -
+
- {/* Activity feed */} -
- + {/* Main content: live output + activity feed */} +
+ {/* Live output panel — shows streaming text from active session */} + {activeSession && ( + + )} + + {/* Activity feed */} +
+ +
diff --git a/src/features/observe/components/ActivityFeedEntry.tsx b/src/features/observe/components/ActivityFeedEntry.tsx index c71d496c..a9c47510 100644 --- a/src/features/observe/components/ActivityFeedEntry.tsx +++ b/src/features/observe/components/ActivityFeedEntry.tsx @@ -5,32 +5,40 @@ const formatTime = (ts: number): string => { const h = String(d.getHours()).padStart(2, "0"); const m = String(d.getMinutes()).padStart(2, "0"); const s = String(d.getSeconds()).padStart(2, "0"); - const ms = String(d.getMilliseconds()).padStart(3, "0"); - return `${h}:${m}:${s}.${ms}`; + return `${h}:${m}:${s}`; }; const severityClass: Record = { - info: "text-muted-foreground", + info: "", warn: "text-amber-400", error: "text-red-400", }; -const streamLabel = (entry: ObserveEntry): string => { - if (entry.eventType === "heartbeat") return "heartbeat"; - if (entry.eventType === "presence") return "presence"; - if (entry.eventType === "chat") { - return entry.chatState ? `chat:${entry.chatState}` : "chat"; - } +const streamIcon = (entry: ObserveEntry): string => { if (entry.stream === "tool") { - const name = entry.toolName ?? "?"; - const phase = entry.toolPhase ? `:${entry.toolPhase}` : ""; - return `tool${phase} ${name}`; + if (entry.toolPhase === "result") return "\u2713"; // checkmark + return "\u25B6"; // play triangle } if (entry.stream === "lifecycle") { - return `lifecycle:${entry.text ?? "?"}`; + if (entry.text === "start") return "\u25CF"; // filled circle + if (entry.text === "end") return "\u25CB"; // empty circle + if (entry.text === "error") return "\u2717"; // X mark + return "\u25CF"; } - if (entry.stream) return entry.stream; - return entry.eventType; + if (entry.stream === "assistant") return "\u270E"; // pencil + if (entry.eventType === "chat") return "\u2709"; // envelope + return "\u2022"; // bullet +}; + +const streamColor = (entry: ObserveEntry): string => { + if (entry.severity === "error") return "text-red-400"; + if (entry.stream === "tool" && entry.toolPhase !== "result") + return "text-blue-400"; + if (entry.stream === "tool" && entry.toolPhase === "result") + return "text-emerald-400"; + if (entry.stream === "lifecycle") return "text-amber-400"; + if (entry.stream === "assistant") return "text-purple-400"; + return "text-muted-foreground"; }; type ActivityFeedEntryProps = { @@ -38,25 +46,26 @@ type ActivityFeedEntryProps = { }; export const ActivityFeedEntry = ({ entry }: ActivityFeedEntryProps) => { - const agentLabel = entry.agentId ?? entry.sessionKey?.slice(0, 12) ?? "-"; + const agentLabel = entry.agentId ?? entry.sessionKey?.slice(0, 16) ?? "-"; return (
- {formatTime(entry.timestamp)} - + + {formatTime(entry.timestamp)} + + + {streamIcon(entry)} + + {agentLabel} - - {streamLabel(entry)} + + {entry.description} - {entry.errorMessage && ( - {entry.errorMessage} - )} - {!entry.errorMessage && entry.text && ( - {entry.text} - )}
); }; diff --git a/src/features/observe/components/InterventionAlerts.tsx b/src/features/observe/components/InterventionAlerts.tsx index 44c9c0ad..e732745f 100644 --- a/src/features/observe/components/InterventionAlerts.tsx +++ b/src/features/observe/components/InterventionAlerts.tsx @@ -4,7 +4,7 @@ type InterventionAlertsProps = { entries: ObserveEntry[]; }; -const MAX_ALERTS = 10; +const MAX_ALERTS = 5; const formatTime = (ts: number): string => { const d = new Date(ts); @@ -19,34 +19,22 @@ export const InterventionAlerts = ({ entries }: InterventionAlertsProps) => { if (errors.length === 0) return null; const recent = errors.slice(-MAX_ALERTS).reverse(); - const hiddenCount = errors.length > MAX_ALERTS ? errors.length - MAX_ALERTS : 0; return ( -
-
-

- Intervention Needed -

- - {errors.length} error{errors.length !== 1 ? "s" : ""} - {hiddenCount > 0 ? ` (+${hiddenCount} hidden)` : ""} - -
-
+
+

+ Needs Attention ({errors.length}) +

+
{recent.map((entry) => ( -
- +
+ {formatTime(entry.timestamp)} - {entry.agentId ?? entry.sessionKey?.slice(0, 12) ?? "-"} - - - {entry.errorMessage ?? entry.text ?? "Unknown error"} + {entry.agentId ?? entry.sessionKey?.slice(0, 16) ?? "-"} + {entry.description}
))}
diff --git a/src/features/observe/components/LiveOutputPanel.tsx b/src/features/observe/components/LiveOutputPanel.tsx new file mode 100644 index 00000000..f8a43d78 --- /dev/null +++ b/src/features/observe/components/LiveOutputPanel.tsx @@ -0,0 +1,56 @@ +import type { SessionStatus } from "../state/types"; + +type LiveOutputPanelProps = { + session: SessionStatus; +}; + +export const LiveOutputPanel = ({ session }: LiveOutputPanelProps) => { + const name = + session.displayName ?? session.agentId ?? session.sessionKey.slice(0, 20); + + return ( +
+
+
+ + {name} + {session.origin !== "unknown" && ( + + {session.origin} + + )} +
+ {session.currentActivity && ( + + {session.currentActivity} + + )} +
+ + {session.streamingText ? ( +
+
+            {session.streamingText.slice(-1000)}
+          
+
+ ) : ( +
+
+ {session.currentToolName ? ( + <> + {session.currentToolName} + {session.currentToolArgs && ( + + {session.currentToolArgs.slice(0, 100)} + + )} + + ) : ( + Waiting for output... + )} +
+
+ )} +
+ ); +}; diff --git a/src/features/observe/components/ObserveHeaderBar.tsx b/src/features/observe/components/ObserveHeaderBar.tsx index 21583a78..b90f8f5f 100644 --- a/src/features/observe/components/ObserveHeaderBar.tsx +++ b/src/features/observe/components/ObserveHeaderBar.tsx @@ -1,87 +1,130 @@ import Link from "next/link"; import type { GatewayStatus } from "@/lib/gateway/GatewayClient"; +import type { SessionStatus } from "../state/types"; type ObserveHeaderBarProps = { status: GatewayStatus; paused: boolean; - entryCount: number; + sessions: SessionStatus[]; interventionCount: number; onTogglePause: () => void; onClear: () => void; }; -const statusStyles: Record = { +const statusStyles: Record< + GatewayStatus, + { label: string; className: string } +> = { disconnected: { label: "Disconnected", className: "border border-border/70 bg-muted text-muted-foreground", }, connecting: { - label: "Connecting", + label: "Connecting...", className: "border border-border/70 bg-secondary text-secondary-foreground", }, connected: { - label: "Connected", - className: "border border-primary/30 bg-primary/15 text-foreground", + label: "Live", + className: "border border-emerald-500/30 bg-emerald-500/15 text-emerald-400", }, }; +const buildSummary = (sessions: SessionStatus[]): string => { + const running = sessions.filter((s) => s.status === "running"); + const errors = sessions.filter((s) => s.status === "error"); + + if (running.length === 0 && errors.length === 0) { + return "All sessions idle"; + } + + const parts: string[] = []; + + for (const s of running) { + const name = s.displayName ?? s.agentId ?? "session"; + if (s.currentActivity) { + parts.push(`${name}: ${s.currentActivity}`); + } else { + parts.push(`${name} is running`); + } + } + + for (const s of errors) { + const name = s.displayName ?? s.agentId ?? "session"; + parts.push(`${name} has errors`); + } + + return parts.join(" \u2022 "); +}; + export const ObserveHeaderBar = ({ status, paused, - entryCount, + sessions, interventionCount, onTogglePause, onClear, }: ObserveHeaderBarProps) => { const statusConfig = statusStyles[status]; + const runningSessions = sessions.filter((s) => s.status === "running"); + const summary = buildSummary(sessions); return ( -
-
-
-

+
+
+
+

Milo Observe

-

- Real-time gateway event monitor -

-
- - {statusConfig.label} - - {interventionCount > 0 && ( - - {interventionCount} error{interventionCount !== 1 ? "s" : ""} + + {status === "connected" && ( + + )} + {statusConfig.label} - )} -
-
- - {entryCount} events - - - - - Studio - + {runningSessions.length > 0 && ( + + {runningSessions.length} active + + )} + {interventionCount > 0 && ( + + {interventionCount} error{interventionCount !== 1 ? "s" : ""} + + )} +
+
+ + + + Studio + +

+ + {/* High-level activity summary */} + {status === "connected" && ( +
+ {summary} +
+ )}
); }; diff --git a/src/features/observe/components/SessionCard.tsx b/src/features/observe/components/SessionCard.tsx index 0a4f6dff..f79d9f5a 100644 --- a/src/features/observe/components/SessionCard.tsx +++ b/src/features/observe/components/SessionCard.tsx @@ -6,14 +6,18 @@ type SessionCardProps = { onSelect: (sessionKey: string | null) => void; }; -const statusStyles: Record = { +const statusStyles: Record< + SessionStatus["status"], + { label: string; className: string } +> = { idle: { label: "Idle", - className: "border-border/50 bg-muted/30", + className: "border-border/50 bg-muted/20", }, running: { label: "Running", - className: "border-primary/40 bg-primary/5 shadow-[0_0_8px_rgba(var(--primary-rgb,100,100,255),0.15)]", + className: + "border-primary/40 bg-primary/5 shadow-[0_0_12px_rgba(var(--primary-rgb,100,100,255),0.12)]", }, error: { label: "Error", @@ -21,7 +25,10 @@ const statusStyles: Record = { +const originBadge: Record< + SessionStatus["origin"], + { label: string; className: string } +> = { interactive: { label: "Interactive", className: "bg-blue-500/15 text-blue-400", @@ -35,7 +42,7 @@ const originBadge: Record { return `${Math.floor(diff / 3_600_000)}h ago`; }; -export const SessionCard = ({ session, isSelected, onSelect }: SessionCardProps) => { +export const SessionCard = ({ + session, + isSelected, + onSelect, +}: SessionCardProps) => { const statusConfig = statusStyles[session.status]; const originConfig = originBadge[session.origin]; - const displayName = session.displayName ?? session.agentId ?? session.sessionKey.slice(0, 16); + const displayName = + session.displayName ?? + session.agentId ?? + session.sessionKey.slice(0, 20); return (
-
+ + {/* Status + current tool */} +
{session.status === "running" && ( @@ -85,18 +101,28 @@ export const SessionCard = ({ session, isSelected, onSelect }: SessionCardProps) )} {statusConfig.label} - {session.currentToolName && session.status === "running" && ( - - {session.currentToolName} - - )} -
-
- {formatRelativeTime(session.lastActivityAt)} - {session.eventCount} events + + {formatRelativeTime(session.lastActivityAt)} +
- {session.lastError && ( -
+ + {/* Current activity description */} + {session.currentActivity && session.status === "running" && ( +
+ {session.currentActivity} +
+ )} + + {/* Streaming text preview */} + {session.streamingText && session.status === "running" && ( +
+ {session.streamingText.slice(-200)} +
+ )} + + {/* Error */} + {session.lastError && session.status === "error" && ( +
{session.lastError}
)} diff --git a/src/features/observe/state/observeEventHandler.ts b/src/features/observe/state/observeEventHandler.ts index 6c6ed943..57d75156 100644 --- a/src/features/observe/state/observeEventHandler.ts +++ b/src/features/observe/state/observeEventHandler.ts @@ -1,7 +1,10 @@ import type { EventFrame } from "@/lib/gateway/GatewayClient"; import { parseAgentIdFromSessionKey } from "@/lib/gateway/GatewayClient"; import { classifyGatewayEventKind } from "@/features/agents/state/runtimeEventBridge"; -import type { ChatEventPayload, AgentEventPayload } from "@/features/agents/state/runtimeEventBridge"; +import type { + ChatEventPayload, + AgentEventPayload, +} from "@/features/agents/state/runtimeEventBridge"; import type { ObserveEntry } from "./types"; let entryCounter = 0; @@ -11,27 +14,150 @@ const nextId = (): string => { return `obs-${entryCounter}`; }; -const truncate = (text: string | null | undefined, maxLen: number = 200): string | null => { +const truncate = ( + text: string | null | undefined, + maxLen: number = 200 +): string | null => { if (!text) return null; - if (text.length <= maxLen) return text; - return text.slice(0, maxLen) + "..."; + const trimmed = text.trim(); + if (!trimmed) return null; + if (trimmed.length <= maxLen) return trimmed; + return trimmed.slice(0, maxLen) + "..."; }; -const extractText = (message: unknown): string | null => { +const extractTextFromMessage = (message: unknown): string | null => { if (!message || typeof message !== "object") return null; const record = message as Record; if (typeof record.content === "string") return record.content; if (typeof record.text === "string") return record.text; + if (Array.isArray(record.content)) { + for (const block of record.content) { + if (block && typeof block === "object" && typeof (block as Record).text === "string") { + return (block as Record).text as string; + } + } + } + return null; +}; + +const extractToolArgs = (data: Record): string | null => { + const raw = data.arguments ?? data.args ?? data.input ?? data.parameters; + if (typeof raw === "string") return truncate(raw, 300); + if (raw && typeof raw === "object") { + try { + return truncate(JSON.stringify(raw), 300); + } catch { + return null; + } + } + return null; +}; + +const extractToolResult = (data: Record): string | null => { + const result = data.result; + if (typeof result === "string") return truncate(result, 300); + if (result && typeof result === "object") { + const r = result as Record; + if (typeof r.content === "string") return truncate(r.content, 300); + if (typeof r.text === "string") return truncate(r.text, 300); + if (r.details && typeof r.details === "object") { + const d = r.details as Record; + const parts: string[] = []; + if (typeof d.exitCode === "number") parts.push(`exit ${d.exitCode}`); + if (typeof d.durationMs === "number") parts.push(`${d.durationMs}ms`); + if (parts.length > 0) return parts.join(", "); + } + try { + return truncate(JSON.stringify(result), 300); + } catch { + return null; + } + } return null; }; -const mapChatEvent = (payload: ChatEventPayload, timestamp: number): ObserveEntry => { +const describeToolCall = (name: string, args: string | null): string => { + if (!args) return `Calling ${name}`; + + // Make common tools human-readable + try { + const parsed = JSON.parse(args); + if (typeof parsed === "object" && parsed !== null) { + if (name === "read" && typeof parsed.file_path === "string") { + const file = parsed.file_path.split("/").pop(); + return `Reading ${file}`; + } + if (name === "exec" && typeof parsed.command === "string") { + return `Running: ${truncate(parsed.command, 80)}`; + } + if (name === "browser" && typeof parsed.action === "string") { + return `Browser: ${parsed.action}${parsed.url ? ` — ${truncate(parsed.url, 60)}` : ""}`; + } + if (name === "write" && typeof parsed.file_path === "string") { + const file = parsed.file_path.split("/").pop(); + return `Writing ${file}`; + } + if (name === "sessions_spawn" && typeof parsed.agentId === "string") { + return `Spawning subagent: ${parsed.agentId}`; + } + if (name === "sessions_send" && typeof parsed.agentId === "string") { + return `Sending message to ${parsed.agentId}`; + } + } + } catch { + // not JSON, fall through + } + + return `Calling ${name}`; +}; + +const describeToolResult = ( + name: string, + result: string | null, + isError: boolean +): string => { + if (isError) return `${name} failed${result ? `: ${truncate(result, 100)}` : ""}`; + return `${name} completed`; +}; + +const mapChatEvent = ( + payload: ChatEventPayload, + timestamp: number +): ObserveEntry | null => { const agentId = payload.sessionKey ? parseAgentIdFromSessionKey(payload.sessionKey) : null; const isError = payload.state === "error" || payload.state === "aborted"; - const text = payload.errorMessage || extractText(payload.message); + const messageText = extractTextFromMessage(payload.message); + const role = + payload.message && + typeof payload.message === "object" && + typeof (payload.message as Record).role === "string" + ? ((payload.message as Record).role as string) + : null; + + // Skip delta events for chat — too noisy, we get assistant stream from agent events + if (payload.state === "delta") return null; + + let description: string; + if (isError) { + description = payload.errorMessage ?? "Session error"; + } else if (payload.state === "final") { + if (role === "assistant") { + description = messageText + ? `Response: ${truncate(messageText, 120)}` + : "Response complete"; + } else if (role === "user") { + description = messageText + ? `Prompt: ${truncate(messageText, 120)}` + : "User message received"; + } else { + description = "Message received"; + } + } else { + description = "Chat event"; + } return { id: nextId(), @@ -43,57 +169,92 @@ const mapChatEvent = (payload: ChatEventPayload, timestamp: number): ObserveEntr stream: null, toolName: null, toolPhase: null, + toolArgs: null, chatState: payload.state ?? null, errorMessage: isError ? (payload.errorMessage ?? "Chat error") : null, - text: truncate(text), + text: truncate(messageText), + description, severity: isError ? "error" : "info", }; }; -const mapAgentEvent = (payload: AgentEventPayload, timestamp: number): ObserveEntry => { +const mapAgentEvent = ( + payload: AgentEventPayload, + timestamp: number +): ObserveEntry | null => { const sessionKey = payload.sessionKey ?? null; - const agentId = sessionKey ? parseAgentIdFromSessionKey(sessionKey) : null; + const agentId = sessionKey + ? parseAgentIdFromSessionKey(sessionKey) + : null; const stream = payload.stream ?? null; const data = payload.data ?? {}; let toolName: string | null = null; let toolPhase: string | null = null; + let toolArgs: string | null = null; let text: string | null = null; let errorMessage: string | null = null; let severity: ObserveEntry["severity"] = "info"; + let description: string; if (stream === "lifecycle") { const phase = typeof data.phase === "string" ? data.phase : null; - text = phase; - if (phase === "error") { + if (phase === "start") { + description = "Session started"; + } else if (phase === "end") { + description = "Session ended"; + } else if (phase === "error") { severity = "error"; - errorMessage = typeof data.error === "string" ? data.error : "Lifecycle error"; + errorMessage = + typeof data.error === "string" ? data.error : "Session error"; + description = `Session error: ${truncate(errorMessage, 100)}`; + } else { + description = `Lifecycle: ${phase ?? "unknown"}`; } + text = phase; } else if (stream === "tool") { toolName = typeof data.name === "string" ? data.name : null; toolPhase = typeof data.phase === "string" ? data.phase : null; - if (typeof data.error === "string") { + const isResult = toolPhase === "result"; + const isError = + data.isError === true || + typeof data.error === "string"; + + if (isError) { severity = "error"; - errorMessage = data.error; + errorMessage = + typeof data.error === "string" + ? data.error + : "Tool error"; } - if (typeof data.result === "string") { - text = truncate(data.result); - } else if (typeof data.args === "string") { - text = truncate(data.args); - } else if (data.args && typeof data.args === "object") { - try { - text = truncate(JSON.stringify(data.args)); - } catch { - text = null; - } + + if (isResult) { + const resultText = extractToolResult(data); + text = resultText; + description = describeToolResult( + toolName ?? "tool", + resultText, + isError + ); + } else { + toolArgs = extractToolArgs(data); + text = toolArgs; + description = describeToolCall(toolName ?? "tool", toolArgs); } } else if (stream === "assistant") { const raw = typeof data.text === "string" ? data.text : null; - text = truncate(raw); + const delta = typeof data.delta === "string" ? data.delta : null; + // Only emit entries for meaningful text updates, not every delta + if (!raw && !delta) return null; + text = truncate(raw ?? delta); + description = text ? `Writing: ${truncate(text, 100)}` : "Thinking..."; } else { - // reasoning or other streams + // reasoning / thinking streams const raw = typeof data.text === "string" ? data.text : null; - text = truncate(raw); + const delta = typeof data.delta === "string" ? data.delta : null; + if (!raw && !delta) return null; + text = truncate(raw ?? delta); + description = "Thinking..."; } return { @@ -106,14 +267,18 @@ const mapAgentEvent = (payload: AgentEventPayload, timestamp: number): ObserveEn stream, toolName, toolPhase, + toolArgs, chatState: null, errorMessage, text, + description, severity, }; }; -export const mapEventFrameToEntry = (event: EventFrame): ObserveEntry | null => { +export const mapEventFrameToEntry = ( + event: EventFrame +): ObserveEntry | null => { const timestamp = Date.now(); const kind = classifyGatewayEventKind(event.event); @@ -129,41 +294,6 @@ export const mapEventFrameToEntry = (event: EventFrame): ObserveEntry | null => return mapAgentEvent(payload, timestamp); } - if (event.event === "heartbeat") { - return { - id: nextId(), - timestamp, - eventType: "heartbeat", - sessionKey: null, - agentId: null, - runId: null, - stream: null, - toolName: null, - toolPhase: null, - chatState: null, - errorMessage: null, - text: null, - severity: "info", - }; - } - - if (event.event === "presence") { - return { - id: nextId(), - timestamp, - eventType: "presence", - sessionKey: null, - agentId: null, - runId: null, - stream: null, - toolName: null, - toolPhase: null, - chatState: null, - errorMessage: null, - text: null, - severity: "info", - }; - } - + // Skip heartbeat and presence from the feed — too noisy, no useful info return null; }; diff --git a/src/features/observe/state/reducer.ts b/src/features/observe/state/reducer.ts index d432e92c..0ef76050 100644 --- a/src/features/observe/state/reducer.ts +++ b/src/features/observe/state/reducer.ts @@ -1,4 +1,9 @@ -import type { ObserveAction, ObserveEntry, ObserveState, SessionStatus } from "./types"; +import type { + ObserveAction, + ObserveEntry, + ObserveState, + SessionStatus, +} from "./types"; import { MAX_ENTRIES } from "./types"; export const initialObserveState: ObserveState = { @@ -25,10 +30,13 @@ const updateSessionsFromEntries = ( sessionKey: entry.sessionKey, agentId: entry.agentId, displayName: entry.agentId, - origin: "unknown", + origin: inferOriginFromKey(entry.sessionKey), status: "idle", lastActivityAt: null, currentToolName: null, + currentToolArgs: null, + currentActivity: null, + streamingText: null, lastError: null, eventCount: 0, }; @@ -42,19 +50,44 @@ const updateSessionsFromEntries = ( if (entry.text === "start") { session.status = "running"; session.currentToolName = null; + session.currentToolArgs = null; + session.currentActivity = "Starting..."; + session.streamingText = null; session.lastError = null; } else if (entry.text === "end") { session.status = "idle"; session.currentToolName = null; + session.currentToolArgs = null; + session.currentActivity = null; + session.streamingText = null; } else if (entry.text === "error") { session.status = "error"; session.lastError = entry.errorMessage; session.currentToolName = null; + session.currentToolArgs = null; + session.currentActivity = entry.description; + } + } else if (entry.stream === "tool") { + if (entry.toolPhase !== "result") { + session.currentToolName = entry.toolName; + session.currentToolArgs = entry.toolArgs; + session.currentActivity = entry.description; + session.streamingText = null; + } else { + session.currentActivity = entry.description; + // Keep tool name visible briefly after result + } + } else if (entry.stream === "assistant") { + session.currentToolName = null; + session.currentActivity = "Writing response..."; + if (entry.text) { + session.streamingText = entry.text; + } + } else if (entry.eventType === "chat") { + if (entry.chatState === "final") { + session.currentActivity = entry.description; + session.streamingText = null; } - } - - if (entry.stream === "tool" && entry.toolName) { - session.currentToolName = entry.toolName; } if (entry.severity === "error" && entry.errorMessage) { @@ -65,6 +98,13 @@ const updateSessionsFromEntries = ( return Array.from(map.values()); }; +const inferOriginFromKey = (key: string): SessionStatus["origin"] => { + const lower = key.toLowerCase(); + if (lower.includes("cron:") || lower.includes("isolated")) return "cron"; + if (lower.includes("heartbeat")) return "heartbeat"; + return "interactive"; +}; + const countInterventions = (entries: ObserveEntry[]): number => { let count = 0; for (const e of entries) { @@ -85,7 +125,10 @@ export const observeReducer = ( merged.length > MAX_ENTRIES ? merged.slice(merged.length - MAX_ENTRIES) : merged; - const sessions = updateSessionsFromEntries(state.sessions, action.entries); + const sessions = updateSessionsFromEntries( + state.sessions, + action.entries + ); return { ...state, entries: capped, @@ -105,7 +148,10 @@ export const observeReducer = ( merged.push({ ...current, displayName: incoming.displayName ?? current.displayName, - origin: incoming.origin !== "unknown" ? incoming.origin : current.origin, + origin: + incoming.origin !== "unknown" + ? incoming.origin + : current.origin, }); existing.delete(incoming.sessionKey); } else { diff --git a/src/features/observe/state/types.ts b/src/features/observe/state/types.ts index cb8e3a0f..709af70c 100644 --- a/src/features/observe/state/types.ts +++ b/src/features/observe/state/types.ts @@ -8,9 +8,11 @@ export type ObserveEntry = { stream: string | null; toolName: string | null; toolPhase: string | null; + toolArgs: string | null; chatState: string | null; errorMessage: string | null; text: string | null; + description: string; severity: "info" | "warn" | "error"; }; @@ -24,6 +26,9 @@ export type SessionStatus = { status: "idle" | "running" | "error"; lastActivityAt: number | null; currentToolName: string | null; + currentToolArgs: string | null; + currentActivity: string | null; + streamingText: string | null; lastError: string | null; eventCount: number; }; From 2983f61af7b8db3a19fbfe52f5b760e7d830d9f2 Mon Sep 17 00:00:00 2001 From: Milo Antaeus Date: Tue, 10 Feb 2026 17:42:36 -0800 Subject: [PATCH 4/4] Redesign /observe as mission control with strategic context - Added /api/observe/context endpoint reading filesystem data: recent memory summaries, active initiatives, task queue - Left panel: session cards + cron schedule with next run times - Center: live output when active, last session preview when idle - Right panel: strategic focus areas from INITIATIVES.md + recent memory showing topics, actions, and tools from hourly summaries - Cron jobs loaded via gateway cron.list, auto-refresh every 30s - Session previews via sessions.preview for recent activity context - Human-readable activity feed with tool call descriptions - Live streaming text panel for active sessions Co-Authored-By: Claude Opus 4.6 --- src/app/api/observe/context/route.ts | 218 +++++++++++++++ src/app/observe/page.tsx | 259 +++++++++++++++--- .../observe/components/CronSchedulePanel.tsx | 91 ++++++ .../observe/components/InitiativesPanel.tsx | 81 ++++++ .../components/RecentActivityPanel.tsx | 78 ++++++ .../observe/components/RecentMemoryPanel.tsx | 140 ++++++++++ 6 files changed, 828 insertions(+), 39 deletions(-) create mode 100644 src/app/api/observe/context/route.ts create mode 100644 src/features/observe/components/CronSchedulePanel.tsx create mode 100644 src/features/observe/components/InitiativesPanel.tsx create mode 100644 src/features/observe/components/RecentActivityPanel.tsx create mode 100644 src/features/observe/components/RecentMemoryPanel.tsx diff --git a/src/app/api/observe/context/route.ts b/src/app/api/observe/context/route.ts new file mode 100644 index 00000000..0d16a748 --- /dev/null +++ b/src/app/api/observe/context/route.ts @@ -0,0 +1,218 @@ +import { NextResponse } from "next/server"; +import * as fs from "node:fs"; +import * as path from "node:path"; + +export const runtime = "nodejs"; +export const dynamic = "force-dynamic"; + +const HOME = process.env.HOME ?? "/Users/miloantaeus"; +const GOALS_DIR = path.join(HOME, "personal-ai-assistant-goals"); +const OPENCLAW_DIR = path.join(HOME, ".openclaw"); + +type Initiative = { + title: string; + priority: string; + status: string; + summary: string; +}; + +type TaskQueueItem = { + id: string; + description: string; + priority: number; + status: string; +}; + +type ObserveContext = { + recentMemory: string | null; + initiatives: Initiative[]; + taskQueue: TaskQueueItem[]; + cronJobs: Array<{ + id: string; + name: string; + schedule: string; + lastStatus: string; + lastRunAt: string | null; + nextRunAt: string | null; + }>; + systemInfo: { + model: string; + agentCount: number; + }; +}; + +const readFileSafe = (filePath: string): string | null => { + try { + return fs.readFileSync(filePath, "utf-8"); + } catch { + return null; + } +}; + +const parseRecentMemory = (): string | null => { + const memoryDir = path.join( + GOALS_DIR, + "openclaw/workspace/skills/notes/data/memory/hourly" + ); + try { + const files = fs + .readdirSync(memoryDir) + .filter((f) => f.endsWith(".md")) + .sort() + .reverse(); + if (files.length === 0) return null; + const content = fs.readFileSync(path.join(memoryDir, files[0]), "utf-8"); + // Get last 2 hourly blocks (most recent activity) + const blocks = content.split(/^### /m).filter(Boolean); + const recent = blocks.slice(-2).map((b) => `### ${b}`); + return recent.join("\n").trim() || null; + } catch { + return null; + } +}; + +const parseInitiatives = (): Initiative[] => { + const content = readFileSafe( + path.join(GOALS_DIR, "openclaw/workspace/INITIATIVES.md") + ); + if (!content) return []; + + const initiatives: Initiative[] = []; + const lines = content.split("\n"); + let current: Partial | null = null; + let priority = 1; + + for (const line of lines) { + // Match initiative headers like "### 1. Title (Priority)" + const headerMatch = line.match( + /^###\s+\d+\.\s+(.+?)(?:\s*\(([^)]+)\))?\s*$/ + ); + if (headerMatch) { + if (current?.title) { + initiatives.push(current as Initiative); + } + current = { + title: headerMatch[1].trim(), + priority: headerMatch[2]?.trim() ?? `P${priority}`, + status: "active", + summary: "", + }; + priority++; + continue; + } + // Collect first non-empty line as summary + if (current && !current.summary && line.trim() && !line.startsWith("#")) { + current.summary = line.trim().slice(0, 150); + } + // Detect status markers + if (current && line.toLowerCase().includes("blocked")) { + current.status = "blocked"; + } + if (current && line.toLowerCase().includes("completed")) { + current.status = "completed"; + } + } + if (current?.title) { + initiatives.push(current as Initiative); + } + + return initiatives.slice(0, 8); +}; + +const parseTaskQueue = (): TaskQueueItem[] => { + const content = readFileSafe( + path.join(OPENCLAW_DIR, "task_queue.json") + ); + if (!content) return []; + + try { + const data = JSON.parse(content) as { + tasks?: Array<{ + id?: string; + description?: string; + priority?: number; + status?: string; + }>; + }; + return (data.tasks ?? []) + .filter((t) => t.status !== "completed") + .map((t) => ({ + id: t.id ?? "", + description: t.description ?? "", + priority: t.priority ?? 3, + status: t.status ?? "pending", + })) + .slice(0, 10); + } catch { + return []; + } +}; + +const parseCronJobs = (): ObserveContext["cronJobs"] => { + const content = readFileSafe( + path.join(OPENCLAW_DIR, "cron/jobs.json") + ); + if (!content) return []; + + try { + const data = JSON.parse(content) as { + jobs?: Array<{ + id?: string; + name?: string; + schedule?: { cron?: string; intervalSeconds?: number }; + state?: { + lastStatus?: string; + lastRunAtMs?: number; + nextRunAtMs?: number; + }; + }>; + }; + return (data.jobs ?? []).map((j) => ({ + id: j.id ?? "", + name: j.name ?? "", + schedule: j.schedule?.cron ?? (j.schedule?.intervalSeconds ? `every ${j.schedule.intervalSeconds}s` : ""), + lastStatus: j.state?.lastStatus ?? "unknown", + lastRunAt: j.state?.lastRunAtMs + ? new Date(j.state.lastRunAtMs).toISOString() + : null, + nextRunAt: j.state?.nextRunAtMs + ? new Date(j.state.nextRunAtMs).toISOString() + : null, + })); + } catch { + return []; + } +}; + +const countAgents = (): number => { + const agentsDir = path.join(OPENCLAW_DIR, "agents"); + try { + return fs + .readdirSync(agentsDir) + .filter((f) => + fs.statSync(path.join(agentsDir, f)).isDirectory() + ).length; + } catch { + return 0; + } +}; + +export async function GET() { + try { + const context: ObserveContext = { + recentMemory: parseRecentMemory(), + initiatives: parseInitiatives(), + taskQueue: parseTaskQueue(), + cronJobs: parseCronJobs(), + systemInfo: { + model: "qwen2.5:14b", + agentCount: countAgents(), + }, + }; + return NextResponse.json(context); + } catch (err) { + const message = + err instanceof Error ? err.message : "Failed to load observe context."; + return NextResponse.json({ error: message }, { status: 500 }); + } +} diff --git a/src/app/observe/page.tsx b/src/app/observe/page.tsx index 8a50a286..edcb09f5 100644 --- a/src/app/observe/page.tsx +++ b/src/app/observe/page.tsx @@ -1,6 +1,13 @@ "use client"; -import { useCallback, useEffect, useMemo, useReducer, useRef, useState } from "react"; +import { + useCallback, + useEffect, + useMemo, + useReducer, + useRef, + useState, +} from "react"; import { createStudioSettingsCoordinator } from "@/lib/studio/coordinator"; import { useGatewayConnection, @@ -19,7 +26,18 @@ import { SessionOverview } from "@/features/observe/components/SessionOverview"; import { ActivityFeed } from "@/features/observe/components/ActivityFeed"; import { InterventionAlerts } from "@/features/observe/components/InterventionAlerts"; import { LiveOutputPanel } from "@/features/observe/components/LiveOutputPanel"; +import { + CronSchedulePanel, + type CronJob, +} from "@/features/observe/components/CronSchedulePanel"; +import { + InitiativesPanel, + type Initiative, +} from "@/features/observe/components/InitiativesPanel"; +import { RecentMemoryPanel } from "@/features/observe/components/RecentMemoryPanel"; +import { RecentActivityPanel } from "@/features/observe/components/RecentActivityPanel"; +// Gateway API types type SessionsListResult = { sessions: Array<{ key: string; @@ -30,6 +48,32 @@ type SessionsListResult = { }>; }; +type CronListResult = { + jobs: CronJob[]; +}; + +type PreviewResult = { + ts: number; + previews: Array<{ + key: string; + status: string; + items: Array<{ role: string; text: string; timestamp?: number | string }>; + }>; +}; + +// Filesystem context API type +type ObserveContext = { + recentMemory: string | null; + initiatives: Initiative[]; + taskQueue: Array<{ + id: string; + description: string; + priority: number; + status: string; + }>; + systemInfo: { model: string; agentCount: number }; +}; + const inferOrigin = ( label?: string, key?: string @@ -58,10 +102,46 @@ export default function ObservePage() { const [state, dispatch] = useReducer(observeReducer, initialObserveState); const [selectedSession, setSelectedSession] = useState(null); + // Context from filesystem API + const [context, setContext] = useState(null); + const [contextLoading, setContextLoading] = useState(true); + + // Cron jobs from gateway + const [cronJobs, setCronJobs] = useState([]); + const [cronLoading, setCronLoading] = useState(true); + + // Session previews + const [previews, setPreviews] = useState([]); + const [previewsLoading, setPreviewsLoading] = useState(true); + const pendingEntriesRef = useRef[]>( [] ); + // Load filesystem context + useEffect(() => { + let cancelled = false; + const load = async () => { + try { + const res = await fetch("/api/observe/context"); + if (!res.ok) throw new Error("Failed to load context"); + const data = (await res.json()) as ObserveContext; + if (!cancelled) setContext(data); + } catch (err) { + console.warn("[observe] context load failed:", err); + } finally { + if (!cancelled) setContextLoading(false); + } + }; + void load(); + // Refresh every 60s + const interval = setInterval(() => void load(), 60_000); + return () => { + cancelled = true; + clearInterval(interval); + }; + }, []); + // Subscribe to ALL gateway events with RAF batching useEffect(() => { const batcher = createRafBatcher(() => { @@ -89,20 +169,17 @@ export default function ObservePage() { }; }, [client]); - // Discover sessions on connect + // Load sessions, cron jobs, and previews on connect useEffect(() => { if (status !== "connected") return; let cancelled = false; - const loadSessions = async () => { + const loadAll = async () => { + // Load sessions try { const result = await client.call( "sessions.list", - { - includeGlobal: true, - includeUnknown: true, - limit: 200, - } + { includeGlobal: true, includeUnknown: true, limit: 200 } ); if (cancelled) return; const sessions: SessionStatus[] = (result.sessions ?? []).map( @@ -122,22 +199,74 @@ export default function ObservePage() { }) ); dispatch({ type: "hydrateSessions", sessions }); + + // Load previews for recent sessions + const recentKeys = (result.sessions ?? []) + .filter((s) => s.updatedAt) + .sort((a, b) => (b.updatedAt ?? 0) - (a.updatedAt ?? 0)) + .slice(0, 5) + .map((s) => s.key); + + if (recentKeys.length > 0) { + try { + const previewResult = await client.call( + "sessions.preview", + { keys: recentKeys, limit: 6, maxChars: 300 } + ); + if (!cancelled) { + setPreviews(previewResult.previews ?? []); + } + } catch { + // preview optional + } + } + if (!cancelled) setPreviewsLoading(false); + } catch (err) { + console.warn("[observe] session load failed:", err); + if (!cancelled) setPreviewsLoading(false); + } + + // Load cron jobs + try { + const cronResult = await client.call("cron.list", { + includeDisabled: true, + }); + if (!cancelled) { + setCronJobs(cronResult.jobs ?? []); + } } catch (err) { - console.warn("[observe] Failed to load sessions:", err); + console.warn("[observe] cron load failed:", err); + } finally { + if (!cancelled) setCronLoading(false); } }; - void loadSessions(); + void loadAll(); return () => { cancelled = true; }; }, [client, status]); + // Refresh cron jobs periodically (every 30s) + useEffect(() => { + if (status !== "connected") return; + const interval = setInterval(async () => { + try { + const cronResult = await client.call("cron.list", { + includeDisabled: true, + }); + setCronJobs(cronResult.jobs ?? []); + } catch { + // ignore + } + }, 30_000); + return () => clearInterval(interval); + }, [client, status]); + // Refresh sessions on presence events (throttled) const refreshTimerRef = useRef | null>(null); useEffect(() => { if (status !== "connected") return; - const unsubscribe = client.onEvent((event: EventFrame) => { if (event.event !== "presence") return; if (refreshTimerRef.current) return; @@ -146,11 +275,7 @@ export default function ObservePage() { try { const result = await client.call( "sessions.list", - { - includeGlobal: true, - includeUnknown: true, - limit: 200, - } + { includeGlobal: true, includeUnknown: true, limit: 200 } ); const sessions: SessionStatus[] = (result.sessions ?? []).map( (s) => ({ @@ -170,11 +295,10 @@ export default function ObservePage() { ); dispatch({ type: "hydrateSessions", sessions }); } catch { - // ignore refresh failures + // ignore } }, 2000); }); - return () => { unsubscribe(); if (refreshTimerRef.current) { @@ -196,7 +320,6 @@ export default function ObservePage() { setSelectedSession(sessionKey); }, []); - // Find the primary running session for the live output panel const activeSession = useMemo(() => { if (selectedSession) { return state.sessions.find( @@ -206,8 +329,10 @@ export default function ObservePage() { return state.sessions.find((s) => s.status === "running"); }, [state.sessions, selectedSession]); + const hasActivity = state.entries.length > 0; + return ( -
+
-
- {/* Session sidebar */} -
- +
+ {/* Left panel: Sessions + Cron Schedule */} +
+
+ +
+
+
+

+ Cron Schedule +

+
+
+ +
+
- {/* Main content: live output + activity feed */} -
- {/* Live output panel — shows streaming text from active session */} - {activeSession && ( - - )} + {/* Center: Live output + Activity feed */} +
+ {activeSession && } - {/* Activity feed */}
- + {hasActivity ? ( + + ) : ( +
+
+

+ {previewsLoading + ? "Loading..." + : "Last Session Activity"} +

+
+
+ +
+
+ )} +
+
+ + {/* Right panel: Initiatives + Recent Memory */} +
+
+
+

+ Strategic Focus +

+
+
+ +
+
+
+
+

+ Recent Memory +

+
+
+ +
diff --git a/src/features/observe/components/CronSchedulePanel.tsx b/src/features/observe/components/CronSchedulePanel.tsx new file mode 100644 index 00000000..f6e35ece --- /dev/null +++ b/src/features/observe/components/CronSchedulePanel.tsx @@ -0,0 +1,91 @@ +type CronJob = { + id: string; + name: string; + agentId?: string; + enabled: boolean; + state: { + nextRunAtMs?: number; + runningAtMs?: number; + lastRunAtMs?: number; + lastStatus?: "ok" | "error" | "skipped"; + lastError?: string; + lastDurationMs?: number; + }; +}; + +type CronSchedulePanelProps = { + jobs: CronJob[]; + loading: boolean; +}; + +const formatRelative = (ms: number | undefined): string => { + if (!ms) return "-"; + const diff = ms - Date.now(); + if (diff < 0) { + const ago = Date.now() - ms; + if (ago < 60_000) return `${Math.floor(ago / 1000)}s ago`; + if (ago < 3_600_000) return `${Math.floor(ago / 60_000)}m ago`; + return `${Math.floor(ago / 3_600_000)}h ago`; + } + if (diff < 60_000) return `in ${Math.floor(diff / 1000)}s`; + if (diff < 3_600_000) return `in ${Math.floor(diff / 60_000)}m`; + return `in ${Math.floor(diff / 3_600_000)}h`; +}; + +const statusColor = (status?: string): string => { + if (status === "ok") return "text-emerald-400"; + if (status === "error") return "text-red-400"; + if (status === "skipped") return "text-amber-400"; + return "text-muted-foreground/50"; +}; + +export const CronSchedulePanel = ({ jobs, loading }: CronSchedulePanelProps) => { + if (loading) { + return ( +
+ Loading schedule... +
+ ); + } + + const enabled = jobs.filter((j) => j.enabled); + const sorted = [...enabled].sort( + (a, b) => (a.state.nextRunAtMs ?? Infinity) - (b.state.nextRunAtMs ?? Infinity) + ); + + return ( +
+ {sorted.map((job) => ( +
+
+ {job.name} +
+ + {job.state.runningAtMs ? "Running" : (job.state.lastStatus ?? "pending")} + + {job.state.lastRunAtMs && !job.state.runningAtMs && ( + + {formatRelative(job.state.lastRunAtMs)} + + )} +
+
+
+ {job.state.runningAtMs ? ( + active + ) : job.state.nextRunAtMs ? ( + + {formatRelative(job.state.nextRunAtMs)} + + ) : null} +
+
+ ))} +
+ ); +}; + +export type { CronJob }; diff --git a/src/features/observe/components/InitiativesPanel.tsx b/src/features/observe/components/InitiativesPanel.tsx new file mode 100644 index 00000000..91110b64 --- /dev/null +++ b/src/features/observe/components/InitiativesPanel.tsx @@ -0,0 +1,81 @@ +type Initiative = { + title: string; + priority: string; + status: string; + summary: string; +}; + +type InitiativesPanelProps = { + initiatives: Initiative[]; + loading: boolean; +}; + +const statusBadge = (status: string): { label: string; className: string } => { + if (status === "blocked") + return { + label: "Blocked", + className: "bg-red-500/15 text-red-400", + }; + if (status === "completed") + return { + label: "Done", + className: "bg-emerald-500/15 text-emerald-400", + }; + return { + label: "Active", + className: "bg-primary/15 text-primary", + }; +}; + +export const InitiativesPanel = ({ + initiatives, + loading, +}: InitiativesPanelProps) => { + if (loading) { + return ( +
+ Loading initiatives... +
+ ); + } + + if (initiatives.length === 0) { + return ( +
+ No initiatives found +
+ ); + } + + return ( +
+ {initiatives.map((initiative, i) => { + const badge = statusBadge(initiative.status); + return ( +
+
+ + {initiative.title} + + + {badge.label} + +
+ {initiative.summary && ( +

+ {initiative.summary} +

+ )} +
+ ); + })} +
+ ); +}; + +export type { Initiative }; diff --git a/src/features/observe/components/RecentActivityPanel.tsx b/src/features/observe/components/RecentActivityPanel.tsx new file mode 100644 index 00000000..fc63559a --- /dev/null +++ b/src/features/observe/components/RecentActivityPanel.tsx @@ -0,0 +1,78 @@ +type PreviewItem = { + role: string; + text: string; + timestamp?: number | string; +}; + +type RecentActivityPanelProps = { + previews: Array<{ + key: string; + status: string; + items: PreviewItem[]; + }>; + loading: boolean; +}; + +const roleIcon = (role: string): string => { + if (role === "user") return "\u25B6"; + if (role === "assistant") return "\u270E"; + if (role === "tool") return "\u2699"; + return "\u2022"; +}; + +const roleColor = (role: string): string => { + if (role === "user") return "text-amber-400/80"; + if (role === "assistant") return "text-foreground/80"; + if (role === "tool") return "text-blue-400/80"; + return "text-muted-foreground/60"; +}; + +export const RecentActivityPanel = ({ + previews, + loading, +}: RecentActivityPanelProps) => { + if (loading) { + return ( +
+ Loading recent activity... +
+ ); + } + + const nonEmpty = previews.filter( + (p) => p.status === "ok" && p.items.length > 0 + ); + + if (nonEmpty.length === 0) { + return ( +
+ No recent activity +
+ ); + } + + return ( +
+ {nonEmpty.map((preview) => ( +
+
+ {preview.key.length > 30 + ? preview.key.slice(0, 30) + "..." + : preview.key} +
+
+ {preview.items.map((item, i) => ( +
+ {roleIcon(item.role)} + {item.text} +
+ ))} +
+
+ ))} +
+ ); +}; diff --git a/src/features/observe/components/RecentMemoryPanel.tsx b/src/features/observe/components/RecentMemoryPanel.tsx new file mode 100644 index 00000000..5e9ecce8 --- /dev/null +++ b/src/features/observe/components/RecentMemoryPanel.tsx @@ -0,0 +1,140 @@ +type RecentMemoryPanelProps = { + memory: string | null; + loading: boolean; +}; + +export const RecentMemoryPanel = ({ + memory, + loading, +}: RecentMemoryPanelProps) => { + if (loading) { + return ( +
+ Loading memory... +
+ ); + } + + if (!memory) { + return ( +
+ No recent memory entries +
+ ); + } + + // Parse memory into structured blocks + const blocks = memory.split(/^### /m).filter(Boolean); + + return ( +
+ {blocks.map((block, i) => { + const lines = block.split("\n").filter(Boolean); + const timestamp = lines[0]?.trim() ?? ""; + const content = lines.slice(1); + + const topics: string[] = []; + const actions: string[] = []; + const tools: string[] = []; + let stats = ""; + + let section = ""; + for (const line of content) { + if (line.startsWith("Topics Discussed:")) { + section = "topics"; + continue; + } + if (line.startsWith("Actions:")) { + section = "actions"; + continue; + } + if (line.startsWith("Tools Used:")) { + section = "tools"; + const toolsPart = line.replace("Tools Used:", "").trim(); + if (toolsPart) tools.push(toolsPart); + continue; + } + if (line.startsWith("Stats:")) { + stats = line.replace("Stats:", "").trim(); + section = ""; + continue; + } + if ( + line.startsWith("Decisions Made:") || + line.startsWith("User Messages:") || + line.startsWith("Errors:") + ) { + section = ""; + continue; + } + + const trimmed = line.replace(/^\s+→\s*/, "").trim(); + if (!trimmed) continue; + + if (section === "topics") topics.push(trimmed); + if (section === "actions") actions.push(trimmed.replace(/^\[action\]\s*/, "")); + if (section === "tools") tools.push(trimmed); + } + + return ( +
+
+ + {timestamp} + + {stats && ( + + {stats} + + )} +
+ + {topics.length > 0 && ( +
+ {topics.map((topic, j) => ( + + {topic} + + ))} +
+ )} + + {actions.length > 0 && ( +
+ {actions.slice(0, 3).map((action, j) => ( +
+ {action} +
+ ))} + {actions.length > 3 && ( + + +{actions.length - 3} more + + )} +
+ )} + + {tools.length > 0 && ( +
+ {tools.map((tool, j) => ( + + {tool} + + ))} +
+ )} +
+ ); + })} +
+ ); +};