
Implement Task Distribution and Capability-Based Routing via EventBus #116

@unclesp1d3r

Description


Implement task distribution and capability-based routing across processes

Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #115 (Coordination Workflows), #117 (Result Aggregation)
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.3

Design Goals (from Parent #114)

Scope: Single-system, local multi-process coordination

  1. Performance: <1ms p99 latency for task routing via local cross-process IPC
  2. Cross-Platform: Windows, Linux, macOS, FreeBSD
  3. Security: Local IPC only (no network exposure)
  4. Code Safety: Zero unsafe code in routing implementation
  5. Dependencies: Only use interprocess, tokio, prost, dashmap

Critical Architectural Context

Task distribution across local processes:

  • daemoneye-agent (separate process): hosts broker, routes tasks
  • Collector processes (procmond-1, procmond-2, netmond-1, etc.): separate OS processes on same machine
  • Communication via local cross-process IPC using interprocess crate
  • Task distribution and routing happens across process boundaries on local system

Overview

This task implements the task distribution and capability-based routing system within daemoneye-eventbus. It lets the agent route each task to a collector process on the local system whose advertised capabilities match the task's requirements.

Architecture

Routing Flow (Local Cross-Process)

1. Collector Process Startup (Local Process)
   ├─▶ Connect to broker via local IPC
   ├─▶ Advertise capabilities (PROCESS, NETWORK, etc.)
   └─▶ Subscribe to relevant task topics

2. Task Creation (In Agent Process)
   ├─▶ Analyze required capabilities
   ├─▶ Match against available local collectors
   └─▶ Route to appropriate topic

3. Broker Routing (Within Agent Process)
   ├─▶ Match topic to subscribed local processes
   ├─▶ Apply load balancing strategy
   └─▶ Send via local IPC to selected process

4. Task Execution (In Collector Process)
   ├─▶ Receive task via local IPC
   ├─▶ Execute collection
   └─▶ Publish results via local IPC
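The four-step flow above can be modeled in a single process as a minimal sketch. This version assumes plain `u32` bitmasks in place of `SourceCaps` and uses `std::sync::mpsc` channels as a stand-in for the `interprocess` IPC link. It covers advertisement (step 1), matching and selection (steps 2 and 3), and receipt of the task by the collector (start of step 4). `Agent`, `Collector`, and `Task` are hypothetical names, not types from daemoneye-eventbus.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical capability bits standing in for SourceCaps flags.
const PROCESS: u32 = 1 << 0;
const NETWORK: u32 = 1 << 1;

#[derive(Debug)]
struct Task {
    id: u64,
    required_caps: u32,
}

/// Agent-side record of one collector: its advertised capabilities and a
/// channel that stands in for the local IPC connection to that process.
struct Collector {
    caps: u32,
    tx: Sender<Task>,
}

#[derive(Default)]
struct Agent {
    collectors: HashMap<String, Collector>,
}

impl Agent {
    /// Step 1: a collector connects and advertises its capabilities.
    /// The returned Receiver plays the collector's end of the IPC link.
    fn register(&mut self, name: &str, caps: u32) -> Receiver<Task> {
        let (tx, rx) = channel();
        self.collectors.insert(name.to_string(), Collector { caps, tx });
        rx
    }

    /// Steps 2 and 3: match required capabilities, pick one process, send.
    fn route(&self, task: Task) -> Option<String> {
        let mut candidates: Vec<&String> = self
            .collectors
            .iter()
            .filter(|(_, c)| c.caps & task.required_caps == task.required_caps)
            .map(|(name, _)| name)
            .collect();
        candidates.sort(); // HashMap order is arbitrary; sort for a stable pick
        let target = candidates.first()?.to_string();
        self.collectors[&target].tx.send(task).ok()?;
        Some(target)
    }
}
```

A task that needs a capability no connected collector advertises returns `None`. That corresponds to the `NoCapableCollector` error used later in the issue.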

Capability Registry (In Broker Process)

/// Registry of local collector process capabilities
pub struct CapabilityRegistry {
    // Map of process IDs to their capabilities (local processes only)
    collectors: Arc<DashMap<ProcessId, CollectorInfo>>,
    
    // Capability index for fast lookups (local processes)
    capability_index: Arc<DashMap<SourceCaps, Vec<ProcessId>>>,
}

pub struct CollectorInfo {
    process_id: ProcessId,
    capabilities: SourceCaps,
    health: CollectorHealth,
    last_heartbeat: Instant,
    topics: Vec<String>,
}

impl CapabilityRegistry {
    /// Register local collector process
    pub async fn register(&self, info: CollectorInfo) -> Result<()> {
        // Add to local process registry
        todo!()
    }
    
    /// Find local collectors matching capability
    pub fn find_by_capability(&self, cap: SourceCaps) -> Vec<ProcessId> {
        // Return matching local process IDs
        todo!()
    }
}
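The struct above keys `capability_index` by `SourceCaps`. If `SourceCaps` is a bitflags set, a collector advertising PROCESS | NETWORK would be stored under the combined key, and a lookup for PROCESS alone would miss it. A minimal sketch of one way around this is to index each individual flag separately and then confirm the full requirement against the collector record. For brevity it is single-threaded and uses std `HashMap`/`HashSet` in place of `DashMap`, with hypothetical `u32` stand-ins for `ProcessId` and `SourceCaps`.

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins: u32 process IDs and u32 capability bitmasks.
type ProcessId = u32;
const PROCESS: u32 = 1 << 0;
const NETWORK: u32 = 1 << 1;
static ALL_FLAGS: [u32; 4] = [1 << 0, 1 << 1, 1 << 2, 1 << 3];

/// Single-threaded sketch; the issue's registry would use DashMap instead.
#[derive(Default)]
struct Registry {
    collectors: HashMap<ProcessId, u32>,     // pid -> advertised capability mask
    index: HashMap<u32, HashSet<ProcessId>>, // one entry per individual flag
}

impl Registry {
    /// Individual flags set in `caps`.
    fn flags(caps: u32) -> impl Iterator<Item = u32> {
        ALL_FLAGS.iter().copied().filter(move |f| caps & f != 0)
    }

    /// Advertise (or re-advertise) a collector's capabilities.
    fn register(&mut self, pid: ProcessId, caps: u32) {
        self.unregister(pid); // drop stale index entries from an earlier advertisement
        self.collectors.insert(pid, caps);
        for flag in Self::flags(caps) {
            self.index.entry(flag).or_default().insert(pid);
        }
    }

    fn unregister(&mut self, pid: ProcessId) {
        if let Some(caps) = self.collectors.remove(&pid) {
            for flag in Self::flags(caps) {
                if let Some(set) = self.index.get_mut(&flag) {
                    set.remove(&pid);
                }
            }
        }
    }

    /// Processes advertising every flag in `required`, sorted by pid.
    fn find_by_capability(&self, required: u32) -> Vec<ProcessId> {
        // Start from one flag's bucket, then confirm the remaining flags.
        let Some(first) = Self::flags(required).next() else {
            return Vec::new();
        };
        let mut pids: Vec<ProcessId> = self
            .index
            .get(&first)
            .into_iter()
            .flatten()
            .copied()
            .filter(|pid| self.collectors[pid] & required == required)
            .collect();
        pids.sort_unstable();
        pids
    }
}
```

Sorting the result gives the load balancer a stable candidate order. That matters for RoundRobin, which indexes into the list.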

Task Router (In Broker Process)

/// Routes tasks to appropriate local collector processes
pub struct TaskRouter {
    capability_registry: Arc<CapabilityRegistry>,
    load_balancer: Arc<LoadBalancer>,
    eventbus: Arc<EventBusBroker>,
}

impl TaskRouter {
    /// Route task to appropriate local collector process(es)
    pub async fn route_task(&self, task: DetectionTask) -> Result<()> {
        // 1. Determine required capabilities
        let required_caps = task.required_capabilities();
        
        // 2. Find matching local collectors (returns a Vec, so no `?`)
        let candidates = self.capability_registry
            .find_by_capability(required_caps);
        
        // 3. Apply load balancing (local processes)
        let target = self.load_balancer
            .select(candidates, LoadBalancingStrategy::RoundRobin)?;
        
        // 4. Deliver to the selected process via IPC. `publish_to_process` is an
        //    assumed point-to-point broker method; publishing to the shared
        //    topic alone would ignore `target`.
        let topic = task_topic_for_capability(required_caps)
            .ok_or(Error::NoCapableCollector(required_caps))?;
        self.eventbus.publish_to_process(target, topic, task).await?;
        
        Ok(())
    }
}

Load Balancing (Across Local Processes)

pub struct LoadBalancer {
    // Track load per local collector process
    process_load: Arc<DashMap<ProcessId, ProcessLoad>>,
    // Cursor for RoundRobin selection; advanced atomically per decision
    next_index: AtomicUsize,
}

pub struct ProcessLoad {
    active_tasks: AtomicUsize,
    queue_depth: AtomicUsize,
    last_task_time: Instant,
}

pub enum LoadBalancingStrategy {
    /// Distribute evenly across local processes
    RoundRobin,
    /// Route to least-loaded local process
    LeastLoaded,
    /// Random selection among local processes
    Random,
}
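LeastLoaded is only meaningful if `active_tasks` is decremented on every exit path: result received, send failed, or task timed out. Otherwise the counter drifts upward and the process is starved. A minimal sketch using an RAII guard is shown below. `ActiveTask` is a hypothetical name, and `ProcessLoad` is reduced to the single counter LeastLoaded reads.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Reduced stand-in for `ProcessLoad` (only the counter LeastLoaded reads).
#[derive(Default)]
struct ProcessLoad {
    active_tasks: AtomicUsize,
}

/// Hypothetical guard: counts one dispatched task as active until dropped,
/// e.g. when the collector's result arrives, the send fails, or it times out.
struct ActiveTask {
    load: Arc<ProcessLoad>,
}

impl ActiveTask {
    fn start(load: Arc<ProcessLoad>) -> Self {
        load.active_tasks.fetch_add(1, Ordering::Relaxed);
        ActiveTask { load }
    }
}

impl Drop for ActiveTask {
    fn drop(&mut self) {
        // Runs on every exit path, including early returns and `?` errors.
        self.load.active_tasks.fetch_sub(1, Ordering::Relaxed);
    }
}
```

`Relaxed` ordering is enough here because the counter is a load-balancing hint and does not guard other memory. Storing `Arc<ProcessLoad>` as the map value would let a guard outlive the map lookup that created it.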

Topic Mapping (Cross-Process)

// Map a single capability flag to its task topic for local process routing.
// Combined or unknown flags have no single topic, so the match needs a
// wildcard arm and the function returns Option.
fn task_topic_for_capability(cap: SourceCaps) -> Option<&'static str> {
    match cap {
        SourceCaps::PROCESS => Some("control.collector.task.process"),
        SourceCaps::NETWORK => Some("control.collector.task.network"),
        SourceCaps::FILESYSTEM => Some("control.collector.task.filesystem"),
        SourceCaps::PERFORMANCE => Some("control.collector.task.performance"),
        _ => None,
    }
}
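A task that requires more than one capability does not map to any single topic in the table above. If such a task should be split across collector types, one option is to fan out to one topic per flag. A minimal sketch follows, assuming hypothetical `u32` bit values for the four `SourceCaps` flags. `task_topics` is a hypothetical helper.

```rust
// Hypothetical bit values standing in for SourceCaps flags.
const PROCESS: u32 = 1 << 0;
const NETWORK: u32 = 1 << 1;
const FILESYSTEM: u32 = 1 << 2;
const PERFORMANCE: u32 = 1 << 3;

/// Topic for exactly one capability flag; `None` for combined or unknown bits.
fn task_topic_for_capability(cap: u32) -> Option<&'static str> {
    match cap {
        PROCESS => Some("control.collector.task.process"),
        NETWORK => Some("control.collector.task.network"),
        FILESYSTEM => Some("control.collector.task.filesystem"),
        PERFORMANCE => Some("control.collector.task.performance"),
        _ => None,
    }
}

/// One topic per flag set in `caps`, in a fixed order, for tasks that
/// need more than one collector type.
fn task_topics(caps: u32) -> Vec<&'static str> {
    [PROCESS, NETWORK, FILESYSTEM, PERFORMANCE]
        .iter()
        .copied()
        .filter(|flag| caps & flag != 0)
        .filter_map(task_topic_for_capability)
        .collect()
}
```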

Implementation Steps

1. Capability Registry

Note: "In-process" refers to code running WITHIN the broker process (daemoneye-agent), NOT within collector processes. All communication TO collectors uses cross-process IPC.

// In-process implementation within broker (uses tokio::sync for broker-internal coordination)
// IPC used for all communication WITH collector processes

impl CapabilityRegistry {
    pub fn new() -> Self {
        Self {
            collectors: Arc::new(DashMap::new()),
            capability_index: Arc::new(DashMap::new()),
        }
    }
    
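    /// Register local collector process (called when process connects via IPC)
    pub async fn register(&self, info: CollectorInfo) -> Result<()> {
        let process_id = info.process_id;
        let caps = info.capabilities;
        
        // Store local collector info; a reconnecting process replaces its old
        // entry, so drop its stale index entries first
        if let Some(previous) = self.collectors.insert(process_id, info) {
            self.remove_from_index(process_id, previous.capabilities);
        }
        
        // Index under each individual flag (assumes SourceCaps is a bitflags type
        // with `iter()`), so a PROCESS | NETWORK collector is found by either lookup
        for flag in caps.iter() {
            let mut pids = self.capability_index.entry(flag).or_insert_with(Vec::new);
            if !pids.contains(&process_id) {
                pids.push(process_id);
            }
        }
        
        Ok(())
    }
    
    /// Unregister local collector process (when process disconnects IPC)
    pub async fn unregister(&self, process_id: ProcessId) -> Result<()> {
        if let Some((_, info)) = self.collectors.remove(&process_id) {
            self.remove_from_index(process_id, info.capabilities);
        }
        Ok(())
    }
    
    /// Find local collectors that advertise every flag in `cap`
    pub fn find_by_capability(&self, cap: SourceCaps) -> Vec<ProcessId> {
        // Start from one flag's bucket, then confirm the full capability set
        let Some(first) = cap.iter().next() else {
            return Vec::new();
        };
        let Some(pids) = self.capability_index.get(&first) else {
            return Vec::new();
        };
        pids.iter()
            .copied()
            .filter(|pid| {
                self.collectors
                    .get(pid)
                    .is_some_and(|c| c.capabilities.contains(cap))
            })
            .collect()
    }
    
    fn remove_from_index(&self, process_id: ProcessId, caps: SourceCaps) {
        for flag in caps.iter() {
            if let Some(mut pids) = self.capability_index.get_mut(&flag) {
                pids.retain(|&id| id != process_id);
            }
        }
    }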
}

2. Task Router

impl TaskRouter {
    pub fn new(
        capability_registry: Arc<CapabilityRegistry>,
        load_balancer: Arc<LoadBalancer>,
        eventbus: Arc<EventBusBroker>,
    ) -> Self {
        Self {
            capability_registry,
            load_balancer,
            eventbus,
        }
    }
    
    /// Route single task to one selected local collector process
    pub async fn route_task(&self, task: DetectionTask) -> Result<()> {
        let required_caps = task.required_capabilities();
        let candidates = self.capability_registry.find_by_capability(required_caps);
        
        if candidates.is_empty() {
            return Err(Error::NoCapableCollector(required_caps));
        }
        
        let target = self.load_balancer.select(candidates, LoadBalancingStrategy::RoundRobin)?;
        let topic = task_topic_for_capability(required_caps)
            .ok_or(Error::NoCapableCollector(required_caps))?;
        
        // Deliver only to the selected process via IPC (`publish_to_process` is an
        // assumed point-to-point broker method; a plain topic publish ignores `target`)
        self.eventbus.publish_to_process(target, topic, task).await?;
        
        Ok(())
    }
    
    /// Broadcast task to all local collectors subscribed to the capability topic
    pub async fn broadcast_task(&self, task: DetectionTask) -> Result<()> {
        let required_caps = task.required_capabilities();
        let topic = task_topic_for_capability(required_caps)
            .ok_or(Error::NoCapableCollector(required_caps))?;
        
        // Broadcast to all local subscribers via IPC
        self.eventbus.publish_broadcast_local(topic, task).await?;
        
        Ok(())
    }
}
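The integration tests call for failover when a local collector becomes unhealthy. One way to get this at delivery time is to try the selected target first and then fall back to the remaining candidates. Failed processes are returned so the caller can unregister them or mark them unhealthy. A minimal sketch follows; an `mpsc::Sender` whose receiver has been dropped stands in for a dead IPC connection. `deliver_with_failover` is a hypothetical helper.

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical stand-ins: u32 process IDs and an mpsc Sender per collector
// (a dropped Receiver plays the role of a crashed or disconnected process).
type ProcessId = u32;

#[derive(Debug, PartialEq)]
struct Task(u64);

/// Try candidates starting at `start` (the load balancer's pick), wrapping around.
/// Returns the process that accepted the task plus the ones that failed.
fn deliver_with_failover(
    candidates: &[(ProcessId, Sender<Task>)],
    start: usize,
    mut task: Task,
) -> (Option<ProcessId>, Vec<ProcessId>) {
    let mut failed = Vec::new();
    for k in 0..candidates.len() {
        let (pid, tx) = &candidates[(start + k) % candidates.len()];
        match tx.send(task) {
            Ok(()) => return (Some(*pid), failed),
            Err(err) => {
                failed.push(*pid);
                task = err.0; // SendError hands the task back for the next attempt
            }
        }
    }
    (None, failed)
}
```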

3. Load Balancer

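use std::sync::atomic::Ordering;

impl LoadBalancer {
    /// Select best local process for task
    pub fn select(
        &self,
        candidates: Vec<ProcessId>,
        strategy: LoadBalancingStrategy,
    ) -> Result<ProcessId> {
        if candidates.is_empty() {
            return Err(Error::NoCollectorAvailable);
        }
        match strategy {
            LoadBalancingStrategy::RoundRobin => {
                Ok(self.round_robin_select(&candidates))
            }
            LoadBalancingStrategy::LeastLoaded => {
                self.least_loaded_select(candidates)
            }
            LoadBalancingStrategy::Random => {
                self.random_select(candidates)
            }
        }
    }
    
    /// Rotate through candidates using the `next_index: AtomicUsize` cursor
    /// field; `fetch_add` wraps on overflow, so the cursor never panics
    fn round_robin_select(&self, candidates: &[ProcessId]) -> ProcessId {
        let i = self.next_index.fetch_add(1, Ordering::Relaxed);
        candidates[i % candidates.len()]
    }
    
    fn least_loaded_select(&self, candidates: Vec<ProcessId>) -> Result<ProcessId> {
        candidates
            .into_iter()
            .min_by_key(|&pid| {
                self.process_load
                    .get(&pid)
                    .map(|load| load.active_tasks.load(Ordering::Relaxed))
                    // No load record yet means no tasks dispatched; treating it
                    // as usize::MAX would starve newly registered processes
                    .unwrap_or(0)
            })
            .ok_or(Error::NoCollectorAvailable)
    }
}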
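The Random strategy needs a source of randomness, but the Design Goals limit dependencies to interprocess, tokio, prost, and dashmap, which excludes the `rand` crate. A minimal sketch of a std-only alternative hashes a per-call counter with a randomly keyed `RandomState` (the SipHash keys std uses for `HashMap`). This is not cryptographically secure, but it is adequate for spreading load. `RandomPicker` is a hypothetical name.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical std-only random picker for LoadBalancingStrategy::Random.
struct RandomPicker {
    state: RandomState, // randomly keyed once when the picker is created
    counter: AtomicU64, // distinct input per call, so each hash is fresh
}

impl RandomPicker {
    fn new() -> Self {
        Self { state: RandomState::new(), counter: AtomicU64::new(0) }
    }

    fn pick<T: Copy>(&self, candidates: &[T]) -> Option<T> {
        if candidates.is_empty() {
            return None;
        }
        let mut hasher = self.state.build_hasher();
        hasher.write_u64(self.counter.fetch_add(1, Ordering::Relaxed));
        // Modulo bias is negligible for a handful of local collectors.
        let idx = (hasher.finish() % candidates.len() as u64) as usize;
        Some(candidates[idx])
    }
}
```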

Acceptance Criteria

  • Capability registry implemented for local collector processes
  • Task router with capability matching (local processes)
  • Load balancing strategies (RoundRobin, LeastLoaded, Random) for local processes
  • Topic-based task distribution via local IPC
  • Dynamic collector registration/unregistration for local processes
  • Health-based routing decisions (exclude unhealthy local processes)
  • Performance: <1ms p99 latency for routing decisions
  • Zero unsafe code in routing implementation
  • Unit tests for routing logic
  • Integration tests with multiple local collector types
  • Cross-platform validation (Windows, Linux, macOS, FreeBSD)
  • Documentation of routing strategies
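The health-based routing criterion can be met by filtering candidates before load balancing, using the `health` and `last_heartbeat` fields of `CollectorInfo`. A minimal sketch follows. The issue names `CollectorHealth` but not its variants, so `Healthy`, `Degraded`, and `Unhealthy` are assumptions, as are the hypothetical `Candidate` type and `routable` function. Degraded processes remain routable here, which is a policy choice.

```rust
use std::time::{Duration, Instant};

/// Hypothetical health states; the issue does not define CollectorHealth's variants.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CollectorHealth {
    Healthy,
    Degraded,
    Unhealthy,
}

/// Reduced view of CollectorInfo with only the fields the filter needs.
struct Candidate {
    pid: u32,
    health: CollectorHealth,
    last_heartbeat: Instant,
}

/// Keep processes that are not marked unhealthy and whose last heartbeat
/// is no older than `max_age` at time `now`.
fn routable(candidates: &[Candidate], now: Instant, max_age: Duration) -> Vec<u32> {
    candidates
        .iter()
        .filter(|c| c.health != CollectorHealth::Unhealthy)
        .filter(|c| now.saturating_duration_since(c.last_heartbeat) <= max_age)
        .map(|c| c.pid)
        .collect()
}
```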

Testing Strategy

Unit Tests

  • Capability matching logic
  • Load balancing algorithms
  • Topic selection
  • Health-based filtering

Integration Tests

  • Route tasks to correct local collector types
  • Load distribution across multiple local processes
  • Failover when local collector becomes unhealthy
  • Dynamic registration/unregistration of local processes

Performance Tests

  • Routing decision latency (<1ms p99 target)
  • Throughput with many concurrent tasks
  • Memory overhead of routing tables
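The <1ms p99 target for routing decisions can be checked by timing many individual decisions and taking the 99th percentile. A minimal std-only sketch using the nearest-rank definition is shown below. `percentile` and `measure_p99` are hypothetical helpers; a real benchmark harness would also add warm-up and repeated runs.

```rust
use std::time::{Duration, Instant};

/// Nearest-rank percentile (p in 0..=100) of the recorded samples.
fn percentile(samples: &mut [Duration], p: f64) -> Option<Duration> {
    if samples.is_empty() {
        return None;
    }
    samples.sort_unstable();
    let rank = ((p * samples.len() as f64) / 100.0).ceil() as usize;
    let idx = rank.saturating_sub(1).min(samples.len() - 1);
    Some(samples[idx])
}

/// Time `iterations` calls of one routing decision and return the p99 latency.
fn measure_p99<F: FnMut()>(iterations: usize, mut route_once: F) -> Option<Duration> {
    let mut samples = Vec::with_capacity(iterations);
    for _ in 0..iterations {
        let start = Instant::now();
        route_once();
        samples.push(start.elapsed());
    }
    percentile(&mut samples, 99.0)
}
```

In a real test, the closure would call the router's selection path (registry lookup plus `LoadBalancer::select`) on a pre-populated registry.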

Performance Requirements

  • Routing Decision Time: <1ms p99 for selecting target local process
  • Registration Time: <100μs for adding/removing local collector
  • Lookup Time: <100μs for capability matching

Code Safety

  • Zero unsafe code in routing and balancing logic
  • Built entirely on: interprocess (IPC), tokio (async), prost (serialization), dashmap (routing tables)
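The zero-unsafe requirement can be enforced by the compiler rather than by review. At the crate root, `#![forbid(unsafe_code)]` turns any `unsafe` block, function, or impl in the crate into a hard error. Unlike `deny`, it cannot be re-enabled by an inner `#[allow(unsafe_code)]`. The helper below is only a hypothetical example of routing code written in safe Rust under the lint.

```rust
// Crate root (lib.rs) of the routing crate: unsafe code is a compile error.
#![forbid(unsafe_code)]

/// Hypothetical helper: bounds-checked indexing instead of `get_unchecked`.
pub fn pick(candidates: &[u32], cursor: usize) -> Option<u32> {
    if candidates.is_empty() {
        return None;
    }
    candidates.get(cursor % candidates.len()).copied()
}
```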

Status: Not Started
Priority: High
Estimated Effort: 1 week
Depends On: #115 (Coordination Workflows)

Metadata

Labels: architecture, daemoneye, enhancement
