From 15725c88f2d4b08960d852a00f52d4ec2492367c Mon Sep 17 00:00:00 2001 From: psumo Date: Thu, 12 Mar 2026 00:59:56 +0000 Subject: [PATCH] fix: resolve channel default_agent by name at message-time instead of caching UUID at startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a channel's default_agent is resolved at startup, the UUID gets cached in the AgentRouter. Kill + respawn assigns a new UUID but the router keeps the old one, so all messages fail with "Agent not found" until you restart the process. Added a lazy re-resolution fallback in the bridge dispatch path — on "Agent not found", look up the configured agent name, call find_agent_by_name() for the current UUID, update the cache, retry. No overhead on the normal path. --- crates/openfang-api/src/channel_bridge.rs | 2 +- crates/openfang-channels/src/bridge.rs | 119 +++++++++++++++++++++- crates/openfang-channels/src/router.rs | 24 +++++ 3 files changed, 143 insertions(+), 2 deletions(-) diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 1477f85b5..6561f08f0 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -1603,7 +1603,7 @@ pub async fn start_channel_bridge_with_config( "{} default agent: {name} ({agent_id}) [channel: {channel_key}]", adapter.name() ); - router.set_channel_default(channel_key, agent_id); + router.set_channel_default_with_name(channel_key, agent_id, name.clone()); // First configured default also becomes system-wide fallback if !system_default_set { router.set_default(agent_id); diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 3a8c3d68a..f7de6582d 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -434,6 +434,65 @@ async fn send_lifecycle_reaction( let _ = adapter.send_reaction(user, message_id, &reaction).await; } +/// Attempt to re-resolve an agent by channel default name after an "Agent not found" error. +/// +/// When an agent is killed and respawned, it gets a new UUID. The router's cached ID +/// becomes stale. This function: +/// 1. Checks if the error indicates the agent was not found. +/// 2. Looks up the channel's configured default agent name. +/// 3. Calls `find_agent_by_name()` to get the new UUID. +/// 4. Updates the router's channel default cache. +/// 5. Returns the new AgentId if successful, None otherwise. +/// +/// This is the fallback path -- only called when the primary send fails. The hot path +/// (cached UUID is valid) never hits this function. +async fn try_reresolution( + error: &str, + channel_key: &str, + handle: &Arc, + router: &Arc, +) -> Option { + // Only attempt re-resolution for "Agent not found" errors + if !error.contains("Agent not found") { + return None; + } + + // Check if we have a configured name for this channel's default agent + let agent_name = router.channel_default_name(channel_key)?; + + info!( + channel = channel_key, + agent_name = %agent_name, + "Cached agent UUID is stale -- attempting re-resolution by name" + ); + + // Try to find the agent by name (it may have been respawned with a new UUID) + match handle.find_agent_by_name(&agent_name).await { + Ok(Some(new_id)) => { + info!( + channel = channel_key, + agent_name = %agent_name, + new_id = %new_id, + "Re-resolved agent by name -- updating router cache" + ); + router.update_channel_default(channel_key, new_id); + Some(new_id) + } + Ok(None) => { + warn!( + channel = channel_key, + agent_name = %agent_name, + "Agent not found by name during re-resolution -- agent may not be running" + ); + None + } + Err(e) => { + warn!(channel = channel_key, error = %e, "Error during agent re-resolution"); + None + } + } +} + /// Dispatch a single incoming message — handles bot commands or routes to an agent. /// /// Applies per-channel policies (DM/group filtering, rate limiting, formatting, threading). @@ -692,6 +751,8 @@ async fn dispatch_message( &message.sender.platform_id, message.sender.openfang_user.as_deref(), ); + // Capture channel key for potential re-resolution after agent kill+respawn + let channel_key = format!("{:?}", message.channel); let agent_id = match agent_id { Some(id) => id, @@ -770,6 +831,33 @@ async fn dispatch_message( .await; } Err(e) => { + // Try re-resolving by name if the agent UUID is stale (killed + respawned). + // This only fires on "Agent not found" errors -- all other errors fall through. + if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await { + // Retry with the newly resolved agent ID + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Thinking).await; + match handle.send_message(new_id, &text).await { + Ok(response) => { + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Done).await; + send_response(adapter, &message.sender, response, thread_id, output_format).await; + handle + .record_delivery(new_id, ct_str, &message.sender.platform_id, true, None, thread_id) + .await; + } + Err(e2) => { + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await; + warn!("Agent error for {new_id} (after re-resolution): {e2}"); + let err_msg = format!("Agent error: {e2}"); + send_response(adapter, &message.sender, err_msg.clone(), thread_id, output_format).await; + handle + .record_delivery(new_id, ct_str, &message.sender.platform_id, false, Some(&err_msg), thread_id) + .await; + } + } + return; + } + + // No re-resolution possible -- report the original error send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await; warn!("Agent error for {agent_id}: {e}"); let err_msg = format!("Agent error: {e}"); @@ -931,6 +1019,8 @@ async fn dispatch_with_blocks( &message.sender.platform_id, message.sender.openfang_user.as_deref(), ); + // Capture channel key for potential re-resolution after agent kill+respawn + let channel_key = format!("{:?}", message.channel); let agent_id = match agent_id { Some(id) => id, @@ -986,7 +1076,7 @@ async fn dispatch_with_blocks( send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Queued).await; send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Thinking).await; - match handle.send_message_with_blocks(agent_id, blocks).await { + match handle.send_message_with_blocks(agent_id, blocks.clone()).await { Ok(response) => { send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Done).await; send_response(adapter, &message.sender, response, thread_id, output_format).await; @@ -995,6 +1085,33 @@ async fn dispatch_with_blocks( .await; } Err(e) => { + // Try re-resolving by name if the agent UUID is stale (killed + respawned). + // This only fires on "Agent not found" errors -- all other errors fall through. + if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await { + // Retry with the newly resolved agent ID + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Thinking).await; + match handle.send_message_with_blocks(new_id, blocks).await { + Ok(response) => { + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Done).await; + send_response(adapter, &message.sender, response, thread_id, output_format).await; + handle + .record_delivery(new_id, ct_str, &message.sender.platform_id, true, None, thread_id) + .await; + } + Err(e2) => { + send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await; + warn!("Agent error for {new_id} (after re-resolution): {e2}"); + let err_msg = format!("Agent error: {e2}"); + send_response(adapter, &message.sender, err_msg.clone(), thread_id, output_format).await; + handle + .record_delivery(new_id, ct_str, &message.sender.platform_id, false, Some(&err_msg), thread_id) + .await; + } + } + return; + } + + // No re-resolution possible -- report the original error send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await; warn!("Agent error for {agent_id}: {e}"); let err_msg = format!("Agent error: {e}"); diff --git a/crates/openfang-channels/src/router.rs b/crates/openfang-channels/src/router.rs index 985f5d48b..3016b3be8 100644 --- a/crates/openfang-channels/src/router.rs +++ b/crates/openfang-channels/src/router.rs @@ -34,6 +34,8 @@ pub struct AgentRouter { default_agent: Option, /// Per-channel-type default agent (e.g., Telegram -> agent_a, Discord -> agent_b). channel_defaults: DashMap, + /// Per-channel-type default agent NAME (for re-resolution after agent kill+respawn). + channel_default_names: DashMap, /// Sorted bindings (most specific first). Uses Mutex for runtime updates via Arc. bindings: Mutex>, /// Broadcast configuration. Uses Mutex for runtime updates via Arc. @@ -50,6 +52,7 @@ impl AgentRouter { direct_routes: DashMap::new(), default_agent: None, channel_defaults: DashMap::new(), + channel_default_names: DashMap::new(), bindings: Mutex::new(Vec::new()), broadcast: Mutex::new(BroadcastConfig::default()), agent_name_cache: DashMap::new(), @@ -66,6 +69,20 @@ impl AgentRouter { self.channel_defaults.insert(channel_key, agent_id); } + /// Set a per-channel-type default agent with its name for later re-resolution. + /// + /// When an agent is killed and respawned (new UUID), the bridge can use the + /// stored name to re-resolve the agent by calling `find_agent_by_name()`. + pub fn set_channel_default_with_name(&self, channel_key: String, agent_id: AgentId, agent_name: String) { + self.channel_default_names.insert(channel_key.clone(), agent_name); + self.channel_defaults.insert(channel_key, agent_id); + } + + /// Get the configured default agent name for a channel type (if set via `set_channel_default_with_name`). + pub fn channel_default_name(&self, channel_key: &str) -> Option { + self.channel_default_names.get(channel_key).map(|r| r.clone()) + } + /// Set a user's default agent. pub fn set_user_default(&self, user_key: String, agent_id: AgentId) { self.user_defaults.insert(user_key, agent_id); @@ -107,6 +124,13 @@ impl AgentRouter { self.agent_name_cache.insert(name, id); } + /// Update a channel default agent ID (used when re-resolving after agent respawn). + /// + /// Only updates the cached AgentId -- the name remains unchanged. + pub fn update_channel_default(&self, channel_key: &str, new_agent_id: AgentId) { + self.channel_defaults.insert(channel_key.to_string(), new_agent_id); + } + /// Resolve which agent should handle a message. /// /// Priority: bindings > direct route > user default > system default.