-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(procmond): implement actor pattern and startup coordination #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f1f32c9
e87a821
a527fd4
401b3ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,14 +1,19 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| #![forbid(unsafe_code)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use clap::Parser; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use collector_core::{Collector, CollectorConfig, CollectorRegistrationConfig}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use collector_core::{CollectionEvent, Collector, CollectorConfig, CollectorRegistrationConfig}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use daemoneye_lib::{config, storage, telemetry}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use procmond::{ProcessEventSource, ProcessSourceConfig}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use procmond::{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ProcessEventSource, ProcessSourceConfig, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| event_bus_connector::EventBusConnector, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| monitor_collector::{ProcmondMonitorCollector, ProcmondMonitorConfig}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use std::collections::HashMap; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use std::path::PathBuf; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use std::sync::Arc; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use std::time::Duration; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use tokio::sync::Mutex; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use tracing::info; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use tokio::sync::{Mutex, mpsc}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| use tracing::{error, info, warn}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Parse and validate the collection interval argument. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -98,69 +103,279 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | |||||||||||||||||||||||||||||||||||||||||||||||||||||
| "Database stats retrieved" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create collector configuration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let mut collector_config = CollectorConfig::new() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_component_name("procmond".to_owned()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_ipc_endpoint(daemoneye_lib::ipc::IpcConfig::default().endpoint_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_max_event_sources(1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_event_buffer_size(1000) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_shutdown_timeout(Duration::from_secs(30)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_health_check_interval(Duration::from_secs(60)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_telemetry(true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .with_debug_logging(cli.log_level == "debug"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Enable broker registration for RPC service | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Note: In a real deployment, the broker would be provided via configuration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // For now, we'll configure registration but it will only work if a broker is available | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| collector_config.registration = Some(CollectorRegistrationConfig { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| enabled: true, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| broker: None, // Will be set if broker is available via environment/config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| collector_id: Some("procmond".to_owned()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| collector_type: Some("procmond".to_owned()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| topic: "control.collector.registration".to_owned(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timeout: Duration::from_secs(10), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| retry_attempts: 3, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| heartbeat_interval: Duration::from_secs(30), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| attributes: HashMap::new(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create process source configuration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let process_config = ProcessSourceConfig { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| collection_interval: Duration::from_secs(cli.interval), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| collect_enhanced_metadata: cli.enhanced_metadata, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_processes_per_cycle: cli.max_processes, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| compute_executable_hashes: cli.compute_hashes, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ..Default::default() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create process event source | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let process_source = ProcessEventSource::with_config(db_manager, process_config); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Log RPC service status before moving collector_config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // The RPC service will be automatically started by collector-core after broker registration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let registration_enabled = collector_config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .registration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .as_ref() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .is_some_and(|r| r.enabled); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let collector_id_str = collector_config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .registration | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .as_ref() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .and_then(|r| r.collector_id.as_deref()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .unwrap_or("procmond"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if registration_enabled { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Check for broker configuration via environment variable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // DAEMONEYE_BROKER_SOCKET: If set, use actor mode with EventBusConnector | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If not set, use standalone mode with collector-core | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let broker_socket = std::env::var("DAEMONEYE_BROKER_SOCKET").ok(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+108
to
+109
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If not set, use standalone mode with collector-core | |
| let broker_socket = std::env::var("DAEMONEYE_BROKER_SOCKET").ok(); | |
| // If not set or invalid, use standalone mode with collector-core | |
| let broker_socket = match std::env::var("DAEMONEYE_BROKER_SOCKET") { | |
| Ok(raw) => { | |
| let trimmed = raw.trim(); | |
| if trimmed.is_empty() { | |
| warn!( | |
| "DAEMONEYE_BROKER_SOCKET is set but empty; falling back to standalone mode" | |
| ); | |
| None | |
| } else { | |
| let path = std::path::Path::new(trimmed); | |
| if !path.exists() { | |
| warn!( | |
| socket_path = %trimmed, | |
| "DAEMONEYE_BROKER_SOCKET points to a non-existent path; falling back to standalone mode" | |
| ); | |
| None | |
| } else { | |
| Some(trimmed.to_owned()) | |
| } | |
| } | |
| } | |
| Err(_) => None, | |
| }; |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WAL directory derivation is fragile: Using parent of database path as fallback location could result in permission issues or unexpected behavior. If cli.database is "/" or "/file.db", the parent() call may return None or "/", and then join("wal") creates "/wal" which is unlikely to be writable. Consider using a well-defined default location like "/var/lib/daemoneye/wal" always, or requiring explicit WAL path configuration. The current fallback logic is error-prone.
| let wal_dir = PathBuf::from(&cli.database).parent().map_or_else( | |
| || PathBuf::from("/var/lib/daemoneye/wal"), | |
| |p| p.join("wal"), | |
| ); | |
| // Use a fixed, well-defined WAL directory instead of deriving from the database path | |
| let wal_dir = PathBuf::from("/var/lib/daemoneye/wal"); |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the shutdown_task, if graceful_shutdown() returns an error, execution continues without propagating the error or affecting the main shutdown flow. This could mask critical shutdown issues.
Consider storing the shutdown result and checking it after the tokio::select!, then propagating any error to the outer Result. This ensures shutdown errors are properly surfaced to the caller.
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent shutdown semantics: The actor's run() method handles graceful shutdown internally and closes the EventBusConnector (lines 707-714), but the shutdown_task in main.rs also expects to coordinate shutdown via graceful_shutdown() message. If the actor stops due to consecutive failures or other internal reasons, the shutdown_task won't know and will continue waiting. Consider having the actor signal its completion reason (success vs error) so main.rs can handle accordingly.
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing error handling: If begin_monitoring() fails (line 248), the error is logged but the actor task is still spawned and will remain in WaitingForAgent state forever. The actor will never collect data. Consider returning an error and aborting startup if begin_monitoring() fails, or implementing a retry mechanism.
| error!(error = %e, "Failed to send BeginMonitoring command"); | |
| error!(error = %e, "Failed to send BeginMonitoring command"); | |
| // Abort startup: without a successful BeginMonitoring command, the actor | |
| // would run but never collect data, remaining stuck in an idle state. | |
| std::process::exit(1); |
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The event consumer task (lines 260-281) silently discards all events - it only logs them. This means all process events collected by the actor are lost. The PR description mentions this is "logging only for now" but provides no path forward. This breaks the core functionality of the collector. Either implement proper event handling (send to downstream processors) or add a clear TODO with tracking issue. For production use, events should at least be persisted to the database.
Copilot
AI
Feb 1, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete shutdown: The backpressure monitor task is aborted (line 297), but the spawned actor_task is only awaited via tokio::select! which completes as soon as either actor_task or shutdown_task finishes. If shutdown_task finishes first (normal case), the actor_task may still be running. The actor should be awaited after the select! to ensure it completes. Additionally, consider using a timeout or explicit cancellation rather than abort() for the backpressure task to allow graceful cleanup.
| bp_task.abort(); | |
| info!("Backpressure monitor task aborted"); | |
| // Prefer graceful shutdown with a bounded timeout; abort only as a last resort. | |
| match tokio::time::timeout(Duration::from_secs(5), bp_task).await { | |
| Ok(Ok(())) => { | |
| info!("Backpressure monitor task completed successfully"); | |
| } | |
| Ok(Err(e)) => { | |
| error!(error = %e, "Backpressure monitor task join error"); | |
| } | |
| Err(_) => { | |
| warn!("Backpressure monitor task did not complete within timeout; aborting"); | |
| // On timeout, forcefully abort and await the join to ensure completion. | |
| // This avoids leaving a detached task running past shutdown. | |
| if let Some(mut bp_task) = backpressure_task { | |
| bp_task.abort(); | |
| if let Err(e) = bp_task.await { | |
| error!(error = %e, "Backpressure monitor task join error after abort"); | |
| } else { | |
| info!("Backpressure monitor task aborted and joined"); | |
| } | |
| } | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect formatting, autoformat by running
qlty fmt. [rustfmt:fmt]