feat(daemoneye-agent): implement agent loading state, heartbeat detection, and recovery #134
Conversation
…ersions

Add documentation for two Clippy lints encountered during PR review:

- `map_err_ignore`: Name ignored variables in closures (`|_elapsed|`, not `|_|`)
- `as_conversions`: Add `#[allow(clippy::as_conversions)]` with a safety comment

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…ordination

Implement the loading state machine for daemoneye-agent that coordinates collector registration during startup:

- Add `AgentState` enum with state machine transitions: Loading → Ready → SteadyState (plus StartupFailed and ShuttingDown)
- Add `CollectorsConfig` module for `/etc/daemoneye/collectors.json`:
  - JSON-based collector configuration with validation
  - Support for enabled/disabled collectors, startup timeouts, heartbeat intervals
  - Builder pattern for `CollectorEntry` construction
- Add collector readiness tracking via registration:
  - `CollectorReadinessTracker` struct tracks expected vs. ready collectors
  - Registration marks collectors as ready automatically
- Implement startup timeout handling:
  - Configurable timeout from the max of enabled collectors' `startup_timeout_secs`
  - `wait_for_collectors_ready()` with polling and timeout detection
  - Marks the agent as StartupFailed on timeout
- Implement the "begin monitoring" broadcast:
  - Sends a lifecycle event on the `control.collector.lifecycle` topic
  - Called after the transition to the Ready state
- Add a privilege-dropping stub for future implementation
- Integrate the loading state into the `main.rs` startup sequence:
  - Load the collectors config, wait for registration, transition states
  - Broadcast begin monitoring, enter steady-state operation
- Add 10 integration tests for the loading state workflow
- Add 27+ unit tests for the state machine and configuration

Future work: heartbeat failure detection and escalating recovery actions (Tasks #12, #13, and #17 are blocked pending this foundation).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
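The state machine described in this commit could be sketched as follows. The variant names come from the commit message, but the `can_transition_to` helper and its exact transition set are assumptions for illustration, not the PR's actual API:

```rust
// Hypothetical sketch of the AgentState transitions named in the commit:
// Loading -> Ready -> SteadyState, with StartupFailed and ShuttingDown
// as terminal/error states. The helper method is an invented name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentState {
    Loading,
    Ready,
    SteadyState,
    StartupFailed,
    ShuttingDown,
}

impl AgentState {
    /// Returns true if moving from `self` to `next` follows the
    /// Loading -> Ready -> SteadyState chain; shutdown is reachable
    /// from any state, and StartupFailed only from Loading.
    pub fn can_transition_to(self, next: AgentState) -> bool {
        use AgentState::*;
        matches!(
            (self, next),
            (Loading, Ready)
                | (Ready, SteadyState)
                | (Loading, StartupFailed)
                | (_, ShuttingDown)
        )
    }
}

fn main() {
    assert!(AgentState::Loading.can_transition_to(AgentState::Ready));
    assert!(!AgentState::Loading.can_transition_to(AgentState::SteadyState));
    println!("transition checks passed");
}
```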
Caution: Review failed. Failed to post review comments.

Summary by CodeRabbit
Walkthrough

Introduces a collector configuration module, an agent startup state machine with readiness tracking in the broker, heartbeat-based recovery and escalation logic, and extensive unit/integration tests exercising readiness, heartbeat detection, and recovery flows.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Agent as Agent (main)
    participant Config as ConfigLoader
    participant Broker as BrokerManager
    participant Registry as RegistrationProvider
    participant Collector as Collector
    Agent->>Config: load_collectors_config()
    Config-->>Agent: CollectorsConfig
    Agent->>Broker: set_collectors_config(config)
    Agent->>Broker: wait_for_collectors_ready(timeout)
    Broker->>Registry: initialize readiness expectations
    loop poll / registrations
        Collector->>Registry: register_collector()
        Registry->>Broker: notify_registration(id)
        Broker->>Registry: mark_collector_ready(id)
        Registry-->>Broker: readiness update
    end
    Broker-->>Agent: all_collectors_ready -> transition_to_ready()
    Agent->>Broker: transition_to_steady_state()
    Agent->>Broker: broadcast_begin_monitoring()
    Agent->>Broker: drop_privileges()
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Pull request overview
This PR implements an agent loading state machine for coordinating startup between daemoneye-agent and its collectors. The state machine ensures all collectors are ready before transitioning to normal operation.
Changes:

- Implemented the agent loading state machine (Loading → Ready → SteadyState) with error states in `broker_manager.rs`
- Added a collectors configuration module in `collector_config.rs` for loading and validating `/etc/daemoneye/collectors.json`
- Integrated loading state coordination into the `main.rs` startup sequence with timeout handling
- Added 10 integration tests and 51 unit tests for comprehensive coverage
- Updated the spec to mark Ticket 3 as complete
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `spec/procmond/index.md` | Marked Ticket 3 (RPC Service and Registration Manager) as complete |
| `daemoneye-agent/tests/loading_state_integration.rs` | New integration tests for the loading state workflow, including timeout, registration, and the full lifecycle |
| `daemoneye-agent/src/main.rs` | Integrated loading state coordination with configuration loading, collector waiting, a privilege-dropping stub, and state transitions |
| `daemoneye-agent/src/lib.rs` | Exported new public API items for collector configuration |
| `daemoneye-agent/src/collector_config.rs` | New module for loading, validating, and managing collector configurations from JSON |
| `daemoneye-agent/src/broker_manager.rs` | Implemented the `AgentState` enum, `CollectorReadinessTracker`, state transition methods, and integration with registration |
| `AGENTS.md` | Added coding guidelines for ignored variables in closures and `as_conversions` |
```rust
// Broadcast "begin monitoring" to all collectors
if let Err(e) = broker_manager.broadcast_begin_monitoring().await {
    error!(error = %e, "Failed to broadcast begin monitoring");
    // Continue anyway - broadcast failure is not fatal for the main loop
    warn!("Collectors may not have received begin monitoring signal");
}

// Transition to SteadyState
if let Err(e) = broker_manager.transition_to_steady_state().await {
    error!(error = %e, "Failed to transition to SteadyState");
    // This shouldn't happen if we're in Ready state, but log and continue
    warn!("Agent may not be in expected state");
}
```
**Copilot AI** · Feb 2, 2026
The "begin monitoring" broadcast is being performed twice: once explicitly here (line 204) and again inside transition_to_steady_state() (broker_manager.rs line 951). This duplication means collectors will receive the BeginMonitoring message twice.
Either remove the explicit broadcast call here and let transition_to_steady_state() handle it, or remove the broadcast from inside transition_to_steady_state() and keep it here. The recommended approach is to remove the explicit call here since the transition method's documentation states it broadcasts "begin monitoring" during the transition.
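If both call sites were ever intentionally kept, another way to prevent duplicate BeginMonitoring delivery would be a once-only guard. This is a hypothetical sketch, not code from the PR; `BroadcastGuard` and `should_broadcast` are invented names:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical guard: AtomicBool::swap returns the previous value,
// so only the first caller observes `false` and performs the broadcast.
pub struct BroadcastGuard {
    sent: AtomicBool,
}

impl BroadcastGuard {
    pub fn new() -> Self {
        Self { sent: AtomicBool::new(false) }
    }

    /// Returns true exactly once, no matter how many times it is called.
    pub fn should_broadcast(&self) -> bool {
        !self.sent.swap(true, Ordering::SeqCst)
    }
}

fn main() {
    let guard = BroadcastGuard::new();
    assert!(guard.should_broadcast());  // first call: broadcast proceeds
    assert!(!guard.should_broadcast()); // duplicate call: suppressed
    println!("broadcast guard ok");
}
```

The review's recommendation (a single call site) is still the simpler fix; a guard like this only matters if the transition method and the caller must both remain able to trigger the broadcast.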
```rust
// Broadcast "begin monitoring" to all collectors
drop(state); // Release lock before async operations
self.broadcast_begin_monitoring().await?;
```
**Copilot AI** · Feb 2, 2026
State transition and broadcast are not atomic. If `broadcast_begin_monitoring()` fails, the state will already have been changed to SteadyState (line 947), but the error will propagate (line 951 with `?`). This leaves the agent in SteadyState even though the broadcast failed.
Consider one of these approaches:

- Perform the broadcast BEFORE changing state, so failure prevents the transition
- Revert the state on broadcast failure
- Make broadcast failure non-fatal (remove the `?` and log the error), since collectors might still receive the message through other means

The current main.rs treats broadcast failure as non-fatal (lines 204-208), so option 3 aligns with that design.
```diff
- self.broadcast_begin_monitoring().await?;
+ if let Err(error) = self.broadcast_begin_monitoring().await {
+     warn!(%error, "Failed to broadcast begin monitoring; continuing in SteadyState");
+ }
```
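The second option (revert the state on broadcast failure) could look roughly like this. It is a stand-in sketch: a plain field replaces the broker's real locked state, and `Broker`, `State`, and the closure-based broadcast are invented for illustration:

```rust
// Hypothetical revert-on-failure transition: record the previous state,
// apply the new one, and roll back if the follow-up broadcast fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Ready,
    SteadyState,
}

pub struct Broker {
    pub state: State,
}

impl Broker {
    pub fn transition_to_steady_state(
        &mut self,
        broadcast: impl Fn() -> Result<(), String>,
    ) -> Result<(), String> {
        let previous = self.state;
        self.state = State::SteadyState;
        if let Err(e) = broadcast() {
            // Roll back so a failed broadcast never leaves us in SteadyState.
            self.state = previous;
            return Err(e);
        }
        Ok(())
    }
}

fn main() {
    let mut broker = Broker { state: State::Ready };
    assert!(broker.transition_to_steady_state(|| Err("bus down".to_string())).is_err());
    assert_eq!(broker.state, State::Ready); // reverted on failure
    assert!(broker.transition_to_steady_state(|| Ok(())).is_ok());
    assert_eq!(broker.state, State::SteadyState);
    println!("revert-on-failure ok");
}
```

In the real broker this would need care around the lock that is dropped before the async broadcast; the review's option 3 (non-fatal broadcast) avoids that complexity entirely.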
```rust
// 6. Broadcast begin monitoring
manager.broadcast_begin_monitoring().await?;

// 7. Transition to SteadyState
```
**Copilot AI** · Feb 2, 2026
This test broadcasts "begin monitoring" explicitly at line 288, and then calls transition_to_steady_state() at line 291. However, transition_to_steady_state() also broadcasts "begin monitoring" internally (broker_manager.rs line 951). This means the broadcast happens twice in this test.
This test should match the actual usage pattern. Remove line 288 to let transition_to_steady_state() handle the broadcast, or adjust the test to match the corrected implementation once the duplicate broadcast issue in main.rs is resolved.
```diff
- // 6. Broadcast begin monitoring
- manager.broadcast_begin_monitoring().await?;
- // 7. Transition to SteadyState
+ // 6. Transition to SteadyState
```
```rust
let message = serde_json::json!({
    "type": "BeginMonitoring",
    "timestamp": std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis(),
```
**Copilot AI** · Feb 2, 2026
Using `unwrap_or_default()` here means that if the system time is before the Unix epoch (an extremely rare but possible scenario), the timestamp will silently be 0 milliseconds instead of propagating an error. This could cause confusion in logs or collector behavior.
Consider either:

- Using a proper error with context: `.context("System time is before Unix epoch")?`
- Adding a warning log when the fallback is used
- Documenting this behavior if it is intentional

Given that this is for a "BeginMonitoring" message, option 1 (failing fast) is recommended, since it indicates a serious system misconfiguration.
```diff
- let message = serde_json::json!({
-     "type": "BeginMonitoring",
-     "timestamp": std::time::SystemTime::now()
-         .duration_since(std::time::SystemTime::UNIX_EPOCH)
-         .unwrap_or_default()
-         .as_millis(),
+ let timestamp = std::time::SystemTime::now()
+     .duration_since(std::time::SystemTime::UNIX_EPOCH)
+     .context("System time is before Unix epoch when creating BeginMonitoring timestamp")?
+     .as_millis();
+ let message = serde_json::json!({
+     "type": "BeginMonitoring",
+     "timestamp": timestamp,
```
````rust
//! ```
//!
//! - **Loading**: Agent starting, broker initializing, spawning collectors
//! - **Ready**: All collectors registered and reported "ready", privileges dropped
````
**Copilot AI** · Feb 2, 2026
The documentation states that in the Ready state "privileges dropped", but this is inaccurate. Looking at main.rs lines 196-201, privilege dropping occurs AFTER the transition to Ready state. The Ready state only indicates that all collectors have registered.
Update the documentation to clarify: "All collectors registered and reported 'ready'. Waiting for privilege dropping before transitioning to steady state." or similar wording that doesn't imply privileges have already been dropped.
```diff
- //! - **Ready**: All collectors registered and reported "ready", privileges dropped
+ //! - **Ready**: All collectors registered and reported "ready". Awaiting privilege
+ //!   dropping before transitioning to `SteadyState`.
```
```rust
Loading,

/// All collectors have registered and reported "ready".
/// The agent has dropped privileges (if configured).
```
**Copilot AI** · Feb 2, 2026
This documentation states "The agent has dropped privileges (if configured)" in the Ready state, but according to main.rs (lines 196-201), privilege dropping occurs AFTER transitioning to Ready. The Ready state should only indicate that all collectors have registered.
Update this to: "All collectors have registered and reported 'ready'. Privilege dropping will occur before transitioning to steady state." to match the actual implementation.
```diff
- /// The agent has dropped privileges (if configured).
+ /// Privilege dropping will occur before transitioning to steady state.
```
…ecovery

Add comprehensive heartbeat monitoring and recovery infrastructure:

- `HeartbeatStatus` enum (Healthy, Degraded, Failed) with `needs_recovery()`
- Missed-heartbeat tracking in `CollectorRegistry` with `check_heartbeats()`
- `RecoveryAction` escalation chain: HealthCheck → GracefulShutdown → ForceKill → Restart
- `CollectorRecoveryState` for tracking recovery attempts per collector
- `execute_recovery()` function for automated escalating recovery
- 21 integration tests covering heartbeat and recovery workflows

Also includes documentation updates for async-in-tracing gotchas, pre-commit handling, and cross-crate trait imports.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
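The escalation chain named in this commit could be modeled as follows. The variant names come from the commit message, while the `escalate` method is an assumed shape for walking the chain, not the PR's actual API:

```rust
// Hypothetical model of the recovery ladder from the commit message:
// HealthCheck -> GracefulShutdown -> ForceKill -> Restart.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryAction {
    HealthCheck,
    GracefulShutdown,
    ForceKill,
    Restart,
}

impl RecoveryAction {
    /// The next rung of the ladder, or None once Restart has been attempted.
    pub fn escalate(self) -> Option<RecoveryAction> {
        match self {
            RecoveryAction::HealthCheck => Some(RecoveryAction::GracefulShutdown),
            RecoveryAction::GracefulShutdown => Some(RecoveryAction::ForceKill),
            RecoveryAction::ForceKill => Some(RecoveryAction::Restart),
            RecoveryAction::Restart => None,
        }
    }
}

fn main() {
    // Walk the full chain: each failed attempt escalates to the next action.
    let mut action = RecoveryAction::HealthCheck;
    while let Some(next) = action.escalate() {
        action = next;
    }
    assert_eq!(action, RecoveryAction::Restart);
    println!("escalation chain terminates at Restart");
}
```

Encoding the chain as a total function on the enum keeps the escalation order in one place, so a per-collector recovery state only needs to store the current action and the attempt count.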
Summary

Loading State Machine

- Collector configuration from `/etc/daemoneye/collectors.json` with JSON schema and validation
- "Begin monitoring" broadcast on the `control.collector.lifecycle` topic

Heartbeat Detection & Recovery
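For illustration, a `/etc/daemoneye/collectors.json` covering the fields mentioned in this PR (enabled/disabled collectors, startup timeouts, heartbeat intervals) might look like the sketch below. The exact field names and layout are assumptions, since the schema itself is not shown here:

```json
{
  "collectors": [
    {
      "id": "procmond",
      "enabled": true,
      "startup_timeout_secs": 30,
      "heartbeat_interval_secs": 10
    },
    {
      "id": "netmond",
      "enabled": false,
      "startup_timeout_secs": 15,
      "heartbeat_interval_secs": 10
    }
  ]
}
```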
Test Plan

- `cargo clippy -- -D warnings` passes with zero warnings

🤖 Generated with Claude Code