Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 85 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,14 @@ tracing-subscriber = { version = "0.3", features = [
"env-filter",
"fmt",
] }
tracing-record-hierarchical = "0.1.1"
tracing-opentelemetry = "0.28"
opentelemetry = { version = "0.27", features = ["trace"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "trace"] }
opentelemetry-otlp = { version = "0.27", features = [
"tonic",
"trace",
"tls-roots",
] }
rustls-native-certs = "0.7.2"
zeroize = "1.6"

Expand Down
6 changes: 5 additions & 1 deletion crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,23 @@ tokio-rustls = { workspace = true }
tokio-util = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing-record-hierarchical = { workspace = true }
aws-config = { workspace = true }
aws-types = { workspace = true }
aws-credential-types = { workspace = true }
aws-msk-iam-sasl-signer = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-opentelemetry = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-otlp = { workspace = true }
typestate = { workspace = true }
url = { workspace = true }
webpki = { workspace = true }
apache-avro = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
async-process = { path = "../async-process" }
Expand Down
9 changes: 3 additions & 6 deletions crates/dekaf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::instrument;

pub mod log_appender;
pub mod logging;
pub mod otel;

mod topology;
pub use topology::extract_dekaf_config;
Expand All @@ -34,16 +35,13 @@ pub use api_client::{KafkaApiClient, KafkaClientAuth};

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use flow_client::client::{refresh_authorizations, RefreshToken};
use log_appender::SESSION_CLIENT_ID_FIELD_MARKER;
use percent_encoding::{percent_decode_str, utf8_percent_encode};
use proto_flow::flow::MaterializationSpec;
use serde::{Deserialize, Serialize};
use std::{
any,
sync::Arc,
time::{Duration, SystemTime},
};
use tracing_record_hierarchical::SpanExt;

pub struct App {
/// Hostname which is advertised for Kafka access.
Expand Down Expand Up @@ -460,9 +458,8 @@ async fn handle_api(
// https://github.com/confluentinc/librdkafka/blob/e03d3bb91ed92a38f38d9806b8d8deffe78a1de5/src/rdkafka_request.c#L2823
let (header, request) = dec_request(frame, version)?;
if let Some(client_id) = &header.client_id {
tracing::Span::current()
.record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, client_id.to_string());
tracing::info!("Got client ID!");
session.set_client_id(client_id.to_string());
tracing::info!(?client_id, "Got client ID!");
}
Ok(enc_resp(out, &header, session.api_versions(request).await?))
}
Expand Down
57 changes: 7 additions & 50 deletions crates/dekaf/src/log_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ enum GazetteAppender {
}

impl GazetteAppender {
#[tracing::instrument(skip_all, fields(
journal_name = self.get_journal_name()
))]
async fn append<S>(&mut self, data: impl Fn() -> S + Send + Sync) -> anyhow::Result<()>
where
S: Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
Expand Down Expand Up @@ -224,6 +227,9 @@ impl GazetteAppender {
}
}

#[tracing::instrument(skip(self), fields(
journal_name = self.get_journal_name()
))]
async fn get_client(&self) -> anyhow::Result<journal::Client> {
match self {
GazetteAppender::OpsStats(state) => match state.task_listener.get().await? {
Expand Down Expand Up @@ -271,9 +277,6 @@ pub struct TaskForwarder<W: TaskWriter> {
}

// These well-known tracing field names are used to annotate all log messages within a particular session.
// This is done by using `tracing_record_hierarchical` to update the field value wherever it's defined in the span hierarchy:
//
// tracing::Span::current().record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, ...client_id...);

/// Span field name under which a session's task name is recorded.
pub const SESSION_TASK_NAME_FIELD_MARKER: &str = "task_name";
/// Span field name under which a session's Kafka client ID is recorded.
pub const SESSION_CLIENT_ID_FIELD_MARKER: &str = "session_client_id";
/// NOTE(review): presumably a field name marking spans/events that should not be
/// forwarded to task logs; its consumer is not visible here, confirm before relying on it.
pub const EXCLUDE_FROM_TASK_LOGGING: &str = "exclude_from_task_logging";
Expand Down Expand Up @@ -311,7 +314,7 @@ impl<W: TaskWriter + Clone + 'static> TaskForwarder<W> {
}
}

#[instrument(skip_all, fields(
#[instrument(name = "start_log_append_loop", skip_all, fields(
task_name = tracing::field::Empty
))]
async fn start(
Expand Down Expand Up @@ -531,16 +534,10 @@ impl<W: TaskWriter + Clone + 'static> TaskForwarder<W> {
}

/// Associates a task name and build with this forwarder.
///
/// Sends a `TaskWriterMessage::SetTaskName` carrying `name` and `build` through
/// `send_message`, then records `name` under `SESSION_TASK_NAME_FIELD_MARKER`
/// on the current span hierarchy so that subsequent log lines carry it.
pub fn set_task_name(&self, name: String, build: String) {
// Extension trait that provides `record_hierarchical` on `tracing::Span`.
use tracing_record_hierarchical::SpanExt;

self.send_message(TaskWriterMessage::SetTaskName {
// Cloned because `name` is moved into the span record below.
name: name.clone(),
build,
});

// Also set the task name on the parent span so it's included in the logs. This also adds it
// to the logs that Dekaf writes to stdout, which makes debugging issues much easier.
tracing::Span::current().record_hierarchical(SESSION_TASK_NAME_FIELD_MARKER, name);
}

pub fn send_log_message(&self, log: ops::Log) {
Expand Down Expand Up @@ -662,7 +659,6 @@ mod tests {
use tracing::{info, info_span};
use tracing::{instrument::WithSubscriber, Instrument};

use tracing_record_hierarchical::SpanExt;
use tracing_subscriber::prelude::*;

#[derive(Default, Clone)]
Expand Down Expand Up @@ -736,7 +732,6 @@ mod tests {
let producer = gen_producer();

let subscriber = tracing_subscriber::registry()
.with(tracing_record_hierarchical::HierarchicalRecord::default())
.with(ops::tracing::Layer::new(
|log| MOCK_LOG_FORWARDER.get().send_log_message(log.clone()),
std::time::SystemTime::now,
Expand Down Expand Up @@ -823,44 +818,6 @@ mod tests {
.await;
}

/// Exercises hierarchical recording of the session client ID.
///
/// A session span declares `SESSION_CLIENT_ID_FIELD_MARKER` as an empty field; the
/// value is later recorded from inside a nested child span via `record_hierarchical`.
/// The captured logs are then compared against the
/// `session_logger_and_task_name_hierarchical` output.
#[tokio::test]
async fn test_logging_with_client_id_hierarchical() {
setup(|logs, _stats, cancelled| async move {
{
// Logged outside any session span and before a task name is set.
info!("Test log data before setting name, you should see me");
// The client-ID field is declared empty here so it can be filled in later.
let session_span = info_span!(
"session_span",
{ SESSION_CLIENT_ID_FIELD_MARKER } = tracing::field::Empty
);
let session_guard = session_span.enter();

info!("Test log data without a task name yet!");

MOCK_LOG_FORWARDER.get().set_task_name(
"my_task".to_string(),
"11:22:33:44:55:66:77:88".parse().unwrap(),
);

let child_span = info_span!("child_span");
let child_guard = child_span.enter();

// Recorded while `child_span` is current; the field itself is declared on
// `session_span`, which is what makes this a hierarchical record.
tracing::Span::current()
.record_hierarchical(SESSION_CLIENT_ID_FIELD_MARKER, "my-client-id");

info!("I should have a client ID");
// Leave the child span; the next message is logged directly under `session_span`.
drop(child_guard);
info!("I should also have a client ID");
drop(session_guard)
};

// NOTE(review): presumably gives the forwarder time to process the emitted logs
// before the assertion — confirm against `setup`.
tokio::time::sleep(Duration::from_millis(100)).await;

assert_output("session_logger_and_task_name_hierarchical", logs).await;
assert!(!cancelled.is_cancelled());
})
.await;
}

#[tokio::test]
async fn test_session_subscriber_layer_taskless() {
setup(|logs, _stats, cancelled| async move {
Expand Down
Loading
Loading