feat(procmond): implement RPC service and registration manager #133
Conversation
…dering
Document two commonly encountered Clippy lints:
- uninlined_format_args: use {variable} syntax in format!/anyhow! macros
- unnecessary_negation: prefer == checks first in if-else blocks
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
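As a quick illustration of both conventions (names below are illustrative, not from the codebase):

```rust
fn describe(count: usize, name: &str) -> String {
    // uninlined_format_args: inline the variables into the format string
    // rather than passing them as positional arguments.
    // Before: format!("{} events from {}", count, name)
    format!("{count} events from {name}")
}

fn dispatch(ready: bool) -> &'static str {
    // Prefer the positive condition first instead of leading with a negation.
    // Before: if !ready { "wait" } else { "go" }
    if ready { "go" } else { "wait" }
}

fn main() {
    println!("{}", describe(3, "procmond"));
    println!("{}", dispatch(true));
}
```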
Add two new components for procmond's broker integration:

- RpcServiceHandler: Handles RPC requests (HealthCheck, UpdateConfig, GracefulShutdown) by forwarding to the actor and returning responses. Includes timeout handling, error categorization, and stats tracking.
- RegistrationManager: Manages registration lifecycle with daemoneye-agent including state machine (Unregistered→Registered→Deregistered), heartbeat publishing, and graceful deregistration on shutdown.

Both components are integrated into main.rs actor mode with:

- Shared EventBusConnector via Arc<RwLock<>>
- Heartbeat background task with configurable interval
- Graceful shutdown with deregistration

Includes comprehensive unit tests (26 total) covering:

- Request parsing and actor coordination
- State transitions and error handling
- Heartbeat message construction
- Stats tracking helpers

Note: RPC message publishing uses placeholder implementations until EventBusConnector gains generic message support (currently only supports ProcessEvent publishing).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
Pull request overview
Adds initial scaffolding for procmond control-plane integration by introducing an RPC request handler and a registration/heartbeat manager, and wiring them into actor mode startup/shutdown.
Changes:
- Added RpcServiceHandler to parse/dispatch collector RPC operations and build structured responses.
- Added RegistrationManager to manage a registration state machine and periodic heartbeats.
- Integrated both into procmond actor-mode startup/shutdown flow and updated exports/docs.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| procmond/src/rpc_service.rs | New RPC handler with request dispatch, response building, and stats + unit tests. |
| procmond/src/registration.rs | New registration/heartbeat manager with state tracking + unit tests. |
| procmond/src/main.rs | Actor-mode wiring for registration + heartbeat task + RPC handler placeholder + shutdown deregistration. |
| procmond/src/lib.rs | Exposes new modules (and additional monitor_collector re-exports). |
| AGENTS.md | Documents additional Rust/Clippy conventions. |
```rust
// Check if request has expired
if request.deadline < SystemTime::now() {
    warn!(
        request_id = %request_id,
        "RPC request deadline has passed"
    );
    self.record_request_timeout().await;
    return self.create_timeout_response(&request, start_time);
}

// Dispatch to appropriate handler
let result = match operation {
    CollectorOperation::HealthCheck => self.handle_health_check(&request).await,
    CollectorOperation::UpdateConfig => self.handle_config_update(&request).await,
    CollectorOperation::GracefulShutdown => self.handle_graceful_shutdown(&request).await,
```
Copilot AI · Feb 2, 2026
handle_request only checks whether the deadline has already passed, but it never enforces the remaining deadline (or config.default_timeout) around the actor calls. If the actor hangs or the response channel is dropped, this can block indefinitely and still return RpcStatus::Error instead of Timeout.
Wrap each operation in tokio::time::timeout using the smaller of (deadline - now) and config.default_timeout, and map elapsed timeouts to RpcServiceError::Timeout.
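A self-contained sketch of that shape, using stand-in types for the handler's actor call and error enum (the real signatures in rpc_service.rs may differ):

```rust
use std::time::{Duration, SystemTime};
use tokio::time::timeout;

// Stand-in error type; the real RpcServiceError lives in rpc_service.rs.
#[derive(Debug)]
enum RpcServiceError {
    Timeout,
}

// Stand-in for an actor call such as handle_health_check().
async fn call_actor() -> Result<String, RpcServiceError> {
    Ok("healthy".to_owned())
}

async fn handle_with_deadline(
    deadline: SystemTime,
    default_timeout: Duration,
) -> Result<String, RpcServiceError> {
    // Budget = min(remaining deadline, configured default timeout).
    let remaining = deadline
        .duration_since(SystemTime::now())
        .unwrap_or(Duration::ZERO);
    let budget = remaining.min(default_timeout);

    // An elapsed timer maps to the service's Timeout error.
    timeout(budget, call_actor())
        .await
        .map_err(|_| RpcServiceError::Timeout)?
}

#[tokio::main]
async fn main() {
    let deadline = SystemTime::now() + Duration::from_secs(1);
    let res = handle_with_deadline(deadline, Duration::from_secs(5)).await;
    println!("{res:?}");
}
```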
| "max_events_in_flight" => { | ||
| if let Some(max) = value.as_u64() { | ||
| config.base_config.max_events_in_flight = | ||
| usize::try_from(max).unwrap_or(usize::MAX); | ||
| } |
Copilot AI · Feb 2, 2026
The u64 -> usize conversion falls back to usize::MAX on overflow (unwrap_or(usize::MAX)). This can turn an invalid/unrepresentable input into an extremely large setting, which is usually worse than rejecting the request.
Return an InvalidRequest error when the value can't be represented as usize, and consider validating reasonable bounds for max_events_in_flight.
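A self-contained sketch of the rejection path; RpcServiceError::InvalidRequest is taken from the review context, and the 100,000 bound mirrors the limit adopted in a later fix commit:

```rust
// Reject unrepresentable or oversized values instead of saturating to usize::MAX.
#[derive(Debug)]
enum RpcServiceError {
    InvalidRequest(String),
}

const MAX_EVENTS_IN_FLIGHT_LIMIT: usize = 100_000;

fn parse_max_events_in_flight(max: u64) -> Result<usize, RpcServiceError> {
    let value = usize::try_from(max).map_err(|_| {
        RpcServiceError::InvalidRequest(format!(
            "max_events_in_flight value {max} cannot be represented on this platform"
        ))
    })?;
    if value > MAX_EVENTS_IN_FLIGHT_LIMIT {
        return Err(RpcServiceError::InvalidRequest(format!(
            "max_events_in_flight value {value} exceeds limit {MAX_EVENTS_IN_FLIGHT_LIMIT}"
        )));
    }
    Ok(value)
}

fn main() {
    assert!(parse_max_events_in_flight(u64::MAX).is_err());
    assert_eq!(parse_max_events_in_flight(1024).unwrap(), 1024);
}
```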
| "max_processes" => { | ||
| if let Some(max) = value.as_u64() { | ||
| config.process_config.max_processes = | ||
| usize::try_from(max).unwrap_or(usize::MAX); | ||
| } |
Copilot AI · Feb 2, 2026
Same issue as max_events_in_flight: overflowing max_processes falls back to usize::MAX, which effectively turns a bad input into “unlimited”.
Return an InvalidRequest error (or clamp to a documented maximum) when the provided value can’t be represented safely.
procmond/src/registration.rs (Outdated)
```rust
// Infer connection status from buffer usage:
// If buffer is not full, we're likely connected (or recently were)
// This is a heuristic until EventBusConnector exposes connection state
let connection_status = if buffer_level_percent < 100.0 {
    ConnectionStatus::Connected
} else {
    ConnectionStatus::Disconnected
};
```
Copilot AI · Feb 2, 2026
Connection status is inferred from buffer_level_percent < 100.0, but EventBusConnector already exposes is_connected(). Buffer usage reaching 100% doesn’t necessarily imply disconnection, and being <100% doesn’t guarantee connectivity.
Use self.event_bus.read().await.is_connected() (and optionally include buffer usage as a separate metric) so heartbeat connection status matches the connector’s actual view.
```rust
// Keep RPC service reference alive (it will be used for handling incoming requests)
// In a full implementation, we would spawn a task to process RPC requests from the event bus
let _rpc_service = rpc_service;
```
Copilot AI · Feb 2, 2026
rpc_service is only kept alive via let _rpc_service = rpc_service; and no task is spawned to subscribe/read RPC requests from the event bus, so the RPC service handler is effectively inactive in production.
If RPC handling isn’t wired up yet due to EventBusConnector limitations, consider gating this behind a feature/config flag or deferring initialization until request subscription/publishing is actually implemented, to avoid giving the impression that RPC is supported.
Signed-off-by: UncleSp1d3r <unclesp1d3r@evilbitlabs.io>
- Fix incorrect uptime calculation: Track service start_time in RpcServiceHandler instead of using last_collection.elapsed()
- Fix potential WAL directory conflict: Use separate subdirectory (wal/collector) for collector's EventBusConnector
- Fix TOCTOU race condition: Use atomic check-and-set in register() via new try_transition_to_registering() helper method

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 8 out of 9 changed files in this pull request and generated 11 comments.
```rust
// Check if request has expired
if request.deadline < SystemTime::now() {
    warn!(
        request_id = %request_id,
        "RPC request deadline has passed"
    );
    self.record_request_timeout().await;
    return self.create_timeout_response(&request, start_time);
}
```
Copilot AI · Feb 2, 2026
handle_request only checks request.deadline once up-front, but the awaited actor operations (health_check, update_config, graceful_shutdown) have no timeout and can block indefinitely if the actor stalls. To make deadline/timeout handling real, wrap each awaited operation in tokio::time::timeout using min(request.deadline, now + config.default_timeout) and return a Timeout/RpcStatus::Timeout response when exceeded.
```rust
//! This module provides the `RpcServiceHandler` component that handles RPC requests
//! from the daemoneye-agent via the event bus. It subscribes to the control topic
//! `control.collector.procmond`, parses incoming RPC requests, and coordinates with
//! the `ProcmondMonitorCollector` actor to execute operations.
```
Copilot AI · Feb 2, 2026
The module-level docs say RpcServiceHandler “subscribes to the control topic” and “publishes responses”, but the implementation currently has no subscription logic and publish_response is explicitly a placeholder (it only serializes/logs). Please adjust the rustdoc to reflect the current responsibilities/limitations so callers don’t assume event-bus integration exists yet.
```rust
let config_request = match request.payload {
    RpcPayload::ConfigUpdate(ref req) => req,
```
Copilot AI · Feb 2, 2026
handle_config_update matches on request.payload, but request is a &RpcRequest and RpcPayload is not Copy. Accessing the field this way attempts to move the payload out of a shared reference and won’t compile. Match on &request.payload (and adjust patterns) instead.
```diff
-let config_request = match request.payload {
-    RpcPayload::ConfigUpdate(ref req) => req,
+let config_request = match &request.payload {
+    RpcPayload::ConfigUpdate(req) => req,
```
```rust
let (code, category) = match *error {
    RpcServiceError::SubscriptionFailed(_) => {
        ("SUBSCRIPTION_FAILED", ErrorCategory::Communication)
    }
    RpcServiceError::PublishFailed(_) => ("PUBLISH_FAILED", ErrorCategory::Communication),
```
Copilot AI · Feb 2, 2026
create_error_response matches on *error even though error is a &RpcServiceError and the enum contains owned Strings. This attempts to move out of a shared reference and should not compile. Match on the reference instead (e.g., match error { RpcServiceError::SubscriptionFailed(_) => ... }) or use match error with match ergonomics.
```rust
let reason = match request.payload {
    RpcPayload::Shutdown(ref shutdown_req) => shutdown_req.reason.clone(),
    RpcPayload::Lifecycle(_)
    | RpcPayload::Registration(_)
    | RpcPayload::RegistrationResponse(_)
    | RpcPayload::Deregistration(_)
    | RpcPayload::HealthCheck(_)
    | RpcPayload::ConfigUpdate(_)
```
Copilot AI · Feb 2, 2026
handle_graceful_shutdown matches on request.payload even though request is a &RpcRequest and RpcPayload is not Copy. This tries to move the payload out of a borrowed request and should not compile. Match on &request.payload and clone only the needed fields.
This commit addresses all 11 Copilot review comments from PR #133:

## EventBusConnector Initialization (main.rs)

- Add connect() and replay_wal() calls for collector_event_bus before use
- Previously the collector's EventBusConnector was never connected, meaning events would only buffer and never publish to the broker
- Move backpressure receiver to collector_event_bus (not shared event_bus) to ensure the backpressure monitor receives signals from the correct connector instance

## Actor Operation Timeouts (rpc_service.rs, registration.rs)

- Add tokio::time::timeout wrappers to handle_health_check(), handle_config_update(), handle_graceful_shutdown(), and publish_heartbeat()
- Without timeouts, a stalled actor would block RPC handlers indefinitely
- Extract calculate_timeout() helper for deadline-aware timeout computation
- Add #[allow(clippy::as_conversions)] with safety comment for u128->u64

## Documentation Fixes

- Update rpc_service.rs module docs to clarify current limitations: no subscription support, response publishing only logs/serializes
- Fix misleading heartbeat log: "Heartbeat published" -> "Heartbeat prepared (event bus publish pending connector extension)"

## Test Isolation

- Replace hardcoded "/tmp/test-wal" with tempfile::tempdir() in test helpers
- Each test now gets its own isolated WAL directory, preventing potential cross-test interference when running in parallel

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
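The calculate_timeout() helper mentioned above might look roughly like this; one plausible shape only, the real implementation in rpc_service.rs may differ in details:

```rust
use std::time::{Duration, SystemTime};

// Deadline-aware timeout: never wait past the request deadline, and never
// wait longer than the configured default timeout.
fn calculate_timeout(deadline: SystemTime, default_timeout: Duration) -> Duration {
    // Remaining time until the request deadline; zero if it already passed.
    let remaining = deadline
        .duration_since(SystemTime::now())
        .unwrap_or(Duration::ZERO);
    remaining.min(default_timeout)
}

fn main() {
    let deadline = SystemTime::now() + Duration::from_secs(2);
    let timeout = calculate_timeout(deadline, Duration::from_secs(5));
    assert!(timeout <= Duration::from_secs(2));
    println!("effective timeout: {timeout:?}");
}
```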
Add comprehensive integration tests that verify actor mode components work together correctly. These tests address coverage gaps identified during Copilot review:

## EventBusConnector Initialization (3 tests)

- test_connector_not_connected_by_default: Verifies connector starts disconnected
- test_events_buffered_when_not_connected: Verifies events buffer when not connected (catches missing connect() calls)
- test_multiple_connectors_isolation: Verifies separate WAL directories don't interfere

## Backpressure Receiver (2 tests)

- test_backpressure_receiver_per_connector: Verifies each connector has its own receiver (catches wrong receiver bug)
- test_backpressure_signals_to_correct_receiver: Verifies signals route to correct receiver

## Actor Timeout Behavior (3 tests)

- test_health_check_timeout_with_unresponsive_actor: Verifies timeout when actor doesn't respond (50ms timeout for fast tests)
- test_health_check_succeeds_with_responsive_actor: Verifies success with responsive actor
- test_deadline_exceeded_returns_immediately: Verifies expired deadlines return immediately without waiting

## Component Integration (3 tests)

- test_registration_manager_initial_state: Verifies initial state
- test_heartbeat_skipped_when_not_registered: Verifies heartbeat logic
- test_shared_event_bus_between_components: Verifies RegistrationManager and RpcServiceHandler share EventBusConnector correctly

## Parallel Execution (8 tests)

- Parallel tests for connectors, registration managers, and RPC handlers
- All use tempfile::tempdir() for isolation
- Run with --test-threads=8 to verify no interference

## Full Setup Test (1 test)

- test_full_actor_mode_component_initialization: Mimics main.rs setup with two connectors, verifying backpressure receiver comes from collector's connector (not shared)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 9 out of 10 changed files in this pull request and generated 9 comments.
```rust
// Perform registration with daemoneye-agent
info!("Registering with daemoneye-agent");
match registration_manager.register().await {
    Ok(response) => {
        info!(
            collector_id = %response.collector_id,
            heartbeat_interval_ms = response.heartbeat_interval_ms,
            assigned_topics = ?response.assigned_topics,
            "Registration successful"
        );
    }
    Err(e) => {
        // Log warning but continue - procmond can operate without registration
        // in standalone/development scenarios
        warn!(
            error = %e,
            "Registration failed, continuing in standalone mode"
        );
    }
}

// Start heartbeat task (only publishes when registered)
let heartbeat_task =
    RegistrationManager::spawn_heartbeat_task(Arc::clone(&registration_manager));
info!("Heartbeat task started");
```
Copilot AI · Feb 2, 2026
RegistrationManager::register() and the heartbeat task are invoked in actor mode, but the current RegistrationManager implementation explicitly simulates successful registration and does not publish registration/heartbeat messages to the broker yet. As written, procmond will log “Registration successful” and start the heartbeat loop even when the broker connection failed, which is misleading operationally. Consider gating registration/heartbeat startup on real broker connectivity and/or a feature flag until generic publish/subscribe is implemented in EventBusConnector.
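A self-contained toy of the gating idea, with stand-in types (the real check would use EventBusConnector::is_connected(), and register()/spawn_heartbeat_task() would run in the connected branch):

```rust
// Only kick off registration and the heartbeat loop when the connector
// actually reports a live broker connection.
struct Connector {
    connected: bool,
}

impl Connector {
    fn is_connected(&self) -> bool {
        self.connected
    }
}

fn start_control_plane(connector: &Connector) -> &'static str {
    if connector.is_connected() {
        // registration_manager.register() + spawn_heartbeat_task() would go here
        "registration and heartbeat started"
    } else {
        "event bus not connected; staying in standalone mode"
    }
}

fn main() {
    println!("{}", start_control_plane(&Connector { connected: false }));
}
```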
```rust
Err(e) => {
    self.record_request_failure().await;
    error!(
        request_id = %request_id,
        error = %e,
        "RPC request failed"
    );
    self.create_error_response(&request, &e, start_time)
}
```
Copilot AI · Feb 2, 2026
RpcServiceError::Timeout is currently turned into RpcStatus::Error via create_error_response(), and only the expired-deadline path returns RpcStatus::Timeout. This makes runtime timeouts indistinguishable from other errors and also skips requests_timed_out accounting. Consider handling RpcServiceError::Timeout separately (set RpcStatus::Timeout and increment the timeout counter) instead of routing it through the generic error response path.
```diff
-Err(e) => {
-    self.record_request_failure().await;
-    error!(
-        request_id = %request_id,
-        error = %e,
-        "RPC request failed"
-    );
-    self.create_error_response(&request, &e, start_time)
-}
+Err(e) => match e {
+    RpcServiceError::Timeout => {
+        self.record_request_timeout().await;
+        warn!(
+            request_id = %request_id,
+            "RPC request timed out during processing"
+        );
+        self.create_timeout_response(&request, start_time)
+    }
+    other => {
+        self.record_request_failure().await;
+        error!(
+            request_id = %request_id,
+            error = %other,
+            "RPC request failed"
+        );
+        self.create_error_response(&request, &other, start_time)
+    }
+},
```
```rust
/// Whether the service is running.
running: Arc<std::sync::atomic::AtomicBool>,
/// Statistics tracking.
stats: Arc<RwLock<RpcServiceStats>>,
/// Service start time for uptime tracking.
start_time: std::time::Instant,
}
```
Copilot AI · Feb 2, 2026
RpcServiceHandler::is_running() will always return false because running is never set to true anywhere in this module (only set to false on shutdown). Either set it to true during initialization/startup (and expose an explicit start() method) or remove the flag to avoid misleading callers.
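A minimal, self-contained sketch of the suggested fix (stand-in struct; only the flag handling is shown):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

struct RpcServiceHandler {
    running: Arc<AtomicBool>,
}

impl RpcServiceHandler {
    // Flip the flag in an explicit start() so is_running() reflects reality.
    fn start(&self) {
        self.running.store(true, Ordering::SeqCst);
    }
    fn shutdown(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
    fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }
}

fn main() {
    let svc = RpcServiceHandler { running: Arc::new(AtomicBool::new(false)) };
    svc.start();
    assert!(svc.is_running());
    svc.shutdown();
    assert!(!svc.is_running());
}
```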
```rust
/// Configuration for the RPC service handler.
#[derive(Debug, Clone)]
pub struct RpcServiceConfig {
    /// Collector identifier.
    pub collector_id: String,
    /// Control topic to subscribe to.
    pub control_topic: String,
    /// Response topic prefix.
    pub response_topic_prefix: String,
    /// Default operation timeout.
    pub default_timeout: Duration,
    /// Maximum concurrent RPC requests.
    pub max_concurrent_requests: usize,
}
```
Copilot AI · Feb 2, 2026
RpcServiceConfig::max_concurrent_requests is never enforced (only set in defaults/tests). Since this field is part of the public config surface, it currently suggests concurrency limiting that doesn’t happen. Consider enforcing it with a semaphore in handle_request() (or removing the field until it’s implemented).
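A minimal sketch of a semaphore-based limit, assuming tokio's Semaphore and an illustrative handler shape:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Hold an Arc<Semaphore> in the handler and acquire one permit per request
// before dispatching.
async fn handle_with_limit(limiter: Arc<Semaphore>) -> Result<(), &'static str> {
    // In the real handler, a closed semaphore would map to an RpcServiceError.
    let _permit = limiter.acquire().await.map_err(|_| "semaphore closed")?;
    // ... dispatch the RPC request while the permit is held ...
    Ok(())
}

#[tokio::main]
async fn main() {
    let limiter = Arc::new(Semaphore::new(8)); // e.g. config.max_concurrent_requests
    handle_with_limit(Arc::clone(&limiter)).await.unwrap();
}
```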
procmond/src/rpc_service.rs (Outdated)
```rust
// Start with default config (in practice, we'd get current config from actor)
let mut config = ProcmondMonitorConfig::default();

// Apply changes from request
for (key, value) in &config_request.config_changes {
    match key.as_str() {
        "collection_interval_secs" => {
            if let Some(secs) = value.as_u64() {
                config.base_config.collection_interval = Duration::from_secs(secs);
            }
        }
        "max_events_in_flight" => {
            if let Some(max) = value.as_u64() {
                config.base_config.max_events_in_flight =
                    usize::try_from(max).unwrap_or(usize::MAX);
            }
        }
```
Copilot AI · Feb 2, 2026
build_config_from_changes() starts from ProcmondMonitorConfig::default() and applies only the provided keys. For a "config_changes" patch request, this will reset any non-mentioned settings (e.g., CLI-provided intervals/flags) back to defaults. Consider applying changes on top of the current actor config (e.g., add an ActorHandle::get_config()/snapshot API or store the last applied config in the handler).
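A toy illustration of patch-over-current semantics with simplified stand-in types (ProcmondMonitorConfig and the proposed get_config()/snapshot API are not modeled literally):

```rust
use std::collections::HashMap;

// Unspecified keys keep their existing values instead of resetting to defaults.
#[derive(Clone, Debug)]
struct MonitorConfig {
    collection_interval_secs: u64,
    max_processes: usize,
}

fn apply_changes(current: &MonitorConfig, changes: &HashMap<&str, u64>) -> MonitorConfig {
    let mut updated = current.clone(); // start from the live config, not Default
    if let Some(secs) = changes.get("collection_interval_secs") {
        updated.collection_interval_secs = *secs;
    }
    if let Some(max) = changes.get("max_processes") {
        updated.max_processes = usize::try_from(*max).unwrap_or(current.max_processes);
    }
    updated
}

fn main() {
    let current = MonitorConfig { collection_interval_secs: 30, max_processes: 4096 };
    let mut changes = HashMap::new();
    changes.insert("collection_interval_secs", 10_u64);
    let updated = apply_changes(&current, &changes);
    // max_processes is untouched because it was not in the patch.
    assert_eq!(updated.max_processes, 4096);
    assert_eq!(updated.collection_interval_secs, 10);
    println!("{updated:?}");
}
```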
```rust
pub const HEARTBEAT_TOPIC_PREFIX: &str = "control.health.heartbeat";

/// Registration topic.
pub const REGISTRATION_TOPIC: &str = "control.collector.lifecycle";
```
Copilot AI · Feb 2, 2026
REGISTRATION_TOPIC is set to control.collector.lifecycle, but DaemonEye’s topic constants distinguish lifecycle vs registration (see daemoneye-eventbus/src/topics.rs, collector::REGISTRATION = "control.collector.registration"). Registration/deregistration messages should use the registration topic, not the lifecycle topic.
```diff
-pub const REGISTRATION_TOPIC: &str = "control.collector.lifecycle";
+pub const REGISTRATION_TOPIC: &str = "control.collector.registration";
```
```rust
/// Publishes an RPC response to the event bus.
///
/// The response is published to the topic derived from the correlation metadata.
///
/// # Note
///
/// This method is currently a placeholder. Full integration with the EventBusConnector
/// for RPC response publishing requires additional API support (raw topic publishing).
/// For now, the response should be handled by the caller.
#[allow(clippy::unused_async)] // Will be async when EventBusConnector supports RPC
pub async fn publish_response(&self, response: RpcResponse) -> RpcServiceResult<()> {
    let response_topic = format!(
        "{}.{}",
        self.config.response_topic_prefix, response.correlation_metadata.correlation_id
    );
```
Copilot AI · Feb 2, 2026
publish_response() derives the response topic from correlation_metadata.correlation_id, but the broker RPC client subscribes to control.rpc.response.{client_id} (see daemoneye-eventbus/src/rpc.rs, CollectorRpcClient::new). With the current logic, responses will be published to a topic clients are not listening to. Consider deriving the response topic from request.client_id (or passing the response topic / client_id into this method) and using correlation_id only as the broker correlation key.
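A small sketch of the suggested derivation, publishing to the per-client response topic and keeping correlation_id only as the correlation key (the client id below is illustrative):

```rust
fn response_topic(client_id: &str) -> String {
    format!("control.rpc.response.{client_id}")
}

fn main() {
    assert_eq!(
        response_topic("procmond-01"),
        "control.rpc.response.procmond-01"
    );
}
```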
```rust
// Store assigned heartbeat interval
let assigned_interval =
    Duration::from_millis(response.heartbeat_interval_ms);
```
Copilot AI · Feb 2, 2026
The assigned heartbeat interval from the registration response is used directly to create a tokio::time::interval later. If the agent ever returns heartbeat_interval_ms = 0, tokio::time::interval(Duration::ZERO) will panic. Consider validating the assigned interval (minimum 1ms, or fall back to the configured heartbeat interval) before storing it.
```diff
-// Store assigned heartbeat interval
-let assigned_interval =
-    Duration::from_millis(response.heartbeat_interval_ms);
+// Store assigned heartbeat interval, ensuring it is never zero
+let raw_heartbeat_ms = response.heartbeat_interval_ms;
+let sanitized_heartbeat_ms = core::cmp::max(raw_heartbeat_ms, 1);
+if sanitized_heartbeat_ms != raw_heartbeat_ms {
+    warn!(
+        collector_id = %self.config.collector_id,
+        received_heartbeat_interval_ms = raw_heartbeat_ms,
+        sanitized_heartbeat_interval_ms = sanitized_heartbeat_ms,
+        "Received invalid heartbeat interval from agent; clamped to minimum of 1ms"
+    );
+}
+let assigned_interval =
+    Duration::from_millis(sanitized_heartbeat_ms as u64);
```
procmond/src/registration.rs (Outdated)
```rust
// Get buffer usage percentage from connector (0-100)
// Drop the lock immediately after reading
let buffer_level_percent = f64::from(self.event_bus.read().await.buffer_usage_percent());

// Infer connection status from buffer usage:
// If buffer is not full, we're likely connected (or recently were)
// This is a heuristic until EventBusConnector exposes connection state
let connection_status = if buffer_level_percent < 100.0 {
    ConnectionStatus::Connected
} else {
    ConnectionStatus::Disconnected
```
Copilot AI · Feb 2, 2026
build_heartbeat_message() infers connection_status from buffer_level_percent < 100.0, but EventBusConnector already exposes is_connected(). The current heuristic will report Connected even when disconnected but idle (empty buffer), which makes heartbeat diagnostics inaccurate. Prefer using event_bus.read().await.is_connected() (and possibly a separate "buffering" state when disconnected).
```diff
-// Get buffer usage percentage from connector (0-100)
-// Drop the lock immediately after reading
-let buffer_level_percent = f64::from(self.event_bus.read().await.buffer_usage_percent());
-// Infer connection status from buffer usage:
-// If buffer is not full, we're likely connected (or recently were)
-// This is a heuristic until EventBusConnector exposes connection state
-let connection_status = if buffer_level_percent < 100.0 {
-    ConnectionStatus::Connected
-} else {
-    ConnectionStatus::Disconnected
+// Get buffer usage percentage (0-100) and connection state from connector.
+// Keep the read lock scope as small as possible.
+let (buffer_level_percent, connection_status) = {
+    let connector = self.event_bus.read().await;
+    let buffer_level_percent =
+        f64::from(connector.buffer_usage_percent());
+    let connection_status = if connector.is_connected() {
+        ConnectionStatus::Connected
+    } else {
+        ConnectionStatus::Disconnected
+    };
+    (buffer_level_percent, connection_status)
```
## registration.rs: Use is_connected() for connection status

- Replace buffer-level heuristic with EventBusConnector::is_connected()
- Buffer being <100% full doesn't guarantee connectivity
- is_connected() reflects actual connection state to broker

## rpc_service.rs: Validate config values, reject overflow

- build_config_from_changes now returns Result<_, RpcServiceError>
- Reject max_events_in_flight > 100,000 with clear error message
- Reject max_processes > 1,000,000 with clear error message
- Return InvalidRequest error on u64->usize overflow instead of silently using usize::MAX (which could cause resource exhaustion)
- Add tests for overflow rejection behavior
- Document limitation: partial updates reset unspecified fields to defaults (TODO: fetch current config from actor)

## Already addressed in previous commits or code

- Race condition in register(): try_transition_to_registering() already does atomic check-and-set with write lock
- uptime_seconds: Already correctly uses self.start_time.elapsed() (service start time), not last_collection.elapsed()

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary
- RpcServiceHandler for handling RPC requests (HealthCheck, UpdateConfig, GracefulShutdown) with timeout handling, error categorization, and stats tracking
- RegistrationManager for registration lifecycle with daemoneye-agent including state machine, heartbeat publishing, and graceful deregistration

Test Plan
- cargo clippy -p procmond -- -D warnings passes with no warnings

Notes
RPC message publishing uses placeholder implementations until EventBusConnector gains generic message support (currently only supports ProcessEvent publishing). This is tracked for a future enhancement.
🤖 Generated with Claude Code