From 590310a141be3873a6839626afdecd6c43ea8994 Mon Sep 17 00:00:00 2001 From: UncleSp1d3r Date: Sun, 1 Feb 2026 23:48:30 -0500 Subject: [PATCH 1/6] Mark Ticket 3 as complete in procmond spec --- spec/procmond/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/procmond/index.md b/spec/procmond/index.md index 5e679df..1612b2a 100644 --- a/spec/procmond/index.md +++ b/spec/procmond/index.md @@ -25,7 +25,7 @@ Execute tickets in order. Each ticket's dependencies must be complete before sta - Dynamic interval adjustment from backpressure - *Requires: Ticket 1* -- [ ] **Ticket 3**: [Implement RPC Service and Registration Manager](<./tickets/Implement_RPC_Service_and_Registration_Manager_(procmond).md>) +- [x] **Ticket 3**: [Implement RPC Service and Registration Manager](<./tickets/Implement_RPC_Service_and_Registration_Manager_(procmond).md>) - RpcServiceHandler component - RegistrationManager component From 5d0fdb5acc84878b10914a4b71f3dc3cca157ff6 Mon Sep 17 00:00:00 2001 From: UncleSp1d3r Date: Sun, 1 Feb 2026 23:52:25 -0500 Subject: [PATCH 2/6] docs(agents): add Clippy lint guidance for map_err_ignore and as_conversions Add documentation for two Clippy lints encountered during PR review: - map_err_ignore: Name ignored variables in closures (`|_elapsed|` not `|_|`) - as_conversions: Add `#[allow(clippy::as_conversions)]` with safety comment Co-Authored-By: Claude Opus 4.5 --- AGENTS.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/AGENTS.md b/AGENTS.md index 78d88b9..0b88a23 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -159,6 +159,8 @@ flowchart LR - **Linting**: `cargo clippy -- -D warnings` (zero warnings) - **Format args**: Use `{variable}` inlined syntax in `format!`/`anyhow!` macros (`clippy::uninlined_format_args`) - **If-else ordering**: Clippy prefers `==` checks first in if-else (`clippy::unnecessary_negation`) +- **map_err_ignore**: Name ignored variables in closures (`|_elapsed|` not `|_|`) +- **as_conversions**: Add `#[allow(clippy::as_conversions)]` with safety comment for intentional casts - **Safety**: `unsafe_code = "forbid"` at workspace level - **Formatting**: `rustfmt` with 119 char line length - **Rustdoc**: Escape brackets in paths like `/proc/\[pid\]/stat` to avoid broken link warnings From 1391f449e713bf8b2cdf28fd408a6f64e984e880 Mon Sep 17 00:00:00 2001 From: UncleSp1d3r Date: Mon, 2 Feb 2026 00:52:13 -0500 Subject: [PATCH 3/6] feat(daemoneye-agent): implement agent loading state and collector coordination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the loading state machine for daemoneye-agent that coordinates collector registration during startup: - Add AgentState enum with state machine transitions: Loading → Ready → SteadyState (plus StartupFailed and ShuttingDown) - Add CollectorsConfig module for `/etc/daemoneye/collectors.json`: - JSON-based collector configuration with validation - Support for enabled/disabled collectors, startup timeouts, heartbeat intervals - Builder pattern for CollectorEntry construction - Add collector readiness tracking via registration: - CollectorReadinessTracker struct tracks expected vs ready collectors - Registration marks collectors as ready automatically - Implement startup timeout handling: - Configurable timeout from max of enabled collectors' startup_timeout_secs - wait_for_collectors_ready() with polling and timeout detection - Marks agent as StartupFailed on timeout - Implement "begin monitoring" broadcast: - Sends lifecycle event on 
control.collector.lifecycle topic - Called after transition to Ready state - Add privilege dropping stub for future implementation - Integrate loading state into main.rs startup sequence: - Load collectors config, wait for registration, transition states - Broadcast begin monitoring, enter steady state operation - Add 10 integration tests for loading state workflow - Add 27+ unit tests for state machine and configuration Future work: Heartbeat failure detection and escalating recovery actions (Tasks #12, #13, #17 are blocked pending this foundation) Co-Authored-By: Claude Opus 4.5 --- daemoneye-agent/src/broker_manager.rs | 1106 ++++++++++++++++- daemoneye-agent/src/collector_config.rs | 645 ++++++++++ daemoneye-agent/src/lib.rs | 2 + daemoneye-agent/src/main.rs | 144 +++ .../tests/loading_state_integration.rs | 355 ++++++ 5 files changed, 2244 insertions(+), 8 deletions(-) create mode 100644 daemoneye-agent/src/collector_config.rs create mode 100644 daemoneye-agent/tests/loading_state_integration.rs diff --git a/daemoneye-agent/src/broker_manager.rs b/daemoneye-agent/src/broker_manager.rs index c47b62b..86245c5 100644 --- a/daemoneye-agent/src/broker_manager.rs +++ b/daemoneye-agent/src/broker_manager.rs @@ -4,7 +4,20 @@ //! within the daemoneye-agent process architecture. The broker operates independently //! of the IPC server for CLI communication and provides topic-based pub/sub messaging //! for collector-core component coordination. +//! +//! # Agent Loading State Machine +//! +//! The agent implements a state machine to coordinate startup: +//! +//! ```text +//! Loading → Ready → SteadyState +//! ``` +//! +//! - **Loading**: Agent starting, broker initializing, spawning collectors +//! - **Ready**: All collectors registered and reported "ready", privileges dropped +//! - **`SteadyState`**: Normal operation, collectors monitoring +use crate::collector_config::CollectorsConfig; use crate::collector_registry::{CollectorRegistry, RegistryError}; use crate::health::{self, HealthState}; use anyhow::{Context, Result}; @@ -20,6 +33,7 @@ use daemoneye_eventbus::{ process_manager::CollectorProcessManager, }; use daemoneye_lib::config::BrokerConfig; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; @@ -66,8 +80,150 @@ impl HealthState for BrokerHealth { } } +/// Agent loading state machine. +/// +/// The agent progresses through these states during startup: +/// - Loading → Ready → `SteadyState` +/// +/// This ensures coordinated startup where all collectors are ready before +/// the agent drops privileges and enters normal operation. +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +#[allow(dead_code)] // Variants constructed by state machine methods, used in Task #15 +pub enum AgentState { + /// Agent is starting up, broker initializing, spawning collectors. + /// The agent waits for all expected collectors to register and report "ready". + Loading, + + /// All collectors have registered and reported "ready". + /// The agent has dropped privileges (if configured). + /// Waiting to broadcast "begin monitoring" to transition to steady state. + Ready, + + /// Normal operation. Collectors are actively monitoring. + /// The agent broadcasts "begin monitoring" when entering this state. + SteadyState, + + /// Agent failed to start (collectors didn't report ready within timeout). + StartupFailed { reason: String }, + + /// Agent is shutting down. 
+    ShuttingDown,
+}
+
+impl AgentState {
+    /// Returns true if the agent is in a state where it's accepting collector registrations.
+    #[allow(dead_code)] // Used in Task #15 when main.rs integrates state machine
+    pub const fn accepts_registrations(&self) -> bool {
+        matches!(self, Self::Loading | Self::Ready | Self::SteadyState)
+    }
+
+    /// Returns true if the agent is in a running state (not failed or shutting down).
+    #[allow(dead_code)] // Used in Task #15 when main.rs integrates state machine
+    pub const fn is_running(&self) -> bool {
+        matches!(self, Self::Loading | Self::Ready | Self::SteadyState)
+    }
+
+    /// Returns true if the agent startup has failed.
+    #[allow(dead_code)] // Used in Task #15 when main.rs integrates state machine
+    pub const fn is_failed(&self) -> bool {
+        matches!(self, Self::StartupFailed { .. })
+    }
+}
+
+impl std::fmt::Display for AgentState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match *self {
+            Self::Loading => write!(f, "Loading"),
+            Self::Ready => write!(f, "Ready"),
+            Self::SteadyState => write!(f, "SteadyState"),
+            Self::StartupFailed { ref reason } => write!(f, "StartupFailed: {reason}"),
+            Self::ShuttingDown => write!(f, "ShuttingDown"),
+        }
+    }
+}
+
+/// Tracks which collectors have reported ready status.
+#[derive(Debug)]
+#[allow(dead_code)] // Methods used by BrokerManager state machine, integrated in Task #15
+struct CollectorReadinessTracker {
+    /// Set of collector IDs that are expected to report ready.
+    expected_collectors: HashSet<String>,
+    /// Set of collector IDs that have reported ready.
+    ready_collectors: HashSet<String>,
+}
+
+#[allow(dead_code)] // Methods used by BrokerManager state machine, integrated in Task #15
+impl CollectorReadinessTracker {
+    /// Create a new readiness tracker with expected collector IDs.
+    #[allow(dead_code)] // May be used in future for direct initialization
+    fn new(expected_collectors: HashSet<String>) -> Self {
+        Self {
+            expected_collectors,
+            ready_collectors: HashSet::new(),
+        }
+    }
+
+    /// Create an empty tracker (no collectors expected).
+    fn empty() -> Self {
+        Self {
+            expected_collectors: HashSet::new(),
+            ready_collectors: HashSet::new(),
+        }
+    }
+
+    /// Mark a collector as ready.
+    fn mark_ready(&mut self, collector_id: &str) {
+        self.ready_collectors.insert(collector_id.to_owned());
+    }
+
+    /// Check if all expected collectors are ready.
+    fn all_ready(&self) -> bool {
+        if self.expected_collectors.is_empty() {
+            // No collectors expected, consider ready
+            return true;
+        }
+        self.expected_collectors
+            .iter()
+            .all(|id| self.ready_collectors.contains(id))
+    }
+
+    /// Get the list of collectors that haven't reported ready yet.
+    fn pending_collectors(&self) -> Vec<&str> {
+        self.expected_collectors
+            .iter()
+            .filter(|id| !self.ready_collectors.contains(*id))
+            .map(String::as_str)
+            .collect()
+    }
+
+    /// Get the number of expected collectors.
+    fn expected_count(&self) -> usize {
+        self.expected_collectors.len()
+    }
+
+    /// Get the number of ready collectors.
+    fn ready_count(&self) -> usize {
+        self.ready_collectors.len()
+    }
+
+    /// Set expected collectors from configuration.
+    fn set_expected(&mut self, expected: HashSet<String>) {
+        self.expected_collectors = expected;
+    }
+
+    /// Reset ready status for all collectors.
+    #[allow(dead_code)] // May be used in future for restart scenarios
+    fn reset(&mut self) {
+        self.ready_collectors.clear();
+    }
+}
+
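+// A minimal usage sketch of the tracker above (editor's addition, not part of
+// the original patch; "netmond" is a hypothetical collector ID):
+//
+//     let mut tracker = CollectorReadinessTracker::new(
+//         ["procmond".to_owned(), "netmond".to_owned()].into_iter().collect(),
+//     );
+//     assert!(!tracker.all_ready());
+//     tracker.mark_ready("procmond");
+//     assert_eq!(tracker.pending_collectors(), vec!["netmond"]);
+//     tracker.mark_ready("netmond");
+//     assert!(tracker.all_ready());
+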
 /// Embedded broker manager that coordinates the `DaemoneyeBroker` lifecycle
 /// within the daemoneye-agent process architecture.
+///
+/// The broker manager also implements the agent loading state machine that
+/// coordinates startup between the agent and its collectors.
 pub struct BrokerManager {
     /// Configuration for the broker
     config: BrokerConfig,
@@ -87,6 +243,15 @@ pub struct BrokerManager {
     collector_registry: Arc<RwLock<Option<Arc<CollectorRegistry>>>>,
     /// RPC clients for collector lifecycle management
     rpc_clients: Arc>>>,
+    /// Current agent state (loading state machine)
+    #[allow(dead_code)] // Used by state machine methods, integrated in Task #15
+    agent_state: Arc<RwLock<AgentState>>,
+    /// Collectors configuration (loaded from file)
+    #[allow(dead_code)] // Used by state machine methods, integrated in Task #15
+    collectors_config: Arc<RwLock<CollectorsConfig>>,
+    /// Tracks which collectors have reported ready
+    #[allow(dead_code)] // Used by state machine methods, integrated in Task #15
+    readiness_tracker: Arc<RwLock<CollectorReadinessTracker>>,
 }
 
 impl BrokerManager {
@@ -123,6 +288,9 @@ impl BrokerManager {
             config_manager,
             collector_registry: Arc::new(RwLock::new(None)),
             rpc_clients: Arc::new(RwLock::new(std::collections::HashMap::new())),
+            agent_state: Arc::new(RwLock::new(AgentState::Loading)),
+            collectors_config: Arc::new(RwLock::new(CollectorsConfig::default())),
+            readiness_tracker: Arc::new(RwLock::new(CollectorReadinessTracker::empty())),
         }
     }
 
@@ -634,6 +802,384 @@ impl BrokerManager {
         let clients = self.rpc_clients.read().await;
         clients.keys().cloned().collect()
     }
+
+    // -------------------------------------------------------
+    // Agent Loading State Machine
+    // These methods will be used when main.rs integrates the state machine (Task #15)
+    // -------------------------------------------------------
+
+    /// Get the current agent state.
+    #[allow(dead_code)]
+    pub async fn agent_state(&self) -> AgentState {
+        self.agent_state.read().await.clone()
+    }
+
+    /// Set the collectors configuration and update the readiness tracker.
+    ///
+    /// This should be called during agent startup after loading the configuration file.
+    /// Only enabled collectors are added to the expected collectors set.
+    #[allow(dead_code)]
+    pub async fn set_collectors_config(&self, config: CollectorsConfig) {
+        // Extract enabled collector IDs
+        let expected: HashSet<String> = config
+            .collectors
+            .iter()
+            .filter(|c| c.enabled)
+            .map(|c| c.id.clone())
+            .collect();
+
+        let expected_count = expected.len();
+
+        // Update readiness tracker with expected collectors
+        self.readiness_tracker.write().await.set_expected(expected);
+
+        // Store the configuration
+        *self.collectors_config.write().await = config;
+
+        info!(
+            expected_collectors = expected_count,
+            "Configured collector readiness tracking"
+        );
+    }
+
+    /// Get the current collectors configuration.
+    #[allow(dead_code)]
+    pub async fn collectors_config(&self) -> CollectorsConfig {
+        self.collectors_config.read().await.clone()
+    }
+
+    /// Mark a collector as ready and check if all collectors are ready.
+    ///
+    /// Returns `true` if all expected collectors are now ready.
+    /// This method is typically called when a collector sends a "ready" status
+    /// during its registration handshake.
+    #[allow(dead_code)]
+    pub async fn mark_collector_ready(&self, collector_id: &str) -> bool {
+        let mut tracker = self.readiness_tracker.write().await;
+        tracker.mark_ready(collector_id);
+        let all_ready = tracker.all_ready();
+
+        info!(
+            collector_id = %collector_id,
+            ready_count = tracker.ready_count(),
+            expected_count = tracker.expected_count(),
+            all_ready = all_ready,
+            "Collector marked as ready"
+        );
+
+        all_ready
+    }
+
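+    // How these methods compose during startup (editor's sketch, not part of
+    // the original patch; mirrors the main.rs integration later in this patch):
+    //
+    //     manager.set_collectors_config(config).await;
+    //     let timeout = manager.get_startup_timeout().await;
+    //     if manager
+    //         .wait_for_collectors_ready(timeout, Duration::from_secs(1))
+    //         .await?
+    //     {
+    //         manager.transition_to_ready().await?;
+    //         manager.drop_privileges().await?;
+    //         // Broadcasts "begin monitoring" on control.collector.lifecycle.
+    //         manager.transition_to_steady_state().await?;
+    //     }
+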
+    /// Get the list of collectors that haven't reported ready yet.
+    #[allow(dead_code)]
+    pub async fn pending_collectors(&self) -> Vec<String> {
+        let tracker = self.readiness_tracker.read().await;
+        tracker
+            .pending_collectors()
+            .into_iter()
+            .map(ToOwned::to_owned)
+            .collect()
+    }
+
+    /// Check if all expected collectors have reported ready.
+    #[allow(dead_code)]
+    pub async fn all_collectors_ready(&self) -> bool {
+        let tracker = self.readiness_tracker.read().await;
+        tracker.all_ready()
+    }
+
+    /// Transition from Loading to Ready state.
+    ///
+    /// This transition occurs when all expected collectors have registered and
+    /// reported "ready" status. Returns `Ok(())` if the transition is valid,
+    /// or `Err` if the current state doesn't allow this transition.
+    #[allow(dead_code)]
+    pub async fn transition_to_ready(&self) -> Result<()> {
+        let mut state = self.agent_state.write().await;
+
+        match *state {
+            AgentState::Loading => {
+                // Verify all collectors are ready before transitioning
+                let tracker = self.readiness_tracker.read().await;
+                if !tracker.all_ready() {
+                    let pending: Vec<_> = tracker.pending_collectors().into_iter().collect();
+                    anyhow::bail!(
+                        "Cannot transition to Ready: collectors still pending: {pending:?}"
+                    );
+                }
+                drop(tracker);
+
+                info!(
+                    previous_state = %*state,
+                    "Agent transitioning to Ready state"
+                );
+                *state = AgentState::Ready;
+                Ok(())
+            }
+            AgentState::Ready | AgentState::SteadyState => {
+                // Already in Ready or beyond - no-op
+                debug!(current_state = %*state, "Agent already in Ready or SteadyState");
+                Ok(())
+            }
+            AgentState::StartupFailed { ref reason } => {
+                anyhow::bail!("Cannot transition to Ready: startup failed: {reason}");
+            }
+            AgentState::ShuttingDown => {
+                anyhow::bail!("Cannot transition to Ready: agent is shutting down");
+            }
+        }
+    }
+
+    /// Transition from Ready to `SteadyState` and broadcast "begin monitoring".
+    ///
+    /// This transition occurs after privilege dropping (if configured) and
+    /// signals all collectors to begin their monitoring operations.
+    #[allow(dead_code)]
+    pub async fn transition_to_steady_state(&self) -> Result<()> {
+        let mut state = self.agent_state.write().await;
+
+        match *state {
+            AgentState::Ready => {
+                info!(
+                    previous_state = %*state,
+                    "Agent transitioning to SteadyState"
+                );
+                *state = AgentState::SteadyState;
+
+                // Broadcast "begin monitoring" to all collectors
+                drop(state); // Release lock before async operations
+                self.broadcast_begin_monitoring().await?;
+
+                Ok(())
+            }
+            AgentState::SteadyState => {
+                // Already in SteadyState - no-op
+                debug!("Agent already in SteadyState");
+                Ok(())
+            }
+            AgentState::Loading => {
+                anyhow::bail!(
+                    "Cannot transition to SteadyState: must be in Ready state first (currently Loading)"
+                );
+            }
+            AgentState::StartupFailed { ref reason } => {
+                anyhow::bail!("Cannot transition to SteadyState: startup failed: {reason}");
+            }
+            AgentState::ShuttingDown => {
+                anyhow::bail!("Cannot transition to SteadyState: agent is shutting down");
+            }
+        }
+    }
+
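+    // Legal transitions, as implemented above and below (editor's summary,
+    // not part of the original patch):
+    //
+    //     Loading -> Ready          transition_to_ready(), all collectors ready
+    //     Ready   -> SteadyState    transition_to_steady_state(), broadcasts "begin monitoring"
+    //     Loading -> StartupFailed  mark_startup_failed(), startup timeout
+    //     any     -> ShuttingDown   transition_to_shutting_down()
+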
+    /// Mark the agent startup as failed.
+    ///
+    /// This is called when the startup timeout expires before all collectors
+    /// have reported ready.
+    #[allow(dead_code)]
+    pub async fn mark_startup_failed(&self, reason: String) {
+        let mut state = self.agent_state.write().await;
+
+        // Only transition to failed if still in Loading state
+        if matches!(*state, AgentState::Loading) {
+            let pending = {
+                let tracker = self.readiness_tracker.read().await;
+                tracker
+                    .pending_collectors()
+                    .into_iter()
+                    .map(ToOwned::to_owned)
+                    .collect::<Vec<_>>()
+            };
+
+            error!(
+                reason = %reason,
+                pending_collectors = ?pending,
+                "Agent startup failed"
+            );
+
+            *state = AgentState::StartupFailed {
+                reason: reason.clone(),
+            };
+        }
+    }
+
+    /// Transition to `ShuttingDown` state.
+    ///
+    /// This should be called before initiating the shutdown sequence.
+    #[allow(dead_code)]
+    pub async fn transition_to_shutting_down(&self) {
+        let mut state = self.agent_state.write().await;
+        let previous = state.clone();
+
+        *state = AgentState::ShuttingDown;
+        drop(state);
+        info!(previous_state = %previous, "Agent transitioning to ShuttingDown state");
+    }
+
+    /// Wait for all expected collectors to be ready within the given timeout.
+    ///
+    /// This method polls the readiness tracker at intervals until all collectors
+    /// are ready or the timeout expires. If the timeout expires, the startup is
+    /// marked as failed.
+    ///
+    /// # Arguments
+    ///
+    /// * `timeout` - Maximum time to wait for all collectors to be ready
+    /// * `poll_interval` - How often to check readiness status
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(true)` - All collectors are ready
+    /// * `Ok(false)` - Timeout expired before all collectors were ready (startup marked as failed)
+    #[allow(dead_code)]
+    pub async fn wait_for_collectors_ready(
+        &self,
+        timeout: Duration,
+        poll_interval: Duration,
+    ) -> Result<bool> {
+        let start = std::time::Instant::now();
+
+        info!(
+            timeout_secs = timeout.as_secs(),
+            "Waiting for all collectors to be ready"
+        );
+
+        loop {
+            // Check if all collectors are ready
+            if self.all_collectors_ready().await {
+                let elapsed = start.elapsed();
+                info!(elapsed_secs = elapsed.as_secs(), "All collectors are ready");
+                return Ok(true);
+            }
+
+            // Check if we've timed out
+            let elapsed = start.elapsed();
+            if elapsed >= timeout {
+                let pending = self.pending_collectors().await;
+                warn!(
+                    timeout_secs = timeout.as_secs(),
+                    pending_collectors = ?pending,
+                    "Startup timeout expired waiting for collectors"
+                );
+
+                // Mark startup as failed
+                self.mark_startup_failed(format!(
+                    "Timeout after {} seconds waiting for collectors: {:?}",
+                    timeout.as_secs(),
+                    pending
+                ))
+                .await;
+
+                return Ok(false);
+            }
+
+            // Log progress periodically
+            let pending = self.pending_collectors().await;
+            debug!(
+                elapsed_secs = elapsed.as_secs(),
+                pending_count = pending.len(),
+                pending_collectors = ?pending,
+                "Still waiting for collectors to be ready"
+            );
+
+            // Wait before polling again
+            tokio::time::sleep(poll_interval).await;
+        }
+    }
+
+    /// Drop privileges after all collectors have registered and reported ready.
+    ///
+    /// This method is a stub that will be implemented with platform-specific
+    /// privilege dropping logic in the future. Currently it just logs that
+    /// privilege dropping would occur.
+ /// + /// # Platform-specific behavior (future) + /// + /// - **Unix**: Drop to a non-root user (e.g., `daemoneye`) + /// - **Windows**: Reduce process token privileges + /// - **macOS**: Drop supplementary groups and effective UID + /// + /// # Safety + /// + /// This should only be called after all collectors have been spawned and + /// have completed their privileged initialization (e.g., binding to + /// privileged ports, accessing protected resources). + #[allow(dead_code)] + #[allow(clippy::unused_async)] // Future implementation will use async for platform-specific privilege dropping + pub async fn drop_privileges(&self) -> Result<()> { + info!("Privilege dropping requested (stub - not yet implemented)"); + + // Future implementation will: + // 1. Check if running as root/elevated + // 2. Drop supplementary groups + // 3. Set effective UID/GID to unprivileged user + // 4. Verify privileges were dropped successfully + + // For now, just log and succeed + debug!("Privilege dropping is not implemented - running with current privileges"); + + Ok(()) + } + + /// Get the default startup timeout from the collectors configuration. + /// + /// Returns the maximum startup timeout among all enabled collectors, + /// or a default of 60 seconds if no collectors are configured. + #[allow(dead_code)] + pub async fn get_startup_timeout(&self) -> Duration { + let max_timeout = self + .collectors_config + .read() + .await + .collectors + .iter() + .filter(|c| c.enabled) + .map(|c| c.startup_timeout_secs) + .max() + .unwrap_or(60); + + Duration::from_secs(max_timeout) + } + + /// Broadcast "begin monitoring" message to all collectors. + /// + /// This is sent on the `control.collector.lifecycle` topic to signal + /// all collectors that they should begin their monitoring operations. 
+ #[allow(dead_code)] + #[allow(clippy::significant_drop_tightening)] // Guard must be held while broker is used + pub async fn broadcast_begin_monitoring(&self) -> Result<()> { + let broker_guard = self.broker.read().await; + let Some(broker) = broker_guard.as_ref() else { + warn!("Cannot broadcast 'begin monitoring': broker not available"); + return Ok(()); + }; + + let topic = "control.collector.lifecycle"; + + // Create the lifecycle message + let message = serde_json::json!({ + "type": "BeginMonitoring", + "timestamp": std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis(), + "source": "daemoneye-agent", + }); + + let payload = + serde_json::to_vec(&message).context("Failed to serialize BeginMonitoring message")?; + + broker + .publish(topic, "begin-monitoring", payload) + .await + .context("Failed to publish BeginMonitoring message")?; + + info!( + topic = %topic, + "Broadcast 'begin monitoring' to all collectors" + ); + + Ok(()) + } } // ------------------------------- @@ -879,14 +1425,28 @@ impl RegistrationProvider for BrokerManager { .map_err(Self::map_registry_error)?; // Create RPC client after successful registration - if response.accepted - && let Err(e) = self.create_rpc_client(&request.collector_id).await - { - warn!( - collector_id = %request.collector_id, - error = %e, - "Failed to create RPC client after registration" - ); + if response.accepted { + if let Err(e) = self.create_rpc_client(&request.collector_id).await { + warn!( + collector_id = %request.collector_id, + error = %e, + "Failed to create RPC client after registration" + ); + } + + // Mark collector as ready when it registers successfully + // Registration indicates the collector has completed its initialization + let all_ready = self.mark_collector_ready(&request.collector_id).await; + + // Check if we should transition to Ready state + if all_ready { + let current_state = self.agent_state().await; + if matches!(current_state, AgentState::Loading) { + info!("All expected collectors are ready, agent can transition to Ready state"); + // Note: The actual transition is triggered by main.rs or startup coordination + // to ensure proper privilege dropping and other startup steps + } + } } Ok(response) @@ -974,6 +1534,7 @@ fn aggregate_worst_of>(iter: I) -> HealthStatus )] mod tests { use super::*; + use crate::collector_config::CollectorEntry; use daemoneye_lib::config::BrokerConfig; fn sample_registration_request() -> RegistrationRequest { @@ -1083,4 +1644,533 @@ mod tests { let result = manager.wait_for_healthy(Duration::from_millis(100)).await; assert!(result.is_err()); } + + // ------------------------------------------------------- + // Agent State Machine Tests + // ------------------------------------------------------- + + #[tokio::test] + async fn test_agent_state_initial_loading() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + let state = manager.agent_state().await; + assert_eq!(state, AgentState::Loading); + } + + #[tokio::test] + async fn test_agent_state_display() { + assert_eq!(format!("{}", AgentState::Loading), "Loading"); + assert_eq!(format!("{}", AgentState::Ready), "Ready"); + assert_eq!(format!("{}", AgentState::SteadyState), "SteadyState"); + assert_eq!( + format!( + "{}", + AgentState::StartupFailed { + reason: "timeout".to_owned() + } + ), + "StartupFailed: timeout" + ); + assert_eq!(format!("{}", AgentState::ShuttingDown), "ShuttingDown"); + } + + #[tokio::test] + async fn 
test_agent_state_helper_methods() { + assert!(AgentState::Loading.accepts_registrations()); + assert!(AgentState::Ready.accepts_registrations()); + assert!(AgentState::SteadyState.accepts_registrations()); + assert!( + !AgentState::StartupFailed { + reason: "test".to_owned() + } + .accepts_registrations() + ); + assert!(!AgentState::ShuttingDown.accepts_registrations()); + + assert!(AgentState::Loading.is_running()); + assert!(AgentState::Ready.is_running()); + assert!(AgentState::SteadyState.is_running()); + assert!( + !AgentState::StartupFailed { + reason: "test".to_owned() + } + .is_running() + ); + assert!(!AgentState::ShuttingDown.is_running()); + + assert!(!AgentState::Loading.is_failed()); + assert!(!AgentState::Ready.is_failed()); + assert!( + AgentState::StartupFailed { + reason: "test".to_owned() + } + .is_failed() + ); + } + + #[tokio::test] + async fn test_set_collectors_config() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + let collectors_config = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("collector-1", "type-a", "/usr/bin/collector1"), + CollectorEntry::new("collector-2", "type-b", "/usr/bin/collector2") + .with_enabled(false), + ], + }; + + manager + .set_collectors_config(collectors_config.clone()) + .await; + + let loaded_config = manager.collectors_config().await; + assert_eq!(loaded_config.collectors.len(), 2); + + // Only enabled collectors should be in the readiness tracker + let pending = manager.pending_collectors().await; + assert_eq!(pending.len(), 1); + assert!(pending.contains(&"collector-1".to_owned())); + } + + #[tokio::test] + async fn test_mark_collector_ready() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Configure with two enabled collectors + let collectors_config = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("collector-1", "type-a", "/usr/bin/collector1"), + CollectorEntry::new("collector-2", "type-b", "/usr/bin/collector2"), + ], + }; + + manager.set_collectors_config(collectors_config).await; + + // Initially not all ready + assert!(!manager.all_collectors_ready().await); + + // Mark first collector ready + let all_ready = manager.mark_collector_ready("collector-1").await; + assert!(!all_ready); + + // Mark second collector ready + let all_ready = manager.mark_collector_ready("collector-2").await; + assert!(all_ready); + + assert!(manager.all_collectors_ready().await); + assert!(manager.pending_collectors().await.is_empty()); + } + + #[tokio::test] + async fn test_all_collectors_ready_with_no_collectors() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Empty config means no collectors expected - should be ready by default + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + assert!(manager.all_collectors_ready().await); + } + + #[tokio::test] + async fn test_transition_to_ready_succeeds() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Configure with no collectors (automatically ready) + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + + // Should succeed since all collectors are ready + let result = manager.transition_to_ready().await; + assert!(result.is_ok()); + + let state = manager.agent_state().await; + assert_eq!(state, AgentState::Ready); + } + + #[tokio::test] + async fn test_transition_to_ready_fails_with_pending_collectors() { + let config = BrokerConfig::default(); + let manager = 
BrokerManager::new(config); + + // Configure with a collector that hasn't reported ready + let collectors_config = CollectorsConfig { + collectors: vec![CollectorEntry::new( + "pending-collector", + "type-a", + "/usr/bin/collector", + )], + }; + manager.set_collectors_config(collectors_config).await; + + // Should fail since collector hasn't reported ready + let result = manager.transition_to_ready().await; + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("pending-collector")); + + // State should still be Loading + let state = manager.agent_state().await; + assert_eq!(state, AgentState::Loading); + } + + #[tokio::test] + async fn test_transition_to_ready_is_idempotent() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + + // First transition + manager.transition_to_ready().await.unwrap(); + assert_eq!(manager.agent_state().await, AgentState::Ready); + + // Second transition should succeed (no-op) + let result = manager.transition_to_ready().await; + assert!(result.is_ok()); + assert_eq!(manager.agent_state().await, AgentState::Ready); + } + + #[tokio::test] + async fn test_transition_to_steady_state_fails_from_loading() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Try to skip directly to SteadyState from Loading + let result = manager.transition_to_steady_state().await; + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("must be in Ready state first")); + } + + #[tokio::test] + async fn test_mark_startup_failed() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Configure with a collector that won't report ready + let collectors_config = CollectorsConfig { + collectors: vec![CollectorEntry::new( + "slow-collector", + "type-a", + "/usr/bin/collector", + )], + }; + manager.set_collectors_config(collectors_config).await; + + // Mark startup as failed + manager + .mark_startup_failed("Timeout waiting for collectors".to_owned()) + .await; + + let state = manager.agent_state().await; + match state { + AgentState::StartupFailed { reason } => { + assert!(reason.contains("Timeout")); + } + _ => panic!("Expected StartupFailed state, got {state:?}"), + } + } + + #[tokio::test] + async fn test_mark_startup_failed_only_from_loading() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + + // Transition to Ready first + manager.transition_to_ready().await.unwrap(); + assert_eq!(manager.agent_state().await, AgentState::Ready); + + // Try to mark as failed - should be ignored since not in Loading state + manager + .mark_startup_failed("This should be ignored".to_owned()) + .await; + + // State should still be Ready + assert_eq!(manager.agent_state().await, AgentState::Ready); + } + + #[tokio::test] + async fn test_transition_to_shutting_down() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + + // Transition through states + manager.transition_to_ready().await.unwrap(); + manager.transition_to_shutting_down().await; + + assert_eq!(manager.agent_state().await, AgentState::ShuttingDown); + } + + #[tokio::test] + async fn test_full_state_machine_lifecycle() { + let config = 
BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Step 1: Start in Loading + assert_eq!(manager.agent_state().await, AgentState::Loading); + + // Step 2: Configure collectors + let collectors_config = CollectorsConfig { + collectors: vec![CollectorEntry::new( + "test-collector", + "type-a", + "/usr/bin/collector", + )], + }; + manager.set_collectors_config(collectors_config).await; + assert!(!manager.all_collectors_ready().await); + + // Step 3: Mark collector as ready + manager.mark_collector_ready("test-collector").await; + assert!(manager.all_collectors_ready().await); + + // Step 4: Transition to Ready + manager.transition_to_ready().await.unwrap(); + assert_eq!(manager.agent_state().await, AgentState::Ready); + + // Step 5: Shutdown + manager.transition_to_shutting_down().await; + assert_eq!(manager.agent_state().await, AgentState::ShuttingDown); + } + + #[tokio::test] + async fn test_registration_marks_collector_ready() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Initialize registry + { + let mut guard = manager.collector_registry.write().await; + *guard = Some(Arc::new(CollectorRegistry::default())); + } + + // Configure expected collectors + let collectors_config = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("collector-a", "type-a", "/usr/bin/collector-a"), + CollectorEntry::new("collector-b", "type-b", "/usr/bin/collector-b"), + ], + }; + manager.set_collectors_config(collectors_config).await; + + // Initially not all ready + assert!(!manager.all_collectors_ready().await); + assert_eq!(manager.pending_collectors().await.len(), 2); + + // Register first collector - should mark it as ready + let request_a = RegistrationRequest { + collector_id: "collector-a".to_owned(), + collector_type: "type-a".to_owned(), + hostname: "localhost".to_owned(), + version: Some("1.0.0".to_owned()), + pid: Some(1234), + capabilities: vec![], + attributes: std::collections::HashMap::new(), + heartbeat_interval_ms: None, + }; + + let response = manager + .register_collector(request_a) + .await + .expect("registration succeeds"); + assert!(response.accepted); + + // First collector should be marked ready + assert_eq!(manager.pending_collectors().await.len(), 1); + assert!(!manager.all_collectors_ready().await); + + // Register second collector - should mark it as ready and complete readiness + let request_b = RegistrationRequest { + collector_id: "collector-b".to_owned(), + collector_type: "type-b".to_owned(), + hostname: "localhost".to_owned(), + version: Some("1.0.0".to_owned()), + pid: Some(5678), + capabilities: vec![], + attributes: std::collections::HashMap::new(), + heartbeat_interval_ms: None, + }; + + let response = manager + .register_collector(request_b) + .await + .expect("registration succeeds"); + assert!(response.accepted); + + // Both collectors should now be ready + assert!(manager.pending_collectors().await.is_empty()); + assert!(manager.all_collectors_ready().await); + + // Should be able to transition to Ready state now + manager + .transition_to_ready() + .await + .expect("transition succeeds"); + assert_eq!(manager.agent_state().await, AgentState::Ready); + } + + #[tokio::test] + async fn test_wait_for_collectors_ready_success() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Initialize registry + { + let mut guard = manager.collector_registry.write().await; + *guard = Some(Arc::new(CollectorRegistry::default())); + } + + // Configure with a single 
collector + let collectors_config = CollectorsConfig { + collectors: vec![CollectorEntry::new( + "test-collector", + "type-a", + "/usr/bin/collector", + )], + }; + manager.set_collectors_config(collectors_config).await; + + // Spawn a task that will register the collector after a short delay + let manager_clone = Arc::new(manager); + let manager_for_task = Arc::clone(&manager_clone); + + let registration_task = tokio::spawn(async move { + // Wait a bit then register + tokio::time::sleep(Duration::from_millis(50)).await; + + let request = RegistrationRequest { + collector_id: "test-collector".to_owned(), + collector_type: "type-a".to_owned(), + hostname: "localhost".to_owned(), + version: None, + pid: None, + capabilities: vec![], + attributes: std::collections::HashMap::new(), + heartbeat_interval_ms: None, + }; + + manager_for_task + .register_collector(request) + .await + .expect("registration succeeds"); + }); + + // Wait for collectors with a generous timeout + let result = manager_clone + .wait_for_collectors_ready(Duration::from_secs(5), Duration::from_millis(10)) + .await + .expect("wait succeeds"); + + assert!(result, "All collectors should be ready"); + assert!(manager_clone.all_collectors_ready().await); + + // Clean up + registration_task.await.unwrap(); + } + + #[tokio::test] + async fn test_wait_for_collectors_ready_timeout() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // Configure with a collector that will never register + let collectors_config = CollectorsConfig { + collectors: vec![CollectorEntry::new( + "never-registers", + "type-a", + "/usr/bin/collector", + )], + }; + manager.set_collectors_config(collectors_config).await; + + // Wait with a short timeout + let result = manager + .wait_for_collectors_ready(Duration::from_millis(100), Duration::from_millis(20)) + .await + .expect("wait succeeds but returns false"); + + assert!(!result, "Should timeout and return false"); + + // Agent state should be StartupFailed + let state = manager.agent_state().await; + match state { + AgentState::StartupFailed { reason } => { + assert!(reason.contains("never-registers")); + assert!(reason.contains("Timeout")); + } + _ => panic!("Expected StartupFailed state, got {state:?}"), + } + } + + #[tokio::test] + async fn test_wait_for_collectors_ready_no_collectors() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // No collectors configured - should succeed immediately + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + + let result = manager + .wait_for_collectors_ready(Duration::from_secs(1), Duration::from_millis(10)) + .await + .expect("wait succeeds"); + + assert!(result, "Should succeed immediately with no collectors"); + } + + #[tokio::test] + async fn test_get_startup_timeout() { + let config = BrokerConfig::default(); + let manager = BrokerManager::new(config); + + // No collectors - default timeout + manager + .set_collectors_config(CollectorsConfig::default()) + .await; + assert_eq!(manager.get_startup_timeout().await, Duration::from_secs(60)); + + // With collectors, use max timeout + let collectors_config = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("collector-1", "type-a", "/usr/bin/collector1") + .with_startup_timeout(30), + CollectorEntry::new("collector-2", "type-b", "/usr/bin/collector2") + .with_startup_timeout(90), + CollectorEntry::new("collector-3", "type-c", "/usr/bin/collector3") + .with_startup_timeout(45) + .with_enabled(false), // 
Disabled - should be ignored
+            ],
+        };
+        manager.set_collectors_config(collectors_config).await;
+
+        // Should return 90 (max of enabled collectors)
+        assert_eq!(manager.get_startup_timeout().await, Duration::from_secs(90));
+    }
+
+    #[tokio::test]
+    async fn test_drop_privileges_stub() {
+        let config = BrokerConfig::default();
+        let manager = BrokerManager::new(config);
+
+        // The stub should succeed without doing anything
+        let result = manager.drop_privileges().await;
+        assert!(result.is_ok());
+    }
+}
diff --git a/daemoneye-agent/src/collector_config.rs b/daemoneye-agent/src/collector_config.rs
new file mode 100644
index 0000000..3919444
--- /dev/null
+++ b/daemoneye-agent/src/collector_config.rs
@@ -0,0 +1,645 @@
+//! Collector configuration for daemoneye-agent.
+//!
+//! This module handles loading and validating collector configurations from
+//! `/etc/daemoneye/collectors.json` (or a configurable path). The configuration
+//! defines which collectors to spawn, their binary paths, and collector-specific
+//! settings.
+//!
+//! # Configuration Format
+//!
+//! ```json
+//! {
+//!   "collectors": [
+//!     {
+//!       "id": "procmond",
+//!       "collector_type": "process-monitor",
+//!       "binary_path": "/usr/bin/procmond",
+//!       "enabled": true,
+//!       "auto_restart": true,
+//!       "startup_timeout_secs": 60,
+//!       "config": {
+//!         "collection_interval_secs": 30,
+//!         "enhanced_metadata": true,
+//!         "compute_hashes": false
+//!       }
+//!     }
+//!   ]
+//! }
+//! ```
+
+// Module items will be used when main.rs integrates the loading state machine (Task #15)
+#![allow(dead_code)]
+
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use thiserror::Error;
+use tracing::{info, warn};
+
+/// Default path for collector configuration file.
+#[cfg(unix)]
+pub const DEFAULT_COLLECTORS_CONFIG_PATH: &str = "/etc/daemoneye/collectors.json";
+
+#[cfg(windows)]
+pub const DEFAULT_COLLECTORS_CONFIG_PATH: &str = r"C:\ProgramData\DaemonEye\config\collectors.json";
+
+/// Default startup timeout for collectors in seconds.
+pub const DEFAULT_STARTUP_TIMEOUT_SECS: u64 = 60;
+
+/// Default heartbeat interval for collectors in seconds.
+pub const DEFAULT_HEARTBEAT_INTERVAL_SECS: u64 = 30;
+
+/// Errors that can occur when loading or validating collector configuration.
+#[derive(Debug, Error)]
+#[non_exhaustive]
+pub enum CollectorConfigError {
+    /// Configuration file not found.
+    #[error("collector configuration file not found: {path}")]
+    FileNotFound { path: PathBuf },
+
+    /// Failed to read configuration file.
+    #[error("failed to read collector configuration: {0}")]
+    IoError(#[from] std::io::Error),
+
+    /// Failed to parse JSON configuration.
+    #[error("failed to parse collector configuration: {0}")]
+    ParseError(#[from] serde_json::Error),
+
+    /// Configuration validation failed.
+    #[error("collector configuration validation failed: {message}")]
+    ValidationError { message: String },
+
+    /// Collector binary not found or not executable.
+    #[error("collector binary not found or not executable: {path}")]
+    BinaryNotFound { path: PathBuf },
+
+    /// Duplicate collector ID.
+    #[error("duplicate collector ID: {id}")]
+    DuplicateCollectorId { id: String },
+}
+
+/// Root configuration structure for collectors.
+#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
+pub struct CollectorsConfig {
+    /// List of collector configurations.
+    #[serde(default)]
+    pub collectors: Vec<CollectorEntry>,
+}
+
+/// Configuration for a single collector.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct CollectorEntry {
+    /// Unique identifier for this collector instance.
+    pub id: String,
+
+    /// Type of collector (e.g., "process-monitor", "network-monitor").
+    pub collector_type: String,
+
+    /// Path to the collector binary.
+    pub binary_path: PathBuf,
+
+    /// Whether this collector is enabled.
+    #[serde(default = "default_enabled")]
+    pub enabled: bool,
+
+    /// Whether to automatically restart the collector on failure.
+    #[serde(default)]
+    pub auto_restart: bool,
+
+    /// Startup timeout in seconds (how long to wait for "ready" status).
+    #[serde(default = "default_startup_timeout")]
+    pub startup_timeout_secs: u64,
+
+    /// Heartbeat interval in seconds.
+    #[serde(default = "default_heartbeat_interval")]
+    pub heartbeat_interval_secs: u64,
+
+    /// Collector-specific configuration (passed to collector via environment or config file).
+    #[serde(default)]
+    pub config: HashMap<String, serde_json::Value>,
+}
+
+const fn default_enabled() -> bool {
+    true
+}
+
+const fn default_startup_timeout() -> u64 {
+    DEFAULT_STARTUP_TIMEOUT_SECS
+}
+
+const fn default_heartbeat_interval() -> u64 {
+    DEFAULT_HEARTBEAT_INTERVAL_SECS
+}
+
+impl CollectorsConfig {
+    /// Load collector configuration from a file path.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - The file cannot be read
+    /// - The JSON is malformed
+    /// - Validation fails (e.g., duplicate IDs, invalid paths)
+    pub fn load_from_file(path: &Path) -> Result<Self, CollectorConfigError> {
+        if !path.exists() {
+            return Err(CollectorConfigError::FileNotFound {
+                path: path.to_path_buf(),
+            });
+        }
+
+        let content = std::fs::read_to_string(path)?;
+        let config: Self = serde_json::from_str(&content)?;
+
+        config.validate()?;
+
+        info!(
+            path = %path.display(),
+            collector_count = config.collectors.len(),
+            "Loaded collector configuration"
+        );
+
+        Ok(config)
+    }
+
+    /// Load collector configuration from the default path, or return empty config if not found.
+    ///
+    /// This method is lenient - if the config file doesn't exist, it returns an empty
+    /// configuration rather than an error. This supports deployments without explicit
+    /// collector configuration.
+    pub fn load_or_default() -> Self {
+        let default_path = Path::new(DEFAULT_COLLECTORS_CONFIG_PATH);
+
+        match Self::load_from_file(default_path) {
+            Ok(config) => config,
+            Err(CollectorConfigError::FileNotFound { path }) => {
+                info!(
+                    path = %path.display(),
+                    "Collector configuration file not found, using empty configuration"
+                );
+                Self::default()
+            }
+            Err(e) => {
+                warn!(
+                    error = %e,
+                    "Failed to load collector configuration, using empty configuration"
+                );
+                Self::default()
+            }
+        }
+    }
+
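+    // Handling load errors explicitly instead of using load_or_default
+    // (editor's sketch, not part of the original patch; the path matches the
+    // documented default):
+    //
+    //     match CollectorsConfig::load_from_file(Path::new("/etc/daemoneye/collectors.json")) {
+    //         Ok(config) => info!(count = config.collectors.len(), "loaded collector config"),
+    //         Err(CollectorConfigError::FileNotFound { path }) => info!(?path, "no config file"),
+    //         Err(e) => warn!(error = %e, "invalid collector configuration"),
+    //     }
+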
+    /// Validate the configuration.
+    ///
+    /// Checks for:
+    /// - Duplicate collector IDs
+    /// - Empty collector IDs or types
+    /// - Binary path existence and executability (for enabled collectors)
+    pub fn validate(&self) -> Result<(), CollectorConfigError> {
+        let mut seen_ids = std::collections::HashSet::new();
+
+        for collector in &self.collectors {
+            // Check for empty ID
+            if collector.id.trim().is_empty() {
+                return Err(CollectorConfigError::ValidationError {
+                    message: "collector ID cannot be empty".to_owned(),
+                });
+            }
+
+            // Check for empty type
+            if collector.collector_type.trim().is_empty() {
+                return Err(CollectorConfigError::ValidationError {
+                    message: format!(
+                        "collector type cannot be empty for collector '{}'",
+                        collector.id
+                    ),
+                });
+            }
+
+            // Check for duplicate IDs
+            if !seen_ids.insert(&collector.id) {
+                return Err(CollectorConfigError::DuplicateCollectorId {
+                    id: collector.id.clone(),
+                });
+            }
+
+            // Validate binary path for enabled collectors
+            if collector.enabled {
+                Self::validate_binary_path(&collector.binary_path, &collector.id)?;
+            }
+
+            // Validate timeout values
+            if collector.startup_timeout_secs == 0 {
+                return Err(CollectorConfigError::ValidationError {
+                    message: format!(
+                        "startup_timeout_secs must be greater than 0 for collector '{}'",
+                        collector.id
+                    ),
+                });
+            }
+
+            if collector.heartbeat_interval_secs == 0 {
+                return Err(CollectorConfigError::ValidationError {
+                    message: format!(
+                        "heartbeat_interval_secs must be greater than 0 for collector '{}'",
+                        collector.id
+                    ),
+                });
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Validate that a binary path exists and is executable.
+    fn validate_binary_path(path: &Path, collector_id: &str) -> Result<(), CollectorConfigError> {
+        if !path.exists() {
+            warn!(
+                collector_id = %collector_id,
+                path = %path.display(),
+                "Collector binary not found"
+            );
+            return Err(CollectorConfigError::BinaryNotFound {
+                path: path.to_path_buf(),
+            });
+        }
+
+        // Check if file is executable (Unix only)
+        #[cfg(unix)]
+        {
+            use std::os::unix::fs::PermissionsExt;
+            let metadata = std::fs::metadata(path)?;
+            let permissions = metadata.permissions();
+            // Check if any execute bit is set (owner, group, or other)
+            if permissions.mode() & 0o111 == 0 {
+                warn!(
+                    collector_id = %collector_id,
+                    path = %path.display(),
+                    "Collector binary is not executable"
+                );
+                return Err(CollectorConfigError::BinaryNotFound {
+                    path: path.to_path_buf(),
+                });
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Get all enabled collectors.
+    pub fn enabled_collectors(&self) -> impl Iterator<Item = &CollectorEntry> {
+        self.collectors.iter().filter(|c| c.enabled)
+    }
+
+    /// Get a collector by ID.
+    pub fn get_collector(&self, id: &str) -> Option<&CollectorEntry> {
+        self.collectors.iter().find(|c| c.id == id)
+    }
+
+    /// Get the number of enabled collectors.
+    pub fn enabled_count(&self) -> usize {
+        self.collectors.iter().filter(|c| c.enabled).count()
+    }
+
+    /// Get all collector IDs (enabled and disabled).
+    pub fn collector_ids(&self) -> Vec<&str> {
+        self.collectors.iter().map(|c| c.id.as_str()).collect()
+    }
+
+    /// Get all enabled collector IDs.
+    pub fn enabled_collector_ids(&self) -> Vec<&str> {
+        self.collectors
+            .iter()
+            .filter(|c| c.enabled)
+            .map(|c| c.id.as_str())
+            .collect()
+    }
+}
+
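+// Loading and iterating the configuration (editor's sketch, not part of the
+// original patch):
+//
+//     let config = CollectorsConfig::load_or_default();
+//     for collector in config.enabled_collectors() {
+//         println!("{} -> {}", collector.id, collector.binary_path.display());
+//     }
+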
+impl CollectorEntry {
+    /// Create a new collector entry with minimal required fields.
+    pub fn new(
+        id: impl Into<String>,
+        collector_type: impl Into<String>,
+        binary_path: impl Into<PathBuf>,
+    ) -> Self {
+        Self {
+            id: id.into(),
+            collector_type: collector_type.into(),
+            binary_path: binary_path.into(),
+            enabled: true,
+            auto_restart: false,
+            startup_timeout_secs: DEFAULT_STARTUP_TIMEOUT_SECS,
+            heartbeat_interval_secs: DEFAULT_HEARTBEAT_INTERVAL_SECS,
+            config: HashMap::new(),
+        }
+    }
+
+    /// Builder method to set enabled status.
+    #[must_use]
+    pub const fn with_enabled(mut self, enabled: bool) -> Self {
+        self.enabled = enabled;
+        self
+    }
+
+    /// Builder method to set `auto_restart`.
+    #[must_use]
+    pub const fn with_auto_restart(mut self, auto_restart: bool) -> Self {
+        self.auto_restart = auto_restart;
+        self
+    }
+
+    /// Builder method to set startup timeout.
+    #[must_use]
+    pub const fn with_startup_timeout(mut self, secs: u64) -> Self {
+        self.startup_timeout_secs = secs;
+        self
+    }
+
+    /// Builder method to set heartbeat interval.
+    #[must_use]
+    pub const fn with_heartbeat_interval(mut self, secs: u64) -> Self {
+        self.heartbeat_interval_secs = secs;
+        self
+    }
+
+    /// Builder method to add config value.
+    #[must_use]
+    pub fn with_config(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
+        self.config.insert(key.into(), value);
+        self
+    }
+}
+
+#[cfg(test)]
+#[allow(
+    clippy::unwrap_used,
+    clippy::expect_used,
+    clippy::str_to_string,
+    clippy::indexing_slicing
+)]
+mod tests {
+    use super::*;
+    use std::io::Write;
+    use tempfile::NamedTempFile;
+
+    fn sample_config_json() -> &'static str {
+        r#"{
+            "collectors": [
+                {
+                    "id": "procmond",
+                    "collector_type": "process-monitor",
+                    "binary_path": "/usr/bin/procmond",
+                    "enabled": true,
+                    "auto_restart": true,
+                    "startup_timeout_secs": 60,
+                    "heartbeat_interval_secs": 30,
+                    "config": {
+                        "collection_interval_secs": 30,
+                        "enhanced_metadata": true,
+                        "compute_hashes": false
+                    }
+                }
+            ]
+        }"#
+    }
+
+    #[test]
+    fn test_parse_valid_config() {
+        let config: CollectorsConfig = serde_json::from_str(sample_config_json()).unwrap();
+
+        assert_eq!(config.collectors.len(), 1);
+        let procmond = &config.collectors[0];
+        assert_eq!(procmond.id, "procmond");
+        assert_eq!(procmond.collector_type, "process-monitor");
+        assert_eq!(procmond.binary_path, PathBuf::from("/usr/bin/procmond"));
+        assert!(procmond.enabled);
+        assert!(procmond.auto_restart);
+        assert_eq!(procmond.startup_timeout_secs, 60);
+        assert_eq!(procmond.heartbeat_interval_secs, 30);
+
+        // Check nested config
+        assert_eq!(
+            procmond.config.get("collection_interval_secs"),
+            Some(&serde_json::Value::Number(30.into()))
+        );
+        assert_eq!(
+            procmond.config.get("enhanced_metadata"),
+            Some(&serde_json::Value::Bool(true))
+        );
+    }
+
+    #[test]
+    fn test_default_values() {
+        let json = r#"{
+            "collectors": [
+                {
+                    "id": "test",
+                    "collector_type": "test-type",
+                    "binary_path": "/usr/bin/test"
+                }
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+        let collector = &config.collectors[0];
+
+        assert!(collector.enabled); // default true
+        assert!(!collector.auto_restart); // default false
+        assert_eq!(collector.startup_timeout_secs, DEFAULT_STARTUP_TIMEOUT_SECS);
+        assert_eq!(
+            collector.heartbeat_interval_secs,
+            DEFAULT_HEARTBEAT_INTERVAL_SECS
+        );
+        assert!(collector.config.is_empty());
+    }
+
+    #[test]
+    fn test_empty_config() {
+        let json = r#"{"collectors": []}"#;
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+
+        assert!(config.collectors.is_empty());
+        assert!(config.validate().is_ok());
+    }
+
+    #[test]
+    fn test_validation_duplicate_id() {
+        let json = r#"{
+            "collectors": [
+                {"id": "test", "collector_type": "t1", "binary_path": "/bin/test"},
+                {"id": "test", "collector_type": "t2", "binary_path": "/bin/test2"}
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+        let result = config.validate();
+
+        assert!(matches!(
+            result,
+            Err(CollectorConfigError::DuplicateCollectorId { id }) if id == "test"
+        ));
+    }
+
+    #[test]
+    fn test_validation_empty_id() {
+        let json = r#"{
+            "collectors": [
+                {"id": "", "collector_type": "test", "binary_path": "/bin/test"}
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+        let result = config.validate();
+
+        assert!(matches!(
+            result,
+            Err(CollectorConfigError::ValidationError { message }) if message.contains("ID cannot be empty")
+        ));
+    }
+
+    #[test]
+    fn test_validation_empty_type() {
+        let json = r#"{
+            "collectors": [
+                {"id": "test", "collector_type": "", "binary_path": "/bin/test"}
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+        let result = config.validate();
+
+        assert!(matches!(
+            result,
+            Err(CollectorConfigError::ValidationError { message }) if message.contains("type cannot be empty")
+        ));
+    }
+
+    #[test]
+    fn test_validation_zero_timeout() {
+        let json = r#"{
+            "collectors": [
+                {"id": "test", "collector_type": "test", "binary_path": "/bin/test", "startup_timeout_secs": 0}
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+        let result = config.validate();
+
+        assert!(matches!(
+            result,
+            Err(CollectorConfigError::ValidationError { message }) if message.contains("startup_timeout_secs")
+        ));
+    }
+
+    #[test]
+    fn test_enabled_collectors_filter() {
+        let json = r#"{
+            "collectors": [
+                {"id": "enabled1", "collector_type": "t", "binary_path": "/bin/e1", "enabled": true},
+                {"id": "disabled", "collector_type": "t", "binary_path": "/bin/d", "enabled": false},
+                {"id": "enabled2", "collector_type": "t", "binary_path": "/bin/e2", "enabled": true}
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+
+        assert_eq!(config.enabled_count(), 2);
+
+        let enabled_ids: Vec<&str> = config.enabled_collector_ids();
+        assert_eq!(enabled_ids, vec!["enabled1", "enabled2"]);
+    }
+
+    #[test]
+    fn test_get_collector() {
+        let json = r#"{
+            "collectors": [
+                {"id": "a", "collector_type": "t1", "binary_path": "/bin/a"},
+                {"id": "b", "collector_type": "t2", "binary_path": "/bin/b"}
+            ]
+        }"#;
+
+        let config: CollectorsConfig = serde_json::from_str(json).unwrap();
+
+        assert!(config.get_collector("a").is_some());
+        assert!(config.get_collector("b").is_some());
+        assert!(config.get_collector("c").is_none());
+    }
+
+    #[test]
+    fn test_collector_entry_builder() {
+        let entry = CollectorEntry::new("test-id", "test-type", "/usr/bin/test")
+            .with_enabled(false)
+            .with_auto_restart(true)
+            .with_startup_timeout(120)
+            .with_heartbeat_interval(15)
+            .with_config("key1", serde_json::Value::Bool(true));
+
+        assert_eq!(entry.id, "test-id");
+        assert_eq!(entry.collector_type, "test-type");
+        assert_eq!(entry.binary_path, PathBuf::from("/usr/bin/test"));
+        assert!(!entry.enabled);
+        assert!(entry.auto_restart);
+        assert_eq!(entry.startup_timeout_secs, 120);
+        assert_eq!(entry.heartbeat_interval_secs, 15);
+        assert_eq!(
+            entry.config.get("key1"),
+            Some(&serde_json::Value::Bool(true))
+        );
+    }
+
+    #[test]
+    fn test_load_from_file() {
+        let mut temp_file = NamedTempFile::new().unwrap();
+
+        // Write a config with a 
disabled collector (so binary validation is skipped) + let json = r#"{ + "collectors": [ + {"id": "test", "collector_type": "test", "binary_path": "/nonexistent", "enabled": false} + ] + }"#; + temp_file.write_all(json.as_bytes()).unwrap(); + + let config = CollectorsConfig::load_from_file(temp_file.path()).unwrap(); + assert_eq!(config.collectors.len(), 1); + assert_eq!(config.collectors[0].id, "test"); + } + + #[test] + fn test_load_file_not_found() { + let result = CollectorsConfig::load_from_file(Path::new("/nonexistent/config.json")); + assert!(matches!( + result, + Err(CollectorConfigError::FileNotFound { .. }) + )); + } + + #[test] + fn test_load_or_default_returns_empty_when_no_file() { + // This test relies on DEFAULT_COLLECTORS_CONFIG_PATH not existing in test environment + let config = CollectorsConfig::load_or_default(); + // Should return empty config without error + assert!(config.collectors.is_empty()); + } + + #[test] + fn test_default_config() { + let config = CollectorsConfig::default(); + assert!(config.collectors.is_empty()); + } + + #[test] + fn test_serialization_roundtrip() { + let original = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("test", "test-type", "/bin/test") + .with_auto_restart(true) + .with_config("setting", serde_json::json!(42)), + ], + }; + + let json = serde_json::to_string(&original).unwrap(); + let deserialized: CollectorsConfig = serde_json::from_str(&json).unwrap(); + + assert_eq!(original, deserialized); + } +} diff --git a/daemoneye-agent/src/lib.rs b/daemoneye-agent/src/lib.rs index 0a66fd4..f7d54c1 100644 --- a/daemoneye-agent/src/lib.rs +++ b/daemoneye-agent/src/lib.rs @@ -7,11 +7,13 @@ #![forbid(unsafe_code)] pub mod broker_manager; +pub mod collector_config; pub mod collector_registry; pub mod health; pub mod ipc_server; pub use broker_manager::{BrokerHealth, BrokerManager}; +pub use collector_config::{CollectorConfigError, CollectorEntry, CollectorsConfig}; pub use collector_registry::{CollectorRegistry, RegistryError}; pub use health::{HealthState, wait_for_healthy}; pub use ipc_server::{IpcServerHealth, IpcServerManager, create_cli_ipc_config}; diff --git a/daemoneye-agent/src/main.rs b/daemoneye-agent/src/main.rs index b4a147b..5ffa3ad 100644 --- a/daemoneye-agent/src/main.rs +++ b/daemoneye-agent/src/main.rs @@ -6,11 +6,13 @@ use std::time::{Duration, Instant}; use tracing::{debug, error, info, warn}; mod broker_manager; +mod collector_config; mod collector_registry; mod health; mod ipc_server; use broker_manager::BrokerManager; +use collector_config::CollectorsConfig; use ipc_server::IpcServerManager; #[derive(Parser)] @@ -116,6 +118,148 @@ async fn run() -> Result<(), Box> { "IPC server is healthy and ready for CLI communication" ); + // ========================================================================= + // Loading State Coordination + // ========================================================================= + // The agent starts in Loading state. 
We load the collectors configuration, + // wait for all expected collectors to register, then transition through: + // Loading -> Ready -> SteadyState + + // Load collectors configuration (defaults to empty if file doesn't exist) + let collectors_config_path = std::path::Path::new("/etc/daemoneye/collectors.json"); + let collectors_config = match CollectorsConfig::load_from_file(collectors_config_path) { + Ok(loaded_config) => { + info!( + path = %collectors_config_path.display(), + "Loaded collectors configuration from file" + ); + loaded_config + } + Err(e) => { + debug!( + path = %collectors_config_path.display(), + error = %e, + "No collectors config file found, using empty configuration" + ); + CollectorsConfig::default() + } + }; + + let expected_count = collectors_config.enabled_collectors().count(); + + info!( + config_path = %collectors_config_path.display(), + total_collectors = collectors_config.collectors.len(), + enabled_collectors = expected_count, + "Loaded collectors configuration" + ); + + // Set the collectors configuration on the broker manager + broker_manager + .set_collectors_config(collectors_config.clone()) + .await; + + // Get the startup timeout from the collectors configuration + let startup_timeout = broker_manager.get_startup_timeout().await; + + let current_agent_state = broker_manager.agent_state().await; + info!( + agent_state = %current_agent_state, + expected_collectors = expected_count, + startup_timeout_secs = startup_timeout.as_secs(), + "Waiting for collectors to register" + ); + + // Wait for all expected collectors to register + // If no collectors are expected, this will return immediately + // Poll every second for collector readiness + let poll_interval = Duration::from_secs(1); + match broker_manager + .wait_for_collectors_ready(startup_timeout, poll_interval) + .await + { + Ok(true) => { + info!("All expected collectors have registered"); + + // Transition to Ready state + if let Err(e) = broker_manager.transition_to_ready().await { + error!(error = %e, "Failed to transition to Ready state"); + broker_manager + .mark_startup_failed(format!("Failed to transition to Ready: {e}")) + .await; + return Err(e.into()); + } + + let ready_state = broker_manager.agent_state().await; + info!(agent_state = %ready_state, "Agent is now Ready"); + + // Drop privileges (stub - not yet implemented) + if let Err(e) = broker_manager.drop_privileges().await { + error!(error = %e, "Failed to drop privileges"); + // Continue anyway - privilege dropping failure is not fatal + warn!("Continuing with elevated privileges"); + } + + // Broadcast "begin monitoring" to all collectors + if let Err(e) = broker_manager.broadcast_begin_monitoring().await { + error!(error = %e, "Failed to broadcast begin monitoring"); + // Continue anyway - broadcast failure is not fatal for the main loop + warn!("Collectors may not have received begin monitoring signal"); + } + + // Transition to SteadyState + if let Err(e) = broker_manager.transition_to_steady_state().await { + error!(error = %e, "Failed to transition to SteadyState"); + // This shouldn't happen if we're in Ready state, but log and continue + warn!("Agent may not be in expected state"); + } + + let steady_state = broker_manager.agent_state().await; + info!( + agent_state = %steady_state, + "Agent startup complete, entering steady state operation" + ); + } + Ok(false) => { + // Timeout - not all collectors registered in time + // The wait_for_collectors_ready function already marked startup as failed + error!( + expected = 
expected_count, + "Startup timeout: not all collectors registered in time" + ); + + let current_state = broker_manager.agent_state().await; + error!( + agent_state = %current_state, + "Agent startup failed due to timeout" + ); + + return Err(anyhow::anyhow!("Startup timeout: not all collectors registered").into()); + } + Err(e) => { + error!( + error = %e, + expected = expected_count, + "Startup error while waiting for collectors" + ); + broker_manager + .mark_startup_failed(format!("Startup error: {e}")) + .await; + + let current_state = broker_manager.agent_state().await; + error!( + agent_state = %current_state, + "Agent startup failed" + ); + + return Err(e.into()); + } + } + + // ========================================================================= + // Initialize Detection and Alerting + // ========================================================================= + // Initialize detection engine let mut detection_engine = detection::DetectionEngine::new(); diff --git a/daemoneye-agent/tests/loading_state_integration.rs b/daemoneye-agent/tests/loading_state_integration.rs new file mode 100644 index 0000000..f3777f5 --- /dev/null +++ b/daemoneye-agent/tests/loading_state_integration.rs @@ -0,0 +1,355 @@ +//! Integration tests for agent loading state coordination. +//! +//! These tests validate the complete loading state machine workflow including: +//! - State transitions (Loading → Ready → SteadyState) +//! - Collector configuration loading +//! - Collector readiness tracking +//! - Startup timeout handling +//! - Begin monitoring broadcast + +#![allow( + clippy::str_to_string, + clippy::expect_used, + clippy::unwrap_used, + clippy::uninlined_format_args, + clippy::let_underscore_must_use, + clippy::print_stdout +)] + +use daemoneye_agent::broker_manager::{AgentState, BrokerManager}; +use daemoneye_agent::collector_config::{CollectorEntry, CollectorsConfig}; +use daemoneye_eventbus::rpc::{RegistrationProvider, RegistrationRequest}; +use daemoneye_lib::config::BrokerConfig; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tempfile::TempDir; + +/// Setup test broker with loading state enabled +async fn setup_test_broker() -> anyhow::Result<(TempDir, BrokerManager)> { + let temp_dir = TempDir::new()?; + let socket_path = temp_dir.path().join("test-broker.sock"); + + let config = BrokerConfig { + enabled: true, + socket_path: socket_path.to_string_lossy().to_string(), + max_connections: 100, + shutdown_timeout_seconds: 10, + startup_timeout_seconds: 10, + message_buffer_size: 1000, + topic_hierarchy: daemoneye_lib::config::TopicHierarchyConfig::default(), + collector_binaries: std::collections::HashMap::new(), + config_directory: temp_dir.path().join("configs"), + process_manager: daemoneye_lib::config::ProcessManagerConfig::default(), + }; + + let manager = BrokerManager::new(config); + manager.start().await?; + + // Wait for broker to be ready + tokio::time::sleep(Duration::from_millis(100)).await; + + Ok((temp_dir, manager)) +} + +/// Create a test collectors configuration +fn create_test_collectors_config(collector_ids: &[&str]) -> CollectorsConfig { + let collectors = collector_ids + .iter() + .map(|id| { + CollectorEntry::new(id.to_string(), "process", "/usr/bin/test-collector") + .with_startup_timeout(5) + }) + .collect(); + + CollectorsConfig { collectors } +} + +/// Create a registration request for testing +fn create_registration_request(collector_id: &str) -> RegistrationRequest { + RegistrationRequest { + collector_id: 
collector_id.to_string(), + collector_type: "process".to_string(), + hostname: "test-host".to_string(), + version: Some("1.0.0".to_string()), + pid: Some(12345), + capabilities: vec!["enumerate".to_string()], + attributes: HashMap::new(), + heartbeat_interval_ms: Some(30000), + } +} + +#[tokio::test] +async fn test_agent_starts_in_loading_state() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Verify agent starts in Loading state + let state = manager.agent_state().await; + assert!( + matches!(state, AgentState::Loading), + "Agent should start in Loading state, got {:?}", + state + ); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_set_collectors_config_sets_expected() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set collectors configuration with 2 collectors + let config = create_test_collectors_config(&["collector-a", "collector-b"]); + manager.set_collectors_config(config).await; + + // Verify we're still in Loading state (no collectors have registered yet) + let state = manager.agent_state().await; + assert!(matches!(state, AgentState::Loading)); + + // Transition to Ready should fail because collectors haven't registered + let result = manager.transition_to_ready().await; + assert!( + result.is_err(), + "Should not be able to transition to Ready with pending collectors" + ); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_registration_marks_collector_ready() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set up expected collectors + let config = create_test_collectors_config(&["collector-a"]); + manager.set_collectors_config(config).await; + + // Simulate collector registration + let request = create_registration_request("collector-a"); + let response = manager.register_collector(request).await?; + assert!(response.accepted, "Registration should be accepted"); + + // Now transition to Ready should succeed + let result = manager.transition_to_ready().await; + assert!( + result.is_ok(), + "Should be able to transition to Ready after all collectors registered" + ); + + let state = manager.agent_state().await; + assert!(matches!(state, AgentState::Ready)); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_multiple_collectors_registration_and_ready() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set up expected collectors + let config = create_test_collectors_config(&["collector-a", "collector-b", "collector-c"]); + manager.set_collectors_config(config).await; + + // Register collectors one by one + for id in &["collector-a", "collector-b"] { + let request = create_registration_request(id); + let _ = manager.register_collector(request).await?; + } + + // Transition should still fail (collector-c not registered) + assert!(manager.transition_to_ready().await.is_err()); + + // Register the last collector + let request = create_registration_request("collector-c"); + let _ = manager.register_collector(request).await?; + + // Now transition should succeed + assert!(manager.transition_to_ready().await.is_ok()); + assert!(matches!(manager.agent_state().await, AgentState::Ready)); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_wait_for_collectors_ready_immediate_with_no_collectors() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Empty config means no collectors expected + let config = 
CollectorsConfig::default(); + manager.set_collectors_config(config).await; + + // Wait should return immediately with true (all ready) + let result = manager + .wait_for_collectors_ready(Duration::from_secs(1), Duration::from_millis(100)) + .await?; + + assert!(result, "Should return true when no collectors expected"); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_wait_for_collectors_ready_timeout() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set up expected collector that won't register + let config = create_test_collectors_config(&["never-registers"]); + manager.set_collectors_config(config).await; + + // Wait should timeout and return false + let result = manager + .wait_for_collectors_ready(Duration::from_millis(200), Duration::from_millis(50)) + .await?; + + assert!(!result, "Should return false on timeout"); + + // State should be StartupFailed + let state = manager.agent_state().await; + assert!( + matches!(state, AgentState::StartupFailed { .. }), + "Agent should be in StartupFailed state after timeout, got {:?}", + state + ); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_wait_for_collectors_ready_success_before_timeout() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set up expected collector + let config = create_test_collectors_config(&["quick-collector"]); + manager.set_collectors_config(config).await; + + // Wrap manager in Arc for sharing with spawned task + let manager_arc = Arc::new(manager); + let manager_clone = Arc::clone(&manager_arc); + + // Spawn a task to register collector after a short delay + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(50)).await; + let request = create_registration_request("quick-collector"); + let _ = manager_clone.register_collector(request).await; + }); + + // Wait should succeed before timeout + let result = manager_arc + .wait_for_collectors_ready(Duration::from_secs(2), Duration::from_millis(25)) + .await?; + + assert!( + result, + "Should return true when collector registers in time" + ); + + // State should still be Loading (wait doesn't transition) + let state = manager_arc.agent_state().await; + assert!( + matches!(state, AgentState::Loading), + "State should still be Loading after wait, got {:?}", + state + ); + + // Shutdown can be called on Arc since it takes &self + manager_arc.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_full_loading_state_lifecycle() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // 1. Start in Loading state + assert!(matches!(manager.agent_state().await, AgentState::Loading)); + + // 2. Set collectors config + let config = create_test_collectors_config(&["procmond"]); + manager.set_collectors_config(config).await; + + // 3. Register collector + let request = create_registration_request("procmond"); + let _ = manager.register_collector(request).await?; + + // 4. Transition to Ready + manager.transition_to_ready().await?; + assert!(matches!(manager.agent_state().await, AgentState::Ready)); + + // 5. Drop privileges (stub) + manager.drop_privileges().await?; + + // 6. Broadcast begin monitoring + manager.broadcast_begin_monitoring().await?; + + // 7. Transition to SteadyState + manager.transition_to_steady_state().await?; + assert!(matches!( + manager.agent_state().await, + AgentState::SteadyState + )); + + // 8. 
Transition to ShuttingDown for graceful shutdown + manager.transition_to_shutting_down().await; + assert!(matches!( + manager.agent_state().await, + AgentState::ShuttingDown + )); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_startup_timeout_from_config() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set collectors config with specific timeout + let config = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("collector-a", "process", "/usr/bin/test").with_startup_timeout(30), + CollectorEntry::new("collector-b", "network", "/usr/bin/test").with_startup_timeout(45), + ], + }; + manager.set_collectors_config(config).await; + + // Get startup timeout should return the max (45 seconds) + let timeout = manager.get_startup_timeout().await; + assert_eq!(timeout.as_secs(), 45); + + manager.shutdown().await?; + Ok(()) +} + +#[tokio::test] +async fn test_disabled_collectors_not_expected() -> anyhow::Result<()> { + let (_temp_dir, manager) = setup_test_broker().await?; + + // Set collectors config with one enabled and one disabled + let config = CollectorsConfig { + collectors: vec![ + CollectorEntry::new("enabled-collector", "process", "/usr/bin/test") + .with_startup_timeout(30), + CollectorEntry::new("disabled-collector", "network", "/usr/bin/test") + .with_enabled(false) + .with_startup_timeout(30), + ], + }; + manager.set_collectors_config(config).await; + + // Only register the enabled collector + let request = create_registration_request("enabled-collector"); + let _ = manager.register_collector(request).await?; + + // Should be able to transition to Ready (disabled collector not expected) + manager.transition_to_ready().await?; + assert!(matches!(manager.agent_state().await, AgentState::Ready)); + + manager.shutdown().await?; + Ok(()) +} From 28307328738457b560e6704de2a3165e1d4dfbe4 Mon Sep 17 00:00:00 2001 From: UncleSp1d3r Date: Mon, 2 Feb 2026 10:24:43 -0500 Subject: [PATCH 4/6] feat(daemoneye-agent): implement heartbeat detection and escalating recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add comprehensive heartbeat monitoring and recovery infrastructure: - HeartbeatStatus enum (Healthy, Degraded, Failed) with needs_recovery() - Missed heartbeat tracking in CollectorRegistry with check_heartbeats() - RecoveryAction escalation chain: HealthCheck → GracefulShutdown → ForceKill → Restart - CollectorRecoveryState for tracking recovery attempts per collector - execute_recovery() function for automated escalating recovery - 21 integration tests covering heartbeat and recovery workflows Also includes documentation updates for async-in-tracing gotchas, pre-commit handling, and cross-crate trait imports. 
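As a minimal illustration of the escalation order (usage sketch only;
the `escalate()` API below is what this patch adds):

    let mut action = RecoveryAction::HealthCheck;
    while let Some(next) = action.escalate() {
        action = next; // HealthCheck -> GracefulShutdown -> ForceKill -> Restart
    }
    assert_eq!(action, RecoveryAction::Restart);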
Co-Authored-By: Claude Opus 4.5 --- AGENTS.md | 4 + daemoneye-agent/src/collector_registry.rs | 152 ++++- daemoneye-agent/src/lib.rs | 4 +- daemoneye-agent/src/recovery.rs | 569 ++++++++++++++++++ .../tests/heartbeat_detection_integration.rs | 449 ++++++++++++++ 5 files changed, 1176 insertions(+), 2 deletions(-) create mode 100644 daemoneye-agent/src/recovery.rs create mode 100644 daemoneye-agent/tests/heartbeat_detection_integration.rs diff --git a/AGENTS.md b/AGENTS.md index 0b88a23..7ab3213 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -161,6 +161,7 @@ flowchart LR - **If-else ordering**: Clippy prefers `==` checks first in if-else (`clippy::unnecessary_negation`) - **map_err_ignore**: Name ignored variables in closures (`|_elapsed|` not `|_|`) - **as_conversions**: Add `#[allow(clippy::as_conversions)]` with safety comment for intentional casts +- **Async in tracing macros**: Never `.await` inside `info!`/`debug!`/`warn!`/`error!` - causes `future_not_send`. Extract value first. - **Safety**: `unsafe_code = "forbid"` at workspace level - **Formatting**: `rustfmt` with 119 char line length - **Rustdoc**: Escape brackets in paths like `/proc/\[pid\]/stat` to avoid broken link warnings @@ -178,6 +179,8 @@ flowchart LR 5. No new `unsafe` without approval 6. Benchmarks within acceptable ranges +**Gotchas**: Pre-commit runs `cargo fmt` which modifies files. If unstaged changes exist, commit may fail with "Stashed changes conflicted". Run `cargo fmt --all` before staging or reset unrelated unstaged files. + --- ## Security Model @@ -332,6 +335,7 @@ pub trait AlertSink: Send + Sync { - **Property**: proptest for edge cases - **Fuzz**: Security-critical components - **Snapshot**: insta for CLI output +- **Cross-crate traits**: Import traits for method access (e.g., `use daemoneye_eventbus::rpc::RegistrationProvider;`) ### Test Environment diff --git a/daemoneye-agent/src/collector_registry.rs b/daemoneye-agent/src/collector_registry.rs index 9790188..2a8faf5 100644 --- a/daemoneye-agent/src/collector_registry.rs +++ b/daemoneye-agent/src/collector_registry.rs @@ -53,6 +53,7 @@ impl CollectorRegistry { registered_at: now, last_heartbeat: now, heartbeat_interval, + missed_heartbeats: 0, }; records.insert(collector_id.clone(), record); drop(records); @@ -83,12 +84,119 @@ impl CollectorRegistry { } } - /// Update the heartbeat timestamp for a collector. + /// Update the heartbeat timestamp for a collector and reset missed count. pub async fn update_heartbeat(&self, collector_id: &str) -> Result<(), RegistryError> { let mut records = self.records.write().await; match records.get_mut(collector_id) { Some(record) => { record.last_heartbeat = SystemTime::now(); + record.missed_heartbeats = 0; + Ok(()) + } + None => Err(RegistryError::NotFound(collector_id.to_owned())), + } + } + + /// Check heartbeat status for all collectors and increment missed counts. + /// + /// This should be called periodically (e.g., every heartbeat interval) to + /// detect collectors that have stopped sending heartbeats. + /// + /// Returns a list of (`collector_id`, `HeartbeatStatus`) for collectors that + /// have missed at least one heartbeat. 
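+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch of one polling pass (assumes a populated registry;
+    /// `registry` is a hypothetical binding):
+    ///
+    /// ```ignore
+    /// // Poll once and flag any collector past the failure threshold.
+    /// for (collector_id, status) in registry.check_heartbeats().await {
+    ///     if status.needs_recovery() {
+    ///         tracing::warn!(%collector_id, "collector failed heartbeat checks");
+    ///     }
+    /// }
+    /// ```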
+    #[allow(dead_code)]
+    #[allow(clippy::significant_drop_tightening)] // Lock must be held while iterating and mutating
+    pub async fn check_heartbeats(&self) -> Vec<(String, HeartbeatStatus)> {
+        let now = SystemTime::now();
+        let mut records = self.records.write().await;
+        let mut results = Vec::new();
+
+        for (collector_id, record) in records.iter_mut() {
+            let elapsed = now
+                .duration_since(record.last_heartbeat)
+                .unwrap_or(Duration::ZERO);
+
+            // Check if heartbeat is overdue (allow 10% grace period)
+            let expected_interval = record.heartbeat_interval;
+            // Use saturating_add to avoid overflow; division by 10 is always safe
+            #[allow(clippy::arithmetic_side_effects)]
+            let grace_period = expected_interval.saturating_add(expected_interval / 10);
+
+            if elapsed > grace_period {
+                record.missed_heartbeats = record.missed_heartbeats.saturating_add(1);
+
+                let status = if record.missed_heartbeats >= MAX_MISSED_HEARTBEATS {
+                    HeartbeatStatus::Failed {
+                        missed_count: record.missed_heartbeats,
+                        time_since_last: elapsed,
+                    }
+                } else {
+                    HeartbeatStatus::Degraded {
+                        missed_count: record.missed_heartbeats,
+                    }
+                };
+
+                results.push((collector_id.clone(), status));
+            }
+        }
+
+        results
+    }
+
+    /// Get collectors that need recovery action (missed >= `MAX_MISSED_HEARTBEATS`).
+    #[allow(dead_code)]
+    #[allow(clippy::pattern_type_mismatch)] // Conflicting lint with needless_borrowed_reference
+    pub async fn collectors_needing_recovery(&self) -> Vec<(String, HeartbeatStatus)> {
+        self.check_heartbeats()
+            .await
+            .into_iter()
+            .filter(|(_, status)| status.needs_recovery())
+            .collect()
+    }
+
+    /// Get the heartbeat status for a specific collector.
+    #[allow(dead_code)]
+    pub async fn heartbeat_status(&self, collector_id: &str) -> Option<HeartbeatStatus> {
+        let now = SystemTime::now();
+
+        // Clone record to release lock early
+        let record = {
+            let records = self.records.read().await;
+            records.get(collector_id).cloned()?
+        };
+
+        let elapsed = now
+            .duration_since(record.last_heartbeat)
+            .unwrap_or(Duration::ZERO);
+
+        let expected_interval = record.heartbeat_interval;
+        // Division by 10 is always safe
+        #[allow(clippy::arithmetic_side_effects)]
+        let grace_period = expected_interval.saturating_add(expected_interval / 10);
+
+        if elapsed <= grace_period && record.missed_heartbeats == 0 {
+            Some(HeartbeatStatus::Healthy)
+        } else if record.missed_heartbeats >= MAX_MISSED_HEARTBEATS {
+            Some(HeartbeatStatus::Failed {
+                missed_count: record.missed_heartbeats,
+                time_since_last: elapsed,
+            })
+        } else if record.missed_heartbeats > 0 {
+            Some(HeartbeatStatus::Degraded {
+                missed_count: record.missed_heartbeats,
+            })
+        } else {
+            Some(HeartbeatStatus::Healthy)
+        }
+    }
+
+    /// Reset missed heartbeat count for a collector (e.g., after successful recovery).
+    #[allow(dead_code)]
+    pub async fn reset_missed_heartbeats(&self, collector_id: &str) -> Result<(), RegistryError> {
+        let mut records = self.records.write().await;
+        match records.get_mut(collector_id) {
+            Some(record) => {
+                record.missed_heartbeats = 0;
                 Ok(())
             }
             None => Err(RegistryError::NotFound(collector_id.to_owned())),
@@ -135,6 +243,48 @@ pub struct CollectorRecord {
     pub last_heartbeat: SystemTime,
     /// Heartbeat interval assigned to the collector.
     pub heartbeat_interval: Duration,
+    /// Number of consecutive missed heartbeats.
+    pub missed_heartbeats: u32,
+}
+
+/// Maximum consecutive missed heartbeats before triggering recovery actions.
+#[allow(dead_code)]
+pub const MAX_MISSED_HEARTBEATS: u32 = 3;
+
+/// Status of a collector's heartbeat health.
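+///
+/// Illustrative sketch of how a status is typically inspected (the value
+/// shown is hypothetical):
+///
+/// ```ignore
+/// let status = HeartbeatStatus::Degraded { missed_count: 2 };
+/// assert!(!status.is_healthy());
+/// assert!(!status.needs_recovery()); // only `Failed` triggers recovery
+/// ```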
+#[derive(Debug, Clone, PartialEq, Eq)] +#[allow(dead_code)] +#[non_exhaustive] +pub enum HeartbeatStatus { + /// Collector is healthy - heartbeat received within expected interval. + Healthy, + /// Collector missed one or more heartbeats but below threshold. + Degraded { + /// Number of consecutive missed heartbeats. + missed_count: u32, + }, + /// Collector has missed too many heartbeats - recovery action needed. + Failed { + /// Number of consecutive missed heartbeats. + missed_count: u32, + /// Duration since last heartbeat. + time_since_last: Duration, + }, +} + +#[allow(dead_code)] +impl HeartbeatStatus { + /// Returns true if the collector needs recovery action. + #[must_use] + pub const fn needs_recovery(&self) -> bool { + matches!(self, Self::Failed { .. }) + } + + /// Returns true if the collector is healthy. + #[must_use] + pub const fn is_healthy(&self) -> bool { + matches!(self, Self::Healthy) + } } /// Errors that can occur when interacting with the collector registry. diff --git a/daemoneye-agent/src/lib.rs b/daemoneye-agent/src/lib.rs index f7d54c1..cef8ee0 100644 --- a/daemoneye-agent/src/lib.rs +++ b/daemoneye-agent/src/lib.rs @@ -11,9 +11,11 @@ pub mod collector_config; pub mod collector_registry; pub mod health; pub mod ipc_server; +pub mod recovery; pub use broker_manager::{BrokerHealth, BrokerManager}; pub use collector_config::{CollectorConfigError, CollectorEntry, CollectorsConfig}; -pub use collector_registry::{CollectorRegistry, RegistryError}; +pub use collector_registry::{CollectorRegistry, HeartbeatStatus, RegistryError}; pub use health::{HealthState, wait_for_healthy}; pub use ipc_server::{IpcServerHealth, IpcServerManager, create_cli_ipc_config}; +pub use recovery::{RecoveryAction, RecoveryError, RecoveryResult, execute_recovery}; diff --git a/daemoneye-agent/src/recovery.rs b/daemoneye-agent/src/recovery.rs new file mode 100644 index 0000000..51f84c6 --- /dev/null +++ b/daemoneye-agent/src/recovery.rs @@ -0,0 +1,569 @@ +//! Escalating recovery actions for failed collectors. +//! +//! This module implements a progressive recovery strategy for collectors that +//! have missed heartbeats or become unresponsive: +//! +//! 1. **Health Check**: Verify collector state via RPC +//! 2. **Graceful Shutdown**: Request clean shutdown via RPC +//! 3. **Force Kill**: Terminate process via signal +//! 4. **Restart**: Spawn new collector process +//! +//! Each stage escalates if the previous action fails or times out. + +use crate::broker_manager::BrokerManager; +use crate::collector_registry::HeartbeatStatus; +use std::time::Duration; +use thiserror::Error; +use tracing::{debug, error, info, warn}; + +/// Maximum number of recovery attempts before giving up. +pub const MAX_RECOVERY_ATTEMPTS: u32 = 3; + +/// Default timeout for health check RPC. +const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(10); + +/// Default timeout for graceful shutdown. +const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30); + +/// Default timeout for force kill. +const FORCE_KILL_TIMEOUT: Duration = Duration::from_secs(10); + +/// Default timeout for restart. +const RESTART_TIMEOUT: Duration = Duration::from_secs(60); + +/// Recovery actions in escalating order. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[non_exhaustive] +#[allow(dead_code)] // Variants used by recovery orchestration +pub enum RecoveryAction { + /// Send health check RPC to verify collector state. + HealthCheck, + /// Request graceful shutdown via RPC. 
+    GracefulShutdown,
+    /// Force kill the collector process.
+    ForceKill,
+    /// Restart the collector process.
+    Restart,
+}
+
+impl std::fmt::Display for RecoveryAction {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match *self {
+            Self::HealthCheck => write!(f, "HealthCheck"),
+            Self::GracefulShutdown => write!(f, "GracefulShutdown"),
+            Self::ForceKill => write!(f, "ForceKill"),
+            Self::Restart => write!(f, "Restart"),
+        }
+    }
+}
+
+impl RecoveryAction {
+    /// Get the next escalation action.
+    #[must_use]
+    #[allow(dead_code)] // Used by escalation logic
+    pub const fn escalate(self) -> Option<Self> {
+        match self {
+            Self::HealthCheck => Some(Self::GracefulShutdown),
+            Self::GracefulShutdown => Some(Self::ForceKill),
+            Self::ForceKill => Some(Self::Restart),
+            Self::Restart => None,
+        }
+    }
+
+    /// Get the appropriate action based on heartbeat status.
+    #[must_use]
+    #[allow(dead_code)] // Used by recovery determination logic
+    pub const fn from_heartbeat_status(status: &HeartbeatStatus) -> Option<Self> {
+        match *status {
+            HeartbeatStatus::Healthy => None,
+            HeartbeatStatus::Degraded { missed_count } => {
+                if missed_count >= 2 {
+                    Some(Self::HealthCheck)
+                } else {
+                    None
+                }
+            }
+            HeartbeatStatus::Failed { .. } => Some(Self::GracefulShutdown),
+        }
+    }
+}
+
+/// Result of a recovery attempt.
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+#[allow(dead_code)] // Variants used by recovery orchestration
+pub enum RecoveryResult {
+    /// Recovery action succeeded, collector is healthy.
+    Success {
+        /// The action that succeeded.
+        action: RecoveryAction,
+        /// Human-readable message.
+        message: String,
+    },
+    /// Recovery action failed, may need escalation.
+    Failed {
+        /// The action that failed.
+        action: RecoveryAction,
+        /// Error message.
+        error: String,
+        /// Whether to escalate to the next action.
+        should_escalate: bool,
+    },
+    /// Recovery was skipped (e.g., collector already healthy).
+    Skipped {
+        /// Reason for skipping.
+        reason: String,
+    },
+    /// Maximum recovery attempts exceeded.
+    Exhausted {
+        /// Total attempts made.
+        attempts: u32,
+        /// Last action attempted.
+        last_action: RecoveryAction,
+    },
+}
+
+/// Errors that can occur during recovery operations.
+#[derive(Debug, Error)]
+#[non_exhaustive]
+pub enum RecoveryError {
+    /// Health check failed.
+    #[error("health check failed: {0}")]
+    HealthCheckFailed(String),
+
+    /// Graceful shutdown failed.
+    #[error("graceful shutdown failed: {0}")]
+    GracefulShutdownFailed(String),
+
+    /// Force kill failed.
+    #[error("force kill failed: {0}")]
+    ForceKillFailed(String),
+
+    /// Restart failed.
+    #[error("restart failed: {0}")]
+    RestartFailed(String),
+
+    /// Operation timed out.
+    #[error("operation timed out after {0:?}")]
+    Timeout(Duration),
+
+    /// Collector not found.
+    #[error("collector `{0}` not found")]
+    CollectorNotFound(String),
+
+    /// Internal error.
+    #[error("internal error: {0}")]
+    Internal(String),
+}
+
+/// Tracks recovery state for a collector.
+#[derive(Debug, Clone)]
+#[allow(dead_code)] // Used by recovery orchestration
+pub struct CollectorRecoveryState {
+    /// Collector ID.
+    pub collector_id: String,
+    /// Current recovery action being attempted.
+    pub current_action: Option<RecoveryAction>,
+    /// Number of recovery attempts.
+    pub attempt_count: u32,
+    /// Last recovery result.
+    pub last_result: Option<RecoveryResult>,
+    /// Whether recovery is in progress.
+    pub in_progress: bool,
+}
+
+impl CollectorRecoveryState {
+    /// Create a new recovery state for a collector.
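+    ///
+    /// Illustrative sketch of the tracking lifecycle (the collector ID is
+    /// hypothetical):
+    ///
+    /// ```ignore
+    /// let mut state = CollectorRecoveryState::new("procmond".to_owned());
+    /// state.start_recovery(RecoveryAction::HealthCheck);
+    /// assert_eq!(state.attempt_count, 1);
+    /// assert!(state.in_progress);
+    /// ```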
+ #[must_use] + #[allow(dead_code)] + pub const fn new(collector_id: String) -> Self { + Self { + collector_id, + current_action: None, + attempt_count: 0, + last_result: None, + in_progress: false, + } + } + + /// Check if recovery has been exhausted. + #[must_use] + #[allow(dead_code)] + pub const fn is_exhausted(&self) -> bool { + self.attempt_count >= MAX_RECOVERY_ATTEMPTS + } + + /// Mark recovery as starting. + #[allow(dead_code)] + pub const fn start_recovery(&mut self, action: RecoveryAction) { + self.current_action = Some(action); + self.in_progress = true; + self.attempt_count = self.attempt_count.saturating_add(1); + } + + /// Mark recovery as complete with result. + #[allow(dead_code)] + pub fn complete_recovery(&mut self, result: RecoveryResult) { + self.last_result = Some(result); + self.in_progress = false; + } + + /// Reset recovery state (e.g., after successful recovery). + #[allow(dead_code)] + pub fn reset(&mut self) { + self.current_action = None; + self.attempt_count = 0; + self.last_result = None; + self.in_progress = false; + } +} + +/// Execute escalating recovery actions for a collector. +/// +/// This function attempts recovery actions in escalating order: +/// 1. Health check to verify state +/// 2. Graceful shutdown via RPC +/// 3. Force kill via process manager +/// 4. Restart via process manager +/// +/// Returns when the collector is recovered or all actions are exhausted. +#[allow(dead_code)] +pub async fn execute_recovery( + manager: &BrokerManager, + collector_id: &str, + starting_action: RecoveryAction, +) -> RecoveryResult { + let mut current_action = starting_action; + let mut attempts = 0_u32; + + loop { + attempts = attempts.saturating_add(1); + + if attempts > MAX_RECOVERY_ATTEMPTS { + warn!( + collector_id = %collector_id, + attempts = attempts, + last_action = %current_action, + "Recovery attempts exhausted" + ); + return RecoveryResult::Exhausted { + attempts, + last_action: current_action, + }; + } + + info!( + collector_id = %collector_id, + action = %current_action, + attempt = attempts, + "Executing recovery action" + ); + + let result = execute_action(manager, collector_id, current_action).await; + + match result { + Ok(()) => { + info!( + collector_id = %collector_id, + action = %current_action, + "Recovery action succeeded" + ); + return RecoveryResult::Success { + action: current_action, + message: format!("{current_action} completed successfully"), + }; + } + Err(e) => { + warn!( + collector_id = %collector_id, + action = %current_action, + error = %e, + "Recovery action failed" + ); + + // Try to escalate to next action + if let Some(next_action) = current_action.escalate() { + info!( + collector_id = %collector_id, + from = %current_action, + to = %next_action, + "Escalating recovery action" + ); + current_action = next_action; + } else { + // No more escalation options + error!( + collector_id = %collector_id, + action = %current_action, + error = %e, + "Recovery failed with no more escalation options" + ); + return RecoveryResult::Failed { + action: current_action, + error: e.to_string(), + should_escalate: false, + }; + } + } + } + } +} + +/// Execute a single recovery action. 
+async fn execute_action( + manager: &BrokerManager, + collector_id: &str, + action: RecoveryAction, +) -> Result<(), RecoveryError> { + match action { + RecoveryAction::HealthCheck => execute_health_check(manager, collector_id).await, + RecoveryAction::GracefulShutdown => execute_graceful_shutdown(manager, collector_id).await, + RecoveryAction::ForceKill => execute_force_kill(manager, collector_id).await, + RecoveryAction::Restart => execute_restart(manager, collector_id).await, + } +} + +/// Execute health check via RPC. +async fn execute_health_check( + manager: &BrokerManager, + collector_id: &str, +) -> Result<(), RecoveryError> { + debug!(collector_id = %collector_id, "Executing health check"); + + let result = tokio::time::timeout(HEALTH_CHECK_TIMEOUT, async { + manager.health_check_rpc(collector_id).await + }) + .await; + + match result { + Ok(Ok(health_data)) => { + if health_data.status.is_healthy() { + debug!( + collector_id = %collector_id, + status = ?health_data.status, + "Health check passed" + ); + Ok(()) + } else { + Err(RecoveryError::HealthCheckFailed(format!( + "collector unhealthy: {:?}", + health_data.status + ))) + } + } + Ok(Err(e)) => Err(RecoveryError::HealthCheckFailed(e.to_string())), + Err(_) => Err(RecoveryError::Timeout(HEALTH_CHECK_TIMEOUT)), + } +} + +/// Execute graceful shutdown via RPC. +async fn execute_graceful_shutdown( + manager: &BrokerManager, + collector_id: &str, +) -> Result<(), RecoveryError> { + debug!(collector_id = %collector_id, "Executing graceful shutdown"); + + let result = tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async { + manager.stop_collector_rpc(collector_id, true).await + }) + .await; + + match result { + Ok(Ok(())) => { + debug!(collector_id = %collector_id, "Graceful shutdown succeeded"); + Ok(()) + } + Ok(Err(e)) => Err(RecoveryError::GracefulShutdownFailed(e.to_string())), + Err(_) => Err(RecoveryError::Timeout(GRACEFUL_SHUTDOWN_TIMEOUT)), + } +} + +/// Execute force kill via process manager. +async fn execute_force_kill( + manager: &BrokerManager, + collector_id: &str, +) -> Result<(), RecoveryError> { + debug!(collector_id = %collector_id, "Executing force kill"); + + let process_manager = manager.process_manager(); + + let result = tokio::time::timeout(FORCE_KILL_TIMEOUT, async { + process_manager + .stop_collector(collector_id, true, FORCE_KILL_TIMEOUT) + .await + }) + .await; + + match result { + Ok(Ok(_exit_code)) => { + debug!(collector_id = %collector_id, "Force kill succeeded"); + Ok(()) + } + Ok(Err(e)) => Err(RecoveryError::ForceKillFailed(e.to_string())), + Err(_) => Err(RecoveryError::Timeout(FORCE_KILL_TIMEOUT)), + } +} + +/// Execute restart via process manager. +async fn execute_restart(manager: &BrokerManager, collector_id: &str) -> Result<(), RecoveryError> { + debug!(collector_id = %collector_id, "Executing restart"); + + let process_manager = manager.process_manager(); + + let result = tokio::time::timeout(RESTART_TIMEOUT, async { + process_manager + .restart_collector(collector_id, RESTART_TIMEOUT) + .await + }) + .await; + + match result { + Ok(Ok(_new_pid)) => { + debug!(collector_id = %collector_id, "Restart succeeded"); + Ok(()) + } + Ok(Err(e)) => Err(RecoveryError::RestartFailed(e.to_string())), + Err(_) => Err(RecoveryError::Timeout(RESTART_TIMEOUT)), + } +} + +/// Extension trait for `HealthStatus` to check if healthy. 
+trait HealthStatusExt { + fn is_healthy(&self) -> bool; +} + +impl HealthStatusExt for daemoneye_eventbus::rpc::HealthStatus { + fn is_healthy(&self) -> bool { + matches!(self, Self::Healthy) + } +} + +#[cfg(test)] +#[allow( + clippy::expect_used, + clippy::unwrap_used, + clippy::str_to_string, + clippy::semicolon_outside_block, + clippy::semicolon_inside_block +)] +mod tests { + use super::*; + + #[test] + fn test_recovery_action_escalation() { + assert_eq!( + RecoveryAction::HealthCheck.escalate(), + Some(RecoveryAction::GracefulShutdown) + ); + assert_eq!( + RecoveryAction::GracefulShutdown.escalate(), + Some(RecoveryAction::ForceKill) + ); + assert_eq!( + RecoveryAction::ForceKill.escalate(), + Some(RecoveryAction::Restart) + ); + assert_eq!(RecoveryAction::Restart.escalate(), None); + } + + #[test] + fn test_recovery_action_from_heartbeat_status_healthy() { + let status = HeartbeatStatus::Healthy; + assert!(RecoveryAction::from_heartbeat_status(&status).is_none()); + } + + #[test] + fn test_recovery_action_from_heartbeat_status_degraded_low() { + let status = HeartbeatStatus::Degraded { missed_count: 1 }; + assert!(RecoveryAction::from_heartbeat_status(&status).is_none()); + } + + #[test] + fn test_recovery_action_from_heartbeat_status_degraded_high() { + let status = HeartbeatStatus::Degraded { missed_count: 2 }; + assert_eq!( + RecoveryAction::from_heartbeat_status(&status), + Some(RecoveryAction::HealthCheck) + ); + } + + #[test] + fn test_recovery_action_from_heartbeat_status_failed() { + let status = HeartbeatStatus::Failed { + missed_count: 3, + time_since_last: Duration::from_secs(90), + }; + assert_eq!( + RecoveryAction::from_heartbeat_status(&status), + Some(RecoveryAction::GracefulShutdown) + ); + } + + #[test] + fn test_recovery_action_display() { + assert_eq!(format!("{}", RecoveryAction::HealthCheck), "HealthCheck"); + assert_eq!( + format!("{}", RecoveryAction::GracefulShutdown), + "GracefulShutdown" + ); + assert_eq!(format!("{}", RecoveryAction::ForceKill), "ForceKill"); + assert_eq!(format!("{}", RecoveryAction::Restart), "Restart"); + } + + #[test] + fn test_recovery_action_ordering() { + // Actions should be ordered by escalation level + assert!(RecoveryAction::HealthCheck < RecoveryAction::GracefulShutdown); + assert!(RecoveryAction::GracefulShutdown < RecoveryAction::ForceKill); + assert!(RecoveryAction::ForceKill < RecoveryAction::Restart); + } + + #[test] + fn test_collector_recovery_state_new() { + let state = CollectorRecoveryState::new("test-collector".to_owned()); + assert_eq!(state.collector_id, "test-collector"); + assert!(state.current_action.is_none()); + assert_eq!(state.attempt_count, 0); + assert!(state.last_result.is_none()); + assert!(!state.in_progress); + } + + #[test] + fn test_collector_recovery_state_exhaustion() { + let mut state = CollectorRecoveryState::new("test-collector".to_owned()); + assert!(!state.is_exhausted()); + + // Simulate max attempts + state.attempt_count = MAX_RECOVERY_ATTEMPTS; + assert!(state.is_exhausted()); + } + + #[test] + fn test_collector_recovery_state_lifecycle() { + let mut state = CollectorRecoveryState::new("test-collector".to_owned()); + + // Start recovery + state.start_recovery(RecoveryAction::HealthCheck); + assert!(state.in_progress); + assert_eq!(state.current_action, Some(RecoveryAction::HealthCheck)); + assert_eq!(state.attempt_count, 1); + + // Complete recovery + state.complete_recovery(RecoveryResult::Success { + action: RecoveryAction::HealthCheck, + message: "OK".to_owned(), + }); + 
assert!(!state.in_progress); + assert!(matches!( + state.last_result, + Some(RecoveryResult::Success { .. }) + )); + + // Reset + state.reset(); + assert!(state.current_action.is_none()); + assert_eq!(state.attempt_count, 0); + assert!(state.last_result.is_none()); + assert!(!state.in_progress); + } +} diff --git a/daemoneye-agent/tests/heartbeat_detection_integration.rs b/daemoneye-agent/tests/heartbeat_detection_integration.rs new file mode 100644 index 0000000..db2064d --- /dev/null +++ b/daemoneye-agent/tests/heartbeat_detection_integration.rs @@ -0,0 +1,449 @@ +//! Integration tests for heartbeat detection and recovery functionality. +//! +//! These tests verify the heartbeat monitoring, missed heartbeat detection, +//! and escalating recovery action infrastructure. + +#![allow( + clippy::expect_used, + clippy::unwrap_used, + clippy::str_to_string, + clippy::semicolon_outside_block, + clippy::semicolon_inside_block +)] + +use daemoneye_agent::collector_registry::{ + CollectorRegistry, HeartbeatStatus, MAX_MISSED_HEARTBEATS, +}; +use daemoneye_agent::recovery::{ + CollectorRecoveryState, MAX_RECOVERY_ATTEMPTS, RecoveryAction, RecoveryResult, +}; +use daemoneye_eventbus::rpc::RegistrationRequest; +use std::collections::HashMap; +use std::time::Duration; + +fn create_registration_request(collector_id: &str) -> RegistrationRequest { + RegistrationRequest { + collector_id: collector_id.to_string(), + collector_type: "process".to_string(), + hostname: "test-host".to_string(), + version: Some("1.0.0".to_string()), + pid: Some(12345), + capabilities: vec!["process".to_string()], + attributes: HashMap::new(), + heartbeat_interval_ms: Some(1000), // 1 second heartbeat + } +} + +// ------------------------------------------------ +// HeartbeatStatus Tests +// ------------------------------------------------ + +#[test] +fn test_heartbeat_status_needs_recovery() { + // Healthy status does not need recovery + assert!(!HeartbeatStatus::Healthy.needs_recovery()); + + // Degraded status does not need recovery + assert!(!HeartbeatStatus::Degraded { missed_count: 1 }.needs_recovery()); + assert!(!HeartbeatStatus::Degraded { missed_count: 2 }.needs_recovery()); + + // Failed status needs recovery + assert!( + HeartbeatStatus::Failed { + missed_count: 3, + time_since_last: Duration::from_secs(90), + } + .needs_recovery() + ); +} + +#[test] +fn test_heartbeat_status_is_healthy() { + assert!(HeartbeatStatus::Healthy.is_healthy()); + assert!(!HeartbeatStatus::Degraded { missed_count: 1 }.is_healthy()); + assert!( + !HeartbeatStatus::Failed { + missed_count: 3, + time_since_last: Duration::from_secs(90), + } + .is_healthy() + ); +} + +// ------------------------------------------------ +// CollectorRegistry Heartbeat Tests +// ------------------------------------------------ + +#[tokio::test] +async fn test_registry_update_heartbeat_resets_missed_count() { + let registry = CollectorRegistry::default(); + + // Register a collector + let request = create_registration_request("heartbeat-test"); + registry + .register(request) + .await + .expect("registration succeeds"); + + // Update heartbeat should succeed + registry + .update_heartbeat("heartbeat-test") + .await + .expect("heartbeat update succeeds"); + + // Get the collector record to verify + let record = registry + .get("heartbeat-test") + .await + .expect("collector exists"); + assert_eq!(record.missed_heartbeats, 0); +} + +#[tokio::test] +async fn test_registry_update_heartbeat_unknown_collector() { + let registry = CollectorRegistry::default(); + + // 
Update heartbeat for non-existent collector should fail + let result = registry.update_heartbeat("unknown-collector").await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_registry_heartbeat_status_healthy() { + let registry = CollectorRegistry::default(); + + // Register a collector + let request = create_registration_request("healthy-collector"); + registry + .register(request) + .await + .expect("registration succeeds"); + + // Immediately after registration, status should be healthy + let status = registry.heartbeat_status("healthy-collector").await; + assert!(status.is_some()); + assert!(matches!(status.unwrap(), HeartbeatStatus::Healthy)); +} + +#[tokio::test] +async fn test_registry_heartbeat_status_unknown_collector() { + let registry = CollectorRegistry::default(); + + // Status for non-existent collector should be None + let status = registry.heartbeat_status("unknown-collector").await; + assert!(status.is_none()); +} + +#[tokio::test] +async fn test_registry_reset_missed_heartbeats() { + let registry = CollectorRegistry::default(); + + // Register a collector + let request = create_registration_request("reset-test"); + registry + .register(request) + .await + .expect("registration succeeds"); + + // Reset missed heartbeats should succeed + registry + .reset_missed_heartbeats("reset-test") + .await + .expect("reset succeeds"); + + // Verify it's still at 0 + let record = registry.get("reset-test").await.expect("collector exists"); + assert_eq!(record.missed_heartbeats, 0); +} + +// ------------------------------------------------ +// RecoveryAction Tests +// ------------------------------------------------ + +#[test] +fn test_recovery_action_escalation_chain() { + // Full escalation chain + let action = RecoveryAction::HealthCheck; + let action = action.escalate().expect("can escalate from HealthCheck"); + assert_eq!(action, RecoveryAction::GracefulShutdown); + + let action = action + .escalate() + .expect("can escalate from GracefulShutdown"); + assert_eq!(action, RecoveryAction::ForceKill); + + let action = action.escalate().expect("can escalate from ForceKill"); + assert_eq!(action, RecoveryAction::Restart); + + // Cannot escalate from Restart + assert!(action.escalate().is_none()); +} + +#[test] +fn test_recovery_action_from_degraded_status() { + // Low missed count - no action needed + let status = HeartbeatStatus::Degraded { missed_count: 1 }; + assert!(RecoveryAction::from_heartbeat_status(&status).is_none()); + + // Higher missed count - trigger health check + let status = HeartbeatStatus::Degraded { missed_count: 2 }; + assert_eq!( + RecoveryAction::from_heartbeat_status(&status), + Some(RecoveryAction::HealthCheck) + ); + + let status = HeartbeatStatus::Degraded { missed_count: 3 }; + assert_eq!( + RecoveryAction::from_heartbeat_status(&status), + Some(RecoveryAction::HealthCheck) + ); +} + +#[test] +fn test_recovery_action_from_failed_status() { + let status = HeartbeatStatus::Failed { + missed_count: MAX_MISSED_HEARTBEATS, + time_since_last: Duration::from_secs(120), + }; + + // Failed status should trigger graceful shutdown (skip health check) + assert_eq!( + RecoveryAction::from_heartbeat_status(&status), + Some(RecoveryAction::GracefulShutdown) + ); +} + +// ------------------------------------------------ +// CollectorRecoveryState Tests +// ------------------------------------------------ + +#[test] +fn test_recovery_state_lifecycle() { + let mut state = CollectorRecoveryState::new("test-collector".to_string()); + + // Initial state + 
assert!(!state.in_progress); + assert!(state.current_action.is_none()); + assert_eq!(state.attempt_count, 0); + assert!(!state.is_exhausted()); + + // Start recovery + state.start_recovery(RecoveryAction::HealthCheck); + assert!(state.in_progress); + assert_eq!(state.current_action, Some(RecoveryAction::HealthCheck)); + assert_eq!(state.attempt_count, 1); + + // Complete recovery successfully + state.complete_recovery(RecoveryResult::Success { + action: RecoveryAction::HealthCheck, + message: "OK".to_string(), + }); + assert!(!state.in_progress); + assert!(matches!( + state.last_result, + Some(RecoveryResult::Success { .. }) + )); + + // Reset state + state.reset(); + assert!(!state.in_progress); + assert!(state.current_action.is_none()); + assert_eq!(state.attempt_count, 0); + assert!(state.last_result.is_none()); +} + +#[test] +fn test_recovery_state_exhaustion() { + let mut state = CollectorRecoveryState::new("exhausted-collector".to_string()); + + // Simulate multiple recovery attempts + for i in 0..MAX_RECOVERY_ATTEMPTS { + assert!( + !state.is_exhausted(), + "Should not be exhausted at attempt {i}" + ); + state.start_recovery(RecoveryAction::HealthCheck); + state.complete_recovery(RecoveryResult::Failed { + action: RecoveryAction::HealthCheck, + error: "Test failure".to_string(), + should_escalate: true, + }); + } + + // Should now be exhausted + assert!(state.is_exhausted()); +} + +// ------------------------------------------------ +// Integration Tests - Heartbeat + Recovery Workflow +// ------------------------------------------------ + +#[tokio::test] +async fn test_heartbeat_to_recovery_workflow() { + let registry = CollectorRegistry::default(); + + // Register collector with short heartbeat interval + let mut request = create_registration_request("workflow-test"); + request.heartbeat_interval_ms = Some(100); // 100ms heartbeat + registry + .register(request) + .await + .expect("registration succeeds"); + + // Initially healthy + let status = registry.heartbeat_status("workflow-test").await; + assert!(matches!(status, Some(HeartbeatStatus::Healthy))); + assert!(RecoveryAction::from_heartbeat_status(&status.unwrap()).is_none()); + + // Simulate sending heartbeat + registry + .update_heartbeat("workflow-test") + .await + .expect("heartbeat update succeeds"); + + // Status should still be healthy + let status = registry.heartbeat_status("workflow-test").await; + assert!(matches!(status, Some(HeartbeatStatus::Healthy))); +} + +#[tokio::test] +async fn test_multiple_collectors_heartbeat_tracking() { + let registry = CollectorRegistry::default(); + + // Register multiple collectors + let collectors = vec!["collector-a", "collector-b", "collector-c"]; + for collector_id in &collectors { + let request = create_registration_request(collector_id); + registry + .register(request) + .await + .expect("registration succeeds"); + } + + // Update heartbeats for all + for collector_id in &collectors { + registry + .update_heartbeat(collector_id) + .await + .expect("heartbeat update succeeds"); + } + + // All should be healthy + for collector_id in &collectors { + let status = registry.heartbeat_status(collector_id).await; + assert!( + matches!(status, Some(HeartbeatStatus::Healthy)), + "Collector {collector_id} should be healthy" + ); + } +} + +#[tokio::test] +async fn test_list_collector_ids() { + let registry = CollectorRegistry::default(); + + // Register multiple collectors + let expected_ids = vec!["list-test-a", "list-test-b"]; + for collector_id in &expected_ids { + let request = 
create_registration_request(collector_id); + registry + .register(request) + .await + .expect("registration succeeds"); + } + + // List should contain all registered collectors + let ids = registry.list_collector_ids().await; + assert_eq!(ids.len(), expected_ids.len()); + for expected_id in &expected_ids { + assert!( + ids.contains(&expected_id.to_string()), + "Should contain {expected_id}" + ); + } +} + +#[test] +fn test_max_missed_heartbeats_constant() { + // Verify the constant value matches expectations + assert_eq!(MAX_MISSED_HEARTBEATS, 3); +} + +#[test] +fn test_max_recovery_attempts_constant() { + // Verify the constant value matches expectations + assert_eq!(MAX_RECOVERY_ATTEMPTS, 3); +} + +// ------------------------------------------------ +// Recovery Result Tests +// ------------------------------------------------ + +#[test] +fn test_recovery_result_success() { + let result = RecoveryResult::Success { + action: RecoveryAction::HealthCheck, + message: "Collector recovered".to_string(), + }; + + if let RecoveryResult::Success { action, message } = result { + assert_eq!(action, RecoveryAction::HealthCheck); + assert_eq!(message, "Collector recovered"); + } else { + panic!("Expected Success result"); + } +} + +#[test] +fn test_recovery_result_failed() { + let result = RecoveryResult::Failed { + action: RecoveryAction::GracefulShutdown, + error: "Connection refused".to_string(), + should_escalate: true, + }; + + if let RecoveryResult::Failed { + action, + error, + should_escalate, + } = result + { + assert_eq!(action, RecoveryAction::GracefulShutdown); + assert_eq!(error, "Connection refused"); + assert!(should_escalate); + } else { + panic!("Expected Failed result"); + } +} + +#[test] +fn test_recovery_result_skipped() { + let result = RecoveryResult::Skipped { + reason: "Collector already healthy".to_string(), + }; + + if let RecoveryResult::Skipped { reason } = result { + assert_eq!(reason, "Collector already healthy"); + } else { + panic!("Expected Skipped result"); + } +} + +#[test] +fn test_recovery_result_exhausted() { + let result = RecoveryResult::Exhausted { + attempts: 3, + last_action: RecoveryAction::Restart, + }; + + if let RecoveryResult::Exhausted { + attempts, + last_action, + } = result + { + assert_eq!(attempts, 3); + assert_eq!(last_action, RecoveryAction::Restart); + } else { + panic!("Expected Exhausted result"); + } +} From 6472d8294a4e148b0deb3cef64b6354d7fab4a08 Mon Sep 17 00:00:00 2001 From: UncleSp1d3r Date: Mon, 2 Feb 2026 19:52:13 -0500 Subject: [PATCH 5/6] fix(daemoneye-agent): address CodeRabbit review findings - Fix test assertion case mismatch in test_validation_empty_id - Add state rollback on broadcast_begin_monitoring failure to prevent inconsistent state where agent is in SteadyState but collectors never received the "begin monitoring" message Co-Authored-By: Claude Opus 4.5 --- daemoneye-agent/src/broker_manager.rs | 10 +++++++++- daemoneye-agent/src/collector_config.rs | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/daemoneye-agent/src/broker_manager.rs b/daemoneye-agent/src/broker_manager.rs index 86245c5..3d33c38 100644 --- a/daemoneye-agent/src/broker_manager.rs +++ b/daemoneye-agent/src/broker_manager.rs @@ -948,7 +948,15 @@ impl BrokerManager { // Broadcast "begin monitoring" to all collectors drop(state); // Release lock before async operations - self.broadcast_begin_monitoring().await?; + if let Err(e) = self.broadcast_begin_monitoring().await { + // Rollback state on broadcast failure + warn!( + error = %e, + 
"Failed to broadcast 'begin monitoring', rolling back to Ready state" + ); + *self.agent_state.write().await = AgentState::Ready; + return Err(e); + } Ok(()) } diff --git a/daemoneye-agent/src/collector_config.rs b/daemoneye-agent/src/collector_config.rs index 3919444..ef39b54 100644 --- a/daemoneye-agent/src/collector_config.rs +++ b/daemoneye-agent/src/collector_config.rs @@ -493,7 +493,7 @@ mod tests { assert!(matches!( result, - Err(CollectorConfigError::ValidationError { message }) if message.contains("ID cannot be empty") + Err(CollectorConfigError::ValidationError { message }) if message.contains("collector ID cannot be empty") )); } From 5e59c8f2bbe285b854f0c4c1930d3be07a76d17e Mon Sep 17 00:00:00 2001 From: UncleSp1d3r Date: Mon, 2 Feb 2026 20:39:31 -0500 Subject: [PATCH 6/6] fix(daemoneye-agent): add clippy lint allows for test modules Add necessary clippy lint allows to test code to pass CI checks: - broker_manager.rs: shadow_unrelated, wildcard_enum_match_arm, panic - heartbeat_detection_integration.rs: shadow_reuse, shadow_unrelated, inefficient_to_string, panic - loading_state_integration.rs: inefficient_to_string, fix doc backticks Co-Authored-By: Claude Opus 4.5 --- daemoneye-agent/src/broker_manager.rs | 5 ++++- daemoneye-agent/tests/heartbeat_detection_integration.rs | 6 +++++- daemoneye-agent/tests/loading_state_integration.rs | 5 +++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/daemoneye-agent/src/broker_manager.rs b/daemoneye-agent/src/broker_manager.rs index 3d33c38..ced4514 100644 --- a/daemoneye-agent/src/broker_manager.rs +++ b/daemoneye-agent/src/broker_manager.rs @@ -1538,7 +1538,10 @@ fn aggregate_worst_of>(iter: I) -> HealthStatus clippy::str_to_string, clippy::semicolon_outside_block, clippy::semicolon_inside_block, - clippy::semicolon_if_nothing_returned + clippy::semicolon_if_nothing_returned, + clippy::shadow_unrelated, + clippy::wildcard_enum_match_arm, + clippy::panic )] mod tests { use super::*; diff --git a/daemoneye-agent/tests/heartbeat_detection_integration.rs b/daemoneye-agent/tests/heartbeat_detection_integration.rs index db2064d..4f23574 100644 --- a/daemoneye-agent/tests/heartbeat_detection_integration.rs +++ b/daemoneye-agent/tests/heartbeat_detection_integration.rs @@ -8,7 +8,11 @@ clippy::unwrap_used, clippy::str_to_string, clippy::semicolon_outside_block, - clippy::semicolon_inside_block + clippy::semicolon_inside_block, + clippy::shadow_reuse, + clippy::shadow_unrelated, + clippy::inefficient_to_string, + clippy::panic )] use daemoneye_agent::collector_registry::{ diff --git a/daemoneye-agent/tests/loading_state_integration.rs b/daemoneye-agent/tests/loading_state_integration.rs index f3777f5..53b3967 100644 --- a/daemoneye-agent/tests/loading_state_integration.rs +++ b/daemoneye-agent/tests/loading_state_integration.rs @@ -1,7 +1,7 @@ //! Integration tests for agent loading state coordination. //! //! These tests validate the complete loading state machine workflow including: -//! - State transitions (Loading → Ready → SteadyState) +//! - State transitions (Loading → Ready → `SteadyState`) //! - Collector configuration loading //! - Collector readiness tracking //! - Startup timeout handling @@ -13,7 +13,8 @@ clippy::unwrap_used, clippy::uninlined_format_args, clippy::let_underscore_must_use, - clippy::print_stdout + clippy::print_stdout, + clippy::inefficient_to_string )] use daemoneye_agent::broker_manager::{AgentState, BrokerManager};