From 33d0f2b4ddd083899e1823ebb1f6419dd5b2c621 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 21 Aug 2025 09:44:13 -0400 Subject: [PATCH] dekaf: Add support for shipping opentelemetry traces --- Cargo.lock | 106 +++++++++++++++++++++------ Cargo.toml | 9 ++- crates/dekaf/Cargo.toml | 6 +- crates/dekaf/src/lib.rs | 9 +-- crates/dekaf/src/log_appender.rs | 57 ++------------- crates/dekaf/src/logging.rs | 43 +++++++---- crates/dekaf/src/main.rs | 115 +++++++++++++++++++++++++----- crates/dekaf/src/otel.rs | 69 ++++++++++++++++++ crates/dekaf/src/session.rs | 17 +++++ crates/dekaf/src/task_manager.rs | 15 +++- crates/gazette/src/journal/mod.rs | 2 + 11 files changed, 335 insertions(+), 113 deletions(-) create mode 100644 crates/dekaf/src/otel.rs diff --git a/Cargo.lock b/Cargo.lock index c4332f7083a..e1ca4192672 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2358,6 +2358,9 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "models", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "ops", "percent-encoding", "postgrest", @@ -2389,13 +2392,14 @@ dependencies = [ "tonic", "tower-http", "tracing", - "tracing-record-hierarchical", + "tracing-opentelemetry", "tracing-subscriber", "tracing-test", "tuple", "typestate", "unseal", "url", + "uuid 1.10.0", "webpki", ] @@ -4926,6 +4930,72 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab70038c28ed37b97d8ed414b6429d343a8bbf44c9f79ec854f3a643029ba6d7" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 1.0.63", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91cf61a1868dacc576bf2b2a1c3e9ab150af7272909e80085c3173384fe11f76" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror 1.0.63", + "tokio", + "tonic", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6e05acbfada5ec79023c85368af14abd0b307c015e9064d249b2a950ef459a6" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231e9d6ceef9b0b2546ddf52335785ce41252bc7474ee8ba05bfad277be13ab8" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "opentelemetry", + "percent-encoding", + "rand 0.8.5", + "serde_json", + "thiserror 1.0.63", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "ops" version = "0.0.0" @@ -6615,18 +6685,6 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" -[[package]] -name = "sealed" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4a8caec23b7800fb97971a1c6ae365b6239aaeddfb934d6265f8505e795699d" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "syn 2.0.104", -] - [[package]] name = "security-framework" version = "2.11.1" @@ -7691,9 +7749,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", @@ -7710,7 +7768,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls-native-certs 0.7.2", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.1.3", "socket2", "tokio", @@ -7843,15 +7901,21 @@ dependencies = [ ] [[package]] -name = "tracing-record-hierarchical" -version = "0.1.1" +name = "tracing-opentelemetry" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4870b936b70cfeef3d45599052b53b0c9be6dce769cdb1f2fae5cbf47c3ef9d0" +checksum = "97a971f6058498b5c0f1affa23e7ea202057a7301dbff68e968b2d578bcbd053" dependencies = [ - "lazy_static", - "sealed", + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", "tracing", + "tracing-core", + "tracing-log", "tracing-subscriber", + "web-time", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 76aee921905..5e30ffa3206 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -200,7 +200,14 @@ tracing-subscriber = { version = "0.3", features = [ "env-filter", "fmt", ] } -tracing-record-hierarchical = "0.1.1" +tracing-opentelemetry = "0.28" +opentelemetry = { version = "0.27", features = ["trace"] } +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "trace"] } +opentelemetry-otlp = { version = "0.27", features = [ + "tonic", + "trace", + "tls-roots", +] } rustls-native-certs = "0.7.2" zeroize = "1.6" diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 97952a49bef..3c70144cbce 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -71,7 +71,6 @@ tokio-rustls = { workspace = true } tokio-util = { workspace = true } tokio-stream = { workspace = true } tonic = { workspace = true } -tracing-record-hierarchical = { workspace = true } aws-config = { workspace = true } aws-types = { workspace = true } aws-credential-types = { workspace = true } @@ -79,11 +78,16 @@ aws-msk-iam-sasl-signer = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } +tracing-opentelemetry = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } +opentelemetry-otlp = { workspace = true } typestate = { workspace = true } url = { workspace = true } webpki = { workspace = true } apache-avro = { workspace = true } thiserror = { workspace = true } +uuid = { workspace = true } [dev-dependencies] async-process = { path = "../async-process" } diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 3a73f8e9d0d..4254f405fda 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -8,6 +8,7 @@ use tracing::instrument; pub mod log_appender; pub mod logging; +pub mod otel; mod topology; pub use topology::extract_dekaf_config; @@ -34,16 +35,13 @@ pub use api_client::{KafkaApiClient, KafkaClientAuth}; use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser}; use flow_client::client::{refresh_authorizations, RefreshToken}; -use log_appender::SESSION_CLIENT_ID_FIELD_MARKER; use percent_encoding::{percent_decode_str, utf8_percent_encode}; use proto_flow::flow::MaterializationSpec; use serde::{Deserialize, Serialize}; use std::{ - any, sync::Arc, time::{Duration, SystemTime}, }; -use tracing_record_hierarchical::SpanExt; pub struct App { /// Hostname which is advertised for Kafka access. @@ -460,9 +458,8 @@ async fn handle_api( // https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823 let (header, request) = dec_request(frame, version)?; if let Some(client_id) = &header.client_id { - tracing::Span::current() - .record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, client_id.to_string()); - tracing::info!("Got client ID!"); + session.set_client_id(client_id.to_string()); + tracing::info!(?client_id, "Got client ID!"); } Ok(enc_resp(out, &header, session.api_versions(request).await?)) } diff --git a/crates/dekaf/src/log_appender.rs b/crates/dekaf/src/log_appender.rs index 14c64ee7aa2..fe8a1447570 100644 --- a/crates/dekaf/src/log_appender.rs +++ b/crates/dekaf/src/log_appender.rs @@ -191,6 +191,9 @@ enum GazetteAppender { } impl GazetteAppender { + #[tracing::instrument(skip_all, fields( + journal_name = self.get_journal_name() + ))] async fn append(&mut self, data: impl Fn() -> S + Send + Sync) -> anyhow::Result<()> where S: Stream> + Send + 'static, @@ -224,6 +227,9 @@ impl GazetteAppender { } } + #[tracing::instrument(skip(self), fields( + journal_name = self.get_journal_name() + ))] async fn get_client(&self) -> anyhow::Result { match self { GazetteAppender::OpsStats(state) => match state.task_listener.get().await? { @@ -271,9 +277,6 @@ pub struct TaskForwarder { } // These well-known tracing field names are used to annotate all log messages within a particular session. -// This is done by using `tracing_record_hierarchical` to update the field value wherever it's defined in the span hierarchy: -// -// tracing::Span::current().record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, ...client_id...); pub const SESSION_TASK_NAME_FIELD_MARKER: &str = "task_name"; pub const SESSION_CLIENT_ID_FIELD_MARKER: &str = "session_client_id"; pub const EXCLUDE_FROM_TASK_LOGGING: &str = "exclude_from_task_logging"; @@ -311,7 +314,7 @@ impl TaskForwarder { } } - #[instrument(skip_all, fields( + #[instrument(name = "start_log_append_loop", skip_all, fields( task_name = tracing::field::Empty ))] async fn start( @@ -531,16 +534,10 @@ impl TaskForwarder { } pub fn set_task_name(&self, name: String, build: String) { - use tracing_record_hierarchical::SpanExt; - self.send_message(TaskWriterMessage::SetTaskName { name: name.clone(), build, }); - - // Also set the task name on the parent span so it's included in the logs. This also adds it - // to the logs that Dekaf writes to stdout, which makes debugging issues much easier. - tracing::Span::current().record_hierarchical(SESSION_TASK_NAME_FIELD_MARKER, name); } pub fn send_log_message(&self, log: ops::Log) { @@ -662,7 +659,6 @@ mod tests { use tracing::{info, info_span}; use tracing::{instrument::WithSubscriber, Instrument}; - use tracing_record_hierarchical::SpanExt; use tracing_subscriber::prelude::*; #[derive(Default, Clone)] @@ -736,7 +732,6 @@ mod tests { let producer = gen_producer(); let subscriber = tracing_subscriber::registry() - .with(tracing_record_hierarchical::HierarchicalRecord::default()) .with(ops::tracing::Layer::new( |log| MOCK_LOG_FORWARDER.get().send_log_message(log.clone()), std::time::SystemTime::now, @@ -823,44 +818,6 @@ mod tests { .await; } - #[tokio::test] - async fn test_logging_with_client_id_hierarchical() { - setup(|logs, _stats, cancelled| async move { - { - info!("Test log data before setting name, you should see me"); - let session_span = info_span!( - "session_span", - { SESSION_CLIENT_ID_FIELD_MARKER } = tracing::field::Empty - ); - let session_guard = session_span.enter(); - - info!("Test log data without a task name yet!"); - - MOCK_LOG_FORWARDER.get().set_task_name( - "my_task".to_string(), - "11:22:33:44:55:66:77:88".parse().unwrap(), - ); - - let child_span = info_span!("child_span"); - let child_guard = child_span.enter(); - - tracing::Span::current() - .record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, "my-client-id"); - - info!("I should have a client ID"); - drop(child_guard); - info!("I should also have a client ID"); - drop(session_guard) - }; - - tokio::time::sleep(Duration::from_millis(100)).await; - - assert_output("session_logger_and_task_name_hierarchical", logs).await; - assert!(!cancelled.is_cancelled()); - }) - .await; - } - #[tokio::test] async fn test_session_subscriber_layer_taskless() { setup(|logs, _stats, cancelled| async move { diff --git a/crates/dekaf/src/logging.rs b/crates/dekaf/src/logging.rs index 8d06f23f7cc..9aa924a5d07 100644 --- a/crates/dekaf/src/logging.rs +++ b/crates/dekaf/src/logging.rs @@ -1,11 +1,15 @@ -use crate::log_appender::{self, GazetteWriter, TaskForwarder}; +use crate::{ + log_appender::{self, GazetteWriter, TaskForwarder}, + otel::OtelConfig, +}; +use anyhow::Context; use futures::Future; use lazy_static::lazy_static; use rand::Rng; use tracing::level_filters::LevelFilter; use tracing::Instrument; use tracing_subscriber::filter::Targets; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer, Registry}; // These are accessible anywhere inside the call stack of a future wrapped with [`forward_logs()`]. // The relationship between LogForwarder and log journal is one-to-one. That means that all logs @@ -15,7 +19,7 @@ tokio::task_local! { static LOG_LEVEL: std::cell::Cell<&'static tracing_subscriber::filter::Targets>; } -pub fn install() { +pub fn install(otel_cfg: Option) -> anyhow::Result<()> { // Build a tracing_subscriber::Filter which uses our dynamic log level. let log_filter = tracing_subscriber::filter::DynFilterFn::new(move |metadata, ctx| { if metadata @@ -40,8 +44,27 @@ pub fn install() { .from_env_lossy(), ); + let otel_layer = if let Some(cfg) = otel_cfg { + let layer = crate::otel::init_tracer_provider(&cfg) + .context("Failed to initialize OpenTelemetry")?; + Some( + layer.with_filter( + tracing_subscriber::EnvFilter::builder() + .with_default_directive(LevelFilter::WARN.into()) + .from_env_lossy() + .add_directive("hyper=off".parse().unwrap()) + .add_directive("tonic=off".parse().unwrap()) + .add_directive("h2=off".parse().unwrap()) + .add_directive("reqwest=off".parse().unwrap()) + .add_directive("dekaf=debug".parse().unwrap()), + ), + ) + } else { + None + }; + let registry = tracing_subscriber::registry() - .with(tracing_record_hierarchical::HierarchicalRecord::default()) + .with(otel_layer) .with(fmt_layer) .with( ops::tracing::Layer::new( @@ -54,6 +77,7 @@ pub fn install() { ); registry.init(); + Ok(()) } lazy_static! { @@ -102,16 +126,7 @@ where LOG_LEVEL.scope( std::cell::Cell::new(build_log_filter(ops::LogLevel::Info)), - TASK_FORWARDER.scope( - forwarder, - fut.instrument(tracing::info_span!( - // Attach these empty fields so that later on we can use tracing_record_hierarchical - // to set them, effectively adding a field to every event emitted inside a Session. - "dekaf_session", - { log_appender::SESSION_CLIENT_ID_FIELD_MARKER } = tracing::field::Empty, - { log_appender::SESSION_TASK_NAME_FIELD_MARKER } = tracing::field::Empty, - )), - ), + TASK_FORWARDER.scope(forwarder, fut), ) } diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 251231a4e97..37d44d2522d 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -5,7 +5,8 @@ use anyhow::{bail, Context}; use axum_server::tls_rustls::RustlsConfig; use clap::{Args, Parser}; use dekaf::{ - log_appender::GazetteWriter, logging, KafkaApiClient, KafkaClientAuth, Session, TaskManager, + log_appender::{self, GazetteWriter}, + logging, otel, KafkaApiClient, KafkaClientAuth, Session, TaskManager, }; use flow_client::{ DEFAULT_AGENT_URL, DEFAULT_DATA_PLANE_FQDN, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL, @@ -22,6 +23,7 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; +use tracing::Instrument; use url::Url; /// A Kafka-compatible proxy for reading Estuary Flow collections. @@ -133,6 +135,9 @@ pub struct Cli { #[command(flatten)] tls: Option, + + #[command(flatten)] + otel: Option, } #[derive(Args, Debug, serde::Serialize)] @@ -148,6 +153,40 @@ struct TlsArgs { certificate_key_file: Option, } +#[derive(Args, Debug, serde::Serialize)] +#[group(id = "otel_config", requires = "otel_endpoint")] +struct OtelArgs { + /// OpenTelemetry OTLP endpoint URL + #[arg(long, env = "OTEL_ENDPOINT", group = "otel_config")] + otel_endpoint: Option, + + /// Service name for OpenTelemetry traces + #[arg( + long, + env = "OTEL_SERVICE_NAME", + group = "otel_config", + default_value = "dekaf" + )] + otel_service_name: Option, + + /// Grafana Cloud username/instance ID for authentication + #[arg(long, env = "OTEL_USERNAME", group = "otel_config")] + otel_username: Option, + + /// Grafana Cloud API token for authentication + #[arg(long, env = "OTEL_PASSWORD", group = "otel_config")] + otel_password: Option, + + /// Trace sampling rate (0.0-1.0) + #[arg( + long, + env = "OTEL_SAMPLE_RATE", + default_value = "0.1", + group = "otel_config" + )] + otel_sample_rate: f64, +} + #[derive(Args, Debug, Clone, serde::Serialize)] #[group( multiple = true, @@ -215,16 +254,6 @@ impl Cli { } fn main() { - logging::install(); - - let cli = Cli::parse(); - - rustls::crypto::aws_lc_rs::default_provider() - .install_default() - .unwrap(); - - tracing::info!("Starting dekaf"); - let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build(); @@ -237,7 +266,7 @@ fn main() { } }; - let handle = runtime.spawn(async_main(cli)); + let handle = runtime.spawn(async_main()); let result = runtime.block_on(async { handle.await.unwrap() }); // Explicitly shut down the runtime without waiting for blocking background tasks. @@ -250,7 +279,39 @@ fn main() { } } -async fn async_main(cli: Cli) -> anyhow::Result<()> { +async fn async_main() -> anyhow::Result<()> { + let cli = Cli::parse(); + + let otel_cfg = if let Some(otel_args) = &cli.otel { + Some(otel::OtelConfig { + endpoint: otel_args + .otel_endpoint + .clone() + .expect("OTEL_ENDPOINT should be set by this point"), + service_name: otel_args + .otel_service_name + .clone() + .expect("OTEL_SERVICE_NAME should be set by this point"), + username: otel_args.otel_username.clone(), + password: otel_args.otel_password.clone(), + sample_rate: otel_args.otel_sample_rate, + }) + } else { + None + }; + + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .unwrap(); + + logging::install(otel_cfg).expect("Failed to set up logging/tracing"); + + tracing::info!("Starting dekaf"); + + if let Some(_) = cli.otel { + tracing::info!("OpenTelemetry tracing enabled"); + } + let upstream_kafka_urls = cli.build_broker_urls()?; // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------ @@ -495,6 +556,8 @@ async fn serve( metrics::gauge!("dekaf_total_connections").increment(1); + let correlation_id = uuid::Uuid::new_v4().to_string(); + let result = async { loop { tokio::select! { @@ -503,11 +566,27 @@ async fn serve( return Ok(()); }; - dekaf::dispatch_request_frame(&mut session, &mut raw_sasl_auth, frame, &mut out) - .await?; - - () = w.write_all(&mut out).await?; - out.clear(); + let request_span = tracing::info_span!( + // Detach from parent span so that each request frame is its own top-level span. + // This is because a single session may run for hours at a time, and otel doesn't + // like long-running root spans. + parent: None, + "session_request", + "addr" = ?addr, + "correlation_id" = %correlation_id, + { log_appender::SESSION_CLIENT_ID_FIELD_MARKER } = session.client_id(), + { log_appender::SESSION_TASK_NAME_FIELD_MARKER } = session.task_name(), + ); + + async { + dekaf::dispatch_request_frame(&mut session, &mut raw_sasl_auth, frame, &mut out) + .await?; + + () = w.write_all(&mut out).await?; + out.clear(); + + anyhow::Ok(()) + }.instrument(request_span).await?; } _ = tokio::time::sleep(idle_timeout) => { anyhow::bail!("timeout waiting for next session request") diff --git a/crates/dekaf/src/otel.rs b/crates/dekaf/src/otel.rs new file mode 100644 index 00000000000..a64619f285e --- /dev/null +++ b/crates/dekaf/src/otel.rs @@ -0,0 +1,69 @@ +use anyhow::{Context, Result}; +use opentelemetry::{global, trace::TracerProvider, KeyValue}; +use opentelemetry_otlp::{WithExportConfig, WithTonicConfig}; +use opentelemetry_sdk::{ + trace::{RandomIdGenerator, Sampler, TracerProvider as SdkTracerProvider}, + Resource, +}; +use std::time::Duration; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::Registry; + +pub struct OtelConfig { + pub endpoint: String, + pub service_name: String, + pub username: Option, + pub password: Option, + pub sample_rate: f64, +} + +fn create_resource(service_name: &str) -> Resource { + Resource::new(vec![ + KeyValue::new("service.name", service_name.to_string()), + KeyValue::new("service.version", env!("CARGO_PKG_VERSION")), + ]) +} + +pub fn init_tracer_provider( + config: &OtelConfig, +) -> Result> { + let mut builder = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots()) + .with_endpoint(&config.endpoint) + .with_timeout(Duration::from_secs(10)); + + // Add authentication if provided + if let (Some(username), Some(password)) = (&config.username, &config.password) { + let auth_header = format!( + "Basic {}", + base64::encode(format!("{}:{}", username, password)) + ); + let mut metadata = tonic::metadata::MetadataMap::new(); + metadata.insert( + "authorization", + auth_header + .parse() + .context("Failed to parse authorization header")?, + ); + builder = builder.with_metadata(metadata); + } + + let exporter = builder + .build() + .context("Failed to build OTLP span exporter")?; + + let tracer_provider = SdkTracerProvider::builder() + .with_sampler(Sampler::TraceIdRatioBased(config.sample_rate)) + .with_id_generator(RandomIdGenerator::default()) + .with_resource(create_resource(&config.service_name)) + .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio) + .build(); + + let tracer = tracer_provider.tracer("dekaf"); + global::set_tracer_provider(tracer_provider.clone()); + + let telemetry_layer = OpenTelemetryLayer::new(tracer); + + Ok(telemetry_layer) +} diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index dcbd0d2ffa5..7cf59c4c878 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -51,6 +51,7 @@ pub struct Session { msk_region: String, // Number of ReadResponses to buffer in PendingReads read_buffer_size: usize, + client_id: Option, // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------ legacy_mode_broker_urls: Option>, @@ -81,6 +82,7 @@ impl Session { legacy_mode_broker_password, reads: HashMap::new(), auth: None, + client_id: None, secret, data_preview_state: SessionDataPreviewState::Unknown, } @@ -1561,4 +1563,19 @@ impl Session { Err(e) => return Err(e), } } + + pub fn task_name(&self) -> Option { + match &self.auth { + Some(SessionAuthentication::Task(auth)) => Some(auth.task_name.clone()), + Some(SessionAuthentication::Redirect { spec, .. }) => Some(spec.name.clone()), + _ => None, + } + } + pub fn client_id(&self) -> Option<&str> { + self.client_id.as_ref().map(|x| x.as_str()) + } + + pub fn set_client_id(&mut self, client_id: String) { + self.client_id = Some(client_id); + } } diff --git a/crates/dekaf/src/task_manager.rs b/crates/dekaf/src/task_manager.rs index 942c7b44e29..3505d851100 100644 --- a/crates/dekaf/src/task_manager.rs +++ b/crates/dekaf/src/task_manager.rs @@ -19,6 +19,7 @@ use std::{ time::Duration, }; use tokio::sync::watch; +use tracing::Instrument; // Define a custom cloneable error type #[derive(Debug, Clone)] @@ -79,6 +80,7 @@ pub struct TaskStateListener(Arc>>>); impl TaskStateListener { /// Gets the current state, waiting if it's not yet available. /// Returns a clone of the state or the cached error. + #[tracing::instrument(skip(self), name = "task_state_listener_get")] pub async fn get(&self) -> anyhow::Result { let mut temp_rx = (*self.0).clone(); loop { @@ -190,7 +192,7 @@ impl TaskManager { sender, stop_signal, activity_signal, - task_name, + task_name.clone(), ), )); @@ -198,7 +200,6 @@ impl TaskManager { } /// Runs the task manager loop until either there are no more receivers or the stop signal is triggered. - #[tracing::instrument(skip(self, receiver, sender, stop_signal))] async fn run_task_manager( self: std::sync::Arc, // Hold onto a strong reference to the receiver so we can keep it alive until the timeout runs out @@ -228,6 +229,13 @@ impl TaskManager { let mut timeout_start = None; loop { + let tick_span = tracing::info_span!( + parent: None, // This is the key: creates a new root trace. + "task_manager_tick", + task_name = %task_name, + ); + + let guard = tick_span.enter(); // No more receivers except us, time to shut down this task loop. // Note that we only do this after waiting out the interval. // This is to provide a grace period for any new receivers to be created @@ -252,6 +260,8 @@ impl TaskManager { } } + drop(guard); + let mut has_been_migrated = false; let loop_result: Result<()> = async { @@ -381,6 +391,7 @@ impl TaskManager { } } // End of match } + .instrument(tick_span) .await; if let Err(e) = loop_result { diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs index 727ca960126..618993a4121 100644 --- a/crates/gazette/src/journal/mod.rs +++ b/crates/gazette/src/journal/mod.rs @@ -51,6 +51,7 @@ impl Client { } /// Invoke the Gazette journal Apply API. + #[tracing::instrument(skip(self))] pub async fn apply(&self, req: broker::ApplyRequest) -> crate::Result { let mut client = self.into_sub(self.router.route( None, @@ -68,6 +69,7 @@ impl Client { } /// Invoke the Gazette journal ListFragments API. + #[tracing::instrument(skip(self))] pub async fn list_fragments( &self, req: broker::FragmentsRequest,