Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions crates/openfang-api/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,70 @@ pub async fn kill_agent(
}
}

/// POST /api/agents/{id}/restart — Restart a crashed/stuck agent.
///
/// Cancels any active task, resets agent state to Running, and updates last_active.
/// Returns the agent's new state.
pub async fn restart_agent(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    // Reject malformed ids up front.
    let Ok(agent_id) = id.parse::<AgentId>() else {
        return (
            StatusCode::BAD_REQUEST,
            Json(serde_json::json!({"error": "Invalid agent ID"})),
        );
    };

    // Look up the agent; capture the name and prior state, then let the
    // registry guard drop at the end of the scope before mutating state.
    let (agent_name, previous_state) = {
        let Some(entry) = state.kernel.registry.get(agent_id) else {
            return (
                StatusCode::NOT_FOUND,
                Json(serde_json::json!({"error": "Agent not found"})),
            );
        };
        (entry.name.clone(), format!("{:?}", entry.state))
    };

    // Best-effort cancellation of any in-flight task.
    let was_running = state.kernel.stop_agent_run(agent_id).unwrap_or(false);

    // Move the agent back to Running (this also refreshes last_active).
    let _ = state
        .kernel
        .registry
        .set_state(agent_id, openfang_types::agent::AgentState::Running);

    tracing::info!(
        agent = %agent_name,
        previous_state = %previous_state,
        task_cancelled = was_running,
        "Agent restarted via API"
    );

    (
        StatusCode::OK,
        Json(serde_json::json!({
            "status": "restarted",
            "agent": agent_name,
            "agent_id": id,
            "previous_state": previous_state,
            "task_cancelled": was_running,
        })),
    )
}

/// GET /api/status — Kernel status.
pub async fn status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let agents: Vec<serde_json::Value> = state
Expand Down
8 changes: 8 additions & 0 deletions crates/openfang-api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ pub async fn build_router(
axum::routing::put(routes::set_agent_mode),
)
.route("/api/profiles", axum::routing::get(routes::list_profiles))
.route(
"/api/agents/{id}/restart",
axum::routing::post(routes::restart_agent),
)
.route(
"/api/agents/{id}/start",
axum::routing::post(routes::restart_agent),
)
.route(
"/api/agents/{id}/message",
axum::routing::post(routes::send_message),
Expand Down
128 changes: 120 additions & 8 deletions crates/openfang-kernel/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
//! each running agent's `last_active` timestamp. If an agent hasn't been active
//! for longer than 2x its heartbeat interval, a `HealthCheckFailed` event is
//! published to the event bus.
//!
//! Crashed agents are tracked for auto-recovery: the heartbeat will attempt to
//! reset crashed agents back to Running up to `max_recovery_attempts` times.
//! After exhausting attempts, agents are marked as Terminated (dead).

use crate::registry::AgentRegistry;
use chrono::Utc;
use dashmap::DashMap;
use openfang_types::agent::{AgentId, AgentState};
use tracing::{debug, warn};

Expand All @@ -17,6 +22,12 @@ const DEFAULT_CHECK_INTERVAL_SECS: u64 = 30;
/// multiples of its heartbeat interval.
const UNRESPONSIVE_MULTIPLIER: u64 = 2;

/// Default maximum recovery attempts before giving up.
const DEFAULT_MAX_RECOVERY_ATTEMPTS: u32 = 3;

/// Default cooldown between recovery attempts (seconds).
const DEFAULT_RECOVERY_COOLDOWN_SECS: u64 = 60;

/// Result of a heartbeat check.
#[derive(Debug, Clone)]
pub struct HeartbeatStatus {
Expand All @@ -28,6 +39,8 @@ pub struct HeartbeatStatus {
pub inactive_secs: i64,
/// Whether the agent is considered unresponsive.
pub unresponsive: bool,
/// Current agent state.
pub state: AgentState,
}

/// Heartbeat monitor configuration.
Expand All @@ -38,18 +51,82 @@ pub struct HeartbeatConfig {
/// Default threshold for unresponsiveness (seconds).
/// Overridden per-agent by AutonomousConfig.heartbeat_interval_secs.
pub default_timeout_secs: u64,
/// Maximum recovery attempts before marking agent as Terminated.
pub max_recovery_attempts: u32,
/// Minimum seconds between recovery attempts for the same agent.
pub recovery_cooldown_secs: u64,
}

impl Default for HeartbeatConfig {
fn default() -> Self {
Self {
check_interval_secs: DEFAULT_CHECK_INTERVAL_SECS,
default_timeout_secs: DEFAULT_CHECK_INTERVAL_SECS * UNRESPONSIVE_MULTIPLIER,
// 180s default: browser tasks and complex LLM calls can take 1-3 minutes
default_timeout_secs: 180,
max_recovery_attempts: DEFAULT_MAX_RECOVERY_ATTEMPTS,
recovery_cooldown_secs: DEFAULT_RECOVERY_COOLDOWN_SECS,
}
}
}

/// Check all running agents and return their heartbeat status.
/// Tracks per-agent recovery state across heartbeat cycles.
#[derive(Debug)]
pub struct RecoveryTracker {
    /// Per-agent recovery state: (consecutive_failures, last_attempt_epoch_secs).
    state: DashMap<AgentId, (u32, u64)>,
}

impl RecoveryTracker {
    /// Create a new recovery tracker.
    pub fn new() -> Self {
        Self {
            state: DashMap::new(),
        }
    }

    /// Current wall-clock time as seconds since the Unix epoch.
    ///
    /// Falls back to 0 if the system clock reads before the epoch, matching
    /// the previous inline `unwrap_or_default` behavior.
    fn now_epoch_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Record a recovery attempt for an agent.
    /// Returns the current attempt number (1-indexed).
    pub fn record_attempt(&self, agent_id: AgentId) -> u32 {
        let mut entry = self.state.entry(agent_id).or_insert((0, 0));
        // saturating_add: an agent stuck in a crash loop for a very long time
        // must not wrap the counter back to 0 and reset its attempt budget.
        entry.0 = entry.0.saturating_add(1);
        entry.1 = Self::now_epoch_secs();
        entry.0
    }

    /// Check if enough time has passed since the last recovery attempt.
    pub fn can_attempt(&self, agent_id: AgentId, cooldown_secs: u64) -> bool {
        match self.state.get(&agent_id) {
            Some(entry) => Self::now_epoch_secs().saturating_sub(entry.1) >= cooldown_secs,
            None => true, // No prior attempts
        }
    }

    /// Get the current failure count for an agent.
    pub fn failure_count(&self, agent_id: AgentId) -> u32 {
        self.state.get(&agent_id).map(|e| e.0).unwrap_or(0)
    }

    /// Reset recovery state for an agent (e.g. after successful recovery).
    pub fn reset(&self, agent_id: AgentId) {
        self.state.remove(&agent_id);
    }
}

impl Default for RecoveryTracker {
    fn default() -> Self {
        Self::new()
    }
}

/// Check all running and crashed agents and return their heartbeat status.
///
/// This is a pure function — it doesn't start a background task.
/// The caller (kernel) can run this periodically or in a background task.
Expand All @@ -58,9 +135,10 @@ pub fn check_agents(registry: &AgentRegistry, config: &HeartbeatConfig) -> Vec<H
let mut statuses = Vec::new();

for entry_ref in registry.list() {
// Only check running agents
if entry_ref.state != AgentState::Running {
continue;
// Check Running agents (for unresponsiveness) and Crashed agents (for recovery)
match entry_ref.state {
AgentState::Running | AgentState::Crashed => {}
_ => continue,
}

let inactive_secs = (now - entry_ref.last_active).num_seconds();
Expand All @@ -73,15 +151,22 @@ pub fn check_agents(registry: &AgentRegistry, config: &HeartbeatConfig) -> Vec<H
.map(|a| a.heartbeat_interval_secs * UNRESPONSIVE_MULTIPLIER)
.unwrap_or(config.default_timeout_secs) as i64;

let unresponsive = inactive_secs > timeout_secs;
// Crashed agents are always considered unresponsive
let unresponsive = entry_ref.state == AgentState::Crashed || inactive_secs > timeout_secs;

if unresponsive {
if unresponsive && entry_ref.state == AgentState::Running {
warn!(
agent = %entry_ref.name,
inactive_secs,
timeout_secs,
"Agent is unresponsive"
);
} else if entry_ref.state == AgentState::Crashed {
warn!(
agent = %entry_ref.name,
inactive_secs,
"Agent is crashed — eligible for recovery"
);
} else {
debug!(
agent = %entry_ref.name,
Expand All @@ -95,6 +180,7 @@ pub fn check_agents(registry: &AgentRegistry, config: &HeartbeatConfig) -> Vec<H
name: entry_ref.name.clone(),
inactive_secs,
unresponsive,
state: entry_ref.state,
});
}

Expand Down Expand Up @@ -201,7 +287,7 @@ mod tests {
fn test_heartbeat_config_default() {
let config = HeartbeatConfig::default();
assert_eq!(config.check_interval_secs, 30);
assert_eq!(config.default_timeout_secs, 60);
assert_eq!(config.default_timeout_secs, 180);
}

#[test]
Expand All @@ -220,18 +306,21 @@ mod tests {
name: "agent-1".to_string(),
inactive_secs: 10,
unresponsive: false,
state: AgentState::Running,
},
HeartbeatStatus {
agent_id: AgentId::new(),
name: "agent-2".to_string(),
inactive_secs: 120,
unresponsive: true,
state: AgentState::Running,
},
HeartbeatStatus {
agent_id: AgentId::new(),
name: "agent-3".to_string(),
inactive_secs: 5,
unresponsive: false,
state: AgentState::Running,
},
];

Expand All @@ -242,4 +331,27 @@ mod tests {
assert_eq!(summary.unresponsive_agents.len(), 1);
assert_eq!(summary.unresponsive_agents[0].name, "agent-2");
}

#[test]
fn test_recovery_tracker() {
    let tracker = RecoveryTracker::new();
    let id = AgentId::new();

    // Fresh tracker: no failures recorded, attempts allowed immediately.
    assert_eq!(tracker.failure_count(id), 0);
    assert!(tracker.can_attempt(id, 60));

    // First attempt is numbered 1 and shows up in the failure count.
    assert_eq!(tracker.record_attempt(id), 1);
    assert_eq!(tracker.failure_count(id), 1);

    // Immediately after an attempt the cooldown blocks, unless it is zero.
    assert!(!tracker.can_attempt(id, 60));
    assert!(tracker.can_attempt(id, 0));

    // A second attempt increments the counter.
    assert_eq!(tracker.record_attempt(id), 2);

    // Reset wipes all recovery state for the agent.
    tracker.reset(id);
    assert_eq!(tracker.failure_count(id), 0);
}
}
Loading