11//! Helpers for querying and asserting on Prometheus metrics and JSON API endpoints
22//! exposed by SV2 components during integration tests.
33
4+ use serde:: Deserialize ;
45use std:: net:: SocketAddr ;
6+ use std:: time:: Duration ;
7+
8+ // ── Typed response structs for JSON API endpoints ─────────────────────────────
9+
10+ /// Response from `/api/v1/global`
11+ #[ derive( Debug , Deserialize ) ]
12+ pub struct GlobalResponse {
13+ pub uptime_secs : u64 ,
14+ pub server : Option < ServerSummary > ,
15+ pub sv2_clients : Option < Sv2ClientsSummary > ,
16+ pub sv1_clients : Option < Sv1ClientsSummary > ,
17+ }
18+
19+ /// Server summary in global response
20+ #[ derive( Debug , Deserialize ) ]
21+ pub struct ServerSummary {
22+ pub extended_channels : u64 ,
23+ pub standard_channels : u64 ,
24+ }
25+
26+ /// SV2 clients summary in global response
27+ #[ derive( Debug , Deserialize ) ]
28+ pub struct Sv2ClientsSummary {
29+ pub total_clients : u64 ,
30+ }
31+
32+ /// SV1 clients summary in global response
33+ #[ derive( Debug , Deserialize ) ]
34+ pub struct Sv1ClientsSummary {
35+ pub total_clients : u64 ,
36+ }
37+
38+ /// Response from `/api/v1/server`
39+ #[ derive( Debug , Deserialize ) ]
40+ pub struct ServerResponse {
41+ pub extended_channels_count : usize ,
42+ }
43+
44+ /// Response from `/api/v1/server/channels`
45+ #[ derive( Debug , Deserialize ) ]
46+ pub struct ServerChannelsResponse {
47+ pub total_extended : u64 ,
48+ pub extended_channels : Vec < ServerExtendedChannel > ,
49+ }
50+
51+ /// Extended channel info from server perspective
52+ #[ derive( Debug , Deserialize ) ]
53+ pub struct ServerExtendedChannel {
54+ pub shares_acknowledged : Option < u64 > ,
55+ }
56+
57+ /// Generic paginated response
58+ #[ derive( Debug , Deserialize ) ]
59+ pub struct PaginatedResponse < T > {
60+ pub total : u64 ,
61+ pub items : Vec < T > ,
62+ }
63+
64+ /// Client metadata in paginated clients list
65+ #[ derive( Debug , Deserialize ) ]
66+ pub struct ClientMetadata {
67+ pub client_id : u64 ,
68+ }
69+
70+ /// Response from `/api/v1/clients/{id}/channels`
71+ #[ derive( Debug , Deserialize ) ]
72+ pub struct ClientChannelsResponse {
73+ pub client_id : u64 ,
74+ pub total_extended : u64 ,
75+ pub total_standard : u64 ,
76+ }
77+
78+ /// SV1 client info
79+ #[ derive( Debug , Deserialize ) ]
80+ pub struct Sv1Client {
81+ pub client_id : u64 ,
82+ pub authorized_worker_name : String ,
83+ }
84+
85+ /// Error response body
86+ #[ derive( Debug , Deserialize ) ]
87+ pub struct ErrorResponse {
88+ pub error : String ,
89+ }
90+
91+ /// Root endpoint response
92+ #[ derive( Debug , Deserialize ) ]
93+ pub struct RootResponse {
94+ pub service : String ,
95+ pub endpoints : serde_json:: Value ,
96+ }
97+
/// Default upper bound (15 seconds) for helpers that poll eventually-consistent data.
///
/// Deliberately generous: the monitoring snapshot cache (1s refresh) has to populate first,
/// and under CI load components may need several seconds to finish their handshakes.
pub const POLL_TIMEOUT: Duration = Duration::from_secs(15);
5102
6103/// Fetch the raw Prometheus text-format metrics from a component's `/metrics` endpoint.
7104/// Uses `spawn_blocking` to avoid blocking the tokio runtime with synchronous HTTP calls.
@@ -27,6 +124,98 @@ pub async fn fetch_api(monitoring_addr: SocketAddr, path: &str) -> String {
27124 . expect ( "spawn_blocking for fetch_api panicked" )
28125}
29126
127+ /// Fetch a JSON API endpoint and parse the response into a `serde_json::Value`.
128+ pub async fn fetch_api_json ( monitoring_addr : SocketAddr , path : & str ) -> serde_json:: Value {
129+ let body = fetch_api ( monitoring_addr, path) . await ;
130+ serde_json:: from_str ( & body) . unwrap_or_else ( |e| {
131+ panic ! (
132+ "Failed to parse JSON from {} response: {}\n Body: {}" ,
133+ path, e, body
134+ )
135+ } )
136+ }
137+
138+ /// Fetch a JSON API endpoint and parse the response into a typed struct.
139+ pub async fn fetch_api_typed < T : serde:: de:: DeserializeOwned > (
140+ monitoring_addr : SocketAddr ,
141+ path : & str ,
142+ ) -> T {
143+ let body = fetch_api ( monitoring_addr, path) . await ;
144+ serde_json:: from_str ( & body) . unwrap_or_else ( |e| {
145+ panic ! (
146+ "Failed to parse JSON from {} into {}: {}\n Body: {}" ,
147+ path,
148+ std:: any:: type_name:: <T >( ) ,
149+ e,
150+ body
151+ )
152+ } )
153+ }
154+
155+ /// Fetch a JSON API endpoint returning both the HTTP status code and parsed JSON body.
156+ /// Unlike `fetch_api_json`, this does **not** panic on non-2xx responses, so it can be
157+ /// used to test error endpoints (e.g. 404).
158+ pub async fn fetch_api_with_status (
159+ monitoring_addr : SocketAddr ,
160+ path : & str ,
161+ ) -> ( i32 , serde_json:: Value ) {
162+ let url = format ! ( "http://{}{}" , monitoring_addr, path) ;
163+ tokio:: task:: spawn_blocking ( move || {
164+ let ( status, bytes) = crate :: utils:: http:: make_get_request_with_status ( & url, 5 ) ;
165+ let body = String :: from_utf8 ( bytes) . expect ( "api response should be valid UTF-8" ) ;
166+ let json: serde_json:: Value = serde_json:: from_str ( & body) . unwrap_or_else ( |e| {
167+ panic ! (
168+ "Failed to parse JSON from {} (status {}): {}\n Body: {}" ,
169+ url, status, e, body
170+ )
171+ } ) ;
172+ ( status, json)
173+ } )
174+ . await
175+ . expect ( "spawn_blocking for fetch_api_with_status panicked" )
176+ }
177+
178+ /// Poll a JSON API endpoint until a numeric field at `json_pointer` (RFC 6901, e.g.
179+ /// `"/sv2_clients/total_clients"`) reaches `>= min`. Returns the full JSON value once
180+ /// satisfied. Panics if the condition is not met within `timeout`.
181+ ///
182+ /// This is the JSON equivalent of `poll_until_metric_gte` — use it for endpoints whose
183+ /// data only appears after the monitoring snapshot cache has refreshed.
184+ pub async fn poll_until_api_field_gte (
185+ monitoring_addr : SocketAddr ,
186+ path : & str ,
187+ json_pointer : & str ,
188+ min : f64 ,
189+ timeout : std:: time:: Duration ,
190+ ) -> serde_json:: Value {
191+ let deadline = tokio:: time:: Instant :: now ( ) + timeout;
192+ loop {
193+ // Use fetch_api_with_status so that transient non-2xx responses (e.g. 404
194+ // before the snapshot cache has populated) are retried instead of panicking.
195+ let ( status, json) = fetch_api_with_status ( monitoring_addr, path) . await ;
196+ if ( 200 ..300 ) . contains ( & status) {
197+ if let Some ( val) = json. pointer ( json_pointer) {
198+ let num = val. as_f64 ( ) . unwrap_or ( 0.0 ) ;
199+ if num >= min {
200+ return json;
201+ }
202+ }
203+ }
204+ if tokio:: time:: Instant :: now ( ) >= deadline {
205+ panic ! (
206+ "JSON field '{}' at {} never reached >= {} within {:?}. Last status: {}. Last response:\n {}" ,
207+ json_pointer,
208+ path,
209+ min,
210+ timeout,
211+ status,
212+ serde_json:: to_string_pretty( & json) . unwrap_or_default( )
213+ ) ;
214+ }
215+ tokio:: time:: sleep ( std:: time:: Duration :: from_millis ( 500 ) ) . await ;
216+ }
217+ }
218+
30219/// Parse a specific metric value from Prometheus text format.
31220/// Returns `None` if the metric line is not found.
32221///
@@ -53,6 +242,7 @@ pub(crate) fn parse_metric_value(metrics_text: &str, metric_name: &str) -> Optio
53242}
54243
55244/// Assert that a metric is present and its value satisfies the given predicate.
245+ #[ track_caller]
56246pub ( crate ) fn assert_metric < F : Fn ( f64 ) -> bool > (
57247 metrics_text : & str ,
58248 metric_name : & str ,
@@ -80,6 +270,7 @@ pub(crate) fn assert_metric<F: Fn(f64) -> bool>(
80270}
81271
82272/// Assert that a metric is present with a value >= the given minimum.
273+ #[ track_caller]
83274pub fn assert_metric_gte ( metrics_text : & str , metric_name : & str , min : f64 ) {
84275 assert_metric (
85276 metrics_text,
@@ -90,6 +281,7 @@ pub fn assert_metric_gte(metrics_text: &str, metric_name: &str, min: f64) {
90281}
91282
92283/// Assert that a metric is present with the exact given value.
284+ #[ track_caller]
93285pub fn assert_metric_eq ( metrics_text : & str , metric_name : & str , expected : f64 ) {
94286 assert_metric (
95287 metrics_text,
@@ -100,6 +292,7 @@ pub fn assert_metric_eq(metrics_text: &str, metric_name: &str, expected: f64) {
100292}
101293
102294/// Assert that a metric name does NOT appear in the metrics output at all.
295+ #[ track_caller]
103296pub fn assert_metric_not_present ( metrics_text : & str , metric_name : & str ) {
104297 for line in metrics_text. lines ( ) {
105298 if line. starts_with ( '#' ) {
@@ -118,6 +311,7 @@ pub fn assert_metric_not_present(metrics_text: &str, metric_name: &str) {
118311}
119312
120313/// Assert that a metric name appears at least once in the metrics output (with any label/value).
314+ #[ track_caller]
121315pub fn assert_metric_present ( metrics_text : & str , metric_name : & str ) {
122316 for line in metrics_text. lines ( ) {
123317 if line. starts_with ( '#' ) {
@@ -199,6 +393,7 @@ pub async fn assert_api_health(monitoring_addr: SocketAddr) {
199393}
200394
201395/// Assert that the uptime metric is present and positive.
396+ #[ track_caller]
202397pub fn assert_uptime ( metrics_text : & str ) {
203398 assert_metric (
204399 metrics_text,
0 commit comments