From 43954b606d751f87a5006a6916184ded693d9d59 Mon Sep 17 00:00:00 2001 From: baudbot-agent Date: Thu, 5 Mar 2026 17:02:53 +0000 Subject: [PATCH 1/2] feat: add spawn circuit breaker, lifecycle logging, and heartbeat auto-recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses remaining items from #185 (control-agent resilience): After 3 consecutive spawn failures, the circuit opens and rejects new spawn attempts for 5 minutes (cooldown). This prevents resource waste when spawns are failing due to systemic issues (missing API keys, model unavailability, etc.). The circuit transitions: closed → open (after 3 failures) → half-open (after cooldown) → closed (on success) Failure tracking counts tmux spawn failures, readiness timeouts, and aborted readiness checks. Validation errors (bad name, missing model) don't affect the circuit. All spawn events are logged to ~/.pi/agent/logs/worker-lifecycle.jsonl: - spawn_started: when a spawn attempt begins - spawn_success: readiness verified (includes ready_after_ms) - spawn_failed: tmux error, readiness timeout, or abort - circuit_rejected: spawn refused by open circuit New `spawn_status` tool exposes circuit breaker state and recent lifecycle events for observability. Before prompting the control-agent about failures, the heartbeat now attempts automatic recovery for two failure types: - Bridge down: kills existing bridge tmux session, clears port holders, runs startup-pi.sh, verifies bridge comes back - Orphaned dev-agents: kills tmux session, removes stale alias If recovery succeeds, no LLM tokens are consumed (same as healthy check). Only unrecoverable failures prompt the agent. Recovery actions are logged to ~/.pi/agent/logs/auto-recovery.jsonl for audit and debugging. - Fixed test harness to handle multiple tool registrations - Added circuit breaker test (3 failures → open → rejected) - Added spawn_status tool registration test - All 128 tests pass (73 heartbeat + 6 agent-spawn + 49 memory) Refs #185 Co-authored-by: Darcy Clarke --- pi/extensions/agent-spawn.test.mjs | 60 +++++++- pi/extensions/agent-spawn.ts | 229 ++++++++++++++++++++++++++++- pi/extensions/heartbeat.ts | 194 +++++++++++++++++++++++- 3 files changed, 471 insertions(+), 12 deletions(-) diff --git a/pi/extensions/agent-spawn.test.mjs b/pi/extensions/agent-spawn.test.mjs index 217fb2f..2befdc2 100644 --- a/pi/extensions/agent-spawn.test.mjs +++ b/pi/extensions/agent-spawn.test.mjs @@ -13,16 +13,16 @@ function randomId() { } function createExtensionHarness(execImpl) { - let registeredTool = null; + const registeredTools = {}; const pi = { registerTool(tool) { - registeredTool = tool; + registeredTools[tool.name] = tool; }, exec: execImpl, }; agentSpawnExtension(pi); - if (!registeredTool) throw new Error("agent_spawn tool was not registered"); - return registeredTool; + if (!registeredTools.agent_spawn) throw new Error("agent_spawn tool was not registered"); + return registeredTools.agent_spawn; } function startUnixSocketServer(socketPath) { @@ -242,4 +242,56 @@ describe("agent_spawn extension tool", () => { expect(result.details.aborted).toBe(true); expect(Date.now() - startedAt).toBeLessThan(1000); }); + + it("opens circuit breaker after 3 consecutive failures", async () => { + const root = mkdtempSync(path.join(tmpdir(), "agent-spawn-test-")); + tempDirs.push(root); + const worktree = path.join(root, "worktree"); + const skillPath = path.join(root, "dev-skill"); + const controlDir = path.join(root, "session-control"); + process.env[CONTROL_DIR_ENV] = controlDir; + mkdirSync(worktree, { recursive: true }); + mkdirSync(skillPath, { recursive: true }); + mkdirSync(controlDir, { recursive: true }); + + // Spawns succeed at tmux level but readiness always times out (1s timeout) + const execSpy = vi.fn(async () => ({ stdout: "", stderr: "", code: 0, killed: false })); + const tool = createExtensionHarness(execSpy); + + const params = { + session_name: `dev-agent-circuit-${randomId()}`, + cwd: worktree, + skill_path: skillPath, + model: "anthropic/claude-opus-4-6", + ready_timeout_sec: 1, + }; + + // Fail 3 times (readiness timeout) + for (let i = 0; i < 3; i++) { + params.session_name = `dev-agent-circuit-${randomId()}`; + const result = await tool.execute("id", params, undefined, undefined, {}); + expect(result.isError).toBe(true); + expect(result.details.error).toBe("readiness_timeout"); + } + + // 4th attempt should be rejected by circuit breaker + params.session_name = `dev-agent-circuit-${randomId()}`; + const rejected = await tool.execute("id", params, undefined, undefined, {}); + expect(rejected.isError).toBe(true); + expect(rejected.details.error).toBe("circuit_open"); + expect(String(rejected.content[0].text)).toContain("Circuit breaker OPEN"); + }); + + it("exposes spawn_status tool", () => { + const registeredTools = {}; + const pi = { + registerTool(tool) { + registeredTools[tool.name] = tool; + }, + exec: async () => ({ stdout: "", stderr: "", code: 0 }), + }; + agentSpawnExtension(pi); + expect(registeredTools.spawn_status).toBeDefined(); + expect(registeredTools.spawn_status.name).toBe("spawn_status"); + }); }); diff --git a/pi/extensions/agent-spawn.ts b/pi/extensions/agent-spawn.ts index bd3eb54..64c9ab4 100644 --- a/pi/extensions/agent-spawn.ts +++ b/pi/extensions/agent-spawn.ts @@ -1,6 +1,6 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; -import { existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; +import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; import net from "node:net"; import { homedir } from "node:os"; import { dirname, join, resolve } from "node:path"; @@ -15,8 +15,101 @@ const READINESS_POLL_MS = 200; const SOCKET_PROBE_TIMEOUT_MS = 300; const TMUX_SPAWN_TIMEOUT_MS = 15_000; +// Circuit breaker defaults +const CIRCUIT_FAILURE_THRESHOLD = 3; +const CIRCUIT_COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes + type SpawnStage = "spawn" | "wait_alias" | "wait_socket" | "probe" | "aborted"; +// ── Circuit Breaker ───────────────────────────────────────────────────────── + +type CircuitState = "closed" | "open" | "half-open"; + +type CircuitBreaker = { + state: CircuitState; + consecutiveFailures: number; + lastFailureAt: number | null; + lastSuccessAt: number | null; + totalFailures: number; + totalSuccesses: number; +}; + +function createCircuitBreaker(): CircuitBreaker { + return { + state: "closed", + consecutiveFailures: 0, + lastFailureAt: null, + lastSuccessAt: null, + totalFailures: 0, + totalSuccesses: 0, + }; +} + +function recordSuccess(cb: CircuitBreaker): void { + cb.consecutiveFailures = 0; + cb.lastSuccessAt = Date.now(); + cb.totalSuccesses++; + cb.state = "closed"; +} + +function recordFailure(cb: CircuitBreaker): void { + cb.consecutiveFailures++; + cb.lastFailureAt = Date.now(); + cb.totalFailures++; + if (cb.consecutiveFailures >= CIRCUIT_FAILURE_THRESHOLD) { + cb.state = "open"; + } +} + +function isCircuitOpen(cb: CircuitBreaker): boolean { + if (cb.state !== "open") return false; + // Check if cooldown has elapsed → transition to half-open + if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) { + cb.state = "half-open"; + return false; + } + return true; +} + +function circuitStatus(cb: CircuitBreaker): string { + const cooldownRemaining = + cb.state === "open" && cb.lastFailureAt + ? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - cb.lastFailureAt)) + : 0; + return [ + `State: ${cb.state}`, + `Consecutive failures: ${cb.consecutiveFailures}/${CIRCUIT_FAILURE_THRESHOLD}`, + `Total: ${cb.totalSuccesses} ok, ${cb.totalFailures} failed`, + `Last success: ${cb.lastSuccessAt ? new Date(cb.lastSuccessAt).toISOString() : "never"}`, + `Last failure: ${cb.lastFailureAt ? new Date(cb.lastFailureAt).toISOString() : "never"}`, + cb.state === "open" ? `Cooldown remaining: ${Math.round(cooldownRemaining / 1000)}s` : "", + ] + .filter(Boolean) + .join("\n "); +} + +// ── Lifecycle Log ─────────────────────────────────────────────────────────── + +const LIFECYCLE_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "worker-lifecycle.jsonl"); + +type LifecycleEvent = { + timestamp: string; + session_name: string; + event: "spawn_started" | "spawn_success" | "spawn_failed" | "circuit_rejected"; + stage?: string; + ready_after_ms?: number; + error?: string; +}; + +function logLifecycleEvent(event: LifecycleEvent): void { + try { + mkdirSync(dirname(LIFECYCLE_LOG_PATH), { recursive: true }); + appendFileSync(LIFECYCLE_LOG_PATH, JSON.stringify(event) + "\n"); + } catch { + // Best-effort — don't break spawn on logging failure + } +} + type ReadinessResult = { ready: boolean; aborted: boolean; @@ -192,11 +285,14 @@ type AgentSpawnInput = { }; export default function agentSpawnExtension(pi: ExtensionAPI): void { + const circuit = createCircuitBreaker(); + pi.registerTool({ name: "agent_spawn", label: "Agent Spawn", description: - "Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout.", + "Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout. " + + "Includes a circuit breaker: after 3 consecutive failures, spawns are rejected for 5 minutes to prevent resource waste.", parameters: Type.Object({ session_name: Type.String({ description: "Target session name (also PI_SESSION_NAME)" }), cwd: Type.String({ description: "Working directory for the new session" }), @@ -215,6 +311,38 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { const model = input.model?.trim(); const readyTimeoutSec = clampReadyTimeout(input.ready_timeout_sec); + // Circuit breaker check + if (isCircuitOpen(circuit)) { + const cooldownLeft = circuit.lastFailureAt + ? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - circuit.lastFailureAt)) + : 0; + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName || "unknown", + event: "circuit_rejected", + error: `Circuit open after ${circuit.consecutiveFailures} failures. Cooldown: ${Math.round(cooldownLeft / 1000)}s`, + }); + return { + content: [{ + type: "text", + text: + `⚡ Circuit breaker OPEN — ${circuit.consecutiveFailures} consecutive spawn failures. ` + + `Refusing new spawns for ${Math.round(cooldownLeft / 1000)}s to prevent resource waste. ` + + `Investigate the root cause (check logs, API keys, model availability).`, + }], + isError: true, + details: { + error: "circuit_open", + circuit: { + state: circuit.state, + consecutive_failures: circuit.consecutiveFailures, + cooldown_remaining_sec: Math.round(cooldownLeft / 1000), + last_failure: circuit.lastFailureAt ? new Date(circuit.lastFailureAt).toISOString() : null, + }, + }, + }; + } + if (!sessionName || !isSafeName(sessionName)) { return { content: [{ type: "text", text: "Invalid session_name. Use only letters, numbers, '.', '_', and '-'." }], @@ -273,6 +401,12 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; } + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_started", + }); + const tmuxCommand = [ `cd ${shellQuote(cwdPath)}`, 'export PATH="$HOME/.varlock/bin:$HOME/opt/node/bin:$PATH"', @@ -290,6 +424,14 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { ); if (spawnResult.code !== 0) { + recordFailure(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_failed", + stage: "spawn", + error: `tmux exit code ${spawnResult.code}`, + }); return { content: [{ type: "text", text: `Failed to spawn tmux session ${sessionName}.` }], isError: true, @@ -304,6 +446,8 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { stdout: spawnResult.stdout, stderr: spawnResult.stderr, exit_code: spawnResult.code, + circuit_state: circuit.state, + circuit_failures: circuit.consecutiveFailures, }, }; } @@ -321,9 +465,19 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { ready_after_ms: readiness.readyAfterMs, stage: readiness.stage, error: readiness.ready ? null : readiness.aborted ? "readiness_aborted" : "readiness_timeout", + circuit_state: circuit.state, + circuit_failures: circuit.consecutiveFailures, }; if (readiness.aborted) { + recordFailure(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_failed", + stage: "aborted", + error: "readiness_aborted", + }); return { content: [{ type: "text", @@ -335,6 +489,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { } if (!readiness.ready) { + recordFailure(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_failed", + stage: readiness.stage, + ready_after_ms: readiness.readyAfterMs, + error: "readiness_timeout", + }); return { content: [{ type: "text", @@ -347,6 +510,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; } + recordSuccess(circuit); + logLifecycleEvent({ + timestamp: new Date().toISOString(), + session_name: sessionName, + event: "spawn_success", + stage: readiness.stage, + ready_after_ms: readiness.readyAfterMs, + }); + return { content: [{ type: "text", @@ -358,4 +530,57 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; }, }); + + // ── spawn_status tool ───────────────────────────────────────────────────── + + pi.registerTool({ + name: "spawn_status", + label: "Spawn Status", + description: + "Check the agent_spawn circuit breaker state and recent worker lifecycle events.", + parameters: Type.Object({}), + async execute() { + let recentEvents = ""; + try { + const { execSync } = require("node:child_process"); + const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" }); + if (tail.trim()) { + const lines = tail.trim().split("\n"); + recentEvents = lines + .map((line: string) => { + try { + const e = JSON.parse(line) as LifecycleEvent; + return ` ${e.timestamp} ${e.event} ${e.session_name}${e.error ? ` (${e.error})` : ""}${e.ready_after_ms ? ` [${e.ready_after_ms}ms]` : ""}`; + } catch { + return ` (unparseable)`; + } + }) + .join("\n"); + } + } catch { + recentEvents = " (no lifecycle log)"; + } + + return { + content: [{ + type: "text" as const, + text: [ + "Spawn Circuit Breaker:", + ` ${circuitStatus(circuit)}`, + "", + "Recent lifecycle events:", + recentEvents || " (none)", + ].join("\n"), + }], + details: { + circuit: { + state: circuit.state, + consecutive_failures: circuit.consecutiveFailures, + total_successes: circuit.totalSuccesses, + total_failures: circuit.totalFailures, + }, + }, + }; + }, + }); } diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index ab0fb9e..3abf9a9 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -25,9 +25,9 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; import { StringEnum } from "@mariozechner/pi-ai"; -import { closeSync, existsSync, openSync, readdirSync, readFileSync, readSync, statSync } from "node:fs"; +import { appendFileSync, closeSync, existsSync, mkdirSync, openSync, readdirSync, readFileSync, readSync, statSync } from "node:fs"; import { homedir } from "node:os"; -import { join } from "node:path"; +import { dirname, join } from "node:path"; import { discoverSubagentPackages, readSubagentState, resolveEffectiveState } from "./subagent-registry.ts"; const DEFAULT_INTERVAL_MS = 10 * 60 * 1000; // 10 minutes @@ -608,6 +608,155 @@ function hasDevAgentForTodo(todoId: string): boolean { } } +// ── Auto-Recovery ─────────────────────────────────────────────────────────── + +const RECOVERY_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "auto-recovery.jsonl"); + +type RecoveryAction = { + timestamp: string; + check: string; + action: string; + success: boolean; + detail?: string; +}; + +function logRecovery(entry: RecoveryAction): void { + try { + mkdirSync(dirname(RECOVERY_LOG_PATH), { recursive: true }); + appendFileSync(RECOVERY_LOG_PATH, JSON.stringify(entry) + "\n"); + } catch { + // Best-effort + } +} + +/** + * Attempt automatic recovery for certain failure types. + * Returns an array of results describing what was attempted and whether it worked. + * Only performs safe, idempotent actions: + * - Restart bridge tmux session + * - Kill orphaned dev-agent tmux sessions and remove stale aliases + */ +async function tryAutoRecover(failures: CheckResult[]): Promise { + const actions: RecoveryAction[] = []; + const { execSync } = require("node:child_process"); + + for (const failure of failures) { + // Auto-recover: bridge down → restart the bridge tmux session + if (failure.name === "bridge") { + try { + // Find control-agent UUID from alias + const controlAlias = join(SOCKET_DIR, "control-agent.alias"); + if (!existsSync(controlAlias)) { + actions.push({ + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: false, + detail: "Cannot restart bridge: control-agent.alias not found", + }); + continue; + } + + // Kill existing bridge tmux session + try { + execSync('tmux kill-session -t baudbot-gateway-bridge 2>/dev/null', { timeout: 5000 }); + } catch { + // May not exist — that's fine + } + + // Kill anything holding port 7890 + try { + execSync('lsof -ti :7890 2>/dev/null | xargs kill -9 2>/dev/null', { timeout: 5000 }); + } catch { + // Nothing holding port — fine + } + + // Restart via startup script + const startupScript = join(homedir(), ".pi", "agent", "skills", "control-agent", "startup-pi.sh"); + if (existsSync(startupScript)) { + // Get live session UUIDs from session-control dir + const sockFiles = readdirSync(SOCKET_DIR).filter((f) => f.endsWith(".sock")); + const uuids = sockFiles.map((f) => f.replace(".sock", "")).join(" "); + if (uuids) { + execSync(`bash "${startupScript}" ${uuids} 2>&1`, { + timeout: 30000, + encoding: "utf-8", + }); + + // Verify bridge came back + await new Promise((resolve) => setTimeout(resolve, 3000)); + const verifyResult = await checkBridge(); + + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: verifyResult.ok, + detail: verifyResult.ok + ? "Bridge restarted and verified healthy" + : `Bridge restart attempted but still failing: ${verifyResult.detail}`, + }; + actions.push(entry); + logRecovery(entry); + } + } + } catch (err: any) { + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: false, + detail: `Recovery failed: ${err.message || err}`, + }; + actions.push(entry); + logRecovery(entry); + } + } + + // Auto-recover: orphaned dev-agent → kill tmux session + remove alias + if (failure.name.startsWith("orphan:")) { + const sessionName = failure.name.replace("orphan:", ""); + try { + // Kill the tmux session + try { + execSync(`tmux kill-session -t "${sessionName}" 2>/dev/null`, { timeout: 5000 }); + } catch { + // May already be dead + } + + // Remove the stale alias + const aliasPath = join(SOCKET_DIR, `${sessionName}.alias`); + if (existsSync(aliasPath)) { + const { unlinkSync } = require("node:fs"); + unlinkSync(aliasPath); + } + + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "orphan_cleanup", + success: true, + detail: `Killed tmux session and removed alias for orphaned dev-agent "${sessionName}"`, + }; + actions.push(entry); + logRecovery(entry); + } catch (err: any) { + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "orphan_cleanup", + success: false, + detail: `Cleanup failed for "${sessionName}": ${err.message || err}`, + }; + actions.push(entry); + logRecovery(entry); + } + } + } + + return actions; +} + // ── Extension ─────────────────────────────────────────────────────────────── export default function heartbeatExtension(pi: ExtensionAPI): void { @@ -682,19 +831,52 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { return; } - // Something is wrong — inject a prompt so the control-agent can fix it - const failureList = failures + // Attempt auto-recovery for recoverable failures before prompting the agent + const recoveryActions = await tryAutoRecover(failures); + const successfulRecoveries = recoveryActions.filter((a) => a.success); + + // Re-check: remove failures that were successfully auto-recovered + const recoveredChecks = new Set(successfulRecoveries.map((a) => a.check)); + const remainingFailures = failures.filter((f) => !recoveredChecks.has(f.name)); + + // If all failures were auto-recovered, no need to prompt the agent + if (remainingFailures.length === 0) { + state.lastFailures = []; + if (successfulRecoveries.length > 0) { + state.lastFailures = successfulRecoveries.map( + (a) => `auto-recovered: ${a.check} — ${a.detail}`, + ); + } + state.consecutiveErrors = 0; + saveState(); + armTimer(); + return; + } + + // Build prompt with both failures and recovery attempt details + const failureList = remainingFailures .map((f) => `- **${f.name}**: ${f.detail}`) .join("\n"); + const recoveryInfo = recoveryActions.length > 0 + ? [ + "", + "**Auto-recovery attempted:**", + ...recoveryActions.map((a) => + `- ${a.success ? "✅" : "❌"} ${a.action} (${a.check}): ${a.detail}`, + ), + ].join("\n") + : ""; + const prompt = [ `🫀 **Heartbeat** (run #${state.totalRuns}, ${new Date(now).toISOString()})`, ``, - `**${failures.length} health check failure(s) detected** — take action:`, + `**${remainingFailures.length} health check failure(s) remain** — take action:`, ``, failureList, + recoveryInfo, ``, - `All other checks passed. Fix the issues above and report what you did.`, + `Fix the remaining issues above and report what you did.`, ].join("\n"); pi.sendMessage( From cafb91b6991fd9f24f4dbe1f2fc10a0d4039507a Mon Sep 17 00:00:00 2001 From: baudbot-agent Date: Fri, 13 Mar 2026 22:07:35 +0000 Subject: [PATCH 2/2] fix: address PR #198 review feedback from Greptile and Sentry - Fix circuit breaker race: separate isCircuitOpen check from half-open state transition, call transitionToHalfOpen only after all input validation passes (Sentry) - Fix orphan cleanup error swallowing: inspect tmux kill-session errors, only treat 'session not found' as success, report real failures (Sentry) - Fix bridge recovery gaps: add explicit failure actions when startup script is missing or no live socket UUIDs found (Greptile) - Fix shell injection: use execFileSync for tmux commands instead of execSync with string interpolation (Greptile) - Replace all dynamic require() calls with static ES module imports in both agent-spawn.ts and heartbeat.ts (Greptile) - Replace shell tail command with readFileSync for lifecycle log reading in spawn_status tool (Greptile) --- pi/extensions/agent-spawn.ts | 25 +++++--- pi/extensions/heartbeat.ts | 110 +++++++++++++++++++++++------------ 2 files changed, 92 insertions(+), 43 deletions(-) diff --git a/pi/extensions/agent-spawn.ts b/pi/extensions/agent-spawn.ts index 64c9ab4..fc8137e 100644 --- a/pi/extensions/agent-spawn.ts +++ b/pi/extensions/agent-spawn.ts @@ -1,6 +1,6 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; -import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs"; +import { appendFileSync, existsSync, mkdirSync, readFileSync, readlinkSync, statSync } from "node:fs"; import net from "node:net"; import { homedir } from "node:os"; import { dirname, join, resolve } from "node:path"; @@ -63,14 +63,20 @@ function recordFailure(cb: CircuitBreaker): void { function isCircuitOpen(cb: CircuitBreaker): boolean { if (cb.state !== "open") return false; - // Check if cooldown has elapsed → transition to half-open + // Check if cooldown has elapsed — eligible for half-open probe if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) { - cb.state = "half-open"; return false; } return true; } +/** Transition to half-open state. Call only after input validation passes. */ +function transitionToHalfOpen(cb: CircuitBreaker): void { + if (cb.state === "open" && cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) { + cb.state = "half-open"; + } +} + function circuitStatus(cb: CircuitBreaker): string { const cooldownRemaining = cb.state === "open" && cb.lastFailureAt @@ -401,6 +407,10 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { }; } + // All validation passed — now safe to transition circuit to half-open + // (allows exactly one probe attempt to test recovery) + transitionToHalfOpen(circuit); + logLifecycleEvent({ timestamp: new Date().toISOString(), session_name: sessionName, @@ -542,10 +552,11 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void { async execute() { let recentEvents = ""; try { - const { execSync } = require("node:child_process"); - const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" }); - if (tail.trim()) { - const lines = tail.trim().split("\n"); + if (existsSync(LIFECYCLE_LOG_PATH)) { + const lines = readFileSync(LIFECYCLE_LOG_PATH, "utf-8") + .trimEnd() + .split("\n") + .slice(-20); recentEvents = lines .map((line: string) => { try { diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 3abf9a9..ce392c6 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -25,7 +25,8 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; import { StringEnum } from "@mariozechner/pi-ai"; -import { appendFileSync, closeSync, existsSync, mkdirSync, openSync, readdirSync, readFileSync, readSync, statSync } from "node:fs"; +import { appendFileSync, closeSync, existsSync, mkdirSync, openSync, readdirSync, readFileSync, readlinkSync, readSync, statSync, unlinkSync } from "node:fs"; +import { execFileSync, execSync } from "node:child_process"; import { homedir } from "node:os"; import { dirname, join } from "node:path"; import { discoverSubagentPackages, readSubagentState, resolveEffectiveState } from "./subagent-registry.ts"; @@ -132,7 +133,6 @@ function checkSessions(): CheckResult[] { // Check that the symlink target (.sock file) exists try { - const { readlinkSync } = require("node:fs"); const target = readlinkSync(aliasPath); const sockPath = join(SOCKET_DIR, target); if (!existsSync(sockPath)) { @@ -353,8 +353,7 @@ function checkUnansweredMentions(): CheckResult[] { // Support both bridge implementations: // - broker-bridge.mjs: "... (type: app_mention, ts: 1234.5678)" // - bridge.mjs: "app_mention ... ts: 1234.5678" - const { execSync } = require("node:child_process"); - const logTail = execSync(`tail -500 "${bridgeLogPath}"`, { encoding: "utf-8" }); + const logTail = execFileSync("tail", ["-500", bridgeLogPath], { encoding: "utf-8" }); const mentionThreadTsSet = new Set(extractMentionThreadTs(logTail)); @@ -638,7 +637,6 @@ function logRecovery(entry: RecoveryAction): void { */ async function tryAutoRecover(failures: CheckResult[]): Promise { const actions: RecoveryAction[] = []; - const { execSync } = require("node:child_process"); for (const failure of failures) { // Auto-recover: bridge down → restart the bridge tmux session @@ -659,7 +657,7 @@ async function tryAutoRecover(failures: CheckResult[]): Promise/dev/null', { timeout: 5000 }); + execFileSync("tmux", ["kill-session", "-t", "baudbot-gateway-bridge"], { timeout: 5000 }); } catch { // May not exist — that's fine } @@ -673,33 +671,56 @@ async function tryAutoRecover(failures: CheckResult[]): Promise f.endsWith(".sock")); - const uuids = sockFiles.map((f) => f.replace(".sock", "")).join(" "); - if (uuids) { - execSync(`bash "${startupScript}" ${uuids} 2>&1`, { - timeout: 30000, - encoding: "utf-8", - }); - - // Verify bridge came back - await new Promise((resolve) => setTimeout(resolve, 3000)); - const verifyResult = await checkBridge(); + if (!existsSync(startupScript)) { + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: false, + detail: `Bridge session killed but startup script not found at ${startupScript} — cannot restart`, + }; + actions.push(entry); + logRecovery(entry); + continue; + } - const entry: RecoveryAction = { - timestamp: new Date().toISOString(), - check: failure.name, - action: "bridge_restart", - success: verifyResult.ok, - detail: verifyResult.ok - ? "Bridge restarted and verified healthy" - : `Bridge restart attempted but still failing: ${verifyResult.detail}`, - }; - actions.push(entry); - logRecovery(entry); - } + // Get live session UUIDs from session-control dir + const sockFiles = readdirSync(SOCKET_DIR).filter((f) => f.endsWith(".sock")); + const uuids = sockFiles.map((f) => f.replace(".sock", "")); + if (uuids.length === 0) { + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: false, + detail: "Bridge session killed but no live socket UUIDs found — cannot restart", + }; + actions.push(entry); + logRecovery(entry); + continue; } + + // Pass UUIDs as separate args to avoid shell injection + execFileSync("bash", [startupScript, ...uuids], { + timeout: 30000, + encoding: "utf-8", + }); + + // Verify bridge came back + await new Promise((resolve) => setTimeout(resolve, 3000)); + const verifyResult = await checkBridge(); + + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "bridge_restart", + success: verifyResult.ok, + detail: verifyResult.ok + ? "Bridge restarted and verified healthy" + : `Bridge restart attempted but still failing: ${verifyResult.detail}`, + }; + actions.push(entry); + logRecovery(entry); } catch (err: any) { const entry: RecoveryAction = { timestamp: new Date().toISOString(), @@ -717,17 +738,34 @@ async function tryAutoRecover(failures: CheckResult[]): Promise/dev/null`, { timeout: 5000 }); - } catch { - // May already be dead + execFileSync("tmux", ["kill-session", "-t", sessionName], { timeout: 5000 }); + tmuxKilled = true; + } catch (killErr: any) { + // "session not found" means it's already dead — that's fine + const msg = killErr.message || String(killErr); + if (msg.includes("session not found") || msg.includes("can't find session")) { + tmuxKilled = true; // Already gone — counts as success + } else { + // Real error (tmux daemon down, permissions, timeout, etc.) + const entry: RecoveryAction = { + timestamp: new Date().toISOString(), + check: failure.name, + action: "orphan_cleanup", + success: false, + detail: `Failed to kill tmux session "${sessionName}": ${msg}`, + }; + actions.push(entry); + logRecovery(entry); + continue; + } } // Remove the stale alias const aliasPath = join(SOCKET_DIR, `${sessionName}.alias`); if (existsSync(aliasPath)) { - const { unlinkSync } = require("node:fs"); unlinkSync(aliasPath); }