From 532bde90f4d208b985408646ab6cd06555b89f11 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Mon, 23 Mar 2026 20:52:50 +1100 Subject: [PATCH 1/3] feat: introduce otel metrics in certifier --- packages/talos_certifier/src/lib.rs | 1 + packages/talos_certifier/src/otel.rs | 43 +++++++++++ .../src/services/certifier_service.rs | 65 +++------------- .../src/services/decision_outbox_service.rs | 74 ++++++++++++++++++- .../src/services/tests/certifier_service.rs | 2 - .../services/tests/decision_outbox_service.rs | 8 +- packages/talos_certifier_adapters/Cargo.toml | 4 + .../src/certifier_kafka_pg.rs | 5 +- 8 files changed, 143 insertions(+), 59 deletions(-) create mode 100644 packages/talos_certifier/src/otel.rs diff --git a/packages/talos_certifier/src/lib.rs b/packages/talos_certifier/src/lib.rs index 78579fe8..8ec9e17b 100644 --- a/packages/talos_certifier/src/lib.rs +++ b/packages/talos_certifier/src/lib.rs @@ -4,6 +4,7 @@ pub mod core; pub mod errors; pub mod healthcheck; pub mod model; +pub mod otel; pub mod ports; pub mod services; pub mod talos_certifier_service; diff --git a/packages/talos_certifier/src/otel.rs b/packages/talos_certifier/src/otel.rs new file mode 100644 index 00000000..27548e9c --- /dev/null +++ b/packages/talos_certifier/src/otel.rs @@ -0,0 +1,43 @@ +use opentelemetry::global; +use opentelemetry_otlp::WithExportConfig; +use strum::Display; +use thiserror::Error as ThisError; + +#[derive(Debug, ThisError)] +#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")] +pub struct OtelInitError { + pub kind: InitErrorType, + pub reason: String, + pub cause: Option, +} + +#[derive(Debug, Display, PartialEq, Clone)] +pub enum InitErrorType { + MetricError, +} + +pub fn init_otel_metrics(grpc_endpoint: Option) -> Result<(), OtelInitError> { + if let Some(grpc_endpoint) = grpc_endpoint { + let otel_exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(grpc_endpoint) + 
.with_protocol(opentelemetry_otlp::Protocol::Grpc) + .build() + .map_err(|metric_error| OtelInitError { + kind: InitErrorType::MetricError, + reason: "Unable to initialise metrics exporter".into(), + cause: Some(format!("{:?}", metric_error)), + })?; + + let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() + .with_periodic_exporter(otel_exporter) + .build(); + + tracing::info!("OTEL metrics provider initialised with endpoint"); + global::set_meter_provider(provider); + } else { + tracing::info!("No OTEL endpoint provided, metrics will default to NoopMeterProvider"); + } + + Ok(()) +} diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index 6f58257f..f1c578fb 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use log::{debug, error, info, warn}; +use opentelemetry::global; use talos_suffix::core::{SuffixConfig, SuffixMetricsConfig}; use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait}; use time::OffsetDateTime; @@ -16,17 +17,11 @@ use crate::{ Certifier, ChannelMessage, }; -use opentelemetry::global; -use opentelemetry_otlp::WithExportConfig; -use strum::Display; -use thiserror::Error as ThisError; - /// Certifier service configuration #[derive(Debug, Clone, Default)] pub struct CertifierServiceConfig { /// Suffix config pub suffix_config: SuffixConfig, - pub otel_grpc_endpoint: Option, } pub struct CertifierService { @@ -39,45 +34,6 @@ pub struct CertifierService { pub config: CertifierServiceConfig, } -#[derive(Debug, ThisError)] -#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")] -pub struct OtelInitError { - pub kind: InitErrorType, - pub reason: String, - pub cause: Option, -} - -#[derive(Debug, Display, PartialEq, Clone)] -pub enum InitErrorType { - 
MetricError, -} - -pub fn init_otel_metrics(grpc_endpoint: Option) -> Result<(), OtelInitError> { - if let Some(grpc_endpoint) = grpc_endpoint { - let otel_exporter = opentelemetry_otlp::MetricExporter::builder() - .with_tonic() - .with_endpoint(grpc_endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .build() - .map_err(|metric_error| OtelInitError { - kind: InitErrorType::MetricError, - reason: "Unable to initialise metrics exporter".into(), - cause: Some(format!("{:?}", metric_error)), - })?; - - let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() - .with_periodic_exporter(otel_exporter) - .build(); - - tracing::info!("OTEL metrics provider initialised"); - global::set_meter_provider(provider); - } - - tracing::info!("OTEL metrics initialised"); - - Ok(()) -} - impl CertifierService { pub fn new( message_channel_rx: mpsc::Receiver>, @@ -89,14 +45,17 @@ impl CertifierService { let certifier = Certifier::new(); let config = config.unwrap_or_default(); - - let suffix = if config.otel_grpc_endpoint.is_some() { - let _ = init_otel_metrics(config.otel_grpc_endpoint.clone()); - let meter = global::meter("certifier"); - Suffix::with_config(config.suffix_config.clone(), Some((SuffixMetricsConfig { prefix: "certifier".into() }, meter))) - } else { - Suffix::with_config(config.suffix_config.clone(), None) - }; + let meter = global::meter("talos_certifier"); + + let suffix = Suffix::with_config( + config.suffix_config.clone(), + Some(( + SuffixMetricsConfig { + prefix: "talos_certifier".into(), + }, + meter.clone(), + )), + ); Self { suffix, diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index 264e851e..1653a418 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -4,11 +4,14 @@ use ahash::HashMap; use async_trait::async_trait; use log::{debug, 
error}; +use opentelemetry::global; +use opentelemetry::metrics::{Histogram, Meter}; use time::OffsetDateTime; use tokio::sync::mpsc; use crate::core::ServiceResult; use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders}; +use crate::model::metrics::TxProcessingTimeline; use crate::model::DEFAULT_DECISION_MESSAGE_VERSION; use crate::{ core::{DecisionOutboxChannelMessage, System, SystemService}, @@ -18,12 +21,68 @@ use crate::{ SystemMessage, }; +/// Captures important otel metrics around the journey of candidate and decision messages +#[derive(Debug, Clone)] +pub struct CertifierProcessingTimelineMetrics { + /// Capture the time from initiator got the request to certify at the cohort till the `Candidate` arrived on certifier + pub talos_initiator_to_candidate_received: Histogram, + /// Capture the time from the `Candidate` arrived on certifier to `Decision` created + pub talos_candidate_received_to_decision_created: Histogram, + /// Capture the time from the `Candidate` arrived on certifier to `Decision` published. + /// + /// **IMPORTANT METRIC**: This gives the full time spent within the certifier. This helps in gauging the performance of certifier. + pub talos_candidate_received_to_decision_published: Histogram, + /// Capture the time from initiator got the request to certify at the cohort till the `Decision` was created on certifier + pub talos_initiator_to_decision_created: Histogram, + /// Capture the time from initiator got the request to certify at the cohort till the `Decision` was published on certifier. + /// + /// **IMPORTANT METRIC**: This metric can be considered as a `talos_initiator_to_candidate_received + talos_candidate_received_to_decision_published`. + /// But together it helps to gauge the performance from the time a request landed within the talos ecosystem on the cohort end to when the certifier published + /// its corresponding decision. 
+ /// + pub talos_initiator_to_decision_published: Histogram, +} + +impl CertifierProcessingTimelineMetrics { + pub fn new(meter: Meter) -> Self { + Self { + talos_initiator_to_candidate_received: meter.f64_histogram("talos_initiator_to_certifier_cm_recvd").with_unit("ms").build(), + talos_candidate_received_to_decision_created: meter.f64_histogram("talos_certifier_cm_recvd_to_dm_created").with_unit("ms").build(), + talos_candidate_received_to_decision_published: meter.f64_histogram("talos_certifier_cm_recvd_to_dm_published").with_unit("ms").build(), + talos_initiator_to_decision_created: meter.f64_histogram("talos_initiator_to_certifier_dm_created").with_unit("ms").build(), + talos_initiator_to_decision_published: meter.f64_histogram("talos_initiator_to_certifier_dm_published").with_unit("ms").build(), + } + } + + fn convert_ns_to_ms(start_ns: i128, end_ns: i128) -> f64 { + (end_ns - start_ns) as f64 / 1_000_000_f64 + } + + pub fn record_processing_metrics(&self, process_times: &TxProcessingTimeline, publish_end: i128) { + self.talos_candidate_received_to_decision_created.record( + Self::convert_ns_to_ms(process_times.candidate_processing_started, process_times.candidate_received), + &[], + ); + self.talos_candidate_received_to_decision_created + .record(Self::convert_ns_to_ms(process_times.candidate_received, process_times.decision_created_at), &[]); + self.talos_candidate_received_to_decision_created + .record(Self::convert_ns_to_ms(process_times.candidate_received, publish_end), &[]); + self.talos_initiator_to_decision_created.record( + Self::convert_ns_to_ms(process_times.candidate_processing_started, process_times.decision_created_at), + &[], + ); + self.talos_initiator_to_decision_published + .record(Self::convert_ns_to_ms(process_times.candidate_processing_started, publish_end), &[]); + } +} + pub struct DecisionOutboxService { pub system: System, pub decision_outbox_channel_tx: mpsc::Sender, pub decision_outbox_channel_rx: mpsc::Receiver, pub 
decision_store: Arc + Sync + Send>>, pub decision_publisher: Arc>, + pub metrics: CertifierProcessingTimelineMetrics, } impl DecisionOutboxService { @@ -40,6 +99,7 @@ impl DecisionOutboxService { decision_publisher, decision_outbox_channel_tx, decision_outbox_channel_rx, + metrics: CertifierProcessingTimelineMetrics::new(global::meter("talos_certifier")), } } @@ -78,6 +138,7 @@ impl DecisionOutboxService { publisher: &Arc>, decision_message: &DecisionMessage, headers: HashMap, + metrics: &CertifierProcessingTimelineMetrics, ) -> ServiceResult { let xid = decision_message.xid.clone(); let decision_str = serde_json::to_string(&decision_message).map_err(|e| { @@ -90,14 +151,21 @@ impl DecisionOutboxService { })?; debug!("Publishing message {}", decision_message.version); - publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| { + + let publish_result = publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| { Box::new(SystemServiceError { kind: SystemServiceErrorKind::MessagePublishError, reason: publish_error.reason, data: publish_error.data, //Some(format!("{:?}", decision_message)), service: "Decision Outbox Service".to_string(), }) - }) + }); + + let publish_end_ns = OffsetDateTime::now_utc().unix_timestamp_nanos(); + // record metrics of candidate and it's corresponding decision message. 
+ metrics.record_processing_metrics(&decision_message.metrics, publish_end_ns); + + publish_result } } @@ -120,6 +188,7 @@ impl SystemService for DecisionOutboxService { self.system.name.clone(), None, )); + let metrics = self.metrics.clone(); async move { match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await { @@ -128,6 +197,7 @@ impl SystemService for DecisionOutboxService { &publisher, &decision, decision_headers.add_cert_headers(DecisionCertHeaders::new(&decision)).build().into(), + &metrics, ) .await { diff --git a/packages/talos_certifier/src/services/tests/certifier_service.rs b/packages/talos_certifier/src/services/tests/certifier_service.rs index 1469719a..716438b5 100644 --- a/packages/talos_certifier/src/services/tests/certifier_service.rs +++ b/packages/talos_certifier/src/services/tests/certifier_service.rs @@ -333,7 +333,6 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() { prune_start_threshold: Some(3), min_size_after_prune: Some(2), }, - otel_grpc_endpoint: None, }), ); @@ -567,7 +566,6 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() { capacity: 5, ..Default::default() }, - otel_grpc_endpoint: None, }), ); diff --git a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs index 51351c0c..4b63b598 100644 --- a/packages/talos_certifier/src/services/tests/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/tests/decision_outbox_service.rs @@ -11,10 +11,12 @@ use crate::{ errors::{DecisionStoreError, DecisionStoreErrorKind}, DecisionStore, MessagePublisher, }, + services::decision_outbox_service::CertifierProcessingTimelineMetrics, SystemMessage, }; use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; +use opentelemetry::global; use tokio::{ sync::{broadcast, mpsc}, time::{sleep_until, Instant}, @@ -369,7 +371,11 @@ async fn test_capture_publish_error() 
{ metrics: TxProcessingTimeline::default(), }; - if let Err(publish_error) = DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new()).await { + let metrics = CertifierProcessingTimelineMetrics::new(global::meter("random-noop-meter")); + + if let Err(publish_error) = + DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new(), &metrics).await + { assert!(publish_error.kind == SystemServiceErrorKind::MessagePublishError); } } diff --git a/packages/talos_certifier_adapters/Cargo.toml b/packages/talos_certifier_adapters/Cargo.toml index 3e7fc94e..8e7a4875 100644 --- a/packages/talos_certifier_adapters/Cargo.toml +++ b/packages/talos_certifier_adapters/Cargo.toml @@ -31,6 +31,10 @@ rdkafka = { version = "0.34.0", features = ["sasl"] } # Ahash hashmap ahash = "0.8.3" +# opentelemetry +# opentelemetry = { version = "0.28.0", default-features = false, features = [ +# "trace", +# ] } # uuid uuid = { version = "1.4.1", features = ["v4"] } # postgres diff --git a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs index 4849be90..f30ec0b7 100644 --- a/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs +++ b/packages/talos_certifier_adapters/src/certifier_kafka_pg.rs @@ -5,6 +5,7 @@ use crate::{self as Adapters, KafkaConsumer}; use std::sync::{atomic::AtomicI64, Arc}; use talos_certifier::core::SystemService; use talos_certifier::model::{CandidateMessage, DecisionMessage}; +use talos_certifier::otel::init_otel_metrics; use talos_certifier::ports::DecisionStore; use talos_certifier::services::{CertifierServiceConfig, MessageReceiverServiceConfig}; @@ -67,6 +68,9 @@ pub async fn certifier_with_kafka_pg(channel_buffer: TalosCertifierChannelBuffer name: config.app_name.unwrap_or("talos_certifier".to_string()), }; + // Otel initialisation + let _ = 
init_otel_metrics(config.otel_grpc_endpoint); + let (tx, rx) = mpsc::channel(channel_buffer.message_receiver); /* START - Kafka consumer service */ @@ -109,7 +113,6 @@ pub async fn certifier_with_kafka_pg(channel_buffer: TalosCertifierChannelBuffer system.clone(), Some(CertifierServiceConfig { suffix_config: config.suffix_config.unwrap_or_default(), - otel_grpc_endpoint: config.otel_grpc_endpoint, }), )), }; From a23fd9a4a65723cf40ec6a7cd8c549c95cdd17d3 Mon Sep 17 00:00:00 2001 From: Gethyl Kurian Date: Tue, 24 Mar 2026 11:59:16 +1100 Subject: [PATCH 2/3] chore: fix mapping correct metrics --- .../talos_certifier/src/services/decision_outbox_service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index 1653a418..39d5c23d 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -59,13 +59,13 @@ impl CertifierProcessingTimelineMetrics { } pub fn record_processing_metrics(&self, process_times: &TxProcessingTimeline, publish_end: i128) { - self.talos_candidate_received_to_decision_created.record( + self.talos_initiator_to_candidate_received.record( Self::convert_ns_to_ms(process_times.candidate_processing_started, process_times.candidate_received), &[], ); self.talos_candidate_received_to_decision_created .record(Self::convert_ns_to_ms(process_times.candidate_received, process_times.decision_created_at), &[]); - self.talos_candidate_received_to_decision_created + self.talos_candidate_received_to_decision_published .record(Self::convert_ns_to_ms(process_times.candidate_received, publish_end), &[]); self.talos_initiator_to_decision_created.record( Self::convert_ns_to_ms(process_times.candidate_processing_started, process_times.decision_created_at), From 337db1d6e867418ac5fc680aac415f564cbde695 Mon Sep 17 00:00:00 2001 From: 
Gethyl Kurian Date: Wed, 1 Apr 2026 12:49:46 +1100 Subject: [PATCH 3/3] chore: update the metric name and fix metric boundary --- .../src/services/decision_outbox_service.rs | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index 39d5c23d..bfa8c2fb 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -46,11 +46,21 @@ pub struct CertifierProcessingTimelineMetrics { impl CertifierProcessingTimelineMetrics { pub fn new(meter: Meter) -> Self { Self { - talos_initiator_to_candidate_received: meter.f64_histogram("talos_initiator_to_certifier_cm_recvd").with_unit("ms").build(), - talos_candidate_received_to_decision_created: meter.f64_histogram("talos_certifier_cm_recvd_to_dm_created").with_unit("ms").build(), - talos_candidate_received_to_decision_published: meter.f64_histogram("talos_certifier_cm_recvd_to_dm_published").with_unit("ms").build(), - talos_initiator_to_decision_created: meter.f64_histogram("talos_initiator_to_certifier_dm_created").with_unit("ms").build(), - talos_initiator_to_decision_published: meter.f64_histogram("talos_initiator_to_certifier_dm_published").with_unit("ms").build(), + // Cross service metrics + // Metric naming convention ____ms + talos_initiator_to_candidate_received: meter.f64_histogram("talos_initiator_certifier_candidate_received").with_unit("ms").build(), + talos_initiator_to_decision_created: meter.f64_histogram("talos_initiator_certifier_decision_created").with_unit("ms").build(), + talos_initiator_to_decision_published: meter.f64_histogram("talos_initiator_certifier_decision_published").with_unit("ms").build(), + // Internal metrics + // Metric naming convention _certifier___ms + talos_candidate_received_to_decision_created: meter + 
.f64_histogram("talos_certifier_candidate_received_decision_created") + .with_unit("ms") + .build(), + talos_candidate_received_to_decision_published: meter + .f64_histogram("talos_certifier_candidate_received_decision_published") + .with_unit("ms") + .build(), } } @@ -60,7 +70,7 @@ impl CertifierProcessingTimelineMetrics { pub fn record_processing_metrics(&self, process_times: &TxProcessingTimeline, publish_end: i128) { self.talos_initiator_to_candidate_received.record( - Self::convert_ns_to_ms(process_times.candidate_processing_started, process_times.candidate_received), + Self::convert_ns_to_ms(process_times.certification_started, process_times.candidate_received), &[], ); self.talos_candidate_received_to_decision_created @@ -68,11 +78,11 @@ impl CertifierProcessingTimelineMetrics { self.talos_candidate_received_to_decision_published .record(Self::convert_ns_to_ms(process_times.candidate_received, publish_end), &[]); self.talos_initiator_to_decision_created.record( - Self::convert_ns_to_ms(process_times.candidate_processing_started, process_times.decision_created_at), + Self::convert_ns_to_ms(process_times.certification_started, process_times.decision_created_at), &[], ); self.talos_initiator_to_decision_published - .record(Self::convert_ns_to_ms(process_times.candidate_processing_started, publish_end), &[]); + .record(Self::convert_ns_to_ms(process_times.certification_started, publish_end), &[]); } }