Implement Result Aggregation, Correlation, and Load Balancing for Multi-Collector Coordination #117

@unclesp1d3r

Description

Add result aggregation and load balancing for collector processes

Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #115 (Coordination Workflows), #116 (Task Distribution)
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.4

Design Goals (from Parent #114)

Scope: Single-system, local multi-process coordination

  1. Performance: <1ms p99 latency for result aggregation from local processes
  2. Cross-Platform: Windows, Linux, macOS, FreeBSD
  3. Security: Local IPC only (no network exposure)
  4. Code Safety: Zero unsafe code in aggregation implementation
  5. Dependencies: Only use interprocess, tokio, prost, dashmap

Critical Architectural Context

Cross-process result aggregation:

  • daemoneye-agent (separate process): hosts broker, aggregates results from local collectors
  • Collector processes (procmond-1, procmond-2, netmond-1, etc.): separate OS processes on same machine
  • All communication via local cross-process IPC using interprocess crate
  • Result aggregation and load balancing happen across process boundaries

Overview

This task implements result aggregation and advanced load balancing capabilities within daemoneye-eventbus, enabling the agent to efficiently collect and correlate results from multiple collector processes on the local system.

Architecture

Result Aggregation Flow (Cross-Process)

1. Collector Processes Execute Tasks
   ├─▶ procmond-1: Process enumeration
   ├─▶ procmond-2: Process integrity checks
   └─▶ netmond-1: Network connections

2. Results Published via Local IPC
   ├─▶ procmond-1 → events.process.lifecycle
   ├─▶ procmond-2 → events.process.integrity
   └─▶ netmond-1 → events.network.connections

3. Broker Receives Results (In Agent Process)
   ├─▶ Route by topic pattern
   ├─▶ Group by correlation_id
   └─▶ Buffer until complete

4. Aggregated Results (In Agent Process)
   ├─▶ Merge related events from multiple local processes
   ├─▶ Deduplicate if needed
   └─▶ Forward to detection engine
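
A minimal sketch of how step 3 might be wired on the agent side; the EventBroker::subscribe call, the Envelope fields, and the "events.*.*" wildcard pattern are assumptions about the daemoneye-eventbus API, not its final shape:

use prost::Message;

/// Hypothetical wiring: pull collector results off the local IPC topics
/// and feed them into the aggregator, grouped by correlation_id.
async fn route_collector_results(
    broker: Arc<EventBroker>,            // assumed broker handle (agent process)
    aggregator: Arc<ResultAggregator>,
) -> Result<()> {
    // Subscribe to every collector result topic with a wildcard pattern.
    let mut subscription = broker.subscribe("events.*.*").await?;

    while let Some(envelope) = subscription.recv().await {
        // Envelope is assumed to carry the publishing process ID, a
        // correlation ID, and a protobuf-encoded DetectionResult payload.
        let result = DetectionResult::decode(envelope.payload.as_slice())?;
        aggregator
            .add_result(envelope.process_id, envelope.correlation_id, result)
            .await?;
    }
    Ok(())
}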

Result Aggregator (In Broker Process)

/// Aggregates results from multiple local collector processes
pub struct ResultAggregator {
    // Pending result sets indexed by correlation ID
    pending_results: Arc<DashMap<Uuid, ResultSet>>,
    
    // Completion callback
    on_complete: Arc<dyn Fn(AggregatedResult) + Send + Sync>,
    
    // Timeout for incomplete result sets
    timeout_ms: u64,
}

pub struct ResultSet {
    correlation_id: Uuid,
    expected_collectors: HashSet<ProcessId>,
    received_results: Vec<(ProcessId, DetectionResult)>,
    created_at: Instant,
}

impl ResultAggregator {
    /// Add result from local collector process
    pub async fn add_result(
        &self,
        process_id: ProcessId,
        correlation_id: Uuid,
        result: DetectionResult,
    ) -> Result<()> {
        // Scope the DashMap entry guard so it is dropped before `remove`,
        // otherwise removing the same key would deadlock on the shard lock.
        let is_complete = {
            let mut result_set = self.pending_results
                .entry(correlation_id)
                .or_insert_with(|| ResultSet::new(correlation_id));

            result_set.received_results.push((process_id, result));
            result_set.is_complete()
        };

        // Check if complete; take ownership of the set before aggregating
        if is_complete {
            if let Some((_, result_set)) = self.pending_results.remove(&correlation_id) {
                let aggregated = self.aggregate(result_set)?;
                (self.on_complete)(aggregated);
            }
        }
        
        Ok(())
    }
    
    /// Aggregate results from multiple local processes
    fn aggregate(&self, result_set: ResultSet) -> Result<AggregatedResult> {
        // Merge results, deduplicate, correlate
        let merged = self.merge_results(result_set.received_results)?;
        
        Ok(AggregatedResult {
            correlation_id: result_set.correlation_id,
            results: merged,
            collector_count: result_set.expected_collectors.len(),
            complete: true,
        })
    }
}
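
The aggregator above relies on ResultSet::is_complete and merge_results, which are not shown. A minimal sketch of plausible implementations; the deduplication key (event_id) and the rule that the expected-collector set is filled in by the task distribution layer (#116) are assumptions:

impl ResultSet {
    fn new(correlation_id: Uuid) -> Self {
        Self {
            correlation_id,
            // Populated by the task distribution layer (#116) when tasks fan out.
            expected_collectors: HashSet::new(),
            received_results: Vec::new(),
            created_at: Instant::now(),
        }
    }

    /// Complete once every expected collector has reported at least once.
    fn is_complete(&self) -> bool {
        let reporting: HashSet<ProcessId> =
            self.received_results.iter().map(|(pid, _)| *pid).collect();
        !self.expected_collectors.is_empty() && self.expected_collectors.is_subset(&reporting)
    }
}

impl ResultAggregator {
    /// Merge results from multiple local processes, dropping exact duplicates.
    fn merge_results(
        &self,
        results: Vec<(ProcessId, DetectionResult)>,
    ) -> Result<Vec<DetectionResult>> {
        let mut seen = HashSet::new();
        let mut merged = Vec::with_capacity(results.len());
        for (_process_id, result) in results {
            // Assumes DetectionResult carries a stable event_id usable as a dedup key.
            if seen.insert(result.event_id) {
                merged.push(result);
            }
        }
        Ok(merged)
    }
}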

Advanced Load Balancer (Across Local Processes)

pub struct AdvancedLoadBalancer {
    // Real-time load metrics per local process
    metrics: Arc<DashMap<ProcessId, LoadMetrics>>,
    
    // Historical performance data
    history: Arc<DashMap<ProcessId, PerformanceHistory>>,
}

pub struct LoadMetrics {
    active_tasks: AtomicUsize,
    queue_depth: AtomicUsize,
    cpu_usage: AtomicU64,  // 0-100%
    memory_bytes: AtomicU64,
    last_heartbeat: AtomicU64,  // Unix timestamp
}

pub struct PerformanceHistory {
    avg_task_duration_ms: f64,
    success_rate: f64,
    total_tasks_completed: u64,
}

impl AdvancedLoadBalancer {
    /// Select best local process using weighted scoring
    pub fn select_weighted(
        &self,
        candidates: Vec<ProcessId>,
    ) -> Result<ProcessId> {
        candidates
            .into_iter()
            .max_by_key(|&pid| {
                self.calculate_score(pid)
            })
            .ok_or(Error::NoCollectorAvailable)
    }
    
    /// Calculate fitness score for local process
    fn calculate_score(&self, process_id: ProcessId) -> u64 {
        // Unknown processes score zero so they are never preferred
        let Some(metrics) = self.metrics.get(&process_id) else { return 0 };
        let Some(history) = self.history.get(&process_id) else { return 0 };
        
        // Lower is better for load indicators (saturating to avoid underflow)
        let load_score = 1000u64
            .saturating_sub(metrics.active_tasks.load(Ordering::Relaxed) as u64 * 10);
        let queue_score = 1000u64
            .saturating_sub(metrics.queue_depth.load(Ordering::Relaxed) as u64 * 5);
        
        // Higher is better for performance
        let perf_score = (history.success_rate * 100.0) as u64;
        
        load_score + queue_score + perf_score
    }
    
    /// Update metrics from local collector heartbeat
    pub fn update_metrics(&self, process_id: ProcessId, heartbeat: CollectorHeartbeat) {
        let metrics = self.metrics.entry(process_id).or_insert_with(LoadMetrics::default);
        
        metrics.active_tasks.store(heartbeat.active_tasks as usize, Ordering::Relaxed);
        metrics.queue_depth.store(heartbeat.queue_depth as usize, Ordering::Relaxed);
        metrics.cpu_usage.store(heartbeat.cpu_usage.to_bits(), Ordering::Relaxed);
        metrics.memory_bytes.store(heartbeat.memory_bytes, Ordering::Relaxed);
    }
}
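
A short usage sketch. One detail worth calling out: cpu_usage holds f64 bits in an AtomicU64 (see update_metrics above), so it must be read back with f64::from_bits. Assumptions here: ProcessId is Copy + Debug, and this helper lives in the same module so it can read the private metrics map.

fn pick_collector(
    balancer: &AdvancedLoadBalancer,
    candidates: Vec<ProcessId>,
) -> Result<ProcessId> {
    // Inspect one candidate's packed CPU value (stored as f64 bits).
    if let Some(&pid) = candidates.first() {
        if let Some(metrics) = balancer.metrics.get(&pid) {
            let cpu = f64::from_bits(metrics.cpu_usage.load(Ordering::Relaxed));
            debug!("candidate {:?} at {:.1}% CPU", pid, cpu);
        }
    }

    // Highest weighted score wins; errors if no candidates were supplied.
    balancer.select_weighted(candidates)
}

Note that cpu_usage and memory_bytes are tracked but not yet factored into calculate_score; folding them into the weighted score is a natural extension.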

Failover Manager (For Local Processes)

pub struct FailoverManager {
    health_tracker: Arc<HealthTracker>,
    task_router: Arc<TaskRouter>,
}

impl FailoverManager {
    /// Handle local collector process failure
    pub async fn handle_failure(&self, failed_process_id: ProcessId) -> Result<()> {
        // 1. Mark process as unhealthy
        self.health_tracker.mark_unhealthy(failed_process_id).await?;
        
        // 2. Get pending tasks from failed local process
        let pending_tasks = self.get_pending_tasks(failed_process_id).await?;
        
        // 3. Redistribute to healthy local processes
        for task in pending_tasks {
            self.task_router.route_task(task).await?;
        }
        
        // 4. Clean up subscriptions
        self.cleanup_subscriptions(failed_process_id).await?;
        
        Ok(())
    }
    
    /// Detect failed local processes via heartbeat monitoring
    pub async fn monitor_heartbeats(&self) {
        loop {
            tokio::time::sleep(Duration::from_secs(10)).await;
            
            // This loop never returns, so errors are logged rather than propagated
            let stale_processes = match self.health_tracker
                .find_stale_processes(Duration::from_secs(30))
                .await
            {
                Ok(processes) => processes,
                Err(e) => {
                    error!("Failed to query stale local processes: {}", e);
                    continue;
                }
            };
            
            for process_id in stale_processes {
                warn!("Local collector process {} missed heartbeat, initiating failover", process_id);
                if let Err(e) = self.handle_failure(process_id).await {
                    error!("Failover for local process {} failed: {}", process_id, e);
                }
            }
        }
    }
}
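
A minimal sketch, assuming the manager is shared behind an Arc, of how the agent would start heartbeat monitoring at startup; HealthTracker and TaskRouter themselves come from #115/#116:

/// Spawn the heartbeat monitor for the lifetime of the agent process.
fn start_failover_monitoring(manager: Arc<FailoverManager>) {
    tokio::spawn(async move {
        // Loops forever: checks every 10s, fails over anything stale for >30s.
        manager.monitor_heartbeats().await;
    });
}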

Implementation Steps

1. Result Aggregator

impl ResultAggregator {
    pub fn new(timeout_ms: u64, on_complete: impl Fn(AggregatedResult) + Send + Sync + 'static) -> Self {
        Self {
            pending_results: Arc::new(DashMap::new()),
            on_complete: Arc::new(on_complete),
            timeout_ms,
        }
    }
    
    /// Start timeout monitor for incomplete result sets
    pub fn start_timeout_monitor(self: Arc<Self>) {
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(self.timeout_ms / 2)).await;
                
                let now = Instant::now();
                let mut timed_out = Vec::new();
                
                for entry in self.pending_results.iter() {
                    let result_set = entry.value();
                    if now.duration_since(result_set.created_at).as_millis() as u64 > self.timeout_ms {
                        timed_out.push(*entry.key());
                    }
                }
                
                for correlation_id in timed_out {
                    if let Some((_, result_set)) = self.pending_results.remove(&correlation_id) {
                        warn!("Result set {} timed out, partial results available", correlation_id);
                        // The spawned task returns (), so aggregation errors are logged, not propagated
                        match self.aggregate(result_set) {
                            Ok(partial) => (self.on_complete)(partial),
                            Err(e) => error!("Failed to aggregate partial results for {}: {}", correlation_id, e),
                        }
                    }
                }
            }
        });
    }
}
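
A short usage sketch of wiring the aggregator into the agent; the 5-second timeout and the mpsc handoff to the detection engine are assumptions:

use tokio::sync::mpsc;

fn start_result_aggregation(
    detection_tx: mpsc::UnboundedSender<AggregatedResult>,
) -> Arc<ResultAggregator> {
    // Completed (or timed-out, partial) result sets are forwarded to the detection engine.
    let aggregator = Arc::new(ResultAggregator::new(5_000, move |aggregated| {
        let _ = detection_tx.send(aggregated);
    }));

    // Background task that flushes result sets older than the 5s timeout.
    Arc::clone(&aggregator).start_timeout_monitor();
    aggregator
}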

2. Advanced Load Balancer

impl AdvancedLoadBalancer {
    /// Update performance history from completed task
    pub fn record_task_completion(
        &self,
        process_id: ProcessId,
        duration_ms: u64,
        success: bool,
    ) {
        let mut history = self.history.entry(process_id).or_insert_with(PerformanceHistory::default);
        
        // Update moving average
        let alpha = 0.3; // Smoothing factor
        history.avg_task_duration_ms = 
            alpha * duration_ms as f64 + (1.0 - alpha) * history.avg_task_duration_ms;
        
        // Update success rate
        let total = history.total_tasks_completed as f64;
        history.success_rate = 
            (history.success_rate * total + if success { 1.0 } else { 0.0 }) / (total + 1.0);
        
        history.total_tasks_completed += 1;
    }
}
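
For intuition, a small test-style sketch of how the moving average and success rate evolve; AdvancedLoadBalancer::default() and ProcessId::from(1234) are hypothetical constructors, and the test is assumed to live in the same module so it can read history directly:

#[test]
fn ewma_and_success_rate_update() {
    let balancer = AdvancedLoadBalancer::default();
    let pid = ProcessId::from(1234); // hypothetical constructor

    // History starts at avg 0.0, success_rate 0.0, 0 tasks completed.
    balancer.record_task_completion(pid, 100, true);
    // avg = 0.3*100 + 0.7*0.0 = 30.0; success_rate = (0.0*0 + 1.0) / 1 = 1.0

    balancer.record_task_completion(pid, 200, false);
    // avg = 0.3*200 + 0.7*30.0 = 81.0; success_rate = (1.0*1 + 0.0) / 2 = 0.5

    let history = balancer.history.get(&pid).unwrap();
    assert!((history.avg_task_duration_ms - 81.0).abs() < 1e-9);
    assert!((history.success_rate - 0.5).abs() < 1e-9);
}

One consequence of seeding the average at 0.0 is that early samples are biased low; seeding the history with the first observed duration avoids this if it matters.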

3. Failover Manager

impl FailoverManager {
    /// Attempt to restart failed local collector process
    pub async fn attempt_restart(&self, failed_process_id: ProcessId) -> Result<()> {
        // Get collector metadata
        let collector_info = self.get_collector_info(failed_process_id)?;
        
        // Spawn new collector process
        let new_pid = self.spawn_collector_process(&collector_info).await?;
        
        // Wait for registration
        tokio::time::timeout(
            Duration::from_secs(30),
            self.wait_for_registration(new_pid)
        ).await??;
        
        info!("Successfully restarted local collector process {}", new_pid);
        Ok(())
    }
}
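
How restarts are retried is left open; a minimal sketch of a bounded retry policy with linear backoff (the attempt count, backoff schedule, and Error::RestartFailed variant are assumptions):

impl FailoverManager {
    /// Try to bring a failed local collector back up, at most three times.
    pub async fn restart_with_retries(&self, failed_process_id: ProcessId) -> Result<()> {
        const MAX_ATTEMPTS: u64 = 3;
        for attempt in 1..=MAX_ATTEMPTS {
            match self.attempt_restart(failed_process_id).await {
                Ok(()) => return Ok(()),
                Err(e) => {
                    warn!("Restart attempt {}/{} for {:?} failed: {}", attempt, MAX_ATTEMPTS, failed_process_id, e);
                    // Linear backoff: 5s, 10s, 15s between attempts.
                    tokio::time::sleep(Duration::from_secs(attempt * 5)).await;
                }
            }
        }
        Err(Error::RestartFailed(failed_process_id)) // hypothetical error variant
    }
}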

Acceptance Criteria

  • Result aggregator implemented for local collector processes
  • Advanced load balancing with weighted scoring (local processes)
  • Failover manager with automatic task redistribution (local processes)
  • Heartbeat monitoring and stale process detection
  • Performance metrics tracking per local collector process
  • Result correlation by correlation_id across local processes
  • Timeout handling for incomplete result sets
  • Automatic process restart on failure (local processes)
  • Performance: <1ms p99 latency for result aggregation
  • Zero unsafe code in aggregation and balancing logic
  • Unit tests for aggregation logic
  • Integration tests with multiple local collector failures
  • Cross-platform validation (Windows, Linux, macOS, FreeBSD)
  • Documentation of aggregation strategies

Testing Strategy

Unit Tests

  • Result merging and deduplication
  • Weighted scoring algorithm
  • Timeout handling (see the sketch after this list)
  • Heartbeat staleness detection
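
The timeout-handling case, for example, can be covered without any real collector processes; a sketch, where procmond_pid() and sample_result() are hypothetical test helpers:

#[tokio::test]
async fn incomplete_result_set_times_out_with_partial_results() {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

    // 50ms timeout keeps the test fast.
    let aggregator = Arc::new(ResultAggregator::new(50, move |aggregated| {
        let _ = tx.send(aggregated);
    }));
    Arc::clone(&aggregator).start_timeout_monitor();

    // Only one of the expected collectors ever reports.
    let correlation_id = Uuid::new_v4();
    aggregator
        .add_result(procmond_pid(), correlation_id, sample_result())
        .await
        .unwrap();

    // The monitor should flush the partial set instead of dropping it.
    let partial = tokio::time::timeout(Duration::from_millis(500), rx.recv())
        .await
        .expect("timed out waiting for partial aggregation")
        .expect("aggregator dropped");
    assert_eq!(partial.correlation_id, correlation_id);
}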

Integration Tests

  • Aggregate results from multiple local processes
  • Load balancing across healthy local processes
  • Failover when local process crashes
  • Task redistribution on failure

Performance Tests

  • Aggregation latency with varying local process counts
  • Load balancing decision time
  • Failover response time (<3 heartbeat intervals = 30s)
  • Target: <1ms p99 latency for aggregation

Performance Requirements

  • Result Aggregation Latency: <1ms p99 for merging results from local processes
  • Load Balancing Decision Time: <1ms for weighted scoring
  • Failover Detection: Within 3 heartbeat intervals (default 30s)
  • Task Redistribution: <10s for all pending tasks from failed local process

Code Safety

  • Zero unsafe code in aggregation and balancing logic
  • Built entirely on: interprocess, tokio, prost, dashmap
  • All dependencies are well-audited

Status: Not Started
Priority: High
Estimated Effort: 1 week
Depends On: #115 (Coordination Workflows), #116 (Task Distribution)

Metadata

Labels

architecture (System architecture and design decisions), core-feature (Core system functionality), daemoneye (An important component of the larger DaemonEye suite), high-priority (High priority issues requiring immediate attention)