Implement MQTT-style topic hierarchy for daemoneye-eventbus multi-collector coordination #119

@unclesp1d3r

Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #113 (Topic Infrastructure - Completed), #121 (Topic-Based Messaging - Completed)
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.7

Overview

This issue implements the complete topic hierarchy for daemoneye-eventbus, a cross-process IPC pub/sub event bus that enables multi-process collector coordination in DaemonEye. The eventbus provides MQTT-style hierarchical topics with wildcard matching for efficient event routing between multiple separate collector processes (procmond, netmond, fsmond, perfmond) and the daemoneye-agent orchestrator process.

Critical Architectural Context

daemoneye-eventbus is a CROSS-PROCESS IPC system:

  • daemoneye-agent: Separate OS process hosting the broker and acting as a client
  • Collector Processes: Each collector (procmond, netmond, etc.) runs as a separate OS process
  • IPC Transport: Communication via interprocess crate (named pipes on Windows, Unix domain sockets on Unix)
  • Topic-Based Messaging: Pub/sub topic routing layered on top of cross-process IPC

All communication in this system happens across process boundaries via local IPC, NOT within a single process.

Problem Statement

DaemonEye's collector-core framework enables multiple collection components to operate independently, but there's currently no mechanism for cross-process coordination:

  1. Event Distribution: Collector processes cannot publish events to interested subscriber processes without direct IPC connections
  2. Control Plane: No centralized way to manage collector process lifecycle, configuration, and task distribution
  3. Health Monitoring: No unified heartbeat and status reporting across distributed collector processes
  4. Event Filtering: Cannot selectively subscribe to specific event types without receiving all events across process boundaries
  5. Multi-Collector Coordination: No way to coordinate between multiple collector process instances for load balancing and failover

Proposed Solution

Implement a lightweight, cross-process IPC pub/sub event bus with MQTT-inspired topic hierarchy that provides:

Core Topic Namespaces

1. Process Event Topics (events.process.*)

Published by procmond processes → Consumed by daemoneye-agent process via IPC:

  • events.process.lifecycle - Process start/stop/exec events
  • events.process.metadata - Process metadata updates (CPU, memory, etc.)
  • events.process.tree - Process parent-child relationship events
  • events.process.integrity - Executable hash verification events
  • events.process.anomaly - Behavioral anomaly detection events
  • events.process.batch - Bulk enumeration results

2. Collector Control Topics (control.collector.*)

Published by daemoneye-agent process → Consumed by collector processes via IPC:

  • control.collector.lifecycle - Start/stop/restart commands sent to collector processes
  • control.collector.config - Configuration updates and hot-reload for collector processes
  • control.collector.task.process - Detection tasks for procmond processes
  • control.collector.task.network - Detection tasks for netmond processes
  • control.collector.task.filesystem - Detection tasks for fsmond processes
  • control.collector.task.performance - Detection tasks for perfmond processes

3. Health Monitoring Topics (control.health.*)

Bidirectional between all processes via IPC:

  • control.health.heartbeat - Periodic keepalive signals from collector processes
  • control.health.status - Component status updates from any process
  • control.health.diagnostics - Diagnostic information requests to specific processes
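
As a rough illustration of how the broker side might consume control.health.heartbeat messages, the sketch below tracks the last heartbeat per collector process and reports the ones that have gone quiet. The `HeartbeatTracker` name, the `u32` process ID, and the timeout policy are assumptions made for this example, not part of the specification.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical helper: remembers when each collector process last sent a heartbeat.
pub struct HeartbeatTracker {
    timeout: Duration,
    last_seen: HashMap<u32, Instant>, // u32 stands in for ProcessId (assumption)
}

impl HeartbeatTracker {
    pub fn new(timeout: Duration) -> Self {
        Self { timeout, last_seen: HashMap::new() }
    }

    /// Record a heartbeat received on control.health.heartbeat.
    pub fn record(&mut self, process_id: u32, now: Instant) {
        self.last_seen.insert(process_id, now);
    }

    /// Return the IDs (sorted) of processes whose last heartbeat is older than the timeout.
    pub fn stale(&self, now: Instant) -> Vec<u32> {
        let mut ids: Vec<u32> = self
            .last_seen
            .iter()
            .filter(|(_, seen)| now.saturating_duration_since(**seen) > self.timeout)
            .map(|(id, _)| *id)
            .collect();
        ids.sort_unstable();
        ids
    }
}
```

Passing `now` in explicitly keeps the tracker deterministic under test; a real broker would likely call `Instant::now()` at the call site.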

4. Future Network/FS/Performance Topics (Reserved)

For future separate collector processes:

  • events.network.* - Network monitoring events from future netmond processes
  • events.filesystem.* - Filesystem monitoring events from future fsmond processes
  • events.performance.* - Performance monitoring events from future perfmond processes

Wildcard Topic Matching (Cross-Process)

Support MQTT-style wildcards for flexible cross-process subscriptions:

  • Single-level (+): Matches exactly one level

    • events.process.+ matches events.process.lifecycle but NOT events.process.tree.update
    • Agent process can subscribe to events.+.lifecycle to receive lifecycle events from all collector types
  • Multi-level (#): Matches zero or more levels (must be last segment)

    • events.# matches all event topics from all collector processes
    • events.process.# matches all events from procmond processes
    • control.collector.# matches all control messages to collector processes
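
The wildcard rules above can be sketched as follows. This is a minimal illustration that assumes dot-separated levels and a hypothetical `parse` constructor; it does not validate the published topic itself, and the real matching engine may differ.

```rust
/// One level of a subscription pattern.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TopicSegment {
    Literal(String),
    SingleWildcard, // +
    MultiWildcard,  // #
}

/// A parsed subscription pattern such as `events.process.+`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicPattern {
    segments: Vec<TopicSegment>,
}

impl TopicPattern {
    /// Parse a dot-separated pattern (hypothetical helper). Returns `None`
    /// for empty levels or when `#` is not the last segment.
    pub fn parse(pattern: &str) -> Option<Self> {
        let parts: Vec<&str> = pattern.split('.').collect();
        let last = parts.len() - 1; // split always yields at least one part
        let mut segments = Vec::with_capacity(parts.len());
        for (i, part) in parts.iter().enumerate() {
            let segment = match *part {
                "" => return None,
                "+" => TopicSegment::SingleWildcard,
                "#" if i == last => TopicSegment::MultiWildcard,
                "#" => return None,
                literal => TopicSegment::Literal(literal.to_string()),
            };
            segments.push(segment);
        }
        Some(Self { segments })
    }

    /// Check a concrete published topic against this pattern.
    pub fn matches(&self, topic: &str) -> bool {
        let levels: Vec<&str> = topic.split('.').collect();
        for (i, segment) in self.segments.iter().enumerate() {
            match segment {
                // `#` consumes every remaining level, including none.
                TopicSegment::MultiWildcard => return true,
                // `+` needs exactly one level to be present at this position.
                TopicSegment::SingleWildcard if i < levels.len() => {}
                TopicSegment::Literal(s) if levels.get(i).copied() == Some(s.as_str()) => {}
                _ => return false,
            }
        }
        // Without a trailing `#`, the level counts must agree.
        levels.len() == self.segments.len()
    }
}
```

Returning `None` from `parse` for a misplaced `#` enforces the "must be last segment" rule at subscription time rather than on every publish.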

Topic-Based Access Control (Cross-Process Security)

Implement security boundaries between processes:

  • Read-only topics: events.* (collector processes publish, agent process subscribes)
  • Control topics: control.* (agent process publishes, collector processes subscribe), except control.health.*, which is bidirectional as described under Health Monitoring Topics
  • Admin topics: Restricted to daemoneye-agent process only
  • No cross-namespace permissions by default to prevent unauthorized cross-process access
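
One way to read these boundaries is as a default-deny check keyed on the caller's role and the topic namespace. The sketch below is an assumption about how that could look: the `Role` and `Operation` enums and the `is_allowed` function are hypothetical, and treating every unlisted namespace as agent-only is one interpretation of the "Admin topics" rule.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
    Agent,
    Collector,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    Publish,
    Subscribe,
}

/// Decide whether `role` may perform `op` on `topic` (a concrete topic or a
/// subscription pattern). Hypothetical policy derived from the list above.
pub fn is_allowed(role: Role, op: Operation, topic: &str) -> bool {
    // True when `topic` is the namespace itself or lies underneath it.
    let in_namespace = |ns: &str| {
        topic == ns || topic.strip_prefix(ns).map_or(false, |rest| rest.starts_with('.'))
    };

    if in_namespace("control.health") {
        // Health topics are bidirectional between all processes.
        return true;
    }
    if in_namespace("events") {
        // Collectors publish events; the agent subscribes.
        return matches!(
            (role, op),
            (Role::Collector, Operation::Publish) | (Role::Agent, Operation::Subscribe)
        );
    }
    if in_namespace("control") {
        // The agent publishes control messages; collectors subscribe.
        return matches!(
            (role, op),
            (Role::Agent, Operation::Publish) | (Role::Collector, Operation::Subscribe)
        );
    }
    // Anything outside the known namespaces is treated as restricted (agent only).
    role == Role::Agent
}
```

Because the check is prefix-based, a collector that tries to subscribe to a bare `#` falls through to the restricted branch and is refused.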

Technical Implementation

Data Structures

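use std::collections::HashMap;
use std::time::SystemTime;
use tokio::sync::mpsc;

// ProcessId, IpcConnection, and CorrelationMetadata are assumed to be defined elsewhere in the crate.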
/// Cross-process event bus broker (hosted in daemoneye-agent process)
pub struct EventBusBroker {
    /// Map of topics to subscriber processes (cross-process connections)
    subscriptions: HashMap<TopicPattern, Vec<ProcessSubscription>>,
    /// Access control for cross-process security
    topic_acl: HashMap<TopicPattern, AccessControl>,
    /// Active IPC connections to collector processes
    process_connections: HashMap<ProcessId, IpcConnection>,
}

/// Represents a subscription from a collector process
pub struct ProcessSubscription {
    /// ID of the subscriber process
    process_id: ProcessId,
    /// IPC channel to send events to this process
    ipc_sender: mpsc::Sender<EventEnvelope>,
}

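/// Parsed subscription pattern; used as a HashMap key, so it must be hashable
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopicPattern {
    segments: Vec<TopicSegment>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TopicSegment {
    Literal(String),
    SingleWildcard,  // + (exactly one level)
    MultiWildcard,   // # (zero or more trailing levels)
}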

/// Event envelope sent across process boundaries via IPC
pub struct EventEnvelope {
    topic: String,
    payload: Vec<u8>,  // Serialized with prost
    timestamp: SystemTime,
    publisher_process_id: ProcessId,
    correlation_metadata: Option<CorrelationMetadata>,
}

pub enum AccessControl {
    ReadOnly,   // Can subscribe (receive via IPC)
    WriteOnly,  // Can publish (send via IPC)
    ReadWrite,  // Both
    Restricted, // daemoneye-agent only
}

Cross-Process Message Flow

1. Collector Process (procmond) → IPC → Broker (agent):
   PublishEvent { topic: "events.process.lifecycle", payload: ProcessEvent {...} }

2. Broker (agent) → Internal Routing:
   - Match topic against subscriptions
   - Find subscriber processes

3. Broker (agent) → IPC → Subscriber Processes:
   EventEnvelope { topic, payload, ... } sent to each matching process

4. Subscriber Process → Internal Handling:
   - Deserialize payload
   - Process event locally
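
The four steps above can be simulated inside a single process to show the routing shape. This sketch substitutes `std::sync::mpsc` channels for the real IPC transport, matches on exact topic strings only (wildcards omitted for brevity), and uses `u32` for the process ID; all of these are simplifying assumptions.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::SystemTime;

/// Simplified envelope; the payload would carry prost-encoded bytes in practice.
#[derive(Debug, Clone)]
pub struct EventEnvelope {
    pub topic: String,
    pub payload: Vec<u8>,
    pub timestamp: SystemTime,
    pub publisher_process_id: u32,
}

/// Toy broker: a channel sender stands in for each subscriber's IPC connection.
#[derive(Default)]
pub struct Broker {
    subscriptions: HashMap<String, Vec<Sender<EventEnvelope>>>,
}

impl Broker {
    /// Register a subscriber for an exact topic and hand back its receiving end.
    pub fn subscribe(&mut self, topic: &str) -> Receiver<EventEnvelope> {
        let (tx, rx) = channel();
        self.subscriptions.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Steps 1-3: accept a publish, find subscribers, forward the envelope.
    /// Returns how many subscribers the envelope was delivered to.
    pub fn publish(&self, publisher_process_id: u32, topic: &str, payload: Vec<u8>) -> usize {
        let envelope = EventEnvelope {
            topic: topic.to_string(),
            payload,
            timestamp: SystemTime::now(),
            publisher_process_id,
        };
        self.subscriptions.get(topic).map_or(0, |subscribers| {
            subscribers
                .iter()
                .filter(|tx| tx.send(envelope.clone()).is_ok())
                .count()
        })
    }
}
```

Step 4 corresponds to the subscriber reading from its `Receiver` and decoding `payload` locally.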

Implementation Steps

  1. Topic Matching Engine (within broker process)

    impl TopicPattern {
        /// Match cross-process subscription patterns against published topics
        pub fn matches(&self, topic: &str) -> bool {
            // Implement MQTT-style wildcard matching
        }
    }
  2. Cross-Process Subscription Management

    impl EventBusBroker {
        /// Register subscription from a collector process via IPC
        pub async fn subscribe_from_process(
            &mut self,
            process_id: ProcessId,
            pattern: TopicPattern,
            ipc_channel: mpsc::Sender<EventEnvelope>
        ) -> Result<()> {
            // Validate ACL for this process
            // Register subscription
            // Return acknowledgment via IPC
        }
        
        /// Publish event from a collector process via IPC
        pub async fn publish_from_process(
            &self,
            process_id: ProcessId,
            topic: &str,
            payload: Vec<u8>
        ) -> Result<()> {
            // Validate ACL for this process
            // Route to matching subscriber processes via IPC
        }
    }
  3. IPC Integration

    /// Client library for collector processes
    pub struct EventBusClient {
        ipc_connection: IpcConnection,  // interprocess crate connection
        process_id: ProcessId,
    }
    
    impl EventBusClient {
        /// Connect to broker in daemoneye-agent process
        pub async fn connect(broker_address: &str) -> Result<Self> {
            // Establish IPC connection via interprocess crate
        }
        
        /// Subscribe to topics from this collector process
        pub async fn subscribe(&self, pattern: &str) -> Result<EventStream> {
            // Send subscription request via IPC
            // Return stream of events from broker
        }
        
        /// Publish event from this collector process
        pub async fn publish(&self, topic: &str, event: impl prost::Message) -> Result<()> {
            // Serialize event
            // Send via IPC to broker
        }
    }
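
Named pipes and Unix domain sockets are byte streams, so the client and broker need some way to delimit messages. The issue does not specify a wire format; the sketch below assumes a 4-byte big-endian length prefix and a hypothetical 1 MiB frame cap, written against `std::io::Read`/`Write` so it is independent of the actual transport.

```rust
use std::io::{self, Read, Write};

/// Hypothetical upper bound on a single frame, to reject corrupt or hostile lengths.
const MAX_FRAME_LEN: usize = 1024 * 1024;

/// Write one message as a 4-byte big-endian length followed by the payload.
pub fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    writer.write_all(&(payload.len() as u32).to_be_bytes())?;
    writer.write_all(payload)?;
    writer.flush()
}

/// Read one length-prefixed message, failing if the stream ends mid-frame.
pub fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 4];
    reader.read_exact(&mut len_bytes)?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

In this scheme the payload would be the prost-encoded envelope; an async variant would follow the same layout on top of tokio's I/O traits.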

Acceptance Criteria

  • Complete topic hierarchy implemented in daemoneye-eventbus crate
  • MQTT-style wildcard matching (+, #) working across process boundaries
  • Topic-based ACL enforced for cross-process security
  • IPC integration with interprocess crate for cross-process communication
  • Event serialization/deserialization with prost for cross-process messages
  • Cross-process subscription management (register, unregister, enumerate)
  • Cross-process event publishing with routing to subscriber processes
  • Unit tests for topic matching engine
  • Integration tests with multiple collector processes
  • Performance benchmarks for cross-process IPC throughput
  • Documentation of topic hierarchy and cross-process usage patterns
  • Cross-platform validation (Windows named pipes, Unix domain sockets)

Testing Strategy

Unit Tests

  • Topic pattern parsing and validation
  • Wildcard matching correctness
  • ACL enforcement logic

Integration Tests

  • Multi-process coordination tests
  • Spawn daemoneye-agent broker process
  • Spawn multiple collector processes (mock procmond, netmond)
  • Verify cross-process pub/sub functionality
  • Test process failure and reconnection scenarios

Performance Tests

  • Cross-process IPC latency measurement
  • Throughput with multiple publisher/subscriber processes
  • Memory usage under sustained cross-process load
  • Wildcard matching performance at scale

Dependencies

  • interprocess (v2.2.3) - Critical for cross-process IPC transport
  • prost (v0.14.1) - Cross-process message serialization
  • tokio (v1.47.1) - Async runtime for IPC
  • dashmap (v6+) - Concurrent topic routing within broker

Technical Considerations

Cross-Process IPC Performance

  • Target: <1ms p99 latency for local IPC between processes
  • Zero-copy where possible (shared memory future optimization)
  • Efficient serialization with prost

Process Isolation & Security

  • ACL enforcement prevents unauthorized cross-process access
  • Each collector process has limited topic permissions
  • Broker validates all cross-process operations

Failure Handling

  • Detect when collector processes disconnect (broken pipes, socket errors)
  • Clean up subscriptions for terminated processes
  • Retry logic for transient IPC errors
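
A possible shape for the subscription cleanup described above: treat a failed send as evidence that the subscriber is gone and drop it during delivery. The sketch uses `std::sync::mpsc` in place of the IPC connection, with a dropped receiver playing the role of a broken pipe; `SubscriberList` and the `u32` process ID are hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// A subscriber as seen by the broker (the channel stands in for an IPC connection).
pub struct Subscriber {
    pub process_id: u32,
    pub sender: Sender<Vec<u8>>,
}

#[derive(Default)]
pub struct SubscriberList {
    subscribers: Vec<Subscriber>,
}

impl SubscriberList {
    /// Register a subscriber and return the receiving end of its channel.
    pub fn add(&mut self, process_id: u32) -> Receiver<Vec<u8>> {
        let (sender, receiver) = channel();
        self.subscribers.push(Subscriber { process_id, sender });
        receiver
    }

    /// Deliver a payload to every subscriber, removing any whose receiving
    /// end has gone away. Returns the process IDs that were removed.
    pub fn deliver(&mut self, payload: &[u8]) -> Vec<u32> {
        let mut removed = Vec::new();
        self.subscribers.retain(|sub| {
            let alive = sub.sender.send(payload.to_vec()).is_ok();
            if !alive {
                removed.push(sub.process_id);
            }
            alive
        });
        removed
    }

    pub fn len(&self) -> usize {
        self.subscribers.len()
    }
}
```

A real broker would probably separate transient errors, which are worth retrying, from a closed connection before removing a subscription.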

Cross-Platform Support

  • Windows: Named pipes via CreateNamedPipe/CreateFile
  • Unix (Linux/macOS/FreeBSD): Unix domain sockets via socket()
  • Validate on all primary platforms (Windows, Linux, macOS) and secondary (FreeBSD)

Status: Not Started
Priority: High - Required for multi-collector coordination
Estimated Effort: 1 week
Depends On: #113 (Topic Infrastructure - Completed), #121 (Topic-Based Messaging - Completed)

Next Steps:

  1. Design protobuf messages for cross-process EventEnvelope
  2. Implement cross-process subscription management in broker
  3. Create EventBusClient library for collector processes
  4. Add IPC integration tests with multiple processes

Performance Requirements (from Parent #114)

  • Topic Routing Latency: <1ms p99 for matching and routing via local IPC
  • Wildcard Matching: <100μs for typical patterns (e.g., events.process.+)
  • Subscription Management: <100μs to register or unregister a local process
  • ACL Enforcement: <10μs overhead per publish/subscribe operation
  • Zero Unsafe Code: All topic matching and routing logic uses safe Rust only

Code Safety

  • Zero unsafe code in topic hierarchy implementation
  • Built entirely on: interprocess (IPC), tokio (async), prost (serialization), dashmap (routing)
  • All dependencies are well-audited and widely used

Labels: architecture, enhancement, security
