Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/adapters/openclaw-readonly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { loadProjectStore } from "../runtime/project-store";
import { computeProjectSummaries } from "../runtime/project-summary";
import { loadTaskStore } from "../runtime/task-store";
import { computeTasksSummary } from "../runtime/task-summary";
import { inferSubagentStatsFromKeys } from "../runtime/subagent-tree";

/**
* Official-first adapter (read path only).
Expand Down Expand Up @@ -72,6 +73,8 @@ export class OpenClawReadonlyAdapter {
});
}

const subagentStats = inferSubagentStatsFromKeys(sessions, statuses);

return {
sessions,
statuses,
Expand All @@ -82,6 +85,7 @@ export class OpenClawReadonlyAdapter {
tasks,
tasksSummary,
budgetSummary,
subagentStats,
generatedAt: new Date().toISOString(),
};
}
Expand Down
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export const TASK_HEARTBEAT_MAX_TASKS_PER_RUN = parsePositiveInt(
3,
);

export const SUBAGENT_MAX_DEPTH_WARN = parsePositiveInt(process.env.SUBAGENT_MAX_DEPTH_WARN, 5);

export const POLLING_INTERVALS_MS = {
sessionsList: 5000,
sessionStatus: 2000,
Expand Down
51 changes: 50 additions & 1 deletion src/runtime/commander.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { listTasks } from "./task-store";
import { SUBAGENT_MAX_DEPTH_WARN } from "../config";
import type {
AlertLevel,
BudgetEvaluation,
Expand All @@ -18,7 +19,9 @@ export interface CommanderAlert {
| "HAS_BLOCKED"
| "HAS_PENDING_APPROVALS"
| "HAS_OVER_BUDGET"
| "HAS_TASKS_DUE";
| "HAS_TASKS_DUE"
| "HAS_SUBAGENT_ERRORS"
| "HAS_DEEP_CHAIN";
message: string;
route: "timeline" | "operator-watch" | "action-queue";
}
Expand Down Expand Up @@ -81,6 +84,26 @@ export function commanderAlerts(snapshot: ReadModelSnapshot): CommanderAlert[] {
});
}

if (snapshot.subagentStats) {
const stats = snapshot.subagentStats;
if (stats.errorNodes > 0) {
alerts.push({
level: "action-required",
code: "HAS_SUBAGENT_ERRORS",
message: `${stats.errorNodes} subagent session(s) are in error state.`,
route: routeForLevel("action-required"),
});
}
if (stats.maxDepth >= SUBAGENT_MAX_DEPTH_WARN) {
alerts.push({
level: "warn",
code: "HAS_DEEP_CHAIN",
message: `Subagent chain depth ${stats.maxDepth} exceeds threshold ${SUBAGENT_MAX_DEPTH_WARN}.`,
route: routeForLevel("warn"),
});
}
}

return alerts;
}

Expand Down Expand Up @@ -197,6 +220,32 @@ export function commanderExceptionsFeed(snapshot: ReadModelSnapshot): CommanderE
});
}

if (snapshot.subagentStats) {
const stats = snapshot.subagentStats;
if (stats.errorNodes > 0) {
items.push({
level: "action-required",
code: "SUBAGENT_ERROR",
source: "subagent",
sourceId: "subagent-tree",
message: `${stats.errorNodes} subagent session(s) are in error state.`,
route: routeForLevel("action-required"),
occurredAt: fallbackOccurredAt,
});
}
if (stats.maxDepth >= SUBAGENT_MAX_DEPTH_WARN) {
items.push({
level: "warn",
code: "SUBAGENT_DEEP_CHAIN",
source: "subagent",
sourceId: "subagent-tree",
message: `Subagent chain depth ${stats.maxDepth} exceeds threshold ${SUBAGENT_MAX_DEPTH_WARN}.`,
route: routeForLevel("warn"),
occurredAt: fallbackOccurredAt,
});
}
}

const sortedItems = [...items].sort(compareFeedItems);

return {
Expand Down
213 changes: 213 additions & 0 deletions src/runtime/subagent-tree.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
import type { AgentRunState, ReadModelSnapshot, SessionStatusSnapshot, SessionSummary, SubagentTreeStats } from "../types";
import type { SessionExecutionChainSummary } from "./session-conversations";

export interface SubagentNode {
sessionKey: string;
agentId?: string;
label?: string;
state: AgentRunState;
model?: string;
tokensIn: number;
tokensOut: number;
totalTokens: number;
cost: number;
lastActivity?: string;
parentSessionKey?: string;
children: SubagentNode[];
depth: number;
stage: SessionExecutionChainSummary["stage"];
}

export interface SubagentTreeSnapshot {
roots: SubagentNode[];
totalNodes: number;
activeNodes: number;
idleNodes: number;
errorNodes: number;
blockedNodes: number;
totalCost: number;
totalTokens: number;
maxDepth: number;
generatedAt: string;
}

interface SessionWithChain {
session: SessionSummary;
status?: SessionStatusSnapshot;
executionChain?: SessionExecutionChainSummary;
}

export function buildSubagentTree(
sessions: SessionWithChain[],
snapshot: ReadModelSnapshot,
): SubagentTreeSnapshot {
const statusByKey = new Map<string, SessionStatusSnapshot>();
for (const s of snapshot.statuses) {
statusByKey.set(s.sessionKey, s);
}

const childrenMap = new Map<string, SessionWithChain[]>();
const allKeys = new Set<string>();
const hasParent = new Set<string>();

for (const item of sessions) {
allKeys.add(item.session.sessionKey);
const parentKey = item.executionChain?.parentSessionKey;
if (parentKey && parentKey !== item.session.sessionKey) {
hasParent.add(item.session.sessionKey);
const list = childrenMap.get(parentKey) ?? [];
list.push(item);
childrenMap.set(parentKey, list);
}
}

const sessionByKey = new Map<string, SessionWithChain>();
for (const item of sessions) {
sessionByKey.set(item.session.sessionKey, item);
}

function buildNode(item: SessionWithChain, depth: number): SubagentNode {
const status = item.status ?? statusByKey.get(item.session.sessionKey);
const children = (childrenMap.get(item.session.sessionKey) ?? []).map((child) =>
buildNode(child, depth + 1),
);

return {
sessionKey: item.session.sessionKey,
agentId: item.session.agentId,
label: item.session.label,
state: item.session.state,
model: status?.model,
tokensIn: status?.tokensIn ?? 0,
tokensOut: status?.tokensOut ?? 0,
totalTokens: (status?.tokensIn ?? 0) + (status?.tokensOut ?? 0),
cost: status?.cost ?? 0,
lastActivity: item.session.lastMessageAt ?? status?.updatedAt,
parentSessionKey: item.executionChain?.parentSessionKey,
children,
depth,
stage: item.executionChain?.stage ?? "idle",
};
}

const roots: SubagentNode[] = [];
for (const item of sessions) {
if (!hasParent.has(item.session.sessionKey)) {
roots.push(buildNode(item, 0));
}
}

roots.sort((a, b) => {
const aTime = a.lastActivity ? Date.parse(a.lastActivity) : 0;
const bTime = b.lastActivity ? Date.parse(b.lastActivity) : 0;
return bTime - aTime;
});

const stats = collectStats(roots);

return {
roots,
...stats,
generatedAt: new Date().toISOString(),
};
}

function collectStats(nodes: SubagentNode[]): {
totalNodes: number;
activeNodes: number;
idleNodes: number;
errorNodes: number;
blockedNodes: number;
totalCost: number;
totalTokens: number;
maxDepth: number;
} {
let totalNodes = 0;
let activeNodes = 0;
let idleNodes = 0;
let errorNodes = 0;
let blockedNodes = 0;
let totalCost = 0;
let totalTokens = 0;
let maxDepth = 0;

function walk(node: SubagentNode): void {
totalNodes++;
if (node.state === "running") activeNodes++;
if (node.state === "idle") idleNodes++;
if (node.state === "error") errorNodes++;
if (node.state === "blocked" || node.state === "waiting_approval") blockedNodes++;
totalCost += node.cost;
totalTokens += node.totalTokens;
if (node.depth > maxDepth) maxDepth = node.depth;
for (const child of node.children) walk(child);
}

for (const root of nodes) walk(root);

return { totalNodes, activeNodes, idleNodes, errorNodes, blockedNodes, totalCost, totalTokens, maxDepth };
}

export function flattenTree(roots: SubagentNode[]): SubagentNode[] {
const result: SubagentNode[] = [];
function walk(node: SubagentNode): void {
result.push(node);
for (const child of node.children) walk(child);
}
for (const root of roots) walk(root);
return result;
}

/**
* Lightweight subagent stats from session key patterns only (no history needed).
* Uses `:run:` marker in session keys to infer parent-child relationships.
*/
export function inferSubagentStatsFromKeys(
sessions: SessionSummary[],
statuses: SessionStatusSnapshot[],
): SubagentTreeStats {
const RUN_MARKER = ":run:";
const statusByKey = new Map(statuses.map((s) => [s.sessionKey, s]));
const childKeys = new Set<string>();
let maxDepth = 0;

for (const session of sessions) {
let depth = 0;
let key = session.sessionKey;
while (key.includes(RUN_MARKER)) {
depth++;
const idx = key.indexOf(RUN_MARKER);
key = key.slice(0, idx);
}
if (depth > 0) childKeys.add(session.sessionKey);
if (depth > maxDepth) maxDepth = depth;
}

let activeNodes = 0;
let idleNodes = 0;
let errorNodes = 0;
let blockedNodes = 0;
let totalCost = 0;
let totalTokens = 0;

for (const session of sessions) {
if (session.state === "running") activeNodes++;
if (session.state === "idle") idleNodes++;
if (session.state === "error") errorNodes++;
if (session.state === "blocked" || session.state === "waiting_approval") blockedNodes++;
const status = statusByKey.get(session.sessionKey);
totalCost += status?.cost ?? 0;
totalTokens += (status?.tokensIn ?? 0) + (status?.tokensOut ?? 0);
}

return {
totalNodes: sessions.length,
activeNodes,
idleNodes,
errorNodes,
blockedNodes,
totalCost,
totalTokens,
maxDepth,
};
}
18 changes: 16 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,17 @@ export interface BudgetPolicyConfig {
task: Record<string, BudgetThresholds>;
}

export interface SubagentTreeStats {
totalNodes: number;
activeNodes: number;
idleNodes: number;
errorNodes: number;
blockedNodes: number;
totalCost: number;
totalTokens: number;
maxDepth: number;
}

export interface ReadModelSnapshot {
sessions: SessionSummary[];
statuses: SessionStatusSnapshot[];
Expand All @@ -183,6 +194,7 @@ export interface ReadModelSnapshot {
tasks: TaskStoreSnapshot;
tasksSummary: TasksSummary;
budgetSummary: BudgetSummary;
subagentStats?: SubagentTreeStats;
generatedAt: string;
}

Expand Down Expand Up @@ -224,8 +236,10 @@ export interface ExceptionFeedItem {
| "SESSION_ERROR"
| "PENDING_APPROVAL"
| "OVER_BUDGET"
| "TASK_DUE";
source: "system" | "session" | "approval" | "budget" | "task";
| "TASK_DUE"
| "SUBAGENT_ERROR"
| "SUBAGENT_DEEP_CHAIN";
source: "system" | "session" | "approval" | "budget" | "task" | "subagent";
sourceId: string;
message: string;
route: "timeline" | "operator-watch" | "action-queue";
Expand Down
Loading