diff --git a/buswatch-sdk/src/state.rs b/buswatch-sdk/src/state.rs index a5842c8..4ae55ea 100644 --- a/buswatch-sdk/src/state.rs +++ b/buswatch-sdk/src/state.rs @@ -9,17 +9,56 @@ use buswatch_types::{Microseconds, ModuleMetrics, ReadMetrics, Snapshot, WriteMe use parking_lot::RwLock; /// Thread-safe metrics for a single topic read stream. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct ReadState { pub count: AtomicU64, pub pending_since: RwLock>, + /// Previous count and timestamp for rate computation + pub prev_snapshot: RwLock>, +} + +impl Default for ReadState { + fn default() -> Self { + Self { + count: AtomicU64::new(0), + pending_since: RwLock::new(None), + prev_snapshot: RwLock::new(None), + } + } } /// Thread-safe metrics for a single topic write stream. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct WriteState { pub count: AtomicU64, pub pending_since: RwLock>, + /// Previous count and timestamp for rate computation + pub prev_snapshot: RwLock>, +} + +impl Default for WriteState { + fn default() -> Self { + Self { + count: AtomicU64::new(0), + pending_since: RwLock::new(None), + prev_snapshot: RwLock::new(None), + } + } +} + +/// Compute rate (messages per second) from previous and current state. +fn compute_rate(prev: Option<(u64, Instant)>, current_count: u64, now: Instant) -> Option { + let (prev_count, prev_time) = prev?; + let elapsed = now.duration_since(prev_time); + let elapsed_secs = elapsed.as_secs_f64(); + + // Avoid division by zero and require at least 10ms between samples + if elapsed_secs < 0.01 { + return None; + } + + let delta = current_count.saturating_sub(prev_count); + Some(delta as f64 / elapsed_secs) } /// Thread-safe metrics for a single module. @@ -68,6 +107,8 @@ impl ModuleState { } /// Collect current metrics into a ModuleMetrics snapshot. + /// + /// This also updates the previous snapshot for rate computation. pub fn collect(&self) -> ModuleMetrics { let now = Instant::now(); @@ -82,13 +123,20 @@ impl ModuleState { Microseconds::from(duration) }); + // Compute rate from previous snapshot + let prev = *state.prev_snapshot.read(); + let rate = compute_rate(prev, count, now); + + // Update previous snapshot for next collection + *state.prev_snapshot.write() = Some((count, now)); + ( topic.clone(), ReadMetrics { count, - backlog: None, // SDK doesn't track backlog directly + backlog: None, // SDK computes backlog at GlobalState level pending, - rate: None, // Could compute if we track history + rate, }, ) }) @@ -105,12 +153,19 @@ impl ModuleState { Microseconds::from(duration) }); + // Compute rate from previous snapshot + let prev = *state.prev_snapshot.read(); + let rate = compute_rate(prev, count, now); + + // Update previous snapshot for next collection + *state.prev_snapshot.write() = Some((count, now)); + ( topic.clone(), WriteMetrics { count, pending, - rate: None, + rate, }, ) }) @@ -416,4 +471,122 @@ mod tests { assert_eq!(metrics.reads.get("topic-b").unwrap().count, 20); assert_eq!(metrics.writes.get("topic-c").unwrap().count, 30); } + + #[test] + fn rate_is_none_on_first_collection() { + let state = ModuleState::default(); + + state + .get_or_create_read("topic") + .count + .fetch_add(100, Ordering::Relaxed); + state + .get_or_create_write("output") + .count + .fetch_add(50, Ordering::Relaxed); + + let metrics = state.collect(); + + // First collection should have no rate (no previous snapshot) + assert_eq!(metrics.reads.get("topic").unwrap().rate, None); + assert_eq!(metrics.writes.get("output").unwrap().rate, None); + } + + #[test] + fn rate_computed_on_second_collection() { + let state = ModuleState::default(); + + let read = state.get_or_create_read("topic"); + let write = state.get_or_create_write("output"); + + // Initial state + read.count.store(0, Ordering::Relaxed); + write.count.store(0, Ordering::Relaxed); + + // First collection to establish baseline + let _ = state.collect(); + + // Wait a bit and add some messages + std::thread::sleep(std::time::Duration::from_millis(50)); + read.count.fetch_add(100, Ordering::Relaxed); + write.count.fetch_add(50, Ordering::Relaxed); + + // Second collection should compute rate + let metrics = state.collect(); + + let read_rate = metrics.reads.get("topic").unwrap().rate; + let write_rate = metrics.writes.get("output").unwrap().rate; + + assert!(read_rate.is_some(), "Read rate should be computed"); + assert!(write_rate.is_some(), "Write rate should be computed"); + + // Rate should be approximately 100 messages / 0.05 seconds = 2000 msg/s + // Allow for timing variations + let read_rate = read_rate.unwrap(); + assert!( + read_rate > 500.0 && read_rate < 10000.0, + "Read rate {} should be reasonable", + read_rate + ); + + let write_rate = write_rate.unwrap(); + assert!( + write_rate > 250.0 && write_rate < 5000.0, + "Write rate {} should be reasonable", + write_rate + ); + } + + #[test] + fn rate_handles_zero_delta() { + let state = ModuleState::default(); + + let read = state.get_or_create_read("topic"); + read.count.store(100, Ordering::Relaxed); + + // First collection + let _ = state.collect(); + + // Wait but don't add any messages + std::thread::sleep(std::time::Duration::from_millis(20)); + + // Second collection + let metrics = state.collect(); + + let rate = metrics.reads.get("topic").unwrap().rate; + assert!(rate.is_some()); + assert_eq!(rate.unwrap(), 0.0, "Rate should be 0 when no new messages"); + } + + #[test] + fn compute_rate_function_basic() { + let now = Instant::now(); + let one_sec_ago = now - std::time::Duration::from_secs(1); + + // 100 messages in 1 second = 100 msg/s + let rate = compute_rate(Some((0, one_sec_ago)), 100, now); + assert!(rate.is_some()); + let rate = rate.unwrap(); + assert!( + (rate - 100.0).abs() < 1.0, + "Rate should be ~100, got {}", + rate + ); + } + + #[test] + fn compute_rate_returns_none_when_elapsed_too_short() { + let now = Instant::now(); + let just_now = now - std::time::Duration::from_millis(5); // Only 5ms + + let rate = compute_rate(Some((0, just_now)), 100, now); + assert!(rate.is_none(), "Rate should be None when elapsed < 10ms"); + } + + #[test] + fn compute_rate_returns_none_when_no_previous() { + let now = Instant::now(); + let rate = compute_rate(None, 100, now); + assert!(rate.is_none()); + } }