Add correlation metadata and workflow tracking for multi-collector coordination #120

@unclesp1d3r

Add correlation metadata and multi-collector workflow support across processes

Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #115, #116, #117
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.8

Design Goals (from Parent #114)

Scope: Single-system correlation tracking (NOT distributed tracing)

  1. Performance: Correlation overhead <5% of base local IPC latency
  2. Cross-Platform: Windows, Linux, macOS, FreeBSD
  3. Security: Local process tracking only (no network exposure)
  4. Code Safety: Zero unsafe code in correlation logic
  5. Dependencies: Only use interprocess, tokio, prost, dashmap

Critical Architectural Context

Cross-process correlation tracking:

  • daemoneye-agent (separate process): hosts broker and correlates events from local processes
  • Collector processes (procmond, netmond, etc.): separate OS processes on same machine publishing events via IPC
  • Correlation metadata tracks events across local process boundaries
  • EventBus envelopes carry correlation data through local cross-process IPC

This is for local process coordination, NOT distributed tracing across networks.

Overview

This task implements correlation metadata and multi-collector workflow support within daemoneye-eventbus, enabling the agent to track and correlate events across multiple local collector processes for complex detection workflows and forensic investigations.

Correlation Architecture

Correlation Metadata (Carried Across Local IPC)

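/// Correlation metadata for tracking workflows across local processes
// Note: `SystemTime` does not implement `Default`, so the `..Default::default()`
// shorthand used in the workflow scenarios requires a manual `impl Default` for this type.
#[derive(Debug, Clone, Serialize, Deserialize)]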
pub struct CorrelationMetadata {
    /// Unique workflow identifier
    pub correlation_id: Uuid,
    
    /// Parent correlation for hierarchical workflows
    pub parent_correlation_id: Option<Uuid>,
    
    /// Root workflow identifier
    pub root_correlation_id: Uuid,
    
    /// Sequence number within workflow
    pub sequence_number: u64,
    
    /// Current workflow stage
    pub workflow_stage: String,
    
    /// Originating local collector process
    pub source_process: String,
    
    /// Flexible tagging for forensic correlation
    pub correlation_tags: HashMap<String, String>,
    
    /// Timestamp when correlation started
    pub started_at: SystemTime,
}
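
The workflow scenarios in this issue build metadata with `..Default::default()`, which only compiles if `CorrelationMetadata` implements `Default`. Because `SystemTime` has no `Default`, a derive is not enough. The following is a minimal sketch of a manual implementation, not the project's actual code. The `Uuid` newtype is a stand-in for `uuid::Uuid` so the example builds with the standard library alone, and `new_root` is a hypothetical helper.

```rust
use std::collections::HashMap;
use std::time::SystemTime;

/// Stand-in for `uuid::Uuid` (assumption) so the sketch needs only std.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct Uuid(pub u128);

#[derive(Debug, Clone)]
pub struct CorrelationMetadata {
    pub correlation_id: Uuid,
    pub parent_correlation_id: Option<Uuid>,
    pub root_correlation_id: Uuid,
    pub sequence_number: u64,
    pub workflow_stage: String,
    pub source_process: String,
    pub correlation_tags: HashMap<String, String>,
    pub started_at: SystemTime,
}

impl Default for CorrelationMetadata {
    fn default() -> Self {
        Self {
            correlation_id: Uuid::default(),
            parent_correlation_id: None,
            root_correlation_id: Uuid::default(),
            sequence_number: 0,
            workflow_stage: String::new(),
            source_process: String::new(),
            correlation_tags: HashMap::new(),
            // `SystemTime` has no `Default`; stamp the creation time instead.
            started_at: SystemTime::now(),
        }
    }
}

/// Hypothetical helper: metadata for a new root workflow, where the
/// correlation ID and the root ID are the same value.
pub fn new_root(id: Uuid, stage: &str, source: &str) -> CorrelationMetadata {
    CorrelationMetadata {
        correlation_id: id,
        root_correlation_id: id,
        workflow_stage: stage.to_string(),
        source_process: source.to_string(),
        ..Default::default()
    }
}
```

With the real `uuid` crate, the stand-in type would be dropped and callers would pass `Uuid::new_v4()` as in the scenarios.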

EventBus Envelope (Transmitted via Local IPC)

/// Event envelope carrying payload and correlation metadata via local IPC
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventEnvelope {
    /// Topic this event was published to
    pub topic: String,
    
    /// Serialized event payload
    pub payload: Vec<u8>,
    
    /// Timestamp when published
    pub timestamp: SystemTime,
    
    /// Local publisher process ID
    pub publisher_process_id: ProcessId,
    
    /// Correlation metadata (optional)
    pub correlation_metadata: Option<CorrelationMetadata>,
}

Correlation Tracker (In Broker Process)

/// Tracks correlation workflows across local collector processes
pub struct CorrelationTracker {
    // Active workflows indexed by correlation ID
    active_workflows: Arc<DashMap<Uuid, WorkflowState>>,
    
    // Event history for forensic analysis
    event_history: Arc<RwLock<VecDeque<CorrelatedEvent>>>,
    
    // Maximum history size
    max_history_size: usize,
}

pub struct WorkflowState {
    correlation_id: Uuid,
    started_at: Instant,
    stages: HashMap<String, StageInfo>,
    participating_processes: HashSet<ProcessId>,
    total_events: usize,
}

pub struct StageInfo {
    stage_name: String,
    started_at: Instant,
    completed_at: Option<Instant>,
    events_count: usize,
}

pub struct CorrelatedEvent {
    correlation_id: Uuid,
    event: EventEnvelope,
    received_at: Instant,
}
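
The tracking code in this issue calls `WorkflowState::new`, `StageInfo::new`, and `workflow.is_complete()`, none of which are defined here. One possible shape is sketched below. The completion rule (at least one stage, and every stage has a `completed_at`) is an assumption, as are the `ProcessId` and `Uuid` aliases and the `complete` method.

```rust
use std::collections::{HashMap, HashSet};
use std::time::Instant;

/// Assumed aliases: the issue leaves `ProcessId` undefined and uses `uuid::Uuid`.
pub type ProcessId = u32;
pub type Uuid = u128;

pub struct StageInfo {
    pub stage_name: String,
    pub started_at: Instant,
    pub completed_at: Option<Instant>,
    pub events_count: usize,
}

impl StageInfo {
    pub fn new(stage_name: &str) -> Self {
        Self {
            stage_name: stage_name.to_string(),
            started_at: Instant::now(),
            completed_at: None,
            events_count: 0,
        }
    }

    /// Hypothetical: record that the stage has finished.
    pub fn complete(&mut self) {
        self.completed_at = Some(Instant::now());
    }
}

pub struct WorkflowState {
    pub correlation_id: Uuid,
    pub started_at: Instant,
    pub stages: HashMap<String, StageInfo>,
    pub participating_processes: HashSet<ProcessId>,
    pub total_events: usize,
}

impl WorkflowState {
    pub fn new(correlation_id: Uuid) -> Self {
        Self {
            correlation_id,
            started_at: Instant::now(),
            stages: HashMap::new(),
            participating_processes: HashSet::new(),
            total_events: 0,
        }
    }

    /// Assumed rule: a workflow is complete once it has at least one stage
    /// and every stage has a completion timestamp.
    pub fn is_complete(&self) -> bool {
        !self.stages.is_empty()
            && self.stages.values().all(|stage| stage.completed_at.is_some())
    }
}
```

How a stage gets marked complete (an explicit completion event, a count of expected responders, or a timeout) is left open by the issue and would need to be decided before `wait_for_correlation_results` can return.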

Workflow Scenarios

1. Simple Single-Stage Workflow (Across Local Processes)

// Agent initiates workflow
let correlation_id = Uuid::new_v4();
let metadata = CorrelationMetadata {
    correlation_id,
    root_correlation_id: correlation_id,
    workflow_stage: "enumeration".to_string(),
    source_process: "daemoneye-agent".to_string(),
    ..Default::default()
};

// Publish task with correlation to local collectors
broker.publish_with_correlation(
    "control.collector.task.process",
    task,
    metadata.clone()
).await?;

// Collector processes respond with same correlation_id
// Results automatically grouped by correlation tracker
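
The grouping mentioned in the last comment can be pictured as a map from correlation ID to the events that carry it. The sketch below is illustrative only: `Event` is a simplified stand-in for `EventEnvelope`, `Uuid` is an alias standing in for `uuid::Uuid`, and `group_by_correlation` is a hypothetical function, not part of the proposed API.

```rust
use std::collections::HashMap;

/// Stand-in for `uuid::Uuid` (assumption).
pub type Uuid = u128;

/// Simplified stand-in for `EventEnvelope`: only the fields needed here.
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub correlation_id: Option<Uuid>,
    pub payload: Vec<u8>,
}

/// Group events by correlation ID, preserving arrival order inside each
/// group. Events without correlation metadata are skipped.
pub fn group_by_correlation(events: &[Event]) -> HashMap<Uuid, Vec<&Event>> {
    let mut groups: HashMap<Uuid, Vec<&Event>> = HashMap::new();
    for event in events {
        if let Some(id) = event.correlation_id {
            groups.entry(id).or_default().push(event);
        }
    }
    groups
}
```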

2. Multi-Stage Workflow (Across Local Processes)

// Stage 1: Enumeration (local procmond processes)
let root_id = Uuid::new_v4();
let stage1 = CorrelationMetadata {
    correlation_id: Uuid::new_v4(),
    root_correlation_id: root_id,
    workflow_stage: "enumeration".to_string(),
    ..Default::default()
};
// Copy the ID out first: publishing moves `stage1` into the call
let stage1_id = stage1.correlation_id;

broker.publish_with_correlation("control.collector.task.process", enum_task, stage1).await?;

// Wait for stage 1 results from local processes
// (the tracker method takes a timeout; 30 seconds is an illustrative value)
let enum_results = correlation_tracker
    .wait_for_correlation_results(stage1_id, Duration::from_secs(30))
    .await?;

// Stage 2: Integrity checks on enumerated processes (local procmond)
let stage2 = CorrelationMetadata {
    correlation_id: Uuid::new_v4(),
    parent_correlation_id: Some(stage1_id),
    root_correlation_id: root_id,
    workflow_stage: "integrity".to_string(),
    correlation_tags: hashmap! {
        "parent_stage".to_string() => "enumeration".to_string(),
    },
    ..Default::default()
};
let stage2_id = stage2.correlation_id;

broker.publish_with_correlation("control.collector.task.process", integrity_task, stage2).await?;

// Stage 3: Network connections from suspicious processes (local netmond)
let stage3 = CorrelationMetadata {
    correlation_id: Uuid::new_v4(),
    parent_correlation_id: Some(stage2_id),
    root_correlation_id: root_id,
    workflow_stage: "network_analysis".to_string(),
    ..Default::default()
};

broker.publish_with_correlation("control.collector.task.network", network_task, stage3).await?;
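
Each stage in this scenario points at its predecessor through `parent_correlation_id`, so the order of stages can be recovered by walking those links backwards from the last stage. The following sketch shows one way to do that. `StageLink` is a trimmed-down stand-in for `CorrelationMetadata`, `Uuid` is an alias standing in for `uuid::Uuid`, and `stage_chain` is a hypothetical helper.

```rust
use std::collections::HashMap;

/// Stand-in for `uuid::Uuid` (assumption).
pub type Uuid = u128;

/// Trimmed-down stand-in for `CorrelationMetadata`.
#[derive(Debug, Clone)]
pub struct StageLink {
    pub correlation_id: Uuid,
    pub parent_correlation_id: Option<Uuid>,
    pub workflow_stage: String,
}

/// Walk parent links from `leaf` back to the first stage and return the
/// stage names in execution order (first stage first).
pub fn stage_chain(stages: &HashMap<Uuid, StageLink>, leaf: Uuid) -> Vec<String> {
    let mut chain = Vec::new();
    let mut current = Some(leaf);
    while let Some(id) = current {
        // Stop at an unknown ID rather than failing.
        let Some(stage) = stages.get(&id) else { break };
        // Cycle guard: a valid chain can never be longer than the map.
        if chain.len() >= stages.len() {
            break;
        }
        chain.push(stage.workflow_stage.clone());
        current = stage.parent_correlation_id;
    }
    chain.reverse();
    chain
}
```

The cycle guard is defensive: well-formed metadata should never loop, but the tracker receives this data from other processes and should not hang on a malformed chain.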

3. Forensic Correlation (Local Process Investigation)

// Tag events for forensic investigation of local processes
let forensic_metadata = CorrelationMetadata {
    correlation_id: incident_id,
    root_correlation_id: incident_id,
    workflow_stage: "forensic_collection".to_string(),
    correlation_tags: hashmap! {
        "investigation_id".to_string() => "INC-12345".to_string(),
        "suspicious_pid".to_string() => "4892".to_string(),
        "analyst".to_string() => "alice".to_string(),
    },
    ..Default::default()
};

// Collect all events related to PID 4892 from local collectors
broker.publish_with_correlation("control.collector.task.process", forensic_task, forensic_metadata).await?;

// Later: Query all events for this investigation from local event history
let related_events = correlation_tracker
    .find_events_by_tag("investigation_id", "INC-12345")
    .await?;

Implementation Steps

1. Correlation Metadata Propagation

impl EventBusBroker {
    /// Publish event with correlation metadata to local subscribers
    pub async fn publish_with_correlation(
        &self,
        topic: &str,
        payload: Vec<u8>,
        correlation: CorrelationMetadata,
    ) -> Result<()> {
        let envelope = EventEnvelope {
            topic: topic.to_string(),
            payload,
            timestamp: SystemTime::now(),
            publisher_process_id: self.process_id(),
            correlation_metadata: Some(correlation),
        };
        
        // Track workflow
        self.correlation_tracker.track_event(&envelope).await?;
        
        // Route to local subscribers
        self.route_envelope(envelope).await?;
        
        Ok(())
    }
}

2. Correlation Tracking

impl CorrelationTracker {
    /// Track event in correlation workflow
    pub async fn track_event(&self, envelope: &EventEnvelope) -> Result<()> {
        if let Some(correlation) = &envelope.correlation_metadata {
            // Update workflow state in its own scope so the DashMap entry guard
            // is dropped before the `.await` below; holding a shard lock across
            // an await point risks deadlock
            {
                let mut workflow = self.active_workflows
                    .entry(correlation.correlation_id)
                    .or_insert_with(|| WorkflowState::new(correlation.correlation_id));

                workflow.total_events += 1;
                workflow.participating_processes.insert(envelope.publisher_process_id);

                // Track stage
                let stage = workflow.stages
                    .entry(correlation.workflow_stage.clone())
                    .or_insert_with(|| StageInfo::new(&correlation.workflow_stage));

                stage.events_count += 1;
            }

            // Add to event history
            let mut history = self.event_history.write().await;
            history.push_back(CorrelatedEvent {
                correlation_id: correlation.correlation_id,
                event: envelope.clone(),
                received_at: Instant::now(),
            });

            // Trim history down to the configured maximum
            while history.len() > self.max_history_size {
                history.pop_front();
            }
        }

        Ok(())
    }
    
    /// Wait for all results from a correlation workflow
    pub async fn wait_for_correlation_results(
        &self,
        correlation_id: Uuid,
        timeout: Duration,
    ) -> Result<Vec<EventEnvelope>> {
        let deadline = Instant::now() + timeout;

        loop {
            // Check for completion; the DashMap guard is consumed inside this
            // statement, so no map lock is held across the `.await` calls below
            let is_complete = self.active_workflows
                .get(&correlation_id)
                .map(|workflow| workflow.is_complete())
                .unwrap_or(false);

            if is_complete {
                // Collect all events for this correlation from local history
                let history = self.event_history.read().await;
                return Ok(history
                    .iter()
                    .filter(|event| event.correlation_id == correlation_id)
                    .map(|event| event.event.clone())
                    .collect());
            }

            // Check timeout
            if Instant::now() >= deadline {
                return Err(Error::CorrelationTimeout(correlation_id));
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}
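
The history-trimming step in `track_event` amounts to a bounded FIFO buffer: append the newest event and evict the oldest once the cap is exceeded. The sketch below isolates that behaviour in a small generic type. `BoundedHistory` and its methods are hypothetical names used only for illustration, and the locking that the tracker wraps around its history is left out.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded FIFO buffer that keeps the most recent `max_size` items.
pub struct BoundedHistory<T> {
    events: VecDeque<T>,
    max_size: usize,
}

impl<T> BoundedHistory<T> {
    pub fn new(max_size: usize) -> Self {
        Self {
            events: VecDeque::with_capacity(max_size),
            max_size,
        }
    }

    /// Append an event, evicting the oldest entries if the cap is exceeded.
    /// Returns how many entries were evicted.
    pub fn push(&mut self, event: T) -> usize {
        self.events.push_back(event);
        let mut evicted = 0;
        while self.events.len() > self.max_size {
            self.events.pop_front();
            evicted += 1;
        }
        evicted
    }

    pub fn len(&self) -> usize {
        self.events.len()
    }

    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Iterate from oldest to newest.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.events.iter()
    }
}
```

A count-based cap is the simplest retention policy. The acceptance criteria ask for configurable retention, which could also mean age-based eviction; that variant is not shown here.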

3. Forensic Query API

impl CorrelationTracker {
    /// Find events by correlation tag (for forensic investigations)
    pub async fn find_events_by_tag(
        &self,
        tag_key: &str,
        tag_value: &str,
    ) -> Result<Vec<EventEnvelope>> {
        let history = self.event_history.read().await;
        
        Ok(history
            .iter()
            .filter(|event| {
                event.event.correlation_metadata
                    .as_ref()
                    .and_then(|meta| meta.correlation_tags.get(tag_key))
                    .map(|v| v == tag_value)
                    .unwrap_or(false)
            })
            .map(|event| event.event.clone())
            .collect())
    }
    
    /// Get workflow timeline (for forensic analysis)
    pub async fn get_workflow_timeline(&self, root_correlation_id: Uuid) -> Result<WorkflowTimeline> {
        let history = self.event_history.read().await;
        
        let events: Vec<_> = history
            .iter()
            .filter_map(|event| {
                // Skip events without metadata or from other workflows
                let meta = event.event.correlation_metadata.as_ref()?;
                if meta.root_correlation_id != root_correlation_id {
                    return None;
                }
                Some(TimelineEvent {
                    timestamp: event.received_at,
                    stage: meta.workflow_stage.clone(),
                    source_process: event.event.publisher_process_id,
                    event: event.event.clone(),
                })
            })
            .collect();
        
        Ok(WorkflowTimeline {
            root_correlation_id,
            events,
        })
    }
}

Acceptance Criteria

  • CorrelationMetadata implemented and serializable
  • EventEnvelope carries correlation metadata across local IPC
  • CorrelationTracker tracks workflows across local processes
  • Multi-stage workflow support with parent/child relationships
  • Forensic tagging and querying capabilities
  • Workflow timeline reconstruction from local event history
  • Correlation overhead <5% of base local IPC latency
  • Event history with configurable retention
  • Zero unsafe code in correlation logic
  • Unit tests for correlation tracking
  • Integration tests with multi-stage workflows across local processes
  • Documentation of correlation patterns

Testing Strategy

Unit Tests

  • Correlation ID generation and propagation
  • Workflow state tracking
  • Event history management
  • Tag-based querying

Integration Tests

  • Simple single-stage workflow across local processes
  • Multi-stage workflow with dependencies
  • Forensic tagging and retrieval
  • Timeline reconstruction

Performance Requirements (from Parent #114)

  • Correlation Overhead: <5% of base local IPC latency (<50μs additional per message)
  • Event History Query: <10ms for tag-based search in 10K events
  • Timeline Reconstruction: <100ms for workflows with 1K events
  • Memory Overhead: <10MB for 10K correlated events in history
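
The correlation overhead target can be expressed as a small check for use in a benchmark or test. This sketch assumes that both limits apply at once, that is, the added latency must stay under 5% of the base latency and under 50μs. The issue does not say how the two figures combine, and the constant and function names are hypothetical.

```rust
use std::time::Duration;

/// Limits taken from the performance requirements.
pub const MAX_OVERHEAD_RATIO: f64 = 0.05;
pub const MAX_OVERHEAD_ABS: Duration = Duration::from_micros(50);

/// Extra latency attributable to correlation (zero if the correlated
/// path happened to measure faster than the base path).
pub fn correlation_overhead(base: Duration, with_correlation: Duration) -> Duration {
    with_correlation.saturating_sub(base)
}

/// True when the overhead is under both the relative and the absolute limit.
pub fn within_budget(base: Duration, with_correlation: Duration) -> bool {
    let overhead = correlation_overhead(base, with_correlation);
    overhead < MAX_OVERHEAD_ABS
        && overhead.as_secs_f64() < base.as_secs_f64() * MAX_OVERHEAD_RATIO
}
```

Note that the two figures coincide only when the base IPC latency is 1ms. For faster transports the 5% limit is the tighter one.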

Code Safety

  • Zero unsafe code in correlation tracking logic
  • Built entirely on: interprocess, tokio, prost, dashmap
  • All dependencies are well-audited
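
The zero-unsafe requirement can be enforced by the compiler rather than by review. Rust's `unsafe_code` lint, set to `forbid`, rejects any `unsafe` block or function in its scope and cannot be overridden further down. The sketch below applies it to a module; the module name `correlation` and the `next_sequence` function are placeholders, not names from the project.

```rust
/// Placeholder module standing in for the correlation logic.
/// `forbid` makes any `unsafe` inside this module a hard compile error.
#[forbid(unsafe_code)]
pub mod correlation {
    /// Placeholder function: advance a workflow sequence number,
    /// wrapping around on overflow instead of panicking.
    pub fn next_sequence(current: u64) -> u64 {
        current.wrapping_add(1)
    }
}
```

To cover a whole crate, the inner-attribute form `#![forbid(unsafe_code)]` goes at the top of the crate root instead.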

Use Cases

1. Threat Hunting (Local System)

  • Correlate process execution, network activity, and file access from local collectors
  • Tag suspicious patterns for investigation
  • Reconstruct attack timeline from local event history

2. Compliance Auditing (Local System)

  • Track multi-stage data access workflows across local processes
  • Correlate user actions with system events
  • Generate audit trails from local event history

3. Performance Monitoring (Local System)

  • Correlate resource usage across local collector domains
  • Track performance impact of specific workflows
  • Identify bottlenecks in multi-stage processing on local system

Status: Not Started
Priority: Medium
Estimated Effort: 1 week
Depends On: #115, #116, #117

Labels

  • architecture: System architecture and design decisions
  • audit: Audit logging and forensic features
  • data-models: Data structure and model related
  • ipc: Inter-Process Communication
  • security: Security-related issues and vulnerabilities
