2 changes: 2 additions & 0 deletions AGENTS.md
@@ -157,6 +157,8 @@ flowchart LR

- **Edition**: Rust 2024 (MSRV: 1.91+)
- **Linting**: `cargo clippy -- -D warnings` (zero warnings)
- **Format args**: Use `{variable}` inlined syntax in `format!`/`anyhow!` macros (`clippy::uninlined_format_args`)
- **If-else ordering**: Clippy prefers `==` checks first in if-else (`clippy::unnecessary_negation`)

Copilot AI Feb 2, 2026


The lint reference here looks incorrect: the Clippy lint that suggests flipping `if !cond { ... } else { ... }` into a positive condition is `clippy::if_not_else` (and related lints), not `clippy::unnecessary_negation`.

Consider updating this bullet to reference the correct lint (or rewording it more generally as "prefer positive conditions / avoid `if !cond` when possible").

Suggested change
- **If-else ordering**: Clippy prefers `==` checks first in if-else (`clippy::unnecessary_negation`)
- **If conditions**: Prefer positive conditions; avoid `if !cond { .. } else { .. }` when possible (`clippy::if_not_else`)
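
For reference, a minimal sketch of the pattern `clippy::if_not_else` flags (illustrative function, not from this codebase):

```rust
// Flagged by clippy::if_not_else: a negated condition with an else branch.
fn status(connected: bool) -> &'static str {
    // Before (lint fires):
    // if !connected { "offline" } else { "online" }

    // After (positive condition first, branches swapped):
    if connected { "online" } else { "offline" }
}
```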

- **Safety**: `unsafe_code = "forbid"` at workspace level
- **Formatting**: `rustfmt` with 119 char line length
- **Rustdoc**: Escape brackets in paths like `/proc/\[pid\]/stat` to avoid broken link warnings
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -60,7 +60,7 @@ bytes = "1.11.0"
chrono = { version = "0.4.43", features = ["serde"] }

# CLI and configuration
clap = { version = "4.5.55", features = ["derive"] }
clap = { version = "4.5.56", features = ["derive"] }

# Internal libraries
collector-core = { path = "collector-core" }
@@ -80,12 +80,12 @@ futures-util = "0.3.31"

# System information and IPC
hostname-validator = "1.1.1"
insta = { version = "1.46.1", features = ["filters"] }
insta = { version = "1.46.2", features = ["filters"] }
interprocess = { version = "2.2.3", features = ["tokio"] }
parking_lot = "0.12.5"

# Serialization
postcard = { version = "1.1", features = ["alloc"] }
postcard = { version = "1.1.3", features = ["alloc"] }

# Testing utilities
predicates = "3.1.3"
1 change: 1 addition & 0 deletions mise.toml
@@ -28,3 +28,4 @@ lychee = "0.22.0"
markdownlint-cli2 = "0.20.0"
protobuf = "33.4"
pre-commit = "4.5.1"
protoc = "33.4"
7 changes: 6 additions & 1 deletion procmond/src/lib.rs
@@ -6,6 +6,8 @@ pub mod event_source;
pub mod lifecycle;
pub mod monitor_collector;
pub mod process_collector;
pub mod registration;
pub mod rpc_service;
pub mod wal;

#[cfg(target_os = "linux")]
@@ -27,7 +29,10 @@ pub use lifecycle::{
LifecycleTrackingStats, ProcessLifecycleEvent, ProcessLifecycleTracker, ProcessSnapshot,
SuspiciousEventSeverity,
};
pub use monitor_collector::{ProcmondMonitorCollector, ProcmondMonitorConfig};
pub use monitor_collector::{
ACTOR_CHANNEL_CAPACITY, ActorError, ActorHandle, ActorMessage, CollectorState,
HealthCheckData as ActorHealthCheckData, ProcmondMonitorCollector, ProcmondMonitorConfig,
};
pub use process_collector::{
CollectionStats, FallbackProcessCollector, ProcessCollectionConfig, ProcessCollectionError,
ProcessCollectionResult, ProcessCollector, ProcessCollectorCapabilities,
130 changes: 121 additions & 9 deletions procmond/src/main.rs
@@ -7,13 +7,15 @@ use procmond::{
ProcessEventSource, ProcessSourceConfig,
event_bus_connector::EventBusConnector,
monitor_collector::{ProcmondMonitorCollector, ProcmondMonitorConfig},
registration::{RegistrationConfig, RegistrationManager, RegistrationState},
rpc_service::{RpcServiceConfig, RpcServiceHandler},
};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, mpsc};
use tracing::{error, info, warn};
use tokio::sync::{Mutex, RwLock, mpsc};
use tracing::{debug, error, info, warn};

/// Parse and validate the collection interval argument.
///
@@ -158,7 +160,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
e
})?;

let mut event_bus_connector = EventBusConnector::new(wal_dir).await?;
let mut event_bus_connector = EventBusConnector::new(wal_dir.clone()).await?;

// Attempt to connect to the broker
match event_bus_connector.connect().await {
@@ -189,15 +191,105 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

// Take the backpressure receiver before moving connector to collector
let backpressure_rx = event_bus_connector.take_backpressure_receiver();
// Wrap EventBusConnector in Arc<RwLock<>> for sharing between components
// Note: backpressure receiver is taken from collector_event_bus below, not here
let event_bus = Arc::new(RwLock::new(event_bus_connector));

// Set the EventBusConnector on the collector
collector.set_event_bus_connector(event_bus_connector);
// ========================================================================
// Initialize Registration Manager
// ========================================================================
let registration_config = RegistrationConfig::default();
let registration_manager = Arc::new(RegistrationManager::new(
Arc::clone(&event_bus),
actor_handle.clone(),
registration_config,
));

info!(
collector_id = %registration_manager.collector_id(),
"Registration manager initialized"
);

// Perform registration with daemoneye-agent
info!("Registering with daemoneye-agent");
match registration_manager.register().await {
Ok(response) => {
info!(
collector_id = %response.collector_id,
heartbeat_interval_ms = response.heartbeat_interval_ms,
assigned_topics = ?response.assigned_topics,
"Registration successful"
);
}
Err(e) => {
// Log warning but continue - procmond can operate without registration
// in standalone/development scenarios
warn!(
error = %e,
"Registration failed, continuing in standalone mode"
);
}
}

// Start heartbeat task (only publishes when registered)
let heartbeat_task =
RegistrationManager::spawn_heartbeat_task(Arc::clone(&registration_manager));
info!("Heartbeat task started");

Comment on lines +213 to +238

Copilot AI Feb 2, 2026


`RegistrationManager::register()` and the heartbeat task are invoked in actor mode, but the current `RegistrationManager` implementation explicitly simulates successful registration and does not publish registration/heartbeat messages to the broker yet. As written, procmond will log "Registration successful" and start the heartbeat loop even when the broker connection failed, which is misleading operationally. Consider gating registration/heartbeat startup on real broker connectivity and/or a feature flag until generic publish/subscribe is implemented in `EventBusConnector`.
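
One possible shape for that gating, as a rough sketch; `is_connected()` is an assumed accessor on `EventBusConnector`, not part of the current API:

```rust
// Sketch: only register and start the heartbeat when the broker link is up.
// `is_connected()` is hypothetical; the real connector may expose this differently.
let heartbeat_task = if event_bus.read().await.is_connected() {
    match registration_manager.register().await {
        Ok(response) => {
            info!(collector_id = %response.collector_id, "Registration successful");
        }
        Err(e) => warn!(error = %e, "Registration failed, continuing in standalone mode"),
    }
    Some(RegistrationManager::spawn_heartbeat_task(Arc::clone(
        &registration_manager,
    )))
} else {
    info!("Broker not connected; skipping registration and heartbeat");
    None
};
```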

// ========================================================================
// Initialize RPC Service Handler
// ========================================================================
let rpc_config = RpcServiceConfig::default();
let rpc_service =
RpcServiceHandler::new(actor_handle.clone(), Arc::clone(&event_bus), rpc_config);

info!(
control_topic = %rpc_service.config().control_topic,
"RPC service handler initialized"
);

// Create a separate EventBusConnector for the collector with its own WAL directory
// to avoid conflicts with the shared event_bus connector.
// Note: The collector takes ownership of its connector, while the registration
// and RPC services share a separate connector for control messages.
// TODO: Refactor to share the connector more elegantly when EventBusConnector
Contributor


// TODO: Refactor to share the connector more elegantly when EventBusConnector [ripgrep:TODO]

// supports both ProcessEvent and generic message publishing.
let collector_wal_dir = wal_dir.join("collector");
std::fs::create_dir_all(&collector_wal_dir).map_err(|e| {
error!(
wal_dir = ?collector_wal_dir,
error = %e,
"Failed to create collector WAL directory"
);
e
})?;
let mut collector_event_bus = EventBusConnector::new(collector_wal_dir).await?;

// Connect collector's EventBusConnector and replay WAL (required for publishing)
match collector_event_bus.connect().await {
Ok(()) => {
info!("Collector EventBusConnector connected");
if let Err(e) = collector_event_bus.replay_wal().await {
warn!(error = %e, "Failed to replay collector WAL");
}
}
Err(e) => {
warn!(
error = %e,
"Collector EventBusConnector failed to connect, will buffer events"
);
}
}

// Take backpressure receiver from the collector's event bus (not the shared one)
// so the backpressure monitor listens to the correct connector
let collector_backpressure_rx = collector_event_bus.take_backpressure_receiver();

collector.set_event_bus_connector(collector_event_bus);

// Spawn backpressure monitor task if we have the receiver
let original_interval = Duration::from_secs(cli.interval);
let backpressure_task = backpressure_rx.map_or_else(
let backpressure_task = collector_backpressure_rx.map_or_else(
|| {
warn!("Backpressure receiver not available, dynamic interval adjustment disabled");
None
@@ -214,8 +306,9 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create event channel for the actor's output
let (event_tx, mut event_rx) = mpsc::channel::<CollectionEvent>(1000);

// Clone handle for shutdown task
// Clone handles for shutdown task
let shutdown_handle = actor_handle.clone();
let shutdown_registration = Arc::clone(&registration_manager);

// Spawn task to handle graceful shutdown on Ctrl+C
let shutdown_task = tokio::spawn(async move {
@@ -226,13 +319,28 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
info!("Received Ctrl+C, initiating graceful shutdown");

// Deregister from agent
if shutdown_registration.state().await == RegistrationState::Registered {
debug!("Deregistering from daemoneye-agent");
if let Err(e) = shutdown_registration
.deregister(Some("Graceful shutdown".to_owned()))
.await
{
warn!(error = %e, "Deregistration failed");
}
}

// Send graceful shutdown to actor
match shutdown_handle.graceful_shutdown().await {
Ok(()) => info!("Actor shutdown completed successfully"),
Err(e) => error!(error = %e, "Actor shutdown failed"),
}
});

// Keep RPC service reference alive (it will be used for handling incoming requests)
// In a full implementation, we would spawn a task to process RPC requests from the event bus
let _rpc_service = rpc_service;
Comment on lines +340 to +342

Copilot AI Feb 2, 2026


`rpc_service` is only kept alive via `let _rpc_service = rpc_service;` and no task is spawned to subscribe/read RPC requests from the event bus, so the RPC service handler is effectively inactive in production.

If RPC handling isn't wired up yet due to `EventBusConnector` limitations, consider gating this behind a feature/config flag or deferring initialization until request subscription/publishing is actually implemented, to avoid giving the impression that RPC is supported.
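
For illustration, one way the wiring could look once subscription lands; `subscribe` and `handle_request` are assumed APIs, not current ones:

```rust
// Sketch: drain control-topic requests into the handler.
// `subscribe` and `handle_request` are hypothetical; only the config()
// accessor exists today.
let rpc_task = tokio::spawn(async move {
    let topic = rpc_service.config().control_topic.clone();
    let mut requests = match event_bus.write().await.subscribe(&topic).await {
        Ok(rx) => rx,
        Err(e) => {
            warn!(error = %e, "RPC subscription failed; handler inactive");
            return;
        }
    };
    while let Some(request) = requests.recv().await {
        if let Err(e) = rpc_service.handle_request(request).await {
            warn!(error = %e, "RPC request handling failed");
        }
    }
});
```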


// Startup behavior: begin monitoring immediately on launch.
//
// The collector currently does not wait for an explicit "begin monitoring"
@@ -298,6 +406,10 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Backpressure monitor task aborted");
}

// Clean up heartbeat task
heartbeat_task.abort();
info!("Heartbeat task aborted");

// Wait for event consumer to exit naturally (channel sender is dropped)
// Use a timeout to avoid hanging indefinitely
match tokio::time::timeout(Duration::from_secs(5), event_consumer_task).await {