diff --git a/Cargo.lock b/Cargo.lock index 4bd4ca9c..da4121d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3229,9 +3229,7 @@ dependencies = [ "async-trait", "cargo-husky", "criterion 0.4.0", - "env_logger", "futures-util", - "log", "opentelemetry 0.28.0", "opentelemetry-otlp", "opentelemetry-stdout 0.28.0", @@ -3260,9 +3258,7 @@ dependencies = [ "ahash 0.8.11", "async-trait", "deadpool-postgres", - "env_logger", "futures-util", - "log", "mockall", "rdkafka 0.34.0", "refinery", @@ -3278,6 +3274,7 @@ dependencies = [ "time", "tokio", "tokio-postgres", + "tracing", "uuid", ] @@ -3317,9 +3314,19 @@ name = "talos_common_utils" version = "0.2.60-dev" dependencies = [ "opentelemetry 0.28.0", + "opentelemetry-otlp", + "opentelemetry-stdout 0.28.0", + "opentelemetry_api", + "opentelemetry_sdk 0.28.0", "serial_test", + "strum 0.27.1", + "thiserror 2.0.11", "time", "tokio", + "tracing", + "tracing-bunyan-formatter", + "tracing-opentelemetry", + "tracing-subscriber", ] [[package]] @@ -3328,7 +3335,6 @@ version = "0.2.60-dev" dependencies = [ "ahash 0.8.11", "async-trait", - "env_logger", "futures-executor", "futures-util", "icu_normalizer", @@ -3336,8 +3342,12 @@ dependencies = [ "idna", "idna_adapter", "litemap", - "log", "mockall", + "opentelemetry 0.28.0", + "opentelemetry-otlp", + "opentelemetry-stdout 0.28.0", + "opentelemetry_api", + "opentelemetry_sdk 0.28.0", "rand 0.8.5", "rdkafka 0.34.0", "serde", @@ -3353,6 +3363,7 @@ dependencies = [ "time", "tokio", "tokio-test", + "tracing", "yoke", "zerofrom", ] @@ -3363,11 +3374,9 @@ version = "0.2.60-dev" dependencies = [ "ahash 0.8.11", "async-trait", - "env_logger", "futures-executor", "futures-util", "indexmap 2.7.1", - "log", "mockall", "rand 0.8.5", "rdkafka 0.34.0", @@ -3381,6 +3390,7 @@ dependencies = [ "time", "tokio", "tokio-test", + "tracing", "uuid", ] @@ -3388,8 +3398,6 @@ dependencies = [ name = "talos_metrics" version = "0.2.60-dev" dependencies = [ - "env_logger", - "log", "once_cell", "opentelemetry 0.20.0", "opentelemetry-stdout 0.1.0", @@ -3406,8 +3414,6 @@ version = "0.2.60-dev" dependencies = [ "async-trait", "criterion 0.5.1", - "env_logger", - "log", "mockall", "rdkafka 0.34.0", "serial_test", @@ -3415,6 +3421,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-test", + "tracing", ] [[package]] diff --git a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs index ad3de29c..42f26be9 100644 --- a/examples/messenger_using_kafka/examples/messenger_using_kafka.rs +++ b/examples/messenger_using_kafka/examples/messenger_using_kafka.rs @@ -40,6 +40,7 @@ async fn main() { channel_buffers: None, commit_size: Some(2_000), commit_frequency: None, + otel_grpc_endpoint: None, }; messenger_with_kafka(config).await.unwrap(); diff --git a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs index bbd00183..3d32d63d 100644 --- a/packages/cohort_sdk/src/cohort.rs +++ b/packages/cohort_sdk/src/cohort.rs @@ -3,7 +3,10 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use talos_common_utils::otel::propagated_context::PropagatedSpanContextData; +use talos_common_utils::otel::{ + initialiser::{init_otel_logs_tracing, init_otel_metrics}, + propagated_context::PropagatedSpanContextData, +}; use tracing_opentelemetry::OpenTelemetrySpanExt; use opentelemetry::global; @@ -30,10 +33,7 @@ use talos_rdkafka_utils::kafka_config::KafkaConfig; use crate::{ model::{internal::CertificationAttemptFailure, ClientErrorKind}, - otel::{ - initialiser::{init_otel_logs_tracing, init_otel_metrics}, - meters::CohortMeters, - }, + otel::meters::CohortMeters, }; use crate::{ diff --git a/packages/cohort_sdk/src/otel/initialiser.rs b/packages/cohort_sdk/src/otel/initialiser.rs index 57821f44..d6bbf5e0 100644 --- a/packages/cohort_sdk/src/otel/initialiser.rs +++ b/packages/cohort_sdk/src/otel/initialiser.rs @@ -1,118 +1,7 @@ -use opentelemetry::global; -use opentelemetry::trace::{TraceError, TracerProvider as _}; -use opentelemetry_otlp::WithExportConfig; -use opentelemetry_sdk::propagation::TraceContextPropagator; -use strum::Display; -use thiserror::Error as ThisError; -use tracing::subscriber::{set_global_default, SetGlobalDefaultError}; -use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; -use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::{fmt, EnvFilter}; +use talos_common_utils::otel::initialiser::OtelInitError; use crate::model::ClientError; -pub fn init_otel_logs_tracing(name: String, enable_tracing: bool, grpc_endpoint: Option, default_level: &'static str) -> Result<(), OtelInitError> { - let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level)); - - let layer_fmt = fmt::Layer::new().json(); - - if !enable_tracing { - // setup only logging - let subscriber = tracing_subscriber::registry().with(layer_fmt).with(env_filter); - - set_global_default(subscriber).map_err(OtelInitError::from_global_subscriber_error)?; - return Ok(()); - } - - opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); - - if let Some(grpc_endpoint) = grpc_endpoint.clone() { - let otel_exporter = opentelemetry_otlp::SpanExporter::builder() - .with_tonic() - .with_endpoint(grpc_endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .build() - .map_err(OtelInitError::from_exporter_error)?; - - let otlp_trace_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder() - .with_batch_exporter(otel_exporter) - .build(); - - let otlp_layer = tracing_opentelemetry::layer().with_tracer(otlp_trace_provider.tracer(name.clone())); - let subscriber = tracing_subscriber::registry().with(layer_fmt).with(env_filter).with(otlp_layer); - - set_global_default(subscriber).map_err(OtelInitError::from_global_subscriber_error)?; - } else { - let layer_fmt = BunyanFormattingLayer::new(name.clone(), std::io::stdout); - let subscriber = tracing_subscriber::registry().with(env_filter).with(JsonStorageLayer).with(layer_fmt); - - set_global_default(subscriber).map_err(OtelInitError::from_global_subscriber_error)?; - } - - tracing::info!("OTEL logging and tracing initialised"); - - Ok(()) -} - -pub fn init_otel_metrics(grpc_endpoint: Option) -> Result<(), OtelInitError> { - if let Some(grpc_endpoint) = grpc_endpoint { - let otel_exporter = opentelemetry_otlp::MetricExporter::builder() - .with_tonic() - .with_endpoint(grpc_endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .build() - .map_err(|metric_error| OtelInitError { - kind: InitErrorType::MetricError, - reason: "Unable to initialise metrics exporter".into(), - cause: Some(format!("{:?}", metric_error)), - })?; - - let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() - .with_periodic_exporter(otel_exporter) - .build(); - - tracing::info!("OTEL metrics provider initialised"); - global::set_meter_provider(provider); - } - - tracing::info!("OTEL metrics initialised"); - - Ok(()) -} - -#[derive(Debug, ThisError)] -#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")] -pub struct OtelInitError { - pub kind: InitErrorType, - pub reason: String, - pub cause: Option, -} - -impl OtelInitError { - pub fn from_global_subscriber_error(cause: SetGlobalDefaultError) -> Self { - OtelInitError { - kind: InitErrorType::GlobalScubscriberError, - reason: "Unable to set subscriber into global OTEL registry".into(), - cause: Some(cause.to_string()), - } - } - - pub fn from_exporter_error(cause: TraceError) -> Self { - OtelInitError { - kind: InitErrorType::SpanExporter, - reason: "Unable to initialise OTEL exporter".into(), - cause: Some(cause.to_string()), - } - } -} - -#[derive(Debug, Display, PartialEq, Clone)] -pub enum InitErrorType { - GlobalScubscriberError, - MetricError, - SpanExporter, -} - impl From for ClientError { fn from(value: OtelInitError) -> Self { Self { diff --git a/packages/talos_agent/src/metrics/core.rs b/packages/talos_agent/src/metrics/core.rs index 63c71bee..95f70af9 100644 --- a/packages/talos_agent/src/metrics/core.rs +++ b/packages/talos_agent/src/metrics/core.rs @@ -157,7 +157,7 @@ impl Metrics { total_count += 1; count_in_bucket += 1; - //log::error!("debug: {}, {}, {}, {}", tx.id, tx.candidate_publish_4.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.decision_duration_6.as_micros()); + //tracing::error!("debug: {}, {}, {}, {}", tx.id, tx.candidate_publish_4.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.decision_duration_6.as_micros()); if total_count % progress_frequency == 0 { tracing::warn!("METRIC agent-spans(progress): {} of {}", total_count, sorted.len()); @@ -190,7 +190,7 @@ impl Metrics { } else { // let d = sum.candidate_kafka_trip_5 + tx.candidate_kafka_trip_5; // if d.as_micros() * 1000 > 18446744073709551615 { - // log::error!("{} sum.candidate_kafka_trip_5: sum: {} mcs, tx: {} mcs id: {}", bucket, d.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.id); + // tracing::error!("{} sum.candidate_kafka_trip_5: sum: {} mcs, tx: {} mcs id: {}", bucket, d.as_micros(), tx.candidate_kafka_trip_5.as_micros(), tx.id); // } sum.outbox_1 += tx.outbox_1; sum.enqueing_2 += tx.enqueing_2; diff --git a/packages/talos_certifier/Cargo.toml b/packages/talos_certifier/Cargo.toml index e7d1a2ae..d24cde26 100644 --- a/packages/talos_certifier/Cargo.toml +++ b/packages/talos_certifier/Cargo.toml @@ -16,10 +16,6 @@ strum = { version = "0.24", features = ["derive"] } # Ahash hashmap ahash = "0.8.3" -# Logging -log = { workspace = true } -env_logger = { workspace = true } - # Async tokio = { workspace = true } futures-util = "0.3.26" diff --git a/packages/talos_certifier/src/certifier/certification.rs b/packages/talos_certifier/src/certifier/certification.rs index a9c38836..d078f6f0 100644 --- a/packages/talos_certifier/src/certifier/certification.rs +++ b/packages/talos_certifier/src/certifier/certification.rs @@ -1,5 +1,5 @@ use ahash::AHashMap; -use log::debug; +use tracing::debug; use super::CertifierCandidate; diff --git a/packages/talos_certifier/src/config.rs b/packages/talos_certifier/src/config.rs index a3df9ba1..35544158 100644 --- a/packages/talos_certifier/src/config.rs +++ b/packages/talos_certifier/src/config.rs @@ -1,8 +1,8 @@ -use log::debug; use serde::{Deserialize, Serialize}; use std::env; use std::fmt::Debug; use std::str::FromStr; +use tracing::debug; trait EnvVarFromStr { fn from_env_var_str(name: &str, v: &str) -> Self; @@ -73,7 +73,7 @@ impl Config { pg_database: var("PG_DATABASE"), }; - log::debug!("Config loaded from environment variables: {:?}", config); + debug!("Config loaded from environment variables: {:?}", config); config } diff --git a/packages/talos_certifier/src/healthcheck.rs b/packages/talos_certifier/src/healthcheck.rs index ec992e35..b3e3a469 100644 --- a/packages/talos_certifier/src/healthcheck.rs +++ b/packages/talos_certifier/src/healthcheck.rs @@ -1,9 +1,9 @@ -use log::error; use std::collections::HashMap; use std::path::PathBuf; use std::{fs, thread}; use tokio::fs::{create_dir_all, OpenOptions}; use tokio::io::AsyncWriteExt; +use tracing::error; //TODO it may be better to use std::env::temp_dir, although it cannot be a const then const HEALTH_CHECK_DIR: &str = "/tmp/healthchecks"; diff --git a/packages/talos_certifier/src/services/certifier_service.rs b/packages/talos_certifier/src/services/certifier_service.rs index 6f58257f..da3e0ead 100644 --- a/packages/talos_certifier/src/services/certifier_service.rs +++ b/packages/talos_certifier/src/services/certifier_service.rs @@ -2,11 +2,12 @@ use std::sync::atomic::AtomicI64; use std::sync::Arc; use async_trait::async_trait; -use log::{debug, error, info, warn}; +use talos_common_utils::otel::initialiser::init_otel_metrics; use talos_suffix::core::{SuffixConfig, SuffixMetricsConfig}; use talos_suffix::{get_nonempty_suffix_items, Suffix, SuffixTrait}; use time::OffsetDateTime; use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; use crate::certifier::utils::generate_certifier_sets_from_suffix; use crate::{ @@ -17,9 +18,6 @@ use crate::{ }; use opentelemetry::global; -use opentelemetry_otlp::WithExportConfig; -use strum::Display; -use thiserror::Error as ThisError; /// Certifier service configuration #[derive(Debug, Clone, Default)] @@ -39,45 +37,6 @@ pub struct CertifierService { pub config: CertifierServiceConfig, } -#[derive(Debug, ThisError)] -#[error("Error initialising OTEL telemetry: '{kind}'.\nReason: {reason}\nCause: {cause:?}")] -pub struct OtelInitError { - pub kind: InitErrorType, - pub reason: String, - pub cause: Option, -} - -#[derive(Debug, Display, PartialEq, Clone)] -pub enum InitErrorType { - MetricError, -} - -pub fn init_otel_metrics(grpc_endpoint: Option) -> Result<(), OtelInitError> { - if let Some(grpc_endpoint) = grpc_endpoint { - let otel_exporter = opentelemetry_otlp::MetricExporter::builder() - .with_tonic() - .with_endpoint(grpc_endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .build() - .map_err(|metric_error| OtelInitError { - kind: InitErrorType::MetricError, - reason: "Unable to initialise metrics exporter".into(), - cause: Some(format!("{:?}", metric_error)), - })?; - - let provider = opentelemetry_sdk::metrics::SdkMeterProvider::builder() - .with_periodic_exporter(otel_exporter) - .build(); - - tracing::info!("OTEL metrics provider initialised"); - global::set_meter_provider(provider); - } - - tracing::info!("OTEL metrics initialised"); - - Ok(()) -} - impl CertifierService { pub fn new( message_channel_rx: mpsc::Receiver>, diff --git a/packages/talos_certifier/src/services/decision_outbox_service.rs b/packages/talos_certifier/src/services/decision_outbox_service.rs index 264e851e..02585eaf 100644 --- a/packages/talos_certifier/src/services/decision_outbox_service.rs +++ b/packages/talos_certifier/src/services/decision_outbox_service.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use ahash::HashMap; use async_trait::async_trait; -use log::{debug, error}; +use tracing::{debug, error}; use time::OffsetDateTime; use tokio::sync::mpsc; diff --git a/packages/talos_certifier/src/services/healthcheck_service.rs b/packages/talos_certifier/src/services/healthcheck_service.rs index 4992da21..10600b12 100644 --- a/packages/talos_certifier/src/services/healthcheck_service.rs +++ b/packages/talos_certifier/src/services/healthcheck_service.rs @@ -6,7 +6,7 @@ use crate::{ use std::time::Duration; use async_trait::async_trait; -use log::{debug, error, info}; +use tracing::{debug, error, info}; pub struct HealthCheckService { pub healthcheck: HealthChecks, diff --git a/packages/talos_certifier/src/services/message_receiver_service.rs b/packages/talos_certifier/src/services/message_receiver_service.rs index 58d8e20d..57160aaa 100644 --- a/packages/talos_certifier/src/services/message_receiver_service.rs +++ b/packages/talos_certifier/src/services/message_receiver_service.rs @@ -4,8 +4,8 @@ use std::{ }; use async_trait::async_trait; -use log::{error, info, warn}; use tokio::{sync::mpsc, time::Interval}; +use tracing::{error, info, warn}; use crate::{ core::{ServiceResult, System, SystemService}, diff --git a/packages/talos_certifier/src/talos_certifier_service.rs b/packages/talos_certifier/src/talos_certifier_service.rs index 21f9266a..da0a1d4c 100644 --- a/packages/talos_certifier/src/talos_certifier_service.rs +++ b/packages/talos_certifier/src/talos_certifier_service.rs @@ -5,7 +5,7 @@ use std::sync::{ use crate::{core::ServiceResult, SystemMessage}; use futures_util::future::join_all; -use log::{error, info}; +use tracing::{error, info}; use crate::core::{System, SystemService}; diff --git a/packages/talos_certifier_adapters/Cargo.toml b/packages/talos_certifier_adapters/Cargo.toml index a67aeba2..206fdb71 100644 --- a/packages/talos_certifier_adapters/Cargo.toml +++ b/packages/talos_certifier_adapters/Cargo.toml @@ -16,9 +16,6 @@ description = "Adapters used in Talos Certifier" serde = { workspace = true } serde_json = { workspace = true } -# Logging -log = { workspace = true } -env_logger = { workspace = true } # Async tokio = { workspace = true } async-trait = { workspace = true } @@ -48,6 +45,8 @@ thiserror = "1.0.31" # Test mockall = "0.11.0" +tracing = { version = "0.1.41", features = ["log"] } + # internal crates talos_metrics = { path = "../talos_metrics", version = "0.2.60-dev" } talos_certifier = { path = "../talos_certifier", version = "0.2.60-dev" } diff --git a/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs b/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs index a64ef554..4e84476a 100644 --- a/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs +++ b/packages/talos_certifier_adapters/src/bin/histogram_decision_timeline_from_kafka.rs @@ -6,6 +6,7 @@ use talos_certifier::{ ChannelMessage, }; use talos_certifier_adapters::KafkaConsumer; +use talos_common_utils::otel::initialiser::init_otel_logs_tracing; use talos_metrics::model::MinMax; use talos_rdkafka_utils::kafka_config::KafkaConfig; use time::OffsetDateTime; @@ -13,8 +14,7 @@ use tokio::time::timeout; #[tokio::main] async fn main() -> Result<(), String> { - env_logger::builder().format_timestamp_millis().init(); - + init_otel_logs_tracing("histogram-generator".into(), false, None, "info").map_err(|e| e.reason)?; let mut kafka_config = KafkaConfig::from_env(None); kafka_config.extend( None, @@ -28,7 +28,7 @@ async fn main() -> Result<(), String> { let mut kafka_consumer = KafkaConsumer::new(&kafka_config); kafka_consumer.subscribe().await.unwrap(); - log::warn!("Aggregating timelines..."); + tracing::warn!("Aggregating timelines..."); // collect min and max times of each span in the decision processing timeline let aggregates = aggregate_timelines(&mut kafka_consumer).await?; let total_decisions = aggregates.1; @@ -45,7 +45,7 @@ async fn main() -> Result<(), String> { kafka_consumer2.subscribe().await.unwrap(); - log::warn!("Computing histograms of {} decisions", total_decisions); + tracing::warn!("Computing histograms of {} decisions", total_decisions); let mut bukets_start_at = MinMax::default(); bukets_start_at.add(aggregates.0.candidate_published.min); @@ -57,7 +57,7 @@ async fn main() -> Result<(), String> { let earliest_bucket_start = bukets_start_at.min; - log::warn!( + tracing::warn!( "Earliest bucket starts at {:?}", OffsetDateTime::from_unix_timestamp_nanos(earliest_bucket_start) ); @@ -80,10 +80,10 @@ async fn main() -> Result<(), String> { if let ChannelMessage::Decision(decision) = message { let decision_message = decision.message; - // log::warn!("Decision {:?}", msg_decision); + // tracing::warn!("Decision {:?}", msg_decision); item_number += 1; if item_number % progress_frequency == 0 { - log::warn!("Progress: {} of {}", item_number, total_decisions); + tracing::warn!("Progress: {} of {}", item_number, total_decisions); } hist_candidate_published.include(&decision_message.metrics, decision_message.metrics.candidate_published); hist_candidate_received.include(&decision_message.metrics, decision_message.metrics.candidate_received); @@ -98,7 +98,7 @@ async fn main() -> Result<(), String> { } } - log::warn!("Reading of messages ended"); + tracing::warn!("Reading of messages ended"); kafka_consumer2.unsubscribe().await; let mut bucket_length = MinMax::default(); @@ -109,11 +109,11 @@ async fn main() -> Result<(), String> { bucket_length.add(hist_db_save_started.buckets.len() as i128); bucket_length.add(hist_db_save_ended.buckets.len() as i128); - log::warn!("Histograms are ready, there are: {} 1 second buckets.", bucket_length.max); + tracing::warn!("Histograms are ready, there are: {} 1 second buckets.", bucket_length.max); - log::warn!("Headers: 'By Publish Time',,,,,,'-','By Receive Time',,,,,,'-','Processing Started',,,,,,'-','By Decision Created',,,,,,'-','By DB Save Started',,,,,,'-','By DB Save Ended',,,,,"); + tracing::warn!("Headers: 'By Publish Time',,,,,,'-','By Receive Time',,,,,,'-','Processing Started',,,,,,'-','By Decision Created',,,,,,'-','By DB Save Started',,,,,,'-','By DB Save Ended',,,,,"); for bucket in 1..bucket_length.max { - log::warn!( + tracing::warn!( "METRIC (histograms MAX): {},'-',{},'-',{},'-',{},'-',{},'-',{}", hist_candidate_published.to_csv_max(bucket).await, hist_candidate_received.to_csv_max(bucket).await, diff --git a/packages/talos_certifier_adapters/src/kafka/consumer.rs b/packages/talos_certifier_adapters/src/kafka/consumer.rs index 22fd9102..a1731506 100644 --- a/packages/talos_certifier_adapters/src/kafka/consumer.rs +++ b/packages/talos_certifier_adapters/src/kafka/consumer.rs @@ -1,7 +1,6 @@ use std::{marker::PhantomData, sync::Arc, time::Duration}; use async_trait::async_trait; -use log::debug; use rdkafka::{ consumer::{Consumer, ConsumerContext, DefaultConsumerContext, StreamConsumer}, Message, TopicPartitionList, @@ -20,6 +19,7 @@ use talos_certifier::{ use talos_rdkafka_utils::kafka_config::KafkaConfig; use time::OffsetDateTime; use tokio::task::JoinHandle; +use tracing::debug; use crate::{kafka::utils::get_message_headers, KafkaAdapterError}; diff --git a/packages/talos_certifier_adapters/src/kafka/contexts/certifier_consumer_context.rs b/packages/talos_certifier_adapters/src/kafka/contexts/certifier_consumer_context.rs index 8280654a..2b994c3e 100644 --- a/packages/talos_certifier_adapters/src/kafka/contexts/certifier_consumer_context.rs +++ b/packages/talos_certifier_adapters/src/kafka/contexts/certifier_consumer_context.rs @@ -1,8 +1,8 @@ -use log::{error, info}; use rdkafka::{ consumer::{ConsumerContext, Rebalance}, ClientContext, }; +use tracing::{error, info}; use talos_common_utils::{ sync::{try_send_with_retry, TrySendWithRetryConfig}, diff --git a/packages/talos_certifier_adapters/src/kafka/contexts/replicator_consumer_context.rs b/packages/talos_certifier_adapters/src/kafka/contexts/replicator_consumer_context.rs index f612b29f..f9a25b9e 100644 --- a/packages/talos_certifier_adapters/src/kafka/contexts/replicator_consumer_context.rs +++ b/packages/talos_certifier_adapters/src/kafka/contexts/replicator_consumer_context.rs @@ -1,6 +1,6 @@ use tokio::sync::mpsc; -use log::{error, info}; +use tracing::{error, info}; // use futures_executor::block_on; use rdkafka::{ consumer::{ConsumerContext, Rebalance}, diff --git a/packages/talos_certifier_adapters/src/kafka/producer.rs b/packages/talos_certifier_adapters/src/kafka/producer.rs index dc86d049..cf367482 100644 --- a/packages/talos_certifier_adapters/src/kafka/producer.rs +++ b/packages/talos_certifier_adapters/src/kafka/producer.rs @@ -1,12 +1,12 @@ use ahash::HashMap; use async_trait::async_trait; -use log::debug; use rdkafka::producer::{BaseRecord, DefaultProducerContext, ThreadedProducer}; use talos_certifier::{ errors::SystemServiceError, ports::{common::SharedPortTraits, errors::MessagePublishError, MessagePublisher}, }; use talos_rdkafka_utils::kafka_config::KafkaConfig; +use tracing::debug; use crate::kafka::utils::build_kafka_headers; diff --git a/packages/talos_certifier_adapters/src/postgres/config.rs b/packages/talos_certifier_adapters/src/postgres/config.rs index d4c3eb47..37aa2811 100644 --- a/packages/talos_certifier_adapters/src/postgres/config.rs +++ b/packages/talos_certifier_adapters/src/postgres/config.rs @@ -1,5 +1,5 @@ -use log::debug; use talos_common_utils::{env_var, env_var_with_defaults}; +use tracing::debug; #[derive(Debug, Clone)] pub struct PgConfig { diff --git a/packages/talos_certifier_adapters/src/postgres/pg.rs b/packages/talos_certifier_adapters/src/postgres/pg.rs index 76bf590c..e9cd8d4f 100644 --- a/packages/talos_certifier_adapters/src/postgres/pg.rs +++ b/packages/talos_certifier_adapters/src/postgres/pg.rs @@ -2,7 +2,6 @@ use std::time::Duration; use async_trait::async_trait; use deadpool_postgres::{Config, ManagerConfig, Object, Pool, PoolConfig, PoolError, Runtime}; -use log::{debug, error, warn}; use serde_json::{json, Value}; use talos_certifier::{ model::DecisionMessage, @@ -12,6 +11,7 @@ use talos_certifier::{ DecisionStore, }, }; +use tracing::{debug, error, warn}; use tokio_postgres::NoTls; diff --git a/packages/talos_cohort_replicator/src/otel/mod.rs b/packages/talos_cohort_replicator/src/otel/mod.rs index 7890cb8c..6bda6c5f 100644 --- a/packages/talos_cohort_replicator/src/otel/mod.rs +++ b/packages/talos_cohort_replicator/src/otel/mod.rs @@ -1,2 +1 @@ -pub mod initialiser; pub mod otel_config; diff --git a/packages/talos_cohort_replicator/src/talos_cohort_replicator.rs b/packages/talos_cohort_replicator/src/talos_cohort_replicator.rs index 064b269a..5a5c6c08 100644 --- a/packages/talos_cohort_replicator/src/talos_cohort_replicator.rs +++ b/packages/talos_cohort_replicator/src/talos_cohort_replicator.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use opentelemetry::global; use talos_certifier::{ports::MessageReciever, ChannelMessage}; +use talos_common_utils::otel::initialiser::{init_otel_logs_tracing, init_otel_metrics}; use talos_suffix::{ core::{SuffixConfig, SuffixMetricsConfig}, Suffix, @@ -13,10 +14,7 @@ use crate::{ core::Replicator, errors::{ReplicatorError, ReplicatorErrorKind}, models::{ReplicatorCandidate, ReplicatorCandidateMessage}, - otel::{ - initialiser::{init_otel_logs_tracing, init_otel_metrics}, - otel_config::ReplicatorOtelConfig, - }, + otel::otel_config::ReplicatorOtelConfig, services::{ replicator_service::{ReplicatorService, ReplicatorServiceConfig}, statemap_installer_service::{installation_service, StatemapInstallerConfig}, diff --git a/packages/talos_common_utils/Cargo.toml b/packages/talos_common_utils/Cargo.toml index 9b44c581..d7c759bb 100644 --- a/packages/talos_common_utils/Cargo.toml +++ b/packages/talos_common_utils/Cargo.toml @@ -13,12 +13,21 @@ doctest = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -opentelemetry = { version = "0.28.0", default-features = false, features = [ - "trace", -] } +opentelemetry_api = { version = "0.20.0", features = ["metrics"] } +opentelemetry = { version = "0.28.0", default-features = false, features = ["trace"] } +opentelemetry_sdk = { version = "0.28.0", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry-otlp = { version = "0.28.0", features = [ "trace", "grpc-tonic" ] } +opentelemetry-stdout = { version = "0.28.0", features = [ "trace" ] } + +tracing = { version = "0.1.41", features = [ "log" ] } +tracing-opentelemetry = { version = "0.29.0" } +tracing-bunyan-formatter = { version = "0.3.10" } +tracing-subscriber = { version = "0.3.19", features = [ "fmt", "ansi", "json", "registry", "env-filter" ] } + +strum = { version = "0.27.1", features = ["derive"] } +thiserror = { version = "2.0.11" } tokio = { workspace = true, features = ["full"] } -# Time time = { version = "0.3.17" } [dev-dependencies] diff --git a/packages/talos_cohort_replicator/src/otel/initialiser.rs b/packages/talos_common_utils/src/otel/initialiser.rs similarity index 96% rename from packages/talos_cohort_replicator/src/otel/initialiser.rs rename to packages/talos_common_utils/src/otel/initialiser.rs index deca7f62..1ce4190c 100644 --- a/packages/talos_cohort_replicator/src/otel/initialiser.rs +++ b/packages/talos_common_utils/src/otel/initialiser.rs @@ -9,11 +9,6 @@ use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::{fmt, EnvFilter}; -/** - * This module is intentional duplicate with packages/cohort_sdk/otel. - * We will externalise them into re-usable commons when we apply OTEL to full Talos ecosystem. - */ - pub fn init_otel_logs_tracing(name: String, enable_tracing: bool, grpc_endpoint: Option, default_level: &'static str) -> Result<(), OtelInitError> { let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level)); diff --git a/packages/talos_common_utils/src/otel/mod.rs b/packages/talos_common_utils/src/otel/mod.rs index 20a8a841..f38c9604 100644 --- a/packages/talos_common_utils/src/otel/mod.rs +++ b/packages/talos_common_utils/src/otel/mod.rs @@ -1,2 +1,3 @@ +pub mod initialiser; pub mod metric_constants; pub mod propagated_context; diff --git a/packages/talos_messenger_actions/Cargo.toml b/packages/talos_messenger_actions/Cargo.toml index 0a387631..db654d0a 100644 --- a/packages/talos_messenger_actions/Cargo.toml +++ b/packages/talos_messenger_actions/Cargo.toml @@ -16,8 +16,6 @@ doctest = false [dependencies] # Packages from workspace async-trait = { workspace = true } -env_logger = { workspace = true } -log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } @@ -50,6 +48,13 @@ zerofrom = { version = "=0.1.5" } litemap = { version = "=0.7.4" } # *** End - Adding to fix napi build error on GH actions. +tracing = { version = "0.1.41", features = [ "log" ] } +opentelemetry_api = { version = "0.20.0", features = ["metrics"] } +opentelemetry = { version = "0.28.0", default-features = false, features = ["trace"] } +opentelemetry_sdk = { version = "0.28.0", default-features = false, features = ["trace", "rt-tokio"] } +opentelemetry-otlp = { version = "0.28.0", features = [ "trace", "grpc-tonic" ] } +opentelemetry-stdout = { version = "0.28.0", features = [ "trace" ] } + talos_certifier = { path = "../talos_certifier", version = "0.2.60-dev" } talos_suffix = { path = "../talos_suffix", version = "0.2.60-dev" } talos_certifier_adapters = { path = "../talos_certifier_adapters", version = "0.2.60-dev" } diff --git a/packages/talos_messenger_actions/src/kafka/context.rs b/packages/talos_messenger_actions/src/kafka/context.rs index 79795cc1..50157b43 100644 --- a/packages/talos_messenger_actions/src/kafka/context.rs +++ b/packages/talos_messenger_actions/src/kafka/context.rs @@ -1,8 +1,8 @@ use futures_executor::block_on; -use log::error; use rdkafka::{producer::ProducerContext, ClientContext}; use talos_messenger_core::{core::MessengerChannelFeedback, errors::MessengerActionError}; use tokio::sync::mpsc; +use tracing::error; #[derive(Debug)] pub struct MessengerProducerDeliveryOpaque { diff --git a/packages/talos_messenger_actions/src/kafka/producer.rs b/packages/talos_messenger_actions/src/kafka/producer.rs index 19d7ccaf..917d8e4d 100644 --- a/packages/talos_messenger_actions/src/kafka/producer.rs +++ b/packages/talos_messenger_actions/src/kafka/producer.rs @@ -1,6 +1,5 @@ use ahash::HashMap; use async_trait::async_trait; -use log::debug; use rdkafka::{ config::{FromClientConfig, FromClientConfigAndContext}, producer::{BaseRecord, DefaultProducerContext, ProducerContext, ThreadedProducer}, @@ -11,6 +10,7 @@ use talos_certifier::{ }; use talos_certifier_adapters::kafka::utils::build_kafka_headers; use talos_rdkafka_utils::kafka_config::KafkaConfig; +use tracing::debug; // Kafka Producer pub struct KafkaProducer { diff --git a/packages/talos_messenger_actions/src/kafka/service.rs b/packages/talos_messenger_actions/src/kafka/service.rs index b823c974..c3bb8fd6 100644 --- a/packages/talos_messenger_actions/src/kafka/service.rs +++ b/packages/talos_messenger_actions/src/kafka/service.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use async_trait::async_trait; use futures_util::future::join_all; -use log::{debug, error, info}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::sync::mpsc; +use tracing::{debug, error, info}; use talos_messenger_core::{ core::{ActionService, MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, MessengerSystemService}, diff --git a/packages/talos_messenger_actions/src/messenger_with_kafka.rs b/packages/talos_messenger_actions/src/messenger_with_kafka.rs index 9604b41a..db558e59 100644 --- a/packages/talos_messenger_actions/src/messenger_with_kafka.rs +++ b/packages/talos_messenger_actions/src/messenger_with_kafka.rs @@ -1,9 +1,10 @@ use ahash::HashMap; use async_trait::async_trait; -use log::debug; +use opentelemetry::global; use rdkafka::producer::ProducerContext; use talos_certifier::ports::{errors::MessagePublishError, MessageReciever}; use talos_certifier_adapters::KafkaConsumer; +use talos_common_utils::otel::initialiser::{init_otel_logs_tracing, init_otel_metrics}; use talos_messenger_core::{ core::{MessengerPublisher, PublishActionType}, errors::{MessengerServiceError, MessengerServiceErrorKind, MessengerServiceResult}, @@ -12,8 +13,12 @@ use talos_messenger_core::{ talos_messenger_service::TalosMessengerService, }; use talos_rdkafka_utils::kafka_config::KafkaConfig; -use talos_suffix::{core::SuffixConfig, Suffix}; +use talos_suffix::{ + core::{SuffixConfig, SuffixMetricsConfig}, + Suffix, +}; use tokio::sync::mpsc; +use tracing::debug; use crate::kafka::{ context::{MessengerProducerContext, MessengerProducerDeliveryOpaque}, @@ -110,9 +115,17 @@ pub struct Configuration { pub commit_size: Option, /// Commit issuing frequency. pub commit_frequency: Option, + pub otel_grpc_endpoint: Option, } pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResult { + init_otel_logs_tracing("messenger".into(), false, config.otel_grpc_endpoint.clone(), "info").map_err(|e| MessengerServiceError { + kind: MessengerServiceErrorKind::System, + reason: e.reason, + data: e.cause, + service: "init_otel_logs_tracing".into(), + })?; + let kafka_consumer = KafkaConsumer::new(&config.kafka_config); // Subscribe to topic. @@ -133,8 +146,19 @@ pub async fn messenger_with_kafka(config: Configuration) -> MessengerServiceResu let tx_feedback_channel_clone = tx_feedback_channel.clone(); // START - Inbound service - let suffix: Suffix = Suffix::with_config(config.suffix_config.unwrap_or_default(), None); + let suffix: Suffix = if config.otel_grpc_endpoint.is_some() { + let _ = init_otel_metrics(config.otel_grpc_endpoint.clone()); + let meter = global::meter("messenger"); + Suffix::with_config( + config.suffix_config.unwrap_or_default(), + Some((SuffixMetricsConfig { prefix: "messenger".into() }, meter)), + ) + } else { + Suffix::with_config(config.suffix_config.unwrap_or_default(), None) + }; + let inbound_service_config = MessengerInboundServiceConfig::new(config.allowed_actions, config.commit_size, config.commit_frequency); + let inbound_service = MessengerInboundService::new(kafka_consumer, tx_actions_channel, rx_feedback_channel, suffix, inbound_service_config); // END - Inbound service diff --git a/packages/talos_messenger_core/Cargo.toml b/packages/talos_messenger_core/Cargo.toml index 7fb8029d..f91e1bac 100644 --- a/packages/talos_messenger_core/Cargo.toml +++ b/packages/talos_messenger_core/Cargo.toml @@ -15,8 +15,6 @@ doctest = false [dependencies] # Packages from workspace async-trait = { workspace = true } -env_logger = { workspace = true } -log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } @@ -42,6 +40,8 @@ time = { version = "0.3.30" } indexmap = { version = "2.0.0", features = ["rayon"] } ahash = "0.8.3" +tracing = { version = "0.1.41", features = [ "log" ] } + talos_certifier = { path = "../talos_certifier", version = "0.2.60-dev" } talos_suffix = { path = "../talos_suffix", version = "0.2.60-dev" } talos_common_utils = { path = "../../packages/talos_common_utils", version = "0.2.60-dev" } diff --git a/packages/talos_messenger_core/src/services/inbound_service.rs b/packages/talos_messenger_core/src/services/inbound_service.rs index 11aa95dd..0b37f026 100644 --- a/packages/talos_messenger_core/src/services/inbound_service.rs +++ b/packages/talos_messenger_core/src/services/inbound_service.rs @@ -2,7 +2,7 @@ use std::time::Duration; use ahash::{HashMap, HashMapExt}; use async_trait::async_trait; -use log::{debug, error, info, warn}; +use tracing::{debug, error, info, warn}; use talos_certifier::{model::DecisionMessageTrait, ports::MessageReciever, ChannelMessage}; use talos_suffix::{core::SuffixMeta, Suffix, SuffixTrait}; diff --git a/packages/talos_messenger_core/src/suffix.rs b/packages/talos_messenger_core/src/suffix.rs index 8e50789d..8f0d5b4a 100644 --- a/packages/talos_messenger_core/src/suffix.rs +++ b/packages/talos_messenger_core/src/suffix.rs @@ -1,5 +1,4 @@ use ahash::{HashMap, HashMapExt}; -use log::{debug, warn}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fmt::Debug; @@ -7,6 +6,7 @@ use strum::{Display, EnumString}; use talos_certifier::model::{Decision, DecisionMessageTrait}; use talos_suffix::{core::SuffixResult, Suffix, SuffixTrait}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; +use tracing::{debug, warn}; use crate::models::MessengerCandidateMessage; diff --git a/packages/talos_messenger_core/src/tests/test_utils.rs b/packages/talos_messenger_core/src/tests/test_utils.rs index b5317d0e..b9725bcc 100644 --- a/packages/talos_messenger_core/src/tests/test_utils.rs +++ b/packages/talos_messenger_core/src/tests/test_utils.rs @@ -1,6 +1,5 @@ use ahash::HashMap; use async_trait::async_trait; -use log::{debug, error}; use strum::{Display, EnumString}; use talos_certifier::{ errors::SystemServiceError, @@ -16,6 +15,7 @@ use tokio::{ sync::mpsc::{self, Sender}, task::JoinHandle, }; +use tracing::{debug, error}; use crate::{ core::{ActionService, MessengerChannelFeedback, MessengerCommitActions, MessengerPublisher, PublishActionType}, diff --git a/packages/talos_metrics/Cargo.toml b/packages/talos_metrics/Cargo.toml index 2b46cd9f..7b28fd2c 100644 --- a/packages/talos_metrics/Cargo.toml +++ b/packages/talos_metrics/Cargo.toml @@ -19,7 +19,4 @@ opentelemetry = { version = "0.20.0", features = ["metrics"] } serde = { workspace = true } serde_json = { workspace = true } -env_logger = { workspace = true } -log = { workspace = true } - time = { version = "0.3.17" } diff --git a/packages/talos_rdkafka_utils/Cargo.toml b/packages/talos_rdkafka_utils/Cargo.toml index 21a8f89b..3a3421ae 100644 --- a/packages/talos_rdkafka_utils/Cargo.toml +++ b/packages/talos_rdkafka_utils/Cargo.toml @@ -16,9 +16,8 @@ doctest = false async-trait = { workspace = true } tokio = { workspace = true, features = ["macros", "rt"] } talos_common_utils = { path = "../talos_common_utils", version = "0.2.60-dev" } -# Logging -log = { workspace = true } -env_logger = { workspace = true } + +tracing = { version = "0.1.41", features = [ "log" ] } # Test mockall = "0.11.0" diff --git a/packages/talos_rdkafka_utils/src/kafka_config.rs b/packages/talos_rdkafka_utils/src/kafka_config.rs index 4ed80217..78198e75 100644 --- a/packages/talos_rdkafka_utils/src/kafka_config.rs +++ b/packages/talos_rdkafka_utils/src/kafka_config.rs @@ -109,7 +109,7 @@ impl KafkaConfig { self.setup_auth(&mut client_config, base_config); - log::debug!("p: client_config = {:?}", client_config); + tracing::debug!("p: client_config = {:?}", client_config); client_config } diff --git a/packages/talos_suffix/Cargo.toml b/packages/talos_suffix/Cargo.toml index 22704018..bc27d6c3 100644 --- a/packages/talos_suffix/Cargo.toml +++ b/packages/talos_suffix/Cargo.toml @@ -37,8 +37,6 @@ potential_utf = { version = "=0.1.0"} ######################################################## # Logging -#log = { workspace = true } -#env_logger = { workspace = true } tracing = { version = "0.1.41", features = [ "log" ] } opentelemetry_api = { version = "0.20.0", features = ["metrics"] }