Skip to content

Commit 5feedca

Browse files
feat: add /topology command with Canvas-based agent graph overlay (#37)
* feat: add /topology command with graphical Canvas-based agent overlay Surface agent parent/child relationships in TUI via a top-right overlay that renders a node-link graph using ratatui Canvas. Active agents show animated spinner dots; finished agents are removed from the graph. - Extend SubAgentSpawned event with parent + model fields (serde-compat) - Track topology in SessionState (AgentViewState.parent/children) - BFS tree layout positions nodes by level with adaptive overlay sizing - /topology slash command toggles overlay on/off * fix: rustfmt formatting + keep all files within 200-line limit * fix: use EXE_SUFFIX for Windows binary path in e2e bootstrap test
1 parent db57d9c commit 5feedca

54 files changed

Lines changed: 1162 additions & 313 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

crates/loopal-agent-hub/src/agent_io.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,10 @@ pub async fn agent_io_loop(
7171
spawn_wait_agent(hub.clone(), conn.clone(), id, params, agent_name.clone());
7272
} else if method.starts_with("hub/") {
7373
info!(agent = %agent_name, %method, "hub request received");
74-
match dispatch_hub_request(&hub, &method, params, agent_name.clone()).await
75-
{
76-
Ok(result) => { let _ = conn.respond(id, result).await; }
74+
match dispatch_hub_request(&hub, &method, params, agent_name.clone()).await {
75+
Ok(result) => {
76+
let _ = conn.respond(id, result).await;
77+
}
7778
Err(e) => {
7879
warn!(agent = %agent_name, %method, error = %e, "hub request failed");
7980
let _ = conn
@@ -102,7 +103,11 @@ pub async fn agent_io_loop(
102103
}
103104
}
104105
// Prefer AttemptCompletion output over accumulated stream text
105-
completion_output.or(if last_stream.is_empty() { None } else { Some(last_stream) })
106+
completion_output.or(if last_stream.is_empty() {
107+
None
108+
} else {
109+
Some(last_stream)
110+
})
106111
}
107112

108113
/// Spawn hub/wait_agent in a background task so it doesn't block the IO loop.
@@ -115,11 +120,16 @@ fn spawn_wait_agent(
115120
) {
116121
tokio::spawn(async move {
117122
match crate::dispatch::dispatch_hub_request(
118-
&hub, WAIT_AGENT_METHOD, params, agent_name.clone(),
123+
&hub,
124+
WAIT_AGENT_METHOD,
125+
params,
126+
agent_name.clone(),
119127
)
120128
.await
121129
{
122-
Ok(result) => { let _ = conn.respond(request_id, result).await; }
130+
Ok(result) => {
131+
let _ = conn.respond(request_id, result).await;
132+
}
123133
Err(e) => {
124134
warn!(agent = %agent_name, "background wait_agent failed: {e}");
125135
let _ = conn

crates/loopal-agent-hub/src/connection_ops.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,7 @@ impl AgentHub {
6666
}
6767

6868
/// Create a completion watcher for a named agent.
69-
pub fn watch_completion(
70-
&mut self,
71-
name: &str,
72-
) -> tokio::sync::watch::Receiver<Option<String>> {
69+
pub fn watch_completion(&mut self, name: &str) -> tokio::sync::watch::Receiver<Option<String>> {
7370
let (tx, rx) = tokio::sync::watch::channel(None);
7471
self.completions.insert(name.to_string(), tx);
7572
rx
@@ -80,9 +77,12 @@ impl AgentHub {
8077
self.agents
8178
.get(parent)
8279
.map(|a| {
83-
a.info.children.iter()
80+
a.info
81+
.children
82+
.iter()
8483
.filter(|c| {
85-
self.agents.get(c.as_str())
84+
self.agents
85+
.get(c.as_str())
8686
.is_some_and(|a| a.info.lifecycle == AgentLifecycle::Running)
8787
})
8888
.cloned()
@@ -99,9 +99,9 @@ impl AgentHub {
9999
let conn = conn.clone();
100100
let n = name.clone();
101101
tokio::spawn(async move {
102-
let _ = conn.send_notification(
103-
methods::AGENT_INTERRUPT.name, serde_json::json!({}),
104-
).await;
102+
let _ = conn
103+
.send_notification(methods::AGENT_INTERRUPT.name, serde_json::json!({}))
104+
.await;
105105
tracing::info!(agent = %n, "sent interrupt to orphan");
106106
});
107107
}

crates/loopal-agent-hub/src/dispatch/dispatch_handlers.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ pub async fn handle_spawn_agent(
8080
params: Value,
8181
from_agent: &str,
8282
) -> Result<Value, String> {
83-
let name = params["name"].as_str().ok_or("missing 'name' field")?.to_string();
83+
let name = params["name"]
84+
.as_str()
85+
.ok_or("missing 'name' field")?
86+
.to_string();
8487
let cwd = params["cwd"].as_str().unwrap_or(".").to_string();
8588
let model = params["model"].as_str().map(String::from);
8689
let prompt = params["prompt"].as_str().map(String::from);
@@ -90,9 +93,8 @@ pub async fn handle_spawn_agent(
9093
let hub_clone = hub.clone();
9194
let name_clone = name.clone();
9295
let handle = tokio::spawn(async move {
93-
crate::spawn_manager::spawn_and_register(
94-
hub_clone, name_clone, cwd, model, prompt, parent,
95-
).await
96+
crate::spawn_manager::spawn_and_register(hub_clone, name_clone, cwd, model, prompt, parent)
97+
.await
9698
});
9799

98100
let agent_id = handle

crates/loopal-agent-hub/src/dispatch/topology_handlers.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@ use tokio::sync::Mutex;
77

88
use crate::hub::AgentHub;
99

10-
pub async fn handle_agent_info(
11-
hub: &Arc<Mutex<AgentHub>>,
12-
params: Value,
13-
) -> Result<Value, String> {
10+
pub async fn handle_agent_info(hub: &Arc<Mutex<AgentHub>>, params: Value) -> Result<Value, String> {
1411
let name = params["name"].as_str().ok_or("missing 'name'")?;
1512
let h = hub.lock().await;
1613

crates/loopal-agent-hub/src/dispatch/wait_handler.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ use tracing::info;
1010
use crate::hub::AgentHub;
1111

1212
/// Wait for an agent to finish. Timeout: 10 minutes.
13-
pub async fn handle_wait_agent(
14-
hub: &Arc<Mutex<AgentHub>>,
15-
params: Value,
16-
) -> Result<Value, String> {
17-
let name = params["name"].as_str().ok_or("missing 'name' field")?.to_string();
13+
pub async fn handle_wait_agent(hub: &Arc<Mutex<AgentHub>>, params: Value) -> Result<Value, String> {
14+
let name = params["name"]
15+
.as_str()
16+
.ok_or("missing 'name' field")?
17+
.to_string();
1818
info!(agent = %name, "handle_wait_agent start");
1919

2020
let mut rx = {

crates/loopal-agent-hub/src/hub.rs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,7 @@ impl AgentHub {
6565

6666
/// Register a connected agent.
6767
/// `parent` is the spawning agent's name (None for root/TUI).
68-
pub fn register_connection(
69-
&mut self,
70-
name: &str,
71-
conn: Arc<Connection>,
72-
) -> Result<(), String> {
68+
pub fn register_connection(&mut self, name: &str, conn: Arc<Connection>) -> Result<(), String> {
7369
self.register_connection_with_parent(name, conn, None, None)
7470
}
7571

@@ -104,8 +100,7 @@ impl AgentHub {
104100
/// remove cached output and pending watchers.
105101
pub fn unregister_connection(&mut self, name: &str) {
106102
// Extract parent name before mutating
107-
let parent_name = self.agents.get(name)
108-
.and_then(|a| a.info.parent.clone());
103+
let parent_name = self.agents.get(name).and_then(|a| a.info.parent.clone());
109104
if let Some(ref p) = parent_name {
110105
if let Some(parent) = self.agents.get_mut(p.as_str()) {
111106
parent.info.children.retain(|c| c != name);
@@ -127,18 +122,14 @@ impl AgentHub {
127122

128123
/// Get a named agent's IPC Connection (if connected).
129124
pub fn get_agent_connection(&self, name: &str) -> Option<Arc<Connection>> {
130-
self.agents
131-
.get(name)
132-
.and_then(|a| a.state.connection())
125+
self.agents.get(name).and_then(|a| a.state.connection())
133126
}
134127

135128
/// Collect all connected agents with their Connections.
136129
pub fn all_agent_connections(&self) -> Vec<(String, Arc<Connection>)> {
137130
self.agents
138131
.iter()
139-
.filter_map(|(name, agent)| {
140-
agent.state.connection().map(|c| (name.clone(), c))
141-
})
132+
.filter_map(|(name, agent)| agent.state.connection().map(|c| (name.clone(), c)))
142133
.collect()
143134
}
144135

@@ -166,15 +157,19 @@ impl AgentHub {
166157

167158
/// Build serializable topology snapshot.
168159
pub fn topology_snapshot(&self) -> serde_json::Value {
169-
let agents: Vec<serde_json::Value> = self.agents.iter()
160+
let agents: Vec<serde_json::Value> = self
161+
.agents
162+
.iter()
170163
.filter(|(n, _)| !n.starts_with('_')) // skip internal (_tui)
171-
.map(|(name, a)| serde_json::json!({
172-
"name": name,
173-
"parent": a.info.parent,
174-
"children": a.info.children,
175-
"lifecycle": format!("{:?}", a.info.lifecycle),
176-
"model": a.info.model,
177-
}))
164+
.map(|(name, a)| {
165+
serde_json::json!({
166+
"name": name,
167+
"parent": a.info.parent,
168+
"children": a.info.children,
169+
"lifecycle": format!("{:?}", a.info.lifecycle),
170+
"model": a.info.model,
171+
})
172+
})
178173
.collect();
179174
serde_json::json!({ "agents": agents })
180175
}

crates/loopal-agent-hub/src/hub_server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ pub async fn accept_loop(listener: TcpListener, hub: Arc<Mutex<AgentHub>>, token
6565
let (tx, owned_rx) = tokio::sync::mpsc::channel(256);
6666
tokio::spawn(async move {
6767
while let Some(msg) = rx.recv().await {
68-
if tx.send(msg).await.is_err() { break; }
68+
if tx.send(msg).await.is_err() {
69+
break;
70+
}
6971
}
7072
});
7173
crate::agent_io::start_agent_io(hub, &name, conn, owned_rx, false);
@@ -102,9 +104,7 @@ async fn wait_for_register(
102104
.as_str()
103105
.ok_or_else(|| anyhow::anyhow!("hub/register: missing 'name'"))?
104106
.to_string();
105-
let _ = conn
106-
.respond(id, serde_json::json!({"ok": true}))
107-
.await;
107+
let _ = conn.respond(id, serde_json::json!({"ok": true})).await;
108108
return Ok(name);
109109
}
110110
let _ = conn

crates/loopal-agent-hub/src/routing.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ pub async fn route_to_agent(
1414
envelope: &Envelope,
1515
observation_tx: &mpsc::Sender<AgentEvent>,
1616
) -> Result<(), String> {
17-
let params = serde_json::to_value(envelope)
18-
.map_err(|e| format!("failed to serialize envelope: {e}"))?;
17+
let params =
18+
serde_json::to_value(envelope).map_err(|e| format!("failed to serialize envelope: {e}"))?;
1919

2020
conn.send_request(methods::AGENT_MESSAGE.name, params)
2121
.await

crates/loopal-agent-hub/src/spawn_manager.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,33 @@ pub async fn spawn_and_register(
3333
return Err(format!("agent initialize failed: {e}"));
3434
}
3535
info!(agent = %name, "spawn: starting agent");
36-
if let Err(e) = client.start_agent(
37-
std::path::Path::new(&cwd), model.as_deref(), Some("act"),
38-
prompt.as_deref(), None, false, None,
39-
).await {
36+
if let Err(e) = client
37+
.start_agent(
38+
std::path::Path::new(&cwd),
39+
model.as_deref(),
40+
Some("act"),
41+
prompt.as_deref(),
42+
None,
43+
false,
44+
None,
45+
)
46+
.await
47+
{
4048
warn!(agent = %name, error = %e, "spawn: start failed, killing orphan");
4149
let _ = agent_proc.shutdown().await;
4250
return Err(format!("agent/start failed: {e}"));
4351
}
4452

4553
let (conn, incoming_rx) = client.into_parts();
4654
let agent_id = register_agent_connection(
47-
hub, &name, conn, incoming_rx, parent.as_deref(), model.as_deref(),
48-
).await;
55+
hub,
56+
&name,
57+
conn,
58+
incoming_rx,
59+
parent.as_deref(),
60+
model.as_deref(),
61+
)
62+
.await;
4963

5064
// Supervised process cleanup: when agent exits, task completes.
5165
// AgentProcess::drop will kill the child (kill_on_drop) if not exited.
@@ -77,9 +91,7 @@ pub async fn register_agent_connection(
7791
warn!(agent = %name, parent = %p, "parent not found, registering as orphan");
7892
}
7993
}
80-
if let Err(e) = h.register_connection_with_parent(
81-
name, conn.clone(), parent, model,
82-
) {
94+
if let Err(e) = h.register_connection_with_parent(name, conn.clone(), parent, model) {
8395
warn!(agent = %name, error = %e, "registration failed");
8496
return agent_id;
8597
}
@@ -94,6 +106,8 @@ pub async fn register_agent_connection(
94106
let event = AgentEvent::root(AgentEventPayload::SubAgentSpawned {
95107
name: name.to_string(),
96108
agent_id: agent_id.clone(),
109+
parent: parent.map(String::from),
110+
model: model.map(String::from),
97111
});
98112
if h.event_sender().try_send(event).is_err() {
99113
tracing::debug!(agent = %name, "SubAgentSpawned event dropped");
Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
// Single test binary — includes all test modules
2-
#[path = "suite/event_router_test.rs"]
3-
mod event_router_test;
2+
#[path = "suite/advanced_scenarios_test.rs"]
3+
mod advanced_scenarios_test;
4+
#[path = "suite/collaboration_test.rs"]
5+
mod collaboration_test;
6+
#[path = "suite/completion_output_test.rs"]
7+
mod completion_output_test;
48
#[path = "suite/dispatch_test.rs"]
59
mod dispatch_test;
6-
#[path = "suite/relay_test.rs"]
7-
mod relay_test;
10+
#[path = "suite/e2e_bootstrap_test.rs"]
11+
mod e2e_bootstrap_test;
12+
#[path = "suite/event_router_test.rs"]
13+
mod event_router_test;
814
#[path = "suite/hub_integration_test.rs"]
915
mod hub_integration_test;
1016
#[path = "suite/hub_lifecycle_test.rs"]
1117
mod hub_lifecycle_test;
12-
#[path = "suite/spawn_lifecycle_test.rs"]
13-
mod spawn_lifecycle_test;
14-
#[path = "suite/e2e_bootstrap_test.rs"]
15-
mod e2e_bootstrap_test;
1618
#[path = "suite/multi_agent_test.rs"]
1719
mod multi_agent_test;
18-
#[path = "suite/advanced_scenarios_test.rs"]
19-
mod advanced_scenarios_test;
20-
#[path = "suite/wait_nonblocking_test.rs"]
21-
mod wait_nonblocking_test;
22-
#[path = "suite/completion_output_test.rs"]
23-
mod completion_output_test;
2420
#[path = "suite/race_condition_test.rs"]
2521
mod race_condition_test;
26-
#[path = "suite/collaboration_test.rs"]
27-
mod collaboration_test;
22+
#[path = "suite/relay_test.rs"]
23+
mod relay_test;
24+
#[path = "suite/spawn_lifecycle_test.rs"]
25+
mod spawn_lifecycle_test;
26+
#[path = "suite/wait_nonblocking_test.rs"]
27+
mod wait_nonblocking_test;

0 commit comments

Comments
 (0)