Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/openfang-api/src/channel_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,7 +1603,7 @@ pub async fn start_channel_bridge_with_config(
"{} default agent: {name} ({agent_id}) [channel: {channel_key}]",
adapter.name()
);
router.set_channel_default(channel_key, agent_id);
router.set_channel_default_with_name(channel_key, agent_id, name.clone());
// First configured default also becomes system-wide fallback
if !system_default_set {
router.set_default(agent_id);
Expand Down
119 changes: 118 additions & 1 deletion crates/openfang-channels/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,65 @@ async fn send_lifecycle_reaction(
let _ = adapter.send_reaction(user, message_id, &reaction).await;
}

/// Attempt to re-resolve an agent by channel default name after an "Agent not found" error.
///
/// When an agent is killed and respawned, it gets a new UUID. The router's cached ID
/// becomes stale. This function:
/// 1. Checks if the error indicates the agent was not found.
/// 2. Looks up the channel's configured default agent name.
/// 3. Calls `find_agent_by_name()` to get the new UUID.
/// 4. Updates the router's channel default cache.
/// 5. Returns the new AgentId if successful, None otherwise.
///
/// This is the fallback path -- only called when the primary send fails. The hot path
/// (cached UUID is valid) never hits this function.
async fn try_reresolution(
error: &str,
channel_key: &str,
handle: &Arc<dyn ChannelBridgeHandle>,
router: &Arc<AgentRouter>,
) -> Option<AgentId> {
// Only attempt re-resolution for "Agent not found" errors
if !error.contains("Agent not found") {
return None;
}

// Check if we have a configured name for this channel's default agent
let agent_name = router.channel_default_name(channel_key)?;

info!(
channel = channel_key,
agent_name = %agent_name,
"Cached agent UUID is stale -- attempting re-resolution by name"
);

// Try to find the agent by name (it may have been respawned with a new UUID)
match handle.find_agent_by_name(&agent_name).await {
Ok(Some(new_id)) => {
info!(
channel = channel_key,
agent_name = %agent_name,
new_id = %new_id,
"Re-resolved agent by name -- updating router cache"
);
router.update_channel_default(channel_key, new_id);
Some(new_id)
}
Ok(None) => {
warn!(
channel = channel_key,
agent_name = %agent_name,
"Agent not found by name during re-resolution -- agent may not be running"
);
None
}
Err(e) => {
warn!(channel = channel_key, error = %e, "Error during agent re-resolution");
None
}
}
}

/// Dispatch a single incoming message — handles bot commands or routes to an agent.
///
/// Applies per-channel policies (DM/group filtering, rate limiting, formatting, threading).
Expand Down Expand Up @@ -692,6 +751,8 @@ async fn dispatch_message(
&message.sender.platform_id,
message.sender.openfang_user.as_deref(),
);
// Capture channel key for potential re-resolution after agent kill+respawn
let channel_key = format!("{:?}", message.channel);

let agent_id = match agent_id {
Some(id) => id,
Expand Down Expand Up @@ -770,6 +831,33 @@ async fn dispatch_message(
.await;
}
Err(e) => {
// Try re-resolving by name if the agent UUID is stale (killed + respawned).
// This only fires on "Agent not found" errors -- all other errors fall through.
if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await {
// Retry with the newly resolved agent ID
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Thinking).await;
match handle.send_message(new_id, &text).await {
Ok(response) => {
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Done).await;
send_response(adapter, &message.sender, response, thread_id, output_format).await;
handle
.record_delivery(new_id, ct_str, &message.sender.platform_id, true, None, thread_id)
.await;
}
Err(e2) => {
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await;
warn!("Agent error for {new_id} (after re-resolution): {e2}");
let err_msg = format!("Agent error: {e2}");
send_response(adapter, &message.sender, err_msg.clone(), thread_id, output_format).await;
handle
.record_delivery(new_id, ct_str, &message.sender.platform_id, false, Some(&err_msg), thread_id)
.await;
}
}
return;
}

// No re-resolution possible -- report the original error
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await;
warn!("Agent error for {agent_id}: {e}");
let err_msg = format!("Agent error: {e}");
Expand Down Expand Up @@ -931,6 +1019,8 @@ async fn dispatch_with_blocks(
&message.sender.platform_id,
message.sender.openfang_user.as_deref(),
);
// Capture channel key for potential re-resolution after agent kill+respawn
let channel_key = format!("{:?}", message.channel);

let agent_id = match agent_id {
Some(id) => id,
Expand Down Expand Up @@ -986,7 +1076,7 @@ async fn dispatch_with_blocks(
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Queued).await;
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Thinking).await;

match handle.send_message_with_blocks(agent_id, blocks).await {
match handle.send_message_with_blocks(agent_id, blocks.clone()).await {
Ok(response) => {
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Done).await;
send_response(adapter, &message.sender, response, thread_id, output_format).await;
Expand All @@ -995,6 +1085,33 @@ async fn dispatch_with_blocks(
.await;
}
Err(e) => {
// Try re-resolving by name if the agent UUID is stale (killed + respawned).
// This only fires on "Agent not found" errors -- all other errors fall through.
if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await {
// Retry with the newly resolved agent ID
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Thinking).await;
match handle.send_message_with_blocks(new_id, blocks).await {
Ok(response) => {
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Done).await;
send_response(adapter, &message.sender, response, thread_id, output_format).await;
handle
.record_delivery(new_id, ct_str, &message.sender.platform_id, true, None, thread_id)
.await;
}
Err(e2) => {
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await;
warn!("Agent error for {new_id} (after re-resolution): {e2}");
let err_msg = format!("Agent error: {e2}");
send_response(adapter, &message.sender, err_msg.clone(), thread_id, output_format).await;
handle
.record_delivery(new_id, ct_str, &message.sender.platform_id, false, Some(&err_msg), thread_id)
.await;
}
}
return;
}

// No re-resolution possible -- report the original error
send_lifecycle_reaction(adapter, &message.sender, msg_id, AgentPhase::Error).await;
warn!("Agent error for {agent_id}: {e}");
let err_msg = format!("Agent error: {e}");
Expand Down
24 changes: 24 additions & 0 deletions crates/openfang-channels/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub struct AgentRouter {
default_agent: Option<AgentId>,
/// Per-channel-type default agent (e.g., Telegram -> agent_a, Discord -> agent_b).
channel_defaults: DashMap<String, AgentId>,
/// Per-channel-type default agent NAME (for re-resolution after agent kill+respawn).
channel_default_names: DashMap<String, String>,
/// Sorted bindings (most specific first). Uses Mutex for runtime updates via Arc.
bindings: Mutex<Vec<(AgentBinding, String)>>,
/// Broadcast configuration. Uses Mutex for runtime updates via Arc.
Expand All @@ -50,6 +52,7 @@ impl AgentRouter {
direct_routes: DashMap::new(),
default_agent: None,
channel_defaults: DashMap::new(),
channel_default_names: DashMap::new(),
bindings: Mutex::new(Vec::new()),
broadcast: Mutex::new(BroadcastConfig::default()),
agent_name_cache: DashMap::new(),
Expand All @@ -66,6 +69,20 @@ impl AgentRouter {
self.channel_defaults.insert(channel_key, agent_id);
}

/// Register the default agent for a channel type, remembering its name as well as its ID.
///
/// The name is kept in `channel_default_names` and the ID in `channel_defaults`,
/// both under `channel_key`. Keeping the name lets the bridge look the agent up
/// again via `find_agent_by_name()` after a kill + respawn has given it a new UUID.
pub fn set_channel_default_with_name(
    &self,
    channel_key: String,
    agent_id: AgentId,
    agent_name: String,
) {
    // Both maps own their key, so one copy is needed; the name is stored first.
    let names_key = channel_key.clone();
    self.channel_default_names.insert(names_key, agent_name);
    self.channel_defaults.insert(channel_key, agent_id);
}

/// Look up the default agent name recorded for a channel type.
///
/// Returns an owned copy of the name stored by `set_channel_default_with_name`,
/// or `None` when no name is stored under `channel_key`.
pub fn channel_default_name(&self, channel_key: &str) -> Option<String> {
    let entry = self.channel_default_names.get(channel_key)?;
    // Copy the name out so the map guard is released when `entry` drops.
    Some(entry.value().clone())
}

/// Set a user's default agent.
pub fn set_user_default(&self, user_key: String, agent_id: AgentId) {
self.user_defaults.insert(user_key, agent_id);
Expand Down Expand Up @@ -107,6 +124,13 @@ impl AgentRouter {
self.agent_name_cache.insert(name, id);
}

/// Replace the cached default agent ID for a channel type.
///
/// Used after an agent has been re-resolved following a respawn. Only
/// `channel_defaults` is written; the stored agent name is left untouched.
pub fn update_channel_default(&self, channel_key: &str, new_agent_id: AgentId) {
    let owned_key = channel_key.to_owned();
    self.channel_defaults.insert(owned_key, new_agent_id);
}

/// Resolve which agent should handle a message.
///
/// Priority: bindings > direct route > user default > system default.
Expand Down