Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ members = [
"witchcraft-server-ete",
"render-conjure",
]

[patch.crates-io]
witchcraft-metrics = { git = "https://github.com/palantir/witchcraft-rust-logging", branch = "exemplars" }
12 changes: 5 additions & 7 deletions witchcraft-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

pub use body::{RequestBody, ResponseWriter};
use config::install::InstallConfig;
use config::runtime::RuntimeConfig;
use conjure_error::Error;
use conjure_http::server::{AsyncService, ConjureRuntime};
use conjure_runtime::{Agent, ClientFactory, HostMetricsRegistry, UserAgent};
Expand All @@ -309,13 +312,8 @@ use status::StatusServiceEndpoints;
use tokio::runtime::{Handle, Runtime};
use tokio::signal::unix::{self, SignalKind};
use tokio::{runtime, select, time};
use witchcraft_log::{error, fatal, info};
use witchcraft_metrics::MetricRegistry;

pub use body::{RequestBody, ResponseWriter};
use config::install::InstallConfig;
use config::runtime::RuntimeConfig;
pub use witchcraft::Witchcraft;
use witchcraft_log::{error, fatal, info};
#[doc(inline)]
pub use witchcraft_server_config as config;
#[doc(inline)]
Expand Down Expand Up @@ -441,7 +439,7 @@ where
let runtime_config_ok = Arc::new(AtomicBool::new(true));
let runtime_config = load_runtime(&handle, &runtime_config_ok)?;

let metrics = Arc::new(MetricRegistry::new());
let metrics = Arc::new(logging::metric::registry());
let host_metrics = Arc::new(HostMetricsRegistry::new());
let health_checks = Arc::new(HealthCheckRegistry::new(&handle));
let readiness_checks = Arc::new(ReadinessCheckRegistry::new());
Expand Down
74 changes: 66 additions & 8 deletions witchcraft-server/src/logging/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use crate::logging::logger::{self, Appender};
use crate::logging::metric::gauge_reporter::GaugeReporter;
use crate::shutdown_hooks::ShutdownHooks;
use conjure_error::Error;
use conjure_object::Utc;
use conjure_object::{DateTime, Utc};
use futures_sink::Sink;
use futures_util::{ready, SinkExt, Stream};
use pin_project::pin_project;
use serde::Serialize;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -29,14 +30,26 @@ use std::time::Duration;
use tokio::task;
use tokio::time::{self, Instant};
use witchcraft_log::warn;
use witchcraft_metrics::{Metric, MetricId, MetricRegistry};
use witchcraft_logging_api::objects::{Sample, TraceId};
use witchcraft_metrics::{Metric, MetricId, MetricRegistry, Snapshot};
use witchcraft_server_config::install::InstallConfig;

mod gauge_reporter;

const LOG_INTERVAL: Duration = Duration::from_secs(30);
const NANOS_PER_MICRO: i64 = 1_000;
const NANOS_PER_MICRO_F64: f64 = NANOS_PER_MICRO as f64;
const NANOS_PER_MICRO: f64 = 1_000.;

// Exemplar payload attached to metric updates by `provide_exemplar` and read back by
// `extract_samples` when metrics are logged.
struct Exemplar {
// Monotonic timestamp taken when the exemplar was created; used to decide whether the
// exemplar falls inside the current logging window.
instant: Instant,
// Wall-clock timestamp taken when the exemplar was created; reported as the sample's time.
time: DateTime<Utc>,
// Trace ID of the span that was current when the exemplar was created.
trace_id: zipkin::TraceId,
}

// Builds a fresh `MetricRegistry` whose exemplar provider is `provide_exemplar`.
pub fn registry() -> MetricRegistry {
    let mut metrics = MetricRegistry::new();
    metrics.set_exemplar_provider(Arc::new(provide_exemplar));

    metrics
}

pub async fn init(
metrics: &Arc<MetricRegistry>,
Expand Down Expand Up @@ -96,17 +109,21 @@ async fn log_metrics(mut appender: Appender<MetricLogV1>, metrics: Arc<MetricReg
.insert_values("p99", snapshot.value(0.99))
.insert_values("p999", snapshot.value(0.999))
.insert_values("count", m.count())
.samples(extract_samples(&*snapshot, |v| v))
}
Metric::Timer(m) => {
let snapshot = m.snapshot();
builder(id)
.metric_type("timer")
.insert_values("max", snapshot.max() / NANOS_PER_MICRO)
.insert_values("p95", snapshot.value(0.95) / NANOS_PER_MICRO_F64)
.insert_values("p99", snapshot.value(0.99) / NANOS_PER_MICRO_F64)
.insert_values("p999", snapshot.value(0.999) / NANOS_PER_MICRO_F64)
.insert_values("max", (snapshot.max() as f64) / NANOS_PER_MICRO)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive-by fix: previously the max was computed with integer division and so was rounded down to the nearest whole microsecond, while the percentiles were not.

.insert_values("p95", snapshot.value(0.95) / NANOS_PER_MICRO)
.insert_values("p99", snapshot.value(0.99) / NANOS_PER_MICRO)
.insert_values("p999", snapshot.value(0.999) / NANOS_PER_MICRO)
.insert_values("count", m.count())
.insert_values("1m", m.one_minute_rate())
.samples(extract_samples(&*snapshot, |v| {
(v as f64) / NANOS_PER_MICRO
}))
}
};

Expand Down Expand Up @@ -161,6 +178,47 @@ fn finish_log(
.build()
}

// Exemplars are only recorded for calls that have a sampled trace active: with no current
// trace context, or one whose sampling flag is anything other than `Some(true)`, this
// returns `None`.
fn provide_exemplar() -> Option<Arc<dyn witchcraft_metrics::Exemplar>> {
    let context = zipkin::current()?;

    match context.sampled() {
        Some(true) => {}
        _ => return None,
    }

    // Capture both clocks first, then the trace ID, and bundle them into the exemplar.
    let instant = Instant::now();
    let time = Utc::now();
    let trace_id = context.trace_id();

    Some(Arc::new(Exemplar {
        instant,
        time,
        trace_id,
    }))
}

// We report the single exemplar with the highest value recorded within the logging window.
//
// `snapshot` supplies the `(value, exemplar)` pairs and `map_value` converts the raw
// recorded value into the form written to the log. The returned iterator yields at most one
// `Sample`; it is empty when no exemplar of our own type was recorded in the last
// `LOG_INTERVAL`.
fn extract_samples<T>(
    snapshot: &dyn Snapshot,
    map_value: impl Fn(i64) -> T,
) -> impl Iterator<Item = Sample>
where
    T: Serialize,
{
    // Compare exemplar ages against a single reference point rather than computing
    // `Instant::now() - LOG_INTERVAL`: subtracting a `Duration` from an `Instant` may panic
    // if the result is not representable, whereas `saturating_duration_since` never panics.
    let now = Instant::now();

    snapshot
        .exemplars()
        // consumers can override our exemplar setup so it's not a bug if the downcast fails.
        .filter_map(|(v, e)| e.downcast_ref::<Exemplar>().map(|e| (v, e)))
        // keep only exemplars recorded at most `LOG_INTERVAL` ago.
        .filter(|(_, e)| now.saturating_duration_since(e.instant) <= LOG_INTERVAL)
        .max_by_key(|(v, _)| *v)
        .map(|(v, e)| {
            Sample::builder()
                .value(map_value(v))
                .time(e.time)
                .trace_id(TraceId(e.trace_id.to_string()))
                .build()
        })
        .into_iter()
}

async fn idle(
gauge_reporter: &mut GaugeReporter,
appender: &mut Appender<MetricLogV1>,
Expand Down
2 changes: 1 addition & 1 deletion witchcraft-server/src/logging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod cleanup;
mod format;
mod logger;
pub mod mdc;
mod metric;
pub(crate) mod metric;
mod service;
mod trace;

Expand Down
21 changes: 15 additions & 6 deletions witchcraft-server/src/service/mdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,29 @@ where

this.inner.as_pin_mut().unwrap().poll(cx).map(|r| {
r.map(|inner| MdcBody {
inner,
inner: Some(inner),
snapshot: mdc::snapshot(),
})
})
}
}

#[pin_project]
#[pin_project(PinnedDrop)]
pub struct MdcBody<B> {
#[pin]
inner: B,
inner: Option<B>,
snapshot: Snapshot,
}

// Drops the wrapped body while the guard returned by `with(snapshot)` is still alive.
// NOTE(review): `with` is assumed to install the snapshot as the current MDC for the
// guard's lifetime, mirroring its use in `poll_frame` — confirm against its definition.
#[pinned_drop]
impl<B> PinnedDrop for MdcBody<B> {
fn drop(self: Pin<&mut Self>) {
let mut this = self.project();
// The guard must be created before the inner body is dropped and lives until the end of
// this function, so the inner body's destructor runs while it is held.
let _guard = with(this.snapshot);
// Overwriting the pinned `Option` with `None` drops the inner body right here rather
// than after this function returns.
this.inner.set(None);
}
}

impl<B> Body for MdcBody<B>
where
B: Body,
Expand All @@ -112,15 +121,15 @@ where
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
let _guard = with(this.snapshot);
this.inner.poll_frame(cx)
this.inner.as_pin_mut().unwrap().poll_frame(cx)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
self.inner.as_ref().unwrap().is_end_stream()
}

fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
self.inner.as_ref().unwrap().size_hint()
}
}

Expand Down
43 changes: 32 additions & 11 deletions witchcraft-server/src/service/trace_propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures_util::ready;
use http::header::USER_AGENT;
use http::{Request, Response};
use http_body::{Body, Frame};
use pin_project::pin_project;
use pin_project::{pin_project, pinned_drop};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -91,20 +91,29 @@ where
span.tag("http.version", &format!("{:?}", req.version()));

TracePropagationFuture {
inner: self.inner.call(req),
inner: Some(self.inner.call(req)),
span: Some(span),
}
.await
}
}

#[pin_project]
#[pin_project(PinnedDrop)]
pub struct TracePropagationFuture<F> {
#[pin]
inner: F,
inner: Option<F>,
span: Option<OpenSpan<Detached>>,
}

#[pinned_drop]
impl<F> PinnedDrop for TracePropagationFuture<F> {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These `PinnedDrop` implementations are required to ensure that the zipkin thread-local state is set when the endpoint metric layer handles its timer updates.

fn drop(self: Pin<&mut Self>) {
let mut this = self.project();
let _guard = this.span.as_ref().map(|s| zipkin::set_current(s.context()));
this.inner.set(None);
}
}

impl<F, B> Future for TracePropagationFuture<F>
where
F: Future<Output = Response<B>>,
Expand All @@ -115,22 +124,34 @@ where
let this = self.project();
let _guard = zipkin::set_current(this.span.as_ref().unwrap().context());

let response = ready!(this.inner.poll(cx));
let response = ready!(this.inner.as_pin_mut().unwrap().poll(cx));

let mut span = this.span.take().unwrap();
span.tag("http.status_code", response.status().as_str());

Poll::Ready(response.map(|inner| TracePropagationBody { inner, span }))
Poll::Ready(response.map(|inner| TracePropagationBody {
inner: Some(inner),
span,
}))
}
}

#[pin_project]
#[pin_project(PinnedDrop)]
pub struct TracePropagationBody<B> {
#[pin]
inner: B,
inner: Option<B>,
span: OpenSpan<Detached>,
}

// Drops the wrapped body while this body's span context is set as the current zipkin
// trace context.
#[pinned_drop]
impl<B> PinnedDrop for TracePropagationBody<B> {
fn drop(self: Pin<&mut Self>) {
let mut this = self.project();
// The guard must be created before the inner body is dropped and lives until the end of
// this function, so the inner body's destructor runs with the span's context current.
let _guard = zipkin::set_current(this.span.context());
// Overwriting the pinned `Option` with `None` drops the inner body right here rather
// than after this function returns.
this.inner.set(None);
}
}

impl<B> Body for TracePropagationBody<B>
where
B: Body,
Expand All @@ -145,15 +166,15 @@ where
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let this = self.project();
let _guard = zipkin::set_current(this.span.context());
this.inner.poll_frame(cx)
this.inner.as_pin_mut().unwrap().poll_frame(cx)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
self.inner.as_ref().unwrap().is_end_stream()
}

fn size_hint(&self) -> http_body::SizeHint {
self.inner.size_hint()
self.inner.as_ref().unwrap().size_hint()
}
}

Expand Down