
Conversation

@unclesp1d3r
Member

Summary

  • Refactor ProcmondMonitorCollector to use actor pattern with message-passing architecture
  • Add dual-mode operation in main.rs: actor mode (with broker) and standalone mode (without broker)
  • Implement ActorMessage enum for coordinated state management (HealthCheck, UpdateConfig, GracefulShutdown, BeginMonitoring, AdjustInterval)
  • Integrate EventBusConnector with WAL for crash-recoverable event delivery
  • Add dynamic interval adjustment (1.5x slowdown) on backpressure signals
  • Implement configuration hot-reload at collection cycle boundaries

Key Changes

Actor Pattern (monitor_collector.rs)

  • Bounded mpsc channel (capacity: 100) for actor messages
  • Sequential message processing (no concurrent state mutations)
  • Oneshot channels for request/response patterns
  • ActorHandle for typed message-passing interface
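The bullets above describe a conventional tokio actor. Below is a minimal, dependency-free sketch of the same shape (bounded channel, typed handle, sequential message loop, reply channel for request/response). The real collector uses tokio's bounded mpsc plus oneshot channels; here std::sync::mpsc stands in for both so the sketch is self-contained, and everything beyond the names `ActorMessage`, `ActorHandle`, `ACTOR_CHANNEL_CAPACITY`, and `create_channel` (which are reconstructed from this summary, not copied from the codebase) is illustrative.

```rust
use std::sync::mpsc;
use std::time::Duration;

const ACTOR_CHANNEL_CAPACITY: usize = 100;

// Subset of the message variants listed in the summary.
enum ActorMessage {
    HealthCheck { respond_to: mpsc::Sender<bool> },
    BeginMonitoring,
    AdjustInterval { new_interval: Duration },
    GracefulShutdown,
}

// Typed message-passing interface held by callers; the receiver stays
// with the actor, so all state lives on one task.
#[derive(Clone)]
struct ActorHandle {
    tx: mpsc::SyncSender<ActorMessage>,
}

impl ActorHandle {
    // Request/response: send a message carrying a reply channel, then
    // wait for the actor's answer (or an error if the actor is gone).
    fn health_check(&self) -> Result<bool, String> {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx
            .send(ActorMessage::HealthCheck { respond_to: reply_tx })
            .map_err(|_| "actor channel closed".to_owned())?;
        reply_rx.recv().map_err(|e| e.to_string())
    }
}

fn create_channel() -> (ActorHandle, mpsc::Receiver<ActorMessage>) {
    let (tx, rx) = mpsc::sync_channel(ACTOR_CHANNEL_CAPACITY);
    (ActorHandle { tx }, rx)
}

// Sequential processing: one message at a time, so state mutations
// never race and no locks are needed.
fn run_actor(rx: mpsc::Receiver<ActorMessage>) {
    let mut current_interval = Duration::from_secs(30); // assumed default
    while let Ok(msg) = rx.recv() {
        match msg {
            ActorMessage::HealthCheck { respond_to } => {
                let _ = respond_to.send(true);
            }
            ActorMessage::AdjustInterval { new_interval } => current_interval = new_interval,
            ActorMessage::BeginMonitoring => { /* would start a collection cycle */ }
            ActorMessage::GracefulShutdown => break,
        }
    }
    let _ = current_interval;
}
```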

Startup Coordination (main.rs)

  • Reads DAEMONEYE_BROKER_SOCKET environment variable to determine mode
  • Actor mode: initializes EventBusConnector, spawns backpressure monitor, handles graceful shutdown
  • Standalone mode: falls back to ProcessEventSource with collector-core framework
  • TODO placeholder for control.collector.lifecycle topic subscription
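The mode-selection branch above reduces to a small decision on the environment variable. A sketch under assumed names (`Mode` and `select_mode` are illustrative, not from the codebase):

```rust
// Hypothetical helper capturing the startup branch: a set, non-empty
// DAEMONEYE_BROKER_SOCKET selects actor mode; anything else falls back
// to standalone collector-core mode.
#[derive(Debug, PartialEq)]
enum Mode {
    Actor { broker_socket: String },
    Standalone,
}

// At startup: select_mode(std::env::var("DAEMONEYE_BROKER_SOCKET").ok())
fn select_mode(broker_socket: Option<String>) -> Mode {
    match broker_socket {
        // Non-empty value: actor mode, carrying the socket path.
        Some(path) if !path.trim().is_empty() => Mode::Actor { broker_socket: path },
        // Unset or empty: standalone mode.
        _ => Mode::Standalone,
    }
}
```

Keeping the decision in a pure function like this makes the fallback behavior trivially testable without touching process environment state.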

Test plan

  • All 199 procmond tests pass
  • Clippy passes with zero warnings (cargo clippy --workspace -- -D warnings)
  • Pre-commit hooks pass (fmt, clippy, cargo check, cargo-audit)
  • Manual testing with DAEMONEYE_BROKER_SOCKET set
  • Integration testing with daemoneye-agent broker

🤖 Generated with Claude Code

Refactor main.rs to support two operational modes:

Actor Mode (DAEMONEYE_BROKER_SOCKET set):
- Creates bounded mpsc channel (capacity: 100) for actor messages
- Initializes EventBusConnector with WAL for crash recovery
- Connects to broker and replays pending WAL events
- Spawns backpressure monitor (1.5x interval on backpressure)
- Implements graceful shutdown on Ctrl+C via ActorHandle
- Prepares for startup coordination with agent (TODO: lifecycle topic)
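The backpressure behavior above is simple interval arithmetic: each Backpressure signal slows collection to 1.5x the current interval, and Released restores the original. A sketch with assumed names, including the one-hour ceiling that a later commit in this PR adds as a clamp:

```rust
use std::time::Duration;

// Assumed ceiling matching the follow-up "max clamp (1 hour)" fix.
const MAX_COLLECTION_INTERVAL: Duration = Duration::from_secs(3600);

// On a Backpressure signal: slow down by 1.5x, never past the ceiling.
fn on_backpressure(current: Duration) -> Duration {
    current.mul_f64(1.5).min(MAX_COLLECTION_INTERVAL)
}

// On a Released signal: snap back to the originally configured interval.
// (As the review notes, this is where a config hot-reload during
// backpressure can leave the restored value stale.)
fn on_released(original: Duration) -> Duration {
    original
}
```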

Standalone Mode (no broker):
- Falls back to ProcessEventSource with collector-core framework
- Maintains backward compatibility for standalone deployments

This completes the actor pattern implementation from Ticket 2, enabling
coordinated state management between procmond and daemoneye-agent.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings (February 1, 2026 01:31)
@dosubot bot added the size:XXL label ("This PR changes 1000+ lines, ignoring generated files") on Feb 1, 2026
@coderabbitai

coderabbitai bot commented Feb 1, 2026

Caution

Review failed

Failed to post review comments

Summary by CodeRabbit

  • New Features

    • Actor-based monitoring mode with broker-backed, persisted event delivery and event-bus integration
    • Dual-mode operation: actor mode or standalone mode selected at startup
    • Runtime hot-reload of monitor config, dynamic interval adjustment, and graceful shutdown
  • Reliability

    • Backpressure-aware monitoring with adaptive intervals and buffer-aware behavior
    • Enhanced health checks and runtime health visibility
  • Tests

    • Expanded tests covering actor flows, health checks, and backpressure
  • Documentation

    • Guidance for actor usage, hot-reload, and runtime behavior adjustments


Walkthrough

Adds an actor-mode runtime (enabled by DAEMONEYE_BROKER_SOCKET) that starts a WAL-backed, actor-based ProcmondMonitorCollector with EventBusConnector, backpressure monitoring, and graceful startup/shutdown/replay; retains the existing standalone collector-core ProcessEventSource and RPC/CLI flow as a fallback.

Changes

Cohort / File(s) Summary
Actor-Model Collector
procmond/src/monitor_collector.rs
Introduces an actor-based ProcmondMonitorCollector with ActorMessage, CollectorState, HealthCheckData, ActorError, ActorHandle, ACTOR_CHANNEL_CAPACITY, create_channel(), run() actor loop, backpressure monitor, EventBusConnector wiring, actor-driven lifecycle and health/config APIs.
Dual-Mode Startup & CLI
procmond/src/main.rs
Adds startup branching: when DAEMONEYE_BROKER_SOCKET is set, initialize WAL dir, create actor channel, connect/replay broker, attach EventBusConnector, spawn backpressure monitor, begin monitoring and manage graceful shutdown; otherwise retain standalone Collector/ProcessEventSource, RPC/CLI, and telemetry initialization.
Public API Surface
procmond/src/monitor_collector.rs, procmond/src/main.rs
Exports new actor-related types and methods (actor lifecycle, health checks, config update, WAL PathBuf usage, EventBusConnector APIs). Standalone public types remain but are now the fallback path.
Tests & Helpers (actor path)
procmond/src/...tests, procmond/src/...helpers
Added/updated tests and helpers for actor-channel creation, ActorHandle operations (health/update/shutdown), health data, capacity/backpressure behavior, and actor run semantics.

Sequence Diagram(s)

sequenceDiagram
    actor User
    participant Main as main.rs
    participant WAL as WAL Dir
    participant Broker as Broker
    participant Collector as ProcmondMonitorCollector
    participant EventBus as EventBusConnector
    participant Channel as Actor Channel

    rect rgba(100, 150, 200, 0.5)
        Note over User,Collector: Actor Mode Flow
        User->>Main: Start w/ DAEMONEYE_BROKER_SOCKET
        Main->>WAL: Initialize WAL directory
        Main->>Collector: create_channel() -> ActorHandle + Receiver
        Main->>Broker: Connect & replay WAL
        Main->>Collector: set_event_bus_connector(EventBus)
        Main->>Collector: spawn_backpressure_monitor()
        Collector->>Channel: process ActorMessage events
        Collector->>EventBus: publish collected events
        EventBus->>Broker: forward events
    end
sequenceDiagram
    actor User
    participant Main as main.rs
    participant CollectorCore as collector-core
    participant EventSource as ProcessEventSource
    participant RPC as RPC Handler

    rect rgba(150, 100, 150, 0.5)
        Note over User,RPC: Standalone Mode Flow
        User->>Main: Start without DAEMONEYE_BROKER_SOCKET
        Main->>CollectorCore: Initialize CollectorConfig
        Main->>EventSource: create ProcessEventSource
        CollectorCore->>EventSource: register & receive events
        Main->>RPC: register RPC scaffolding
        RPC->>CollectorCore: handle telemetry & health checks
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐇 I hopped through WAL and actor lanes,

messages danced in buffered trains,
a broker hummed, the monitor woke,
I nibbled backpressure till it spoke,
two modes now sprout where once was one.

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Title check ✅ Passed: The title accurately summarizes the main changes: implementing actor pattern and startup coordination in procmond, which aligns with the core refactoring shown in both main.rs and monitor_collector.rs.
  • Description check ✅ Passed: The description is comprehensive and directly related to the changeset, covering actor pattern implementation, dual-mode operation, message types, integration points, and test status.
  • Docstring Coverage ✅ Passed: Docstring coverage is 100.00%, which meets the required threshold of 80.00%.



@dosubot

dosubot bot commented Feb 1, 2026

Related Documentation

Checked 21 published document(s) in 1 knowledge base(s). No updates required.


@qltysh
Contributor

qltysh bot commented Feb 1, 2026

❌ 1 blocking issue (4 total)

  • rustfmt (Style): Incorrect formatting, autoformat by running qlty fmt. (1)
  • qlty (Structure): Function with high complexity (count = 29): main (2)
  • qlty (Structure): High total complexity (count = 83) (1)

@qltysh one-click actions:

  • Auto-fix formatting (qlty fmt && git push)

@@ -1,14 +1,19 @@
#![forbid(unsafe_code)]
Incorrect formatting, autoformat by running qlty fmt. [rustfmt:fmt]

collector.run().await?;
}

Ok(())
@qltysh qltysh bot Feb 1, 2026

Function with high complexity (count = 29): main [qlty:function-complexity]

fn test_actor_channel_capacity() {
assert_eq!(ACTOR_CHANNEL_CAPACITY, 100);
}
}
@qltysh qltysh bot Feb 1, 2026

High total complexity (count = 83) [qlty:file-complexity]

@dosubot bot added the architecture (System architecture and design decisions), process-monitoring (Process monitoring and enumeration features), and procmond (Issues related to the process monitoring daemon) labels on Feb 1, 2026
Copilot AI left a comment

Pull request overview

This PR implements a significant architectural change to procmond, introducing an actor pattern for the ProcmondMonitorCollector with message-passing concurrency and dual-mode operation. The changes enable coordination with the daemoneye-agent broker via EventBusConnector with WAL-backed crash recovery, while preserving backward compatibility through standalone mode.

Changes:

  • Refactored ProcmondMonitorCollector to use actor pattern with bounded message channels, sequential message processing, and actor-based state management
  • Added dual-mode operation in main.rs: actor mode (when DAEMONEYE_BROKER_SOCKET env var is set) with broker integration, or standalone mode (without env var) using existing collector-core framework
  • Integrated EventBusConnector with WAL for reliable event delivery and backpressure-aware dynamic interval adjustment (1.5x slowdown)

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 15 comments.

File Description
procmond/src/monitor_collector.rs Implements actor pattern types (ActorMessage, CollectorState, ActorHandle), refactors collector to use message-based coordination, adds backpressure monitoring, integrates EventBusConnector, implements configuration hot-reload at cycle boundaries, and deprecates legacy EventSource trait methods
procmond/src/main.rs Adds environment variable-based mode selection, implements actor mode initialization with EventBusConnector/WAL setup and graceful shutdown coordination, preserves standalone mode for backward compatibility, adds event consumer task for processing collector output

Comment on lines 198 to 211
let original_interval = Duration::from_secs(cli.interval);
let _backpressure_task = backpressure_rx.map_or_else(
|| {
warn!("Backpressure receiver not available, dynamic interval adjustment disabled");
None
},
|bp_rx| {
Some(ProcmondMonitorCollector::spawn_backpressure_monitor(
actor_handle.clone(),
bp_rx,
original_interval,
))
},
);
Copilot AI Feb 1, 2026

The backpressure monitor task handle is stored in _backpressure_task with a leading underscore (indicating intentional unused), but it's never joined or aborted during shutdown. This means the task will continue running even after the main actor exits, and any panics in the task won't be detected.

Consider storing the handle without the underscore prefix and either joining it gracefully during shutdown (after the actor completes) or aborting it explicitly like the event_consumer_task. This ensures proper cleanup and allows propagating any errors from the task.

Comment on lines +1216 to +1229
#[tokio::test]
async fn test_actor_handle_operations() {
let db_manager = create_test_database().await;
let config = ProcmondMonitorConfig::default();

let (collector, handle) = create_collector_with_channel(db_manager, config).unwrap();

// Verify initial state
assert_eq!(collector.state, CollectorState::WaitingForAgent);

// Test that handle methods work (before actor is running, they should fail)
// This is expected because the receiver is held by the collector
assert!(!handle.is_closed());
}
Copilot AI Feb 1, 2026

The test test_actor_handle_operations only verifies that the handle channel is not closed, but doesn't test any actual actor message handling. With the actor pattern being a core architectural change, there should be integration tests that:

  1. Spawn the actor and send BeginMonitoring
  2. Verify state transitions (WaitingForAgent -> Running)
  3. Test HealthCheck message/response
  4. Test UpdateConfig message/response
  5. Test GracefulShutdown coordination
  6. Test AdjustInterval message handling

Consider adding integration tests that actually run the actor loop in a test environment to verify the message handling works correctly.

Comment on lines 735 to 745
ActorMessage::AdjustInterval { new_interval } => {
let old_interval = self.current_interval;
self.current_interval = new_interval;
info!(
old_interval_ms = old_interval.as_millis(),
new_interval_ms = new_interval.as_millis(),
is_backpressure = new_interval > self.original_interval,
"Collection interval adjusted"
);
false
}
Copilot AI Feb 1, 2026

The AdjustInterval message handler updates self.current_interval but does not recreate the collection_interval tokio::time::Interval. This means the interval adjustment won't actually take effect until the next config update that recreates the interval.

The fix requires recreating the interval timer in the run() loop when this message is handled, similar to how it's done for config updates. Consider storing a flag to signal that the interval needs to be recreated, or handle the interval recreation directly in the message handler by passing a mutable reference to the interval.

/// Sets the current buffer level percentage.
///
/// Called when receiving buffer level updates from EventBusConnector.
pub const fn set_buffer_level(&mut self, level_percent: u8) {
Copilot AI Feb 1, 2026

Similarly, set_buffer_level is marked as const but mutates self.buffer_level_percent. This is incorrect - mutable methods cannot be const in Rust.

Remove the const modifier from this method signature as it performs mutation.

Suggested change:
- pub const fn set_buffer_level(&mut self, level_percent: u8) {
+ pub fn set_buffer_level(&mut self, level_percent: u8) {

Comment on lines +552 to +560
crate::event_bus_connector::BackpressureSignal::Released => {
// Restore original interval
info!(
original_interval_ms = original_interval.as_millis(),
"Backpressure released - restoring original collection interval"
);
if let Err(e) = handle.adjust_interval(original_interval) {
warn!(error = %e, "Failed to send AdjustInterval message");
}
Copilot AI Feb 1, 2026

The backpressure monitor always restores to the original interval captured at startup. However, if the user updates the collection_interval via UpdateConfig while under backpressure, the next Released signal will restore to the old original_interval instead of the new configured interval.

This creates a race condition where configuration updates can be overridden by backpressure state. Consider either:

  1. Having the backpressure monitor query the current configured interval from the actor via a new message type
  2. Sending the new original_interval to the backpressure monitor when config is updated
  3. Making the backpressure adjustment relative (multiply current by 1.5x, divide by 1.5x) rather than absolute

@codecov

codecov bot commented Feb 1, 2026

Codecov Report

❌ Patch coverage is 62.82528% with 200 lines in your changes missing coverage. Please review.

  • procmond/src/monitor_collector.rs: patch 32.60%, 184 lines missing ⚠️
  • procmond/src/main.rs: patch 93.96%, 16 lines missing ⚠️


- Add fail-fast WAL directory creation with error logging
- Fix backpressure interval calculation with max clamp (1 hour)
- Properly abort backpressure task on shutdown
- Add timeout for event consumer task during shutdown
- Improve hot-reload warning for non-reloadable settings
- Update startup coordination documentation comment
- Restore const fn on setter methods (Rust 2024 allows &mut self)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@unclesp1d3r
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Feb 1, 2026

✅ Actions performed

Full review triggered.

@unclesp1d3r
Member Author

PR Review Comments Addressed

The following Copilot review comments have been addressed:

Already addressed in commit e87a821:

  1. main.rs#212 - Backpressure task shutdown: renamed to backpressure_task and now properly aborting during shutdown
  2. main.rs#158 - WAL directory fail-fast: now using ? operator with error logging
  3. main.rs#248 - Startup coordination: updated comment explaining intentional design
  4. main.rs#294 - Event consumer timeout: added 5s timeout waiting before abort
  5. monitor_collector.rs#542 - Interval clamping: added 1 hour max clamp
  6. monitor_collector.rs#788 - Hot-reload warning: added warning for non-reloadable settings
  7. monitor_collector.rs#480 - const fn: the Rust 2024 edition allows const fn with &mut self; clippy in fact requires the const qualifier here

Addressed in upcoming commit:

  1. monitor_collector.rs#756 - AdjustInterval timer recreation: added pending_interval field to queue interval changes and recreate timer at next loop iteration
  2. main.rs#311 - EventBusConnector shutdown: added explicit shutdown call in run() before actor exits
  3. monitor_collector.rs#885 - Event type mapping: now using lifecycle events to determine correct ProcessEventType (Start/Stop/Modify)

Acknowledged but deferred:

  • monitor_collector.rs#1249 - Integration tests: Valid request, will address in separate PR
  • monitor_collector.rs#1114 - EventSource trait methods: Current warning behavior is acceptable for deprecated API
  • main.rs#234 - Shutdown error propagation: Limited options in spawned task context
  • monitor_collector.rs#571 - Backpressure interval vs config update: Design consideration for future enhancement

- Add pending_interval field for deferred interval updates ensuring
  tokio interval timer is properly recreated at loop boundaries
- Fix event type mapping to use ProcessLifecycleEvent analysis instead
  of hardcoding all events as Start
- Add EventBusConnector shutdown on actor exit to ensure buffer flush
  and proper connection close
- Import ProcessLifecycleEvent for lifecycle-based event type mapping

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings February 1, 2026 19:29
@@ -243,12 +986,11 @@ impl ProcmondMonitorCollector {
Ok(())

Function with high complexity (count = 22): collect_and_analyze_internal [qlty:function-complexity]

Copilot AI left a comment

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 17 comments.

Comment on lines +566 to +574
crate::event_bus_connector::BackpressureSignal::Released => {
// Restore original interval
info!(
original_interval_ms = original_interval.as_millis(),
"Backpressure released - restoring original collection interval"
);
if let Err(e) = handle.adjust_interval(original_interval) {
warn!(error = %e, "Failed to send AdjustInterval message");
}
Copilot AI Feb 1, 2026

The backpressure monitor always restores the original_interval on release, but this doesn't account for configuration hot-reload changing the original_interval while backpressure is active. If the config is updated (changing collection_interval) while under backpressure, and then backpressure releases, the restored interval will be stale. Consider having the backpressure monitor read the current config's interval or storing the "config_interval" separately from "original_interval" to handle this edge case.

Comment on lines +523 to +527
pub fn spawn_backpressure_monitor(
handle: ActorHandle,
mut backpressure_rx: mpsc::Receiver<crate::event_bus_connector::BackpressureSignal>,
original_interval: Duration,
) -> tokio::task::JoinHandle<()> {
Copilot AI Feb 1, 2026

The backpressure monitor task is spawned with the original_interval captured at startup, but this value becomes stale if config is hot-reloaded. The backpressure monitor will continue using the old original_interval for calculating the 1.5x slowdown even after config updates. This breaks the intended behavior of hot-reload. Consider either: 1) passing the ActorHandle so backpressure can query current config, 2) restarting the backpressure monitor on config update, or 3) having the actor send updated original_interval to the backpressure task via a separate channel.

Comment on lines 631 to 643
if let Some(new_interval) = self.pending_interval.take()
&& new_interval != self.current_interval
{
let old_interval = self.current_interval;
self.current_interval = new_interval;
collection_interval = interval(self.current_interval);
collection_interval.tick().await; // Reset interval
info!(
old_interval_ms = old_interval.as_millis(),
new_interval_ms = new_interval.as_millis(),
is_backpressure = new_interval > self.original_interval,
"Collection interval adjusted (timer recreated)"
);
Copilot AI Feb 1, 2026

The pending_interval is cleared with take() but only applied if new_interval != current_interval. If they're equal, the pending value is lost but wasn't applied either. This means if backpressure sends the same interval multiple times, only the first one is processed. While this may be intentional deduplication, it's not documented. Consider logging when an interval adjustment is skipped due to equality to aid debugging.

Suggested change:
- if let Some(new_interval) = self.pending_interval.take()
-     && new_interval != self.current_interval
- {
-     let old_interval = self.current_interval;
-     self.current_interval = new_interval;
-     collection_interval = interval(self.current_interval);
-     collection_interval.tick().await; // Reset interval
-     info!(
-         old_interval_ms = old_interval.as_millis(),
-         new_interval_ms = new_interval.as_millis(),
-         is_backpressure = new_interval > self.original_interval,
-         "Collection interval adjusted (timer recreated)"
-     );
+ if let Some(new_interval) = self.pending_interval.take() {
+     if new_interval != self.current_interval {
+         let old_interval = self.current_interval;
+         self.current_interval = new_interval;
+         collection_interval = interval(self.current_interval);
+         collection_interval.tick().await; // Reset interval
+         info!(
+             old_interval_ms = old_interval.as_millis(),
+             new_interval_ms = new_interval.as_millis(),
+             is_backpressure = new_interval > self.original_interval,
+             "Collection interval adjusted (timer recreated)"
+         );
+     } else {
+         info!(
+             interval_ms = new_interval.as_millis(),
+             "Pending collection interval adjustment skipped: new interval equals current interval"
+         );
+     }

Comment on lines +1290 to +1303
#[tokio::test]
async fn test_actor_handle_operations() {
let db_manager = create_test_database().await;
let config = ProcmondMonitorConfig::default();

let (collector, handle) = create_collector_with_channel(db_manager, config).unwrap();

// Verify initial state
assert_eq!(collector.state, CollectorState::WaitingForAgent);

// Test that handle methods work (before actor is running, they should fail)
// This is expected because the receiver is held by the collector
assert!(!handle.is_closed());
}
Copilot AI Feb 1, 2026

No test coverage for the actor pattern's core functionality. The new tests (test_actor_handle_operations, test_health_check_data) only verify initial state but don't test: message passing, state transitions (WaitingForAgent -> Running -> ShuttingDown), config hot-reload, interval adjustment, graceful shutdown, or the run() loop. According to the coding guidelines, if other functions in the file have test coverage, new functionality should too. Consider adding integration tests that spawn the actor and exercise the full lifecycle.

// Ignore send result - receiver may have been dropped
if let Some(respond_to) = self.pending_shutdown_response.take() {
drop(respond_to.send(Err(anyhow::anyhow!(
"Collector stopped due to {consecutive_failures} consecutive failures"
Copilot AI Feb 1, 2026

The error message construction uses format string in anyhow::anyhow! which won't interpolate the variable. This will produce error text like "Collector stopped due to {consecutive_failures} consecutive failures" instead of showing the actual number. Use format! macro: anyhow::anyhow!("Collector stopped due to {} consecutive failures", consecutive_failures)

Suggested change:
- "Collector stopped due to {consecutive_failures} consecutive failures"
+ "Collector stopped due to {} consecutive failures",
+ consecutive_failures
Comment on lines +1149 to +1152
Err(anyhow::anyhow!(
"EventSource::start() is deprecated for actor-based collectors. \
Use ProcmondMonitorCollector::run() instead."
))
Copilot AI Feb 1, 2026

The EventSource trait implementation's start() method now returns an error, breaking API compatibility. According to DaemonEye coding guidelines, the EventSource trait is part of the collector-core framework. Existing code that calls start() on this collector will now fail. Consider either maintaining backward compatibility by implementing a shim that spawns the actor internally, or documenting this as a breaking change with migration guidance.

Suggested change:
- Err(anyhow::anyhow!(
-     "EventSource::start() is deprecated for actor-based collectors. \
-      Use ProcmondMonitorCollector::run() instead."
- ))
+ // For backward compatibility with the EventSource trait, do not fail here.
+ // Existing callers of `start()` will see a successful result, while new
+ // code should invoke `ProcmondMonitorCollector::run()` directly.
+ Ok(())
Comment on lines +260 to +281
let event_consumer_task = tokio::spawn(async move {
let mut event_count = 0_u64;
while let Some(event) = event_rx.recv().await {
event_count = event_count.saturating_add(1);
if event_count.is_multiple_of(100) {
info!(total_events = event_count, "Processing collection events");
}
// In a full implementation, events would be sent to downstream processors
match event {
CollectionEvent::Process(pe) => {
tracing::trace!(pid = pe.pid, name = %pe.name, "Received process event");
}
CollectionEvent::Network(_)
| CollectionEvent::Filesystem(_)
| CollectionEvent::Performance(_)
| CollectionEvent::TriggerRequest(_) => {
tracing::trace!("Received non-process event");
}
}
}
info!(total_events = event_count, "Event consumer task exiting");
});
Copilot AI Feb 1, 2026

The event consumer task (lines 260-281) silently discards all events - it only logs them. This means all process events collected by the actor are lost. The PR description mentions this is "logging only for now" but provides no path forward. This breaks the core functionality of the collector. Either implement proper event handling (send to downstream processors) or add a clear TODO with tracking issue. For production use, events should at least be persisted to the database.

}

// Collection tick (only when in Running state)
_ = collection_interval.tick(), if self.state == CollectorState::Running => {
Copilot AI Feb 1, 2026

Memory leak potential: If the actor is in WaitingForAgent state and never receives BeginMonitoring, it will loop indefinitely processing only messages, with the collection_interval timer never firing. The actor will remain alive consuming resources. Consider adding a timeout or maximum wait duration in WaitingForAgent state, or allowing the actor to be configured to start immediately without waiting.

///
/// * `event_tx` - Channel for sending collection events to downstream processors
#[instrument(skip(self, event_tx), fields(source = "procmond-monitor-collector"))]
pub async fn run(mut self, event_tx: mpsc::Sender<CollectionEvent>) -> anyhow::Result<()> {
Copilot AI Feb 1, 2026

Violation of DaemonEye actor pattern guidelines: The actor uses mutable self (&mut self in run() and handle_message()) which means the actor must own itself. However, the coding guidelines state actors should use message-passing with immutable shared state via Arc. The current pattern makes it impossible to share the collector across multiple handles or access its state from other tasks. Consider refactoring to use Arc<Mutex<CollectorState>> pattern for shared mutable state, or document why this deviation is necessary.

Copilot generated this review using guidance from repository custom instructions.
Comment on lines +108 to +109
// If not set, use standalone mode with collector-core
let broker_socket = std::env::var("DAEMONEYE_BROKER_SOCKET").ok();
Copilot AI Feb 1, 2026

Missing validation: The DAEMONEYE_BROKER_SOCKET environment variable is read but not validated. If it's set to an empty string or invalid path, the connection will fail later with unclear errors. Consider validating the path exists and is accessible, or at minimum check for empty string before proceeding with actor mode initialization.

Suggested change (replacing the two lines above):

    // If not set or invalid, use standalone mode with collector-core
    let broker_socket = match std::env::var("DAEMONEYE_BROKER_SOCKET") {
        Ok(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                warn!(
                    "DAEMONEYE_BROKER_SOCKET is set but empty; falling back to standalone mode"
                );
                None
            } else {
                let path = std::path::Path::new(trimmed);
                if !path.exists() {
                    warn!(
                        socket_path = %trimmed,
                        "DAEMONEYE_BROKER_SOCKET points to a non-existent path; falling back to standalone mode"
                    );
                    None
                } else {
                    Some(trimmed.to_owned())
                }
            }
        }
        Err(_) => None,
    };

Address additional PR review comment: when a pending interval adjustment
is skipped because the new interval equals the current interval, log at
debug level to aid debugging of backpressure oscillation scenarios.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@unclesp1d3r
Member Author

Additional PR Review Comments Addressed (Round 2)

Addressed in commit 401b3ed:

  • monitor_collector.rs:643 - Added debug logging when interval adjustment is skipped due to equality, helping debug backpressure oscillation scenarios
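The adjustment and the equality check can be sketched as follows; `apply_backpressure` is an illustrative name, not the collector's actual API:

```rust
use std::time::Duration;

// Minimal sketch of the 1.5x backpressure slowdown and the equality guard:
// returns None when the adjustment would not change the interval.
fn apply_backpressure(current: Duration) -> Option<Duration> {
    let slowed = current.mul_f64(1.5);
    if slowed == current {
        // In the real collector this branch logs at debug level, so
        // backpressure oscillation scenarios can be traced.
        None
    } else {
        Some(slowed)
    }
}

fn main() {
    let base = Duration::from_secs(30);
    // 30s * 1.5 = 45s: adjustment applies.
    assert_eq!(apply_backpressure(base), Some(Duration::from_secs(45)));
    // A zero interval is unchanged by 1.5x, so the adjustment is skipped.
    assert_eq!(apply_backpressure(Duration::ZERO), None);
}
```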

Verified Not a Bug:

  • monitor_collector.rs:685 - The {consecutive_failures} syntax is correct: Rust's anyhow! macro supports inline format arguments. Clippy's uninlined_format_args lint in fact enforces this syntax.
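A minimal illustration of inline format-argument capture (stable since Rust 1.58), shown here with plain format! since anyhow! forwards to the same format machinery; `failure_message` is an illustrative name:

```rust
// `{consecutive_failures}` captures the local variable by name directly in
// the format string; clippy's `uninlined_format_args` lint prefers this form
// over `format!("... {} ...", consecutive_failures)`.
fn failure_message(consecutive_failures: u32) -> String {
    format!("health check failed after {consecutive_failures} consecutive failures")
}

fn main() {
    assert_eq!(
        failure_message(3),
        "health check failed after 3 consecutive failures"
    );
}
```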

Design Decisions (deferred or acknowledged):

  • monitor_collector.rs:574, 527, 627 - Backpressure/config hot-reload interaction: These comments identify a valid edge case where config is updated while backpressure is active. This is a design consideration for future enhancement, but current behavior is acceptable since:

    1. Config hot-reload only affects collection interval setting
    2. Backpressure using stale original_interval is conservative (still applies 1.5x slowdown)
    3. The interval will correct itself on the next config reload after backpressure releases
  • monitor_collector.rs:1303 - Test coverage for actor lifecycle: Valid request, will address in a separate PR focused on integration testing

  • main.rs:298 - Shutdown sequencing: The current implementation is sufficient because:

    1. actor_task completion is what we're waiting for in tokio::select!
    2. The actor properly shuts down EventBusConnector before exiting
    3. Backpressure task abort is a cleanup of a monitoring task, not critical path
  • main.rs:149 - WAL directory derivation: The current logic handles edge cases via the ? operator with error context. Using a fixed path like /var/lib/daemoneye would break cross-platform compatibility (Windows, macOS). The current approach uses the database directory parent which is user-controlled.

  • monitor_collector.rs:986 (qlty) - Function complexity: collect_and_analyze_internal complexity is inherent to the collection/analysis workflow. Splitting would add abstraction overhead without clarity benefits.
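The WAL-directory derivation described for main.rs:149 can be sketched as taking the database path's parent; the "wal" subdirectory name and `wal_dir_for` helper here are assumptions for illustration, not the PR's actual code:

```rust
use std::path::{Path, PathBuf};

// Derive the WAL directory from the (user-controlled) database directory
// rather than a fixed platform-specific path like /var/lib/daemoneye, keeping
// the logic portable across Windows, macOS, and Linux. Returns None when the
// database path has no parent, which the caller would surface as an error.
fn wal_dir_for(db_path: &Path) -> Option<PathBuf> {
    db_path.parent().map(|dir| dir.join("wal"))
}

fn main() {
    let db = Path::new("/var/lib/daemoneye/events.db");
    assert_eq!(
        wal_dir_for(db),
        Some(PathBuf::from("/var/lib/daemoneye/wal"))
    );
}
```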

@unclesp1d3r unclesp1d3r merged commit b145f64 into main Feb 2, 2026
19 of 20 checks passed
@unclesp1d3r unclesp1d3r deleted the implement-actor-pattern-and-startup-coordinator branch February 2, 2026 00:14

Labels

  architecture: System architecture and design decisions
  process-monitoring: Process monitoring and enumeration features
  procmond: Issues related to the process monitoring daemon
  size:XXL: This PR changes 1000+ lines, ignoring generated files.
