From 52387ce9966d01bd07da9f45626ebe28ac690dcf Mon Sep 17 00:00:00 2001 From: Maxime Riaud Date: Fri, 23 Jan 2026 15:05:00 +0100 Subject: [PATCH 1/3] feat(capture): make tick duration configurable Add configurable sampling frequency to CaptureManager via tick_duration_ms parameter. Previously hardcoded to 1000ms (1Hz), now supports arbitrary intervals (e.g., 100ms for 10Hz, 10ms for 100Hz). Changes: - Add CaptureManagerBuilder for ergonomic configuration - Add SignalWatchers struct to group required signal watchers - Add tick_duration_ms parameter to all constructors - Export DEFAULT_TICK_DURATION_MS constant (1000ms) - Standardize tick_duration_ms type to u64 everywhere Breaking changes: - Constructor signatures changed to use SignalWatchers - new_with_format, new_jsonl, new_parquet, new_multi all have new signatures Co-Authored-By: Claude Opus 4.5 --- lading_capture/src/manager.rs | 310 ++++++++++++++++---- lading_capture/src/manager/state_machine.rs | 36 ++- 2 files changed, 275 insertions(+), 71 deletions(-) diff --git a/lading_capture/src/manager.rs b/lading_capture/src/manager.rs index 2ca2a6c9f..75eab9202 100644 --- a/lading_capture/src/manager.rs +++ b/lading_capture/src/manager.rs @@ -31,9 +31,192 @@ use rustc_hash::FxHashMap; use state_machine::{Event, Operation, StateMachine}; use tracing::{error, info, warn}; -/// Duration of a single `Accumulator` tick in milliseconds, drives the -/// `CaptureManager` polling interval. -const TICK_DURATION_MS: u128 = 1_000; +/// Default duration of a single `Accumulator` tick in milliseconds. +/// Can be overridden via [`CaptureManagerBuilder::tick_duration_ms`]. +pub const DEFAULT_TICK_DURATION_MS: u64 = 1_000; + +/// Signal watchers required by [`CaptureManager`]. +/// +/// Groups the three signal watchers to reduce argument count in constructors. +#[derive(Debug)] +pub struct SignalWatchers { + /// Watcher for shutdown signal. + pub shutdown: lading_signal::Watcher, + /// Watcher for experiment started signal. 
+ pub experiment_started: lading_signal::Watcher, + /// Watcher for target running signal. + pub target_running: lading_signal::Watcher, +} + +/// Builder for [`CaptureManager`]. +/// +/// # Example +/// +/// ```ignore +/// let manager = CaptureManagerBuilder::new(format) +/// .flush_seconds(5) +/// .tick_duration_ms(100) // 10Hz sampling +/// .expiration(Duration::from_secs(120)) +/// .shutdown(shutdown_watcher) +/// .experiment_started(experiment_watcher) +/// .target_running(target_watcher) +/// .build(); +/// ``` +pub struct CaptureManagerBuilder { + format: F, + flush_seconds: u64, + tick_duration_ms: u64, + expiration: Duration, + shutdown: Option, + experiment_started: Option, + target_running: Option, + clock: C, +} + +impl std::fmt::Debug for CaptureManagerBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CaptureManagerBuilder") + .field("flush_seconds", &self.flush_seconds) + .field("tick_duration_ms", &self.tick_duration_ms) + .field("expiration", &self.expiration) + .finish_non_exhaustive() + } +} + +impl CaptureManagerBuilder { + /// Create a new builder with the given output format. + /// + /// Uses sensible defaults: + /// - `flush_seconds`: 1 + /// - `tick_duration_ms`: 1000 (1Hz) + /// - `expiration`: 60 seconds + /// - `clock`: [`RealClock`] + pub fn new(format: F) -> Self { + Self { + format, + flush_seconds: 1, + tick_duration_ms: DEFAULT_TICK_DURATION_MS, + expiration: Duration::from_secs(60), + shutdown: None, + experiment_started: None, + target_running: None, + clock: RealClock::default(), + } + } +} + +impl CaptureManagerBuilder { + /// Set the flush interval in seconds. + #[must_use] + pub fn flush_seconds(mut self, seconds: u64) -> Self { + self.flush_seconds = seconds; + self + } + + /// Set the tick duration in milliseconds. 
+ /// + /// This controls the sampling frequency: + /// - 1000ms = 1Hz (default) + /// - 100ms = 10Hz + /// - 10ms = 100Hz + #[must_use] + pub fn tick_duration_ms(mut self, ms: u64) -> Self { + self.tick_duration_ms = ms; + self + } + + /// Set the metric expiration duration. + #[must_use] + pub fn expiration(mut self, expiration: Duration) -> Self { + self.expiration = expiration; + self + } + + /// Set the shutdown signal watcher (required). + #[must_use] + pub fn shutdown(mut self, watcher: lading_signal::Watcher) -> Self { + self.shutdown = Some(watcher); + self + } + + /// Set the experiment started signal watcher (required). + #[must_use] + pub fn experiment_started(mut self, watcher: lading_signal::Watcher) -> Self { + self.experiment_started = Some(watcher); + self + } + + /// Set the target running signal watcher (required). + #[must_use] + pub fn target_running(mut self, watcher: lading_signal::Watcher) -> Self { + self.target_running = Some(watcher); + self + } + + /// Set all signal watchers at once (required). + #[must_use] + pub fn signals(mut self, signals: SignalWatchers) -> Self { + self.shutdown = Some(signals.shutdown); + self.experiment_started = Some(signals.experiment_started); + self.target_running = Some(signals.target_running); + self + } + + /// Set a custom clock (for testing). + #[must_use] + pub fn clock(self, clock: C2) -> CaptureManagerBuilder { + CaptureManagerBuilder { + format: self.format, + flush_seconds: self.flush_seconds, + tick_duration_ms: self.tick_duration_ms, + expiration: self.expiration, + shutdown: self.shutdown, + experiment_started: self.experiment_started, + target_running: self.target_running, + clock, + } + } + + /// Build the [`CaptureManager`]. + /// + /// # Panics + /// + /// Panics if `shutdown`, `experiment_started`, or `target_running` were not set. 
+ pub fn build(self) -> CaptureManager + where + C: Clone + 'static, + { + let shutdown = self + .shutdown + .expect("shutdown watcher is required - call .shutdown()"); + let experiment_started = self + .experiment_started + .expect("experiment_started watcher is required - call .experiment_started()"); + let target_running = self + .target_running + .expect("target_running watcher is required - call .target_running()"); + + let registry = Arc::new(Registry::new(AtomicStorage)); + let (snd, recv) = mpsc::channel(10_000); + let accumulator = Accumulator::new(); + + CaptureManager { + expiration: self.expiration, + format: self.format, + flush_seconds: self.flush_seconds, + tick_duration_ms: self.tick_duration_ms, + shutdown: Some(shutdown), + _experiment_started: experiment_started, + target_running, + registry, + accumulator, + global_labels: FxHashMap::default(), + snd, + recv, + clock: self.clock, + } + } +} pub(crate) struct Sender { pub(crate) snd: mpsc::Sender, @@ -266,6 +449,8 @@ pub struct CaptureManager { expiration: Duration, format: F, flush_seconds: u64, + /// Duration of a single tick in milliseconds (default: 1000ms = 1Hz) + tick_duration_ms: u64, shutdown: Option, _experiment_started: lading_signal::Watcher, target_running: lading_signal::Watcher, @@ -287,35 +472,29 @@ impl std::fmt::Debug for CaptureManager { } impl CaptureManager { - /// Create a new [`CaptureManager`] with a custom format and clock + /// Create a new [`CaptureManager`] with a custom format and clock. + /// + /// Consider using [`CaptureManagerBuilder`] for a more ergonomic API. + /// + /// # Arguments + /// + /// * `tick_duration_ms` - Duration of a single tick in milliseconds (e.g., 1000 for 1Hz, + /// 500 for 2Hz, 100 for 10Hz). This controls the sampling frequency. 
pub fn new_with_format( format: F, flush_seconds: u64, - shutdown: lading_signal::Watcher, - experiment_started: lading_signal::Watcher, - target_running: lading_signal::Watcher, + tick_duration_ms: u64, expiration: Duration, + signals: SignalWatchers, clock: C, ) -> Self { - let registry = Arc::new(Registry::new(AtomicStorage)); - - let (snd, recv) = mpsc::channel(10_000); // total arbitrary constant - let accumulator = Accumulator::new(); - - Self { - expiration, - format, - flush_seconds, - shutdown: Some(shutdown), - _experiment_started: experiment_started, - target_running, - registry, - accumulator, - global_labels: FxHashMap::default(), - snd, - recv, - clock, - } + CaptureManagerBuilder::new(format) + .flush_seconds(flush_seconds) + .tick_duration_ms(tick_duration_ms) + .expiration(expiration) + .signals(signals) + .clock(clock) + .build() } /// Install the [`CaptureManager`] as global [`metrics::Recorder`] @@ -374,7 +553,7 @@ impl CaptureManager { let mut flush_interval = self .clock - .interval(Duration::from_millis(TICK_DURATION_MS as u64)); + .interval(Duration::from_millis(self.tick_duration_ms)); let shutdown_wait = self .shutdown .take() @@ -387,6 +566,7 @@ impl CaptureManager { self.expiration, self.format, self.flush_seconds, + self.tick_duration_ms, self.registry, self.accumulator, self.global_labels, @@ -417,49 +597,52 @@ impl CaptureManager { impl CaptureManager>, RealClock> { /// Create a new [`CaptureManager`] with file-based JSONL writer /// + /// # Arguments + /// + /// * `tick_duration_ms` - Duration of a single tick in milliseconds (e.g., 1000 for 1Hz) + /// /// # Errors /// /// Function will error if the underlying capture file cannot be opened. 
pub async fn new_jsonl( capture_path: PathBuf, flush_seconds: u64, - shutdown: lading_signal::Watcher, - experiment_started: lading_signal::Watcher, - target_running: lading_signal::Watcher, + tick_duration_ms: u64, expiration: Duration, + signals: SignalWatchers, ) -> Result { let fp = fs::File::create(&capture_path).await?; let fp = fp.into_std().await; let writer = BufWriter::new(fp); let format = jsonl::Format::new(writer); - Ok(Self::new_with_format( - format, - flush_seconds, - shutdown, - experiment_started, - target_running, - expiration, - RealClock::default(), - )) + Ok(CaptureManagerBuilder::new(format) + .flush_seconds(flush_seconds) + .tick_duration_ms(tick_duration_ms) + .expiration(expiration) + .signals(signals) + .build()) } } impl CaptureManager>, RealClock> { /// Create a new [`CaptureManager`] with file-based Parquet writer /// + /// # Arguments + /// + /// * `tick_duration_ms` - Duration of a single tick in milliseconds (e.g., 1000 for 1Hz) + /// /// # Errors /// /// Function will error if the underlying capture file cannot be opened or /// if Parquet writer creation fails. 
pub async fn new_parquet( capture_path: PathBuf, - flush_seconds: u64, compression_level: i32, - shutdown: lading_signal::Watcher, - experiment_started: lading_signal::Watcher, - target_running: lading_signal::Watcher, + flush_seconds: u64, + tick_duration_ms: u64, expiration: Duration, + signals: SignalWatchers, ) -> Result { let fp = fs::File::create(&capture_path) .await @@ -468,15 +651,12 @@ impl CaptureManager>, RealCloc let writer = BufWriter::new(fp); let format = parquet::Format::new(writer, compression_level)?; - Ok(Self::new_with_format( - format, - flush_seconds, - shutdown, - experiment_started, - target_running, - expiration, - RealClock::default(), - )) + Ok(CaptureManagerBuilder::new(format) + .flush_seconds(flush_seconds) + .tick_duration_ms(tick_duration_ms) + .expiration(expiration) + .signals(signals) + .build()) } } @@ -492,18 +672,21 @@ impl /// is used to generate two output files: `{base_path}.jsonl` and /// `{base_path}.parquet`. /// + /// # Arguments + /// + /// * `tick_duration_ms` - Duration of a single tick in milliseconds (e.g., 1000 for 1Hz) + /// /// # Errors /// /// Function will error if either capture file cannot be opened or if /// format creation fails. 
pub async fn new_multi( base_path: PathBuf, - flush_seconds: u64, compression_level: i32, - shutdown: lading_signal::Watcher, - experiment_started: lading_signal::Watcher, - target_running: lading_signal::Watcher, + flush_seconds: u64, + tick_duration_ms: u64, expiration: Duration, + signals: SignalWatchers, ) -> Result { let jsonl_path = base_path.with_extension("jsonl"); let parquet_path = base_path.with_extension("parquet"); @@ -524,15 +707,12 @@ impl let format = multi::Format::new(jsonl_format, parquet_format); - Ok(Self::new_with_format( - format, - flush_seconds, - shutdown, - experiment_started, - target_running, - expiration, - RealClock::default(), - )) + Ok(CaptureManagerBuilder::new(format) + .flush_seconds(flush_seconds) + .tick_duration_ms(tick_duration_ms) + .expiration(expiration) + .signals(signals) + .build()) } } diff --git a/lading_capture/src/manager/state_machine.rs b/lading_capture/src/manager/state_machine.rs index 9d9a2c0fb..a60366e99 100644 --- a/lading_capture/src/manager/state_machine.rs +++ b/lading_capture/src/manager/state_machine.rs @@ -27,8 +27,10 @@ use std::sync::atomic::Ordering; use tracing::{debug, info, trace, warn}; use uuid::Uuid; -/// Duration of a single `Accumulator` tick in milliseconds -pub(crate) const TICK_DURATION_MS: u128 = 1_000; +/// Default duration of a single `Accumulator` tick in milliseconds. +/// This constant is kept for backwards compatibility in tests. +#[cfg(test)] +pub(crate) const DEFAULT_TICK_DURATION_MS: u128 = 1_000; /// Reserved label names that collide with top-level JSON fields in `json::Line`. 
/// Labels with these names will be filtered out to prevent duplicate field errors @@ -92,6 +94,8 @@ pub(crate) struct StateMachine { start: Instant, /// Start time in milliseconds for deriving metric timestamps from ticks start_ms: u128, + /// Duration of a single tick in milliseconds + tick_duration_ms: u64, /// How long metrics can age before being discarded expiration: Duration, /// Output format for writing metrics, optional only to accomodate @@ -113,11 +117,16 @@ pub(crate) struct StateMachine { impl StateMachine { /// Create a new state machine + /// + /// # Arguments + /// + /// * `tick_duration_ms` - Duration of a single tick in milliseconds (e.g., 1000 for 1Hz) #[allow(clippy::too_many_arguments)] pub(crate) fn new( expiration: Duration, format: F, flush_interval: u64, + tick_duration_ms: u64, registry: Arc>, accumulator: Accumulator, mut global_labels: FxHashMap, @@ -159,6 +168,7 @@ impl StateMachine { run_id, start, start_ms, + tick_duration_ms, expiration, format: Some(format), flush_interval, @@ -303,7 +313,7 @@ impl StateMachine { let mut line_count = 0; for (key, value, tick) in self.accumulator.flush() { // Calculate time from tick to ensure strictly increasing time values - let time_ms = self.start_ms + (u128::from(tick) * TICK_DURATION_MS); + let time_ms = self.start_ms + (u128::from(tick) * u128::from(self.tick_duration_ms)); self.write_metric_line(&key, &value, tick, time_ms)?; line_count += 1; } @@ -333,7 +343,7 @@ impl StateMachine { for metrics in accumulator.drain() { for (key, value, tick) in metrics { // Calculate time from tick to ensure strictly increasing time values - let time_ms = self.start_ms + (u128::from(tick) * TICK_DURATION_MS); + let time_ms = self.start_ms + (u128::from(tick) * u128::from(self.tick_duration_ms)); self.write_metric_line(&key, &value, tick, time_ms)?; } } @@ -372,7 +382,7 @@ impl StateMachine { // Calculate when this metric was actually recorded based on its tick. 
let tick_age = self.accumulator.current_tick.saturating_sub(tick); - let tick_age_ms = u128::from(tick_age) * TICK_DURATION_MS; + let tick_age_ms = u128::from(tick_age) * u128::from(self.tick_duration_ms); // Skip any line that has expired. if tick_age_ms > self.expiration.as_millis() { return Ok(()); @@ -785,6 +795,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -998,6 +1009,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1028,6 +1040,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1052,6 +1065,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1081,6 +1095,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1113,6 +1128,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, global_labels, @@ -1230,6 +1246,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1266,6 +1283,7 @@ mod tests { Duration::from_secs(3600), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry.clone(), accumulator, labels, @@ -1285,7 +1303,7 @@ mod tests { .get_or_create_gauge(&gauge_key, |g| metrics::Gauge::from_arc(g.clone())) .increment(1.0); - clock.advance(TICK_DURATION_MS); + clock.advance(DEFAULT_TICK_DURATION_MS); machine.next(Event::FlushTick).unwrap(); } @@ -1397,6 +1415,7 @@ mod tests { Duration::from_secs(3600), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1443,6 +1462,7 @@ mod tests { Duration::from_secs(3600), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1496,6 +1516,7 @@ mod tests { Duration::from_secs(3600), format, 1, 
+ DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1547,6 +1568,7 @@ mod tests { Duration::from_secs(3600), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1598,6 +1620,7 @@ mod tests { Duration::from_secs(3600), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, @@ -1641,6 +1664,7 @@ mod tests { Duration::from_secs(60), format, 1, + DEFAULT_TICK_DURATION_MS as u64, registry, accumulator, labels, From 1aa8829f49f0bb3b23338c184d3c659809696b9f Mon Sep 17 00:00:00 2001 From: Maxime Riaud Date: Fri, 23 Jan 2026 15:51:38 +0100 Subject: [PATCH 2/3] fix(lading): update call sites for new CaptureManager API Update the main lading binary to use SignalWatchers struct and the new constructor signature with tick_duration_ms parameter. Co-Authored-By: Claude Opus 4.5 --- lading/src/bin/lading.rs | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/lading/src/bin/lading.rs b/lading/src/bin/lading.rs index efea36e34..17a58f69d 100644 --- a/lading/src/bin/lading.rs +++ b/lading/src/bin/lading.rs @@ -17,7 +17,7 @@ use lading::{ target::{self, Behavior, Output}, target_metrics, }; -use lading_capture::manager::CaptureManager; +use lading_capture::manager::{CaptureManager, SignalWatchers, DEFAULT_TICK_DURATION_MS}; use metrics::gauge; use metrics_exporter_prometheus::PrometheusBuilder; use regex::Regex; @@ -446,13 +446,17 @@ async fn inner_main( format, } => match format { config::CaptureFormat::Jsonl { flush_seconds } => { + let signals = SignalWatchers { + shutdown: shutdown_watcher.register()?, + experiment_started: experiment_started_watcher.clone(), + target_running: target_running_watcher.clone(), + }; let mut capture_manager = CaptureManager::new_jsonl( path, flush_seconds, - shutdown_watcher.register()?, - experiment_started_watcher.clone(), - target_running_watcher.clone(), + DEFAULT_TICK_DURATION_MS, expiration, + signals, ) .await?; for (k, v) 
in global_labels { @@ -469,14 +473,18 @@ async fn inner_main( flush_seconds, compression_level, } => { + let signals = SignalWatchers { + shutdown: shutdown_watcher.register()?, + experiment_started: experiment_started_watcher.clone(), + target_running: target_running_watcher.clone(), + }; let mut capture_manager = CaptureManager::new_multi( path, - flush_seconds, compression_level, - shutdown_watcher.register()?, - experiment_started_watcher.clone(), - target_running_watcher.clone(), + flush_seconds, + DEFAULT_TICK_DURATION_MS, expiration, + signals, ) .await .map_err(io::Error::other)?; @@ -494,14 +502,18 @@ async fn inner_main( flush_seconds, compression_level, } => { + let signals = SignalWatchers { + shutdown: shutdown_watcher.register()?, + experiment_started: experiment_started_watcher.clone(), + target_running: target_running_watcher.clone(), + }; let mut capture_manager = CaptureManager::new_parquet( path, - flush_seconds, compression_level, - shutdown_watcher.register()?, - experiment_started_watcher.clone(), - target_running_watcher.clone(), + flush_seconds, + DEFAULT_TICK_DURATION_MS, expiration, + signals, ) .await .map_err(io::Error::other)?; From 2d4382401c030d3e8ace0a8feb8a55c9351f89c4 Mon Sep 17 00:00:00 2001 From: Maxime Riaud Date: Fri, 23 Jan 2026 17:35:15 +0100 Subject: [PATCH 3/3] fix(capture): update fuzz harness for new CaptureManager API Update the fuzz_capture_harness binary to use SignalWatchers struct and the new constructor signature with tick_duration_ms parameter. 
Co-Authored-By: Claude Opus 4.5
---
 lading_capture/src/bin/fuzz_capture_harness.rs | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/lading_capture/src/bin/fuzz_capture_harness.rs b/lading_capture/src/bin/fuzz_capture_harness.rs
index 8f3780818..8c166ed76 100644
--- a/lading_capture/src/bin/fuzz_capture_harness.rs
+++ b/lading_capture/src/bin/fuzz_capture_harness.rs
@@ -11,8 +11,11 @@
 use anyhow::{Context, Result};
 use arbitrary::Arbitrary;
 use lading_capture::{
-    formats::jsonl, line::Line, manager::CaptureManager, manager::RealClock,
-    test::writer::InMemoryWriter, validate,
+    formats::jsonl,
+    line::Line,
+    manager::{CaptureManager, RealClock, SignalWatchers, DEFAULT_TICK_DURATION_MS},
+    test::writer::InMemoryWriter,
+    validate,
 };
 use rand::{Rng, SeedableRng, rngs::SmallRng};
 use std::io::{self, Read};
@@ -354,14 +357,19 @@ async fn run_capture_manager(config: &FuzzInput) -> Result {
     let (target_watcher, target_broadcast) = lading_signal::signal();
 
     let format = jsonl::Format::new(writer.clone());
+    let signals = SignalWatchers {
+        shutdown: shutdown_watcher,
+        experiment_started: experiment_watcher,
+        target_running: target_watcher,
+    };
     let manager = CaptureManager::new_with_format(
         format,
         1,
-        shutdown_watcher,
-        experiment_watcher,
-        target_watcher,
+        DEFAULT_TICK_DURATION_MS,
         Duration::from_secs(3600),
+        signals,
+        // `new_with_format` still takes the clock explicitly; only the builder defaults it.
         RealClock::default(),
     );
 
     let start = Instant::now();