Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/talos_certifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod core;
pub mod errors;
pub mod healthcheck;
pub mod model;
pub mod otel;
pub mod ports;
pub mod services;
pub mod talos_certifier_service;
Expand Down
43 changes: 43 additions & 0 deletions packages/talos_certifier/src/otel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use strum::Display;
use thiserror::Error as ThisError;

/// Error returned when OTEL telemetry could not be initialised.
///
/// The `Display` output (generated by `thiserror` from the `#[error]` attribute below)
/// prints `kind`, `reason` and the `Debug` form of `cause`.
#[derive(Debug, ThisError)]
#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")]
pub struct OtelInitError {
/// Category of the initialisation failure.
pub kind: InitErrorType,
/// Human readable description of what failed.
pub reason: String,
/// Optional text describing the underlying error that triggered this failure.
pub cause: Option<String>,
}

/// Categories of OTEL initialisation failures carried by `OtelInitError::kind`.
#[derive(Debug, Display, PartialEq, Clone)]
pub enum InitErrorType {
/// Setting up the metrics pipeline failed.
MetricError,
}

/// Initialises the global OTEL meter provider.
///
/// When `grpc_endpoint` is `Some`, an OTLP metric exporter speaking gRPC (tonic) is built for
/// that endpoint, wrapped in an `SdkMeterProvider` with a periodic exporter, and installed as
/// the global meter provider. When it is `None`, nothing is installed and only an
/// informational log line is emitted.
///
/// # Errors
///
/// Returns an [`OtelInitError`] of kind [`InitErrorType::MetricError`] when the metric
/// exporter cannot be built; `cause` carries the `Debug` form of the underlying error.
pub fn init_otel_metrics(grpc_endpoint: Option<String>) -> Result<(), OtelInitError> {
    let Some(grpc_endpoint) = grpc_endpoint else {
        tracing::info!("No OTEL endpoint provided, metrics will default to NoopMeterProvider");
        return Ok(());
    };

    // Pass the endpoint by reference so it is still available for the log line below.
    let otel_exporter = opentelemetry_otlp::MetricExporter::builder()
        .with_tonic()
        .with_endpoint(grpc_endpoint.as_str())
        .with_protocol(opentelemetry_otlp::Protocol::Grpc)
        .build()
        .map_err(|metric_error| OtelInitError {
            kind: InitErrorType::MetricError,
            reason: "Unable to initialise metrics exporter".into(),
            cause: Some(format!("{:?}", metric_error)),
        })?;

    let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
        .with_periodic_exporter(otel_exporter)
        .build();

    // Install the provider first, so the success log is only emitted once it is in place.
    global::set_meter_provider(provider);
    tracing::info!("OTEL metrics provider initialised with endpoint {}", grpc_endpoint);

    Ok(())
}
65 changes: 12 additions & 53 deletions packages/talos_certifier/src/services/certifier_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use log::{debug, error, info, warn};
use opentelemetry::global;
use talos_suffix::core::{SuffixConfig, SuffixMetricsConfig};
use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait};
use time::OffsetDateTime;
Expand All @@ -16,17 +17,11 @@ use crate::{
Certifier, ChannelMessage,
};

use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use strum::Display;
use thiserror::Error as ThisError;

/// Certifier service configuration
#[derive(Debug, Clone, Default)]
pub struct CertifierServiceConfig {
/// Suffix config
pub suffix_config: SuffixConfig,
/// Optional OTEL gRPC endpoint; `CertifierService::new` hands it to `init_otel_metrics`.
pub otel_grpc_endpoint: Option<String>,
}

pub struct CertifierService {
Expand All @@ -39,45 +34,6 @@ pub struct CertifierService {
pub config: CertifierServiceConfig,
}

/// Error returned when OTEL telemetry could not be initialised.
///
/// The `Display` output (generated by `thiserror` from the `#[error]` attribute below)
/// prints `kind`, `reason` and the `Debug` form of `cause`.
#[derive(Debug, ThisError)]
#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")]
pub struct OtelInitError {
/// Category of the initialisation failure.
pub kind: InitErrorType,
/// Human readable description of what failed.
pub reason: String,
/// Optional text describing the underlying error that triggered this failure.
pub cause: Option<String>,
}

/// Categories of OTEL initialisation failures carried by `OtelInitError::kind`.
#[derive(Debug, Display, PartialEq, Clone)]
pub enum InitErrorType {
/// Setting up the metrics pipeline failed.
MetricError,
}

/// Initialises the global OTEL meter provider.
///
/// When `grpc_endpoint` is `Some`, an OTLP metric exporter speaking gRPC (tonic) is built for
/// that endpoint, wrapped in an `SdkMeterProvider` with a periodic exporter, and installed as
/// the global meter provider. When it is `None`, nothing is installed, and the log says so
/// instead of claiming that metrics were initialised.
///
/// # Errors
///
/// Returns an [`OtelInitError`] of kind [`InitErrorType::MetricError`] when the metric
/// exporter cannot be built; `cause` carries the `Debug` form of the underlying error.
pub fn init_otel_metrics(grpc_endpoint: Option<String>) -> Result<(), OtelInitError> {
    let Some(grpc_endpoint) = grpc_endpoint else {
        tracing::info!("No OTEL endpoint provided, metrics will default to NoopMeterProvider");
        return Ok(());
    };

    let otel_exporter = opentelemetry_otlp::MetricExporter::builder()
        .with_tonic()
        .with_endpoint(grpc_endpoint)
        .with_protocol(opentelemetry_otlp::Protocol::Grpc)
        .build()
        .map_err(|metric_error| OtelInitError {
            kind: InitErrorType::MetricError,
            reason: "Unable to initialise metrics exporter".into(),
            cause: Some(format!("{:?}", metric_error)),
        })?;

    let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
        .with_periodic_exporter(otel_exporter)
        .build();

    // Install the provider first, so the success log is only emitted once it is in place.
    global::set_meter_provider(provider);
    tracing::info!("OTEL metrics provider initialised");

    Ok(())
}

impl CertifierService {
pub fn new(
message_channel_rx: mpsc::Receiver<ChannelMessage<CandidateMessage>>,
Expand All @@ -89,14 +45,17 @@ impl CertifierService {
let certifier = Certifier::new();

let config = config.unwrap_or_default();

let suffix = if config.otel_grpc_endpoint.is_some() {
let _ = init_otel_metrics(config.otel_grpc_endpoint.clone());
let meter = global::meter("certifier");
Suffix::with_config(config.suffix_config.clone(), Some((SuffixMetricsConfig { prefix: "certifier".into() }, meter)))
} else {
Suffix::with_config(config.suffix_config.clone(), None)
};
let meter = global::meter("talos_certifier");

let suffix = Suffix::with_config(
config.suffix_config.clone(),
Some((
SuffixMetricsConfig {
prefix: "talos_certifier".into(),
},
meter.clone(),
)),
);

Self {
suffix,
Expand Down
84 changes: 82 additions & 2 deletions packages/talos_certifier/src/services/decision_outbox_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use ahash::HashMap;
use async_trait::async_trait;
use log::{debug, error};

use opentelemetry::global;
use opentelemetry::metrics::{Histogram, Meter};
use time::OffsetDateTime;
use tokio::sync::mpsc;

use crate::core::ServiceResult;
use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders};
use crate::model::metrics::TxProcessingTimeline;
use crate::model::DEFAULT_DECISION_MESSAGE_VERSION;
use crate::{
core::{DecisionOutboxChannelMessage, System, SystemService},
Expand All @@ -18,12 +21,78 @@ use crate::{
SystemMessage,
};

/// Captures important OTEL metrics around the journey of candidate and decision messages.
#[derive(Debug, Clone)]
pub struct CertifierProcessingTimelineMetrics {
/// Captures the time from when the initiator got the request to certify at the cohort until the `Candidate` arrived at the certifier.
pub talos_initiator_to_candidate_received: Histogram<f64>,
/// Captures the time from when the `Candidate` arrived at the certifier until the `Decision` was created.
pub talos_candidate_received_to_decision_created: Histogram<f64>,
/// Captures the time from when the `Candidate` arrived at the certifier until the `Decision` was published.
///
/// **IMPORTANT METRIC**: This gives the full time spent within the certifier, which helps in gauging the performance of the certifier.
pub talos_candidate_received_to_decision_published: Histogram<f64>,
/// Captures the time from when the initiator got the request to certify at the cohort until the `Decision` was created on the certifier.
pub talos_initiator_to_decision_created: Histogram<f64>,
/// Captures the time from when the initiator got the request to certify at the cohort until the `Decision` was published by the certifier.
///
/// **IMPORTANT METRIC**: This metric can be considered as `talos_initiator_to_candidate_received + talos_candidate_received_to_decision_published`.
/// Taken together, it helps to gauge the performance from the time a request landed within the talos ecosystem on the cohort end to when the certifier published
/// its corresponding decision.
///
pub talos_initiator_to_decision_published: Histogram<f64>,
}

impl CertifierProcessingTimelineMetrics {
    /// Creates all timeline histograms on the given `meter`.
    ///
    /// Every histogram records `f64` values and is tagged with the unit `ms`.
    pub fn new(meter: Meter) -> Self {
        // All instruments share the same shape, so they are built through one helper.
        let ms_histogram = |name: &'static str| meter.f64_histogram(name).with_unit("ms").build();

        // Cross service metrics.
        // Naming convention: <Namespace>_<Origin>_<Destination=certifier>_<Destination stage>.
        // The unit (`ms`) is attached through `with_unit` and is not part of the name.
        let talos_initiator_to_candidate_received = ms_histogram("talos_initiator_certifier_candidate_received");
        let talos_initiator_to_decision_created = ms_histogram("talos_initiator_certifier_decision_created");
        let talos_initiator_to_decision_published = ms_histogram("talos_initiator_certifier_decision_published");

        // Internal metrics.
        // Naming convention: <Namespace>_certifier_<from stage>_<to stage>.
        let talos_candidate_received_to_decision_created = ms_histogram("talos_certifier_candidate_received_decision_created");
        let talos_candidate_received_to_decision_published = ms_histogram("talos_certifier_candidate_received_decision_published");

        Self {
            talos_initiator_to_candidate_received,
            talos_candidate_received_to_decision_created,
            talos_candidate_received_to_decision_published,
            talos_initiator_to_decision_created,
            talos_initiator_to_decision_published,
        }
    }

    /// Returns the span between two nanosecond timestamps, expressed in milliseconds.
    fn convert_ns_to_ms(start_ns: i128, end_ns: i128) -> f64 {
        let elapsed_ns = end_ns - start_ns;
        elapsed_ns as f64 / 1_000_000_f64
    }

    /// Records all five timeline histograms for one candidate/decision pair.
    ///
    /// `process_times` supplies the timestamps captured earlier in the flow and `publish_end`
    /// is the timestamp taken after the publish attempt. All values are treated as
    /// nanoseconds and converted to milliseconds before being recorded without attributes.
    pub fn record_processing_metrics(&self, process_times: &TxProcessingTimeline, publish_end: i128) {
        let initiated_at = process_times.certification_started;
        let received_at = process_times.candidate_received;
        let decided_at = process_times.decision_created_at;

        let initiator_to_received = Self::convert_ns_to_ms(initiated_at, received_at);
        self.talos_initiator_to_candidate_received.record(initiator_to_received, &[]);

        let received_to_created = Self::convert_ns_to_ms(received_at, decided_at);
        self.talos_candidate_received_to_decision_created.record(received_to_created, &[]);

        let received_to_published = Self::convert_ns_to_ms(received_at, publish_end);
        self.talos_candidate_received_to_decision_published.record(received_to_published, &[]);

        let initiator_to_created = Self::convert_ns_to_ms(initiated_at, decided_at);
        self.talos_initiator_to_decision_created.record(initiator_to_created, &[]);

        let initiator_to_published = Self::convert_ns_to_ms(initiated_at, publish_end);
        self.talos_initiator_to_decision_published.record(initiator_to_published, &[]);
    }
}

/// Service state for the decision outbox: the channel endpoints, the decision store, the
/// decision publisher and the timeline metrics it works with.
pub struct DecisionOutboxService {
/// Shared system handle.
pub system: System,
/// Sending half of the decision outbox channel.
pub decision_outbox_channel_tx: mpsc::Sender<DecisionOutboxChannelMessage>,
/// Receiving half of the decision outbox channel.
pub decision_outbox_channel_rx: mpsc::Receiver<DecisionOutboxChannelMessage>,
/// Store handling `DecisionMessage` decisions.
pub decision_store: Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Sync + Send>>,
/// Publisher used to publish decision messages.
pub decision_publisher: Arc<Box<dyn MessagePublisher + Sync + Send>>,
/// Histograms capturing the candidate/decision processing timeline.
pub metrics: CertifierProcessingTimelineMetrics,
}

impl DecisionOutboxService {
Expand All @@ -40,6 +109,7 @@ impl DecisionOutboxService {
decision_publisher,
decision_outbox_channel_tx,
decision_outbox_channel_rx,
metrics: CertifierProcessingTimelineMetrics::new(global::meter("talos_certifier")),
}
}

Expand Down Expand Up @@ -78,6 +148,7 @@ impl DecisionOutboxService {
publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>,
decision_message: &DecisionMessage,
headers: HashMap<String, String>,
metrics: &CertifierProcessingTimelineMetrics,
) -> ServiceResult {
let xid = decision_message.xid.clone();
let decision_str = serde_json::to_string(&decision_message).map_err(|e| {
Expand All @@ -90,14 +161,21 @@ impl DecisionOutboxService {
})?;

debug!("Publishing message {}", decision_message.version);
publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| {

let publish_result = publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| {
Box::new(SystemServiceError {
kind: SystemServiceErrorKind::MessagePublishError,
reason: publish_error.reason,
data: publish_error.data, //Some(format!("{:?}", decision_message)),
service: "Decision Outbox Service".to_string(),
})
})
});

let publish_end_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
// record metrics of candidate and it's corresponding decision message.
metrics.record_processing_metrics(&decision_message.metrics, publish_end_ns);

publish_result
}
}

Expand All @@ -120,6 +198,7 @@ impl SystemService for DecisionOutboxService {
self.system.name.clone(),
None,
));
let metrics = self.metrics.clone();

async move {
match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
Expand All @@ -128,6 +207,7 @@ impl SystemService for DecisionOutboxService {
&publisher,
&decision,
decision_headers.add_cert_headers(DecisionCertHeaders::new(&decision)).build().into(),
&metrics,
)
.await
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,6 @@ async fn test_certification_check_suffix_prune_is_ready_threshold_30pc() {
prune_start_threshold: Some(3),
min_size_after_prune: Some(2),
},
otel_grpc_endpoint: None,
}),
);

Expand Down Expand Up @@ -567,7 +566,6 @@ async fn test_certification_check_suffix_prune_is_not_at_threshold() {
capacity: 5,
..Default::default()
},
otel_grpc_endpoint: None,
}),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ use crate::{
errors::{DecisionStoreError, DecisionStoreErrorKind},
DecisionStore, MessagePublisher,
},
services::decision_outbox_service::CertifierProcessingTimelineMetrics,
SystemMessage,
};
use ahash::{HashMap, HashMapExt};
use async_trait::async_trait;
use opentelemetry::global;
use tokio::{
sync::{broadcast, mpsc},
time::{sleep_until, Instant},
Expand Down Expand Up @@ -369,7 +371,11 @@ async fn test_capture_publish_error() {
metrics: TxProcessingTimeline::default(),
};

if let Err(publish_error) = DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new()).await {
let metrics = CertifierProcessingTimelineMetrics::new(global::meter("random-noop-meter"));

if let Err(publish_error) =
DecisionOutboxService::publish_decision(&Arc::new(Box::new(mock_decision_publisher)), &decision_message, HashMap::new(), &metrics).await
{
assert!(publish_error.kind == SystemServiceErrorKind::MessagePublishError);
}
}
Expand Down
4 changes: 4 additions & 0 deletions packages/talos_certifier_adapters/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ rdkafka = { version = "0.34.0", features = ["sasl"] }
# Ahash hashmap
ahash = "0.8.3"

# opentelemetry
# opentelemetry = { version = "0.28.0", default-features = false, features = [
# "trace",
# ] }
# uuid
uuid = { version = "1.4.1", features = ["v4"] }
# postgres
Expand Down
5 changes: 4 additions & 1 deletion packages/talos_certifier_adapters/src/certifier_kafka_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{self as Adapters, KafkaConsumer};
use std::sync::{atomic::AtomicI64, Arc};
use talos_certifier::core::SystemService;
use talos_certifier::model::{CandidateMessage, DecisionMessage};
use talos_certifier::otel::init_otel_metrics;
use talos_certifier::ports::DecisionStore;

use talos_certifier::services::{CertifierServiceConfig, MessageReceiverServiceConfig};
Expand Down Expand Up @@ -67,6 +68,9 @@ pub async fn certifier_with_kafka_pg(channel_buffer: TalosCertifierChannelBuffer
name: config.app_name.unwrap_or("talos_certifier".to_string()),
};

// Otel initialisation
let _ = init_otel_metrics(config.otel_grpc_endpoint);

let (tx, rx) = mpsc::channel(channel_buffer.message_receiver);

/* START - Kafka consumer service */
Expand Down Expand Up @@ -109,7 +113,6 @@ pub async fn certifier_with_kafka_pg(channel_buffer: TalosCertifierChannelBuffer
system.clone(),
Some(CertifierServiceConfig {
suffix_config: config.suffix_config.unwrap_or_default(),
otel_grpc_endpoint: config.otel_grpc_endpoint,
}),
)),
};
Expand Down
Loading