Merged
341 changes: 278 additions & 63 deletions procmond/src/main.rs
@@ -1,14 +1,19 @@
#![forbid(unsafe_code)]
Contributor:

Incorrect formatting; autoformat by running qlty fmt. [rustfmt:fmt]


use clap::Parser;
use collector_core::{Collector, CollectorConfig, CollectorRegistrationConfig};
use collector_core::{CollectionEvent, Collector, CollectorConfig, CollectorRegistrationConfig};
use daemoneye_lib::{config, storage, telemetry};
use procmond::{ProcessEventSource, ProcessSourceConfig};
use procmond::{
ProcessEventSource, ProcessSourceConfig,
event_bus_connector::EventBusConnector,
monitor_collector::{ProcmondMonitorCollector, ProcmondMonitorConfig},
};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::info;
use tokio::sync::{Mutex, mpsc};
use tracing::{error, info, warn};

/// Parse and validate the collection interval argument.
///
@@ -98,69 +103,279 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
"Database stats retrieved"
);

// Create collector configuration
let mut collector_config = CollectorConfig::new()
.with_component_name("procmond".to_owned())
.with_ipc_endpoint(daemoneye_lib::ipc::IpcConfig::default().endpoint_path)
.with_max_event_sources(1)
.with_event_buffer_size(1000)
.with_shutdown_timeout(Duration::from_secs(30))
.with_health_check_interval(Duration::from_secs(60))
.with_telemetry(true)
.with_debug_logging(cli.log_level == "debug");

// Enable broker registration for RPC service
// Note: In a real deployment, the broker would be provided via configuration
// For now, we'll configure registration but it will only work if a broker is available
collector_config.registration = Some(CollectorRegistrationConfig {
enabled: true,
broker: None, // Will be set if broker is available via environment/config
collector_id: Some("procmond".to_owned()),
collector_type: Some("procmond".to_owned()),
topic: "control.collector.registration".to_owned(),
timeout: Duration::from_secs(10),
retry_attempts: 3,
heartbeat_interval: Duration::from_secs(30),
attributes: HashMap::new(),
});

// Create process source configuration
let process_config = ProcessSourceConfig {
collection_interval: Duration::from_secs(cli.interval),
collect_enhanced_metadata: cli.enhanced_metadata,
max_processes_per_cycle: cli.max_processes,
compute_executable_hashes: cli.compute_hashes,
..Default::default()
};

// Create process event source
let process_source = ProcessEventSource::with_config(db_manager, process_config);

// Log RPC service status before moving collector_config
// The RPC service will be automatically started by collector-core after broker registration
let registration_enabled = collector_config
.registration
.as_ref()
.is_some_and(|r| r.enabled);
let collector_id_str = collector_config
.registration
.as_ref()
.and_then(|r| r.collector_id.as_deref())
.unwrap_or("procmond");

if registration_enabled {
// Check for broker configuration via environment variable
// DAEMONEYE_BROKER_SOCKET: If set, use actor mode with EventBusConnector
// If not set, use standalone mode with collector-core
let broker_socket = std::env::var("DAEMONEYE_BROKER_SOCKET").ok();
Comment on lines +108 to +109 (Copilot AI, Feb 1, 2026):

Missing validation: The DAEMONEYE_BROKER_SOCKET environment variable is read but never validated. If it is set to an empty string or an invalid path, the connection will fail later with unclear errors. Consider validating that the path exists and is accessible, or at minimum checking for an empty string before proceeding with actor-mode initialization.

Suggested change
// If not set, use standalone mode with collector-core
let broker_socket = std::env::var("DAEMONEYE_BROKER_SOCKET").ok();
// If not set or invalid, use standalone mode with collector-core
let broker_socket = match std::env::var("DAEMONEYE_BROKER_SOCKET") {
Ok(raw) => {
let trimmed = raw.trim();
if trimmed.is_empty() {
warn!(
"DAEMONEYE_BROKER_SOCKET is set but empty; falling back to standalone mode"
);
None
} else {
let path = std::path::Path::new(trimmed);
if !path.exists() {
warn!(
socket_path = %trimmed,
"DAEMONEYE_BROKER_SOCKET points to a non-existent path; falling back to standalone mode"
);
None
} else {
Some(trimmed.to_owned())
}
}
}
Err(_) => None,
};


if let Some(ref socket_path) = broker_socket {
// ========================================================================
// Actor Mode: Use ProcmondMonitorCollector with EventBusConnector
// ========================================================================
info!(
collector_id = %collector_id_str,
"RPC service will be initialized after broker registration"
socket_path = %socket_path,
"Broker socket configured, starting in actor mode"
);
}

// Create and configure collector
let mut collector = Collector::new(collector_config);
collector.register(Box::new(process_source))?;
// Create actor channel (bounded, capacity: 100)
let (actor_handle, message_receiver) = ProcmondMonitorCollector::create_channel();

// Create ProcmondMonitorConfig from CLI arguments
let monitor_config = ProcmondMonitorConfig {
base_config: collector_core::MonitorCollectorConfig {
collection_interval: Duration::from_secs(cli.interval),
..Default::default()
},
process_config: procmond::process_collector::ProcessCollectionConfig {
collect_enhanced_metadata: cli.enhanced_metadata,
max_processes: cli.max_processes,
compute_executable_hashes: cli.compute_hashes,
..Default::default()
},
..Default::default()
};

// Create the actor-based collector
let mut collector = ProcmondMonitorCollector::new(
Arc::clone(&db_manager),
monitor_config,
message_receiver,
)?;

// Initialize EventBusConnector with WAL directory
let wal_dir = PathBuf::from(&cli.database).parent().map_or_else(
|| PathBuf::from("/var/lib/daemoneye/wal"),
|p| p.join("wal"),
);
Comment on lines +146 to +149 (Copilot AI, Feb 1, 2026):

WAL directory derivation is fragile: using the parent of the database path as the fallback location could cause permission issues or unexpected behavior. If cli.database is "/" or "/file.db", parent() returns None or "/" respectively, and join("wal") then produces "/wal", which is unlikely to be writable. Consider always using a well-defined default location such as "/var/lib/daemoneye/wal", or requiring explicit WAL path configuration. The current fallback logic is error-prone.

Suggested change
let wal_dir = PathBuf::from(&cli.database).parent().map_or_else(
|| PathBuf::from("/var/lib/daemoneye/wal"),
|p| p.join("wal"),
);
// Use a fixed, well-defined WAL directory instead of deriving from the database path
let wal_dir = PathBuf::from("/var/lib/daemoneye/wal");


// Ensure WAL directory exists (fail-fast to avoid confusing WAL init failures)
std::fs::create_dir_all(&wal_dir).map_err(|e| {
error!(
wal_dir = ?wal_dir,
error = %e,
"Failed to create WAL directory"
);
e
})?;

let mut event_bus_connector = EventBusConnector::new(wal_dir).await?;

// Attempt to connect to the broker
match event_bus_connector.connect().await {
Ok(()) => {
info!("Connected to daemoneye-agent broker");

// Replay any events from WAL (crash recovery)
match event_bus_connector.replay_wal().await {
Ok(replayed) if replayed > 0 => {
info!(
replayed = replayed,
"Replayed events from WAL after connection"
);
}
Ok(_) => {
info!("No events to replay from WAL");
}
Err(e) => {
warn!(error = %e, "Failed to replay WAL, some events may be delayed");
}
}
}
Err(e) => {
warn!(
error = %e,
"Failed to connect to broker, will buffer events until connection available"
);
}
}

// Take the backpressure receiver before moving connector to collector
let backpressure_rx = event_bus_connector.take_backpressure_receiver();

// Set the EventBusConnector on the collector
collector.set_event_bus_connector(event_bus_connector);

// Spawn backpressure monitor task if we have the receiver
let original_interval = Duration::from_secs(cli.interval);
let backpressure_task = backpressure_rx.map_or_else(
|| {
warn!("Backpressure receiver not available, dynamic interval adjustment disabled");
None
},
|bp_rx| {
Some(ProcmondMonitorCollector::spawn_backpressure_monitor(
actor_handle.clone(),
bp_rx,
original_interval,
))
},
);

// Create event channel for the actor's output
let (event_tx, mut event_rx) = mpsc::channel::<CollectionEvent>(1000);

// Clone handle for shutdown task
let shutdown_handle = actor_handle.clone();

// Spawn task to handle graceful shutdown on Ctrl+C
let shutdown_task = tokio::spawn(async move {
// Wait for Ctrl+C
if let Err(e) = tokio::signal::ctrl_c().await {
error!(error = %e, "Failed to listen for Ctrl+C signal");
return;
}
info!("Received Ctrl+C, initiating graceful shutdown");

// Run the collector (this will handle IPC, event processing, and lifecycle management)
collector.run().await?;
// Send graceful shutdown to actor
match shutdown_handle.graceful_shutdown().await {
Ok(()) => info!("Actor shutdown completed successfully"),
Err(e) => error!(error = %e, "Actor shutdown failed"),
}
});
Comment on lines +229 to +234 (Copilot AI, Feb 1, 2026):

In the shutdown_task, if graceful_shutdown() returns an error, execution continues without propagating the error or affecting the main shutdown flow. This could mask critical shutdown issues.

Consider storing the shutdown result and checking it after the tokio::select!, then propagating any error to the outer Result. This ensures shutdown errors are properly surfaced to the caller; a sketch of that shape follows.
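
A minimal sketch under the assumptions that graceful_shutdown() returns a Result whose error type implements Display and that main returns Result<(), Box<dyn std::error::Error>> (it does, per the hunk header above); the String error mapping is purely illustrative:

// Sketch: the shutdown task returns its result instead of discarding it.
let shutdown_task = tokio::spawn(async move {
    tokio::signal::ctrl_c()
        .await
        .map_err(|e| format!("Failed to listen for Ctrl+C signal: {e}"))?;
    info!("Received Ctrl+C, initiating graceful shutdown");
    shutdown_handle
        .graceful_shutdown()
        .await
        .map_err(|e| format!("Actor shutdown failed: {e}"))
});

// The select! arm then checks the stored result so it reaches main's Result:
tokio::select! {
    result = actor_task => {
        if let Err(e) = result {
            error!(error = %e, "Actor task panicked");
        }
    }
    joined = shutdown_task => {
        match joined {
            Ok(Ok(())) => info!("Shutdown task completed"),
            Ok(Err(e)) => return Err(e.into()),
            Err(e) => error!(error = %e, "Shutdown task panicked"),
        }
    }
}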

Comment on lines +220 to +234 (Copilot AI, Feb 1, 2026):

Inconsistent shutdown semantics: The actor's run() method handles graceful shutdown internally and closes the EventBusConnector (lines 707-714), but the shutdown_task in main.rs also expects to coordinate shutdown via a graceful_shutdown() message. If the actor stops due to consecutive failures or other internal reasons, the shutdown_task won't know and will continue waiting. Consider having the actor signal its completion reason (success vs. error) so main.rs can handle it accordingly; one possible shape is sketched below.
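
A sketch of that shape; the CompletionReason enum and the changed run() signature are assumptions for illustration, not code in this PR:

// Hypothetical: the actor reports why it stopped so main.rs can react.
#[derive(Debug)]
enum CompletionReason {
    GracefulShutdown,
    ConsecutiveFailures,
}

// Assumes run() is changed to return Result<CompletionReason, Error>.
let actor_task = tokio::spawn(async move { collector.run(event_tx).await });

// main.rs can then distinguish a clean stop from an internal failure:
match actor_task.await {
    Ok(Ok(CompletionReason::GracefulShutdown)) => info!("Actor stopped cleanly"),
    Ok(Ok(CompletionReason::ConsecutiveFailures)) => {
        warn!("Actor gave up after repeated collection failures");
    }
    Ok(Err(e)) => error!(error = %e, "Actor run loop failed"),
    Err(e) => error!(error = %e, "Actor task panicked"),
}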


// Startup behavior: begin monitoring immediately on launch.
//
// The collector currently does not wait for an explicit "begin monitoring"
// command from the agent. This makes procmond usable in isolation and in
// test environments without requiring the full agent/broker stack.
//
// If coordinated startup with the agent becomes a hard requirement in the
// future, this is the place to integrate a subscription to a
// `control.collector.lifecycle` (or similar) control topic and defer
// calling `begin_monitoring()` until the appropriate control message is
// received.
info!("Starting collection immediately on startup");
if let Err(e) = actor_handle.begin_monitoring() {
error!(error = %e, "Failed to send BeginMonitoring command");
Copilot AI, Feb 1, 2026:

Missing error handling: If begin_monitoring() fails (line 248), the error is logged but the actor task is still spawned and will remain in the WaitingForAgent state forever. The actor will never collect data. Consider returning an error and aborting startup if begin_monitoring() fails, or implementing a retry mechanism.

Suggested change
error!(error = %e, "Failed to send BeginMonitoring command");
error!(error = %e, "Failed to send BeginMonitoring command");
// Abort startup: without a successful BeginMonitoring command, the actor
// would run but never collect data, remaining stuck in an idle state.
std::process::exit(1);

}

// Spawn the actor task
let actor_task = tokio::spawn(async move {
if let Err(e) = collector.run(event_tx).await {
error!(error = %e, "Actor run loop failed");
}
});

// Spawn task to consume events from the actor (logging only for now)
let event_consumer_task = tokio::spawn(async move {
let mut event_count = 0_u64;
while let Some(event) = event_rx.recv().await {
event_count = event_count.saturating_add(1);
if event_count.is_multiple_of(100) {
info!(total_events = event_count, "Processing collection events");
}
// In a full implementation, events would be sent to downstream processors
match event {
CollectionEvent::Process(pe) => {
tracing::trace!(pid = pe.pid, name = %pe.name, "Received process event");
}
CollectionEvent::Network(_)
| CollectionEvent::Filesystem(_)
| CollectionEvent::Performance(_)
| CollectionEvent::TriggerRequest(_) => {
tracing::trace!("Received non-process event");
}
}
}
info!(total_events = event_count, "Event consumer task exiting");
});
Comment on lines +260 to +281 (Copilot AI, Feb 1, 2026):

The event consumer task (lines 260-281) silently discards all events: it only logs them. This means all process events collected by the actor are lost. The PR description mentions this is "logging only for now" but provides no path forward, which breaks the core functionality of the collector. Either implement proper event handling (send to downstream processors) or add a clear TODO with a tracking issue. For production use, events should at least be persisted to the database; a stopgap sketch follows.
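
A stopgap sketch, assuming db_manager is still the Arc<tokio::sync::Mutex<...>> handle created earlier; store_process_event is a hypothetical method, not an API daemoneye-lib currently exposes:

// Sketch only: persist process events instead of dropping them after logging.
while let Some(event) = event_rx.recv().await {
    if let CollectionEvent::Process(pe) = &event {
        // `store_process_event` is hypothetical; substitute the real storage API.
        if let Err(e) = db_manager.lock().await.store_process_event(pe) {
            warn!(error = %e, pid = pe.pid, "Failed to persist process event");
        }
    }
    // TODO: forward all event kinds to downstream processors (needs a tracking issue).
}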


// Wait for actor to complete (either by shutdown or error)
tokio::select! {
result = actor_task => {
if let Err(e) = result {
error!(error = %e, "Actor task panicked");
}
}
_ = shutdown_task => {
info!("Shutdown task completed");
}
}

// Clean up backpressure monitor task
if let Some(bp_task) = backpressure_task {
bp_task.abort();
info!("Backpressure monitor task aborted");
Comment on lines +297 to +298 (Copilot AI, Feb 1, 2026):

Incomplete shutdown: The backpressure monitor task is aborted (line 297), but the spawned actor_task is only awaited via tokio::select!, which completes as soon as either actor_task or shutdown_task finishes. If shutdown_task finishes first (the normal case), actor_task may still be running. The actor should be awaited after the select! to ensure it completes. Additionally, consider using a timeout or explicit cancellation rather than abort() for the backpressure task to allow graceful cleanup.

Suggested change
bp_task.abort();
info!("Backpressure monitor task aborted");
// Prefer graceful shutdown with a bounded timeout; abort only as a last resort.
// Keep ownership of the handle so it can still be aborted after a timeout:
// tokio::time::timeout() would otherwise consume (and on expiry drop, i.e.
// detach) the JoinHandle.
let mut bp_task = bp_task;
match tokio::time::timeout(Duration::from_secs(5), &mut bp_task).await {
    Ok(Ok(())) => {
        info!("Backpressure monitor task completed successfully");
    }
    Ok(Err(e)) => {
        error!(error = %e, "Backpressure monitor task join error");
    }
    Err(_) => {
        warn!("Backpressure monitor task did not complete within timeout; aborting");
        // On timeout, forcefully abort and await the join so the task
        // cannot keep running past shutdown.
        bp_task.abort();
        if let Err(e) = bp_task.await {
            error!(error = %e, "Backpressure monitor task join error after abort");
        } else {
            info!("Backpressure monitor task aborted and joined");
        }
    }
}

}

// Wait for event consumer to exit naturally (channel sender is dropped)
// Use a timeout to avoid hanging indefinitely
match tokio::time::timeout(Duration::from_secs(5), event_consumer_task).await {
Ok(Ok(())) => info!("Event consumer task completed successfully"),
Ok(Err(e)) => error!(error = %e, "Event consumer task join error"),
Err(_) => {
warn!("Event consumer task did not complete within timeout");
}
}

info!("Procmond actor mode shutdown complete");
} else {
// ========================================================================
// Standalone Mode: Use ProcessEventSource with collector-core
// ========================================================================
info!("No broker socket configured, starting in standalone mode");

// Create collector configuration
let mut collector_config = CollectorConfig::new()
.with_component_name("procmond".to_owned())
.with_ipc_endpoint(daemoneye_lib::ipc::IpcConfig::default().endpoint_path)
.with_max_event_sources(1)
.with_event_buffer_size(1000)
.with_shutdown_timeout(Duration::from_secs(30))
.with_health_check_interval(Duration::from_secs(60))
.with_telemetry(true)
.with_debug_logging(cli.log_level == "debug");

// Enable broker registration for RPC service (if broker becomes available)
collector_config.registration = Some(CollectorRegistrationConfig {
enabled: true,
broker: None,
collector_id: Some("procmond".to_owned()),
collector_type: Some("procmond".to_owned()),
topic: "control.collector.registration".to_owned(),
timeout: Duration::from_secs(10),
retry_attempts: 3,
heartbeat_interval: Duration::from_secs(30),
attributes: HashMap::new(),
});

// Create process source configuration
let process_config = ProcessSourceConfig {
collection_interval: Duration::from_secs(cli.interval),
collect_enhanced_metadata: cli.enhanced_metadata,
max_processes_per_cycle: cli.max_processes,
compute_executable_hashes: cli.compute_hashes,
..Default::default()
};

// Create process event source
let process_source = ProcessEventSource::with_config(db_manager, process_config);

// Log RPC service status
let registration_enabled = collector_config
.registration
.as_ref()
.is_some_and(|r| r.enabled);
let collector_id_str = collector_config
.registration
.as_ref()
.and_then(|r| r.collector_id.as_deref())
.unwrap_or("procmond");

if registration_enabled {
info!(
collector_id = %collector_id_str,
"RPC service will be initialized after broker registration"
);
}

// Create and configure collector
let mut collector = Collector::new(collector_config);
collector.register(Box::new(process_source))?;

// Run the collector (handles IPC, event processing, and lifecycle management)
collector.run().await?;
}

Ok(())
@qltysh qltysh bot (Contributor), Feb 1, 2026:

Function with high complexity (count = 29): main [qlty:function-complexity]
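
One way to reduce this is to extract each mode into its own helper so main() stays a thin dispatcher. The helper names, and reusing storage::DatabaseManager and the Cli struct in the signatures, are assumptions for illustration, not code in this PR:

// Sketch: split the two branches out of main(); bodies elided.
async fn run_actor_mode(
    cli: &Cli,
    db_manager: Arc<Mutex<storage::DatabaseManager>>,
    socket_path: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // Move the EventBusConnector / ProcmondMonitorCollector setup here.
    Ok(())
}

async fn run_standalone_mode(
    cli: &Cli,
    db_manager: Arc<Mutex<storage::DatabaseManager>>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Move the Collector / ProcessEventSource setup here.
    Ok(())
}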

}