Skip to content

Commit ebc445a

Browse files
refactor: deliver messages directly to agent mailbox, remove TUI inbox routing (#84)
TUI previously buffered messages in a local Inbox and used agent_idle as a routing decision: when agent was busy, messages were queued locally and an interrupt signal was used to wake the agent to drain the queue. This was TUI concept leakage into the agent runtime — agent internal state was being directly mutated by the display layer, and interrupt was overloaded with "cancel work" AND "new message arrived" semantics. The worst symptom: ephemeral agents exited immediately on user input because the yield_now idle check fired before the TUI's delayed inbox forward could complete the IPC round-trip. Architecture principles applied: - Agent owns its internal state (idle/busy); external actors interact only via mailbox delivery + explicit interrupt - TUI state (observable.status, is_idle) is derived solely from agent events, never set directly - InterruptSignal carries only cancel/shutdown semantics Key changes: - Remove agent_idle field; derive via AgentViewState::is_idle() from status - Remove Inbox struct entirely (dead after routing bypass) - TUI calls append_user_display() + route_message() directly — no buffering - Ephemeral idle check: drop yield_now(), drain_pending() is reliable now - drain_pending() returns Vec<AgentInput> so Control commands are not silently dropped - Merge InputFromClient into AgentInput (Interrupt variant was vestigial) - Status transition rolls back on emit failure for consistency - Upgrade observation-channel try_send drops from debug to warn for prod visibility 49/49 tests pass, zero clippy warnings, rustfmt clean.
1 parent dacdb0f commit ebc445a

62 files changed

Lines changed: 619 additions & 545 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

crates/loopal-agent-hub/src/agent_io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub async fn agent_io_loop(
4848
}
4949
let h = hub.lock().await;
5050
if h.registry.event_sender().try_send(event).is_err() {
51-
tracing::debug!(agent = %agent_name, "event dropped (channel full)");
51+
tracing::warn!(agent = %agent_name, "event dropped (channel full)");
5252
}
5353
}
5454
}

crates/loopal-agent-hub/src/agent_registry/completion.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ impl AgentRegistry {
2727
let pending_delivery = self.prepare_parent_delivery(name, &text);
2828

2929
let event = AgentEvent::named(name, AgentEventPayload::Finished);
30-
let _ = self.event_tx.try_send(event);
30+
if self.event_tx.try_send(event).is_err() {
31+
tracing::warn!(agent = %name, "Finished event dropped (channel full)");
32+
}
3133

3234
if let Some(tx) = self.completions.remove(name) {
3335
let _ = tx.send(Some(text));

crates/loopal-agent-hub/src/routing.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ pub async fn route_to_agent(
2626
target: envelope.target.clone(),
2727
content_preview: envelope.content_preview().to_string(),
2828
});
29-
let _ = observation_tx.try_send(event);
29+
if observation_tx.try_send(event).is_err() {
30+
tracing::warn!(
31+
target = %envelope.target,
32+
"observation event dropped (channel full)"
33+
);
34+
}
3035
Ok(())
3136
}

crates/loopal-agent-hub/src/spawn_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ pub async fn register_agent_connection(
130130
session_id: session_id.map(String::from),
131131
});
132132
if h.registry.event_sender().try_send(event).is_err() {
133-
tracing::debug!(agent = %name, "SubAgentSpawned event dropped");
133+
tracing::warn!(agent = %name, "SubAgentSpawned event dropped (channel full)");
134134
}
135135
}
136136
agent_id

crates/loopal-agent-server/src/hub_frontend.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@ use tracing::{debug, info};
1414

1515
use loopal_error::{LoopalError, Result};
1616
use loopal_ipc::protocol::methods;
17-
use loopal_protocol::{AgentEvent, AgentEventPayload, Envelope, Question, UserQuestionResponse};
17+
use loopal_protocol::{AgentEvent, AgentEventPayload, Question, UserQuestionResponse};
1818
use loopal_runtime::agent_input::AgentInput;
1919
use loopal_runtime::frontend::traits::{AgentFrontend, EventEmitter};
2020
use loopal_tool_api::PermissionDecision;
2121

2222
use crate::hub_emitter::HubEventEmitter;
23-
use crate::session_hub::{InputFromClient, SharedSession};
23+
use crate::session_hub::SharedSession;
2424

2525
/// Frontend that multiplexes across all clients in a shared session.
2626
pub struct HubFrontend {
2727
session: tokio::sync::RwLock<Arc<SharedSession>>,
28-
input_rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<InputFromClient>>,
28+
input_rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<AgentInput>>,
2929
agent_name: Option<String>,
3030
/// Watch channel for interrupt detection in recv_input.
3131
interrupt_rx: tokio::sync::Mutex<tokio::sync::watch::Receiver<u64>>,
@@ -34,7 +34,7 @@ pub struct HubFrontend {
3434
impl HubFrontend {
3535
pub fn new(
3636
session: Arc<SharedSession>,
37-
input_rx: tokio::sync::mpsc::Receiver<InputFromClient>,
37+
input_rx: tokio::sync::mpsc::Receiver<AgentInput>,
3838
agent_name: Option<String>,
3939
interrupt_rx: tokio::sync::watch::Receiver<u64>,
4040
) -> Self {
@@ -95,18 +95,12 @@ impl AgentFrontend for HubFrontend {
9595
// has already been handled by TurnCancel. Without this, changed()
9696
// fires immediately on the old value and exits the agent loop.
9797
interrupt_rx.borrow_and_update();
98-
loop {
99-
tokio::select! {
100-
msg = rx.recv() => {
101-
match msg? {
102-
InputFromClient::Message(env) => return Some(AgentInput::Message(env)),
103-
InputFromClient::Control(cmd) => return Some(AgentInput::Control(cmd)),
104-
InputFromClient::Interrupt => continue,
105-
}
106-
}
107-
_ = interrupt_rx.changed() => {
108-
return None; // Interrupted — exit agent loop
109-
}
98+
tokio::select! {
99+
msg = rx.recv() => {
100+
return msg;
101+
}
102+
_ = interrupt_rx.changed() => {
103+
return None; // Interrupted — exit agent loop
110104
}
111105
}
112106
}
@@ -199,14 +193,12 @@ impl AgentFrontend for HubFrontend {
199193
true
200194
}
201195

202-
async fn drain_pending(&self) -> Vec<Envelope> {
196+
async fn drain_pending(&self) -> Vec<AgentInput> {
203197
let mut rx = self.input_rx.lock().await;
204-
let mut envelopes = Vec::new();
198+
let mut inputs = Vec::new();
205199
while let Ok(msg) = rx.try_recv() {
206-
if let InputFromClient::Message(env) = msg {
207-
envelopes.push(env);
208-
}
200+
inputs.push(msg);
209201
}
210-
envelopes
202+
inputs
211203
}
212204
}

crates/loopal-agent-server/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub fn ipc_frontend_for_test(
4848
#[doc(hidden)]
4949
pub fn hub_frontend_for_test(
5050
session: std::sync::Arc<session_hub::SharedSession>,
51-
input_rx: tokio::sync::mpsc::Receiver<session_hub::InputFromClient>,
51+
input_rx: tokio::sync::mpsc::Receiver<loopal_runtime::agent_input::AgentInput>,
5252
interrupt_rx: tokio::sync::watch::Receiver<u64>,
5353
) -> std::sync::Arc<dyn loopal_runtime::frontend::traits::AgentFrontend> {
5454
std::sync::Arc::new(hub_frontend::HubFrontend::new(
@@ -64,5 +64,6 @@ pub fn hub_frontend_for_test(
6464
pub mod testing {
6565
pub use crate::agent_setup::build_with_frontend;
6666
pub use crate::params::{StartParams, build_kernel_with_provider};
67-
pub use crate::session_hub::{InputFromClient, SharedSession};
67+
pub use crate::session_hub::SharedSession;
68+
pub use loopal_runtime::agent_input::AgentInput;
6869
}

crates/loopal-agent-server/src/session_forward.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use loopal_ipc::connection::{Connection, Incoming};
1010
use loopal_ipc::jsonrpc;
1111
use loopal_ipc::protocol::methods;
1212
use loopal_protocol::{ControlCommand, Envelope};
13+
use loopal_runtime::agent_input::AgentInput;
1314

14-
use crate::session_hub::InputFromClient;
1515
use crate::session_start::SessionHandle;
1616

1717
/// Result of forward_loop — tells dispatch_loop what happened.
@@ -74,7 +74,7 @@ pub(crate) async fn forward_loop(
7474
} else if method == methods::AGENT_MESSAGE.name {
7575
// Hub-injected message (e.g. sub-agent completion notification).
7676
if let Ok(env) = serde_json::from_value::<Envelope>(params) {
77-
let _ = session.input_tx.send(InputFromClient::Message(env)).await;
77+
let _ = session.input_tx.send(AgentInput::Message(env)).await;
7878
}
7979
}
8080
}
@@ -98,7 +98,7 @@ async fn route_request(
9898
match method {
9999
m if m == methods::AGENT_MESSAGE.name => match serde_json::from_value::<Envelope>(params) {
100100
Ok(env) => {
101-
let _ = session.input_tx.send(InputFromClient::Message(env)).await;
101+
let _ = session.input_tx.send(AgentInput::Message(env)).await;
102102
let _ = connection
103103
.respond(id, serde_json::json!({"ok": true}))
104104
.await;
@@ -112,7 +112,7 @@ async fn route_request(
112112
m if m == methods::AGENT_CONTROL.name => {
113113
match serde_json::from_value::<ControlCommand>(params) {
114114
Ok(cmd) => {
115-
let _ = session.input_tx.send(InputFromClient::Control(cmd)).await;
115+
let _ = session.input_tx.send(AgentInput::Control(cmd)).await;
116116
let _ = connection
117117
.respond(id, serde_json::json!({"ok": true}))
118118
.await;

crates/loopal-agent-server/src/session_hub.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tokio::sync::Mutex;
88

99
use loopal_ipc::connection::Connection;
1010
use loopal_protocol::InterruptSignal;
11+
use loopal_runtime::agent_input::AgentInput;
1112

1213
/// A connected client handle within a shared session.
1314
pub struct ClientHandle {
@@ -22,19 +23,12 @@ pub struct SharedSession {
2223
pub session_id: String,
2324
pub clients: Mutex<Vec<ClientHandle>>,
2425
/// Channel to send input into the agent loop.
25-
pub input_tx: tokio::sync::mpsc::Sender<InputFromClient>,
26+
pub input_tx: tokio::sync::mpsc::Sender<AgentInput>,
2627
/// Interrupt signal shared with the agent loop.
2728
pub interrupt: InterruptSignal,
2829
pub interrupt_tx: Arc<tokio::sync::watch::Sender<u64>>,
2930
}
3031

31-
/// Input forwarded from a client connection to the agent loop.
32-
pub enum InputFromClient {
33-
Message(loopal_protocol::Envelope),
34-
Control(loopal_protocol::ControlCommand),
35-
Interrupt,
36-
}
37-
3832
/// Server-wide session registry.
3933
#[derive(Default)]
4034
pub struct SessionHub {
@@ -108,7 +102,7 @@ impl SessionHub {
108102
impl SharedSession {
109103
/// Create a placeholder session (for bootstrapping before session_id is known).
110104
pub fn placeholder(
111-
input_tx: tokio::sync::mpsc::Sender<InputFromClient>,
105+
input_tx: tokio::sync::mpsc::Sender<AgentInput>,
112106
interrupt: InterruptSignal,
113107
interrupt_tx: Arc<tokio::sync::watch::Sender<u64>>,
114108
) -> Self {

crates/loopal-agent-server/src/session_start.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ use loopal_config::load_config;
1010
use loopal_error::AgentOutput;
1111
use loopal_ipc::connection::Connection;
1212
use loopal_protocol::InterruptSignal;
13+
use loopal_runtime::agent_input::AgentInput;
1314
use loopal_runtime::agent_loop;
1415

1516
use crate::agent_setup;
1617
use crate::hub_frontend::HubFrontend;
1718
use crate::params::StartParams;
18-
use crate::session_hub::{InputFromClient, SessionHub, SharedSession};
19+
use crate::session_hub::{SessionHub, SharedSession};
1920

2021
/// Handle returned to the dispatch loop after starting a session.
2122
pub(crate) struct SessionHandle {
@@ -77,7 +78,7 @@ pub(crate) async fn start_session(
7778
};
7879

7980
// Create session infrastructure
80-
let (input_tx, input_rx) = tokio::sync::mpsc::channel::<InputFromClient>(16);
81+
let (input_tx, input_rx) = tokio::sync::mpsc::channel::<AgentInput>(16);
8182
let interrupt = InterruptSignal::new();
8283
let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64);
8384
let interrupt_tx = Arc::new(watch_tx);

crates/loopal-agent-server/tests/suite.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ mod bridge_edge_test;
66
mod bridge_helpers;
77
#[path = "suite/dispatch_loop_test.rs"]
88
mod dispatch_loop_test;
9+
#[path = "suite/hub_drain_test.rs"]
10+
mod hub_drain_test;
911
#[path = "suite/hub_frontend_edge_test.rs"]
1012
mod hub_frontend_edge_test;
1113
#[path = "suite/hub_frontend_test.rs"]

0 commit comments

Comments
 (0)