Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async fn main() {
channel_buffers: None,
commit_size: Some(2_000),
commit_frequency: None,
otel_grpc_endpoint: None,
};

messenger_with_kafka(config).await.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions packages/cohort_sdk/src/cohort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use talos_common_utils::otel::propagated_context::PropagatedSpanContextData;
use talos_common_utils::otel::{
initialiser::{init_otel_logs_tracing, init_otel_metrics},
propagated_context::PropagatedSpanContextData,
};
use tracing_opentelemetry::OpenTelemetrySpanExt;

use opentelemetry::global;
Expand All @@ -30,10 +33,7 @@ use talos_rdkafka_utils::kafka_config::KafkaConfig;

use crate::{
model::{internal::CertificationAttemptFailure, ClientErrorKind},
otel::{
initialiser::{init_otel_logs_tracing, init_otel_metrics},
meters::CohortMeters,
},
otel::meters::CohortMeters,
};

use crate::{
Expand Down
113 changes: 1 addition & 112 deletions packages/cohort_sdk/src/otel/initialiser.rs
Original file line number Diff line number Diff line change
@@ -1,118 +1,7 @@
use opentelemetry::global;
use opentelemetry::trace::{TraceError, TracerProvider as _};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use strum::Display;
use thiserror::Error as ThisError;
use tracing::subscriber::{set_global_default, SetGlobalDefaultError};
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter};
use talos_common_utils::otel::initialiser::OtelInitError;

use crate::model::ClientError;

pub fn init_otel_logs_tracing(name: String, enable_tracing: bool, grpc_endpoint: Option<String>, default_level: &'static str) -> Result<(), OtelInitError> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));

let layer_fmt = fmt::Layer::new().json();

if !enable_tracing {
// setup only logging
let subscriber = tracing_subscriber::registry().with(layer_fmt).with(env_filter);

set_global_default(subscriber).map_err(OtelInitError::from_global_subscriber_error)?;
return Ok(());
}

opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());

if let Some(grpc_endpoint) = grpc_endpoint.clone() {
let otel_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(grpc_endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.build()
.map_err(OtelInitError::from_exporter_error)?;

let otlp_trace_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(otel_exporter)
.build();

let otlp_layer = tracing_opentelemetry::layer().with_tracer(otlp_trace_provider.tracer(name.clone()));
let subscriber = tracing_subscriber::registry().with(layer_fmt).with(env_filter).with(otlp_layer);

set_global_default(subscriber).map_err(OtelInitError::from_global_subscriber_error)?;
} else {
let layer_fmt = BunyanFormattingLayer::new(name.clone(), std::io::stdout);
let subscriber = tracing_subscriber::registry().with(env_filter).with(JsonStorageLayer).with(layer_fmt);

set_global_default(subscriber).map_err(OtelInitError::from_global_subscriber_error)?;
}

tracing::info!("OTEL logging and tracing initialised");

Ok(())
}

pub fn init_otel_metrics(grpc_endpoint: Option<String>) -> Result<(), OtelInitError> {
if let Some(grpc_endpoint) = grpc_endpoint {
let otel_exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_endpoint(grpc_endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.build()
.map_err(|metric_error| OtelInitError {
kind: InitErrorType::MetricError,
reason: "Unable to initialise metrics exporter".into(),
cause: Some(format!("{:?}", metric_error)),
})?;

let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder()
.with_periodic_exporter(otel_exporter)
.build();

tracing::info!("OTEL metrics provider initialised");
global::set_meter_provider(provider);
}

tracing::info!("OTEL metrics initialised");

Ok(())
}

#[derive(Debug, ThisError)]
#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")]
pub struct OtelInitError {
pub kind: InitErrorType,
pub reason: String,
pub cause: Option<String>,
}

impl OtelInitError {
pub fn from_global_subscriber_error(cause: SetGlobalDefaultError) -> Self {
OtelInitError {
kind: InitErrorType::GlobalScubscriberError,
reason: "Unable to set subscriber into global OTEL registry".into(),
cause: Some(cause.to_string()),
}
}

pub fn from_exporter_error(cause: TraceError) -> Self {
OtelInitError {
kind: InitErrorType::SpanExporter,
reason: "Unable to initialise OTEL exporter".into(),
cause: Some(cause.to_string()),
}
}
}

#[derive(Debug, Display, PartialEq, Clone)]
pub enum InitErrorType {
GlobalScubscriberError,
MetricError,
SpanExporter,
}

impl From<OtelInitError> for ClientError {
fn from(value: OtelInitError) -> Self {
Self {
Expand Down
4 changes: 2 additions & 2 deletions packages/talos_agent/src/metrics/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Metrics {
total_count += 1;
count_in_bucket += 1;

//log::error!("debug: {}, {}, {}, {}", tx.id, tx.candidate_publish_4.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.decision_duration_6.as_micros());
//tracing::error!("debug: {}, {}, {}, {}", tx.id, tx.candidate_publish_4.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.decision_duration_6.as_micros());

if total_count % progress_frequency == 0 {
tracing::warn!("METRIC agent-spans(progress): {} of {}", total_count, sorted.len());
Expand Down Expand Up @@ -190,7 +190,7 @@ impl Metrics {
} else {
// let d = sum.candidate_kafka_trip_5 + tx.candidate_kafka_trip_5;
// if d.as_micros() * 1000 > 18446744073709551615 {
// log::error!("{} sum.candidate_kafka_trip_5: sum: {} mcs, tx: {} mcs id: {}", bucket, d.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.id);
// tracing::error!("{} sum.candidate_kafka_trip_5: sum: {} mcs, tx: {} mcs id: {}", bucket, d.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.id);
// }
sum.outbox_1 += tx.outbox_1;
sum.enqueing_2 += tx.enqueing_2;
Expand Down
4 changes: 0 additions & 4 deletions packages/talos_certifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ strum = { version = "0.24", features = ["derive"] }
# Ahash hashmap
ahash = "0.8.3"

# Logging
log = { workspace = true }
env_logger = { workspace = true }

# Async
tokio = { workspace = true }
futures-util = "0.3.26"
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_certifier/src/certifier/certification.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use ahash::AHashMap;
use log::debug;
use tracing::debug;

use super::CertifierCandidate;

Expand Down
4 changes: 2 additions & 2 deletions packages/talos_certifier/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use log::debug;
use serde::{Deserialize, Serialize};
use std::env;
use std::fmt::Debug;
use std::str::FromStr;
use tracing::debug;

trait EnvVarFromStr {
fn from_env_var_str(name: &str, v: &str) -> Self;
Expand Down Expand Up @@ -73,7 +73,7 @@ impl Config {
pg_database: var("PG_DATABASE"),
};

log::debug!("Config loaded from environment variables: {:?}", config);
debug!("Config loaded from environment variables: {:?}", config);

config
}
Expand Down
2 changes: 1 addition & 1 deletion packages/talos_certifier/src/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use log::error;
use std::collections::HashMap;
use std::path::PathBuf;
use std::{fs, thread};
use tokio::fs::{create_dir_all, OpenOptions};
use tokio::io::AsyncWriteExt;
use tracing::error;

//TODO it may be better to use std::env::temp_dir, although it cannot be a const then
const HEALTH_CHECK_DIR: &str = "/tmp/healthchecks";
Expand Down
Loading
Loading