Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/loopal-agent-hub/src/agent_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub async fn agent_io_loop(
}
let h = hub.lock().await;
if h.registry.event_sender().try_send(event).is_err() {
tracing::debug!(agent = %agent_name, "event dropped (channel full)");
tracing::warn!(agent = %agent_name, "event dropped (channel full)");
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion crates/loopal-agent-hub/src/agent_registry/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ impl AgentRegistry {
let pending_delivery = self.prepare_parent_delivery(name, &text);

let event = AgentEvent::named(name, AgentEventPayload::Finished);
let _ = self.event_tx.try_send(event);
if self.event_tx.try_send(event).is_err() {
tracing::warn!(agent = %name, "Finished event dropped (channel full)");
}

if let Some(tx) = self.completions.remove(name) {
let _ = tx.send(Some(text));
Expand Down
7 changes: 6 additions & 1 deletion crates/loopal-agent-hub/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ pub async fn route_to_agent(
target: envelope.target.clone(),
content_preview: envelope.content_preview().to_string(),
});
let _ = observation_tx.try_send(event);
if observation_tx.try_send(event).is_err() {
tracing::warn!(
target = %envelope.target,
"observation event dropped (channel full)"
);
}
Ok(())
}
2 changes: 1 addition & 1 deletion crates/loopal-agent-hub/src/spawn_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub async fn register_agent_connection(
session_id: session_id.map(String::from),
});
if h.registry.event_sender().try_send(event).is_err() {
tracing::debug!(agent = %name, "SubAgentSpawned event dropped");
tracing::warn!(agent = %name, "SubAgentSpawned event dropped (channel full)");
}
}
agent_id
Expand Down
36 changes: 14 additions & 22 deletions crates/loopal-agent-server/src/hub_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ use tracing::{debug, info};

use loopal_error::{LoopalError, Result};
use loopal_ipc::protocol::methods;
use loopal_protocol::{AgentEvent, AgentEventPayload, Envelope, Question, UserQuestionResponse};
use loopal_protocol::{AgentEvent, AgentEventPayload, Question, UserQuestionResponse};
use loopal_runtime::agent_input::AgentInput;
use loopal_runtime::frontend::traits::{AgentFrontend, EventEmitter};
use loopal_tool_api::PermissionDecision;

use crate::hub_emitter::HubEventEmitter;
use crate::session_hub::{InputFromClient, SharedSession};
use crate::session_hub::SharedSession;

/// Frontend that multiplexes across all clients in a shared session.
pub struct HubFrontend {
session: tokio::sync::RwLock<Arc<SharedSession>>,
input_rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<InputFromClient>>,
input_rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<AgentInput>>,
agent_name: Option<String>,
/// Watch channel for interrupt detection in recv_input.
interrupt_rx: tokio::sync::Mutex<tokio::sync::watch::Receiver<u64>>,
Expand All @@ -34,7 +34,7 @@ pub struct HubFrontend {
impl HubFrontend {
pub fn new(
session: Arc<SharedSession>,
input_rx: tokio::sync::mpsc::Receiver<InputFromClient>,
input_rx: tokio::sync::mpsc::Receiver<AgentInput>,
agent_name: Option<String>,
interrupt_rx: tokio::sync::watch::Receiver<u64>,
) -> Self {
Expand Down Expand Up @@ -95,18 +95,12 @@ impl AgentFrontend for HubFrontend {
// has already been handled by TurnCancel. Without this, changed()
// fires immediately on the old value and exits the agent loop.
interrupt_rx.borrow_and_update();
loop {
tokio::select! {
msg = rx.recv() => {
match msg? {
InputFromClient::Message(env) => return Some(AgentInput::Message(env)),
InputFromClient::Control(cmd) => return Some(AgentInput::Control(cmd)),
InputFromClient::Interrupt => continue,
}
}
_ = interrupt_rx.changed() => {
return None; // Interrupted — exit agent loop
}
tokio::select! {
msg = rx.recv() => {
return msg;
}
_ = interrupt_rx.changed() => {
return None; // Interrupted — exit agent loop
}
}
}
Expand Down Expand Up @@ -199,14 +193,12 @@ impl AgentFrontend for HubFrontend {
true
}

async fn drain_pending(&self) -> Vec<Envelope> {
async fn drain_pending(&self) -> Vec<AgentInput> {
let mut rx = self.input_rx.lock().await;
let mut envelopes = Vec::new();
let mut inputs = Vec::new();
while let Ok(msg) = rx.try_recv() {
if let InputFromClient::Message(env) = msg {
envelopes.push(env);
}
inputs.push(msg);
}
envelopes
inputs
}
}
5 changes: 3 additions & 2 deletions crates/loopal-agent-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn ipc_frontend_for_test(
#[doc(hidden)]
pub fn hub_frontend_for_test(
session: std::sync::Arc<session_hub::SharedSession>,
input_rx: tokio::sync::mpsc::Receiver<session_hub::InputFromClient>,
input_rx: tokio::sync::mpsc::Receiver<loopal_runtime::agent_input::AgentInput>,
interrupt_rx: tokio::sync::watch::Receiver<u64>,
) -> std::sync::Arc<dyn loopal_runtime::frontend::traits::AgentFrontend> {
std::sync::Arc::new(hub_frontend::HubFrontend::new(
Expand All @@ -64,5 +64,6 @@ pub fn hub_frontend_for_test(
pub mod testing {
pub use crate::agent_setup::build_with_frontend;
pub use crate::params::{StartParams, build_kernel_with_provider};
pub use crate::session_hub::{InputFromClient, SharedSession};
pub use crate::session_hub::SharedSession;
pub use loopal_runtime::agent_input::AgentInput;
}
8 changes: 4 additions & 4 deletions crates/loopal-agent-server/src/session_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use loopal_ipc::connection::{Connection, Incoming};
use loopal_ipc::jsonrpc;
use loopal_ipc::protocol::methods;
use loopal_protocol::{ControlCommand, Envelope};
use loopal_runtime::agent_input::AgentInput;

use crate::session_hub::InputFromClient;
use crate::session_start::SessionHandle;

/// Result of forward_loop — tells dispatch_loop what happened.
Expand Down Expand Up @@ -74,7 +74,7 @@ pub(crate) async fn forward_loop(
} else if method == methods::AGENT_MESSAGE.name {
// Hub-injected message (e.g. sub-agent completion notification).
if let Ok(env) = serde_json::from_value::<Envelope>(params) {
let _ = session.input_tx.send(InputFromClient::Message(env)).await;
let _ = session.input_tx.send(AgentInput::Message(env)).await;
}
}
}
Expand All @@ -98,7 +98,7 @@ async fn route_request(
match method {
m if m == methods::AGENT_MESSAGE.name => match serde_json::from_value::<Envelope>(params) {
Ok(env) => {
let _ = session.input_tx.send(InputFromClient::Message(env)).await;
let _ = session.input_tx.send(AgentInput::Message(env)).await;
let _ = connection
.respond(id, serde_json::json!({"ok": true}))
.await;
Expand All @@ -112,7 +112,7 @@ async fn route_request(
m if m == methods::AGENT_CONTROL.name => {
match serde_json::from_value::<ControlCommand>(params) {
Ok(cmd) => {
let _ = session.input_tx.send(InputFromClient::Control(cmd)).await;
let _ = session.input_tx.send(AgentInput::Control(cmd)).await;
let _ = connection
.respond(id, serde_json::json!({"ok": true}))
.await;
Expand Down
12 changes: 3 additions & 9 deletions crates/loopal-agent-server/src/session_hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tokio::sync::Mutex;

use loopal_ipc::connection::Connection;
use loopal_protocol::InterruptSignal;
use loopal_runtime::agent_input::AgentInput;

/// A connected client handle within a shared session.
pub struct ClientHandle {
Expand All @@ -22,19 +23,12 @@ pub struct SharedSession {
pub session_id: String,
pub clients: Mutex<Vec<ClientHandle>>,
/// Channel to send input into the agent loop.
pub input_tx: tokio::sync::mpsc::Sender<InputFromClient>,
pub input_tx: tokio::sync::mpsc::Sender<AgentInput>,
/// Interrupt signal shared with the agent loop.
pub interrupt: InterruptSignal,
pub interrupt_tx: Arc<tokio::sync::watch::Sender<u64>>,
}

/// Input forwarded from a client connection to the agent loop.
pub enum InputFromClient {
Message(loopal_protocol::Envelope),
Control(loopal_protocol::ControlCommand),
Interrupt,
}

/// Server-wide session registry.
#[derive(Default)]
pub struct SessionHub {
Expand Down Expand Up @@ -108,7 +102,7 @@ impl SessionHub {
impl SharedSession {
/// Create a placeholder session (for bootstrapping before session_id is known).
pub fn placeholder(
input_tx: tokio::sync::mpsc::Sender<InputFromClient>,
input_tx: tokio::sync::mpsc::Sender<AgentInput>,
interrupt: InterruptSignal,
interrupt_tx: Arc<tokio::sync::watch::Sender<u64>>,
) -> Self {
Expand Down
5 changes: 3 additions & 2 deletions crates/loopal-agent-server/src/session_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ use loopal_config::load_config;
use loopal_error::AgentOutput;
use loopal_ipc::connection::Connection;
use loopal_protocol::InterruptSignal;
use loopal_runtime::agent_input::AgentInput;
use loopal_runtime::agent_loop;

use crate::agent_setup;
use crate::hub_frontend::HubFrontend;
use crate::params::StartParams;
use crate::session_hub::{InputFromClient, SessionHub, SharedSession};
use crate::session_hub::{SessionHub, SharedSession};

/// Handle returned to the dispatch loop after starting a session.
pub(crate) struct SessionHandle {
Expand Down Expand Up @@ -77,7 +78,7 @@ pub(crate) async fn start_session(
};

// Create session infrastructure
let (input_tx, input_rx) = tokio::sync::mpsc::channel::<InputFromClient>(16);
let (input_tx, input_rx) = tokio::sync::mpsc::channel::<AgentInput>(16);
let interrupt = InterruptSignal::new();
let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64);
let interrupt_tx = Arc::new(watch_tx);
Expand Down
2 changes: 2 additions & 0 deletions crates/loopal-agent-server/tests/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mod bridge_edge_test;
mod bridge_helpers;
#[path = "suite/dispatch_loop_test.rs"]
mod dispatch_loop_test;
#[path = "suite/hub_drain_test.rs"]
mod hub_drain_test;
#[path = "suite/hub_frontend_edge_test.rs"]
mod hub_frontend_edge_test;
#[path = "suite/hub_frontend_test.rs"]
Expand Down
65 changes: 65 additions & 0 deletions crates/loopal-agent-server/tests/suite/hub_drain_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//! Tests for HubFrontend::drain_pending() — message and control routing.

use std::sync::Arc;

use tokio::sync::Mutex;

use loopal_protocol::{ControlCommand, Envelope, InterruptSignal, MessageSource};
use loopal_runtime::agent_input::AgentInput;
use loopal_runtime::frontend::traits::AgentFrontend;

use loopal_agent_server::hub_frontend::HubFrontend;
use loopal_agent_server::session_hub::SharedSession;

fn make_session() -> (
Arc<SharedSession>,
tokio::sync::mpsc::Sender<AgentInput>,
tokio::sync::mpsc::Receiver<AgentInput>,
tokio::sync::watch::Receiver<u64>,
) {
let (input_tx, input_rx) = tokio::sync::mpsc::channel(16);
let interrupt = InterruptSignal::new();
let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64);
let session = Arc::new(SharedSession {
session_id: "test-session".into(),
clients: Mutex::new(Vec::new()),
input_tx: input_tx.clone(),
interrupt,
interrupt_tx: Arc::new(watch_tx),
});
(session, input_tx, input_rx, watch_rx)
}

#[tokio::test]
async fn test_hub_drain_pending_messages() {
let (session, input_tx, input_rx, watch_rx) = make_session();
let frontend = HubFrontend::new(session, input_rx, None, watch_rx);

let env = Envelope::new(MessageSource::Human, "main", "hello");
input_tx.send(AgentInput::Message(env)).await.unwrap();

let pending = frontend.drain_pending().await;
assert_eq!(pending.len(), 1);
let AgentInput::Message(ref env) = pending[0] else {
panic!("expected AgentInput::Message");
};
assert_eq!(env.content.text, "hello");
}

#[tokio::test]
async fn test_hub_drain_pending_controls() {
let (session, input_tx, input_rx, watch_rx) = make_session();
let frontend = HubFrontend::new(session, input_rx, None, watch_rx);

input_tx
.send(AgentInput::Control(ControlCommand::Clear))
.await
.unwrap();

let pending = frontend.drain_pending().await;
assert_eq!(pending.len(), 1);
assert!(matches!(
pending[0],
AgentInput::Control(ControlCommand::Clear)
));
}
12 changes: 6 additions & 6 deletions crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use loopal_runtime::agent_input::AgentInput;
use loopal_runtime::frontend::traits::AgentFrontend;

use loopal_agent_server::hub_frontend::HubFrontend;
use loopal_agent_server::session_hub::{InputFromClient, SharedSession};
use loopal_agent_server::session_hub::SharedSession;

const T: Duration = Duration::from_secs(5);

fn make_session() -> (
Arc<SharedSession>,
tokio::sync::mpsc::Sender<InputFromClient>,
tokio::sync::mpsc::Receiver<InputFromClient>,
tokio::sync::mpsc::Sender<AgentInput>,
tokio::sync::mpsc::Receiver<AgentInput>,
tokio::sync::watch::Receiver<u64>,
) {
let (input_tx, input_rx) = tokio::sync::mpsc::channel(16);
Expand Down Expand Up @@ -60,7 +60,7 @@ async fn stale_interrupt_does_not_exit_recv_input() {

// Now send a real message — recv_input should return it.
let env = Envelope::new(MessageSource::Human, "main", "hello after interrupt");
input_tx.send(InputFromClient::Message(env)).await.unwrap();
input_tx.send(AgentInput::Message(env)).await.unwrap();

let result = tokio::time::timeout(T, recv_task).await.unwrap().unwrap();
assert!(
Expand Down Expand Up @@ -104,7 +104,7 @@ async fn interrupt_then_continue_cycle() {

// Send a message — should be delivered.
let env = Envelope::new(MessageSource::Human, "main", "continue working");
input_tx.send(InputFromClient::Message(env)).await.unwrap();
input_tx.send(AgentInput::Message(env)).await.unwrap();

let result2 = tokio::time::timeout(T, recv2).await.unwrap().unwrap();
assert!(
Expand Down Expand Up @@ -136,7 +136,7 @@ async fn multiple_stale_interrupts_all_consumed() {
);

let env = Envelope::new(MessageSource::Human, "main", "msg");
input_tx.send(InputFromClient::Message(env)).await.unwrap();
input_tx.send(AgentInput::Message(env)).await.unwrap();

let result = tokio::time::timeout(T, recv_task).await.unwrap().unwrap();
assert!(matches!(result, Some(AgentInput::Message(_))));
Expand Down
5 changes: 3 additions & 2 deletions crates/loopal-agent-server/tests/suite/hub_frontend_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use loopal_ipc::transport::Transport;
use loopal_protocol::{AgentEventPayload, InterruptSignal};
use loopal_runtime::frontend::traits::AgentFrontend;

use loopal_agent_server::session_hub::{InputFromClient, SharedSession};
use loopal_agent_server::session_hub::SharedSession;
use loopal_runtime::agent_input::AgentInput;

/// Create a bidirectional Connection pair (like a network socket pair).
/// Returns (server_conn, client_conn, client_rx).
Expand Down Expand Up @@ -41,7 +42,7 @@ fn conn_pair() -> (

fn make_session() -> (
Arc<SharedSession>,
tokio::sync::mpsc::Receiver<InputFromClient>,
tokio::sync::mpsc::Receiver<AgentInput>,
tokio::sync::watch::Receiver<u64>,
) {
let (input_tx, input_rx) = tokio::sync::mpsc::channel(16);
Expand Down
Loading
Loading