Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 178 additions & 5 deletions buswatch-sdk/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,56 @@ use buswatch_types::{Microseconds, ModuleMetrics, ReadMetrics, Snapshot, WriteMe
use parking_lot::RwLock;

/// Thread-safe metrics for a single topic read stream.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct ReadState {
pub count: AtomicU64,
pub pending_since: RwLock<Option<Instant>>,
/// Previous count and timestamp for rate computation
pub prev_snapshot: RwLock<Option<(u64, Instant)>>,
}

impl Default for ReadState {
fn default() -> Self {
Self {
count: AtomicU64::new(0),
pending_since: RwLock::new(None),
prev_snapshot: RwLock::new(None),
}
}
}

/// Thread-safe metrics for a single topic write stream.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct WriteState {
pub count: AtomicU64,
pub pending_since: RwLock<Option<Instant>>,
/// Previous count and timestamp for rate computation
pub prev_snapshot: RwLock<Option<(u64, Instant)>>,
}

impl Default for WriteState {
fn default() -> Self {
Self {
count: AtomicU64::new(0),
pending_since: RwLock::new(None),
prev_snapshot: RwLock::new(None),
}
}
}

/// Compute rate (messages per second) from previous and current state.
fn compute_rate(prev: Option<(u64, Instant)>, current_count: u64, now: Instant) -> Option<f64> {
let (prev_count, prev_time) = prev?;
let elapsed = now.duration_since(prev_time);
let elapsed_secs = elapsed.as_secs_f64();

// Avoid division by zero and require at least 10ms between samples
if elapsed_secs < 0.01 {
return None;
}

let delta = current_count.saturating_sub(prev_count);
Some(delta as f64 / elapsed_secs)
}

/// Thread-safe metrics for a single module.
Expand Down Expand Up @@ -68,6 +107,8 @@ impl ModuleState {
}

/// Collect current metrics into a ModuleMetrics snapshot.
///
/// This also updates the previous snapshot for rate computation.
pub fn collect(&self) -> ModuleMetrics {
let now = Instant::now();

Expand All @@ -82,13 +123,20 @@ impl ModuleState {
Microseconds::from(duration)
});

// Compute rate from previous snapshot
let prev = *state.prev_snapshot.read();
let rate = compute_rate(prev, count, now);

// Update previous snapshot for next collection
*state.prev_snapshot.write() = Some((count, now));

(
topic.clone(),
ReadMetrics {
count,
backlog: None, // SDK doesn't track backlog directly
backlog: None, // SDK computes backlog at GlobalState level
pending,
rate: None, // Could compute if we track history
rate,
},
)
})
Expand All @@ -105,12 +153,19 @@ impl ModuleState {
Microseconds::from(duration)
});

// Compute rate from previous snapshot
let prev = *state.prev_snapshot.read();
let rate = compute_rate(prev, count, now);

// Update previous snapshot for next collection
*state.prev_snapshot.write() = Some((count, now));

(
topic.clone(),
WriteMetrics {
count,
pending,
rate: None,
rate,
},
)
})
Expand Down Expand Up @@ -416,4 +471,122 @@ mod tests {
assert_eq!(metrics.reads.get("topic-b").unwrap().count, 20);
assert_eq!(metrics.writes.get("topic-c").unwrap().count, 30);
}

#[test]
fn rate_is_none_on_first_collection() {
let state = ModuleState::default();

state
.get_or_create_read("topic")
.count
.fetch_add(100, Ordering::Relaxed);
state
.get_or_create_write("output")
.count
.fetch_add(50, Ordering::Relaxed);

let metrics = state.collect();

// First collection should have no rate (no previous snapshot)
assert_eq!(metrics.reads.get("topic").unwrap().rate, None);
assert_eq!(metrics.writes.get("output").unwrap().rate, None);
}

#[test]
fn rate_computed_on_second_collection() {
let state = ModuleState::default();

let read = state.get_or_create_read("topic");
let write = state.get_or_create_write("output");

// Initial state
read.count.store(0, Ordering::Relaxed);
write.count.store(0, Ordering::Relaxed);

// First collection to establish baseline
let _ = state.collect();

// Wait a bit and add some messages
std::thread::sleep(std::time::Duration::from_millis(50));
read.count.fetch_add(100, Ordering::Relaxed);
write.count.fetch_add(50, Ordering::Relaxed);

// Second collection should compute rate
let metrics = state.collect();

let read_rate = metrics.reads.get("topic").unwrap().rate;
let write_rate = metrics.writes.get("output").unwrap().rate;

assert!(read_rate.is_some(), "Read rate should be computed");
assert!(write_rate.is_some(), "Write rate should be computed");

// Rate should be approximately 100 messages / 0.05 seconds = 2000 msg/s
// Allow for timing variations
let read_rate = read_rate.unwrap();
assert!(
read_rate > 500.0 && read_rate < 10000.0,
"Read rate {} should be reasonable",
read_rate
);

let write_rate = write_rate.unwrap();
assert!(
write_rate > 250.0 && write_rate < 5000.0,
"Write rate {} should be reasonable",
write_rate
);
}

#[test]
fn rate_handles_zero_delta() {
let state = ModuleState::default();

let read = state.get_or_create_read("topic");
read.count.store(100, Ordering::Relaxed);

// First collection
let _ = state.collect();

// Wait but don't add any messages
std::thread::sleep(std::time::Duration::from_millis(20));

// Second collection
let metrics = state.collect();

let rate = metrics.reads.get("topic").unwrap().rate;
assert!(rate.is_some());
assert_eq!(rate.unwrap(), 0.0, "Rate should be 0 when no new messages");
}

#[test]
fn compute_rate_function_basic() {
let now = Instant::now();
let one_sec_ago = now - std::time::Duration::from_secs(1);

// 100 messages in 1 second = 100 msg/s
let rate = compute_rate(Some((0, one_sec_ago)), 100, now);
assert!(rate.is_some());
let rate = rate.unwrap();
assert!(
(rate - 100.0).abs() < 1.0,
"Rate should be ~100, got {}",
rate
);
}

#[test]
fn compute_rate_returns_none_when_elapsed_too_short() {
let now = Instant::now();
let just_now = now - std::time::Duration::from_millis(5); // Only 5ms

let rate = compute_rate(Some((0, just_now)), 100, now);
assert!(rate.is_none(), "Rate should be None when elapsed < 10ms");
}

#[test]
fn compute_rate_returns_none_when_no_previous() {
let now = Instant::now();
let rate = compute_rate(None, 100, now);
assert!(rate.is_none());
}
}