-
-
Notifications
You must be signed in to change notification settings - Fork 0
feat(procmond): implement RPC service and registration manager #133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4486c4f
19d7ddf
58900cd
e070ee4
f26b02f
0654dc0
83194b7
cdff94d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,3 +28,4 @@ lychee = "0.22.0" | |
| markdownlint-cli2 = "0.20.0" | ||
| protobuf = "33.4" | ||
| pre-commit = "4.5.1" | ||
| protoc = "33.4" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,13 +7,15 @@ use procmond::{ | |
| ProcessEventSource, ProcessSourceConfig, | ||
| event_bus_connector::EventBusConnector, | ||
| monitor_collector::{ProcmondMonitorCollector, ProcmondMonitorConfig}, | ||
| registration::{RegistrationConfig, RegistrationManager, RegistrationState}, | ||
| rpc_service::{RpcServiceConfig, RpcServiceHandler}, | ||
| }; | ||
| use std::collections::HashMap; | ||
| use std::path::PathBuf; | ||
| use std::sync::Arc; | ||
| use std::time::Duration; | ||
| use tokio::sync::{Mutex, mpsc}; | ||
| use tracing::{error, info, warn}; | ||
| use tokio::sync::{Mutex, RwLock, mpsc}; | ||
| use tracing::{debug, error, info, warn}; | ||
|
|
||
| /// Parse and validate the collection interval argument. | ||
| /// | ||
|
|
@@ -158,7 +160,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
| e | ||
| })?; | ||
|
|
||
| let mut event_bus_connector = EventBusConnector::new(wal_dir).await?; | ||
| let mut event_bus_connector = EventBusConnector::new(wal_dir.clone()).await?; | ||
|
|
||
| // Attempt to connect to the broker | ||
| match event_bus_connector.connect().await { | ||
|
|
@@ -189,15 +191,105 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
| } | ||
| } | ||
|
|
||
| // Take the backpressure receiver before moving connector to collector | ||
| let backpressure_rx = event_bus_connector.take_backpressure_receiver(); | ||
| // Wrap EventBusConnector in Arc<RwLock<>> for sharing between components | ||
| // Note: backpressure receiver is taken from collector_event_bus below, not here | ||
| let event_bus = Arc::new(RwLock::new(event_bus_connector)); | ||
|
|
||
| // Set the EventBusConnector on the collector | ||
| collector.set_event_bus_connector(event_bus_connector); | ||
| // ======================================================================== | ||
| // Initialize Registration Manager | ||
| // ======================================================================== | ||
| let registration_config = RegistrationConfig::default(); | ||
| let registration_manager = Arc::new(RegistrationManager::new( | ||
| Arc::clone(&event_bus), | ||
| actor_handle.clone(), | ||
| registration_config, | ||
| )); | ||
|
|
||
| info!( | ||
| collector_id = %registration_manager.collector_id(), | ||
| "Registration manager initialized" | ||
| ); | ||
|
|
||
| // Perform registration with daemoneye-agent | ||
| info!("Registering with daemoneye-agent"); | ||
| match registration_manager.register().await { | ||
| Ok(response) => { | ||
| info!( | ||
| collector_id = %response.collector_id, | ||
| heartbeat_interval_ms = response.heartbeat_interval_ms, | ||
| assigned_topics = ?response.assigned_topics, | ||
| "Registration successful" | ||
| ); | ||
| } | ||
| Err(e) => { | ||
| // Log warning but continue - procmond can operate without registration | ||
| // in standalone/development scenarios | ||
| warn!( | ||
| error = %e, | ||
| "Registration failed, continuing in standalone mode" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // Start heartbeat task (only publishes when registered) | ||
| let heartbeat_task = | ||
| RegistrationManager::spawn_heartbeat_task(Arc::clone(®istration_manager)); | ||
| info!("Heartbeat task started"); | ||
|
|
||
|
Comment on lines
+213
to
+238
|
||
| // ======================================================================== | ||
| // Initialize RPC Service Handler | ||
| // ======================================================================== | ||
| let rpc_config = RpcServiceConfig::default(); | ||
| let rpc_service = | ||
| RpcServiceHandler::new(actor_handle.clone(), Arc::clone(&event_bus), rpc_config); | ||
|
|
||
| info!( | ||
| control_topic = %rpc_service.config().control_topic, | ||
| "RPC service handler initialized" | ||
| ); | ||
|
|
||
| // Create a separate EventBusConnector for the collector with its own WAL directory | ||
| // to avoid conflicts with the shared event_bus connector. | ||
| // Note: The collector takes ownership of its connector, while the registration | ||
| // and RPC services share a separate connector for control messages. | ||
| // TODO: Refactor to share the connector more elegantly when EventBusConnector | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| // supports both ProcessEvent and generic message publishing. | ||
| let collector_wal_dir = wal_dir.join("collector"); | ||
| std::fs::create_dir_all(&collector_wal_dir).map_err(|e| { | ||
| error!( | ||
| wal_dir = ?collector_wal_dir, | ||
| error = %e, | ||
| "Failed to create collector WAL directory" | ||
| ); | ||
| e | ||
| })?; | ||
| let mut collector_event_bus = EventBusConnector::new(collector_wal_dir).await?; | ||
|
|
||
| // Connect collector's EventBusConnector and replay WAL (required for publishing) | ||
| match collector_event_bus.connect().await { | ||
| Ok(()) => { | ||
| info!("Collector EventBusConnector connected"); | ||
| if let Err(e) = collector_event_bus.replay_wal().await { | ||
| warn!(error = %e, "Failed to replay collector WAL"); | ||
| } | ||
| } | ||
| Err(e) => { | ||
| warn!( | ||
| error = %e, | ||
| "Collector EventBusConnector failed to connect, will buffer events" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // Take backpressure receiver from the collector's event bus (not the shared one) | ||
| // so the backpressure monitor listens to the correct connector | ||
| let collector_backpressure_rx = collector_event_bus.take_backpressure_receiver(); | ||
|
|
||
| collector.set_event_bus_connector(collector_event_bus); | ||
|
|
||
| // Spawn backpressure monitor task if we have the receiver | ||
| let original_interval = Duration::from_secs(cli.interval); | ||
| let backpressure_task = backpressure_rx.map_or_else( | ||
| let backpressure_task = collector_backpressure_rx.map_or_else( | ||
| || { | ||
| warn!("Backpressure receiver not available, dynamic interval adjustment disabled"); | ||
| None | ||
|
|
@@ -214,8 +306,9 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
| // Create event channel for the actor's output | ||
| let (event_tx, mut event_rx) = mpsc::channel::<CollectionEvent>(1000); | ||
|
|
||
| // Clone handle for shutdown task | ||
| // Clone handles for shutdown task | ||
| let shutdown_handle = actor_handle.clone(); | ||
| let shutdown_registration = Arc::clone(®istration_manager); | ||
|
|
||
| // Spawn task to handle graceful shutdown on Ctrl+C | ||
| let shutdown_task = tokio::spawn(async move { | ||
|
|
@@ -226,13 +319,28 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
| } | ||
| info!("Received Ctrl+C, initiating graceful shutdown"); | ||
|
|
||
| // Deregister from agent | ||
| if shutdown_registration.state().await == RegistrationState::Registered { | ||
| debug!("Deregistering from daemoneye-agent"); | ||
| if let Err(e) = shutdown_registration | ||
| .deregister(Some("Graceful shutdown".to_owned())) | ||
| .await | ||
| { | ||
| warn!(error = %e, "Deregistration failed"); | ||
| } | ||
| } | ||
|
|
||
| // Send graceful shutdown to actor | ||
| match shutdown_handle.graceful_shutdown().await { | ||
| Ok(()) => info!("Actor shutdown completed successfully"), | ||
| Err(e) => error!(error = %e, "Actor shutdown failed"), | ||
| } | ||
| }); | ||
|
|
||
| // Keep RPC service reference alive (it will be used for handling incoming requests) | ||
| // In a full implementation, we would spawn a task to process RPC requests from the event bus | ||
| let _rpc_service = rpc_service; | ||
|
Comment on lines
+340
to
+342
|
||
|
|
||
| // Startup behavior: begin monitoring immediately on launch. | ||
| // | ||
| // The collector currently does not wait for an explicit "begin monitoring" | ||
|
|
@@ -298,6 +406,10 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
| info!("Backpressure monitor task aborted"); | ||
| } | ||
|
|
||
| // Clean up heartbeat task | ||
| heartbeat_task.abort(); | ||
| info!("Heartbeat task aborted"); | ||
|
|
||
| // Wait for event consumer to exit naturally (channel sender is dropped) | ||
| // Use a timeout to avoid hanging indefinitely | ||
| match tokio::time::timeout(Duration::from_secs(5), event_consumer_task).await { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lint reference here looks incorrect: the Clippy lint that suggests flipping
`if !cond { ... } else { ... }` into a positive condition is `clippy::if_not_else` (and related lints), not `clippy::unnecessary_negation`. Consider updating this bullet to reference the correct lint (or rewording it more generally as “prefer positive conditions / avoid
`if !cond` when possible”).