End-to-end tests for multi-collector coordination with task distribution and failover #118

Description

Write end-to-end tests covering coordination across multiple collector processes.

Parent Epic: #114 - Multi-Process Collector Coordination via Event Bus Architecture
Related Issues: #115, #116, #117
Specification: .kiro/specs/daemoneye-core-monitoring/tasks.md Section 2.6.5

Design Goals Validation (from Parent #114)

This issue validates the following design goals:

  1. Performance: <1ms p99 latency for local cross-process IPC
  2. Cross-Platform: Windows, Linux, macOS, FreeBSD
  3. Security: Local IPC only via named pipes/Unix sockets
  4. Code Safety: Zero unsafe code in daemoneye-eventbus
  5. Communication Patterns: One-to-one, one-to-many, pub/sub, queue with backpressure

Overview

This task implements comprehensive end-to-end tests for multi-process collector coordination via daemoneye-eventbus, validating the whole system under normal operation, failure, and high-load scenarios.
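
The snippets below are sketches against an assumed test-harness API rather than the finalized daemoneye-eventbus surface. A minimal outline of that assumed surface follows; every name in it (TestBroker, start_broker, publish_direct, Task, TaskResult, and so on) is an illustrative placeholder, and helpers such as task(), wait_for_reconnection, start_broker_with_config, and start_broker_with_transport are assumed in the same spirit.

/// Error alias used throughout the sketches; the real harness may prefer anyhow.
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// Placeholder task and result types; the real crate defines protobuf-backed messages.
pub struct Task {
    pub id: u64,
}

#[derive(Clone, Copy)]
pub struct TaskId(pub u64);

pub struct TaskResult {
    pub source_process_id: String,
}

/// Hypothetical handle to a daemoneye-agent broker process under test.
pub struct TestBroker {
    // child process handle, local IPC client connection, etc.
}

/// Start a broker process and connect a test client to it over local IPC.
pub async fn start_broker() -> Result<TestBroker> {
    unimplemented!("spawn daemoneye-agent and connect via named pipe / Unix socket")
}

impl TestBroker {
    /// Publish a task to a capability topic; the distribution scenarios assume
    /// queue semantics (each task is delivered to exactly one subscriber).
    pub async fn publish(&self, _topic: &str, _task: Task) -> Result<()> {
        unimplemented!()
    }

    /// Publish a task directly to one collector process (one-to-one pattern).
    pub async fn publish_direct(&self, _process_id: &str, _task: Task) -> Result<TaskId> {
        unimplemented!()
    }

    /// Await the next result produced by any collector.
    pub async fn receive_result(&self) -> Result<TaskResult> {
        unimplemented!()
    }

    /// Wait for the result of one specific task (used by the failover scenario).
    pub async fn get_result(&self, _task_id: TaskId) -> Result<TaskResult> {
        unimplemented!()
    }

    /// Restart the broker process in place, keeping the same IPC endpoint.
    pub async fn restart(&self) -> Result<()> {
        unimplemented!()
    }
}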

Test Scenarios

1. Multi-Process Coordination Tests

Scenario: Multiple Collectors of Same Type (Local Processes)

#[tokio::test]
async fn test_multiple_procmond_instances() -> Result<()> {
    // Spawn daemoneye-agent broker process
    let broker = start_broker().await?;

    // Spawn 3 procmond processes on the same machine
    let procmond1 = spawn_collector_process("procmond", 1).await?;
    let procmond2 = spawn_collector_process("procmond", 2).await?;
    let procmond3 = spawn_collector_process("procmond", 3).await?;

    // All 3 subscribe to control.collector.task.process

    // Publish 9 tasks - should be distributed round-robin
    for i in 0..9 {
        broker.publish("control.collector.task.process", task(i)).await?;
    }

    // Allow delivery to settle before asserting
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Verify each procmond got exactly 3 tasks
    assert_eq!(procmond1.received_tasks().len(), 3);
    assert_eq!(procmond2.received_tasks().len(), 3);
    assert_eq!(procmond3.received_tasks().len(), 3);
    Ok(())
}

Scenario: Mixed Collector Types (Local Processes)

#[tokio::test]
async fn test_mixed_collector_types() -> Result<()> {
    let broker = start_broker().await?;

    // Spawn different local collector types
    let procmond = spawn_collector_process("procmond", 1).await?;
    let netmond = spawn_collector_process("netmond", 1).await?;
    let fsmond = spawn_collector_process("fsmond", 1).await?;

    // Publish capability-specific tasks
    broker.publish("control.collector.task.process", process_task()).await?;
    broker.publish("control.collector.task.network", network_task()).await?;
    broker.publish("control.collector.task.filesystem", fs_task()).await?;

    // Allow delivery to settle before asserting
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Verify correct routing: each task reaches only the matching collector type
    assert_eq!(procmond.received_tasks().len(), 1);
    assert_eq!(netmond.received_tasks().len(), 1);
    assert_eq!(fsmond.received_tasks().len(), 1);
    Ok(())
}

2. Failure and Recovery Tests

Scenario: Collector Process Crash

#[tokio::test]
async fn test_collector_process_crash() -> Result<()> {
    let broker = start_broker().await?;

    // Start 2 procmond processes
    let procmond1 = spawn_collector_process("procmond", 1).await?;
    let procmond2 = spawn_collector_process("procmond", 2).await?;

    // Publish task directly to procmond1
    let task_id = broker.publish_direct(procmond1.id(), task()).await?;

    // Kill procmond1 mid-execution
    procmond1.kill().await?;

    // Wait for failover detection (3 missed heartbeat intervals)
    tokio::time::sleep(Duration::from_secs(35)).await;

    // Task should be redistributed to procmond2
    let result = broker.get_result(task_id).await?;
    assert_eq!(result.source_process_id, procmond2.id());
    Ok(())
}

Scenario: Broker Process Restart

#[tokio::test]
async fn test_broker_restart() -> Result<()> {
    let broker = start_broker().await?;

    // Spawn collectors
    let procmond = spawn_collector_process("procmond", 1).await?;
    let netmond = spawn_collector_process("netmond", 1).await?;

    // Restart broker process
    broker.restart().await?;

    // Collectors should reconnect automatically
    wait_for_reconnection(&procmond, Duration::from_secs(10)).await?;
    wait_for_reconnection(&netmond, Duration::from_secs(10)).await?;

    // System should be operational again
    broker.publish("control.collector.task.process", task()).await?;
    let result = procmond.receive_task(Duration::from_secs(5)).await?;
    assert!(result.is_some());
    Ok(())
}

3. Load and Performance Tests

Scenario: High Volume Task Distribution

#[tokio::test]
async fn test_high_volume_tasks() -> Result<()> {
    let broker = start_broker().await?;

    // Spawn 5 procmond processes
    let mut collectors = Vec::new();
    for i in 0..5 {
        collectors.push(spawn_collector_process("procmond", i).await?);
    }

    // Publish 10,000 tasks
    let start = Instant::now();
    for i in 0..10_000 {
        broker.publish("control.collector.task.process", task(i)).await?;
    }
    let publish_duration = start.elapsed();

    // Wait for all results
    let mut results = Vec::new();
    for _ in 0..10_000 {
        results.push(broker.receive_result().await?);
    }
    let total_duration = start.elapsed();

    // Verify performance
    assert!(publish_duration < Duration::from_secs(5)); // <500μs per task published
    assert!(total_duration < Duration::from_secs(60)); // <6ms per task roundtrip

    // Verify distribution
    let task_counts: Vec<_> = collectors.iter()
        .map(|c| c.received_tasks().len())
        .collect();

    // Should be roughly even (within 10% of the per-collector average)
    let avg = 10_000 / 5;
    for count in task_counts {
        assert!((count as i32 - avg).abs() <= avg / 10);
    }
    Ok(())
}

Scenario: IPC Latency Benchmark

#[tokio::test]
async fn test_local_ipc_latency() -> Result<()> {
    let broker = start_broker().await?;
    let procmond = spawn_collector_process("procmond", 1).await?;

    // Measure request/response roundtrip latency for 1000 tasks
    let mut latencies = Vec::new();

    for _ in 0..1000 {
        let start = Instant::now();
        broker.publish("control.collector.task.process", task()).await?;
        let _result = broker.receive_result().await?;
        latencies.push(start.elapsed());
    }

    // Calculate percentiles
    latencies.sort();
    let p50 = latencies[latencies.len() / 2];
    let p95 = latencies[latencies.len() * 95 / 100];
    let p99 = latencies[latencies.len() * 99 / 100];

    // Assert performance targets
    assert!(p50 < Duration::from_micros(500), "p50 latency too high: {:?}", p50);
    assert!(p99 < Duration::from_millis(1), "p99 latency too high: {:?}", p99);

    println!("Local IPC Latency - p50: {:?}, p95: {:?}, p99: {:?}", p50, p95, p99);
    Ok(())
}

4. Backpressure and Flow Control Tests

Scenario: Slow Consumer

#[tokio::test]
async fn test_backpressure_handling() -> Result<()> {
    let broker = start_broker_with_config(BrokerConfig {
        max_buffer_size: 100,
        overflow_strategy: OverflowStrategy::DropOldest,
    }).await?;

    // Spawn a slow collector (100ms processing delay per task)
    let slow_collector =
        spawn_collector_process_with_delay("procmond", 1, Duration::from_millis(100)).await?;

    // Publish 200 tasks rapidly (exceeds the 100-entry buffer)
    for i in 0..200 {
        broker.publish("control.collector.task.process", task(i)).await?;
    }

    // Let the slow collector drain its buffer (at most 100 tasks x 100ms each)
    tokio::time::sleep(Duration::from_secs(15)).await;

    // The broker should have dropped the oldest tasks
    let received = slow_collector.received_tasks();
    assert_eq!(received.len(), 100);
    assert_eq!(received[0].id, 100); // Oldest (0-99) dropped
    Ok(())
}
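
The acceptance criteria below require every overflow strategy to be exercised, not just DropOldest. A sketch of how that could be parameterized follows; the DropNewest and Block variants are assumptions, since only DropOldest is named in this issue.

/// Hypothetical exhaustive backpressure test driver; strategy-specific
/// assertions would live in the marked section.
async fn run_backpressure_case(strategy: OverflowStrategy) -> Result<()> {
    let broker = start_broker_with_config(BrokerConfig {
        max_buffer_size: 100,
        overflow_strategy: strategy,
    }).await?;

    let slow_collector =
        spawn_collector_process_with_delay("procmond", 1, Duration::from_millis(100)).await?;

    // Overflow the 100-entry buffer.
    for i in 0..200 {
        broker.publish("control.collector.task.process", task(i)).await?;
    }

    // Strategy-specific assertions go here: which tasks survive (DropOldest vs
    // DropNewest), or whether the publisher observed backpressure (Block).
    let _ = slow_collector.received_tasks();
    Ok(())
}

#[tokio::test]
async fn test_all_overflow_strategies() -> Result<()> {
    for strategy in [
        OverflowStrategy::DropOldest,
        OverflowStrategy::DropNewest, // assumed variant
        OverflowStrategy::Block,      // assumed variant
    ] {
        run_backpressure_case(strategy).await?;
    }
    Ok(())
}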

5. Cross-Platform Tests

#[cfg(all(test, target_os = "windows"))]
#[tokio::test]
async fn test_named_pipes_transport() -> Result<()> {
    // Test Windows named pipes transport
    let broker = start_broker_with_transport(Transport::NamedPipes).await?;
    let procmond = spawn_collector_process("procmond", 1).await?;

    broker.publish("control.collector.task.process", task()).await?;
    let result = procmond.receive_task(Duration::from_secs(5)).await?;
    assert!(result.is_some());
    Ok(())
}

#[cfg(all(test, target_family = "unix"))]
#[tokio::test]
async fn test_unix_sockets_transport() -> Result<()> {
    // Test Unix domain sockets (target_family = "unix" covers Linux, macOS, and FreeBSD)
    let broker = start_broker_with_transport(Transport::UnixSockets).await?;
    let procmond = spawn_collector_process("procmond", 1).await?;

    broker.publish("control.collector.task.process", task()).await?;
    let result = procmond.receive_task(Duration::from_secs(5)).await?;
    assert!(result.is_some());
    Ok(())
}

Test Infrastructure

Mock Collector Process

/// Spawns a mock collector process for testing
async fn spawn_collector_process(
    collector_type: &str,
    instance: u32,
) -> Result<MockCollectorProcess> {
    let process_id = format!("{}-{}", collector_type, instance);
    
    // Spawn actual OS process running mock collector
    let child = Command::new("target/debug/mock-collector")
        .arg("--type").arg(collector_type)
        .arg("--instance").arg(instance.to_string())
        .spawn()?;
    
    // Wait for registration
    wait_for_process_ready(&process_id, Duration::from_secs(10)).await?;
    
    Ok(MockCollectorProcess {
        process_id,
        child,
    })
}
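
The scenarios above also lean on an assumed MockCollectorProcess handle. A minimal sketch of the surface they use follows; the method names (id, received_tasks, receive_task, kill) are placeholders matching the snippets, reusing the Task and Result placeholders from the harness sketch in the Overview.

/// Hypothetical handle to a spawned mock collector process.
pub struct MockCollectorProcess {
    process_id: String,
    child: std::process::Child,
}

impl MockCollectorProcess {
    /// Stable identifier used for direct addressing (publish_direct).
    pub fn id(&self) -> &str {
        &self.process_id
    }

    /// Tasks the collector has acknowledged so far, e.g. read back over a
    /// control channel exposed by the mock binary.
    pub fn received_tasks(&self) -> Vec<Task> {
        unimplemented!()
    }

    /// Wait up to `timeout` for the next task to arrive at this collector.
    pub async fn receive_task(&self, _timeout: std::time::Duration) -> Result<Option<Task>> {
        unimplemented!()
    }

    /// Forcefully terminate the process to simulate a crash.
    pub async fn kill(&self) -> Result<()> {
        unimplemented!()
    }
}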

Acceptance Criteria

  • Multi-process coordination tests implemented
  • Failure and recovery tests passing
  • Load and performance tests validate targets
  • Backpressure handling tests passing
  • Cross-platform tests passing (Windows, Linux, macOS, FreeBSD)
  • Performance validated: <1ms p99 latency for local IPC
  • Code safety verified: Zero unsafe code in daemoneye-eventbus
  • Backpressure validated: All overflow strategies tested
  • CI integration for automated testing
  • Test coverage > 80% for eventbus code
  • Documentation of test scenarios

Testing Pyramid

Unit Tests (40%)

  • Topic matching logic (see the sketch after this list)
  • Load balancing algorithms
  • Result aggregation logic
  • Serialization/deserialization
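
For the topic-matching item, a unit-test sketch: the topic_matches function and the "*" wildcard are illustrative assumptions, since only the concrete control.collector.task.<capability> topic names appear in this issue.

/// Illustrative topic matcher: exact match per dot-separated segment, with
/// "*" as a hypothetical single-segment wildcard (the real matching rules
/// live in daemoneye-eventbus and may differ).
fn topic_matches(pattern: &str, topic: &str) -> bool {
    let pat: Vec<&str> = pattern.split('.').collect();
    let top: Vec<&str> = topic.split('.').collect();
    pat.len() == top.len()
        && pat.iter().zip(&top).all(|(p, t)| *p == "*" || p == t)
}

#[test]
fn test_topic_matching() {
    // Exact capability topics route only to the matching collector type.
    assert!(topic_matches(
        "control.collector.task.process",
        "control.collector.task.process"
    ));
    assert!(!topic_matches(
        "control.collector.task.process",
        "control.collector.task.network"
    ));

    // Hypothetical wildcard subscription matching every task topic.
    assert!(topic_matches(
        "control.collector.task.*",
        "control.collector.task.filesystem"
    ));
}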

Integration Tests (40%)

  • Multi-process coordination
  • IPC communication
  • Failure scenarios
  • Resource cleanup

End-to-End Tests (20%)

  • Full system validation
  • Performance benchmarks
  • Cross-platform validation

Performance Validation Criteria

  • Local IPC Latency: p99 < 1ms
  • Task Distribution: > 1000 tasks/sec
  • Failover Detection: < 35 seconds (3 heartbeat intervals; see the sketch after this list)
  • Memory Overhead: < 50MB for broker with 10 collectors
  • Cross-Platform: the same latency and throughput targets must hold on Windows, Linux, macOS, and FreeBSD
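
The failover bound follows from the assumed heartbeat scheme: a collector is declared dead after three missed heartbeats. The interval and grace values below are illustrative assumptions, not settled configuration; they are only meant to show how the 35-second figure is derived.

use std::time::Duration;

/// Illustrative heartbeat parameters (assumed values, not settled configuration).
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
const MISSED_HEARTBEATS_BEFORE_DEAD: u32 = 3;
const DETECTION_GRACE: Duration = Duration::from_secs(5);

/// Worst-case time for the broker to declare a collector dead and redistribute
/// its in-flight tasks: 3 x 10s + 5s grace = 35s, matching the target above.
fn max_failover_detection() -> Duration {
    HEARTBEAT_INTERVAL * MISSED_HEARTBEATS_BEFORE_DEAD + DETECTION_GRACE
}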

Code Safety Verification

  • Set the unsafe_code lint to forbid in daemoneye-eventbus so any unsafe block fails cargo build (see the sketch after this list)
  • Run cargo clippy with warnings denied and confirm no unsafe-related lints fire
  • Audit direct dependencies (interprocess, tokio, prost, dashmap) for unsafe usage, e.g. with cargo geiger
  • Code review confirms zero unsafe blocks in daemoneye-eventbus
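
A sketch of how the first item can be made mechanical; whether the project sets this in lib.rs or via a Cargo.toml lint table is an open choice.

// daemoneye-eventbus/src/lib.rs (sketch)
//
// With this crate-level attribute, any `unsafe` block or function anywhere in
// the crate becomes a hard compile error, so the zero-unsafe criterion is
// enforced by cargo build and cargo clippy rather than by review alone.
#![forbid(unsafe_code)]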

Status: Not Started
Priority: High
Estimated Effort: 1-2 weeks
Depends On: #115, #116, #117

Labels

  • high-priority: High priority issues requiring immediate attention
  • integration: Related to integration testing and component integration
  • testing: Related to test development and test infrastructure
