Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ tracing-attributes = "=0.1.22"
tracing-subscriber = { version = "0.3.17", default-features = true, features = ["env-filter", "json", "registry"] }
gethostname = "0.4.3"
rustc-hash = "1.1"
opentelemetry = { version = "0.19.0", features = ["rt-tokio-current-thread"] }
opentelemetry-otlp = { version = "0.12.0" }
tracing-opentelemetry = { version = "0.19.0" }

once_cell = "1.18.0"

# Tokio Dependencies
Expand Down Expand Up @@ -56,6 +60,7 @@ async-trait = "0.1.74"
uuid = { version = "1.5.0", features = ["v4", "fast-rng"] }

argh = "0.1.12"
axum-tracing-opentelemetry = "0.15.0"

[dev-dependencies]
rand = "0.8.5"
Expand Down
6 changes: 6 additions & 0 deletions config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ enabled = true
level = "DEBUG"
log_format = "default"

[log.telemetry]
traces_enabled = false
use_xray_generator = false
otel_exporter_otlp_endpoint = "http://localhost:4317"
route_to_trace = ["*/add", "*/retrieve", "*/delete"]

[server]
host = "127.0.0.1"
port = 8080
Expand Down
1 change: 1 addition & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ where
)
.with_state(state.clone())
.route("/health", routing::get(routes::health::health))
.layer(axum_tracing_opentelemetry::middleware::OtelAxumLayer::default())
.layer(
tower_trace::TraceLayer::new_for_http()
.on_request(tower_trace::DefaultOnRequest::new().level(tracing::Level::INFO))
Expand Down
22 changes: 22 additions & 0 deletions src/logger/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use serde::Deserialize;
pub struct Log {
/// Logging to a console.
pub console: LogConsole,
/// Telemetry / tracing.
pub telemetry: LogTelemetry,
}

/// Logging to a console.
Expand All @@ -24,6 +26,26 @@ pub struct LogConsole {
pub filtering_directive: Option<String>,
}

/// Telemetry / tracing.
#[derive(Debug, Deserialize, Clone, Default)]
#[serde(default)]
pub struct LogTelemetry {
/// Whether the traces pipeline is enabled.
pub traces_enabled: bool,
/// Whether errors in setting up traces or metrics pipelines must be ignored.
pub ignore_errors: bool,
/// Sampling rate for traces
pub sampling_rate: Option<f64>,
/// Base endpoint URL to send metrics and traces to. Can optionally include the port number.
pub otel_exporter_otlp_endpoint: Option<String>,
/// Timeout (in milliseconds) for sending metrics and traces.
pub otel_exporter_otlp_timeout: Option<u64>,
/// Whether to use xray ID generator, (enable this if you plan to use AWS-XRAY)
pub use_xray_generator: bool,
/// Route Based Tracing
pub route_to_trace: Option<Vec<String>>,
}

/// Describes the level of verbosity of a span or event.
#[derive(Debug, Clone, Copy)]
pub struct Level(pub(super) tracing::Level);
Expand Down
176 changes: 172 additions & 4 deletions src/logger/setup.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
//! Setup logging subsystem.

use super::{config, formatter::FormattingLayer, storage::StorageSubscription};

use std::time::Duration;

use opentelemetry::{
global,
sdk::{propagation::TraceContextPropagator, trace, trace::BatchConfig, Resource},
trace::{TraceContextExt, TraceState},
KeyValue,
};
use opentelemetry_otlp::{TonicExporterBuilder, WithExportConfig};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, util::SubscriberInitExt, EnvFilter, Layer};

use super::{config, formatter::FormattingLayer, storage::StorageSubscription};

/// Contains guards necessary for logging
#[derive(Debug)]
pub struct TelemetryGuard {
Expand All @@ -21,7 +30,16 @@ pub fn setup(
) -> TelemetryGuard {
let mut guards = Vec::new();

let subscriber = tracing_subscriber::registry().with(StorageSubscription);
// Setup OpenTelemetry traces
let traces_layer = if config.telemetry.traces_enabled {
setup_tracing_pipeline(&config.telemetry, service_name)
} else {
None
};

let subscriber = tracing_subscriber::registry()
.with(traces_layer)
.with(StorageSubscription);

// Setup console logging
if config.console.enabled {
Expand Down Expand Up @@ -49,7 +67,8 @@ pub fn setup(
}
config::LogFormat::Json => {
error_stack::Report::set_color_mode(error_stack::fmt::ColorMode::None);
let logging_layer = FormattingLayer::new(service_name, console_writer);
let logging_layer =
FormattingLayer::new(service_name, console_writer).with_filter(console_filter);
subscriber.with(logging_layer).init();
}
}
Expand All @@ -64,6 +83,155 @@ pub fn setup(
}
}

#[derive(Debug, Clone)]
enum TraceUrlAssert {
Match(String),
EndsWith(String),
}

impl TraceUrlAssert {
fn compare_url(&self, url: &str) -> bool {
match self {
Self::Match(value) => url == value,
Self::EndsWith(end) => url.ends_with(end),
}
}
}

impl From<String> for TraceUrlAssert {
fn from(value: String) -> Self {
match value {
url if url.starts_with('*') => Self::EndsWith(url.trim_start_matches('*').to_string()),
url => Self::Match(url),
}
}
}

#[derive(Debug, Clone)]
struct TraceAssertion {
clauses: Option<Vec<TraceUrlAssert>>,
/// default behaviour for tracing if no condition is provided
default: bool,
}

impl TraceAssertion {
///
/// Should the provided url be traced
///
fn should_trace_url(&self, url: &str) -> bool {
match &self.clauses {
Some(clauses) => clauses.iter().any(|cur| cur.compare_url(url)),
None => self.default,
}
}
}

///
/// Conditional Sampler for providing control on url based tracing
///
#[derive(Clone, Debug)]
struct ConditionalSampler<T: trace::ShouldSample + Clone + 'static>(TraceAssertion, T);

impl<T: trace::ShouldSample + Clone + 'static> trace::ShouldSample for ConditionalSampler<T> {
fn should_sample(
&self,
parent_context: Option<&opentelemetry::Context>,
trace_id: opentelemetry::trace::TraceId,
name: &str,
span_kind: &opentelemetry::trace::SpanKind,
attributes: &opentelemetry::trace::OrderMap<opentelemetry::Key, opentelemetry::Value>,
links: &[opentelemetry::trace::Link],
instrumentation_library: &opentelemetry::InstrumentationLibrary,
) -> opentelemetry::trace::SamplingResult {
match attributes
.get(&opentelemetry::Key::new("uri"))
.map_or(self.0.default, |inner| {
self.0.should_trace_url(&inner.as_str())
}) {
true => self.1.should_sample(
parent_context,
trace_id,
name,
span_kind,
attributes,
links,
instrumentation_library,
),
false => opentelemetry::trace::SamplingResult {
decision: opentelemetry::trace::SamplingDecision::Drop,
attributes: Vec::new(),
trace_state: match parent_context {
Some(ctx) => ctx.span().span_context().trace_state().clone(),
None => TraceState::default(),
},
},
}
}
}

fn get_opentelemetry_exporter(config: &config::LogTelemetry) -> TonicExporterBuilder {
let mut exporter_builder = opentelemetry_otlp::new_exporter().tonic();

if let Some(ref endpoint) = config.otel_exporter_otlp_endpoint {
exporter_builder = exporter_builder.with_endpoint(endpoint);
}
if let Some(timeout) = config.otel_exporter_otlp_timeout {
exporter_builder = exporter_builder.with_timeout(Duration::from_millis(timeout));
}

exporter_builder
}

fn setup_tracing_pipeline(
config: &config::LogTelemetry,
service_name: &str,
) -> Option<tracing_opentelemetry::OpenTelemetryLayer<tracing_subscriber::Registry, trace::Tracer>>
{
global::set_text_map_propagator(TraceContextPropagator::new());

let mut trace_config = trace::config()
.with_sampler(trace::Sampler::ParentBased(Box::new(ConditionalSampler(
TraceAssertion {
clauses: config
.route_to_trace
.clone()
.map(|inner| inner.into_iter().map(Into::into).collect()),
default: false,
},
trace::Sampler::TraceIdRatioBased(config.sampling_rate.unwrap_or(1.0)),
))))
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
service_name.to_owned(),
)]));
if config.use_xray_generator {
trace_config = trace_config.with_id_generator(trace::XrayIdGenerator::default());
}

// Change the default export interval from 5 seconds to 1 second
let batch_config = BatchConfig::default().with_scheduled_delay(Duration::from_millis(1000));

let traces_layer_result = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(get_opentelemetry_exporter(config))
.with_batch_config(batch_config)
.with_trace_config(trace_config)
.install_batch(opentelemetry::runtime::TokioCurrentThread)
.map(|tracer| tracing_opentelemetry::layer().with_tracer(tracer));

if config.ignore_errors {
traces_layer_result
.map_err(|error| {
eprintln!("Failed to create an `opentelemetry_otlp` tracer: {error:?}")
})
.ok()
} else {
// Safety: This is conditional, there is an option to avoid this behavior at runtime.
#[allow(clippy::expect_used)]
Some(traces_layer_result.expect("Failed to create an `opentelemetry_otlp` tracer"))
}
}

fn get_envfilter(
filtering_directive: Option<&String>,
default_log_level: config::Level,
Expand Down
4 changes: 4 additions & 0 deletions src/routes/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use axum::{error_handling::HandleErrorLayer, response::IntoResponse};
use axum::middleware;

use masking::ExposeInterface;
use tracing_attributes::instrument;

use crate::{
app::AppState,
Expand Down Expand Up @@ -71,6 +72,7 @@ pub fn serve(
}

/// `/data/add` handling the requirement of storing cards
#[instrument(skip_all)]
pub async fn add_card(
extract::State(state): extract::State<AppState>,
Json(request): Json<types::StoreCardRequest>,
Expand Down Expand Up @@ -147,6 +149,7 @@ pub async fn add_card(
}

/// `/data/delete` handling the requirement of deleting cards
#[instrument(skip_all)]
pub async fn delete_card(
extract::State(state): extract::State<AppState>,
Json(request): Json<types::DeleteCardRequest>,
Expand Down Expand Up @@ -178,6 +181,7 @@ pub async fn delete_card(
}

/// `/data/retrieve` handling the requirement of retrieving cards
#[instrument(skip_all)]
pub async fn retrieve_card(
extract::State(state): extract::State<AppState>,
Json(request): Json<types::RetrieveCardRequest>,
Expand Down
2 changes: 2 additions & 0 deletions src/routes/data/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// }

use masking::PeekInterface;
use tracing_attributes::instrument;

use crate::error;

Expand Down Expand Up @@ -115,6 +116,7 @@ pub trait Validation {
impl Validation for StoreCardRequest {
type Error = error::ApiError;

#[instrument(skip_all)]
fn validate(&self) -> Result<(), Self::Error> {
match &self.data {
Data::EncData { .. } => Ok(()),
Expand Down
Loading