Skip to content

Commit 753c53d

Browse files
authored
feat: revamp metrics (#31)
1 parent d7004da commit 753c53d

File tree

20 files changed

+1621
-1509
lines changed

20 files changed

+1621
-1509
lines changed

apps/skit/src/server.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1890,9 +1890,7 @@ impl<S> InstrumentedOneshotStream<S> {
18901890
impl<S> Drop for InstrumentedOneshotStream<S> {
18911891
fn drop(&mut self) {
18921892
if !self.recorded {
1893-
// If the client disconnects early, the response body stream is dropped without EOF.
1894-
// Record as error so we still get visibility into partial/aborted oneshot executions.
1895-
self.record("error");
1893+
self.record("incomplete");
18961894
}
18971895
}
18981896
}
@@ -2060,6 +2058,9 @@ async fn process_oneshot_pipeline_handler(
20602058
.with_description(
20612059
"Oneshot pipeline runtime from request start until response stream ends",
20622060
)
2061+
.with_boundaries(
2062+
streamkit_core::metrics::HISTOGRAM_BOUNDARIES_PIPELINE_DURATION.to_vec(),
2063+
)
20632064
.build()
20642065
})
20652066
.clone();
@@ -2262,7 +2263,12 @@ async fn metrics_middleware(req: axum::http::Request<Body>, next: Next) -> Respo
22622263
let meter = global::meter("skit_server");
22632264
(
22642265
meter.u64_counter("http.server.requests").build(),
2265-
meter.f64_histogram("http.server.duration").build(),
2266+
meter
2267+
.f64_histogram("http.server.duration")
2268+
.with_boundaries(
2269+
streamkit_core::metrics::HISTOGRAM_BOUNDARIES_HTTP_DURATION.to_vec(),
2270+
)
2271+
.build(),
22662272
)
22672273
})
22682274
.clone();

apps/skit/src/session.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,9 @@ impl Default for SessionManager {
354354
.f64_histogram("session.duration")
355355
.with_description("Session lifetime duration in seconds")
356356
.with_unit("s")
357+
.with_boundaries(
358+
streamkit_core::metrics::HISTOGRAM_BOUNDARIES_SESSION_DURATION.to_vec(),
359+
)
357360
.build(),
358361
}
359362
}

crates/core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub mod control;
5353
pub mod error;
5454
pub mod frame_pool;
5555
pub mod helpers;
56+
pub mod metrics;
5657
pub mod moq_gateway;
5758
pub mod node;
5859
pub mod node_config;

crates/core/src/metrics.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2+
//
3+
// SPDX-License-Identifier: MPL-2.0
4+
5+
//! Shared metrics configuration and histogram boundaries.
6+
//!
7+
//! Defines standard histogram bucket boundaries for OpenTelemetry metrics
8+
//! to ensure accurate percentile calculations across the codebase.
9+
10+
/// Sub-millisecond boundaries for per-packet codec operations (10μs to 1s)
11+
/// Used by: opus encode/decode
12+
pub const HISTOGRAM_BOUNDARIES_CODEC_PACKET: &[f64] =
13+
&[0.00001, 0.00005, 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0];
14+
15+
/// Millisecond-to-minute boundaries for per-file operations (1ms to 60s)
16+
/// Used by: mp3/flac/wav decode/demux
17+
pub const HISTOGRAM_BOUNDARIES_FILE_OPERATION: &[f64] =
18+
&[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0];
19+
20+
/// Node execution boundaries (10μs to 60s)
21+
/// Used by: node.execution.duration
22+
pub const HISTOGRAM_BOUNDARIES_NODE_EXECUTION: &[f64] =
23+
&[0.00001, 0.0001, 0.001, 0.01, 0.1, 1.0, 10.0, 60.0];
24+
25+
/// Backpressure wait time boundaries (1μs to 10s)
26+
/// Used by: pin_distributor.send_wait_seconds
27+
pub const HISTOGRAM_BOUNDARIES_BACKPRESSURE: &[f64] =
28+
&[0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1.0, 10.0];
29+
30+
/// Pacer lateness boundaries (1ms to 10s)
31+
/// Used by: pacer.lateness_seconds
32+
pub const HISTOGRAM_BOUNDARIES_PACER_LATENESS: &[f64] =
33+
&[0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0, 10.0];
34+
35+
/// Clock offset boundaries in milliseconds (0.1ms to 10s)
36+
/// Used by: moq.push.clock_offset_ms
37+
pub const HISTOGRAM_BOUNDARIES_CLOCK_OFFSET_MS: &[f64] =
38+
&[0.1, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0, 10000.0];
39+
40+
/// Frame gap boundaries in milliseconds (1ms to 1s, with common frame rates)
41+
/// Used by: moq.peer.inter_frame_ms
42+
pub const HISTOGRAM_BOUNDARIES_FRAME_GAP_MS: &[f64] =
43+
&[1.0, 5.0, 10.0, 16.0, 20.0, 33.0, 50.0, 100.0, 200.0, 500.0, 1000.0];
44+
45+
/// Pipeline duration boundaries (10ms to 5 minutes)
46+
/// Used by: oneshot_pipeline.duration
47+
pub const HISTOGRAM_BOUNDARIES_PIPELINE_DURATION: &[f64] =
48+
&[0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0];
49+
50+
/// HTTP request duration boundaries (1ms to 60s)
51+
/// Used by: http.server.duration
52+
pub const HISTOGRAM_BOUNDARIES_HTTP_DURATION: &[f64] =
53+
&[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0];
54+
55+
/// Session lifetime boundaries (1s to 24 hours)
56+
/// Used by: session.duration
57+
pub const HISTOGRAM_BOUNDARIES_SESSION_DURATION: &[f64] =
58+
&[1.0, 10.0, 60.0, 300.0, 600.0, 1800.0, 3600.0, 7200.0, 21600.0, 86400.0];

crates/engine/src/dynamic_actor.rs

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ pub struct DynamicEngine {
5252
HashMap<String, mpsc::Sender<streamkit_core::pins::PinManagementMessage>>,
5353
/// Map of node pin metadata: NodeId -> Pin Metadata (for runtime type validation)
5454
pub(super) node_pin_metadata: HashMap<String, NodePinMetadata>,
55+
/// Map of node_id -> node_kind for labeling metrics
56+
pub(super) node_kinds: HashMap<String, String>,
5557
pub(super) batch_size: usize,
5658
/// Session ID for gateway registration (if applicable)
5759
pub(super) session_id: Option<String>,
@@ -75,11 +77,11 @@ pub struct DynamicEngine {
7577
pub(super) nodes_active_gauge: opentelemetry::metrics::Gauge<u64>,
7678
pub(super) node_state_transitions_counter: opentelemetry::metrics::Counter<u64>,
7779
pub(super) engine_operations_counter: opentelemetry::metrics::Counter<u64>,
78-
// Node-level packet metrics
79-
pub(super) node_packets_received_gauge: opentelemetry::metrics::Gauge<u64>,
80-
pub(super) node_packets_sent_gauge: opentelemetry::metrics::Gauge<u64>,
81-
pub(super) node_packets_discarded_gauge: opentelemetry::metrics::Gauge<u64>,
82-
pub(super) node_packets_errored_gauge: opentelemetry::metrics::Gauge<u64>,
80+
// Node-level packet metrics (counters, not gauges - for proper rate() calculation)
81+
pub(super) node_packets_received_counter: opentelemetry::metrics::Counter<u64>,
82+
pub(super) node_packets_sent_counter: opentelemetry::metrics::Counter<u64>,
83+
pub(super) node_packets_discarded_counter: opentelemetry::metrics::Counter<u64>,
84+
pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter<u64>,
8385
// Node state metric (1=running, 0=not running)
8486
pub(super) node_state_gauge: opentelemetry::metrics::Gauge<u64>,
8587
}
@@ -363,16 +365,50 @@ impl DynamicEngine {
363365
"Node stats updated"
364366
);
365367

366-
// Store the current stats
367-
self.node_stats.insert(update.node_id.clone(), update.stats.clone());
368+
let node_kind =
369+
self.node_kinds.get(&update.node_id).map_or("unknown", std::string::String::as_str);
370+
let labels = &[
371+
KeyValue::new("node_id", update.node_id.clone()),
372+
KeyValue::new("node_kind", node_kind.to_string()),
373+
];
374+
375+
let prev_stats = self.node_stats.get(&update.node_id);
376+
377+
let delta_received = prev_stats.map_or(update.stats.received, |prev| {
378+
if update.stats.received < prev.received {
379+
update.stats.received
380+
} else {
381+
update.stats.received - prev.received
382+
}
383+
});
384+
let delta_sent = prev_stats.map_or(update.stats.sent, |prev| {
385+
if update.stats.sent < prev.sent {
386+
update.stats.sent
387+
} else {
388+
update.stats.sent - prev.sent
389+
}
390+
});
391+
let delta_discarded = prev_stats.map_or(update.stats.discarded, |prev| {
392+
if update.stats.discarded < prev.discarded {
393+
update.stats.discarded
394+
} else {
395+
update.stats.discarded - prev.discarded
396+
}
397+
});
398+
let delta_errored = prev_stats.map_or(update.stats.errored, |prev| {
399+
if update.stats.errored < prev.errored {
400+
update.stats.errored
401+
} else {
402+
update.stats.errored - prev.errored
403+
}
404+
});
368405

369-
// Record metrics with node_id label
370-
let labels = &[KeyValue::new("node_id", update.node_id.clone())];
406+
self.node_packets_received_counter.add(delta_received, labels);
407+
self.node_packets_sent_counter.add(delta_sent, labels);
408+
self.node_packets_discarded_counter.add(delta_discarded, labels);
409+
self.node_packets_errored_counter.add(delta_errored, labels);
371410

372-
self.node_packets_received_gauge.record(update.stats.received, labels);
373-
self.node_packets_sent_gauge.record(update.stats.sent, labels);
374-
self.node_packets_discarded_gauge.record(update.stats.discarded, labels);
375-
self.node_packets_errored_gauge.record(update.stats.errored, labels);
411+
self.node_stats.insert(update.node_id.clone(), update.stats.clone());
376412

377413
// Broadcast to all subscribers
378414
self.stats_subscribers.retain(|subscriber| {
@@ -862,6 +898,7 @@ impl DynamicEngine {
862898
self.node_stats.remove(node_id);
863899
self.node_pin_metadata.remove(node_id);
864900
self.pin_management_txs.remove(node_id);
901+
self.node_kinds.remove(node_id);
865902
self.nodes_active_gauge.record(self.live_nodes.len() as u64, &[]);
866903
}
867904

@@ -881,6 +918,7 @@ impl DynamicEngine {
881918
tracing::info!(name = %node_id, kind = %kind, "Adding node to graph");
882919
match self.registry.create_node(&kind, params.as_ref()) {
883920
Ok(node) => {
921+
self.node_kinds.insert(node_id.clone(), kind.clone());
884922
// Delegate initialization to helper function
885923
// Pass by reference to avoid unnecessary clones
886924
if let Err(e) = self

crates/engine/src/dynamic_pin_distributor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ impl PinDistributorActor {
9292
let send_wait_histogram = meter
9393
.f64_histogram("pin_distributor.send_wait_seconds")
9494
.with_description("Time spent waiting for downstream capacity (backpressure)")
95+
.with_boundaries(streamkit_core::metrics::HISTOGRAM_BOUNDARIES_BACKPRESSURE.to_vec())
9596
.build();
9697
let queue_depth_gauge = meter
9798
.u64_gauge("pin_distributor.queue_depth")

crates/engine/src/graph_builder.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use streamkit_core::node::{InitContext, NodeContext, OutputRouting, OutputSender
1313
use streamkit_core::packet_meta::{can_connect, packet_type_registry};
1414
use streamkit_core::pins::PinUpdate;
1515
use streamkit_core::state::{NodeState, NodeStateUpdate, StopReason};
16+
use streamkit_core::stats::NodeStatsUpdate;
1617
use streamkit_core::types::{Packet, PacketType};
1718
use streamkit_core::PinCardinality;
1819
use tokio::sync::mpsc;
@@ -62,6 +63,7 @@ pub async fn wire_and_spawn_graph(
6263
batch_size: usize,
6364
media_channel_capacity: usize,
6465
state_tx: Option<mpsc::Sender<NodeStateUpdate>>,
66+
stats_tx: Option<mpsc::Sender<NodeStatsUpdate>>,
6567
cancellation_token: Option<tokio_util::sync::CancellationToken>,
6668
audio_pool: Option<Arc<AudioFramePool>>,
6769
) -> Result<HashMap<String, LiveNode>, StreamKitError> {
@@ -356,9 +358,9 @@ pub async fn wire_and_spawn_graph(
356358
output_sender: OutputSender::new(name.clone(), OutputRouting::Direct(direct_outputs)),
357359
batch_size,
358360
state_tx: node_state_tx.clone(),
359-
stats_tx: None, // Stateless pipelines don't track stats
360-
telemetry_tx: None, // Stateless pipelines don't emit telemetry
361-
session_id: None, // Stateless pipelines don't have sessions
361+
stats_tx: stats_tx.clone(), // Used by oneshot metrics recording
362+
telemetry_tx: None, // Stateless pipelines don't emit telemetry
363+
session_id: None, // Stateless pipelines don't have sessions
362364
cancellation_token: cancellation_token.clone(),
363365
pin_management_rx: None, // Stateless pipelines don't support dynamic pins
364366
audio_pool: audio_pool.clone(),
@@ -384,12 +386,13 @@ pub async fn wire_and_spawn_graph(
384386
let meter = global::meter("skit_engine");
385387
let histogram = meter
386388
.f64_histogram("node.execution.duration")
389+
.with_boundaries(streamkit_core::metrics::HISTOGRAM_BOUNDARIES_NODE_EXECUTION.to_vec())
387390
.build();
388391
let status = if result.is_ok() { "ok" } else { "error" };
389392

390393
let labels = [
391-
KeyValue::new("node.name", name.clone()),
392-
KeyValue::new("node.kind", kind.clone()),
394+
KeyValue::new("node_id", name.clone()),
395+
KeyValue::new("node_kind", kind.clone()),
393396
KeyValue::new("status", status),
394397
];
395398
histogram.record(duration.as_secs_f64(), &labels);

crates/engine/src/lib.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ impl Engine {
235235
pin_distributors: HashMap::new(),
236236
pin_management_txs: HashMap::new(),
237237
node_pin_metadata: HashMap::new(),
238+
node_kinds: HashMap::new(),
238239
batch_size: config.packet_batch_size,
239240
session_id: config.session_id,
240241
audio_pool: self.audio_pool.clone(),
@@ -257,20 +258,20 @@ impl Engine {
257258
.u64_counter("engine.operations")
258259
.with_description("Engine control operations")
259260
.build(),
260-
node_packets_received_gauge: meter
261-
.u64_gauge("node.packets.received")
261+
node_packets_received_counter: meter
262+
.u64_counter("node.packets.received")
262263
.with_description("Total packets received by node")
263264
.build(),
264-
node_packets_sent_gauge: meter
265-
.u64_gauge("node.packets.sent")
265+
node_packets_sent_counter: meter
266+
.u64_counter("node.packets.sent")
266267
.with_description("Total packets sent by node")
267268
.build(),
268-
node_packets_discarded_gauge: meter
269-
.u64_gauge("node.packets.discarded")
269+
node_packets_discarded_counter: meter
270+
.u64_counter("node.packets.discarded")
270271
.with_description("Total packets discarded by node")
271272
.build(),
272-
node_packets_errored_gauge: meter
273-
.u64_gauge("node.packets.errored")
273+
node_packets_errored_counter: meter
274+
.u64_counter("node.packets.errored")
274275
.with_description("Total packet processing errors by node")
275276
.build(),
276277
node_state_gauge: meter

0 commit comments

Comments
 (0)