
Epic: Multi-Process Collector Coordination via Event Bus Architecture #114


Parent Epic: #93 - Develop Core DaemonEye Monitoring System
Related Issues: #87 (Event Bus Architecture), #121 (Topic Infrastructure - Completed)
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6

Overview

This epic tracks the implementation of multi-process collector coordination capabilities that enable DaemonEye to scale within a single system by coordinating multiple collector processes. By implementing topic-based event distribution through the custom daemoneye-eventbus component, multiple collectors can coordinate their efforts, share workload, and provide redundancy without tight coupling.

Architecture: Cross-Process IPC with Topic-Based Messaging

Critical Architectural Clarification:

The daemoneye-eventbus is a cross-process IPC (Inter-Process Communication) system, NOT an in-process message broker. Key architectural points:

  1. Process Architecture:

    • daemoneye-agent: Separate process that hosts the eventbus broker AND acts as a client
    • Collector Processes: Each collector (procmond, netmond, fsmond, etc.) runs as a separate OS process
    • Communication: All processes communicate via cross-process IPC using the interprocess crate
  2. IPC Transport Layer:

    • Windows: Named pipes via CreateNamedPipe/CreateFile
    • Unix (Linux/macOS/FreeBSD): Unix domain sockets
    • Protocol: Protobuf-serialized messages over the IPC transport
    • Async: All IPC is async via Tokio runtime
  3. Topic-Based Messaging Layer:

    • Layered Architecture: The pub/sub topic system is built ON TOP OF the cross-process IPC infrastructure
    • Custom Implementation: Built entirely on existing dependencies (no external brokers)
    • Broker Role: daemoneye-agent hosts the broker and routes messages between collector processes
┌─────────────────────────────────────────────────────────────────────┐
│                    daemoneye-agent (PROCESS)                        │
│                                                                     │
│  ┌────────────────────┐      ┌─────────────────────────────────┐  │
│  │ Detection Engine   │      │ Eventbus Broker (hosted here)   │  │
│  │ (also a client)    │─IPC─▶│ - Topic routing                 │  │
│  │                    │      │ - Pub/sub patterns              │  │
│  │ - Task generation  │◀IPC──│ - Load balancing                │  │
│  │ - Result aggregation│     │ - Capability routing            │  │
│  └────────────────────┘      └─────────────────────────────────┘  │
│                                          │                          │
└──────────────────────────────────────────┼──────────────────────────┘
                                           │ Cross-Process IPC
                                           │ (Named Pipes / Unix Sockets)
     ┌─────────────────────────────────────┼──────────────────────────────┐
     │                                     │                              │
┌────▼────────────────┐         ┌─────────▼──────────────┐    ┌─────────▼──────────────┐
│ procmond-1 (PROCESS)│         │ procmond-2 (PROCESS)   │    │ netmond-1 (PROCESS)    │
│                     │         │                        │    │                        │
│ Eventbus Client     │         │ Eventbus Client        │    │ Eventbus Client        │
│                     │         │                        │    │                        │
│ Subscribe (IPC):    │         │ Subscribe (IPC):       │    │ Subscribe (IPC):       │
│ • tasks.proc        │         │ • tasks.proc           │    │ • tasks.net            │
│                     │         │                        │    │                        │
│ Publish (IPC):      │         │ Publish (IPC):         │    │ Publish (IPC):         │
│ • results.proc      │         │ • results.proc         │    │ • results.net          │
│ • health            │         │ • health               │    │ • health               │
└─────────────────────┘         └────────────────────────┘    └────────────────────────┘
   Separate Process                Separate Process              Separate Process
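
To make the wire format concrete, here is a minimal sketch of a protobuf envelope carried over this IPC transport. The field names, tags, and framing helper are illustrative assumptions; the project's actual schema would live in its .proto definitions.

use prost::Message;

// Illustrative envelope only; fields and tags are assumptions, not the
// project's actual protobuf schema.
#[derive(Clone, PartialEq, prost::Message)]
pub struct EventEnvelope {
    /// Destination topic, e.g. "events.process.spawned".
    #[prost(string, tag = "1")]
    pub topic: String,
    /// Publisher-assigned sequence number for ordering.
    #[prost(uint64, tag = "2")]
    pub sequence: u64,
    /// Domain payload, itself a protobuf-encoded message.
    #[prost(bytes = "vec", tag = "3")]
    pub payload: Vec<u8>,
}

fn frame(envelope: &EventEnvelope) -> Vec<u8> {
    // Length-delimited framing lets the receiver split the byte stream
    // back into individual messages.
    envelope.encode_length_delimited_to_vec()
}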

Design Goals

Scope: Single-system, multi-process coordination (NOT distributed systems)

Primary Goals

  1. Performance Optimization:

    • High throughput under sustained load (thousands of events/second)
    • Ultra-low latency for cross-process IPC (<1ms p99 for local sockets)
    • Zero-copy where possible, minimal serialization overhead
    • Efficient backpressure handling so producer/consumer rate mismatches do not cause unbounded buffering
  2. Cross-Platform Compatibility:

    • Windows, Linux, macOS (primary support)
    • FreeBSD (secondary support)
    • Same crate compiles on all platforms with minimal feature flags
    • Platform-native IPC primitives (named pipes on Windows, Unix sockets on Unix)
  3. Security:

    • Secure IPC communications via OS-provided primitives
    • Named pipes (Windows) and Unix domain sockets reduce attack surface
    • No network exposure by default
    • Process isolation boundaries
  4. Code Safety & Dependencies:

    • No unsafe code in daemoneye-eventbus implementation
    • No external libraries beyond existing workspace dependencies
    • Built entirely on: interprocess, tokio, prost, dashmap
    • All dependencies are well-audited and widely used
  5. Communication Patterns:

    • One-to-one: Direct task assignment to specific collector process
    • One-to-many: Broadcast events to multiple subscriber processes
    • Pub/sub: Topic-based routing with wildcard matching
    • Queue support: Load-balanced distribution across multiple process instances
    • Backpressure tolerance: Producers and consumers operate at different rates

Background & Current Limitations

Current Architecture

DaemonEye's collector-core framework provides a solid foundation with:

  • EventSource Trait: Universal abstraction for collection components
  • IPC Communication: Point-to-point protobuf communication between agent and collector processes
  • Event Aggregation: In-process mpsc channels for event batching
  • Capability Negotiation: Collectors advertise their capabilities (PROCESS, NETWORK, FILESYSTEM, etc.)

Scaling Challenges

However, the current architecture faces limitations when scaling to multiple collector processes on a single system:

  1. Point-to-Point IPC: Current IPC is 1:1 between daemoneye-agent process and a single collector process
  2. No Coordination: Multiple collector processes cannot coordinate tasks or share workload
  3. No Automatic Failover: If a collector process fails, there's no automatic task redistribution
  4. Static Routing: Tasks are sent to specific collector processes rather than routed by capability
  5. No Load Balancing: Cannot distribute high-volume collection tasks across multiple process instances

Strategic Importance

Multi-collector coordination within a single system is essential for:

  • System Resource Utilization: Fully utilize multi-core CPUs by running parallel collector processes
  • Workload Isolation: Different collector types handling their domains (process, network, filesystem)
  • Fault Tolerance: Automatic failover when collector processes crash
  • Performance: Distribute high-load scenarios across multiple processes

Proposed Solution: Custom Cross-Process Event Bus

Implementation Approach

Build a custom event bus using only existing workspace dependencies:

Core Dependencies:

  • interprocess (v2.2.3) - Cross-platform IPC transport (named pipes/Unix sockets)
  • tokio (v1.47.1) - Async runtime for IPC operations
  • prost (v0.14.1) - Efficient protobuf serialization for cross-process messages
  • dashmap (v6+) - Lock-free concurrent HashMap for topic routing (new addition to the workspace; see below)

Why Custom Implementation:

  • ✅ No external libraries or unsafe code
  • ✅ Full control over cross-process IPC patterns
  • ✅ Optimized for local single-system performance
  • ✅ Minimal dependencies and attack surface
  • ✅ Tailored backpressure and queue strategies

Architecture Components

use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use dashmap::DashMap;
use tokio::sync::mpsc;

// Broker hosted in daemoneye-agent process
pub struct CrossProcessEventBus {
    // Topic routing: topic pattern -> subscriber processes
    topics: Arc<DashMap<String, Vec<ProcessSubscription>>>,
    
    // Active IPC connections to collector processes
    ipc_connections: Arc<DashMap<ProcessId, IpcConnection>>,
    
    // Load balancing state for queue groups
    queue_state: Arc<DashMap<String, QueueGroup>>,
    
    // Backpressure management
    flow_control: Arc<FlowController>,
}

pub struct ProcessSubscription {
    process_id: ProcessId,
    subscription_type: SubscriptionType,
    ipc_sender: mpsc::Sender<EventEnvelope>,
}

pub enum SubscriptionType {
    /// One-to-one: specific process subscription
    Direct,
    /// One-to-many: broadcast to all matching subscribers
    Broadcast,
    /// Queue: load-balanced across multiple processes
    Queue { group_name: String },
}

pub struct QueueGroup {
    members: Vec<ProcessId>,
    next_index: AtomicUsize,  // Round-robin state
    strategy: LoadBalancingStrategy,
}

pub enum LoadBalancingStrategy {
    RoundRobin,
    LeastLoaded,
    Random,
}

Communication Patterns

1. One-to-One (Direct Task Assignment)

// Agent → specific procmond instance
eventbus.publish_direct(
    process_id,
    "control.collector.task.process",
    task
).await?;

2. One-to-Many (Broadcast)

// Agent → all procmond instances
eventbus.publish_broadcast(
    "control.collector.config.reload",
    config
).await?;

3. Pub/Sub (Topic-Based Routing)

// Collector subscribes to topic pattern
eventbus.subscribe("events.process.+", SubscriptionType::Broadcast).await?;

// Agent publishes to topic
eventbus.publish("events.process.spawned", event).await?;

4. Queue (Load-Balanced Distribution)

// Multiple procmond processes join queue group
eventbus.subscribe_queue(
    "control.collector.task.process",
    "procmond-workers"
).await?;

// Agent publishes task - only ONE process receives it
eventbus.publish_queue("control.collector.task.process", task).await?;

Backpressure Handling

pub struct FlowController {
    /// Per-process send buffer limits
    max_buffer_size: usize,
    
    /// Strategy when buffer full
    overflow_strategy: OverflowStrategy,
}

pub enum OverflowStrategy {
    /// Block producer until space available
    Block,
    /// Drop oldest messages
    DropOldest,
    /// Drop newest messages
    DropNewest,
    /// Return error to producer
    Error,
}
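
As a sketch of how these strategies might map onto the bounded tokio mpsc channels used for per-process senders (the deliver function and its error strings are illustrative, not the final implementation):

use tokio::sync::mpsc;

// Hypothetical delivery path applying OverflowStrategy to a bounded
// per-process channel; try_send surfaces "buffer full" without blocking.
async fn deliver<T>(
    tx: &mpsc::Sender<T>,
    msg: T,
    strategy: &OverflowStrategy,
) -> Result<(), &'static str> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(mpsc::error::TrySendError::Full(msg)) => match strategy {
            // Block: await capacity, propagating backpressure to the producer.
            OverflowStrategy::Block => tx.send(msg).await.map_err(|_| "receiver gone"),
            // DropNewest: silently discard the message we failed to enqueue.
            OverflowStrategy::DropNewest => Ok(()),
            // DropOldest needs head-of-queue eviction, which plain mpsc cannot
            // express; a ring-buffer style channel would be required. Elided here.
            OverflowStrategy::DropOldest => Err("DropOldest needs a ring buffer"),
            // Error: report the overflow to the producer.
            OverflowStrategy::Error => Err("buffer full"),
        },
        Err(mpsc::error::TrySendError::Closed(_)) => Err("receiver gone"),
    }
}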

Key Components

1. Topic-Based Message Routing (Cross-Process)

Task Distribution Topics (agent → collectors via IPC):

  • control.collector.task.process - Process monitoring tasks sent to procmond processes
  • control.collector.task.network - Network monitoring tasks sent to netmond processes
  • control.collector.task.filesystem - Filesystem monitoring tasks sent to fsmond processes
  • control.collector.task.performance - Performance monitoring tasks sent to perfmond processes

Result Aggregation Topics (collectors → agent via IPC):

  • events.process.* - Process events from procmond processes
  • events.network.* - Network events from netmond processes
  • events.filesystem.* - Filesystem events from fsmond processes
  • events.performance.* - Performance events from perfmond processes

Health & Control Topics (bidirectional cross-process):

  • control.health.heartbeat - Collector process heartbeat signals
  • control.health.status - Process health status updates
  • control.collector.lifecycle - Process start/stop/config commands

2. Capability-Based Routing (Cross-Process)

Collector processes advertise their capabilities through the event bus:

  • Capability Advertisement: Each collector process publishes its SourceCaps on startup via IPC
  • Dynamic Routing: Agent routes tasks to appropriate collector processes based on capabilities
  • Automatic Discovery: New collector processes automatically join the coordination mesh via IPC
  • Graceful Departure: Collector processes unsubscribe topics on shutdown
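
A minimal sketch of the advertisement/lookup flow (Capability and ProcessId are simplified stand-ins for collector-core's SourceCaps and the broker's process identifiers):

use dashmap::DashMap;

// Stand-ins for types defined elsewhere; names are illustrative.
type ProcessId = u32;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum Capability { Process, Network, Filesystem, Performance }

#[derive(Default)]
struct CapabilityRegistry {
    // capability -> collector processes that advertised it at startup
    by_capability: DashMap<Capability, Vec<ProcessId>>,
}

impl CapabilityRegistry {
    /// Record a collector's advertised capabilities on connect.
    fn advertise(&self, pid: ProcessId, caps: &[Capability]) {
        for cap in caps {
            self.by_capability.entry(*cap).or_default().push(pid);
        }
    }

    /// Candidate processes for a task requiring the given capability.
    fn candidates(&self, cap: Capability) -> Vec<ProcessId> {
        self.by_capability
            .get(&cap)
            .map(|v| v.value().clone())
            .unwrap_or_default()
    }
}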

3. Load Balancing Strategies (Cross-Process)

Strategies:

  • Round-Robin: Distribute tasks evenly across available collector processes (lowest selection overhead)
  • Least-Loaded: Route to collector process with lowest queue depth (best for fairness)
  • Random: Random selection (simple, minimal state)
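
Using the QueueGroup fields defined earlier, round-robin selection stays lock-free via the atomic counter; a sketch, assuming members is non-empty and ProcessId is Clone:

use std::sync::atomic::Ordering;

impl QueueGroup {
    /// Round-robin member selection; the atomic fetch_add keeps this
    /// lock-free across concurrent publishers.
    fn next_round_robin(&self) -> ProcessId {
        let idx = self.next_index.fetch_add(1, Ordering::Relaxed);
        self.members[idx % self.members.len()].clone()
    }
}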

Failover:

  • Heartbeat Monitoring: Detect failed collector processes via missed heartbeats
  • Task Redistribution: Automatically reassign tasks from failed processes
  • Process Restart: Automatically restart crashed collector processes
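
A sketch of how heartbeat-based failure detection might look (timeout value and types illustrative; ProcessId as in the earlier sketches):

use std::time::{Duration, Instant};
use dashmap::DashMap;

struct Liveness {
    last_heartbeat: DashMap<ProcessId, Instant>,
    /// e.g. three missed heartbeat intervals (value illustrative)
    timeout: Duration,
}

impl Liveness {
    /// Called whenever a control.health.heartbeat message arrives.
    fn record(&self, pid: ProcessId) {
        self.last_heartbeat.insert(pid, Instant::now());
    }

    /// Processes whose queued tasks should be redistributed.
    fn failed(&self) -> Vec<ProcessId> {
        self.last_heartbeat
            .iter()
            .filter(|entry| entry.value().elapsed() > self.timeout)
            .map(|entry| *entry.key())
            .collect()
    }
}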

4. Result Aggregation & Correlation (Cross-Process)

Correlation Metadata:

use std::collections::HashMap;
use uuid::Uuid;

struct CorrelationMetadata {
    correlation_id: Uuid,           // Unique workflow identifier
    parent_correlation_id: Option<Uuid>,  // For hierarchical workflows
    root_correlation_id: Uuid,      // Original workflow root
    sequence_number: u64,           // Ordering within workflow
    workflow_stage: String,         // Current stage (e.g., "collection", "analysis")
    source_process: String,         // Originating collector process
    correlation_tags: HashMap<String, String>,  // Flexible tagging
}

Aggregation Strategies:

  • Stream-Based: Real-time aggregation as results arrive from different processes via IPC
  • Batch-Based: Collect results until timeout or threshold met
  • Correlation-Based: Group results by correlation IDs for multi-stage workflows across processes
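
For the correlation-based strategy, a minimal sketch that buffers results by root_correlation_id until a completion signal (timeout handling elided; callers would sort by sequence_number before emitting):

use std::collections::HashMap;
use uuid::Uuid;

struct CorrelationAggregator<E> {
    // root correlation ID -> results buffered so far
    pending: HashMap<Uuid, Vec<E>>,
}

impl<E> CorrelationAggregator<E> {
    fn new() -> Self {
        Self { pending: HashMap::new() }
    }

    /// Buffer a result under its workflow's root correlation ID.
    fn ingest(&mut self, root_correlation_id: Uuid, event: E) {
        self.pending.entry(root_correlation_id).or_default().push(event);
    }

    /// Drain a completed workflow's buffered results.
    fn complete(&mut self, root_correlation_id: Uuid) -> Vec<E> {
        self.pending.remove(&root_correlation_id).unwrap_or_default()
    }
}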

Cross-Platform Support Matrix

| Component | Windows | Linux | macOS | FreeBSD | Notes |
| --- | --- | --- | --- | --- | --- |
| interprocess | ✅ Primary | ✅ Primary | ✅ Primary | ✅ Secondary | Named pipes (Windows) / Unix domain sockets; cross-process IPC |
| tokio | ✅ Primary | ✅ Primary | ✅ Primary | ✅ Secondary | Full async runtime support |
| tokio::sync | ✅ Primary | ✅ Primary | ✅ Primary | ✅ Secondary | In-process channels (within broker process only) |
| prost | ✅ Primary | ✅ Primary | ✅ Primary | ✅ Secondary | Pure Rust, no platform deps |
| dashmap | ✅ Primary | ✅ Primary | ✅ Primary | ✅ Secondary | Lock-free concurrent data structures |

Cross-Process Transport Strategy:

  1. Primary: Unix domain sockets (Linux/macOS/FreeBSD) or Named pipes (Windows) via interprocess for local cross-process IPC
  2. Fallback: TCP loopback (127.0.0.1) for platforms without Unix socket support (rare)
  3. Not Used: Network sockets to external systems (out of scope)
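
A hedged sketch of the broker's listener setup with interprocess 2.x namespaced local sockets (socket name illustrative; the real system would use the crate's tokio-integrated listener rather than this blocking loop):

use interprocess::local_socket::{GenericNamespaced, ListenerOptions, prelude::*};

fn bind_broker() -> std::io::Result<()> {
    // Namespaced names map to named pipes on Windows and Unix domain
    // sockets elsewhere; "daemoneye-eventbus.sock" is illustrative.
    let name = "daemoneye-eventbus.sock".to_ns_name::<GenericNamespaced>()?;
    let listener = ListenerOptions::new().name(name).create_sync()?;
    for conn in listener.incoming().filter_map(Result::ok) {
        // Hand each collector connection to a per-client broker handler.
        let _ = conn;
    }
    Ok(())
}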

Dependency Updates Required

No external broker or network stack is introduced; the event bus is built from existing workspace dependencies plus two small, well-audited additions (dashmap, parking_lot):

[workspace.dependencies]
# Already in use - sufficient for custom event bus
interprocess = "2.2.3"  # Cross-process IPC transport (CRITICAL)
prost = "0.14.1"        # Cross-process message serialization
tokio = { version = "1.47.1", features = ["full"] }  # Async runtime

# Add for custom event bus implementation
dashmap = "6.1"         # Lock-free concurrent HashMap for topic routing
parking_lot = "0.12"    # Faster sync primitives (optional optimization)

Implementation Plan

Phase 1: Foundation ✅ (Completed - #113, #121)

  • ✅ Basic topic infrastructure in daemoneye-eventbus
  • ✅ Topic routing and subscription mechanisms
  • ✅ Wildcard topic matching support (+ and # patterns)
  • ✅ Message broker with pub/sub capabilities
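
For reference, segment-wise matching with these wildcards is compact; a sketch assuming + matches exactly one dot-separated segment and # matches any remaining suffix:

/// Sketch of wildcard topic matching: `+` matches one segment,
/// `#` matches the remainder of the topic.
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut top = topic.split('.');
    loop {
        match (pat.next(), top.next()) {
            (Some("#"), _) => return true,            // remainder wildcard
            (Some("+"), Some(_)) => continue,         // single-segment wildcard
            (Some(p), Some(t)) if p == t => continue, // literal segment match
            (None, None) => return true,              // both exhausted
            _ => return false,
        }
    }
}

// topic_matches("events.process.+", "events.process.spawned") == true
// topic_matches("events.#", "events.network.tcp.opened")      == true
// topic_matches("events.process.+", "events.process")         == false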

Phase 2: Coordination Workflows (#115)

  • Implement topic-based task distribution for multiple collector process types
  • Create capability-based routing for task publishing across processes
  • Add result aggregation from domain-specific topics across processes
  • Implement basic load balancing and failover for collector processes
  • Crates to integrate: tokio::sync::mpsc (bounded channels), dashmap (topic routing)

Phase 3: Advanced Routing (#116, #117)

  • Task distribution logic using eventbus topic publishing to collector processes
  • Collector type routing based on process capabilities
  • Task queuing and priority handling across process boundaries
  • Dynamic routing updates as collector processes join/leave
  • Result correlation and ordering logic from multiple processes
  • Automatic task redistribution on collector process failure
  • Crates to integrate: dashmap for routing tables, queue group logic for cross-process coordination

Phase 4: Testing & Validation (#118)

  • End-to-end tests with multiple collector processes (procmond, netmond, etc.)
  • Task distribution and result aggregation workflow validation across processes
  • Load balancing and failover scenario testing with process crashes
  • Performance benchmarking under high load with multiple processes
  • Test crates: Existing criterion, proptest, insta

Phase 5: Complete Topic Hierarchy (#119)

  • Implement full events.* topic hierarchy for cross-process event routing
  • Complete control.* topic structure for cross-process control plane
  • Topic-based access control and security boundaries between processes
  • Wildcard matching optimization for cross-process subscriptions

Phase 6: Correlation & Forensics (#120)

  • Correlation metadata implementation with process tracking
  • Sequence numbering and workflow stage tracking across processes
  • Correlation ID propagation through EventBus envelopes across IPC boundaries
  • Forensic correlation tracking for security investigations across process boundaries

Benefits & Value Proposition

Performance

  • Ultra-Low Latency: <1ms p99 for local IPC operations
  • High Throughput: Thousands of events/second across processes
  • Efficient Serialization: Protobuf minimizes overhead
  • Zero-Copy Potential: Shared memory optimizations (future)

Scalability (Single System)

  • Multi-Core Utilization: Distribute work across multiple collector processes
  • Specialized Collectors: Different collector process types handle their specific domains
  • Dynamic Scaling: Collector processes can join/leave without system reconfiguration

Reliability

  • Fault Tolerance: Automatic failover prevents single points of failure across processes
  • Load Distribution: Prevents any single collector process from being overwhelmed
  • Graceful Degradation: System continues operating even if collector processes fail
  • Process Isolation: Collector crashes don't affect agent or other collectors

Security

  • Reduced Attack Surface: Local IPC only (no network exposure)
  • OS-Provided Security: Named pipes and Unix sockets with OS permissions
  • Process Boundaries: Strong isolation between components
  • Safe Code Only: No unsafe blocks in daemoneye-eventbus

Maintainability

  • Minimal Dependencies: Only well-audited crates (interprocess, tokio, prost)
  • Cross-Platform: Same code on Windows, Linux, macOS, FreeBSD
  • Observable: Built-in health monitoring and metrics across all processes
  • Flexible: Topic-based architecture adapts to new requirements without process coupling

Requirements Mapping

This epic implements the following requirements from the specification:

  • 15.1: Multi-collector support with unified event processing across processes
  • 15.3: Event coordination and task distribution across process boundaries
  • 15.4: Result aggregation and correlation from multiple collector processes
  • 16.1: Capability-based routing and dynamic feature discovery across processes
  • 16.3: Load balancing and failover mechanisms for process failures
  • 16.4: Health monitoring and availability tracking across all processes

Success Criteria

  • Multiple collector process instances can coordinate through cross-process event bus
  • Tasks are automatically routed to appropriate collector processes based on capabilities
  • Load balancing distributes work efficiently across collector processes
  • Automatic failover when collector processes become unavailable
  • Results are correctly aggregated with correlation metadata from multiple processes
  • Complete topic hierarchy implemented for cross-process event routing
  • End-to-end tests validate multi-collector process scenarios with crashes and restarts
  • Performance benchmarks achieve <1ms p99 latency for local IPC
  • Cross-platform support validated on Windows, Linux, macOS, and FreeBSD
  • Zero unsafe code in daemoneye-eventbus implementation
  • Backpressure handling absorbs producer/consumer rate mismatches without unbounded buffering
  • Documentation updated with multi-collector process deployment guides

Technical Considerations

Cross-Platform Support

  • Full Windows and Unix support for cross-process event bus broker
  • Named Pipes (Windows) and UNIX sockets (Linux/macOS/FreeBSD) for cross-process IPC via interprocess
  • Minimal feature flags required for platform differences
  • Platform-specific testing on primary (Windows/Linux/macOS) and secondary (FreeBSD) targets

Security

  • Topic-based access control to prevent unauthorized cross-process access
  • OS-provided IPC security (named pipe/Unix socket permissions)
  • Process authentication via IPC connection metadata
  • No unsafe code in daemoneye-eventbus implementation

Performance

  • Efficient cross-process message routing with minimal IPC overhead
  • Backpressure handling to prevent overload across process boundaries
  • Configurable buffer sizes and timeouts for cross-process communication
  • Zero-copy message passing where possible with shared memory (future optimization)
  • Benchmark target: <1ms p99 latency for local cross-process IPC

Monitoring & Observability

  • Metrics for cross-process message throughput, latency, queue depths
  • Collector process health and availability tracking
  • Topic subscription monitoring across all processes
  • Integration with existing tracing infrastructure with process-aware spans

Status: In Progress
Priority: High - Essential for multi-process coordination
Estimated Effort: 3-4 weeks across all subtasks

Next Steps:

  1. Begin Phase 2 implementation (#115: message broker for multi-collector coordination and load balancing) on top of the custom event bus broker
  2. Focus on cross-process IPC patterns using interprocess + tokio
  3. Implement one-to-one, one-to-many, pub/sub, and queue patterns
  4. Add backpressure handling and flow control
  5. Performance testing and optimization for <1ms p99 latency
