diff --git a/procmond/src/main.rs b/procmond/src/main.rs index 2820851..27ff176 100644 --- a/procmond/src/main.rs +++ b/procmond/src/main.rs @@ -1,14 +1,19 @@ #![forbid(unsafe_code)] use clap::Parser; -use collector_core::{Collector, CollectorConfig, CollectorRegistrationConfig}; +use collector_core::{CollectionEvent, Collector, CollectorConfig, CollectorRegistrationConfig}; use daemoneye_lib::{config, storage, telemetry}; -use procmond::{ProcessEventSource, ProcessSourceConfig}; +use procmond::{ + ProcessEventSource, ProcessSourceConfig, + event_bus_connector::EventBusConnector, + monitor_collector::{ProcmondMonitorCollector, ProcmondMonitorConfig}, +}; use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; -use tracing::info; +use tokio::sync::{Mutex, mpsc}; +use tracing::{error, info, warn}; /// Parse and validate the collection interval argument. /// @@ -98,69 +103,279 @@ pub async fn main() -> Result<(), Box> { "Database stats retrieved" ); - // Create collector configuration - let mut collector_config = CollectorConfig::new() - .with_component_name("procmond".to_owned()) - .with_ipc_endpoint(daemoneye_lib::ipc::IpcConfig::default().endpoint_path) - .with_max_event_sources(1) - .with_event_buffer_size(1000) - .with_shutdown_timeout(Duration::from_secs(30)) - .with_health_check_interval(Duration::from_secs(60)) - .with_telemetry(true) - .with_debug_logging(cli.log_level == "debug"); - - // Enable broker registration for RPC service - // Note: In a real deployment, the broker would be provided via configuration - // For now, we'll configure registration but it will only work if a broker is available - collector_config.registration = Some(CollectorRegistrationConfig { - enabled: true, - broker: None, // Will be set if broker is available via environment/config - collector_id: Some("procmond".to_owned()), - collector_type: Some("procmond".to_owned()), - topic: "control.collector.registration".to_owned(), - timeout: Duration::from_secs(10), - retry_attempts: 3, - heartbeat_interval: Duration::from_secs(30), - attributes: HashMap::new(), - }); - - // Create process source configuration - let process_config = ProcessSourceConfig { - collection_interval: Duration::from_secs(cli.interval), - collect_enhanced_metadata: cli.enhanced_metadata, - max_processes_per_cycle: cli.max_processes, - compute_executable_hashes: cli.compute_hashes, - ..Default::default() - }; - - // Create process event source - let process_source = ProcessEventSource::with_config(db_manager, process_config); - - // Log RPC service status before moving collector_config - // The RPC service will be automatically started by collector-core after broker registration - let registration_enabled = collector_config - .registration - .as_ref() - .is_some_and(|r| r.enabled); - let collector_id_str = collector_config - .registration - .as_ref() - .and_then(|r| r.collector_id.as_deref()) - .unwrap_or("procmond"); - - if registration_enabled { + // Check for broker configuration via environment variable + // DAEMONEYE_BROKER_SOCKET: If set, use actor mode with EventBusConnector + // If not set, use standalone mode with collector-core + let broker_socket = std::env::var("DAEMONEYE_BROKER_SOCKET").ok(); + + if let Some(ref socket_path) = broker_socket { + // ======================================================================== + // Actor Mode: Use ProcmondMonitorCollector with EventBusConnector + // 
======================================================================== info!( - collector_id = %collector_id_str, - "RPC service will be initialized after broker registration" + socket_path = %socket_path, + "Broker socket configured, starting in actor mode" ); - } - // Create and configure collector - let mut collector = Collector::new(collector_config); - collector.register(Box::new(process_source))?; + // Create actor channel (bounded, capacity: 100) + let (actor_handle, message_receiver) = ProcmondMonitorCollector::create_channel(); + + // Create ProcmondMonitorConfig from CLI arguments + let monitor_config = ProcmondMonitorConfig { + base_config: collector_core::MonitorCollectorConfig { + collection_interval: Duration::from_secs(cli.interval), + ..Default::default() + }, + process_config: procmond::process_collector::ProcessCollectionConfig { + collect_enhanced_metadata: cli.enhanced_metadata, + max_processes: cli.max_processes, + compute_executable_hashes: cli.compute_hashes, + ..Default::default() + }, + ..Default::default() + }; + + // Create the actor-based collector + let mut collector = ProcmondMonitorCollector::new( + Arc::clone(&db_manager), + monitor_config, + message_receiver, + )?; + + // Initialize EventBusConnector with WAL directory + let wal_dir = PathBuf::from(&cli.database).parent().map_or_else( + || PathBuf::from("/var/lib/daemoneye/wal"), + |p| p.join("wal"), + ); + + // Ensure WAL directory exists (fail-fast to avoid confusing WAL init failures) + std::fs::create_dir_all(&wal_dir).map_err(|e| { + error!( + wal_dir = ?wal_dir, + error = %e, + "Failed to create WAL directory" + ); + e + })?; + + let mut event_bus_connector = EventBusConnector::new(wal_dir).await?; + + // Attempt to connect to the broker + match event_bus_connector.connect().await { + Ok(()) => { + info!("Connected to daemoneye-agent broker"); + + // Replay any events from WAL (crash recovery) + match event_bus_connector.replay_wal().await { + Ok(replayed) if replayed > 0 => { + info!( + replayed = replayed, + "Replayed events from WAL after connection" + ); + } + Ok(_) => { + info!("No events to replay from WAL"); + } + Err(e) => { + warn!(error = %e, "Failed to replay WAL, some events may be delayed"); + } + } + } + Err(e) => { + warn!( + error = %e, + "Failed to connect to broker, will buffer events until connection available" + ); + } + } + + // Take the backpressure receiver before moving connector to collector + let backpressure_rx = event_bus_connector.take_backpressure_receiver(); + + // Set the EventBusConnector on the collector + collector.set_event_bus_connector(event_bus_connector); + + // Spawn backpressure monitor task if we have the receiver + let original_interval = Duration::from_secs(cli.interval); + let backpressure_task = backpressure_rx.map_or_else( + || { + warn!("Backpressure receiver not available, dynamic interval adjustment disabled"); + None + }, + |bp_rx| { + Some(ProcmondMonitorCollector::spawn_backpressure_monitor( + actor_handle.clone(), + bp_rx, + original_interval, + )) + }, + ); + + // Create event channel for the actor's output + let (event_tx, mut event_rx) = mpsc::channel::(1000); + + // Clone handle for shutdown task + let shutdown_handle = actor_handle.clone(); + + // Spawn task to handle graceful shutdown on Ctrl+C + let shutdown_task = tokio::spawn(async move { + // Wait for Ctrl+C + if let Err(e) = tokio::signal::ctrl_c().await { + error!(error = %e, "Failed to listen for Ctrl+C signal"); + return; + } + info!("Received Ctrl+C, initiating graceful 
shutdown"); - // Run the collector (this will handle IPC, event processing, and lifecycle management) - collector.run().await?; + // Send graceful shutdown to actor + match shutdown_handle.graceful_shutdown().await { + Ok(()) => info!("Actor shutdown completed successfully"), + Err(e) => error!(error = %e, "Actor shutdown failed"), + } + }); + + // Startup behavior: begin monitoring immediately on launch. + // + // The collector currently does not wait for an explicit "begin monitoring" + // command from the agent. This makes procmond usable in isolation and in + // test environments without requiring the full agent/broker stack. + // + // If coordinated startup with the agent becomes a hard requirement in the + // future, this is the place to integrate a subscription to a + // `control.collector.lifecycle` (or similar) control topic and defer + // calling `begin_monitoring()` until the appropriate control message is + // received. + info!("Starting collection immediately on startup"); + if let Err(e) = actor_handle.begin_monitoring() { + error!(error = %e, "Failed to send BeginMonitoring command"); + } + + // Spawn the actor task + let actor_task = tokio::spawn(async move { + if let Err(e) = collector.run(event_tx).await { + error!(error = %e, "Actor run loop failed"); + } + }); + + // Spawn task to consume events from the actor (logging only for now) + let event_consumer_task = tokio::spawn(async move { + let mut event_count = 0_u64; + while let Some(event) = event_rx.recv().await { + event_count = event_count.saturating_add(1); + if event_count.is_multiple_of(100) { + info!(total_events = event_count, "Processing collection events"); + } + // In a full implementation, events would be sent to downstream processors + match event { + CollectionEvent::Process(pe) => { + tracing::trace!(pid = pe.pid, name = %pe.name, "Received process event"); + } + CollectionEvent::Network(_) + | CollectionEvent::Filesystem(_) + | CollectionEvent::Performance(_) + | CollectionEvent::TriggerRequest(_) => { + tracing::trace!("Received non-process event"); + } + } + } + info!(total_events = event_count, "Event consumer task exiting"); + }); + + // Wait for actor to complete (either by shutdown or error) + tokio::select! 
{ + result = actor_task => { + if let Err(e) = result { + error!(error = %e, "Actor task panicked"); + } + } + _ = shutdown_task => { + info!("Shutdown task completed"); + } + } + + // Clean up backpressure monitor task + if let Some(bp_task) = backpressure_task { + bp_task.abort(); + info!("Backpressure monitor task aborted"); + } + + // Wait for event consumer to exit naturally (channel sender is dropped) + // Use a timeout to avoid hanging indefinitely + match tokio::time::timeout(Duration::from_secs(5), event_consumer_task).await { + Ok(Ok(())) => info!("Event consumer task completed successfully"), + Ok(Err(e)) => error!(error = %e, "Event consumer task join error"), + Err(_) => { + warn!("Event consumer task did not complete within timeout"); + } + } + + info!("Procmond actor mode shutdown complete"); + } else { + // ======================================================================== + // Standalone Mode: Use ProcessEventSource with collector-core + // ======================================================================== + info!("No broker socket configured, starting in standalone mode"); + + // Create collector configuration + let mut collector_config = CollectorConfig::new() + .with_component_name("procmond".to_owned()) + .with_ipc_endpoint(daemoneye_lib::ipc::IpcConfig::default().endpoint_path) + .with_max_event_sources(1) + .with_event_buffer_size(1000) + .with_shutdown_timeout(Duration::from_secs(30)) + .with_health_check_interval(Duration::from_secs(60)) + .with_telemetry(true) + .with_debug_logging(cli.log_level == "debug"); + + // Enable broker registration for RPC service (if broker becomes available) + collector_config.registration = Some(CollectorRegistrationConfig { + enabled: true, + broker: None, + collector_id: Some("procmond".to_owned()), + collector_type: Some("procmond".to_owned()), + topic: "control.collector.registration".to_owned(), + timeout: Duration::from_secs(10), + retry_attempts: 3, + heartbeat_interval: Duration::from_secs(30), + attributes: HashMap::new(), + }); + + // Create process source configuration + let process_config = ProcessSourceConfig { + collection_interval: Duration::from_secs(cli.interval), + collect_enhanced_metadata: cli.enhanced_metadata, + max_processes_per_cycle: cli.max_processes, + compute_executable_hashes: cli.compute_hashes, + ..Default::default() + }; + + // Create process event source + let process_source = ProcessEventSource::with_config(db_manager, process_config); + + // Log RPC service status + let registration_enabled = collector_config + .registration + .as_ref() + .is_some_and(|r| r.enabled); + let collector_id_str = collector_config + .registration + .as_ref() + .and_then(|r| r.collector_id.as_deref()) + .unwrap_or("procmond"); + + if registration_enabled { + info!( + collector_id = %collector_id_str, + "RPC service will be initialized after broker registration" + ); + } + + // Create and configure collector + let mut collector = Collector::new(collector_config); + collector.register(Box::new(process_source))?; + + // Run the collector (handles IPC, event processing, and lifecycle management) + collector.run().await?; + } Ok(()) } diff --git a/procmond/src/monitor_collector.rs b/procmond/src/monitor_collector.rs index 1e237e4..9a325f2 100644 --- a/procmond/src/monitor_collector.rs +++ b/procmond/src/monitor_collector.rs @@ -5,13 +5,14 @@ //! collector-core `EventSource` trait. 
 use crate::{
-    lifecycle::{LifecycleTrackingConfig, ProcessLifecycleTracker},
+    event_bus_connector::{EventBusConnector, ProcessEventType},
+    lifecycle::{LifecycleTrackingConfig, ProcessLifecycleEvent, ProcessLifecycleTracker},
     process_collector::{ProcessCollectionConfig, ProcessCollector, SysinfoProcessCollector},
 };
 use anyhow::Context;
 use async_trait::async_trait;
 use collector_core::{
-    AnalysisChainCoordinator, CollectionEvent, EventBus, EventSource, LocalEventBus,
+    AnalysisChainCoordinator, CollectionEvent, EventSource,
     MonitorCollector as MonitorCollectorTrait, MonitorCollectorConfig, MonitorCollectorStats,
     MonitorCollectorStatsSnapshot, SourceCaps, TriggerManager,
 };
@@ -24,22 +25,272 @@ use std::{
     time::{Duration, Instant},
 };
 use tokio::{
-    sync::{Mutex, RwLock, Semaphore, mpsc},
+    sync::{Mutex, Semaphore, mpsc},
     time::{interval, timeout},
 };
 use tracing::{debug, error, info, instrument, warn};
+// ============================================================================
+// Actor Pattern Types
+// ============================================================================
+
+/// Messages for the ProcmondMonitorCollector actor.
+///
+/// The actor processes these messages sequentially to maintain consistent state
+/// without complex locking. Request/response patterns use oneshot channels.
+#[derive(Debug)]
+#[non_exhaustive]
+pub enum ActorMessage {
+    /// Request health check data from the actor.
+    HealthCheck {
+        /// Channel to send the health check response.
+        respond_to: tokio::sync::oneshot::Sender<HealthCheckData>,
+    },
+    /// Update the collector configuration at the next cycle boundary.
+    UpdateConfig {
+        /// New configuration to apply (boxed to reduce enum size).
+        config: Box<ProcmondMonitorConfig>,
+        /// Channel to send the result.
+        respond_to: tokio::sync::oneshot::Sender<anyhow::Result<()>>,
+    },
+    /// Request graceful shutdown, completing the current cycle.
+    GracefulShutdown {
+        /// Channel to signal shutdown completion.
+        respond_to: tokio::sync::oneshot::Sender<anyhow::Result<()>>,
+    },
+    /// Signal from agent to begin monitoring after startup coordination.
+    BeginMonitoring,
+    /// Adjust collection interval due to backpressure.
+    AdjustInterval {
+        /// New collection interval to use.
+        new_interval: Duration,
+    },
+}
+
+/// Current state of the collector actor.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[non_exhaustive]
+pub enum CollectorState {
+    /// Waiting for BeginMonitoring command from agent.
+    WaitingForAgent,
+    /// Actively collecting process data.
+    Running,
+    /// Graceful shutdown in progress.
+    ShuttingDown,
+    /// Collector has stopped.
+    Stopped,
+}
+
+impl std::fmt::Display for CollectorState {
+    #[allow(clippy::pattern_type_mismatch)] // Match ergonomics for enum Display
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::WaitingForAgent => write!(f, "waiting_for_agent"),
+            Self::Running => write!(f, "running"),
+            Self::ShuttingDown => write!(f, "shutting_down"),
+            Self::Stopped => write!(f, "stopped"),
+        }
+    }
+}
+
+/// Health check response data from the collector actor.
+#[derive(Debug, Clone)]
+pub struct HealthCheckData {
+    /// Current state of the collector.
+    pub state: CollectorState,
+    /// Current collection interval.
+    pub collection_interval: Duration,
+    /// Original collection interval (before any backpressure adjustments).
+    pub original_interval: Duration,
+    /// Whether connected to the event bus broker.
+    pub event_bus_connected: bool,
+    /// Current buffer level percentage (0-100) if available.
+    pub buffer_level_percent: Option<u8>,
+    /// Timestamp of last successful collection.
+    pub last_collection: Option<Instant>,
+    /// Number of collection cycles completed.
+    pub collection_cycles: u64,
+    /// Number of lifecycle events detected.
+    pub lifecycle_events: u64,
+    /// Number of collection errors.
+    pub collection_errors: u64,
+    /// Number of backpressure events.
+    pub backpressure_events: u64,
+}
+
+/// Channel capacity for actor messages.
+pub const ACTOR_CHANNEL_CAPACITY: usize = 100;
+
+/// Error returned when the actor channel is full or closed.
+#[derive(Debug, thiserror::Error)]
+#[non_exhaustive]
+pub enum ActorError {
+    /// The actor's message channel is full.
+    #[error("Actor message channel is full (capacity: {capacity})")]
+    ChannelFull { capacity: usize },
+    /// The actor's message channel is closed.
+    #[error("Actor message channel is closed")]
+    ChannelClosed,
+    /// The response channel was dropped before receiving a response.
+    #[error("Response channel dropped")]
+    ResponseDropped,
+    /// The actor returned an error.
+    #[error("Actor error: {0}")]
+    ActorError(#[from] anyhow::Error),
+}
+
+/// Handle for sending messages to the ProcmondMonitorCollector actor.
+///
+/// This handle is cloneable and can be shared across tasks to communicate
+/// with the actor. It provides typed methods for each message type.
+#[derive(Clone)]
+pub struct ActorHandle {
+    sender: mpsc::Sender<ActorMessage>,
+}
+
+impl ActorHandle {
+    /// Creates a new actor handle from an mpsc sender.
+    pub const fn new(sender: mpsc::Sender<ActorMessage>) -> Self {
+        Self { sender }
+    }
+
+    /// Requests health check data from the actor.
+    ///
+    /// Returns detailed health information including collector state,
+    /// event bus connectivity, and statistics.
+    pub async fn health_check(&self) -> Result<HealthCheckData, ActorError> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.sender
+            .try_send(ActorMessage::HealthCheck { respond_to: tx })
+            .map_err(|e| match e {
+                mpsc::error::TrySendError::Full(_) => ActorError::ChannelFull {
+                    capacity: ACTOR_CHANNEL_CAPACITY,
+                },
+                mpsc::error::TrySendError::Closed(_) => ActorError::ChannelClosed,
+            })?;
+        rx.await.map_err(|_recv_err| ActorError::ResponseDropped)
+    }
+
+    /// Updates the collector configuration.
+    ///
+    /// The configuration is applied at the start of the next collection cycle
+    /// to ensure atomic configuration changes.
+    pub async fn update_config(&self, config: ProcmondMonitorConfig) -> Result<(), ActorError> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+        self.sender
+            .try_send(ActorMessage::UpdateConfig {
+                config: Box::new(config),
+                respond_to: tx,
+            })
+            .map_err(|e| match e {
+                mpsc::error::TrySendError::Full(_) => ActorError::ChannelFull {
+                    capacity: ACTOR_CHANNEL_CAPACITY,
+                },
+                mpsc::error::TrySendError::Closed(_) => ActorError::ChannelClosed,
+            })?;
+        rx.await
+            .map_err(|_recv_err| ActorError::ResponseDropped)?
+            .map_err(ActorError::ActorError)
+    }
+
+    /// Requests graceful shutdown of the collector.
+    ///
+    /// The collector will complete its current collection cycle before
+    /// shutting down. Returns when shutdown is complete.
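+    ///
+    /// A minimal sketch (assuming a running actor and a `handle` clone):
+    ///
+    /// ```ignore
+    /// // Ask the actor to finish its current cycle and stop.
+    /// handle.graceful_shutdown().await?;
+    /// ```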
+ pub async fn graceful_shutdown(&self) -> Result<(), ActorError> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .try_send(ActorMessage::GracefulShutdown { respond_to: tx }) + .map_err(|e| match e { + mpsc::error::TrySendError::Full(_) => ActorError::ChannelFull { + capacity: ACTOR_CHANNEL_CAPACITY, + }, + mpsc::error::TrySendError::Closed(_) => ActorError::ChannelClosed, + })?; + rx.await + .map_err(|_recv_err| ActorError::ResponseDropped)? + .map_err(ActorError::ActorError) + } + + /// Signals the collector to begin monitoring. + /// + /// This is called by the agent after startup coordination is complete. + /// The collector transitions from WaitingForAgent to Running state. + pub fn begin_monitoring(&self) -> Result<(), ActorError> { + self.sender + .try_send(ActorMessage::BeginMonitoring) + .map_err(|e| match e { + mpsc::error::TrySendError::Full(_) => ActorError::ChannelFull { + capacity: ACTOR_CHANNEL_CAPACITY, + }, + mpsc::error::TrySendError::Closed(_) => ActorError::ChannelClosed, + }) + } + + /// Adjusts the collection interval due to backpressure. + /// + /// Called by the EventBusConnector when backpressure is detected or released. + pub fn adjust_interval(&self, new_interval: Duration) -> Result<(), ActorError> { + self.sender + .try_send(ActorMessage::AdjustInterval { new_interval }) + .map_err(|e| match e { + mpsc::error::TrySendError::Full(_) => ActorError::ChannelFull { + capacity: ACTOR_CHANNEL_CAPACITY, + }, + mpsc::error::TrySendError::Closed(_) => ActorError::ChannelClosed, + }) + } + + /// Checks if the actor channel is closed. + pub fn is_closed(&self) -> bool { + self.sender.is_closed() + } +} + +impl std::fmt::Debug for ActorHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ActorHandle") + .field("closed", &self.sender.is_closed()) + .finish() + } +} + +// ============================================================================ +// Configuration +// ============================================================================ + /// Procmond-specific Monitor Collector configuration. /// /// This extends the base `MonitorCollectorConfig` with procmond-specific /// configuration for process collection and lifecycle tracking. +/// +/// # Hot-Reload Support +/// +/// Configuration can be updated at runtime via `ActorHandle::update_config()`. +/// Updates are applied atomically at collection cycle boundaries. 
+/// +/// ## Hot-Reloadable Settings +/// +/// These settings can be changed without restarting procmond: +/// - `base_config.collection_interval` - Collection frequency +/// - `base_config.max_events_in_flight` - Backpressure limit (note: semaphore not resized) +/// - `lifecycle_config.start_threshold` - Process start detection threshold +/// - `lifecycle_config.stop_threshold` - Process stop detection threshold +/// - `lifecycle_config.modification_threshold` - Process modification detection threshold +/// +/// ## Requires Restart +/// +/// These settings require procmond restart to take effect: +/// - `process_config.excluded_pids` - Affects collector initialization +/// - `base_config.enable_event_driven` - Requires recreating event bus +/// - `process_config.collection_timeout` - Affects collector initialization #[derive(Debug, Clone, Default)] pub struct ProcmondMonitorConfig { - /// Base monitor collector configuration + /// Base monitor collector configuration (collection_interval is hot-reloadable) pub base_config: MonitorCollectorConfig, - /// Process collection configuration + /// Process collection configuration (mostly requires restart) pub process_config: ProcessCollectionConfig, - /// Lifecycle tracking configuration + /// Lifecycle tracking configuration (thresholds are hot-reloadable) pub lifecycle_config: LifecycleTrackingConfig, } @@ -50,14 +301,34 @@ impl ProcmondMonitorConfig { } } -/// Procmond Monitor Collector implementation. +/// Procmond Monitor Collector implementation using actor pattern. /// /// This collector integrates process lifecycle tracking with the collector-core /// framework, providing event-driven process monitoring capabilities. +/// +/// # Actor Pattern +/// +/// The collector runs as an actor in a dedicated task, processing messages +/// sequentially to maintain consistent state. Messages are received via a +/// bounded mpsc channel (capacity: 100) and processed one at a time. +/// +/// # Startup Coordination +/// +/// The collector starts in `WaitingForAgent` state and waits for a +/// `BeginMonitoring` message from the agent before starting collection. +/// This ensures the agent has completed loading state before monitoring begins. +/// +/// # Backpressure Handling +/// +/// The actor receives `AdjustInterval` messages from the EventBusConnector +/// when backpressure is detected. The collection interval increases by 1.5x +/// during backpressure and is restored when backpressure is released. 
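+///
+/// # Usage
+///
+/// A minimal sketch of driving the actor from a supervising task (wiring
+/// abridged); `database`, `config`, and `event_tx` are assumed to exist in the
+/// caller:
+///
+/// ```ignore
+/// let (handle, rx) = ProcmondMonitorCollector::create_channel();
+/// let collector = ProcmondMonitorCollector::new(database, config, rx)?;
+/// tokio::spawn(async move { collector.run(event_tx).await });
+///
+/// handle.begin_monitoring()?;                 // WaitingForAgent -> Running
+/// let health = handle.health_check().await?;  // query state via the channel
+/// info!(state = %health.state, "collector health");
+/// handle.graceful_shutdown().await?;          // finish current cycle, then stop
+/// ```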
 #[allow(dead_code)]
 pub struct ProcmondMonitorCollector {
-    /// Configuration
+    /// Current configuration (may be updated at cycle boundaries)
     config: ProcmondMonitorConfig,
+    /// Pending configuration update to apply at next cycle boundary
+    pending_config: Option<ProcmondMonitorConfig>,
     /// Database manager for audit logging
     database: Arc>,
     /// Process collector implementation
@@ -68,8 +339,6 @@ pub struct ProcmondMonitorCollector {
     trigger_manager: Arc<TriggerManager>,
     /// Analysis chain coordinator
     analysis_coordinator: Arc<AnalysisChainCoordinator>,
-    /// Event bus for inter-collector communication
-    event_bus: Arc<RwLock<Option<Arc<dyn EventBus>>>>,
     /// Runtime statistics
     stats: Arc<MonitorCollectorStats>,
     /// Backpressure semaphore
@@ -78,26 +347,69 @@ pub struct ProcmondMonitorCollector {
     consecutive_backpressure_timeouts: Arc<AtomicUsize>,
     /// Circuit breaker cooldown timestamp
     circuit_breaker_until: Arc<std::sync::Mutex<Option<Instant>>>,
-    /// Shutdown coordination
-    shutdown_signal: Arc<AtomicBool>,
+
+    // Actor-specific fields
+    /// Actor message receiver
+    message_receiver: mpsc::Receiver<ActorMessage>,
+    /// Current collector state
+    state: CollectorState,
+    /// Current collection interval (may be adjusted due to backpressure)
+    current_interval: Duration,
+    /// Original collection interval (before backpressure adjustments)
+    original_interval: Duration,
+    /// Timestamp of last successful collection
+    last_collection: Option<Instant>,
+    /// Event bus connection status
+    event_bus_connected: bool,
+    /// Buffer level percentage (0-100) from EventBusConnector
+    buffer_level_percent: Option<u8>,
+    /// Pending graceful shutdown response channel
+    pending_shutdown_response: Option<tokio::sync::oneshot::Sender<anyhow::Result<()>>>,
+    /// Pending interval update from backpressure (applied at next iteration)
+    pending_interval: Option<Duration>,
+
+    // Event Bus Integration
+    /// EventBusConnector for publishing events to the broker with WAL integration.
+    event_bus_connector: Option<EventBusConnector>,
 }

 impl ProcmondMonitorCollector {
-    /// Creates a new Procmond Monitor Collector.
+    /// Creates a new Procmond Monitor Collector as an actor.
+    ///
+    /// Returns both the collector and an `ActorHandle` for sending messages.
+    /// The collector should be spawned in a dedicated task using the `run()` method.
+ /// + /// # Arguments + /// + /// * `database` - Database manager for audit logging + /// * `config` - Collector configuration + /// * `message_receiver` - Receiver end of the actor message channel + /// + /// # Example + /// + /// ```ignore + /// let (tx, rx) = mpsc::channel(ACTOR_CHANNEL_CAPACITY); + /// let handle = ActorHandle::new(tx); + /// let collector = ProcmondMonitorCollector::new(database, config, rx)?; + /// tokio::spawn(async move { collector.run(event_tx).await }); + /// ``` pub fn new( database: Arc>, config: ProcmondMonitorConfig, + message_receiver: mpsc::Receiver, ) -> anyhow::Result { // Validate configuration first config .validate() .with_context(|| "Procmond Monitor Collector configuration validation failed")?; + let collection_interval = config.base_config.collection_interval; + info!( - collection_interval_secs = config.base_config.collection_interval.as_secs(), + collection_interval_secs = collection_interval.as_secs(), max_events_in_flight = config.base_config.max_events_in_flight, event_driven = config.base_config.enable_event_driven, - "Creating Procmond Monitor Collector" + "Creating Procmond Monitor Collector (actor mode)" ); // Create process collector @@ -118,53 +430,438 @@ impl ProcmondMonitorCollector { let analysis_coordinator = AnalysisChainCoordinator::new(config.base_config.analysis_config.clone()); - // Create event bus if event-driven architecture is enabled - // Note: LocalEventBus is ready to use immediately after new() - no start() method required - let event_bus = if config.base_config.enable_event_driven { - let bus_config = collector_core::EventBusConfig::default(); - let local_bus = LocalEventBus::new(bus_config); - let bus_arc: Arc = Arc::new(local_bus); - Arc::new(RwLock::new(Some(bus_arc))) - } else { - Arc::new(RwLock::new(None)) - }; - // Create backpressure semaphore with validated capacity let backpressure_semaphore = Arc::new(Semaphore::new(config.base_config.max_events_in_flight)); Ok(Self { config, + pending_config: None, database, process_collector, lifecycle_tracker, trigger_manager, analysis_coordinator, - event_bus, stats: Arc::new(MonitorCollectorStats::default()), backpressure_semaphore, consecutive_backpressure_timeouts: Arc::new(AtomicUsize::new(0)), circuit_breaker_until: Arc::new(std::sync::Mutex::new(None)), - shutdown_signal: Arc::new(AtomicBool::new(false)), + // Actor-specific fields + message_receiver, + state: CollectorState::WaitingForAgent, + current_interval: collection_interval, + original_interval: collection_interval, + last_collection: None, + event_bus_connected: false, + buffer_level_percent: None, + pending_shutdown_response: None, + pending_interval: None, + // Event Bus Integration + event_bus_connector: None, }) } - /// Performs process collection and lifecycle analysis. - #[instrument( - skip(self, tx, shutdown_signal), - fields(source = "procmond-monitor-collector") - )] - async fn collect_and_analyze( - &self, + /// Creates a new actor channel and handle. + /// + /// This is a convenience method for creating the channel infrastructure. + /// The returned handle should be used to send messages to the actor. + pub fn create_channel() -> (ActorHandle, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(ACTOR_CHANNEL_CAPACITY); + (ActorHandle::new(tx), rx) + } + + /// Sets the event bus connection status. + /// + /// Called by main.rs after EventBusConnector connects to the broker. 
+ pub const fn set_event_bus_connected(&mut self, connected: bool) { + self.event_bus_connected = connected; + } + + /// Sets the current buffer level percentage. + /// + /// Called when receiving buffer level updates from EventBusConnector. + pub const fn set_buffer_level(&mut self, level_percent: u8) { + self.buffer_level_percent = Some(level_percent); + } + + /// Sets the EventBusConnector for publishing events to the broker. + /// + /// This should be called after constructing the collector and before + /// calling `run()`. The connector should already be connected or will + /// connect during the run loop. + pub fn set_event_bus_connector(&mut self, connector: EventBusConnector) { + self.event_bus_connected = connector.is_connected(); + self.buffer_level_percent = Some(connector.buffer_usage_percent()); + self.event_bus_connector = Some(connector); + } + + /// Takes the EventBusConnector out of the collector for shutdown. + /// + /// Returns `None` if no connector was set. + #[allow(clippy::missing_const_for_fn)] // Option::take() is not const + pub fn take_event_bus_connector(&mut self) -> Option { + self.event_bus_connector.take() + } + + /// Spawns a backpressure monitoring task that adjusts collection interval. + /// + /// This function should be called from main.rs after creating the actor. + /// It spawns a background task that: + /// 1. Receives `BackpressureSignal` from the EventBusConnector + /// 2. On `Activated`: increases interval by 1.5x via `AdjustInterval` message + /// 3. On `Released`: restores original interval via `AdjustInterval` message + /// + /// # Arguments + /// + /// * `handle` - The actor handle for sending messages + /// * `backpressure_rx` - The receiver from `EventBusConnector::take_backpressure_receiver()` + /// * `original_interval` - The original collection interval (before backpressure) + /// + /// # Returns + /// + /// A `JoinHandle` for the spawned task + pub fn spawn_backpressure_monitor( + handle: ActorHandle, + mut backpressure_rx: mpsc::Receiver, + original_interval: Duration, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + info!( + original_interval_ms = original_interval.as_millis(), + "Starting backpressure monitor task" + ); + + while let Some(signal) = backpressure_rx.recv().await { + match signal { + crate::event_bus_connector::BackpressureSignal::Activated => { + // Increase interval by 1.5x (50% slower collection), clamped to 1 hour max + const MAX_INTERVAL_MS: u128 = 3_600_000; // 1 hour + let scaled_ms = original_interval + .as_millis() + .saturating_mul(3) + .saturating_div(2); + let clamped_ms = if scaled_ms > MAX_INTERVAL_MS { + warn!( + original_interval_ms = original_interval.as_millis(), + scaled_interval_ms = scaled_ms, + max_interval_ms = MAX_INTERVAL_MS, + "Backpressure-adjusted interval exceeds maximum; clamping to 1 hour" + ); + MAX_INTERVAL_MS + } else { + scaled_ms + }; + #[allow(clippy::as_conversions)] + // Safe: clamped_ms <= 3_600_000 fits in u64 + let new_interval = Duration::from_millis(clamped_ms as u64); + info!( + original_interval_ms = original_interval.as_millis(), + new_interval_ms = new_interval.as_millis(), + "Backpressure activated - increasing collection interval by 1.5x" + ); + if let Err(e) = handle.adjust_interval(new_interval) { + warn!(error = %e, "Failed to send AdjustInterval message"); + } + } + crate::event_bus_connector::BackpressureSignal::Released => { + // Restore original interval + info!( + original_interval_ms = original_interval.as_millis(), + "Backpressure released - 
restoring original collection interval" + ); + if let Err(e) = handle.adjust_interval(original_interval) { + warn!(error = %e, "Failed to send AdjustInterval message"); + } + } + } + } + + info!("Backpressure monitor task exiting (channel closed)"); + }) + } + + /// Runs the actor message processing loop. + /// + /// This method should be spawned in a dedicated task. It processes messages + /// sequentially and runs the collection loop when in Running state. + /// + /// # Startup Coordination + /// + /// The actor starts in `WaitingForAgent` state. It waits for a `BeginMonitoring` + /// message before starting the collection loop. This ensures the daemoneye-agent + /// has completed loading state (privileges dropped, all collectors ready). + /// + /// # Arguments + /// + /// * `event_tx` - Channel for sending collection events to downstream processors + #[instrument(skip(self, event_tx), fields(source = "procmond-monitor-collector"))] + pub async fn run(mut self, event_tx: mpsc::Sender) -> anyhow::Result<()> { + const MAX_CONSECUTIVE_FAILURES: u32 = 5; + + info!( + state = %self.state, + collection_interval_secs = self.current_interval.as_secs(), + "Starting Procmond Monitor Collector actor" + ); + + let mut consecutive_failures = 0_u32; + let mut collection_interval = interval(self.current_interval); + // Skip first tick to avoid immediate collection + collection_interval.tick().await; + + loop { + // Check for pending config update at cycle boundary + if let Some(new_config) = self.pending_config.take() { + self.apply_config_update(new_config); + // Update interval if changed + let new_interval = self.config.base_config.collection_interval; + if new_interval != self.original_interval { + self.original_interval = new_interval; + self.current_interval = new_interval; + collection_interval = interval(self.current_interval); + collection_interval.tick().await; // Reset interval + info!( + new_interval_secs = new_interval.as_secs(), + "Collection interval updated from config" + ); + } + } + + // Check for pending interval adjustment (from backpressure) + if let Some(new_interval) = self.pending_interval.take() { + if new_interval == self.current_interval { + debug!( + interval_ms = new_interval.as_millis(), + "Interval adjustment skipped: already at requested interval" + ); + } else { + let old_interval = self.current_interval; + self.current_interval = new_interval; + collection_interval = interval(self.current_interval); + collection_interval.tick().await; // Reset interval + info!( + old_interval_ms = old_interval.as_millis(), + new_interval_ms = new_interval.as_millis(), + is_backpressure = new_interval > self.original_interval, + "Collection interval adjusted (timer recreated)" + ); + } + } + + tokio::select! 
{ + biased; + + // Process incoming messages (highest priority) + msg = self.message_receiver.recv() => { + if let Some(message) = msg { + let should_exit = self.handle_message(message); + if should_exit { + info!("Actor received shutdown signal, exiting"); + break; + } + } else { + info!("Actor message channel closed, exiting"); + break; + } + } + + // Collection tick (only when in Running state) + _ = collection_interval.tick(), if self.state == CollectorState::Running => { + match self.collect_and_analyze_internal(&event_tx).await { + Ok(()) => { + consecutive_failures = 0; + self.last_collection = Some(Instant::now()); + } + Err(e) => { + error!(error = %e, "Collection cycle failed"); + consecutive_failures = consecutive_failures.saturating_add(1); + + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { + error!( + consecutive_failures = consecutive_failures, + "Too many consecutive failures, stopping collector" + ); + self.state = CollectorState::Stopped; + + // If there's a pending shutdown response, send error + // Ignore send result - receiver may have been dropped + if let Some(respond_to) = self.pending_shutdown_response.take() { + drop(respond_to.send(Err(anyhow::anyhow!( + "Collector stopped due to {consecutive_failures} consecutive failures" + )))); + } + break; + } + + // Exponential backoff for failures + let backoff_duration = Duration::from_secs(2_u64.pow(consecutive_failures.min(6))); + warn!( + backoff_seconds = backoff_duration.as_secs(), + "Applying backoff after collection failure" + ); + tokio::time::sleep(backoff_duration).await; + } + } + } + } + } + + // Final shutdown + self.state = CollectorState::Stopped; + + // Shutdown EventBusConnector (flushes buffer, closes connection) + if let Some(ref mut connector) = self.event_bus_connector { + if let Err(e) = connector.shutdown().await { + error!(error = %e, "EventBusConnector shutdown failed"); + } else { + info!("EventBusConnector shutdown completed"); + } + } + + info!("Procmond Monitor Collector actor stopped"); + + // Send shutdown completion if there's a pending response + // Ignore send result - receiver may have been dropped + if let Some(respond_to) = self.pending_shutdown_response.take() { + drop(respond_to.send(Ok(()))); + } + + Ok(()) + } + + /// Handles an incoming actor message. + /// + /// Returns `true` if the actor should exit. 
+ fn handle_message(&mut self, message: ActorMessage) -> bool { + match message { + ActorMessage::HealthCheck { respond_to } => { + let health_data = self.build_health_data(); + // Ignore send result - receiver may have been dropped + drop(respond_to.send(health_data)); + false + } + + ActorMessage::UpdateConfig { config, respond_to } => { + // Queue config for application at next cycle boundary + // Ignore send result - receiver may have been dropped + if let Err(e) = config.validate() { + drop( + respond_to + .send(Err(anyhow::anyhow!("Configuration validation failed: {e}"))), + ); + } else { + self.pending_config = Some(*config); + drop(respond_to.send(Ok(()))); + info!("Configuration update queued for next cycle boundary"); + } + false + } + + ActorMessage::GracefulShutdown { respond_to } => { + info!("Graceful shutdown requested"); + self.state = CollectorState::ShuttingDown; + self.pending_shutdown_response = Some(respond_to); + true // Signal to exit the loop + } + + ActorMessage::BeginMonitoring => { + if self.state == CollectorState::WaitingForAgent { + info!("Received BeginMonitoring command, starting collection"); + self.state = CollectorState::Running; + } else { + warn!( + current_state = %self.state, + "Received BeginMonitoring but not in WaitingForAgent state" + ); + } + false + } + + ActorMessage::AdjustInterval { new_interval } => { + // Queue interval change for application at next loop iteration + // This ensures the tokio interval timer is properly recreated + info!( + current_interval_ms = self.current_interval.as_millis(), + new_interval_ms = new_interval.as_millis(), + is_backpressure = new_interval > self.original_interval, + "Interval adjustment queued" + ); + self.pending_interval = Some(new_interval); + false + } + } + } + + /// Builds health check response data. + fn build_health_data(&self) -> HealthCheckData { + HealthCheckData { + state: self.state, + collection_interval: self.current_interval, + original_interval: self.original_interval, + event_bus_connected: self.event_bus_connected, + buffer_level_percent: self.buffer_level_percent, + last_collection: self.last_collection, + collection_cycles: self.stats.collection_cycles.load(Ordering::Relaxed), + lifecycle_events: self.stats.lifecycle_events.load(Ordering::Relaxed), + collection_errors: self.stats.collection_errors.load(Ordering::Relaxed), + backpressure_events: self.stats.backpressure_events.load(Ordering::Relaxed), + } + } + + /// Applies a configuration update. 
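+    ///
+    /// Updates arrive via `ActorHandle::update_config` and are queued until the
+    /// next cycle boundary. A minimal sketch of the caller side (assuming a
+    /// `handle` for a running actor):
+    ///
+    /// ```ignore
+    /// let mut config = ProcmondMonitorConfig::default();
+    /// config.base_config.collection_interval = Duration::from_secs(60);
+    /// handle.update_config(config).await?; // applied at the next cycle boundary
+    /// ```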
+ fn apply_config_update(&mut self, new_config: ProcmondMonitorConfig) { + // Hot-reloadable settings: + // - collection_interval + // - lifecycle_config thresholds + // + // Requires restart (changes have no effect until restart): + // - max_events_in_flight (semaphore capacity cannot be resized at runtime) + // - process_config.excluded_pids (affects collector initialization) + // - enable_event_driven (requires recreating event bus) + + let old_max_in_flight = self.config.base_config.max_events_in_flight; + let new_max_in_flight = new_config.base_config.max_events_in_flight; + + info!( + old_interval_secs = self.config.base_config.collection_interval.as_secs(), + new_interval_secs = new_config.base_config.collection_interval.as_secs(), + "Applying configuration update at cycle boundary" + ); + + // Warn if max_events_in_flight changed (not hot-reloadable) + if old_max_in_flight != new_max_in_flight { + warn!( + old_max_events_in_flight = old_max_in_flight, + requested_max_events_in_flight = new_max_in_flight, + "max_events_in_flight is not hot-reloadable; \ + semaphore capacity will remain unchanged until restart" + ); + } + + // Update config + self.config = new_config; + + debug!("Configuration update applied successfully"); + } + + /// Internal collection and analysis method used by the actor loop. + /// + /// This method: + /// 1. Collects process data from the system + /// 2. Performs lifecycle analysis to detect changes + /// 3. Publishes events via EventBusConnector (if configured) to `events.process.*` topics + /// 4. Sends events to the downstream collector-core channel + #[instrument(skip(self, tx), fields(source = "procmond-monitor-collector"))] + async fn collect_and_analyze_internal( + &mut self, tx: &mpsc::Sender, - shutdown_signal: &Arc, ) -> anyhow::Result<()> { let timer = PerformanceTimer::start("procmond_monitor_collection".to_owned()); let collection_start = Instant::now(); - // Check for shutdown before starting - if shutdown_signal.load(Ordering::Relaxed) { - debug!("Shutdown signal detected, skipping collection"); + // Check state before starting + if self.state != CollectorState::Running { + debug!(state = %self.state, "Skipping collection, not in Running state"); return Ok(()); } @@ -189,7 +886,7 @@ impl ProcmondMonitorCollector { } }; - // Perform lifecycle analysis + // Perform lifecycle analysis to detect process starts, stops, and modifications let lifecycle_events = { let mut tracker = self.lifecycle_tracker.lock().await; match tracker.update_and_detect_changes(process_events.clone()) { @@ -210,19 +907,70 @@ impl ProcmondMonitorCollector { .lifecycle_events .fetch_add(event_count, Ordering::Relaxed); - // Send process events with backpressure handling + // Build a map from PID to event type from lifecycle analysis + // Only processes with lifecycle events (start, stop, modify) get published + use std::collections::HashMap; + let mut pid_to_event_type: HashMap = HashMap::new(); + for lifecycle_event in &lifecycle_events { + #[allow(clippy::pattern_type_mismatch)] // Matching on &enum variant is idiomatic + match lifecycle_event { + ProcessLifecycleEvent::Start { process, .. } => { + pid_to_event_type.insert(process.pid, ProcessEventType::Start); + } + ProcessLifecycleEvent::Stop { process, .. } => { + pid_to_event_type.insert(process.pid, ProcessEventType::Stop); + } + ProcessLifecycleEvent::Modified { current, .. } => { + pid_to_event_type.insert(current.pid, ProcessEventType::Modify); + } + ProcessLifecycleEvent::Suspicious { process, .. 
} => { + // Suspicious events are also published as modifications + pid_to_event_type.insert(process.pid, ProcessEventType::Modify); + } + } + } + + // Publish process events via EventBusConnector if configured + // Events go to events.process.start, events.process.stop, or events.process.modify + // Only publish events that have a lifecycle change (skip unchanged processes) + if let Some(ref mut connector) = self.event_bus_connector { + // Update connection status + self.event_bus_connected = connector.is_connected(); + self.buffer_level_percent = Some(connector.buffer_usage_percent()); + + for process_event in &process_events { + // Only publish if there's a corresponding lifecycle event + if let Some(&event_type) = pid_to_event_type.get(&process_event.pid) { + match connector.publish(process_event.clone(), event_type).await { + Ok(sequence) => { + debug!( + pid = process_event.pid, + event_type = ?event_type, + sequence = sequence, + "Published process event to EventBus" + ); + } + Err(e) => { + warn!( + pid = process_event.pid, + error = %e, + "Failed to publish to EventBus (event buffered)" + ); + } + } + } + } + } + + // Send process events to collector-core channel with backpressure handling for process_event in process_events { - if shutdown_signal.load(Ordering::Relaxed) { - debug!("Shutdown signal detected during event emission"); + if self.state != CollectorState::Running { + debug!("State changed during event emission, stopping"); break; } if let Err(e) = self - .send_event_with_backpressure( - tx, - CollectionEvent::Process(process_event), - shutdown_signal, - ) + .send_event_with_backpressure_internal(tx, CollectionEvent::Process(process_event)) .await { error!(error = %e, "Failed to send process event"); @@ -243,12 +991,11 @@ impl ProcmondMonitorCollector { Ok(()) } - /// Sends an event with backpressure handling. - async fn send_event_with_backpressure( + /// Internal backpressure handling for sending events. 
+ async fn send_event_with_backpressure_internal( &self, tx: &mpsc::Sender, event: CollectionEvent, - shutdown_signal: &Arc, ) -> anyhow::Result<()> { const CIRCUIT_BREAKER_THRESHOLD: usize = 5; const CIRCUIT_BREAKER_COOLDOWN_SECS: u64 = 10; @@ -265,7 +1012,6 @@ impl ProcmondMonitorCollector { }; if let Some(cooldown_until) = cooldown_until_opt { if Instant::now() < cooldown_until { - // Circuit breaker is active, increment backpressure metric and return error self.stats .backpressure_events .fetch_add(1, Ordering::Relaxed); @@ -288,13 +1034,11 @@ impl ProcmondMonitorCollector { // Try non-blocking acquire first let permit = match self.backpressure_semaphore.try_acquire() { Ok(permit) => { - // Successfully acquired, reset consecutive timeout counter self.consecutive_backpressure_timeouts .store(0, Ordering::Relaxed); permit } Err(_) => { - // No permit available, try blocking acquire with timeout match timeout( Duration::from_secs(5), self.backpressure_semaphore.acquire(), @@ -302,17 +1046,14 @@ impl ProcmondMonitorCollector { .await { Ok(Ok(permit)) => { - // Successfully acquired after waiting, reset counter self.consecutive_backpressure_timeouts .store(0, Ordering::Relaxed); permit } Ok(Err(_)) => { - // Semaphore closed return Err(anyhow::anyhow!("Backpressure semaphore closed")); } Err(_) => { - // Timeout acquiring permit let previous = self .consecutive_backpressure_timeouts .fetch_add(1, Ordering::Relaxed); @@ -326,13 +1067,11 @@ impl ProcmondMonitorCollector { "Backpressure timeout while acquiring permit" ); - // Activate circuit breaker if threshold reached if consecutive >= CIRCUIT_BREAKER_THRESHOLD { - #[allow(clippy::arithmetic_side_effects)] // Safe: Instant + Duration + #[allow(clippy::arithmetic_side_effects)] let cooldown_until = Instant::now() + Duration::from_secs(CIRCUIT_BREAKER_COOLDOWN_SECS); #[allow(clippy::expect_used)] - // Mutex poisoning indicates a panic - propagate it let mut guard = self .circuit_breaker_until .lock() @@ -365,9 +1104,7 @@ impl ProcmondMonitorCollector { match send_result { Ok(Ok(())) => Ok(()), Ok(Err(_)) => { - if !shutdown_signal.load(Ordering::Relaxed) { - warn!("Event channel closed during send"); - } + warn!("Event channel closed during send"); Err(anyhow::anyhow!("Event channel closed")) } Err(_) => { @@ -395,94 +1132,44 @@ impl EventSource for ProcmondMonitorCollector { caps } - #[instrument(skip(self, tx), fields(source = "procmond-monitor-collector"))] + /// Legacy start method - use `run()` for actor-based operation. + /// + /// This method is retained for API compatibility with the EventSource trait, + /// but the actor-based `run()` method should be used for new code. + #[instrument( + skip(self, _tx, _shutdown_signal), + fields(source = "procmond-monitor-collector") + )] async fn start( &self, - tx: mpsc::Sender, + _tx: mpsc::Sender, _shutdown_signal: Arc, ) -> anyhow::Result<()> { - const MAX_CONSECUTIVE_FAILURES: u32 = 5; - - info!( - collection_interval_secs = self.config.base_config.collection_interval.as_secs(), - max_events_in_flight = self.config.base_config.max_events_in_flight, - event_driven = self.config.base_config.enable_event_driven, - "Starting Procmond Monitor Collector" + // In actor mode, this method should not be called directly. + // The actor is started via the `run()` method instead. + warn!( + "EventSource::start() called on actor-based collector. \ + Use ProcmondMonitorCollector::run() instead for actor-based operation." 
); - - // Main collection loop - let mut collection_interval = interval(self.config.base_config.collection_interval); - let mut consecutive_failures = 0_u32; - - // Skip first tick to avoid immediate collection - collection_interval.tick().await; - - loop { - tokio::select! { - _ = collection_interval.tick() => { - // Check for shutdown - if self.shutdown_signal.load(Ordering::Relaxed) { - info!("Shutdown signal received, stopping Procmond Monitor Collector"); - break; - } - - // Perform collection and analysis - match self.collect_and_analyze(&tx, &self.shutdown_signal).await { - Ok(()) => { - consecutive_failures = 0; - } - Err(e) => { - error!(error = %e, "Procmond monitor collection failed"); - consecutive_failures = consecutive_failures.saturating_add(1); - - if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - error!( - consecutive_failures = consecutive_failures, - "Too many consecutive failures, stopping Procmond Monitor Collector" - ); - return Err(anyhow::anyhow!( - "Procmond Monitor Collector failed {consecutive_failures} consecutive times" - )); - } - - // Exponential backoff for failures - let backoff_duration = Duration::from_secs(2_u64.pow(consecutive_failures.min(6))); - warn!( - backoff_seconds = backoff_duration.as_secs(), - "Applying backoff after collection failure" - ); - tokio::time::sleep(backoff_duration).await; - } - } - } - - () = async { - while !self.shutdown_signal.load(Ordering::Relaxed) { - tokio::time::sleep(Duration::from_millis(100)).await; - } - } => { - info!("Shutdown signal received in monitoring loop"); - break; - } - } - } - - info!("Procmond Monitor Collector stopped successfully"); - Ok(()) + Err(anyhow::anyhow!( + "EventSource::start() is deprecated for actor-based collectors. \ + Use ProcmondMonitorCollector::run() instead." 
+ )) } async fn stop(&self) -> anyhow::Result<()> { - info!("Stopping Procmond Monitor Collector"); - - // Signal shutdown - self.shutdown_signal.store(true, Ordering::Relaxed); - + // In actor mode, shutdown is handled via ActorHandle::graceful_shutdown() + info!("EventSource::stop() called - use ActorHandle::graceful_shutdown() for actor mode"); Ok(()) } async fn health_check(&self) -> anyhow::Result<()> { - // Use the monitor collector trait health check - self.monitor_health_check().await + // In actor mode, health check is handled via ActorHandle::health_check() + // This basic check verifies the collector is in a valid state + if self.state == CollectorState::Stopped { + return Err(anyhow::anyhow!("Collector is stopped")); + } + Ok(()) } } @@ -515,15 +1202,25 @@ mod tests { Arc::new(Mutex::new(db_manager)) } + /// Helper to create a collector with its actor channel + fn create_collector_with_channel( + db_manager: Arc>, + config: ProcmondMonitorConfig, + ) -> anyhow::Result<(ProcmondMonitorCollector, ActorHandle)> { + let (handle, receiver) = ProcmondMonitorCollector::create_channel(); + let collector = ProcmondMonitorCollector::new(db_manager, config, receiver)?; + Ok((collector, handle)) + } + #[tokio::test] async fn test_procmond_monitor_collector_creation() { let db_manager = create_test_database().await; let config = ProcmondMonitorConfig::default(); - let collector = ProcmondMonitorCollector::new(db_manager, config); - assert!(collector.is_ok()); + let result = create_collector_with_channel(db_manager, config); + assert!(result.is_ok()); - let collector = collector.unwrap(); + let (collector, _handle) = result.unwrap(); assert_eq!(collector.name(), "procmond-monitor-collector"); let caps = collector.capabilities(); @@ -544,7 +1241,8 @@ mod tests { ..Default::default() }; - let collector = ProcmondMonitorCollector::new(db_manager.clone(), fast_config).unwrap(); + let (collector, _handle) = + create_collector_with_channel(db_manager.clone(), fast_config).unwrap(); let caps = collector.capabilities(); assert!(caps.contains(SourceCaps::REALTIME)); @@ -557,7 +1255,7 @@ mod tests { ..Default::default() }; - let collector = ProcmondMonitorCollector::new(db_manager, slow_config).unwrap(); + let (collector, _handle) = create_collector_with_channel(db_manager, slow_config).unwrap(); let caps = collector.capabilities(); assert!(!caps.contains(SourceCaps::REALTIME)); } @@ -567,9 +1265,9 @@ mod tests { let db_manager = create_test_database().await; let config = ProcmondMonitorConfig::default(); - let collector = ProcmondMonitorCollector::new(db_manager, config).unwrap(); + let (collector, _handle) = create_collector_with_channel(db_manager, config).unwrap(); - // Initial health check should pass + // Initial health check should pass (collector is in WaitingForAgent state) let health_result = collector.health_check().await; assert!(health_result.is_ok()); } @@ -579,7 +1277,7 @@ mod tests { let db_manager = create_test_database().await; let config = ProcmondMonitorConfig::default(); - let collector = ProcmondMonitorCollector::new(db_manager, config).unwrap(); + let (collector, _handle) = create_collector_with_channel(db_manager, config).unwrap(); // Initial statistics should be zero let stats = collector.stats(); @@ -594,6 +1292,46 @@ mod tests { assert_eq!(stats.backpressure_events, 0); } + #[tokio::test] + async fn test_actor_handle_operations() { + let db_manager = create_test_database().await; + let config = ProcmondMonitorConfig::default(); + + let (collector, handle) = 
create_collector_with_channel(db_manager, config).unwrap(); + + // Verify initial state + assert_eq!(collector.state, CollectorState::WaitingForAgent); + + // Test that handle methods work (before actor is running, they should fail) + // This is expected because the receiver is held by the collector + assert!(!handle.is_closed()); + } + + #[tokio::test] + async fn test_collector_state_display() { + assert_eq!( + CollectorState::WaitingForAgent.to_string(), + "waiting_for_agent" + ); + assert_eq!(CollectorState::Running.to_string(), "running"); + assert_eq!(CollectorState::ShuttingDown.to_string(), "shutting_down"); + assert_eq!(CollectorState::Stopped.to_string(), "stopped"); + } + + #[tokio::test] + async fn test_health_check_data() { + let db_manager = create_test_database().await; + let config = ProcmondMonitorConfig::default(); + + let (collector, _handle) = create_collector_with_channel(db_manager, config).unwrap(); + + let health_data = collector.build_health_data(); + assert_eq!(health_data.state, CollectorState::WaitingForAgent); + assert!(!health_data.event_bus_connected); + assert!(health_data.last_collection.is_none()); + assert_eq!(health_data.collection_cycles, 0); + } + #[test] fn test_config_validation() { // Test valid configuration @@ -610,4 +1348,9 @@ mod tests { }; assert!(invalid_interval_config.validate().is_err()); } + + #[test] + fn test_actor_channel_capacity() { + assert_eq!(ACTOR_CHANNEL_CAPACITY, 100); + } }