
Implement message broker for multi-collector coordination and load balancing #115

@unclesp1d3r

Description

Implement multi-process collector coordination workflows via cross-process event bus

Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #113 (Topic Infrastructure - Completed), #121 (Topic-Based Messaging - Completed)
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.2

Overview

This task implements a cross-process daemoneye-eventbus broker component to enable coordination between the daemoneye-agent process and multiple collector process instances. The broker provides topic-based task distribution, result aggregation, and load balancing for local cross-process IPC only, enabling multiple instances of the same collector type (e.g., multiple procmond processes) to work together through the event bus.

Design Goals (from Parent #114)

Scope: Single-system, multi-process coordination (NOT distributed systems)

  1. Performance: <1ms p99 latency for local cross-process IPC
  2. Cross-Platform: Windows, Linux, macOS (primary); FreeBSD (secondary)
  3. Security: Local IPC only via named pipes/Unix sockets (NO network exposure)
  4. Code Safety: Zero unsafe code in daemoneye-eventbus implementation
  5. Dependencies: Only use existing workspace deps (interprocess, tokio, prost, dashmap)

Critical Architectural Context

This is a CROSS-PROCESS IPC system for LOCAL coordination:

  • daemoneye-agent process: Hosts the eventbus broker AND acts as a client (separate OS process)
  • Collector processes: Each collector (procmond-1, procmond-2, netmond-1, etc.) runs as a separate OS process on the same machine
  • IPC Transport: All communication via interprocess crate (named pipes on Windows, Unix domain sockets on Unix)
  • No Network Transport: This system is for local multi-process coordination only
┌─────────────────────────────────────────────────────────────────┐
│              daemoneye-agent (PROCESS)                          │
│              Running on: localhost                               │
│                                                                 │
│  ┌──────────────┐      ┌──────────────────────────────────┐   │
│  │ Detection    │      │  eventbus broker (hosted here)   │   │
│  │ Engine       │─IPC─▶│  - Topic routing                 │   │
│  │ (Client)     │      │  - Pub/sub patterns              │   │
│  │              │◀IPC──│  - Load balancing                │   │
│  └──────────────┘      │  - Capability routing            │   │
│                        └──────────────────────────────────┘   │
│                                      │                         │
└──────────────────────────────────────┼─────────────────────────┘
                                       │ Local Cross-Process IPC
                                       │ (Named Pipes / Unix Sockets)
         ┌─────────────────────────────┼───────────────────────────┐
         │                             │                           │
    ┌────▼────────────┐      ┌────────▼─────────┐      ┌─────────▼────────┐
    │ procmond-1      │      │ procmond-2       │      │ netmond-1        │
    │ (PROCESS)       │      │ (PROCESS)        │      │ (PROCESS)        │
    │ localhost       │      │ localhost        │      │ localhost        │
    │                 │      │                  │      │                  │
    │ Eventbus Client │      │ Eventbus Client  │      │ Eventbus Client  │
    │                 │      │                  │      │                  │
    │ Subscribe(IPC): │      │ Subscribe(IPC):  │      │ Subscribe(IPC):  │
    │ • tasks.proc    │      │ • tasks.proc     │      │ • tasks.net      │
    │                 │      │                  │      │                  │
    │ Publish(IPC):   │      │ Publish(IPC):    │      │ Publish(IPC):    │
    │ • results.proc  │      │ • results.proc   │      │ • results.net    │
    │ • health        │      │ • health         │      │ • health         │
    └─────────────────┘      └──────────────────┘      └──────────────────┘
   Separate Process         Separate Process           Separate Process
   Same Machine             Same Machine               Same Machine

Context

Current Architecture Limitations

The current architecture uses point-to-point IPC over local sockets (interprocess crate) with client-side load balancing. While this supports multiple collector endpoints on the same machine, it has several limitations when coordinating multiple instances of the same collector:

  1. No centralized cross-process coordination: The daemoneye-agent maintains its own connection pool and load balancing state for all collector processes on the local system
  2. Limited local scalability: Adding/removing collector processes on the same machine requires reconfiguring the agent
  3. No result aggregation: Results from multiple local collector processes must be handled individually
  4. Complex failover: Failover logic between local collector processes is duplicated in the agent
  5. No topic-based routing: Tasks are routed client-side based on capabilities, but no cross-process pub/sub mechanism exists for local coordination

Proposed Solution: Custom Cross-Process Event Bus

Build a custom event bus using only existing workspace dependencies for local multi-process coordination:

Core Dependencies (Already in Use):

  • interprocess (v2.2.3) - Local cross-process IPC transport (named pipes/Unix sockets)
  • tokio (v1.47.1) - Async runtime for IPC operations
  • prost (v0.14.1) - Efficient protobuf serialization for cross-process messages
  • dashmap (v6+) - Lock-free concurrent HashMap for topic routing

Why Custom Implementation:

  • ✅ No external libraries or unsafe code
  • ✅ Full control over local cross-process IPC patterns
  • ✅ Optimized for local single-system performance
  • ✅ Minimal dependencies and attack surface
  • ✅ Tailored backpressure and queue strategies for local processes

Architecture Components

// Broker hosted in daemoneye-agent process
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use dashmap::DashMap;
use tokio::sync::mpsc;
pub struct CrossProcessEventBus {
    // Topic routing: topic pattern -> subscriber processes on this machine
    topics: Arc<DashMap<String, Vec<LocalProcessSubscription>>>,
    
    // Active IPC connections to local collector processes
    local_ipc_connections: Arc<DashMap<ProcessId, IpcConnection>>,
    
    // Load balancing state for local queue groups
    local_queue_state: Arc<DashMap<String, LocalQueueGroup>>,
    
    // Backpressure management for local IPC
    flow_control: Arc<FlowController>,
}

pub struct LocalProcessSubscription {
    process_id: ProcessId,
    subscription_type: SubscriptionType,
    ipc_sender: mpsc::Sender<EventEnvelope>,
}

pub enum SubscriptionType {
    /// One-to-one: specific process subscription
    Direct,
    /// One-to-many: broadcast to all matching local subscribers
    Broadcast,
    /// Queue: load-balanced across multiple local processes
    Queue { group_name: String },
}

pub struct LocalQueueGroup {
    members: Vec<ProcessId>,
    next_index: AtomicUsize,
    strategy: LoadBalancingStrategy,
}

pub enum LoadBalancingStrategy {
    RoundRobin,
    LeastLoaded,
    Random,
}
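The `RoundRobin` variant can be implemented with the `next_index` counter already shown in `LocalQueueGroup`. A minimal sketch of that selection logic, with `LocalQueueGroup` simplified to plain types and `ProcessId` assumed to be a numeric alias for illustration:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Assumed alias for illustration; the real ProcessId type lives elsewhere.
type ProcessId = u64;

pub struct LocalQueueGroup {
    members: Vec<ProcessId>,
    next_index: AtomicUsize,
}

impl LocalQueueGroup {
    pub fn new(members: Vec<ProcessId>) -> Self {
        Self { members, next_index: AtomicUsize::new(0) }
    }

    /// RoundRobin strategy: each call returns the next member, wrapping around.
    pub fn next_round_robin(&self) -> Option<ProcessId> {
        if self.members.is_empty() {
            return None;
        }
        // fetch_add wraps on overflow; the modulo keeps the index in bounds.
        let i = self.next_index.fetch_add(1, Ordering::Relaxed);
        Some(self.members[i % self.members.len()])
    }
}
```

`Ordering::Relaxed` suffices here because only the counter itself needs atomicity, not ordering with respect to other memory operations.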

Communication Patterns (Local IPC Only)

1. One-to-One (Direct Local Task Assignment)

// Agent → specific local procmond instance
eventbus.publish_direct(
    local_process_id,
    "control.collector.task.process",
    task
).await?;

2. One-to-Many (Local Broadcast)

// Agent → all local procmond instances on this machine
eventbus.publish_broadcast(
    "control.collector.config.reload",
    config
).await?;

3. Pub/Sub (Local Topic-Based Routing)

// Local collector subscribes to topic pattern
eventbus.subscribe("events.process.+", SubscriptionType::Broadcast).await?;

// Agent publishes to topic (routed to local subscribers only)
eventbus.publish("events.process.spawned", event).await?;

4. Queue (Load-Balanced Distribution Across Local Processes)

// Multiple local procmond processes join queue group
eventbus.subscribe_queue(
    "control.collector.task.process",
    "local-procmond-workers"
).await?;

// Agent publishes task - only ONE local process receives it
eventbus.publish_queue("control.collector.task.process", task).await?;
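The pub/sub pattern above relies on the broker matching concrete topics like `events.process.spawned` against subscription patterns like `events.process.+`. A sketch of segment-wise matching; the actual wildcard grammar comes from the #113 topic infrastructure, and only the single-level `+` seen in the examples is assumed here:

```rust
/// Match a concrete topic against a pattern where `+` matches exactly one
/// dot-separated segment (assumed semantics based on the examples above).
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let seg: Vec<&str> = topic.split('.').collect();
    // Segment counts must agree: `+` never spans multiple levels.
    pat.len() == seg.len()
        && pat.iter().zip(seg.iter()).all(|(p, s)| *p == "+" || p == s)
}
```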

Backpressure Handling (Local IPC)

pub struct FlowController {
    /// Per-process send buffer limits for local IPC
    max_buffer_size: usize,
    
    /// Strategy when buffer full
    overflow_strategy: OverflowStrategy,
}

pub enum OverflowStrategy {
    /// Block producer until space available
    Block,
    /// Drop oldest messages
    DropOldest,
    /// Drop newest messages
    DropNewest,
    /// Return error to producer
    Error,
}
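The drop/error strategies above can be sketched over a plain bounded queue. This is a synchronous illustration only: `Block` requires async waiting on the IPC channel (e.g. a bounded tokio `mpsc` send) and is omitted, and the real `FlowController` would wrap per-process senders rather than a local `VecDeque`:

```rust
use std::collections::VecDeque;

pub enum OverflowStrategy {
    DropOldest,
    DropNewest,
    Error,
}

pub struct BoundedBuffer<T> {
    queue: VecDeque<T>,
    max: usize,
    strategy: OverflowStrategy,
}

impl<T> BoundedBuffer<T> {
    pub fn new(max: usize, strategy: OverflowStrategy) -> Self {
        Self { queue: VecDeque::new(), max, strategy }
    }

    /// Apply the overflow strategy when the buffer is full.
    /// Only OverflowStrategy::Error reports failure, returning the message.
    pub fn push(&mut self, msg: T) -> Result<(), T> {
        if self.queue.len() < self.max {
            self.queue.push_back(msg);
            return Ok(());
        }
        match self.strategy {
            OverflowStrategy::DropOldest => {
                // Evict the head to make room for the newest message.
                self.queue.pop_front();
                self.queue.push_back(msg);
                Ok(())
            }
            // Silently discard the incoming message.
            OverflowStrategy::DropNewest => Ok(()),
            // Push the decision back to the producer.
            OverflowStrategy::Error => Err(msg),
        }
    }

    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }
}
```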

Implementation Steps

1. Broker Component (In daemoneye-agent process)

// daemoneye-eventbus/src/broker.rs

/// Cross-process event bus broker for LOCAL multi-process coordination
pub struct EventBusBroker {
    topics: Arc<DashMap<String, Vec<LocalProcessSubscription>>>,
    local_connections: Arc<DashMap<ProcessId, IpcConnection>>,
    queue_groups: Arc<DashMap<String, LocalQueueGroup>>,
}

impl EventBusBroker {
    /// Create new broker for local process coordination
    pub async fn new() -> Result<Self> {
        // Initialize broker with local IPC listener
        todo!()
    }

    /// Accept connection from local collector process
    pub async fn accept_local_connection(&self, stream: LocalIpcStream) -> Result<()> {
        // Handle IPC connection from collector on same machine
        todo!()
    }

    /// Publish event to local subscriber processes
    pub async fn publish_local(&self, topic: &str, payload: Vec<u8>) -> Result<()> {
        // Route to local subscribers only
        todo!()
    }
}

2. Client Component (In collector processes)

// daemoneye-eventbus/src/client.rs

/// Event bus client for local collector processes
pub struct EventBusClient {
    local_ipc_connection: IpcConnection,
    process_id: ProcessId,
}

impl EventBusClient {
    /// Connect to local broker in daemoneye-agent process
    pub async fn connect_local(broker_address: &str) -> Result<Self> {
        // Establish local IPC connection via interprocess crate
        todo!()
    }

    /// Subscribe to topics (local process coordination)
    pub async fn subscribe(&self, pattern: &str) -> Result<EventStream> {
        // Send subscription request via local IPC
        todo!()
    }

    /// Publish event to local broker (prost handles serialization,
    /// per the workspace dependency list)
    pub async fn publish(&self, topic: &str, event: impl prost::Message) -> Result<()> {
        // Send via local IPC to broker
        todo!()
    }
}

3. IPC Integration (Local Only)

// Use interprocess crate for local IPC transport.
// Note: interprocess v2 renamed the v1 `LocalSocketStream` type to
// `local_socket::Stream`; the type is platform-independent (named pipes
// on Windows, Unix domain sockets on Unix), so no cfg gates are needed.

use interprocess::local_socket::Stream as LocalSocketStream;

/// Local IPC connection wrapper
pub struct IpcConnection {
    stream: LocalSocketStream,
    process_id: ProcessId,
}

Acceptance Criteria

  • EventBusBroker component implemented for local multi-process coordination
  • Local cross-process IPC integration with interprocess crate (named pipes/Unix sockets)
  • Topic-based routing for local pub/sub
  • Local collector process registration and capability advertisement via IPC
  • Cross-process task distribution with load balancing (local processes only)
  • Result aggregation from multiple local collector processes via IPC
  • Local cross-process health monitoring and heartbeat tracking
  • Automatic failover when local collector processes fail
  • Performance benchmarks achieve <1ms p99 latency for local IPC
  • Zero unsafe code in daemoneye-eventbus implementation
  • Cross-platform validation (Windows, Linux, macOS, FreeBSD)
  • Unit tests for broker logic
  • Integration tests with multiple local collector processes
  • Documentation of broker architecture and local cross-process usage

Testing Strategy

Unit Tests

  • Topic routing logic within broker
  • Local load balancing algorithms
  • Capability matching
  • Local health tracking

Integration Tests

  • Local multi-process coordination: Spawn agent process + multiple collector processes on same machine
  • Task distribution across local processes via IPC
  • Result aggregation from multiple local processes
  • Local process failure and recovery scenarios
  • Cross-process load balancing validation (local only)

Performance Tests

  • Local cross-process IPC latency (agent ↔ collectors on same machine)
  • Throughput with multiple local collector processes
  • Memory overhead of broker
  • Failover response time when local processes crash
  • Target: <1ms p99 latency for local IPC
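Verifying the <1ms p99 target means computing percentiles over sampled latencies. A minimal helper sketch using the nearest-rank method (the choice of method is an assumption; criterion or another benchmark harness would normally do this):

```rust
/// Nearest-rank percentile: the smallest sample such that at least p percent
/// of samples are <= it. Samples are e.g. latencies in microseconds.
/// Returns None for an empty slice.
pub fn percentile(samples: &[u64], p: f64) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // Rank is 1-based; clamp into valid index range.
    let rank = ((p / 100.0) * sorted.len() as f64).ceil() as usize;
    Some(sorted[rank.saturating_sub(1).min(sorted.len() - 1)])
}
```

With latencies recorded in microseconds, the acceptance check is simply `percentile(&samples, 99.0) < Some(1_000)`.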

Dependencies

No new external dependencies - use only existing workspace dependencies:

  • interprocess (v2.2.3) - Critical for local cross-process IPC transport
  • prost (v0.14.1) - Cross-process message serialization
  • tokio (v1.47.1) - Async runtime
  • dashmap (v6+) - Concurrent data structures within broker

Technical Considerations

Cross-Platform Support (Local IPC)

  • Windows: Named pipes for local IPC
  • Unix (Linux/macOS/FreeBSD): Unix domain sockets for local IPC
  • Same crate compiles on all platforms with minimal feature flags
  • Validate on all primary platforms

Local IPC Performance

  • Target: <1ms p99 latency for local cross-process IPC
  • Efficient serialization with prost
  • Minimize copies across local process boundaries
  • Zero-copy optimizations where possible (future)
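Since local sockets are byte streams, prost-serialized messages need framing so the receiver knows where one message ends. A common pattern is a length prefix; the actual wire framing used by daemoneye is an assumption here, and this sketch operates on in-memory buffers only:

```rust
/// Frame a serialized payload with a 4-byte little-endian length prefix.
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Split one frame off the front of a buffer; returns (payload, rest),
/// or None if the buffer does not yet hold a complete frame. Borrowing
/// slices of the receive buffer avoids copying across the parse step.
pub fn decode_frame(buf: &[u8]) -> Option<(&[u8], &[u8])> {
    if buf.len() < 4 {
        return None;
    }
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None;
    }
    Some((&buf[4..4 + len], &buf[4 + len..]))
}
```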

Local Process Lifecycle Management

  • Detect when local collector processes crash (broken pipes, connection errors)
  • Clean up subscriptions for terminated local processes
  • Automatic reconnection for transient failures
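Automatic reconnection typically uses capped exponential backoff so a crashed broker is not hammered with connect attempts. A sketch of the delay schedule; the concrete base and cap values are illustrative, not from the spec:

```rust
use std::time::Duration;

/// Capped exponential backoff: delay = min(base_ms * 2^attempt, max_ms).
/// Saturates instead of overflowing for large attempt counts.
pub fn reconnect_delay(attempt: u32, base_ms: u64, max_ms: u64) -> Duration {
    // checked_shl returns None once the shift exceeds the bit width;
    // treat that as "saturated" and let the cap take over.
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    Duration::from_millis(base_ms.saturating_mul(factor).min(max_ms))
}
```

In practice a small random jitter would be added to each delay so that several collectors restarting at once do not reconnect in lockstep.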

Security (Local IPC)

  • OS-provided IPC security (named pipe/Unix socket permissions)
  • Process authentication for local IPC connections
  • Prevent unauthorized topic access from local processes
  • No network exposure

Code Safety

  • Zero unsafe code in daemoneye-eventbus implementation
  • Built entirely on safe Rust abstractions
  • All dependencies are well-audited

Status: Not Started
Priority: High - Critical for local multi-collector coordination
Estimated Effort: 1-2 weeks
Depends On: #113 (Topic Infrastructure - Completed), #121 (Topic-Based Messaging - Completed)

Next Steps:

  1. Design EventBusBroker API for local cross-process coordination
  2. Implement local IPC integration with interprocess crate
  3. Create EventBusClient library for local collector processes
  4. Add integration tests with multiple local processes
  5. Implement load balancing strategies for local process selection
  6. Validate <1ms p99 latency for local IPC

Labels: architecture, breaking-change, core-feature, enhancement, ipc, process-monitoring