Commit e09516f

refactor: agent state machine, deterministic result passing, overflow-to-file (#57)
* refactor: deterministic agent result passing + completion notification injection

Three architectural changes to the multi-agent communication system:

1. Deterministic result passing (no fallback):
   - agent/completed notification now carries reason + result fields
   - Hub agent_io_loop extracts result from agent/completed only
   - Removed last_stream/completion_output fallback in agent_io_loop
   - AgentOutput.result flows: runtime → server → IPC → Hub (single path)

2. Completion notification injection (push model):
   - MessageSource::System variant for Hub-generated notifications
   - Hub deliver_to_parent() sends Envelope to parent's completion_tx
   - completion_bridge forwards from Hub channel to parent via IPC
   - session_forward handles agent/message notifications for injection
   - System messages are ephemeral (not persisted to session)

3. Remove max_turns limit:
   - TerminateReason::MaxTurns, AgentEventPayload::MaxTurnsReached removed
   - Settings.max_turns, AgentConfig.max_turns fields removed
   - LOOPAL_MAX_TURNS env var handling removed
   - Agent loop runs without artificial turn limits

Agent tool updated: foreground (blocking) is default, multiple foreground
spawns in one turn execute in parallel via JoinSet. Background mode reserved
for when parent has independent work to do.

* refactor: remove AttemptCompletion tool — agent output is streaming text

AttemptCompletion was a flawed abstraction that:

1. Forced LLMs to re-summarize content already output as streaming text
2. Used the compressed summary as agent output, losing information
3. Conflated "I'm done" signal with content delivery

After removal, agent completion works naturally:

- Agent stops when LLM generates no tool calls (existing path)
- Agent output = last assistant streaming text (TurnOutput.output)
- No special tool or is_completion flag needed

Removed across the entire stack:

- AttemptCompletionTool definition and registration
- ToolResult::completion() constructor and is_completion field
- ContentBlock::ToolResult is_completion field
- AgentEventPayload::ToolResult is_completion field
- Runtime completion detection in turn_exec/tools/finalize_tool_results
- Session tool_result_handler completion promotion logic
- Protocol projection completion promotion
- Agent bridge completion_result tracking
- Headless mode is_completion detection
- Sub-agent prompt "call AttemptCompletion" instructions
- All related test infrastructure (FakeCompletionTool, scenarios, etc.)

* refactor: explicit agent state machine — Runtime drives AgentStatus

Runtime layer now holds AgentStatus as source of truth and drives state
transitions via events. Replaces the `initial_prompt` heuristic.

State machine:

  Starting → Running → WaitingForInput → Running → ... → Finished

Key changes:

- AgentLoopRunner holds `status: AgentStatus` field
- `transition()` method updates status and emits corresponding event
- `run_loop()` unified: all agents (interactive + task) run the same loop
- Removed `initial_prompt = !store.is_empty()` hack
- `wait_for_input()` no longer emits AwaitingInput (run_loop controls it)
- AwaitingInput emitted for ALL agents after turn completion (unified idle signal)
- Task agents exit when input channel closes (channel lifecycle = agent lifecycle)
- IntegrationHarness.close_input() for tests that need prompt-driven exit

* refactor: complete agent state machine — all transitions through transition()

Every AgentStatus change now goes through transition() or transition_error(),
ensuring Runtime is the deterministic SSOT for agent state.

State transition table:

- Starting → Running: transition(Running) + emit(Started) [once at startup]
- Running → WaitingForInput: transition(WaitingForInput) → emit(AwaitingInput)
- WaitingForInput → Running: transition(Running) [implicit — Stream/ToolCall signal activity]
- Running → Error: transition_error(msg) → emit(Error { message })
- Interrupted: status = WaitingForInput + emit(Interrupted)
- * → Finished: transition(Finished) → emit(Finished)

Removed: direct `self.status = AgentStatus::Running` assignment, standalone
`self.emit(Error/Finished)` calls that bypassed status update.

Exception: FinishedGuard (panic Drop) uses try_emit without status update —
acceptable since self is inaccessible during stack unwinding.

* feat: overflow-to-file for large outputs + agent edge case fixes

Large tool/agent outputs are now saved to overflow files instead of being
silently truncated. The LLM receives a preview + file path and can use the
Read tool to access full content on demand.

Overflow-to-file integration:

- tool-api: handle_overflow() + save_to_overflow_file() core abstraction
- tool_pipeline: uses handle_overflow instead of truncate + manual save
- backend/shell: bash stdout/stderr overflow to file
- backend/grep: full match results saved when exceeding limit
- backend/glob: full file list saved when exceeding limit
- backend/net: HTTP body saved when exceeding fetch size limit
- agent-hub/completion: large agent results overflow before parent delivery

Overflow files stored in {tmp}/loopal/overflow/{label}_{timestamp}.txt

Agent edge case fixes:

- Background spawn: 1-hour timeout on worktree cleanup wait
- GrepSearchResult, GlobSearchResult, FetchResult: overflow_path field added

* fix: close input channel for task agents so they exit after prompt

Prompt-driven (task) sub-agents were hanging forever in wait_for_input()
because SharedSession held the only input_tx sender clone, preventing the
channel from closing.

Fix: for task sessions (has_initial_prompt=true), use a dummy sender in
SharedSession instead of the real input_tx. The real sender is dropped
immediately, and the placeholder clone is dropped on replace_session().
With zero connected senders, recv_input() returns None → agent exits after
processing the pre-loaded prompt.

* Revert "fix: close input channel for task agents so they exit after prompt"

This reverts commit ea9d992.

* feat: LifecycleMode — Task agents exit on idle, Interactive agents wait

Task agents (sub-agents, headless) now exit when idle with no pending input,
instead of relying on channel closure. This is an agent-internal decision,
not an external hack.

- LifecycleMode enum: Task (exit on idle) vs Interactive (wait for input)
- AgentConfig.lifecycle field, set by agent_setup based on prompt presence
- run_loop idle phase: Task drains pending input, exits if empty
- drain_pending_input() non-blocking check for queued messages
- HarnessBuilder defaults to Task; build_spawned() forces Interactive
- Reverted channel-closure hack (ea9d992)

* fix: implement HubFrontend.drain_pending + yield before Task exit

Two fixes for Task agent idle behavior:

1. HubFrontend now implements drain_pending() — non-blocking try_recv from
   input_rx. Previously returned empty (default trait impl), causing Task
   agents to miss queued IPC messages (sub-agent completions).
2. Task agent yields before drain check — gives in-flight IPC messages time
   to arrive before deciding to exit.
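
The state machine and the Task/Interactive idle rule described in the message can be sketched in a few lines of Rust. This is a simplified model, not the code from the commit: `Runner`, `Event`, `next_input`, and the use of `std::sync::mpsc` are assumptions made for illustration, and the Error and Interrupted paths are left out. The names `AgentStatus`, `LifecycleMode`, `transition`, and `run_loop` are borrowed from the message, but their bodies here are hypothetical.

```rust
use std::sync::mpsc::Receiver;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AgentStatus { Starting, Running, WaitingForInput, Finished }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LifecycleMode { Task, Interactive }

// Hypothetical event type standing in for AgentEventPayload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event { Started, AwaitingInput, Finished }

// Hypothetical stand-in for AgentLoopRunner.
struct Runner {
    status: AgentStatus,
    lifecycle: LifecycleMode,
    input_rx: Receiver<String>,
    events: Vec<Event>,
}

impl Runner {
    fn new(lifecycle: LifecycleMode, input_rx: Receiver<String>) -> Self {
        Runner { status: AgentStatus::Starting, lifecycle, input_rx, events: Vec::new() }
    }

    /// Single choke point: update the status, then record the matching event.
    fn transition(&mut self, next: AgentStatus) {
        let prev = self.status;
        self.status = next;
        match (prev, next) {
            (AgentStatus::Starting, AgentStatus::Running) => self.events.push(Event::Started),
            (_, AgentStatus::WaitingForInput) => self.events.push(Event::AwaitingInput),
            (_, AgentStatus::Finished) => self.events.push(Event::Finished),
            // WaitingForInput -> Running is implicit: no dedicated event.
            _ => {}
        }
    }

    /// Task agents only look at what is already queued; Interactive agents
    /// block until a message arrives or every sender has been dropped.
    fn next_input(&mut self) -> Option<String> {
        match self.lifecycle {
            LifecycleMode::Task => self.input_rx.try_recv().ok(),
            LifecycleMode::Interactive => self.input_rx.recv().ok(),
        }
    }

    /// Runs until no further input is available; returns the number of turns.
    fn run_loop(&mut self) -> usize {
        let mut turns = 0;
        self.transition(AgentStatus::Running);
        while let Some(_input) = self.next_input() {
            self.transition(AgentStatus::Running);
            turns += 1; // stand-in for executing one LLM turn
            self.transition(AgentStatus::WaitingForInput);
        }
        self.transition(AgentStatus::Finished);
        turns
    }
}
```

In this model a Task runner finishes as soon as its queue is empty, even while a sender is still alive, whereas an Interactive runner finishes only once the channel closes. The commit additionally yields before the drain check so that in-flight IPC messages can arrive; that step is not modeled here.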
1 parent 58c58c7 commit e09516f

147 files changed

Lines changed: 1670 additions & 1700 deletions

CLAUDE.md

Lines changed: 1 addition & 1 deletion
@@ -96,7 +96,7 @@ TUI Process ──stdio IPC──→ Agent Server Process ←──TCP──→
 <project>/.loopal/settings.local.json Local overrides (gitignored)
 ```
 
-Environment variable overrides use `LOOPAL_` prefix. Key settings: `model` (default: `claude-sonnet-4-20250514`), `max_turns` (default: 50), `permission_mode`.
+Environment variable overrides use `LOOPAL_` prefix. Key settings: `model` (default: `claude-sonnet-4-20250514`), `permission_mode`.
 
 ## Code Conventions

LOOPAL.md

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ TUI Process ──stdio IPC──→ Agent Server Process ←──TCP──→
 <project>/.loopal/settings.local.json Local overrides (gitignored)
 ```
 
-Environment variable overrides use `LOOPAL_` prefix. Key settings: `model` (default: `claude-sonnet-4-20250514`), `max_turns` (default: 50), `permission_mode`.
+Environment variable overrides use `LOOPAL_` prefix. Key settings: `model` (default: `claude-sonnet-4-20250514`), `permission_mode`.
 
 ## Code Conventions

README.md

Lines changed: 0 additions & 3 deletions
@@ -137,9 +137,6 @@ Loopal uses a layered configuration system. Create a `.loopal/config.toml` in yo
 # Default model
 model = "claude-sonnet-4-20250514"
 
-# Max turns per agent loop
-max_turns = 200
-
 # Permission mode: "supervised" or "bypass"
 permission_mode = "supervised"

crates/loopal-acp/src/adapter/events.rs

Lines changed: 0 additions & 3 deletions
@@ -59,9 +59,6 @@ impl AcpAdapter {
     async fn handle_event(&self, event: &AgentEvent, session_id: &str) -> Option<StopReason> {
         match &event.payload {
             AgentEventPayload::AwaitingInput => return Some(StopReason::EndTurn),
-            AgentEventPayload::MaxTurnsReached { .. } => {
-                return Some(StopReason::MaxTurnRequests);
-            }
             AgentEventPayload::Finished => return Some(StopReason::EndTurn),
             _ => {}
         }

crates/loopal-acp/src/translate/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -96,7 +96,6 @@ pub fn translate_event(payload: &AgentEventPayload, session_id: &str) -> Option<
 
         // ── Events with no ACP counterpart ───────────────────────────
         AgentEventPayload::AwaitingInput
-        | AgentEventPayload::MaxTurnsReached { .. }
         | AgentEventPayload::AutoContinuation { .. }
         | AgentEventPayload::Started
         | AgentEventPayload::Finished

crates/loopal-agent-hub/src/agent_io.rs

Lines changed: 34 additions & 36 deletions
@@ -8,7 +8,7 @@ use tracing::{info, warn};
 
 use loopal_ipc::connection::{Connection, Incoming};
 use loopal_ipc::protocol::methods;
-use loopal_protocol::{AgentEvent, AgentEventPayload};
+use loopal_protocol::AgentEvent;
 
 use crate::dispatch::dispatch_hub_request;
 use crate::hub::Hub;
@@ -18,40 +18,29 @@ use crate::ui_relay::relay_to_ui_clients;
 const WAIT_AGENT_METHOD: &str = "hub/wait_agent";
 
 /// Run the IO loop for a connected agent. Returns the agent's final output
-/// (from AttemptCompletion or last stream text) for passing to wait_agent watchers.
+/// extracted from the `agent/completed` notification — the single authoritative source.
 pub async fn agent_io_loop(
     hub: Arc<Mutex<Hub>>,
     conn: Arc<Connection>,
     mut rx: tokio::sync::mpsc::Receiver<Incoming>,
     agent_name: String,
 ) -> Option<String> {
     info!(agent = %agent_name, "agent IO loop started");
-    let mut last_stream = String::new();
-    let mut completion_output: Option<String> = None;
+    let mut agent_result: Option<String> = None;
 
     while let Some(msg) = rx.recv().await {
         match msg {
             Incoming::Notification { method, params } => {
                 if method == methods::AGENT_COMPLETED.name {
-                    // Explicit completion — primary signal (EOF is fallback).
-                    info!(agent = %agent_name, "received agent/completed");
+                    // Extract result from the authoritative completion signal.
+                    agent_result = params
+                        .get("result")
+                        .and_then(|v| v.as_str())
+                        .map(String::from);
+                    info!(agent = %agent_name, has_result = agent_result.is_some(), "received agent/completed");
                     break;
                 } else if method == methods::AGENT_EVENT.name {
                     if let Ok(mut event) = serde_json::from_value::<AgentEvent>(params) {
-                        // Track output for wait_agent result
-                        match &event.payload {
-                            AgentEventPayload::ToolResult {
-                                result,
-                                is_completion: true,
-                                ..
-                            } => {
-                                completion_output = Some(result.clone());
-                            }
-                            AgentEventPayload::Stream { text } => {
-                                last_stream.push_str(text);
-                            }
-                            _ => {}
-                        }
                         if event.agent_name.is_none() {
                             event.agent_name = Some(agent_name.clone());
                         }
@@ -101,12 +90,7 @@ pub async fn agent_io_loop(
             }
         }
     }
-    // Prefer AttemptCompletion output over accumulated stream text
-    completion_output.or(if last_stream.is_empty() {
-        None
-    } else {
-        Some(last_stream)
-    })
+    agent_result
 }
 
 /// Spawn hub/wait_agent in a background task so it doesn't block the IO loop.
@@ -162,13 +146,19 @@ pub fn start_agent_io(
         }
         info!(agent = %n, "agent registered in Hub");
         let output = agent_io_loop(hub2, conn, rx, n.clone()).await;
-        let mut h = hub.lock().await;
-        // Order matters: emit BEFORE unregister to avoid race with wait_agent.
-        // wait_agent checks agent existence → if we unregister first, it returns
-        // "not found" and misses the output. By emitting first, any pending watcher
-        // gets the output before the agent is removed.
-        h.registry.emit_agent_finished(&n2, output);
-        h.registry.unregister_connection(&n2);
+        let pending = {
+            let mut h = hub.lock().await;
+            // Order matters: emit BEFORE unregister to avoid race with wait_agent.
+            let pending = h.registry.emit_agent_finished(&n2, output);
+            h.registry.unregister_connection(&n2);
+            pending
+        }; // Lock released.
+        // Deliver to parent outside the lock — uses async send (no data loss).
+        if let Some((tx, envelope)) = pending {
+            if tx.send(envelope).await.is_err() {
+                tracing::warn!(agent = %n2, "parent completion channel closed");
+            }
+        }
         info!(agent = %n2, "agent IO loop ended");
     });
 }
@@ -185,9 +175,17 @@ pub fn spawn_io_loop(
     let n2 = name.to_string();
     tokio::spawn(async move {
         let output = agent_io_loop(hub2, conn, rx, n.clone()).await;
-        let mut h = hub.lock().await;
-        h.registry.emit_agent_finished(&n2, output);
-        h.registry.unregister_connection(&n2);
+        let pending = {
+            let mut h = hub.lock().await;
+            let pending = h.registry.emit_agent_finished(&n2, output);
+            h.registry.unregister_connection(&n2);
+            pending
+        };
+        if let Some((tx, envelope)) = pending {
+            if tx.send(envelope).await.is_err() {
+                tracing::warn!(agent = %n2, "parent completion channel closed");
+            }
+        }
         info!(agent = %n2, "agent IO loop ended");
     });
 }
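
The two hunks above share one pattern: compute the pending delivery while holding the Hub lock, let the guard drop at the end of a block, and only then send. A minimal synchronous sketch of that pattern follows, using `std::sync::Mutex` and `std::sync::mpsc` in place of the Tokio types in the diff. `Registry`, `finish_agent`, and the `String` payload are hypothetical stand-ins, not the crate's API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the Hub registry.
struct Registry {
    parent_tx: Option<Sender<String>>,
    finished: HashMap<String, String>,
}

impl Registry {
    /// Records the output and hands back what the caller must send
    /// once the lock has been released.
    fn emit_agent_finished(
        &mut self,
        name: &str,
        output: Option<String>,
    ) -> Option<(Sender<String>, String)> {
        let text = output.unwrap_or_else(|| "(no output)".to_string());
        self.finished.insert(name.to_string(), text.clone());
        let tx = self.parent_tx.clone()?;
        Some((tx, text))
    }
}

/// Returns true if the result was delivered to a parent.
fn finish_agent(hub: &Arc<Mutex<Registry>>, name: &str, output: Option<String>) -> bool {
    let pending = {
        let mut reg = hub.lock().expect("registry mutex poisoned");
        reg.emit_agent_finished(name, output)
    }; // Guard dropped here: the registry is unlocked before any send.
    match pending {
        Some((tx, msg)) => tx.send(msg).is_ok(),
        None => false,
    }
}
```

Scoping the guard in a block keeps the critical section short and makes it visible in the code that no channel operation runs while the lock is held.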

crates/loopal-agent-hub/src/agent_registry/completion.rs

Lines changed: 77 additions & 5 deletions
@@ -1,21 +1,31 @@
-//! Agent completion tracking and cascade interrupt.
+//! Agent completion tracking, result delivery, and cascade interrupt.
 
 use loopal_ipc::protocol::methods;
-use loopal_protocol::{AgentEvent, AgentEventPayload};
-use tokio::sync::watch;
+use loopal_protocol::{AgentEvent, AgentEventPayload, Envelope, MessageSource};
+use tokio::sync::{mpsc, watch};
 
 use super::AgentRegistry;
 use crate::topology::AgentLifecycle;
 
 impl AgentRegistry {
-    /// Emit Finished event, cache output, notify watchers, cascade orphans.
-    pub fn emit_agent_finished(&mut self, name: &str, output: Option<String>) {
+    /// Emit Finished event, cache output, deliver result to parent, notify watchers.
+    ///
+    /// Returns an optional `(sender, envelope)` pair for the caller to deliver
+    /// **after releasing the Hub lock**. This avoids holding the lock during IPC.
+    pub fn emit_agent_finished(
+        &mut self,
+        name: &str,
+        output: Option<String>,
+    ) -> Option<(mpsc::Sender<Envelope>, Envelope)> {
         tracing::info!(agent = %name, has_output = output.is_some(), "emitting Finished");
         self.set_lifecycle(name, AgentLifecycle::Finished);
 
         let text = output.unwrap_or_else(|| "(no output)".into());
         self.finished_outputs.insert(name.to_string(), text.clone());
 
+        // Prepare delivery envelope (actual send happens after lock release).
+        let pending_delivery = self.prepare_parent_delivery(name, &text);
+
         let event = AgentEvent::named(name, AgentEventPayload::Finished);
         let _ = self.event_tx.try_send(event);

@@ -28,6 +38,37 @@ impl AgentRegistry {
             tracing::info!(agent = %name, orphans = ?orphans, "cascade interrupt");
             self.interrupt_orphans(&orphans);
         }
+
+        pending_delivery
+    }
+
+    /// Build the delivery envelope and find the parent's completion_tx.
+    /// Returns None if no parent or no completion channel.
+    fn prepare_parent_delivery(
+        &self,
+        child_name: &str,
+        result: &str,
+    ) -> Option<(mpsc::Sender<Envelope>, Envelope)> {
+        let parent_name = self.agents.get(child_name)?.info.parent.as_deref()?;
+        let tx = self
+            .agents
+            .get(parent_name)?
+            .completion_tx
+            .as_ref()?
+            .clone();
+        // Cap large results: save to overflow file, embed path in envelope.
+        let body = if result.len() > MAX_RESULT_BYTES {
+            overflow_agent_result(child_name, result)
+        } else {
+            result.to_string()
+        };
+        let content = format!("<agent-result name=\"{child_name}\">\n{body}\n</agent-result>");
+        let envelope = Envelope::new(
+            MessageSource::System("agent-completed".into()),
+            parent_name,
+            content,
+        );
+        Some((tx, envelope))
     }
 
     /// Create a completion watcher for a named agent.
@@ -87,3 +128,34 @@ impl AgentRegistry {
         }
     }
 }
+
+/// Max agent result bytes before overflow to file (100 KB).
+const MAX_RESULT_BYTES: usize = 100_000;
+
+/// Save oversized agent result to file, return preview + path.
+fn overflow_agent_result(agent_name: &str, result: &str) -> String {
+    let dir = std::env::temp_dir().join("loopal").join("overflow");
+    let _ = std::fs::create_dir_all(&dir);
+    let ts = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_millis();
+    let path = dir.join(format!("agent_{agent_name}_{ts}.txt"));
+    let path_str = path.to_string_lossy().into_owned();
+    if std::fs::write(&path, result).is_err() {
+        return result[..MAX_RESULT_BYTES].to_string();
+    }
+    // Preview: first ~25 KB
+    let preview_end = result
+        .char_indices()
+        .take_while(|(i, _)| *i < MAX_RESULT_BYTES / 4)
+        .last()
+        .map(|(i, c)| i + c.len_utf8())
+        .unwrap_or(0);
+    let kb = result.len() / 1024;
+    format!(
+        "{}\n\n[Agent result too large for context ({kb} KB). Full output saved to: {path_str}]\n\
+         Use the Read tool to access the complete output if needed.",
+        &result[..preview_end]
+    )
+}
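
One detail in `overflow_agent_result` is worth a closer look. The preview path walks `char_indices()` and therefore always cuts on a character boundary, but the write-failure fallback slices `result[..MAX_RESULT_BYTES]` directly. Rust panics when a `str` is sliced at a byte index that falls inside a multi-byte character, so that fallback appears able to panic on non-ASCII output. The helper below is a hypothetical alternative, not part of the commit, that backs up to the nearest boundary using the standard `str::is_char_boundary`.

```rust
/// Longest prefix of `s` that is at most `max_bytes` long and ends on a
/// UTF-8 character boundary. Hypothetical helper, not part of the commit.
fn truncate_at_char_boundary(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    // Index 0 is always a boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
```

Under that assumption, the fallback could return `truncate_at_char_boundary(result, MAX_RESULT_BYTES).to_string()`, giving up at most three bytes of output in exchange for never panicking.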

crates/loopal-agent-hub/src/agent_registry/mod.rs

Lines changed: 4 additions & 1 deletion
@@ -46,12 +46,13 @@ impl AgentRegistry {
             ManagedAgent {
                 state: AgentConnectionState::Local(channels),
                 info: AgentInfo::new(name, None, None),
+                completion_tx: None,
             },
         );
     }
 
     pub fn register_connection(&mut self, name: &str, conn: Arc<Connection>) -> Result<(), String> {
-        self.register_connection_with_parent(name, conn, None, None)
+        self.register_connection_with_parent(name, conn, None, None, None)
     }
 
     pub fn register_connection_with_parent(
@@ -60,6 +61,7 @@ impl AgentRegistry {
         conn: Arc<Connection>,
         parent: Option<&str>,
         model: Option<&str>,
+        completion_tx: Option<mpsc::Sender<Envelope>>,
     ) -> Result<(), String> {
         if self.agents.contains_key(name) {
             return Err(format!("agent '{name}' already registered"));
@@ -74,6 +76,7 @@ impl AgentRegistry {
             ManagedAgent {
                 state: AgentConnectionState::Connected(conn),
                 info: AgentInfo::new(name, parent, model),
+                completion_tx,
             },
         );
         Ok(())
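
To see how the new `completion_tx` field connects registration to result delivery, here is a reduced model of the registry. It is a sketch under assumptions: `Registry`, `register`, and the `String` payload are hypothetical simplifications, `std::sync::mpsc` replaces the Tokio channel, and the overflow cap is omitted. The lookup chain and the `<agent-result>` wrapper follow `prepare_parent_delivery` in the completion.rs diff.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

/// Reduced, hypothetical version of ManagedAgent.
struct ManagedAgent {
    parent: Option<String>,
    completion_tx: Option<Sender<String>>,
}

#[derive(Default)]
struct Registry {
    agents: HashMap<String, ManagedAgent>,
}

impl Registry {
    fn register(
        &mut self,
        name: &str,
        parent: Option<&str>,
        completion_tx: Option<Sender<String>>,
    ) -> Result<(), String> {
        if self.agents.contains_key(name) {
            return Err(format!("agent '{name}' already registered"));
        }
        let agent = ManagedAgent { parent: parent.map(str::to_string), completion_tx };
        self.agents.insert(name.to_string(), agent);
        Ok(())
    }

    /// Each `?` is one way delivery can be skipped: unknown child, no parent,
    /// unknown parent, or a parent registered without a completion channel.
    fn prepare_parent_delivery(&self, child: &str, result: &str) -> Option<(Sender<String>, String)> {
        let parent = self.agents.get(child)?.parent.as_deref()?;
        let tx = self.agents.get(parent)?.completion_tx.as_ref()?.clone();
        let content = format!("<agent-result name=\"{child}\">\n{result}\n</agent-result>");
        Some((tx, content))
    }
}
```

A parent registered with `Some(tx)` receives wrapped results from its children, while agents registered through the plain `register_connection` path pass `None` and are skipped.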
