diff --git a/.config/nextest.toml b/.config/nextest.toml
index f943eeda480..207f92dddc4 100644
--- a/.config/nextest.toml
+++ b/.config/nextest.toml
@@ -4,6 +4,12 @@ serial-db-tests = { max-threads = 1 }
 
 [profile.default]
 slow-timeout = { period = "30s", terminate-after = 4 }
+# Exclude e2e tests from Dekaf by default because they require special setup.
+default-filter = 'not binary_id(/dekaf::e2e/)'
+
+[profile.dekaf-e2e]
+# Run only Dekaf e2e tests
+default-filter = 'binary_id(/dekaf::e2e/)'
 
 [[profile.default.overrides]]
 filter = 'package(agent) and (test(discovers::) + test(directives::) + test(integration_tests::))'
@@ -11,4 +17,4 @@ test-group = 'serial-db-tests'
 
 [[profile.default.overrides]]
 filter = 'package(automations) and test(test_fibonacci_bench)'
-test-group = 'serial-db-tests'
\ No newline at end of file
+test-group = 'serial-db-tests'
diff --git a/.github/workflows/dekaf-test.yaml b/.github/workflows/dekaf-test.yaml
new file mode 100644
index 00000000000..70dfd5692e2
--- /dev/null
+++ b/.github/workflows/dekaf-test.yaml
@@ -0,0 +1,56 @@
+name: Dekaf Test
+permissions:
+  contents: read
+
+on:
+  pull_request:
+    branches: [master]
+    paths-ignore:
+      - "site/**"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  dekaf-test:
+    name: Dekaf Test
+    runs-on: ubuntu-2404-large
+    steps:
+      - uses: actions/checkout@v5
+        with:
+          submodules: true # Official JSON test data.
+          lfs: true # Test fixtures.
+
+      - name: Install build tools
+        uses: jdx/mise-action@9dc7d5dd454262207dea3ab5a06a3df6afc8ff26 # v3.4.1
+      - run: rustup upgrade
+
+      - name: Cache Rust workspace
+        uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
+        with:
+          workspaces: ". -> ../../../cargo-target"
+
+      - name: Cache RocksDB
+        uses: actions/cache@v4
+        with:
+          key: rocksdb-${{ runner.os }}-${{ runner.arch }}-${{ env.ROCKSDB_VERSION }}
+          path: |
+            ~/rocksdb-${{ env.ROCKSDB_VERSION }}/include/
+            ~/rocksdb-${{ env.ROCKSDB_VERSION }}/lib/
+
+      - name: Cache/Restore Go workspace.
+ uses: actions/cache@v4 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + + - uses: mozilla-actions/sccache-action@v0.0.9 + - run: echo 'SCCACHE_GHA_ENABLED=true' >> $GITHUB_ENV + - run: mise run build:rocksdb + - run: mise run ci:musl-dev + - run: mise run ci:gnu-dev + - run: mise run build:gazette + - run: mise run build:flowctl-go + - run: mise run local:stack + - run: mise run ci:dekaf-e2e diff --git a/Cargo.lock b/Cargo.lock index c51f9d4b89b..f6eeb08cb80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2605,6 +2605,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "serde_yaml", "simd-doc", "socket2 0.6.0", "tables", @@ -2624,6 +2625,7 @@ dependencies = [ "tuple", "unseal", "url", + "uuid 1.18.1", "webpki", ] diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index bc8a8c43f19..c67b00b6063 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -94,5 +94,7 @@ apache-avro = { workspace = true } insta = { workspace = true } rdkafka = { workspace = true } schema_registry_converter = { workspace = true } +serde_yaml = { workspace = true } tempfile = { workspace = true } tracing-test = { workspace = true } +uuid = { workspace = true, features = ["v4"] } diff --git a/crates/dekaf/src/api_client.rs b/crates/dekaf/src/api_client.rs index 9b28f6ccd50..3ec5ac5b273 100644 --- a/crates/dekaf/src/api_client.rs +++ b/crates/dekaf/src/api_client.rs @@ -1,4 +1,4 @@ -use anyhow::{Context, anyhow, bail}; +use anyhow::{Context, bail}; use bytes::{Bytes, BytesMut}; use futures::{SinkExt, TryStreamExt}; use kafka_protocol::{ @@ -16,53 +16,75 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; use std::{io::BufWriter, pin::Pin, sync::Arc}; -use tokio::sync::OnceCell; use tokio_rustls::rustls; use tokio_util::codec; +use tokio_util::either::Either; use tracing::instrument; use url::Url; -type BoxedKafkaConnection = Pin< - Box< - tokio_util::codec::Framed< - tokio_rustls::client::TlsStream, - codec::LengthDelimitedCodec, - >, - >, ->; +/// A stream that may or may not be TLS-encrypted. +type MaybeTlsStream = + Either>; -static ROOT_CERT_STORE: OnceCell> = OnceCell::const_new(); +type BoxedKafkaConnection = + Pin>>; -#[tracing::instrument(skip_all)] -async fn async_connect(broker_url: &str) -> anyhow::Result { - // Establish a TCP connection to the Kafka broker +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ConnectionScheme { + Plaintext, + Tls, +} - let parsed_url = Url::parse(broker_url)?; +/// Parse a broker URL to extract the connection scheme, host, and port. 
+/// +/// Supported schemes: +/// - `tcp://` - plaintext connection (no TLS) +/// - `tls://` - TLS-encrypted connection (default if no scheme provided) +fn parse_broker_url(broker_url: &str) -> anyhow::Result<(ConnectionScheme, String, u16)> { + // Default to tls:// if no scheme is provided + let url_with_scheme = if broker_url.contains("://") { + broker_url.to_string() + } else { + format!("tls://{broker_url}") + }; - let root_certs = ROOT_CERT_STORE - .get_or_try_init(|| async { - let mut certs = rustls::RootCertStore::empty(); - certs.add_parsable_certificates( - rustls_native_certs::load_native_certs().expect("failed to load native certs"), - ); - Ok::, anyhow::Error>(Arc::new(certs)) - }) - .await?; + let parsed = Url::parse(&url_with_scheme) + .with_context(|| format!("invalid broker URL: {broker_url}"))?; - let tls_config = rustls::ClientConfig::builder() - .with_root_certificates(root_certs.to_owned()) - .with_no_client_auth(); + let scheme = match parsed.scheme() { + "tcp" => ConnectionScheme::Plaintext, + "tls" => ConnectionScheme::Tls, + other => anyhow::bail!("unknown broker scheme: {other} (expected 'tcp' or 'tls')"), + }; + let host = parsed + .host() + .context("missing host in broker URL")? + .to_string(); + let port = parsed.port().unwrap_or(9092); + Ok((scheme, host, port)) +} - let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); +static ROOT_CERT_STORE: tokio::sync::OnceCell> = + tokio::sync::OnceCell::const_new(); - let hostname = parsed_url - .host() - .ok_or(anyhow!("Broker URL must contain a hostname"))?; - let port = parsed_url.port().unwrap_or(9092); - let dnsname = rustls::pki_types::ServerName::try_from(hostname.to_string())?; +fn kafka_codec() -> codec::LengthDelimitedCodec { + // https://kafka.apache.org/protocol.html#protocol_common + // All requests and responses originate from the following: + // > RequestOrResponse => Size (RequestMessage | ResponseMessage) + // > Size => int32 + tokio_util::codec::LengthDelimitedCodec::builder() + .big_endian() + .length_field_length(4) + .max_frame_length(1 << 27) // 128 MiB + .new_codec() +} + +#[tracing::instrument(skip_all)] +async fn async_connect(broker_url: &str) -> anyhow::Result { + let (scheme, host, port) = parse_broker_url(broker_url)?; - tracing::debug!(port = port,host = ?hostname, "Attempting to connect"); - let tcp_stream = tokio::net::TcpStream::connect(format!("{hostname}:{port}")).await?; + tracing::debug!(port, host = ?host, ?scheme, "Attempting to connect"); + let tcp_stream = tokio::net::TcpStream::connect(format!("{host}:{port}")).await?; // Let's keep this stream alive let sock_ref = socket2::SockRef::from(&tcp_stream); @@ -71,22 +93,33 @@ async fn async_connect(broker_url: &str) -> anyhow::Result .with_interval(Duration::from_secs(20)); sock_ref.set_tcp_keepalive(&ka)?; - let stream = tls_connector.connect(dnsname, tcp_stream).await?; - tracing::debug!(port = port,host = ?hostname, "Connection established"); + let stream: MaybeTlsStream = match scheme { + ConnectionScheme::Plaintext => Either::Left(tcp_stream), + ConnectionScheme::Tls => { + let root_certs = ROOT_CERT_STORE + .get_or_try_init(|| async { + let mut certs = rustls::RootCertStore::empty(); + certs.add_parsable_certificates( + rustls_native_certs::load_native_certs() + .expect("failed to load native certs"), + ); + Ok::, anyhow::Error>(Arc::new(certs)) + }) + .await?; - // https://kafka.apache.org/protocol.html#protocol_common - // All requests and responses originate from the following: - // > RequestOrResponse 
=> Size (RequestMessage | ResponseMessage) - // > Size => int32 - let framed = tokio_util::codec::Framed::new( - stream, - tokio_util::codec::LengthDelimitedCodec::builder() - .big_endian() - .length_field_length(4) - .max_frame_length(1 << 27) // 128 MiB - .new_codec(), - ); + let tls_config = rustls::ClientConfig::builder() + .with_root_certificates(root_certs.to_owned()) + .with_no_client_auth(); + + let tls_connector = tokio_rustls::TlsConnector::from(Arc::new(tls_config)); + let dnsname = rustls::pki_types::ServerName::try_from(host.clone())?; + + let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; + Either::Right(tls_stream) + } + }; + let framed = tokio_util::codec::Framed::new(stream, kafka_codec()); Ok(Box::pin(framed)) } @@ -190,6 +223,7 @@ async fn sasl_auth( let sasl = SASLClient::new(sasl_config.clone()); let mechanisms = get_supported_sasl_mechanisms(broker_url).await?; + tracing::debug!(?mechanisms, "Discovered SASL mechanisms"); let offered_mechanisms = mechanisms .iter() @@ -201,7 +235,7 @@ async fn sasl_auth( let selected_mechanism = session.get_mechname().as_str().to_owned(); - tracing::debug!(mechamism=?selected_mechanism, "Starting SASL request with handshake"); + tracing::debug!(mechanism=?selected_mechanism, "Starting SASL request with handshake"); // Now we know which mechanism we want to request let handshake_req = messages::SaslHandshakeRequest::default().with_mechanism( @@ -222,26 +256,45 @@ async fn sasl_auth( let mut state_buf = BufWriter::new(Vec::new()); let mut state = session.step(None, &mut state_buf)?; + // Flush the BufWriter to ensure all data is written to the underlying Vec + let mut state_data = state_buf.into_inner()?; - // SASL can happen over multiple steps - while state.is_running() { - let authenticate_request = messages::SaslAuthenticateRequest::default() - .with_auth_bytes(Bytes::from(state_buf.into_inner()?)); + tracing::debug!( + is_running = state.is_running(), + buf_len = state_data.len(), + state = ?state, + "SASL state after first step" + ); + + // SASL mechanisms may return Finished(Yes) after writing data, meaning data + // was written and must be sent, but no further steps are needed. + // We need to send data if either: + // 1. state.is_running() - more steps are expected + // 2. state_data is not empty - data was written that must be sent + while state.is_running() || !state_data.is_empty() { + let authenticate_request = + messages::SaslAuthenticateRequest::default().with_auth_bytes(Bytes::from(state_data)); let auth_resp = send_request(conn, authenticate_request, None).await?; if auth_resp.error_code > 0 { - let err = kafka_protocol::ResponseError::try_from_code(handshake_resp.error_code) + let err = kafka_protocol::ResponseError::try_from_code(auth_resp.error_code) .map(|code| format!("{code:?}")) - .unwrap_or(format!("Unknown error {}", handshake_resp.error_code)); + .unwrap_or(format!("Unknown error {}", auth_resp.error_code)); bail!( "Error performing SASL authentication: {err} {:?}", auth_resp.error_message ) } + + if !state.is_running() { + break; + } + let data = Some(auth_resp.auth_bytes.to_vec()); state_buf = BufWriter::new(Vec::new()); state = session.step(data.as_deref(), &mut state_buf)?; + state_data = state_buf.into_inner()?; } tracing::debug!("Successfully completed SASL flow"); @@ -318,10 +371,14 @@ impl KafkaApiClient { .await .context("Failed to establish TCP connection")?; - tracing::debug!("Authenticating connection"); - sasl_auth(&mut conn, url, auth.sasl_config().await?) 
- .await - .context("SASL authentication failed")?; + if let Some(sasl_config) = auth.sasl_config().await? { + tracing::debug!("Authenticating connection via SASL"); + sasl_auth(&mut conn, url, sasl_config) + .await + .context("SASL authentication failed")?; + } else { + tracing::debug!("Skipping SASL authentication (no auth configured)"); + } let versions = get_versions(&mut conn) .await @@ -712,7 +769,11 @@ impl KafkaApiClient { #[derive(Clone)] pub enum KafkaClientAuth { + /// No authentication - for local testing with plaintext Kafka brokers. + None, + /// Static SASL configuration that doesn't refresh. NonRefreshing(Arc), + /// AWS MSK IAM authentication with automatic token refresh. MSK { aws_region: String, provider: aws_credential_types::provider::SharedCredentialsProvider, @@ -721,9 +782,32 @@ pub enum KafkaClientAuth { } impl KafkaClientAuth { - async fn sasl_config(&mut self) -> anyhow::Result> { + pub async fn from_msk_region(region: &str) -> anyhow::Result { + let provider = aws_config::from_env() + .region(aws_types::region::Region::new(region.to_owned())) + .load() + .await + .credentials_provider() + .context("AWS credentials provider not available")?; + + Ok(KafkaClientAuth::MSK { + aws_region: region.to_owned(), + provider, + cached: None, + }) + } + + pub fn plain(username: &str, password: &str) -> Self { + KafkaClientAuth::NonRefreshing( + SASLConfig::with_credentials(None, username.to_string(), password.to_string()) + .expect("PLAIN config should build"), + ) + } + + async fn sasl_config(&mut self) -> anyhow::Result>> { match self { - KafkaClientAuth::NonRefreshing(cfg) => Ok(cfg.clone()), + KafkaClientAuth::None => Ok(None), + KafkaClientAuth::NonRefreshing(cfg) => Ok(Some(cfg.clone())), KafkaClientAuth::MSK { aws_region, provider, @@ -733,7 +817,7 @@ impl KafkaClientAuth { let now_seconds = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); // Use a 30-second buffer before expiration to refresh the token. if *exp as u64 > now_seconds + 30 { - return Ok(cfg.clone()); + return Ok(Some(cfg.clone())); } } @@ -752,7 +836,7 @@ impl KafkaClientAuth { cached.replace((cfg.clone(), exp)); - Ok(cfg) + Ok(Some(cfg)) } } } diff --git a/crates/dekaf/src/lib.rs b/crates/dekaf/src/lib.rs index 49b71daf36d..b425ac23d3e 100644 --- a/crates/dekaf/src/lib.rs +++ b/crates/dekaf/src/lib.rs @@ -11,7 +11,7 @@ pub mod logging; mod topology; pub use topology::extract_dekaf_config; -use topology::{Collection, Partition}; +use topology::{Collection, CollectionStatus, CollectionUnavailable, Partition}; mod read; use read::Read; diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 44a82ea2ae2..dfd613cd7fe 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -71,12 +71,16 @@ pub struct Cli { #[arg(long, default_value = "9094", env = "METRICS_PORT")] metrics_port: u16, - /// List of Kafka broker URLs to try connecting to for group management APIs + /// List of Kafka broker URLs to try connecting to for group management APIs. 
+ /// + /// URL schemes control the connection type: + /// - `tcp://host:port` - plaintext connection (no TLS) + /// - `tls://host:port` - TLS-encrypted connection (default if no scheme provided) #[arg(long, env = "DEFAULT_BROKER_URLS", value_delimiter = ',')] default_broker_urls: Vec, - /// The AWS region that the default broker lives in - #[arg(long, env = "DEFAULT_BROKER_MSK_REGION")] - default_broker_msk_region: String, + + #[command(flatten)] + upstream_auth: UpstreamAuthArgs, /// The secret used to encrypt/decrypt potentially sensitive strings when sending them /// to the upstream Kafka broker, e.g topic names in group management metadata. @@ -95,6 +99,10 @@ pub struct Cli { #[arg(long, env = "TASK_REQUEST_TIMEOUT", value_parser = humantime::parse_duration, default_value = "30s")] task_request_timeout: std::time::Duration, + /// How long to cache a MaterializationSpec before re-fetching it, even if the token is still valid + #[arg(long, env = "SPEC_TTL", value_parser = humantime::parse_duration, default_value = "2m")] + spec_ttl: std::time::Duration, + /// Timeout for TLS handshake completion #[arg(long, env = "TLS_HANDSHAKE_TIMEOUT", value_parser = humantime::parse_duration, default_value = "10s")] tls_handshake_timeout: std::time::Duration, @@ -147,23 +155,44 @@ struct TlsArgs { certificate_key_file: Option, } -impl Cli { - fn build_broker_urls(&self) -> anyhow::Result> { - self.default_broker_urls - .clone() - .into_iter() - .map(|url| { - { - let parsed = Url::parse(&url).expect("invalid broker URL {url}"); - Ok::<_, anyhow::Error>(format!( - "tcp://{}:{}", - parsed.host().context(format!("invalid broker URL {url}"))?, - parsed.port().unwrap_or(9092) - )) - } - .context(url) - }) - .collect::>>() +/// Authentication strategy for upstream Kafka connections. +#[derive(Clone, Copy, Debug, Default, serde::Serialize, clap::ValueEnum)] +pub enum UpstreamAuth { + /// AWS MSK IAM authentication. Requires --default-broker-msk-region. + #[default] + Msk, + /// No authentication. Use for local testing with plaintext Kafka brokers. + None, +} + +#[derive(Args, Debug, serde::Serialize)] +#[group(id = "upstream_auth_group")] +struct UpstreamAuthArgs { + /// Authentication strategy for upstream Kafka connections. + #[arg(long, env = "UPSTREAM_AUTH", value_enum, default_value = "msk")] + upstream_auth: UpstreamAuth, + /// The AWS region for MSK IAM authentication. + /// Required when --upstream-auth=msk. 
+ #[arg( + long = "default-broker-msk-region", + env = "DEFAULT_BROKER_MSK_REGION", + required_if_eq("upstream_auth", "msk") + )] + msk_region: Option, +} + +impl UpstreamAuthArgs { + async fn build(&self) -> anyhow::Result { + match self.upstream_auth { + UpstreamAuth::Msk => { + let region = self + .msk_region + .as_ref() + .context("MSK region is required when --upstream-auth=msk")?; + KafkaClientAuth::from_msk_region(region).await + } + UpstreamAuth::None => Ok(KafkaClientAuth::None), + } } } @@ -204,10 +233,10 @@ fn main() { } async fn async_main(cli: Cli) -> anyhow::Result<()> { - let upstream_kafka_urls = cli.build_broker_urls()?; - test_kafka(&cli).await?; + let upstream_kafka_urls = cli.default_broker_urls.clone(); + let (api_endpoint, api_key) = if cli.local { (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string()) } else { @@ -228,6 +257,7 @@ async fn async_main(cli: Cli) -> anyhow::Result<()> { let task_manager = Arc::new(TaskManager::new( cli.task_refresh_interval, cli.task_request_timeout, + cli.spec_ttl, client_base.clone(), cli.data_plane_fqdn.clone(), signing_token.clone(), @@ -271,7 +301,7 @@ async fn async_main(cli: Cli) -> anyhow::Result<()> { let schema_router = dekaf::registry::build_router(app.clone()); - let msk_region = cli.default_broker_msk_region.as_str(); + let upstream_auth = cli.upstream_auth.build().await?; // Setup TLS acceptor if TLS configuration is provided let tls_acceptor = if let Some(tls_cfg) = cli.tls { @@ -357,7 +387,7 @@ async fn async_main(cli: Cli) -> anyhow::Result<()> { app.clone(), cli.encryption_secret.to_owned(), upstream_kafka_urls.clone(), - msk_region.to_string(), + upstream_auth.clone(), cli.read_buffer_chunk_limit, ), socket, @@ -507,22 +537,9 @@ fn validate_certificate_name( #[tracing::instrument(skip(cli))] async fn test_kafka(cli: &Cli) -> anyhow::Result<()> { - let iam_creds = KafkaClientAuth::MSK { - aws_region: cli.default_broker_msk_region.clone(), - provider: aws_config::from_env() - .region(aws_types::region::Region::new( - cli.default_broker_msk_region.clone(), - )) - .load() - .await - .credentials_provider() - .unwrap(), - cached: None, - }; - - let broker_urls = cli.build_broker_urls()?; + let auth = cli.upstream_auth.build().await?; - KafkaApiClient::connect(broker_urls.as_slice(), iam_creds).await?; + KafkaApiClient::connect(&cli.default_broker_urls, auth).await?; tracing::info!("Successfully connected to upstream kafka"); diff --git a/crates/dekaf/src/registry.rs b/crates/dekaf/src/registry.rs index 71584c34c81..0d24afb4848 100644 --- a/crates/dekaf/src/registry.rs +++ b/crates/dekaf/src/registry.rs @@ -1,6 +1,7 @@ use super::App; use crate::{ - DekafError, SessionAuthentication, from_downstream_topic_name, to_downstream_topic_name, + CollectionUnavailable, DekafError, SessionAuthentication, from_downstream_topic_name, + to_downstream_topic_name, }; use anyhow::Context; use axum::extract::Request; @@ -84,7 +85,7 @@ async fn get_subject_latest( let client = &auth.flow_client().await?.pg_client(); - let collection = super::Collection::new( + let collection = match super::Collection::new( &auth, &from_downstream_topic_name(TopicName::from(StrBytes::from_string( collection.to_string(), @@ -92,7 +93,16 @@ async fn get_subject_latest( ) .await .context("failed to fetch collection metadata")? 
- .with_context(|| format!("collection {collection} does not exist"))?; + .ready() + { + Ok(c) => c, + Err(CollectionUnavailable::NotFound) => { + anyhow::bail!("collection {collection} does not exist") + } + Err(CollectionUnavailable::NotReady) => { + anyhow::bail!("collection {collection} exists but has no journals available") + } + }; let (key_id, value_id) = collection .registered_schema_ids(&client) diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index f56648b26e4..16aebaa3814 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -1,6 +1,6 @@ -use super::{App, Collection, Read}; +use super::{App, Collection, CollectionStatus, CollectionUnavailable, Read}; use crate::{ - DekafError, KafkaApiClient, SessionAuthentication, TaskState, api_client::KafkaClientAuth, + DekafError, KafkaApiClient, KafkaClientAuth, SessionAuthentication, TaskState, from_downstream_topic_name, from_upstream_topic_name, logging::propagate_task_forwarder, read::BatchResult, to_downstream_topic_name, to_upstream_topic_name, topology::PartitionOffset, }; @@ -57,7 +57,7 @@ pub struct Session { auth: Option, data_preview_state: SessionDataPreviewState, broker_urls: Vec, - msk_region: String, + upstream_auth: KafkaClientAuth, // Number of ReadResponses to buffer in PendingReads read_buffer_size: usize, } @@ -67,14 +67,14 @@ impl Session { app: Arc, secret: String, broker_urls: Vec, - msk_region: String, + upstream_auth: KafkaClientAuth, read_buffer_size: usize, ) -> Self { Self { app, client: None, broker_urls, - msk_region, + upstream_auth, read_buffer_size, reads: HashMap::new(), auth: None, @@ -122,30 +122,14 @@ impl Session { if let Some(ref mut client) = self.client { Ok(client) } else { - let (auth, urls) = match self.auth { - Some(SessionAuthentication::Task(_)) - | Some(SessionAuthentication::Redirect { .. }) => ( - KafkaClientAuth::MSK { - aws_region: self.msk_region.clone(), - provider: aws_config::from_env() - .region(aws_types::region::Region::new(self.msk_region.clone())) - .load() - .await - .credentials_provider() - .unwrap(), - cached: None, - }, - self.broker_urls.as_slice(), - ), - None => anyhow::bail!("Must be authenticated"), - }; + if self.auth.is_none() { + anyhow::bail!("Must be authenticated"); + } + self.client.replace( - KafkaApiClient::connect( - urls, - auth, - ).await.context( - "failed to connect or authenticate to upstream Kafka broker used for serving group management APIs", - )? + KafkaApiClient::connect(&self.broker_urls, self.upstream_auth.clone()) + .await + .context("failed to connect or authenticate to upstream Kafka broker used for serving group management APIs")? 
); Ok(self.client.as_mut().expect("guaranteed to exist")) } @@ -278,18 +262,27 @@ impl Session { .await?; tracing::debug!(collections=?ops::DebugJson(&collection_names), "fetched all collections"); - let collections = self + let collection_statuses = self .fetch_collections_for_metadata(collection_names) .await?; - collections + collection_statuses .into_iter() - .map(|(name, opt_coll)| { - let coll = opt_coll.ok_or_else(|| { - anyhow::anyhow!("Collection '{}' not found or not accessible", name) - })?; - let encoded_name = self.encode_topic_name(name)?; - self.build_topic_metadata(encoded_name, &coll) + .map(|(name, status)| { + let encoded_name = self.encode_topic_name(name.clone())?; + match status.ready() { + Ok(coll) => self.build_topic_metadata(encoded_name, &coll), + Err(CollectionUnavailable::NotFound) => { + // Bail here because `fetch_all_collection_names` should only return accessible collections. + anyhow::bail!("Collection '{}' not found or not accessible", name) + } + Err(CollectionUnavailable::NotReady) => { + // Collection exists but journals aren't available - return LeaderNotAvailable so clients will retry + Ok(MetadataResponseTopic::default() + .with_name(Some(encoded_name)) + .with_error_code(ResponseError::LeaderNotAvailable.code())) + } + } }) .collect() } @@ -305,21 +298,21 @@ impl Session { .map(|t| from_downstream_topic_name(t.name.clone().unwrap_or_default()).to_string()) .collect(); - let collections = self.fetch_collections_for_metadata(names).await?; + let collection_statuses = self.fetch_collections_for_metadata(names).await?; requests .iter() - .zip(collections) - .map(|(request, (_, maybe_collection))| { + .zip(collection_statuses) + .map(|(request, (_, status))| { let topic_name = request.name.to_owned().ok_or_else(|| { anyhow::anyhow!("Topic name is missing in metadata request") })?; - match maybe_collection { - Some(collection) => self.build_topic_metadata(topic_name, &collection), - None => Ok(MetadataResponseTopic::default() + match status.ready() { + Ok(collection) => self.build_topic_metadata(topic_name, &collection), + Err(reason) => Ok(MetadataResponseTopic::default() .with_name(Some(self.encode_topic_name(topic_name.to_string())?)) - .with_error_code(ResponseError::UnknownTopicOrPartition.code())), + .with_error_code(reason.code())), } }) .collect() @@ -412,26 +405,30 @@ impl Session { ListOffsetsPartitionResponse, ListOffsetsTopicResponse, }; - let Some(collection) = Collection::new( + let collection = match Collection::new( auth, from_downstream_topic_name(topic.name.clone()).as_str(), ) .await? 
- else { - // Collection doesn't exist - let partitions = topic - .partitions - .iter() - .map(|p| { - ListOffsetsPartitionResponse::default() - .with_partition_index(p.partition_index) - .with_error_code(ResponseError::UnknownTopicOrPartition.code()) - }) - .collect(); + .ready() + { + Ok(c) => c, + Err(reason) => { + let error_code = reason.code(); + let partitions = topic + .partitions + .iter() + .map(|partition| { + ListOffsetsPartitionResponse::default() + .with_partition_index(partition.partition_index) + .with_error_code(error_code) + }) + .collect(); - return Ok(ListOffsetsTopicResponse::default() - .with_name(topic.name.clone()) - .with_partitions(partitions)); + return Ok(ListOffsetsTopicResponse::default() + .with_name(topic.name.clone()) + .with_partitions(partitions)); + } }; let current_epoch = collection.binding_backfill_counter as i32; @@ -667,8 +664,11 @@ impl Session { let auth = self.auth.as_ref().unwrap(); let current_epoch = Collection::new(&auth, &key.0) .await? + .ready() + .ok() .map(|c| c.binding_backfill_counter as i32); + // If NotReady or NotFound, remove the pending read match current_epoch { Some(epoch) if pending_epoch < epoch => { metrics::counter!( @@ -737,17 +737,34 @@ impl Session { } Err(e) => return Err(e.into()), }; - let Some(collection) = Collection::new(&auth, &key.0).await? else { - metrics::counter!( - "dekaf_fetch_requests", - "topic_name" => key.0.to_string(), - "partition_index" => key.1.to_string(), - "task_name" => task_name.to_string(), - "state" => "collection_not_found" - ) - .increment(1); - tracing::debug!(collection = ?&key.0, "Collection doesn't exist!"); - continue; // Collection doesn't exist. + let collection = match Collection::new(&auth, &key.0).await?.ready() { + Ok(c) => c, + Err(CollectionUnavailable::NotFound) => { + metrics::counter!( + "dekaf_fetch_requests", + "topic_name" => key.0.to_string(), + "partition_index" => key.1.to_string(), + "task_name" => task_name.to_string(), + "state" => "collection_not_found" + ) + .increment(1); + tracing::debug!(collection = ?&key.0, "Collection doesn't exist!"); + self.reads.remove(&key); + continue; + } + Err(CollectionUnavailable::NotReady) => { + metrics::counter!( + "dekaf_fetch_requests", + "topic_name" => key.0.to_string(), + "partition_index" => key.1.to_string(), + "task_name" => task_name.to_string(), + "state" => "collection_not_ready" + ) + .increment(1); + tracing::debug!(collection = ?&key.0, "Collection not ready (no journals)"); + self.reads.remove(&key); + continue; + } }; // Validate consumer's leader epoch against current collection epoch @@ -925,42 +942,57 @@ impl Session { let Some((pending, _)) = self.reads.get_mut(&key) else { // No pending read. Check if this is due to epoch validation failure let auth = self.auth.as_ref().unwrap(); - if let Ok(Some(collection)) = Collection::new(&auth, &key.0).await { - if partition_request.current_leader_epoch >= 0 { - if partition_request.current_leader_epoch - < collection.binding_backfill_counter as i32 - { - // Epoch validation failed. 
Return FENCED_LEADER_EPOCH - partition_responses.push( - PartitionData::default() - .with_partition_index(partition_request.partition) - .with_error_code(ResponseError::FencedLeaderEpoch.code()) - .with_current_leader( - messages::fetch_response::LeaderIdAndEpoch::default() - .with_leader_id(messages::BrokerId(1)) - .with_leader_epoch( - collection.binding_backfill_counter as i32, - ), - ), - ); - continue; - } else if partition_request.current_leader_epoch - > collection.binding_backfill_counter as i32 - { - partition_responses.push( - PartitionData::default() - .with_partition_index(partition_request.partition) - .with_error_code(ResponseError::UnknownLeaderEpoch.code()) - .with_current_leader( - messages::fetch_response::LeaderIdAndEpoch::default() - .with_leader_id(messages::BrokerId(1)) - .with_leader_epoch( - collection.binding_backfill_counter as i32, - ), - ), - ); - continue; + match Collection::new(&auth, &key.0).await { + Ok(CollectionStatus::Ready(collection)) => { + if partition_request.current_leader_epoch >= 0 { + if partition_request.current_leader_epoch + < collection.binding_backfill_counter as i32 + { + // Epoch validation failed. Return FENCED_LEADER_EPOCH + partition_responses.push( + PartitionData::default() + .with_partition_index(partition_request.partition) + .with_error_code(ResponseError::FencedLeaderEpoch.code()) + .with_current_leader( + messages::fetch_response::LeaderIdAndEpoch::default() + .with_leader_id(messages::BrokerId(1)) + .with_leader_epoch( + collection.binding_backfill_counter as i32, + ), + ), + ); + continue; + } else if partition_request.current_leader_epoch + > collection.binding_backfill_counter as i32 + { + partition_responses.push( + PartitionData::default() + .with_partition_index(partition_request.partition) + .with_error_code(ResponseError::UnknownLeaderEpoch.code()) + .with_current_leader( + messages::fetch_response::LeaderIdAndEpoch::default() + .with_leader_id(messages::BrokerId(1)) + .with_leader_epoch( + collection.binding_backfill_counter as i32, + ), + ), + ); + continue; + } } + // Fall through to UnknownTopicOrPartition + } + Ok(CollectionStatus::Unavailable(CollectionUnavailable::NotReady)) => { + // Collection exists but journals aren't available - return LeaderNotAvailable + partition_responses.push( + PartitionData::default() + .with_partition_index(partition_request.partition) + .with_error_code(ResponseError::LeaderNotAvailable.code()), + ); + continue; + } + _ => { + // Fall through to UnknownTopicOrPartition } } // Collection doesn't exist or other error @@ -1193,19 +1225,30 @@ impl Session { .iter() .map(|t| TopicName::from(t.clone())) .collect(); - let collections = self + let collection_statuses = self .fetch_collections(topic_names.iter()) .await .unwrap_or_default(); for topic in consumer_protocol_subscription_msg.topics.iter_mut() { - let collection = collections + let backfill_counter = match collection_statuses .iter() .find(|(name, _)| name.as_str() == topic.as_str()) - .map(|(_, c)| c); - let backfill_counter = collection.map(|c| c.binding_backfill_counter); + .map(|(_, status)| status) + { + Some(CollectionStatus::Ready(c)) => c.binding_backfill_counter, + other => { + let error_code = other + .and_then(|s| s.error_code()) + .unwrap_or_else(|| ResponseError::UnknownTopicOrPartition.code()); + tracing::warn!(topic = ?topic, error_code, "Collection not available"); + return Ok( + messages::JoinGroupResponse::default().with_error_code(error_code) + ); + } + }; let transformed = self - 
.encrypt_topic_name(topic.to_owned().into(), backfill_counter)? + .encrypt_topic_name(topic.to_owned().into(), Some(backfill_counter))? .into(); tracing::info!(topic_name = ?topic, backfill_counter = ?backfill_counter, "Request to join group"); *topic = transformed; @@ -1394,22 +1437,41 @@ impl Session { .iter() .map(|p| p.topic.clone()) .collect(); - let collections = self + let collection_statuses = self .fetch_collections(topic_names.iter()) .await .unwrap_or_default(); + // Check for NotReady/NotFound before processing + for part in &consumer_protocol_assignment_msg.assigned_partitions { + let status = collection_statuses + .iter() + .find(|(name, _)| name.as_str() == part.topic.as_str()) + .map(|(_, status)| status); + + if !matches!(status, Some(CollectionStatus::Ready(_))) { + let error_code = status + .and_then(|s| s.error_code()) + .unwrap_or_else(|| ResponseError::UnknownTopicOrPartition.code()); + tracing::warn!(topic = ?part.topic, error_code, "Collection not available"); + return Ok(messages::SyncGroupResponse::default().with_error_code(error_code)); + } + } + consumer_protocol_assignment_msg.assigned_partitions = consumer_protocol_assignment_msg .assigned_partitions .into_iter() .map(|part| { - let collection = collections + let backfill_counter = collection_statuses .iter() .find(|(name, _)| name.as_str() == part.topic.as_str()) - .map(|(_, c)| c); - let backfill_counter = collection.map(|c| c.binding_backfill_counter); + .and_then(|(_, status)| match status { + CollectionStatus::Ready(c) => Some(c.binding_backfill_counter), + _ => None, + }) + .context("collection status missing after validation")?; let transformed_topic = - self.encrypt_topic_name(part.topic.to_owned(), backfill_counter)?; + self.encrypt_topic_name(part.topic.to_owned(), Some(backfill_counter))?; tracing::info!(topic_name = ?part.topic, backfill_counter = ?backfill_counter, "Syncing group"); Ok(part.with_topic(transformed_topic)) }) @@ -1565,18 +1627,50 @@ impl Session { let resp: anyhow::Result<_> = async { - let collections = self + let collection_statuses = self .fetch_collections(req.topics.iter().map(|topic| &topic.name)) .await?; - let desired_topic_partitions = collections + // Check for NotFound/NotReady collections - return per-topic errors + for (topic_name, status) in &collection_statuses { + let Some(error_code) = status.error_code() else { + continue; + }; + tracing::warn!(topic = ?topic_name, error_code, "Collection not available"); + // Return error response for all topics + let topics = req + .topics + .iter() + .map(|t| { + messages::offset_commit_response::OffsetCommitResponseTopic::default() + .with_name(t.name.clone()) + .with_partitions( + t.partitions + .iter() + .map(|p| { + messages::offset_commit_response::OffsetCommitResponsePartition::default() + .with_partition_index(p.partition_index) + .with_error_code(error_code) + }) + .collect(), + ) + }) + .collect(); + return Ok(messages::OffsetCommitResponse::default().with_topics(topics)); + } + + // All collections are Ready at this point + let desired_topic_partitions = collection_statuses .iter() - .map(|(topic_name, collection)| { - self.encrypt_topic_name( - topic_name.clone(), - Some(collection.binding_backfill_counter), - ) - .map(|encrypted_name| (encrypted_name, collection.partitions.len())) + .filter_map(|(topic_name, status)| match status { + CollectionStatus::Ready(collection) => Some( + self.encrypt_topic_name( + topic_name.clone(), + Some(collection.binding_backfill_counter), + ) + .map(|encrypted_name| (encrypted_name, 
collection.partitions.len())), + ), + CollectionStatus::Unavailable(_) => None, }) .collect::, _>>()?; @@ -1588,14 +1682,15 @@ impl Session { }; for topic in &mut req.topics { - // Look up collection to get backfill counter - let collection = collections + let backfill_counter = collection_statuses .iter() .find(|(name, _)| name == &topic.name) - .map(|(_, c)| c) + .and_then(|(_, status)| match status { + CollectionStatus::Ready(c) => Some(c.binding_backfill_counter), + CollectionStatus::Unavailable(_) => None, + }) .context(format!("Collection not found for topic {:?}", topic.name))?; - let encrypted = - self.encrypt_topic_name(topic.name.clone(), Some(collection.binding_backfill_counter))?; + let encrypted = self.encrypt_topic_name(topic.name.clone(), Some(backfill_counter))?; topic.name = encrypted; } @@ -1650,15 +1745,20 @@ impl Session { // Restore plaintext topic name for the response topic.name = decrypted_name.clone(); - let collection_partitions = &collections + let collection_partitions = match collection_statuses .iter() .find(|(topic_name, _)| topic_name == &decrypted_name) .context(format!( "unable to look up partitions for {:?}", decrypted_name ))? - .1 - .partitions; + { + (_, CollectionStatus::Ready(c)) => &c.partitions, + (_, CollectionStatus::Unavailable(_)) => { + // For unavailable topics, skip partition processing since we have no journals + continue; + } + }; for partition in &topic.partitions { if let Some(error) = partition.error_code.err() { @@ -1815,12 +1915,12 @@ impl Session { return Ok(redirect_response); } - let collections = if let Some(topics) = &req.topics { + let collection_statuses = if let Some(topics) = &req.topics { match self .fetch_collections(topics.iter().map(|topic| &topic.name)) .await { - Ok(collections) => Some(collections), + Ok(statuses) => Some(statuses), Err(e) if Self::is_redirect_error(&e) => return Ok(redirect_response), Err(e) => return Err(e), } @@ -1828,36 +1928,72 @@ impl Session { None }; - let collection_partitions = if let Some(ref collections) = collections { - collections + // Check for NotFound/NotReady collections - return per-topic errors + if let Some(ref statuses) = collection_statuses { + for (topic_name, status) in statuses { + let Some(error_code) = status.error_code() else { + continue; + }; + tracing::warn!(topic = ?topic_name, error_code, "Collection not available"); + // Return error response for all topics + let topics = req + .topics + .as_ref() + .map(|topics| { + topics + .iter() + .map(|t| { + messages::offset_fetch_response::OffsetFetchResponseTopic::default() + .with_name(t.name.clone()) + .with_partitions( + t.partition_indexes + .iter() + .map(|&p| { + messages::offset_fetch_response::OffsetFetchResponsePartition::default() + .with_partition_index(p) + .with_error_code(error_code) + }) + .collect(), + ) + }) + .collect() + }) + .unwrap_or_default(); + return Ok(messages::OffsetFetchResponse::default().with_topics(topics)); + } + } + + // All collections are Ready at this point + let collection_partitions = if let Some(ref statuses) = collection_statuses { + statuses .iter() - .map(|(topic_name, collection)| { - self.encrypt_topic_name( - topic_name.clone(), - Some(collection.binding_backfill_counter), - ) - .map(|encrypted_name| (encrypted_name, collection.partitions.len())) + .filter_map(|(topic_name, status)| match status { + CollectionStatus::Ready(collection) => Some( + self.encrypt_topic_name( + topic_name.clone(), + Some(collection.binding_backfill_counter), + ) + .map(|encrypted_name| (encrypted_name, 
collection.partitions.len())), + ), + CollectionStatus::Unavailable(_) => None, }) .collect::, _>>()? } else { vec![] }; - // Try fetching with epoch-qualified names first + // Encrypt topic names with epoch-qualified names if let Some(ref mut topics) = req.topics { for topic in topics.iter_mut() { - let collection = collections + let backfill_counter = collection_statuses .as_ref() - .and_then(|colls| colls.iter().find(|(name, _)| name == &topic.name)) - .map(|(_, c)| c); - if let Some(collection) = collection { - topic.name = self.encrypt_topic_name( - topic.name.clone(), - Some(collection.binding_backfill_counter), - )?; - } else { - topic.name = self.encrypt_topic_name(topic.name.clone(), None)?; - } + .and_then(|statuses| statuses.iter().find(|(name, _)| name == &topic.name)) + .and_then(|(_, status)| match status { + CollectionStatus::Ready(c) => Some(c.binding_backfill_counter), + CollectionStatus::Unavailable(_) => None, + }) + .context(format!("Collection not found for topic {:?}", topic.name))?; + topic.name = self.encrypt_topic_name(topic.name.clone(), Some(backfill_counter))?; } } @@ -1892,21 +2028,23 @@ impl Session { }) }); - if should_fallback && collections.is_some() { + if should_fallback && collection_statuses.is_some() { tracing::info!(group_id = ?req.group_id, "No offsets found with epoch, falling back to legacy format"); resp = client.send_request(fallback_req, Some(header)).await?; } for topic in resp.topics.iter_mut() { topic.name = self.decrypt_topic_name(topic.name.to_owned())?; - let maybe_backfill_counter = collections - .as_ref() - .map(|c| { - c.iter() - .find(|(topic_name, _)| topic_name == &topic.name) - .map(|(_, c)| c.binding_backfill_counter) - }) - .flatten(); + let maybe_backfill_counter = collection_statuses.as_ref().and_then(|statuses| { + statuses + .iter() + .find(|(topic_name, _)| topic_name == &topic.name) + .and_then(|(_, status)| match status { + CollectionStatus::Ready(c) => Some(c.binding_backfill_counter), + // Unavailable: don't set committed_leader_epoch + CollectionStatus::Unavailable(_) => None, + }) + }); if let Some(backfill_counter) = maybe_backfill_counter { for partition in topic.partitions.iter_mut() { @@ -1966,19 +2104,23 @@ impl Session { use messages::offset_for_leader_epoch_response::EpochEndOffset; let collection_name = from_downstream_topic_name(topic.topic.clone()); - let Some(collection) = Collection::new(auth, &collection_name).await? 
else { - let partitions = topic - .partitions - .iter() - .map(|p| { - EpochEndOffset::default() - .with_partition(p.partition) - .with_error_code(ResponseError::UnknownTopicOrPartition.code()) - }) - .collect(); - return Ok(OffsetForLeaderTopicResult::default() - .with_topic(topic.topic) - .with_partitions(partitions)); + let collection = match Collection::new(auth, &collection_name).await?.ready() { + Ok(c) => c, + Err(reason) => { + let error_code = reason.code(); + let partitions = topic + .partitions + .iter() + .map(|p| { + EpochEndOffset::default() + .with_partition(p.partition) + .with_error_code(error_code) + }) + .collect(); + return Ok(OffsetForLeaderTopicResult::default() + .with_topic(topic.topic) + .with_partitions(partitions)); + } }; let current_epoch = collection.binding_backfill_counter as i32; @@ -2201,15 +2343,13 @@ impl Session { async fn fetch_collections( &mut self, topics: impl IntoIterator, - ) -> anyhow::Result> { + ) -> anyhow::Result> { // Re-declare here to drop mutable reference let auth = self.auth.as_ref().unwrap(); futures::future::try_join_all(topics.into_iter().map(|topic| async move { - let collection = Collection::new(auth, topic.as_ref()) - .await? - .context(format!("unable to look up partitions for {:?}", topic))?; - Ok::<(TopicName, Collection), anyhow::Error>((topic.clone(), collection)) + let status = Collection::new(auth, topic.as_ref()).await?; + Ok::<(TopicName, CollectionStatus), anyhow::Error>((topic.clone(), status)) })) .await } @@ -2232,9 +2372,19 @@ impl Session { tracing::debug!( "Loading latest offset for this partition to check if session is data-preview" ); - let collection = Collection::new(auth, collection_name.as_str()) + let collection = match Collection::new(auth, collection_name.as_str()) .await? 
- .ok_or(anyhow::anyhow!("Collection {} not found", collection_name))?; + .ready() + { + Ok(c) => c, + Err(CollectionUnavailable::NotFound) => { + anyhow::bail!("Collection {} not found", collection_name); + } + Err(CollectionUnavailable::NotReady) => { + // Can't determine data preview status without journals - assume not data preview + return Ok(None); + } + }; match collection .fetch_partition_offset(partition as usize, -1) @@ -2312,17 +2462,14 @@ impl Session { ) -> anyhow::Result { let leader_epoch = collection.binding_backfill_counter as i32; - let partitions = if collection.partitions.is_empty() { - // Fallback to single partition if collection has no partitions defined - vec![Self::build_partition(0, leader_epoch)] - } else { - collection - .partitions - .iter() - .enumerate() - .map(|(index, _)| Self::build_partition(index as i32, leader_epoch)) - .collect() - }; + // Collections with empty partitions should be handled as NotReady before + // reaching this function, so we can safely iterate over partitions here + let partitions = collection + .partitions + .iter() + .enumerate() + .map(|(index, _)| Self::build_partition(index as i32, leader_epoch)) + .collect(); Ok(MetadataResponseTopic::default() .with_name(Some(name)) @@ -2333,17 +2480,17 @@ impl Session { async fn fetch_collections_for_metadata( &self, names: impl IntoIterator, - ) -> anyhow::Result)>> { + ) -> anyhow::Result> { let auth = self .auth .as_ref() .ok_or(anyhow::anyhow!("Session not authenticated"))?; - futures::future::try_join_all( - names.into_iter().map(|name| async move { - Collection::new(auth, &name).await.map(|coll| (name, coll)) - }), - ) + futures::future::try_join_all(names.into_iter().map(|name| async move { + Collection::new(auth, &name) + .await + .map(|status| (name, status)) + })) .await } } diff --git a/crates/dekaf/src/task_manager.rs b/crates/dekaf/src/task_manager.rs index f26b7cc628b..2954057cdfb 100644 --- a/crates/dekaf/src/task_manager.rs +++ b/crates/dekaf/src/task_manager.rs @@ -49,8 +49,6 @@ pub type Result = core::result::Result; const TASK_TIMEOUT: Duration = Duration::from_secs(60 * 3); /// How long before the end of an access token should we start trying to refresh it const REFRESH_START_AT: Duration = Duration::from_secs(60 * 5); -/// How long to cache a MaterializationSpec before re-fetching it, even if the token is still valid. -const SPEC_TTL: Duration = Duration::from_secs(60 * 2); #[derive(Clone)] pub enum TaskState { @@ -152,6 +150,7 @@ pub struct TaskManager { >, interval: Duration, timeout: Duration, + spec_ttl: Duration, client: flow_client::Client, data_plane_fqdn: String, data_plane_signer: jsonwebtoken::EncodingKey, @@ -160,6 +159,7 @@ impl TaskManager { pub fn new( interval: Duration, timeout: Duration, + spec_ttl: Duration, client: flow_client::Client, data_plane_fqdn: String, data_plane_signer: jsonwebtoken::EncodingKey, @@ -168,9 +168,10 @@ impl TaskManager { tasks: std::sync::Mutex::new(HashMap::new()), interval, timeout, + spec_ttl, client, data_plane_fqdn, - data_plane_signer: data_plane_signer, + data_plane_signer, } } @@ -304,6 +305,7 @@ impl TaskManager { &self.data_plane_fqdn, &self.data_plane_signer, self.timeout, + self.spec_ttl, ) .await .context("error fetching or refreshing dekaf task auth")?; @@ -685,12 +687,12 @@ impl DekafTaskAuth { } } } - fn refresh_at(&self) -> anyhow::Result { + fn refresh_at(&self, spec_ttl: Duration) -> anyhow::Result { let token_refresh_at = self.exp()? 
- REFRESH_START_AT; let spec_refresh_at = match self { - DekafTaskAuth::Redirect { fetched_at, .. } => *fetched_at + SPEC_TTL, - DekafTaskAuth::Auth { fetched_at, .. } => *fetched_at + SPEC_TTL, + DekafTaskAuth::Redirect { fetched_at, .. } => *fetched_at + spec_ttl, + DekafTaskAuth::Auth { fetched_at, .. } => *fetched_at + spec_ttl, }; // Refresh when either the token is nearing expiry or the spec is stale @@ -705,12 +707,12 @@ async fn get_or_refresh_dekaf_auth( data_plane_fqdn: &str, data_plane_signer: &jsonwebtoken::EncodingKey, timeout: Duration, + spec_ttl: Duration, ) -> anyhow::Result { let now = time::OffsetDateTime::now_utc(); if let Some(cached_auth) = cached { - if now < cached_auth.refresh_at()? { - tracing::debug!("DekafTaskAuth is still valid, no need to refresh."); + if now < cached_auth.refresh_at(spec_ttl)? { return Ok(cached_auth); } @@ -728,7 +730,7 @@ async fn get_or_refresh_dekaf_auth( { Ok(resp) => resp, Err(_) => { - // This isn't checking SPEC_TTL, so it will potentially hand out + // This isn't checking spec_ttl, so it will potentially hand out // stale specs up until the token's expiration if time::OffsetDateTime::now_utc() < cached_auth.exp()? { tracing::warn!( diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 24622353bcb..52bd78bf5ed 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -62,6 +62,59 @@ pub struct Collection { pub binding_backfill_counter: u32, } +/// Represents why a collection is unavailable. +#[derive(Debug, Clone, Copy)] +pub enum CollectionUnavailable { + /// The binding doesn't exist in the materialization spec. + NotFound, + /// The binding exists but journals are not yet available. This can happen when: + /// - A collection was recently reset and the writer hasn't created journals yet + /// - The collection exists in the control plane but no data has been written + /// Callers should return a retryable error (e.g., LeaderNotAvailable) to clients. + NotReady, +} + +impl CollectionUnavailable { + pub fn code(self) -> i16 { + use kafka_protocol::error::ResponseError; + match self { + CollectionUnavailable::NotFound => ResponseError::UnknownTopicOrPartition.code(), + CollectionUnavailable::NotReady => ResponseError::LeaderNotAvailable.code(), + } + } +} + +pub enum CollectionStatus { + Ready(Collection), + Unavailable(CollectionUnavailable), +} + +impl CollectionStatus { + pub fn not_found() -> Self { + Self::Unavailable(CollectionUnavailable::NotFound) + } + + pub fn not_ready() -> Self { + Self::Unavailable(CollectionUnavailable::NotReady) + } + + /// Returns the Kafka error code for non-ready states, None if Ready. + pub fn error_code(&self) -> Option { + match self { + CollectionStatus::Ready(_) => None, + CollectionStatus::Unavailable(reason) => Some(reason.code()), + } + } + + /// Convert to Result, returning the unavailability reason for non-Ready states. + pub fn ready(self) -> Result { + match self { + CollectionStatus::Ready(c) => Ok(c), + CollectionStatus::Unavailable(reason) => Err(reason), + } + } +} + /// Partition is a collection journal which is mapped into a stable Kafka partition order. #[derive(Debug, Clone)] pub struct Partition { @@ -88,14 +141,14 @@ impl Collection { pub async fn new( auth: &SessionAuthentication, topic_name: &str, - ) -> anyhow::Result> { + ) -> anyhow::Result { let binding = match auth { SessionAuthentication::Task(task_auth) => { if let Some(binding) = task_auth.get_binding_for_topic(topic_name).await? 
{ binding } else { tracing::warn!("{topic_name} is not a binding of {}", task_auth.task_name); - return Ok(None); + return Ok(CollectionStatus::not_found()); } } SessionAuthentication::Redirect { spec, .. } => { @@ -103,7 +156,7 @@ impl Collection { .context("failed to get binding for topic in redirected session")? else { tracing::warn!("{topic_name} is not a binding of {}", spec.name); - return Ok(None); + return Ok(CollectionStatus::not_found()); }; binding } @@ -216,7 +269,18 @@ impl Collection { "built collection" ); - Ok(Some(Self { + // If there are no partitions/journals, the collection exists but isn't ready to serve. + // This happens when a collection was reset and journals haven't been created yet, + // or when a collection exists but no data has ever been written. + if partitions.is_empty() { + tracing::warn!( + collection_name, + "Collection binding exists but has no journals available" + ); + return Ok(CollectionStatus::not_ready()); + } + + Ok(CollectionStatus::Ready(Self { name: collection_name.to_string(), journal_client, key_ptr, diff --git a/crates/dekaf/tests/dekaf_integration_test.rs b/crates/dekaf/tests/dekaf_integration_test.rs deleted file mode 100644 index e09b4a9ed3a..00000000000 --- a/crates/dekaf/tests/dekaf_integration_test.rs +++ /dev/null @@ -1,725 +0,0 @@ -use anyhow::Context; -use dekaf::connector::DekafConfig; -use futures::StreamExt; -use locate_bin; -use rand::Rng; -use rdkafka::{Message, consumer::Consumer}; -use schema_registry_converter::async_impl::avro; -use schema_registry_converter::async_impl::schema_registry; -use serde_json::json; -use std::{collections::HashMap, env, io::Write, time::Duration}; - -async fn create_consumer<'a>( - username: &'a str, - token: &'a str, - topic: &'a str, -) -> (rdkafka::consumer::StreamConsumer, avro::AvroDecoder<'a>) { - #[allow(non_snake_case)] - let DEKAF_BROKER = env::var("DEKAF_BROKER").expect("Missing DEKAF_BROKER environment variable"); - #[allow(non_snake_case)] - let SCHEMA_REGISTRY = - env::var("DEKAF_REGISTRY").expect("Missing DEKAF_REGISTRY environment variable"); - - let consumer: rdkafka::consumer::StreamConsumer = rdkafka::ClientConfig::new() - .set("bootstrap.servers", DEKAF_BROKER) - .set("security.protocol", "SASL_PLAINTEXT") - .set("sasl.mechanism", "PLAIN") - .set("sasl.username", username) - .set("sasl.password", token) - .set("group.id", "this_needs_to_be_set_but_we_dont_use_it") - .set("enable.auto.commit", "false") - .set("auto.offset.reset", "smallest") - .set("enable.auto.offset.store", "false") - .create() - .expect("Consumer creation failed"); - - consumer - .subscribe(vec![topic].as_slice()) - .expect("Consumer subscription failed"); - - let decoder = avro::AvroDecoder::new( - schema_registry::SrSettings::new_builder(String::from(SCHEMA_REGISTRY)) - .set_basic_authorization(username, Some(token)) - .build() - .expect("failed to build avro decoder"), - ); - - (consumer, decoder) -} - -#[derive(Debug)] -enum SpecAction { - Create, - Delete, -} - -async fn test_specs( - name: &str, - action: SpecAction, - mut capture: models::CaptureDef, - collections: HashMap<&str, models::CollectionDef>, - mut materialization: models::MaterializationDef, -) -> anyhow::Result<(String, String)> { - let mut temp_flow = tempfile::NamedTempFile::new()?; - - let suffix: String = format!("{:04x}", rand::rng().random::()); - - let capture_name = format!("{}/{suffix}/source-http-ingest", name); - let materialization_name = format!("{}/{suffix}/test-dekaf", name); - - tracing::info!(temp=?temp_flow, "Attempting to 
{:?}", action); - - // rewrite capture bindings - capture.bindings.iter_mut().for_each(|binding| { - binding.target = models::Collection::new(format!("{name}/{}", binding.target.to_string())) - }); - - // rewrite materialization sources - materialization.bindings.iter_mut().for_each(|binding| { - binding - .source - .set_collection(models::Collection::new(format!( - "{name}/{}", - binding.source.collection() - ))) - }); - - let collections_mapped = collections - .into_iter() - .fold(HashMap::new(), |mut state, (k, v)| { - state.insert(format!("{name}/{k}"), v); - state - }); - - let file_contents = json!({ - "captures": { - &capture_name: capture - }, - "collections": collections_mapped, - "materializations": { - &materialization_name: materialization - } - }); - - temp_flow.write(serde_json::to_vec(&file_contents)?.as_slice())?; - - let flowctl = locate_bin::locate("flowctl").context("failed to locate flowctl")?; - - let async_process::Output { - stderr, - stdout: _stdout, - status, - } = async_process::output( - async_process::Command::new(flowctl).args( - match action { - SpecAction::Create => vec![ - "catalog", - "publish", - "--auto-approve", - "--init-data-plane", - "ops/dp/public/local-cluster", - "--source", - temp_flow.path().to_str().unwrap(), - ], - SpecAction::Delete => vec![ - "catalog", - "delete", - "--prefix", - name, - "--captures=true", - "--collections=true", - "--materializations=true", - "--dangerous-auto-approve", - ], - } - .as_slice(), - ), - ) - .await - .context("failed to invoke flowctl")?; - - if !status.success() { - let output = String::from_utf8_lossy(&stderr); - if !output.contains("no specs found matching given selector") { - anyhow::bail!("flowctl failed: {}", output); - } - } - - tracing::info!( - ?capture_name, - ?materialization_name, - "Successful {:?}", - action - ); - - Ok((capture_name, materialization_name)) -} - -async fn sops_encrypt(input: models::RawValue) -> anyhow::Result { - #[allow(non_snake_case)] - let KEYRING = env::var("SOPS_KEYRING").unwrap_or( - "projects/estuary-control/locations/us-central1/keyRings/sops/cryptoKeys/cd-github-control" - .to_string(), - ); - - let sops = locate_bin::locate("sops").context("failed to locate sops")?; - - let async_process::Output { - stderr, - stdout, - status, - } = async_process::input_output( - async_process::Command::new(sops).args([ - "--encrypt", - "--input-type", - "json", - "--output-type", - "json", - "--gcp-kms", - &KEYRING, - "--encrypted-suffix", - "_sops", - "/dev/stdin", - ]), - input.get().as_bytes(), - ) - .await - .context("failed to run sops")?; - - if !status.success() { - anyhow::bail!( - "decrypting sops document failed: {}", - String::from_utf8_lossy(&stderr), - ); - } - - Ok(models::RawValue::from_string( - std::str::from_utf8(stdout.as_slice()) - .context("failed to parse sops output")? - .to_string(), - )?) -} - -async fn get_shard_info( - task_name: &str, -) -> anyhow::Result { - let flowctl = locate_bin::locate("flowctl").context("failed to locate flowctl")?; - - let async_process::Output { - stderr, - stdout, - status, - } = async_process::output(async_process::Command::new(flowctl).args([ - "raw", - "list-shards", - "--task", - task_name, - "-ojson", - ])) - .await - .context("failed to list shards")?; - - if !status.success() { - anyhow::bail!( - "listing shards failed: {}", - String::from_utf8_lossy(&stderr), - ); - } - - Ok(serde_json::from_slice(&stdout)?) 
-} - -async fn wait_for_primary(task_name: &str) -> anyhow::Result<()> { - loop { - match get_shard_info(task_name).await { - Err(e) => { - tracing::warn!(?e, "Error getting shard info"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - Ok(info) - if info.status.iter().any(|status| { - status.code() == gazette::consumer::replica_status::Code::Primary - }) => - { - return Ok(()); - } - Ok(info) - if info.status.iter().any(|status| { - status.code() == gazette::consumer::replica_status::Code::Failed - }) => - { - tracing::warn!(statuses = ?info.status, "Shard failed"); - anyhow::bail!("Shard failed"); - } - Ok(info) => { - tracing::info!(statuses = ?info.status,"Waiting for primary"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - } - } -} - -async fn send_docs(task_name: &str, path: &str, docs: Vec) -> anyhow::Result<()> { - let shard = get_shard_info(task_name).await?; - - let shard_endpoint = shard - .route - .context("missing shard route")? - .endpoints - .first() - .context("missing shard endpoint")? - .replace("https://", ""); - let shard_labels = shard - .spec - .context("missing shard spec")? - .labels - .context("missing shard labels")? - .labels; - - let hostname = &shard_labels - .iter() - .find(|lab| lab.name == labels::HOSTNAME) - .context("missing HOSTNAME label")? - .value; - let port = &shard_labels - .iter() - .find(|lab| lab.name == labels::EXPOSE_PORT) - .context("missing HOSTNAME label")? - .value; - - let client = reqwest::Client::builder() - .danger_accept_invalid_certs(true) - .build()?; - let url = format!("https://{hostname}-{port}.{shard_endpoint}/{path}"); - - tracing::info!(url, "Sending docs"); - - for doc in docs { - let response = client - .post(url.clone()) - .header("Content-Type", "application/json") - .body(doc.get().to_string()) - .send() - .await - .context("failed to send document")?; - - if !response.status().is_success() { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - anyhow::bail!("failed to send document: status={}, body={}", status, body); - } - } - - Ok(()) -} - -// Note: For the moment, this is going to use an externally-running `agent`, either from Tilt -// or the one running in prod. That means that any changes to `dekaf::connector` will not get -// tested, as that functionality gets baked into the agent/runtime directly. An improvement -// to these tests would be to additionally run/test against a local build of the latest `agent`. -async fn roundtrip( - name: String, - endpoint_config: DekafConfig, - schema: serde_json::Value, - field_selection: serde_json::Value, - docs: Vec, -) -> anyhow::Result<()> { - // Create test collection with specific schema - // Create test Dekaf materialization - let materialization_config = sops_encrypt(models::RawValue::from_value(&serde_json::to_value( - endpoint_config.clone(), - )?)) - .await?; - - let capture: models::CaptureDef = serde_json::from_value(json!({ - "endpoint": { - "connector": { - "image": "ghcr.io/estuary/source-http-ingest:dev", - "config": sops_encrypt(models::RawValue::from_value(&json!({ - "paths": ["/data"] - }))).await? 
- } - }, - "bindings": [{ - "resource": { - "path": "/data", - "stream": "/data" - }, - "target": "single_collection" - }] - }))?; - - let collections = HashMap::from_iter( - vec![("single_collection", serde_json::from_value(schema)?)].into_iter(), - ); - - let materialization: models::MaterializationDef = serde_json::from_value(json!({ - "endpoint": { - "dekaf": { - "variant": "foo", - "config": materialization_config - } - }, - "bindings": [ - { - "resource": { - "topic_name": "test_topic" - }, - "source": "single_collection", - "fields": field_selection - } - ] - }))?; - - let task_name_prefix = format!("test/dekaf_testing/{name}"); - - test_specs( - &task_name_prefix, - SpecAction::Delete, - capture.clone(), - collections.clone(), - materialization.clone(), - ) - .await?; - - let (capture_name, materialization_name) = test_specs( - &task_name_prefix, - SpecAction::Create, - capture.clone(), - collections.clone(), - materialization.clone(), - ) - .await?; - - wait_for_primary(&capture_name).await?; - - tracing::info!("Capture is primary"); - - send_docs( - &capture_name, - "data", - docs.iter() - .map(models::RawValue::from_value) - .collect::>(), - ) - .await?; - - tracing::info!("Sent test docs"); - - // Consume test documents - let (consumer, decoder) = - create_consumer(&materialization_name, &endpoint_config.token, "test_topic").await; - - let mut doc_stream = consumer.stream(); - - let mut counter = 0; - while let Some(consumed) = doc_stream.next().await { - let consumed = consumed?; - - // Connfirm that field selection was applied - - let decoded_key = match decoder.decode(consumed.key()).await { - Err(e) => { - tracing::error!(err=?e, "Error decoding key"); - return Err(anyhow::Error::from(e)); - } - Ok(d) => apache_avro::from_value::(&d.value), - }?; - - insta::assert_json_snapshot!(format!("{name}-key-{counter}"), &decoded_key); - - let decoded_payload = match decoder.decode(consumed.payload()).await { - Err(e) => { - tracing::error!(err=?e, "Error decoding value"); - return Err(anyhow::Error::from(e)); - } - Ok(d) => apache_avro::from_value::(&d.value), - }?; - - insta::assert_json_snapshot!(format!("{name}-value-{counter}"), &decoded_payload, { - ".flow_published_at.json" => "[timestamp]", - ".flow_document._flow_extra._meta.json" => "[contains_timestamp]" - }); - - counter += 1; - if counter >= docs.len() { - break; - } - } - - // Delete test specs - - test_specs( - &task_name_prefix, - SpecAction::Delete, - capture.clone(), - collections.clone(), - materialization.clone(), - ) - .await?; - Ok(()) -} - -#[ignore] -#[tokio::test] -#[tracing_test::traced_test] -async fn test_field_selection_specific() -> anyhow::Result<()> { - roundtrip( - "field_selection_specific".to_string(), - dekaf::connector::DekafConfig { - deletions: dekaf::connector::DeletionMode::Kafka, - strict_topic_names: false, - token: "1234".to_string(), - }, - json!({ - "schema": { - "properties": { - "key": { - "type": "string" - }, - "field_a": { - "type": "string", - }, - "field_b": { - "type": "string", - } - }, - "type": "object", - "required": [ - "key", - "field_a", - "field_b" - ], - }, - "key": [ - "/key" - ] - }), - json!({ - "include": { - "field_a": {} - }, - "recommended": false - }), - vec![json!({ - "key": "first", - "field_a": "foo", - "field_b": "bar" - })], - ) - .await -} - -#[ignore] -#[tokio::test] -#[tracing_test::traced_test] -async fn test_field_selection_recommended() -> anyhow::Result<()> { - roundtrip( - "field_selection_recommended".to_string(), - dekaf::connector::DekafConfig { - 
deletions: dekaf::connector::DeletionMode::Kafka, - strict_topic_names: false, - token: "1234".to_string(), - }, - json!({ - "schema": { - "properties": { - "key": { - "type": "string" - }, - "field_a": { - "type": "string", - }, - "field_b": { - "type": "string", - } - }, - "type": "object", - "required": [ - "key", - "field_a", - "field_b" - ], - }, - "key": [ - "/key" - ] - }), - json!({ - "recommended": true - }), - vec![json!({ - "key": "first", - "field_a": "foo", - "field_b": "bar" - })], - ) - .await -} - -#[ignore] -#[tokio::test] -#[tracing_test::traced_test] -async fn test_field_selection_flow_document() -> anyhow::Result<()> { - roundtrip( - "field_selection_flow_document".to_string(), - dekaf::connector::DekafConfig { - deletions: dekaf::connector::DeletionMode::Kafka, - strict_topic_names: false, - token: "1234".to_string(), - }, - json!({ - "schema": { - "properties": { - "key": { - "type": "string" - }, - "field_a": { - "type": "string", - }, - "field_b": { - "type": "string", - } - }, - "type": "object", - "required": [ - "key", - "field_a", - "field_b" - ], - }, - "key": [ - "/key" - ] - }), - json!({ - "include": { - "key": {}, - "flow_document": {} - }, - "exclude": [ - "field_a", - "field_b" - ], - "recommended": true - }), - vec![json!({ - "key": "first", - "field_a": "foo", - "field_b": "bar" - })], - ) - .await -} - -#[ignore] -#[tokio::test] -#[tracing_test::traced_test] -async fn test_meta_is_deleted() -> anyhow::Result<()> { - roundtrip( - "meta_is_deleted".to_string(), - dekaf::connector::DekafConfig { - deletions: dekaf::connector::DeletionMode::CDC, - strict_topic_names: false, - token: "1234".to_string(), - }, - json!({ - "schema": { - "properties": { - "key": { - "type": "string" - }, - "field_a": { - "type": "string", - }, - "field_b": { - "type": "string", - }, - "_meta": { - "type": "object", - "properties": { - "op": { - "type": "string" - } - } - } - }, - "type": "object", - "required": [ - "key", - ], - }, - "key": [ - "/key" - ] - }), - json!({ - "include": { - "key": {}, - }, - "recommended": false - }), - vec![ - json!({ - "key": "first", - "field_a": "foo", - "field_b": "bar", - "_meta": { - "op": "c" - } - }), - json!({ - "key": "first", - "field_a": "foo", - "field_b": "bar", - "_meta": { - "op": "d" - } - }), - ], - ) - .await -} - -#[ignore] -#[tokio::test] -#[tracing_test::traced_test] -async fn test_fields_not_required() -> anyhow::Result<()> { - roundtrip( - "fields_not_required".to_string(), - dekaf::connector::DekafConfig { - deletions: dekaf::connector::DeletionMode::Kafka, - strict_topic_names: false, - token: "1234".to_string(), - }, - json!({ - "schema": { - "properties": { - "key": { - "type": "string" - }, - "field_a": { - "type": "string", - }, - }, - "type": "object", - "required": [ - "key", - ], - }, - "key": [ - "/key" - ] - }), - json!({ - "recommended": true - }), - vec![json!({ - "key": "first", - // Omitting "field_a" - })], - ) - .await -} diff --git a/crates/dekaf/tests/e2e/basic.rs b/crates/dekaf/tests/e2e/basic.rs new file mode 100644 index 00000000000..61d4546092b --- /dev/null +++ b/crates/dekaf/tests/e2e/basic.rs @@ -0,0 +1,34 @@ +use super::DekafTestEnv; +use serde_json::json; + +const FIXTURE: &str = include_str!("fixtures/basic.flow.yaml"); + +/// Basic roundtrip test: publish specs, inject documents, consume via Dekaf. 
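Before the first test, it is worth stating what this suite assumes of its environment. The sketch below is illustrative only and simply mirrors the harness further down (`flowctl_command` and `connection_info`): `FLOW_ACCESS_TOKEN` is required, while `DEKAF_BROKER` and `DEKAF_REGISTRY` are optional overrides with local-stack defaults.

// Illustrative preflight check; not part of the suite. Variable names and
// defaults are taken from the harness below, everything else is an assumption.
fn check_e2e_prerequisites() -> anyhow::Result<()> {
    use anyhow::Context;

    // flowctl needs a control-plane access token.
    std::env::var("FLOW_ACCESS_TOKEN")
        .context("FLOW_ACCESS_TOKEN must be set to run the Dekaf e2e tests")?;

    // Optional: where to reach Dekaf and its schema registry.
    let broker = std::env::var("DEKAF_BROKER").unwrap_or_else(|_| "localhost:9092".into());
    let registry =
        std::env::var("DEKAF_REGISTRY").unwrap_or_else(|_| "http://localhost:9093".into());
    eprintln!("Dekaf e2e tests will target broker={broker}, registry={registry}");
    Ok(())
}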
+#[tokio::test] +async fn test_basic_roundtrip() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("basic_roundtrip", FIXTURE).await?; + + env.inject_documents( + "data", + vec![ + json!({"id": "doc-1", "value": "hello"}), + json!({"id": "doc-2", "value": "world"}), + ], + ) + .await?; + + let consumer = env.kafka_consumer()?; + consumer.subscribe(&["test_topic"])?; + + let records = consumer.fetch().await?; + + assert_eq!(records.len(), 2, "should receive both injected documents"); + assert_eq!(records[0].value["id"], "doc-1"); + assert_eq!(records[0].value["value"], "hello"); + assert_eq!(records[1].value["id"], "doc-2"); + assert_eq!(records[1].value["value"], "world"); + + Ok(()) +} diff --git a/crates/dekaf/tests/e2e/collection_reset.rs b/crates/dekaf/tests/e2e/collection_reset.rs new file mode 100644 index 00000000000..1971dc84845 --- /dev/null +++ b/crates/dekaf/tests/e2e/collection_reset.rs @@ -0,0 +1,662 @@ +use super::{ + DekafTestEnv, + raw_kafka::{ + TestKafkaClient, fetch_current_leader_epoch, fetch_partition_error, + list_offsets_partition_error, metadata_leader_epoch, offset_for_epoch_result, + }, +}; +use anyhow::Context; +use kafka_protocol::ResponseError; +use serde_json::json; +use std::time::Duration; + +const FIXTURE: &str = include_str!("fixtures/basic.flow.yaml"); +const EPOCH_CHANGE_TIMEOUT: Duration = Duration::from_secs(30); + +/// When a consumer sends a `current_leader_epoch` that is less than Dekaf's +/// current epoch (derived from binding backfill counter), Dekaf should return `FENCED_LEADER_EPOCH`. +/// +/// The response also includes the current leader epoch in `current_leader`, allowing +/// the consumer to know what the new epoch is. +#[tokio::test] +async fn test_fenced_leader_epoch_on_stale_consumer() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("fenced_epoch", FIXTURE).await?; + + // Inject initial document so the collection has data + env.inject_documents("data", vec![json!({"id": "1", "value": "pre-reset"})]) + .await?; + + let initial_epoch = get_leader_epoch(&env, "test_topic", 0).await?; + + perform_collection_reset(&env, "test_topic", 0, initial_epoch, EPOCH_CHANGE_TIMEOUT).await?; + + let new_epoch = get_leader_epoch(&env, "test_topic", 0).await?; + + // Now fetch with the OLD (stale) epoch - should get FENCED_LEADER_EPOCH + let info = env.connection_info(); + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let fetch_resp = client + .fetch_with_epoch("test_topic", 0, 0, initial_epoch) + .await?; + let error = fetch_partition_error(&fetch_resp, "test_topic", 0).expect("should have error"); + + assert!( + error == ResponseError::FencedLeaderEpoch.code(), + "expected FENCED_LEADER_EPOCH for stale consumer, got error code {error}" + ); + + tracing::info!( + initial_epoch, + new_epoch, + "FENCED_LEADER_EPOCH received when fetching stale epoch" + ); + + // Extract the leader epoch from the fetch response and verify it's correct + let response_epoch = + fetch_current_leader_epoch(&fetch_resp, "test_topic", 0).expect("should have epoch"); + assert_eq!( + response_epoch, new_epoch, + "response should include current epoch" + ); + + // Verify fetch works with the new epoch + let fetch_resp = client + .fetch_with_epoch("test_topic", 0, 0, new_epoch) + .await?; + let error = fetch_partition_error(&fetch_resp, "test_topic", 0) + .expect("partition should exist in fetch response"); + assert_eq!( + error, 0, + 
"fetch should succeed with new epoch, got error code {error}" + ); + + Ok(()) +} + +/// Verify that ListOffsets returns `FENCED_LEADER_EPOCH` for stale epoch. +/// +/// Both Fetch and ListOffsets should validate the `current_leader_epoch` parameter. +#[tokio::test] +async fn test_list_offsets_fenced_epoch() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("list_offsets_fenced", FIXTURE).await?; + let info = env.connection_info(); + + env.inject_documents("data", vec![json!({"id": "1", "value": "test"})]) + .await?; + + // Get initial epoch and verify ListOffsets works before reset + let token = env.dekaf_token()?; + let initial_epoch = { + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let metadata = client.metadata(&["test_topic"]).await?; + let epoch = + metadata_leader_epoch(&metadata, "test_topic", 0).expect("metadata should have epoch"); + + // Verify ListOffsets works with current epoch before reset + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, epoch) // -1 = latest + .await?; + let error = list_offsets_partition_error(&list_resp, "test_topic", 0) + .expect("partition should exist in ListOffsets response"); + assert_eq!( + error, 0, + "ListOffsets should succeed before reset, got error code {error}" + ); + + epoch + }; + + perform_collection_reset(&env, "test_topic", 0, initial_epoch, EPOCH_CHANGE_TIMEOUT).await?; + + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, initial_epoch) + .await?; + let error = + list_offsets_partition_error(&list_resp, "test_topic", 0).expect("should have error"); + + assert!( + error == ResponseError::FencedLeaderEpoch.code(), + "expected FENCED_LEADER_EPOCH for stale epoch in ListOffsets, got error code {error}" + ); + + // Verify ListOffsets succeeds with the NEW epoch + let new_epoch = get_leader_epoch(&env, "test_topic", 0).await?; + let list_resp_new = client + .list_offsets_with_epoch("test_topic", 0, -1, new_epoch) + .await?; + let error_new = list_offsets_partition_error(&list_resp_new, "test_topic", 0) + .expect("partition should exist"); + assert_eq!( + error_new, 0, + "ListOffsets should succeed with new epoch, got error code {error_new}" + ); + + Ok(()) +} + +/// Verify that Fetch returns `UNKNOWN_LEADER_EPOCH` for epoch > current. +/// +/// When a consumer sends an epoch that is greater than the current epoch, +/// Dekaf should return `UNKNOWN_LEADER_EPOCH`. This shouldn't happen in normal +/// operation but is something that could theoretically happen. 
+#[tokio::test] +async fn test_unknown_leader_epoch_for_future_epoch() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("unknown_epoch", FIXTURE).await?; + let info = env.connection_info(); + + env.inject_documents("data", vec![json!({"id": "1", "value": "test"})]) + .await?; + + tracing::info!("Connecting raw Kafka client"); + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let current_epoch = get_leader_epoch(&env, "test_topic", 0).await?; + + // Fetch with a future epoch (way higher than current) + let future_epoch = current_epoch + 100; + + let fetch_resp = client + .fetch_with_epoch("test_topic", 0, 0, future_epoch) + .await?; + let error = fetch_partition_error(&fetch_resp, "test_topic", 0).expect("should have error"); + + assert!( + error == ResponseError::UnknownLeaderEpoch.code(), + "expected UNKNOWN_LEADER_EPOCH for future epoch, got error code {error}" + ); + + // Also test ListOffsets with future epoch + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, future_epoch) + .await?; + let list_error = + list_offsets_partition_error(&list_resp, "test_topic", 0).expect("should have error"); + + assert!( + list_error == ResponseError::UnknownLeaderEpoch.code(), + "expected UNKNOWN_LEADER_EPOCH for future epoch in ListOffsets, got error code {list_error}" + ); + + // Verify fetch/list_offsets work with the CURRENT epoch + let fetch_resp_ok = client + .fetch_with_epoch("test_topic", 0, 0, current_epoch) + .await?; + let error_ok = + fetch_partition_error(&fetch_resp_ok, "test_topic", 0).expect("partition should exist"); + assert_eq!( + error_ok, 0, + "Fetch should succeed with current epoch, got error code {error_ok}" + ); + + let list_resp_ok = client + .list_offsets_with_epoch("test_topic", 0, -1, current_epoch) + .await?; + let list_error_ok = list_offsets_partition_error(&list_resp_ok, "test_topic", 0) + .expect("partition should exist"); + assert_eq!( + list_error_ok, 0, + "ListOffsets should succeed with current epoch, got error code {list_error_ok}" + ); + + Ok(()) +} + +/// Verify that OffsetForLeaderEpoch returns `end_offset=0` for old epochs. +/// +/// After receiving `FENCED_LEADER_EPOCH`, consumers call `OffsetForLeaderEpoch` +/// to find the end offset for their old epoch. Dekaf returns `end_offset=0` for +/// old epochs, indicating the consumer should reset to the beginning. 
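The next two tests pin down what `OffsetForLeaderEpoch` reports for old versus current epochs. A minimal restatement of that behavior follows; the function and its tuple return shape are illustrative, not Dekaf's real types.

// Returns (leader_epoch, end_offset) as asserted by the tests below. Only the
// old-epoch and current-epoch cases are covered; other inputs are not pinned down here.
fn offset_for_leader_epoch_sketch(
    requested_epoch: i32,
    current_epoch: i32,
    high_watermark: i64,
) -> (i32, i64) {
    if requested_epoch < current_epoch {
        // Old epoch: end_offset = 0 tells the consumer to reset to the beginning.
        (current_epoch, 0)
    } else {
        // Current epoch: report the actual high watermark.
        (current_epoch, high_watermark)
    }
}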
+#[tokio::test] +async fn test_offset_for_leader_epoch_returns_zero_for_old_epoch() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("offset_for_old_epoch", FIXTURE).await?; + + env.inject_documents("data", vec![json!({"id": "1", "value": "test"})]) + .await?; + + let initial_epoch = get_leader_epoch(&env, "test_topic", 0).await?; + + perform_collection_reset(&env, "test_topic", 0, initial_epoch, EPOCH_CHANGE_TIMEOUT).await?; + + let info = env.connection_info(); + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let resp = client + .offset_for_leader_epoch("test_topic", 0, initial_epoch) + .await?; + let result = offset_for_epoch_result(&resp, "test_topic", 0).expect("should have result"); + + assert_eq!( + result.error_code, 0, + "OffsetForLeaderEpoch should succeed for old epoch" + ); + + assert_eq!( + result.end_offset, 0, + "old epoch should return end_offset=0 (reset to beginning), got {}", + result.end_offset + ); + + let new_epoch = get_leader_epoch(&env, "test_topic", 0).await?; + + assert_eq!( + result.leader_epoch, new_epoch, + "should return current epoch in response" + ); + + Ok(()) +} + +/// Verify current epoch returns actual high watermark (not 0). +/// +/// When querying `OffsetForLeaderEpoch` for the current epoch, Dekaf should +/// return the actual high watermark, not 0. This ensures that only old epochs +/// trigger reset-to-beginning behavior. +#[tokio::test] +async fn test_offset_for_leader_epoch_returns_highwater_for_current() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("offset_for_current", FIXTURE).await?; + let info = env.connection_info(); + + // Inject multiple documents so we have a meaningful high watermark + env.inject_documents( + "data", + vec![ + json!({"id": "1", "value": "first"}), + json!({"id": "2", "value": "second"}), + json!({"id": "3", "value": "third"}), + ], + ) + .await?; + + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let metadata = client.metadata(&["test_topic"]).await?; + let current_epoch = + metadata_leader_epoch(&metadata, "test_topic", 0).expect("metadata should have epoch"); + + // Get the actual high watermark via ListOffsets for comparison + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, current_epoch) // -1 = latest + .await?; + let list_partition = list_resp + .topics + .iter() + .find(|t| t.name.as_str() == "test_topic") + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == 0)) + .expect("partition should exist"); + let high_watermark = list_partition.offset; + + let resp = client + .offset_for_leader_epoch("test_topic", 0, current_epoch) + .await?; + let result = offset_for_epoch_result(&resp, "test_topic", 0).expect("should have result"); + + assert_eq!( + result.error_code, 0, + "OffsetForLeaderEpoch should succeed for current epoch" + ); + + assert_eq!( + result.end_offset, high_watermark, + "current epoch should return exact high_watermark" + ); + + assert_eq!( + result.leader_epoch, current_epoch, + "should return current epoch in response" + ); + + Ok(()) +} + +// Collection reset can cause a race condition between activation and deletion, +// so we need to do this song and dance where we first disable the task in order +// to prevent it from recreating journals with an old first-pub ID, then reset the collection, +// then re-enable the task. 
We then want to wait until Dekaf's cache has picked up the new spec. +pub async fn perform_collection_reset( + env: &DekafTestEnv, + topic: &str, + partition: i32, + initial_epoch: i32, + epoch_timeout: Duration, +) -> anyhow::Result<()> { + env.disable_capture().await?; + env.reset_collections().await?; + env.enable_capture().await?; + + // Wait for capture to be ready + let capture = env.capture_name().context("no capture in fixture")?; + env.wait_for_primary(capture).await?; + + // Inject a document to trigger lazy journal creation for the new epoch + tracing::info!("Injecting document to create new journal"); + env.inject_documents( + "data", + vec![json!({"id": "reset-trigger", "value": "post-reset"})], + ) + .await?; + + let info = env.connection_info(); + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + // Wait for Dekaf to pick up the new epoch + let new_epoch = + wait_for_epoch_change(&mut client, topic, partition, initial_epoch, epoch_timeout).await?; + tracing::info!(initial_epoch, new_epoch, "Collection reset completed"); + + Ok(()) +} + +pub async fn get_leader_epoch( + env: &DekafTestEnv, + topic: &str, + partition: i32, +) -> anyhow::Result { + let info = env.connection_info(); + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + let metadata = client.metadata(&[topic]).await?; + let epoch = + metadata_leader_epoch(&metadata, topic, partition).context("metadata should have epoch")?; + tracing::info!(initial_epoch = epoch, "Got initial epoch"); + Ok(epoch) +} + +/// Wait for Dekaf to report a leader epoch greater than `previous_epoch` AND have partitions available. +/// +/// This polls the metadata endpoint until both conditions are met: +/// 1. The epoch changes (spec refresh completed) +/// 2. The topic has at least one partition (journal listing completed) +/// +/// The second condition is needed because after a collection reset, there's a delay between +/// the spec refresh (epoch change) and the new journal being listed by Gazette. 
+pub async fn wait_for_epoch_change( + client: &mut TestKafkaClient, + topic: &str, + partition: i32, + previous_epoch: i32, + timeout: std::time::Duration, +) -> anyhow::Result { + let deadline = std::time::Instant::now() + timeout; + + tracing::info!( + %topic, + partition, + previous_epoch, + timeout_secs = timeout.as_secs(), + "Waiting for epoch change and partitions" + ); + + let mut last_error: Option = None; + let mut last_epoch: Option = None; + let mut last_has_partitions = false; + + loop { + match client.metadata(&[topic]).await { + Ok(metadata) => { + if let Some(epoch) = metadata_leader_epoch(&metadata, topic, partition) { + last_epoch = Some(epoch); + // Check if partitions exist (not just epoch change) + let has_partitions = metadata + .topics + .iter() + .find(|t| t.name.as_ref().map(|n| n.as_str()) == Some(topic)) + .map(|t| !t.partitions.is_empty()) + .unwrap_or(false); + last_has_partitions = has_partitions; + + if epoch > previous_epoch && has_partitions { + tracing::info!( + %topic, + partition, + previous_epoch, + new_epoch = epoch, + "Epoch changed and partitions available" + ); + return Ok(epoch); + } + tracing::debug!( + %topic, + partition, + current_epoch = epoch, + previous_epoch, + has_partitions, + "Waiting for epoch change and/or partitions" + ); + } + } + Err(e) => { + last_error = Some(e.to_string()); + tracing::debug!(error = %e, "Metadata request failed, will retry"); + } + } + + if std::time::Instant::now() > deadline { + let context = match (&last_error, last_epoch) { + (Some(err), _) => format!("last error: {err}"), + (None, Some(epoch)) => { + format!("last_epoch={epoch}, has_partitions={last_has_partitions}") + } + (None, None) => "no metadata received".to_string(), + }; + anyhow::bail!( + "timeout waiting for epoch to change from {previous_epoch} for {topic}:{partition} ({context})" + ); + } + + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } +} + +/// Verify Metadata response includes leader_epoch >= 1. +/// +/// Dekaf maps the binding's backfill counter to Kafka's leader epoch. +/// Since we add 1 to avoid epoch 0 (which consumers handle poorly), +/// the epoch should always be >= 1. +#[tokio::test] +async fn test_metadata_includes_leader_epoch() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("metadata_epoch", FIXTURE).await?; + let info = env.connection_info(); + + // Inject a document so the topic has data + env.inject_documents("data", vec![json!({"id": "1", "value": "test"})]) + .await?; + + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let metadata = client.metadata(&["test_topic"]).await?; + + // Extract leader epoch from the partition + let leader_epoch = metadata_leader_epoch(&metadata, "test_topic", 0) + .expect("metadata should include leader_epoch"); + + // For a fresh collection with no resets, backfill counter is 0, so epoch = 0 + 1 = 1 + assert_eq!( + leader_epoch, 1, + "fresh collection should have leader_epoch = 1 (backfill_counter 0 + 1)" + ); + + Ok(()) +} + +/// Verify ListOffsets response includes leader_epoch. 
+#[tokio::test] +async fn test_list_offsets_includes_leader_epoch() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("list_offsets_epoch", FIXTURE).await?; + let info = env.connection_info(); + + // Inject documents so offsets exist + env.inject_documents( + "data", + vec![ + json!({"id": "1", "value": "first"}), + json!({"id": "2", "value": "second"}), + ], + ) + .await?; + + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let metadata = client.metadata(&["test_topic"]).await?; + let current_epoch = + metadata_leader_epoch(&metadata, "test_topic", 0).expect("metadata should have epoch"); + + // Fresh collection should have epoch = 1 + assert_eq!(current_epoch, 1, "fresh collection should have epoch = 1"); + + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, current_epoch) + .await?; + + let error_code = list_offsets_partition_error(&list_resp, "test_topic", 0) + .expect("partition should exist in ListOffsets response"); + assert_eq!( + error_code, 0, + "ListOffsets should succeed with current epoch, got error code {error_code}" + ); + + // Extract the leader_epoch from response + let latest_partition = list_resp + .topics + .iter() + .find(|t| t.name.as_str() == "test_topic") + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == 0)) + .expect("partition should exist"); + + assert_eq!( + latest_partition.leader_epoch, current_epoch, + "leader_epoch in ListOffsets response should match metadata epoch" + ); + + let earliest_resp = client + .list_offsets_with_epoch("test_topic", 0, -2, current_epoch) + .await?; + + let earliest_partition = earliest_resp + .topics + .iter() + .find(|t| t.name.as_str() == "test_topic") + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == 0)) + .expect("partition should exist"); + + assert_eq!( + earliest_partition.leader_epoch, current_epoch, + "leader_epoch for earliest offset should match metadata epoch" + ); + + // Earliest offset should be 0 + assert_eq!( + earliest_partition.offset, 0, + "earliest offset should be 0, got {}", + earliest_partition.offset + ); + + // Latest offset should be > 0 after injecting documents + assert!( + latest_partition.offset > 0, + "latest offset should be > 0 after injecting documents, got {}", + latest_partition.offset + ); + + // Latest should be greater than earliest + assert!( + latest_partition.offset > earliest_partition.offset, + "latest offset ({}) should be > earliest offset ({})", + latest_partition.offset, + earliest_partition.offset + ); + + Ok(()) +} + +/// Verify Fetch response includes leader_epoch in current_leader. +/// +/// When fetching data, the response should include the current_leader field +/// with the leader_epoch, allowing consumers to detect epoch changes. 
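Putting these signals together, a fenced consumer can recover using only what the responses carry. The sketch below is a hedged illustration built from the `raw_kafka` helpers in this suite; it is not taken from any real client library, and the function name is hypothetical.

use anyhow::Context;
use super::raw_kafka::{TestKafkaClient, metadata_leader_epoch, offset_for_epoch_result};

// Hypothetical recovery flow for a consumer whose epoch was fenced: learn the
// new epoch, then ask where the stale epoch ended.
async fn recover_from_fenced_epoch(
    client: &mut TestKafkaClient,
    topic: &str,
    partition: i32,
    stale_epoch: i32,
) -> anyhow::Result<(i32, i64)> {
    // The new epoch is available from the Fetch response's `current_leader`,
    // or by re-fetching metadata as done here.
    let metadata = client.metadata(&[topic]).await?;
    let new_epoch = metadata_leader_epoch(&metadata, topic, partition)
        .context("metadata should include the new leader epoch")?;

    // Dekaf answers end_offset = 0 for the stale epoch, meaning: restart from
    // the beginning of the (reset) collection.
    let resp = client
        .offset_for_leader_epoch(topic, partition, stale_epoch)
        .await?;
    let result = offset_for_epoch_result(&resp, topic, partition)
        .context("partition missing from OffsetForLeaderEpoch response")?;

    Ok((new_epoch, result.end_offset))
}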
+#[tokio::test] +async fn test_fetch_response_includes_leader_epoch() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("fetch_epoch", FIXTURE).await?; + let info = env.connection_info(); + + // Inject documents to fetch + env.inject_documents( + "data", + vec![ + json!({"id": "1", "value": "hello"}), + json!({"id": "2", "value": "world"}), + ], + ) + .await?; + + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let metadata = client.metadata(&["test_topic"]).await?; + let current_epoch = + metadata_leader_epoch(&metadata, "test_topic", 0).expect("metadata should have epoch"); + + // Fresh collection should have epoch = 1 + assert_eq!(current_epoch, 1, "fresh collection should have epoch = 1"); + + let fetch_resp = client + .fetch_with_epoch("test_topic", 0, 0, current_epoch) + .await?; + + let error_code = fetch_partition_error(&fetch_resp, "test_topic", 0) + .expect("partition should exist in fetch response"); + assert_eq!( + error_code, 0, + "Fetch should succeed with current epoch, got error code {error_code}" + ); + + // Extract leader_epoch from current_leader + let response_epoch = fetch_current_leader_epoch(&fetch_resp, "test_topic", 0) + .expect("Fetch response should include current_leader with leader_epoch"); + + assert_eq!( + response_epoch, current_epoch, + "Fetch response epoch ({response_epoch}) should match metadata epoch ({current_epoch})" + ); + + let partition = fetch_resp + .responses + .iter() + .find(|t| t.topic.as_str() == "test_topic") + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == 0)) + .expect("partition should exist"); + + assert!( + partition.high_watermark > 0, + "high_watermark should be > 0 after injecting documents, got {}", + partition.high_watermark + ); + + Ok(()) +} diff --git a/crates/dekaf/tests/e2e/empty_fetch.rs b/crates/dekaf/tests/e2e/empty_fetch.rs new file mode 100644 index 00000000000..34d686a71ed --- /dev/null +++ b/crates/dekaf/tests/e2e/empty_fetch.rs @@ -0,0 +1,61 @@ +use super::DekafTestEnv; +use serde_json::json; + +const FIXTURE: &str = include_str!("fixtures/basic.flow.yaml"); + +/// Verify that empty fetch responses are valid and don't cause parse errors. +/// +/// The test: +/// 1. Injects 2 documents and consumes them +/// 2. Issues another fetch (no more documents exist, triggers empty response) +/// 3. Verifies no parse errors occurred +/// 4. Injects one more document and confirms consumer can still fetch it +#[tokio::test] +async fn test_empty_fetch_valid_message_set_size() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("empty_fetch", FIXTURE).await?; + + env.inject_documents( + "data", + vec![ + json!({"id": "1", "value": "first"}), + json!({"id": "2", "value": "second"}), + ], + ) + .await?; + + let consumer = env.kafka_consumer()?; + consumer.subscribe(&["test_topic"])?; + + let records = consumer.fetch().await?; + + assert_eq!(records.len(), 2, "should receive 2 initial documents"); + assert_eq!(records[0].value["id"], "1"); + assert_eq!(records[0].value["value"], "first"); + assert_eq!(records[1].value["id"], "2"); + assert_eq!(records[1].value["value"], "second"); + + // Fetch again when caught up. Should return empty since no new documents. 
+ let empty_records = consumer.fetch().await?; + assert_eq!( + empty_records.len(), + 0, + "should receive empty response when caught up" + ); + + env.inject_documents("data", vec![json!({"id": "3", "value": "third"})]) + .await?; + + let more_records = consumer.fetch().await?; + + assert_eq!( + more_records.len(), + 1, + "should receive 1 document after reinject" + ); + assert_eq!(more_records[0].value["id"], "3"); + assert_eq!(more_records[0].value["value"], "third"); + + Ok(()) +} diff --git a/crates/dekaf/tests/e2e/fixtures/basic.flow.yaml b/crates/dekaf/tests/e2e/fixtures/basic.flow.yaml new file mode 100644 index 00000000000..4b6aa363415 --- /dev/null +++ b/crates/dekaf/tests/e2e/fixtures/basic.flow.yaml @@ -0,0 +1,35 @@ +collections: + test_data: + schema: + type: object + properties: + id: { type: string } + value: { type: string } + required: [id] + key: [/id] + +captures: + source_ingest: + endpoint: + connector: + image: ghcr.io/estuary/source-http-ingest:dev + config: + paths: ["/data"] + bindings: + - resource: { path: "/data", stream: "/data" } + target: test_data + +materializations: + dekaf_test: + endpoint: + dekaf: + variant: testing + config: + token: "test-token-12345" + strict_topic_names: false + bindings: + - source: test_data + resource: { topic_name: test_topic } + fields: + recommended: true + exclude: [flow_published_at] diff --git a/crates/dekaf/tests/e2e/harness.rs b/crates/dekaf/tests/e2e/harness.rs new file mode 100644 index 00000000000..0926fb26a85 --- /dev/null +++ b/crates/dekaf/tests/e2e/harness.rs @@ -0,0 +1,498 @@ +use anyhow::Context; +use std::collections::BTreeMap; +use std::time::Duration; + +/// Prefix for test namespaces. Requires storage mappings and user grants +/// provisioned by `mise run local:test-tenant`. +const TEST_NAMESPACE_PREFIX: &str = "test/dekaf"; + +pub fn init_tracing() { + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::from_default_env() + .add_directive("e2e=debug".parse().unwrap()) + .add_directive("dekaf=info".parse().unwrap()), + ) + .with_test_writer() + .try_init(); +} + +/// Create a flowctl command configured for local stack. +/// Requires FLOW_ACCESS_TOKEN environment variable to be set. +fn flowctl_command() -> anyhow::Result { + // Try to find flowctl in cargo-target/debug first (where `cargo build` puts it), + // falling back to locate_bin (which checks alongside the test binary and PATH). + let cargo_target = std::env::var("CARGO_TARGET_DIR") + .unwrap_or_else(|_| format!("{}/cargo-target", std::env::var("HOME").unwrap())); + let debug_flowctl = std::path::PathBuf::from(&cargo_target).join("debug/flowctl"); + + let flowctl = if debug_flowctl.exists() { + debug_flowctl + } else { + locate_bin::locate("flowctl")? + }; + + let home = std::env::var("HOME").unwrap(); + let ca_cert = + std::env::var("SSL_CERT_FILE").unwrap_or_else(|_| format!("{}/flow-local/ca.crt", home)); + let access_token = std::env::var("FLOW_ACCESS_TOKEN") + .context("FLOW_ACCESS_TOKEN environment variable must be set for e2e tests")?; + + let mut cmd = async_process::Command::new(flowctl); + cmd.env("FLOW_ACCESS_TOKEN", access_token); + cmd.env("SSL_CERT_FILE", ca_cert); + cmd.arg("--profile").arg("local"); + Ok(cmd) +} + +/// Test environment for Dekaf E2E tests. 
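Every test in this suite drives the harness through the same lifecycle. The sketch below mirrors the tests above rather than adding anything new (the test name is illustrative): `setup` publishes a uniquely namespaced copy of the fixture, documents go in through the capture's HTTP endpoint and come back out through Dekaf, and dropping the environment deletes the published specs.

// Lifecycle sketch only; see the individual tests for real usage.
async fn harness_lifecycle_example() -> anyhow::Result<()> {
    let env = DekafTestEnv::setup("example", include_str!("fixtures/basic.flow.yaml")).await?;

    env.inject_documents("data", vec![serde_json::json!({"id": "x", "value": "y"})])
        .await?;

    let consumer = env.kafka_consumer()?;
    consumer.subscribe(&["test_topic"])?;
    let records = consumer.fetch().await?;
    assert!(!records.is_empty(), "expected at least one record back");

    Ok(()) // dropping `env` deletes the published test specs
}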
+pub struct DekafTestEnv { + /// Unique namespace for this test (e.g., "test/dekaf/my_test/a1b2") + pub namespace: String, + /// The rewritten catalog for modification in disable/enable/reset operations + catalog: models::Catalog, +} + +impl DekafTestEnv { + /// Setup test environment from a fixture. + /// + /// The fixture is a catalog YAML. Names are automatically + /// rewritten to include a unique test namespace. + pub async fn setup(test_name: &str, fixture_yaml: &str) -> anyhow::Result { + let suffix = format!("{:04x}", rand::random::()); + let namespace = format!("{TEST_NAMESPACE_PREFIX}/{test_name}/{suffix}"); + + tracing::info!(%namespace, "Setting up test environment"); + + let catalog = rewrite_catalog_names(&namespace, fixture_yaml)?; + + let temp_file = tempfile::Builder::new().suffix(".json").tempfile()?; + std::fs::write(temp_file.path(), serde_json::to_string_pretty(&catalog)?)?; + + let output = async_process::output(flowctl_command()?.args([ + "catalog", + "publish", + "--auto-approve", + "--init-data-plane", + "ops/dp/public/local-cluster", + "--source", + temp_file.path().to_str().unwrap(), + ])) + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::error!(%stderr, "flowctl publish failed"); + anyhow::bail!("flowctl publish failed: {}", stderr); + } + + tracing::info!("Publish succeeded"); + + let env = Self { namespace, catalog }; + + // Wait for capture shard to be ready (Dekaf materializations don't have shards) + if let Some(capture) = env.capture_name() { + env.wait_for_primary(capture).await?; + } + + Ok(env) + } + + /// Returns the name of the first materialization in the catalog. + pub fn materialization_name(&self) -> Option<&str> { + self.catalog + .materializations + .keys() + .next() + .map(|k| k.as_ref()) + } + + /// Returns the name of the first capture in the catalog. + pub fn capture_name(&self) -> Option<&str> { + self.catalog.captures.keys().next().map(|k| k.as_ref()) + } + + /// Returns an iterator over collection names in the catalog. + pub fn collection_names(&self) -> impl Iterator { + self.catalog.collections.keys().map(|k| k.as_ref()) + } + + /// Wait for a task's shard to become primary. + /// Only needed for captures/derivations, not materializations. 
+ pub async fn wait_for_primary(&self, task_name: &str) -> anyhow::Result<()> { + tracing::info!(%task_name, "Waiting for shard to become primary"); + let deadline = std::time::Instant::now() + Duration::from_secs(60); + + loop { + let output = async_process::output(flowctl_command()?.args([ + "raw", + "list-shards", + "--task", + task_name, + "-ojson", + ])) + .await?; + + if output.status.success() { + let shard: proto_gazette::consumer::list_response::Shard = + serde_json::from_slice(&output.stdout)?; + + let status_codes: Vec<_> = shard.status.iter().map(|s| s.code()).collect(); + tracing::debug!(?status_codes, "Shard status"); + + if shard + .status + .iter() + .any(|s| s.code() == proto_gazette::consumer::replica_status::Code::Primary) + { + tracing::info!(%task_name, "Shard is primary"); + return Ok(()); + } + + if let Some(failed) = shard + .status + .iter() + .find(|s| s.code() == proto_gazette::consumer::replica_status::Code::Failed) + { + // Extract and format error messages cleanly + let errors: Vec<&str> = failed.errors.iter().map(|s| s.as_str()).collect(); + for (i, error) in errors.iter().enumerate() { + tracing::error!(%task_name, error_num = i + 1, error = %error, "Shard error"); + } + anyhow::bail!("shard {task_name} failed:\n{}", errors.join("\n")); + } + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::debug!(%stderr, "list-shards failed (shard may not exist yet)"); + } + + if std::time::Instant::now() > deadline { + tracing::error!(%task_name, "Timeout waiting for shard"); + anyhow::bail!("timeout waiting for {task_name} to become primary"); + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + /// Inject documents into a collection via HTTP ingest. + pub async fn inject_documents( + &self, + path: &str, + docs: impl IntoIterator, + ) -> anyhow::Result<()> { + let capture = self.capture_name().context("no capture in fixture")?; + + // Get shard endpoint from flowctl + let output = async_process::output(flowctl_command()?.args([ + "raw", + "list-shards", + "--task", + capture, + "-ojson", + ])) + .await?; + + let shard: proto_gazette::consumer::list_response::Shard = + serde_json::from_slice(&output.stdout)?; + + let route = shard.route.context("missing route")?; + let spec = shard.spec.context("missing spec")?; + let labels = spec.labels.context("missing labels")?; + + let endpoint = route.endpoints.first().context("no endpoints")?; + let hostname = labels + .labels + .iter() + .find(|l| l.name == "estuary.dev/hostname") + .map(|l| &l.value) + .context("no hostname label")?; + let port = labels + .labels + .iter() + .find(|l| l.name == "estuary.dev/expose-port") + .map(|l| &l.value) + .context("no port label")?; + + let base = endpoint.replace("https://", ""); + let url = format!("https://{hostname}-{port}.{base}/{path}"); + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + + let docs: Vec<_> = docs.into_iter().collect(); + tracing::info!(count = docs.len(), %path, "Injecting documents"); + + for doc in docs { + let resp = client + .post(&url) + .header("Content-Type", "application/json") + .json(&doc) + .send() + .await?; + + if !resp.status().is_success() { + anyhow::bail!( + "inject failed: {} {}", + resp.status(), + resp.text().await.unwrap_or_default() + ); + } + } + + Ok(()) + } + + /// Get Kafka connection info for external clients. 
+ pub fn connection_info(&self) -> ConnectionInfo { + ConnectionInfo { + broker: std::env::var("DEKAF_BROKER").unwrap_or("localhost:9092".into()), + registry: std::env::var("DEKAF_REGISTRY").unwrap_or("http://localhost:9093".into()), + username: self.materialization_name().unwrap_or_default().to_string(), + collections: self.collection_names().map(String::from).collect(), + } + } + + /// Get the Dekaf auth token from the fixture's materialization config. + /// + /// Extracts the token from `materializations..endpoint.dekaf.config.token`. + pub fn dekaf_token(&self) -> anyhow::Result { + let mat_def = self + .catalog + .materializations + .values() + .next() + .context("no materialization in fixture")?; + + let dekaf_config = match &mat_def.endpoint { + models::MaterializationEndpoint::Dekaf(cfg) => cfg, + _ => anyhow::bail!("materialization endpoint is not Dekaf"), + }; + + let config: serde_json::Value = serde_json::from_str(dekaf_config.config.get())?; + config["token"] + .as_str() + .map(String::from) + .context("dekaf config missing token field") + } + + /// Create a high-level consumer connected to Dekaf. + /// + /// Uses the token from the fixture's materialization config. + pub fn kafka_consumer(&self) -> anyhow::Result { + let info = self.connection_info(); + let token = self.dekaf_token()?; + Ok(super::kafka::KafkaConsumer::new( + &info.broker, + &info.registry, + &info.username, + &token, + )) + } + + async fn publish_catalog(&self, catalog: &models::Catalog) -> anyhow::Result<()> { + let temp_file = tempfile::Builder::new().suffix(".json").tempfile()?; + std::fs::write(temp_file.path(), serde_json::to_string_pretty(catalog)?)?; + + let output = async_process::output(flowctl_command()?.args([ + "catalog", + "publish", + "--auto-approve", + "--source", + temp_file.path().to_str().unwrap(), + ])) + .await?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::error!(%stderr, "flowctl publish failed"); + anyhow::bail!("flowctl publish failed: {}", stderr); + } + + Ok(()) + } + + /// Disable the capture task by publishing with `shards.disable = true`. + pub async fn disable_capture(&self) -> anyhow::Result<()> { + let (capture_name, capture_def) = self + .catalog + .captures + .iter() + .next() + .context("no capture in fixture")?; + + tracing::info!(%capture_name, "Disabling capture"); + + let mut capture_def = capture_def.clone(); + capture_def.shards.disable = true; + + let catalog = models::Catalog { + captures: [(capture_name.clone(), capture_def)].into(), + ..Default::default() + }; + + self.publish_catalog(&catalog).await?; + tracing::info!(%capture_name, "Capture disabled"); + Ok(()) + } + + /// Re-enable the capture task by publishing with `shards.disable = false`. + /// + /// After enabling, use `wait_for_primary()` to wait for the capture to be ready. + pub async fn enable_capture(&self) -> anyhow::Result<()> { + let (capture_name, capture_def) = self + .catalog + .captures + .iter() + .next() + .context("no capture in fixture")?; + + tracing::info!(%capture_name, "Enabling capture"); + + let mut capture_def = capture_def.clone(); + capture_def.shards.disable = false; + + let catalog = models::Catalog { + captures: [(capture_name.clone(), capture_def)].into(), + ..Default::default() + }; + + self.publish_catalog(&catalog).await?; + tracing::info!(%capture_name, "Capture enabled"); + Ok(()) + } + + /// Reset all collections by publishing with `reset: true`. 
+ /// + /// This increments the backfill counter for the collection, which Dekaf maps + /// to a new leader epoch. + /// + /// For proper reset behavior, you should: + /// 1. Disable the capture first (`disable_capture()`) + /// 2. Reset the collection(s) + /// 3. Re-enable the capture (`enable_capture()`) + /// 4. Wait for the capture to be primary (`wait_for_primary()`) + /// 5. Wait for Dekaf to pick up the new epoch (`wait_for_epoch_change()`) + pub async fn reset_collections(&self) -> anyhow::Result<()> { + let collections: BTreeMap = self + .catalog + .collections + .iter() + .map(|(k, v)| { + let mut coll_def = v.clone(); + coll_def.reset = true; + (k.clone(), coll_def) + }) + .collect(); + + let names: Vec<&str> = collections.keys().map(|k| k.as_ref()).collect(); + tracing::info!(?names, "Resetting collections"); + + let catalog = models::Catalog { + collections, + ..Default::default() + }; + + self.publish_catalog(&catalog).await + } + + /// Cleanup test specs synchronously. + fn cleanup_sync(&self) { + let cmd = match flowctl_command() { + Ok(mut cmd) => { + cmd.args([ + "catalog", + "delete", + "--prefix", + &self.namespace, + "--dangerous-auto-approve", + ]); + cmd + } + Err(e) => { + tracing::warn!(error = %e, namespace = %self.namespace, "Failed to create cleanup command"); + return; + } + }; + + let result = std::process::Command::new(cmd.get_program()) + .args(cmd.get_args()) + .envs(cmd.get_envs().filter_map(|(k, v)| v.map(|v| (k, v)))) + .output(); + + match result { + Ok(output) if output.status.success() => {} + Ok(output) => { + let stderr = String::from_utf8_lossy(&output.stderr); + tracing::warn!(namespace = %self.namespace, %stderr, "Test cleanup failed"); + } + Err(e) => { + tracing::warn!(namespace = %self.namespace, error = %e, "Test cleanup command failed"); + } + } + } +} + +impl Drop for DekafTestEnv { + fn drop(&mut self) { + self.cleanup_sync(); + } +} + +#[derive(Debug, Clone)] +pub struct ConnectionInfo { + pub broker: String, + pub registry: String, + pub username: String, + pub collections: Vec, +} + +/// Rewrite fixture names to include test namespace. 
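A concrete example of the rewriting performed below, assuming the namespace came out as `test/dekaf/basic_roundtrip/1a2b` (the four-hex-digit suffix is random per run):

// Fixture name               After rewriting
// test_data               -> test/dekaf/basic_roundtrip/1a2b/test_data
// source_ingest           -> test/dekaf/basic_roundtrip/1a2b/source_ingest
// dekaf_test              -> test/dekaf/basic_roundtrip/1a2b/dekaf_test
//
// Capture binding targets and materialization sources are rewritten the same
// way, so the prefixed fixture stays internally consistent.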
+fn rewrite_catalog_names(namespace: &str, yaml: &str) -> anyhow::Result { + let yaml_value: serde_yaml::Value = serde_yaml::from_str(yaml)?; + let original: models::Catalog = serde_json::from_value(serde_json::to_value(&yaml_value)?)?; + + let prefix = |name: &str| format!("{namespace}/{name}"); + + // Rewrite collection names + let collections = original + .collections + .into_iter() + .map(|(name, def)| (models::Collection::new(prefix(name.as_ref())), def)) + .collect(); + + // Rewrite capture names and binding targets + let captures = original + .captures + .into_iter() + .map(|(name, mut def)| { + for binding in &mut def.bindings { + binding.target = models::Collection::new(prefix(binding.target.as_ref())); + } + (models::Capture::new(prefix(name.as_ref())), def) + }) + .collect(); + + // Rewrite materialization names and binding sources + let materializations = original + .materializations + .into_iter() + .map(|(name, mut def)| { + for binding in &mut def.bindings { + let old_collection = binding.source.collection(); + binding + .source + .set_collection(models::Collection::new(prefix(old_collection.as_ref()))); + } + (models::Materialization::new(prefix(name.as_ref())), def) + }) + .collect(); + + Ok(models::Catalog { + collections, + captures, + materializations, + ..Default::default() + }) +} diff --git a/crates/dekaf/tests/e2e/kafka.rs b/crates/dekaf/tests/e2e/kafka.rs new file mode 100644 index 00000000000..65ce772febb --- /dev/null +++ b/crates/dekaf/tests/e2e/kafka.rs @@ -0,0 +1,94 @@ +use anyhow::Context; +use futures::StreamExt; +use rdkafka::Message; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use schema_registry_converter::async_impl::avro::AvroDecoder; +use schema_registry_converter::async_impl::schema_registry::SrSettings; +use std::time::Duration; + +pub struct KafkaConsumer { + consumer: StreamConsumer, + decoder: AvroDecoder<'static>, +} + +#[derive(Debug)] +pub struct DecodedRecord { + pub topic: String, + pub partition: i32, + pub offset: i64, + pub key: serde_json::Value, + pub value: serde_json::Value, +} + +impl KafkaConsumer { + pub fn new(broker: &str, registry: &str, username: &str, password: &str) -> Self { + let consumer: StreamConsumer = rdkafka::ClientConfig::new() + .set("bootstrap.servers", broker) + .set("security.protocol", "SASL_PLAINTEXT") + .set("sasl.mechanism", "PLAIN") + .set("sasl.username", username) + .set("sasl.password", password) + .set("group.id", &format!("test-{}", uuid::Uuid::new_v4())) + .set("enable.auto.commit", "false") + .set("auto.offset.reset", "earliest") + .create() + .expect("consumer creation failed"); + + let sr_settings = SrSettings::new_builder(registry.to_string()) + .set_basic_authorization(username, Some(password)) + .build() + .expect("schema registry settings failed"); + + let decoder = AvroDecoder::new(sr_settings); + + KafkaConsumer { consumer, decoder } + } + + pub fn subscribe(&self, topics: &[&str]) -> anyhow::Result<()> { + self.consumer.subscribe(topics)?; + Ok(()) + } + + /// Fetch all available records until no more arrive within the timeout. 
+ pub async fn fetch(&self) -> anyhow::Result> { + const TIMEOUT: Duration = Duration::from_secs(10); + + let mut records = Vec::new(); + let mut stream = self.consumer.stream(); + + loop { + match tokio::time::timeout(TIMEOUT, stream.next()).await { + Ok(Some(Ok(msg))) => { + let key = self + .decoder + .decode(msg.key()) + .await + .context("failed to decode key")?; + let value = self + .decoder + .decode(msg.payload()) + .await + .context("failed to decode value")?; + + records.push(DecodedRecord { + topic: msg.topic().to_string(), + partition: msg.partition(), + offset: msg.offset(), + key: apache_avro::from_value(&key.value)?, + value: apache_avro::from_value(&value.value)?, + }); + } + Ok(Some(Err(e))) => return Err(e.into()), + Ok(None) => break, + Err(_) => break, // timeout, no more records available + } + } + + Ok(records) + } + + /// Get the inner consumer for advanced operations. + pub fn inner(&self) -> &StreamConsumer { + &self.consumer + } +} diff --git a/crates/dekaf/tests/e2e/list_offsets.rs b/crates/dekaf/tests/e2e/list_offsets.rs new file mode 100644 index 00000000000..c8caab309b4 --- /dev/null +++ b/crates/dekaf/tests/e2e/list_offsets.rs @@ -0,0 +1,153 @@ +use super::DekafTestEnv; +use super::raw_kafka::{TestKafkaClient, list_offsets_partition_error}; +use kafka_protocol::ResponseError; +use rdkafka::consumer::Consumer; +use serde_json::json; +use std::time::Duration; + +const BASIC_FIXTURE: &str = include_str!("fixtures/basic.flow.yaml"); + +/// Verify ListOffsets returns valid earliest (-2) and latest (-1) offsets. +#[tokio::test] +async fn test_list_offsets_earliest_and_latest() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("list_offsets_basic", BASIC_FIXTURE).await?; + + env.inject_documents( + "data", + vec![ + json!({"id": "1", "value": "first"}), + json!({"id": "2", "value": "second"}), + json!({"id": "3", "value": "third"}), + ], + ) + .await?; + + let consumer = env.kafka_consumer()?; + + // fetch_watermarks internally uses ListOffsets with timestamp=-2 (earliest) + // and timestamp=-1 (latest). + let (low, high) = + consumer + .inner() + .fetch_watermarks("test_topic", 0, Duration::from_secs(10))?; + + assert!(low >= 0, "earliest offset should be >= 0, got {low}"); + assert!( + high > 0, + "latest offset should be > 0 after injecting docs, got {high}" + ); + assert!(high >= low, "latest ({high}) should be >= earliest ({low})"); + + // Inject more documents and verify latest offset advances + let high_before = high; + + env.inject_documents("data", vec![json!({"id": "4", "value": "fourth"})]) + .await?; + + tokio::time::sleep(Duration::from_millis(500)).await; + + let (_, high_after) = + consumer + .inner() + .fetch_watermarks("test_topic", 0, Duration::from_secs(10))?; + + assert!( + high_after > high_before, + "latest offset should advance after injecting more docs: before={high_before}, after={high_after}" + ); + + Ok(()) +} + +/// When a client requests offsets for a partition that doesn't exist, +/// Dekaf should return UnknownTopicOrPartition error code. 
+#[tokio::test] +async fn test_list_offsets_unknown_partition() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("list_offsets_unknown", BASIC_FIXTURE).await?; + + // Inject a document so the collection is in Ready state (journals exist) + env.inject_documents("data", vec![json!({"id": "1", "value": "test"})]) + .await?; + + let info = env.connection_info(); + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + let resp = client + .list_offsets_with_epoch("test_topic", 99, -1, 1) + .await?; + + let error_code = list_offsets_partition_error(&resp, "test_topic", 99) + .expect("partition should exist in ListOffsets response"); + + assert_eq!( + error_code, + ResponseError::UnknownTopicOrPartition.code(), + "expected UnknownTopicOrPartition error, got error code {error_code}" + ); + + Ok(()) +} + +/// Verify that multiple ListOffsets queries return consistent results. +/// +/// When no data changes between queries, offsets should remain stable. +#[tokio::test] +async fn test_list_offsets_multiple_queries() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("list_offsets_multi", BASIC_FIXTURE).await?; + + env.inject_documents( + "data", + vec![ + json!({"id": "1", "value": "a"}), + json!({"id": "2", "value": "b"}), + json!({"id": "3", "value": "c"}), + ], + ) + .await?; + + let consumer = env.kafka_consumer()?; + + let (baseline_low, baseline_high) = + consumer + .inner() + .fetch_watermarks("test_topic", 0, Duration::from_secs(10))?; + + assert!( + baseline_low >= 0, + "baseline: earliest should be >= 0, got {baseline_low}" + ); + assert!( + baseline_high > 0, + "baseline: latest should be > 0 after injecting docs, got {baseline_high}" + ); + assert!( + baseline_high >= baseline_low, + "baseline: latest ({baseline_high}) should be >= earliest ({baseline_low})" + ); + + // Make multiple watermark queries and verify offsets remain stable + for i in 1..5 { + let (low, high) = + consumer + .inner() + .fetch_watermarks("test_topic", 0, Duration::from_secs(10))?; + + assert_eq!( + low, baseline_low, + "iteration {i}: earliest offset changed (expected {baseline_low}, got {low})" + ); + assert_eq!( + high, baseline_high, + "iteration {i}: latest offset changed (expected {baseline_high}, got {high})" + ); + } + + Ok(()) +} diff --git a/crates/dekaf/tests/e2e/main.rs b/crates/dekaf/tests/e2e/main.rs new file mode 100644 index 00000000000..b809fc23ab4 --- /dev/null +++ b/crates/dekaf/tests/e2e/main.rs @@ -0,0 +1,11 @@ +mod harness; +pub mod kafka; +pub mod raw_kafka; + +mod basic; +mod collection_reset; +mod empty_fetch; +mod list_offsets; +mod not_ready; + +pub use harness::{ConnectionInfo, DekafTestEnv, init_tracing}; diff --git a/crates/dekaf/tests/e2e/not_ready.rs b/crates/dekaf/tests/e2e/not_ready.rs new file mode 100644 index 00000000000..2d551ba64a0 --- /dev/null +++ b/crates/dekaf/tests/e2e/not_ready.rs @@ -0,0 +1,163 @@ +use super::{ + DekafTestEnv, + raw_kafka::{TestKafkaClient, list_offsets_partition_error, metadata_leader_epoch}, +}; +use kafka_protocol::ResponseError; +use serde_json::json; +use std::time::Duration; + +const FIXTURE: &str = include_str!("fixtures/basic.flow.yaml"); +const SPEC_REFRESH_TIMEOUT: Duration = Duration::from_secs(30); + +/// Test that all partition-aware operations return LeaderNotAvailable when no journals exist. 
+/// +/// After a collection reset, there's a window where the new partition template exists +/// but the journal hasn't been created yet. Dekaf should return LeaderNotAvailable +/// for all operations that require partition data, signaling clients to retry. +#[tokio::test] +async fn test_all_operations_return_leader_not_available_when_no_journals() -> anyhow::Result<()> { + super::init_tracing(); + + let env = DekafTestEnv::setup("not_ready", FIXTURE).await?; + let info = env.connection_info(); + + env.inject_documents("data", vec![json!({"id": "1", "value": "initial"})]) + .await?; + + let token = env.dekaf_token()?; + let mut client = TestKafkaClient::connect(&info.broker, &info.username, &token).await?; + + // Verify all operations work initially + let metadata = client.metadata(&["test_topic"]).await?; + let initial_epoch = + metadata_leader_epoch(&metadata, "test_topic", 0).expect("should have epoch before reset"); + + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, initial_epoch) + .await?; + assert!( + list_offsets_partition_error(&list_resp, "test_topic", 0) == Some(0), + "ListOffsets should succeed before reset" + ); + + let fetch_resp = client + .fetch_with_epoch("test_topic", 0, 0, initial_epoch) + .await?; + assert!( + super::raw_kafka::fetch_partition_error(&fetch_resp, "test_topic", 0) == Some(0), + "Fetch should succeed before reset" + ); + + let epoch_resp = client + .offset_for_leader_epoch("test_topic", 0, initial_epoch) + .await?; + assert!( + super::raw_kafka::offset_for_epoch_result(&epoch_resp, "test_topic", 0) + .map_or(false, |r| r.error_code == 0), + "OffsetForLeaderEpoch should succeed before reset" + ); + + // Reset collection without injecting documents afterward - leaves journals uncreated + env.disable_capture().await?; + env.reset_collections().await?; + env.enable_capture().await?; + + let capture = env.capture_name().unwrap(); + env.wait_for_primary(capture).await?; + + // Poll until we enter the NotReady state + let deadline = std::time::Instant::now() + SPEC_REFRESH_TIMEOUT; + let mut observed_not_ready = false; + + while std::time::Instant::now() < deadline { + let metadata = client.metadata(&["test_topic"]).await?; + + let topic = metadata + .topics + .iter() + .find(|t| t.name.as_ref().map(|n| n.as_str()) == Some("test_topic")); + + let Some(topic) = topic else { + tokio::time::sleep(Duration::from_millis(500)).await; + continue; + }; + + // Check if we're in NotReady state (metadata returns LeaderNotAvailable) + if topic.error_code == ResponseError::LeaderNotAvailable.code() { + observed_not_ready = true; + + // Verify ListOffsets also returns LeaderNotAvailable + let list_resp = client + .list_offsets_with_epoch("test_topic", 0, -1, -1) + .await?; + let list_error = list_offsets_partition_error(&list_resp, "test_topic", 0); + assert_eq!( + list_error, + Some(ResponseError::LeaderNotAvailable.code()), + "ListOffsets should return LeaderNotAvailable during NotReady state" + ); + + // Verify Fetch also returns LeaderNotAvailable + let fetch_resp = client.fetch_with_epoch("test_topic", 0, 0, -1).await?; + let fetch_error = super::raw_kafka::fetch_partition_error(&fetch_resp, "test_topic", 0); + assert_eq!( + fetch_error, + Some(ResponseError::LeaderNotAvailable.code()), + "Fetch should return LeaderNotAvailable during NotReady state" + ); + + // Verify OffsetForLeaderEpoch also returns LeaderNotAvailable + let epoch_resp = client.offset_for_leader_epoch("test_topic", 0, 1).await?; + let epoch_result = + 
super::raw_kafka::offset_for_epoch_result(&epoch_resp, "test_topic", 0); + assert!( + epoch_result.map_or(false, |r| r.error_code + == ResponseError::LeaderNotAvailable.code()), + "OffsetForLeaderEpoch should return LeaderNotAvailable during NotReady state" + ); + + break; + } + + // Check if journals were created before we could test + if let Some(new_epoch) = metadata_leader_epoch(&metadata, "test_topic", 0) { + if new_epoch > initial_epoch && !topic.partitions.is_empty() { + anyhow::bail!("Journals were created before we could test NotReady state"); + } + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } + + assert!( + observed_not_ready, + "Test never observed NotReady state within timeout - unable to verify LeaderNotAvailable behavior" + ); + + // Verify that after injecting a document (creating journals), operations work again + env.inject_documents("data", vec![json!({"id": "2", "value": "post-reset"})]) + .await?; + + // Poll until metadata returns successfully with partitions + let deadline = std::time::Instant::now() + SPEC_REFRESH_TIMEOUT; + loop { + let metadata = client.metadata(&["test_topic"]).await?; + let topic = metadata + .topics + .iter() + .find(|t| t.name.as_ref().map(|n| n.as_str()) == Some("test_topic")); + + if let Some(topic) = topic { + if topic.error_code == 0 && !topic.partitions.is_empty() { + break; + } + } + + if std::time::Instant::now() > deadline { + anyhow::bail!("Timeout waiting for metadata to succeed after journal creation"); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Ok(()) +} diff --git a/crates/dekaf/tests/e2e/raw_kafka.rs b/crates/dekaf/tests/e2e/raw_kafka.rs new file mode 100644 index 00000000000..07706cb669d --- /dev/null +++ b/crates/dekaf/tests/e2e/raw_kafka.rs @@ -0,0 +1,211 @@ +use dekaf::{KafkaApiClient, KafkaClientAuth}; +use kafka_protocol::{messages, protocol::StrBytes}; + +/// Protocol versions to use for test requests. +mod protocol_versions { + pub const FETCH: i16 = 12; + pub const LIST_OFFSETS: i16 = 7; + pub const OFFSET_FOR_LEADER_EPOCH: i16 = 2; + pub const METADATA: i16 = 9; +} + +fn topic_name(s: &str) -> messages::TopicName { + messages::TopicName(StrBytes::from_string(s.to_string())) +} + +pub struct TestKafkaClient { + inner: KafkaApiClient, +} + +impl TestKafkaClient { + /// Connect to a Kafka broker with SASL PLAIN authentication. + /// + /// For Dekaf, `username` is the materialization name and `password` is the token. + pub async fn connect(broker: &str, username: &str, password: &str) -> anyhow::Result { + let auth = KafkaClientAuth::plain(username, password); + let inner = KafkaApiClient::connect(&[format!("tcp://{broker}")], auth).await?; + Ok(Self { inner }) + } + + /// Fetch with explicit `current_leader_epoch` to test epoch validation. + /// + /// When `leader_epoch` is less than the server's current epoch, Dekaf returns + /// `FENCED_LEADER_EPOCH`. When greater, it returns `UNKNOWN_LEADER_EPOCH`. 
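+    ///
+    /// Callers typically obtain the current epoch from a prior Metadata response
+    /// (see `metadata_leader_epoch` below), as the e2e tests do.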
+ pub async fn fetch_with_epoch( + &mut self, + topic: &str, + partition: i32, + offset: i64, + leader_epoch: i32, + ) -> anyhow::Result { + let req = messages::FetchRequest::default() + .with_max_wait_ms(1000) + .with_min_bytes(1) + .with_max_bytes(1024 * 1024) + .with_topics(vec![ + messages::fetch_request::FetchTopic::default() + .with_topic(topic_name(topic)) + .with_partitions(vec![ + messages::fetch_request::FetchPartition::default() + .with_partition(partition) + .with_fetch_offset(offset) + .with_current_leader_epoch(leader_epoch) + .with_partition_max_bytes(1024 * 1024), + ]), + ]); + + let header = messages::RequestHeader::default() + .with_request_api_key(messages::ApiKey::Fetch as i16) + .with_request_api_version(protocol_versions::FETCH); + + self.inner.send_request(req, Some(header)).await + } + + /// ListOffsets with explicit `current_leader_epoch`. + /// + /// - `timestamp = -2`: earliest offset + /// - `timestamp = -1`: latest offset + pub async fn list_offsets_with_epoch( + &mut self, + topic: &str, + partition: i32, + timestamp: i64, + leader_epoch: i32, + ) -> anyhow::Result { + let req = messages::ListOffsetsRequest::default().with_topics(vec![ + messages::list_offsets_request::ListOffsetsTopic::default() + .with_name(topic_name(topic)) + .with_partitions(vec![ + messages::list_offsets_request::ListOffsetsPartition::default() + .with_partition_index(partition) + .with_timestamp(timestamp) + .with_current_leader_epoch(leader_epoch), + ]), + ]); + + let header = messages::RequestHeader::default() + .with_request_api_key(messages::ApiKey::ListOffsets as i16) + .with_request_api_version(protocol_versions::LIST_OFFSETS); + + self.inner.send_request(req, Some(header)).await + } + + pub async fn offset_for_leader_epoch( + &mut self, + topic: &str, + partition: i32, + leader_epoch: i32, + ) -> anyhow::Result { + let req = messages::OffsetForLeaderEpochRequest::default().with_topics(vec![ + messages::offset_for_leader_epoch_request::OffsetForLeaderTopic::default() + .with_topic(topic_name(topic)) + .with_partitions(vec![ + messages::offset_for_leader_epoch_request::OffsetForLeaderPartition::default() + .with_partition(partition) + .with_leader_epoch(leader_epoch), + ]), + ]); + + let header = messages::RequestHeader::default() + .with_request_api_key(messages::ApiKey::OffsetForLeaderEpoch as i16) + .with_request_api_version(protocol_versions::OFFSET_FOR_LEADER_EPOCH); + + self.inner.send_request(req, Some(header)).await + } + + pub async fn metadata( + &mut self, + topics: &[&str], + ) -> anyhow::Result { + let req = messages::MetadataRequest::default().with_topics(Some( + topics + .iter() + .map(|t| { + messages::metadata_request::MetadataRequestTopic::default() + .with_name(Some(topic_name(t))) + }) + .collect(), + )); + + let header = messages::RequestHeader::default() + .with_request_api_key(messages::ApiKey::Metadata as i16) + .with_request_api_version(protocol_versions::METADATA); + + self.inner.send_request(req, Some(header)).await + } +} + +/// Extract the error code from a FetchResponse for a specific topic/partition. +pub fn fetch_partition_error( + resp: &messages::FetchResponse, + topic: &str, + partition: i32, +) -> Option { + resp.responses + .iter() + .find(|t| t.topic.as_str() == topic) + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == partition)) + .map(|p| p.error_code) +} + +/// Extract the leader epoch from a FetchResponse's current_leader field. 
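+///
+/// Returns `None` when the topic or partition is not present in the response.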
+pub fn fetch_current_leader_epoch( + resp: &messages::FetchResponse, + topic: &str, + partition: i32, +) -> Option { + resp.responses + .iter() + .find(|t| t.topic.as_str() == topic) + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == partition)) + .map(|p| p.current_leader.leader_epoch) +} + +/// Extract the error code from a ListOffsetsResponse for a specific topic/partition. +pub fn list_offsets_partition_error( + resp: &messages::ListOffsetsResponse, + topic: &str, + partition: i32, +) -> Option { + resp.topics + .iter() + .find(|t| t.name.as_str() == topic) + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == partition)) + .map(|p| p.error_code) +} + +/// Extract the leader epoch from a MetadataResponse for a specific topic/partition. +pub fn metadata_leader_epoch( + resp: &messages::MetadataResponse, + topic: &str, + partition: i32, +) -> Option { + resp.topics + .iter() + .find(|t| t.name.as_ref().map(|n| n.as_str()) == Some(topic)) + .and_then(|t| t.partitions.iter().find(|p| p.partition_index == partition)) + .map(|p| p.leader_epoch) +} +#[derive(Debug, Clone)] +pub struct OffsetForEpochResult { + pub error_code: i16, + pub leader_epoch: i32, + pub end_offset: i64, +} + +/// Extract OffsetForLeaderEpoch result for a specific topic/partition. +pub fn offset_for_epoch_result( + resp: &messages::OffsetForLeaderEpochResponse, + topic: &str, + partition: i32, +) -> Option { + resp.topics + .iter() + .find(|t| t.topic.as_str() == topic) + .and_then(|t| t.partitions.iter().find(|p| p.partition == partition)) + .map(|p| OffsetForEpochResult { + error_code: p.error_code, + leader_epoch: p.leader_epoch, + end_offset: p.end_offset, + }) +} diff --git a/crates/dekaf/tests/field_extraction_tests.rs b/crates/dekaf/tests/field_extraction_tests.rs index 14d100deca7..3abd3724af6 100644 --- a/crates/dekaf/tests/field_extraction_tests.rs +++ b/crates/dekaf/tests/field_extraction_tests.rs @@ -320,3 +320,34 @@ async fn test_number_or_string_format_number() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +async fn test_field_selection_flow_document() -> anyhow::Result<()> { + let fixture_path = "tests/fixtures/field_selection_flow_document.yaml".to_string(); + let docs = vec![json!({ + "key": "first", + "field_a": "foo", + "field_b": "bar" + })]; + + for output in roundtrip(fixture_path, serde_to_jsonl(docs)?.as_slice()).await? { + insta::assert_debug_snapshot!(output?); + } + + Ok(()) +} + +#[tokio::test] +async fn test_fields_not_required() -> anyhow::Result<()> { + let fixture_path = "tests/fixtures/fields_not_required.yaml".to_string(); + let docs = vec![json!({ + "key": "first" + // field_a intentionally omitted to test optional field handling + })]; + + for output in roundtrip(fixture_path, serde_to_jsonl(docs)?.as_slice()).await? 
{ + insta::assert_debug_snapshot!(output?); + } + + Ok(()) +} diff --git a/crates/dekaf/tests/fixtures/field_selection_flow_document.yaml b/crates/dekaf/tests/fixtures/field_selection_flow_document.yaml new file mode 100644 index 00000000000..241c55a484e --- /dev/null +++ b/crates/dekaf/tests/fixtures/field_selection_flow_document.yaml @@ -0,0 +1,38 @@ +collections: + test/collection: + key: + - /key + schema: + properties: + key: + type: string + field_a: + type: string + field_b: + type: string + type: object + required: + - key + - field_a + - field_b +materializations: + test/materialization: + endpoint: + dekaf: + variant: foo + config: + deletions: kafka + token: "1234" + strict_topic_names: false + bindings: + - source: test/collection + resource: + topic_name: foo + fields: + include: + key: {} + flow_document: {} + exclude: + - field_a + - field_b + recommended: true diff --git a/crates/dekaf/tests/fixtures/fields_not_required.yaml b/crates/dekaf/tests/fixtures/fields_not_required.yaml new file mode 100644 index 00000000000..2a37338a989 --- /dev/null +++ b/crates/dekaf/tests/fixtures/fields_not_required.yaml @@ -0,0 +1,28 @@ +collections: + test/collection: + key: + - /key + schema: + properties: + key: + type: string + field_a: + type: string + type: object + required: + - key +materializations: + test/materialization: + endpoint: + dekaf: + variant: foo + config: + deletions: kafka + token: "1234" + strict_topic_names: false + bindings: + - source: test/collection + resource: + topic_name: foo + fields: + recommended: true diff --git a/crates/dekaf/tests/snapshots/field_extraction_tests__field_selection_flow_document.snap b/crates/dekaf/tests/snapshots/field_extraction_tests__field_selection_flow_document.snap new file mode 100644 index 00000000000..43762911d87 --- /dev/null +++ b/crates/dekaf/tests/snapshots/field_extraction_tests__field_selection_flow_document.snap @@ -0,0 +1,62 @@ +--- +source: crates/dekaf/tests/field_extraction_tests.rs +assertion_line: 334 +expression: output? +--- +Record( + [ + ( + "key", + String( + "first", + ), + ), + ( + "flow_published_at", + Union( + 0, + Record( + [ + ( + "json", + String( + "null", + ), + ), + ], + ), + ), + ), + ( + "flow_document", + Record( + [ + ( + "field_a", + String( + "foo", + ), + ), + ( + "field_b", + String( + "bar", + ), + ), + ( + "key", + String( + "first", + ), + ), + ( + "_flow_extra", + Map( + {}, + ), + ), + ], + ), + ), + ], +) diff --git a/crates/dekaf/tests/snapshots/field_extraction_tests__fields_not_required.snap b/crates/dekaf/tests/snapshots/field_extraction_tests__fields_not_required.snap new file mode 100644 index 00000000000..b8352cded7f --- /dev/null +++ b/crates/dekaf/tests/snapshots/field_extraction_tests__fields_not_required.snap @@ -0,0 +1,38 @@ +--- +source: crates/dekaf/tests/field_extraction_tests.rs +assertion_line: 349 +expression: output? +--- +Record( + [ + ( + "key", + String( + "first", + ), + ), + ( + "field_a", + Union( + 1, + Null, + ), + ), + ( + "flow_published_at", + Union( + 0, + Record( + [ + ( + "json", + String( + "null", + ), + ), + ], + ), + ), + ), + ], +) diff --git a/crates/flowctl/src/config.rs b/crates/flowctl/src/config.rs index aec0fcbcc8b..68865e02193 100644 --- a/crates/flowctl/src/config.rs +++ b/crates/flowctl/src/config.rs @@ -171,6 +171,13 @@ impl Config { config.user_refresh_token = Some(token); } } + + // Allow overriding access token via environment variable for CI/automation. 
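+        // The ci:dekaf-e2e mise task exports FLOW_ACCESS_TOKEN to authenticate flowctl in CI.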
+ if let Ok(token) = std::env::var(FLOW_ACCESS_TOKEN) { + tracing::info!("using access token from environment variable {FLOW_ACCESS_TOKEN}"); + config.user_access_token = Some(token); + } + config.is_local = profile == "local"; Ok(config) @@ -226,3 +233,5 @@ impl Config { // Environment variable which is inspected for a base64-encoded refresh token. const FLOW_AUTH_TOKEN: &str = "FLOW_AUTH_TOKEN"; +// Environment variable which is inspected for an access token (for CI/automation). +const FLOW_ACCESS_TOKEN: &str = "FLOW_ACCESS_TOKEN"; diff --git a/local/systemd/flow-dekaf-kafka.service b/local/systemd/flow-dekaf-kafka.service new file mode 100644 index 00000000000..e85498c1bd5 --- /dev/null +++ b/local/systemd/flow-dekaf-kafka.service @@ -0,0 +1,35 @@ +[Unit] +Description=Flow Dekaf Kafka (upstream for consumer groups) +Documentation=https://github.com/estuary/flow + +[Service] +Type=simple +TimeoutStartSec=120 + +EnvironmentFile=%h/flow-local/env/common.env + +# Clean up any existing container, then start Kafka in KRaft mode +ExecStartPre=-/usr/bin/docker rm -f dekaf-test-kafka +ExecStart=/usr/bin/docker run --rm \ + --name dekaf-test-kafka \ + --network host \ + --tmpfs /var/lib/kafka/data:uid=1000,gid=1000 \ + -e KAFKA_NODE_ID=1 \ + -e KAFKA_PROCESS_ROLES=broker,controller \ + -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093 \ + -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:29092 \ + -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ + -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ + -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:29093 \ + -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ + -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ + -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ + -e KAFKA_LOG_DIRS=/var/lib/kafka/data \ + -e KAFKA_AUTO_CREATE_TOPICS_ENABLE=true \ + -e CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk \ + confluentinc/cp-kafka:7.5.0 + +ExecStop=/usr/bin/docker stop dekaf-test-kafka + +Restart=on-failure +RestartSec=5s diff --git a/local/systemd/flow-dekaf.service b/local/systemd/flow-dekaf.service new file mode 100644 index 00000000000..b8d9227f386 --- /dev/null +++ b/local/systemd/flow-dekaf.service @@ -0,0 +1,30 @@ +[Unit] +Description=Flow Dekaf +Documentation=https://github.com/estuary/flow +After=flow-control-plane.target flow-dekaf-kafka.service +Requires=flow-control-plane.target flow-dekaf-kafka.service + +[Service] +Type=simple +TimeoutStartSec=600 + +EnvironmentFile=%h/flow-local/env/common.env +EnvironmentFile=%h/flow-local/env/dekaf.env + +ExecStartPre=sh -c 'cd ${FLOW_ROOT} && cargo build -p dekaf' +ExecStart=%h/cargo-target/debug/dekaf \ + --local \ + --advertise-host 127.0.0.1 \ + --kafka-port ${DEKAF_KAFKA_PORT} \ + --schema-registry-port ${DEKAF_REGISTRY_PORT} \ + --metrics-port ${DEKAF_METRICS_PORT} \ + --default-broker-urls tcp://localhost:${DEKAF_UPSTREAM_PORT} \ + --upstream-auth none \ + --encryption-secret ${DEKAF_ENCRYPTION_SECRET} \ + --data-plane-access-key ${DATA_PLANE_ACCESS_KEY} \ + --task-refresh-interval 10s \ + --spec-ttl 10s \ + --idle-session-timeout 60s + +Restart=on-failure +RestartSec=5s diff --git a/mise/README.md b/mise/README.md index 0e376d745b3..cb22506dca5 100644 --- a/mise/README.md +++ b/mise/README.md @@ -236,13 +236,16 @@ mise tasks | `build:flowctl-go` | Build flowctl-go binary | ### Local Stack Tasks -| Task | Description | -|------|-------------| -| `local:stack` | Start full control plane + data plane | -| `local:control-plane` | Start control plane only | -| 
`local:data-plane ` | Start a data plane | -| `local:supabase` | Start Supabase only | -| `local:stop` | Stop all services and clean up | + +| Task | Description | +| -------------------------------- | ------------------------------------- | +| `local:stack` | Start full control plane + data plane | +| `local:control-plane` | Start control plane only | +| `local:data-plane ` | Start a data plane | +| `local:supabase` | Start Supabase only | +| `local:stop` | Stop all services and clean up | +| `local:dekaf` | Start Dekaf against local stack | +| `local:dekaf-kafka` | Start local Kafka for Dekaf testing | ### CI Tasks | Task | Description | @@ -252,6 +255,7 @@ mise tasks | `ci:sql-tap` | Run pgTAP SQL tests | | `ci:nextest-run` | Run Rust tests via nextest | | `ci:gotest` | Run Go tests | +| `ci:dekaf-e2e` | Run Dekaf E2E tests | ### VM Tasks | Task | Description | diff --git a/mise/tasks/ci/dekaf-e2e b/mise/tasks/ci/dekaf-e2e new file mode 100755 index 00000000000..e1b1a0a27e7 --- /dev/null +++ b/mise/tasks/ci/dekaf-e2e @@ -0,0 +1,85 @@ +#!/usr/bin/env bash +set -euo pipefail + +#MISE description="Run Dekaf E2E tests" +#MISE depends=["local:test-tenant", "local:dekaf"] +#USAGE flag "--filter " help="Test filter pattern for nextest" +#USAGE flag "--update-snapshots" help="Update insta snapshots" +#USAGE flag "--concurrency " help="How many tests to run at once" + + +FILTER="${usage_filter:-}" +UPDATE_SNAPSHOTS="${usage_update_snapshots:-false}" +CONCURRENCY="${usage_concurrency:-"num-cpus"}" + +FLOW_LOCAL="${HOME}/flow-local" +CARGO_TARGET_DIR="${CARGO_TARGET_DIR:-${HOME}/cargo-target}" +KAFKA_PORT=9092 +REGISTRY_PORT=9093 + + +# Wait for agent to be ready by checking if it's listening +echo "Waiting for agent..." +TIMEOUT=120 +ELAPSED=0 +until nc -z localhost 8675 2>/dev/null; do + sleep 1 + ELAPSED=$((ELAPSED + 1)) + if [ "${ELAPSED}" -ge "${TIMEOUT}" ]; then + echo "Error: Agent failed to start within ${TIMEOUT} seconds" + exit 1 + fi +done +echo "Agent is ready" + +# Build Dekaf and flowctl +echo "Building Dekaf and flowctl..." +cargo build -p dekaf -p flowctl --quiet + +# Wait for Dekaf to be ready (started by local:dekaf dependency) +echo "Waiting for Dekaf..." +TIMEOUT=30 +ELAPSED=0 +until nc -z localhost ${KAFKA_PORT} 2>/dev/null; do + sleep 1 + ELAPSED=$((ELAPSED + 1)) + if [ "${ELAPSED}" -ge "${TIMEOUT}" ]; then + echo "Error: Dekaf failed to start within ${TIMEOUT} seconds" + exit 1 + fi +done + +echo "Running Dekaf E2E tests..." + +export DEKAF_BROKER="localhost:${KAFKA_PORT}" +export DEKAF_REGISTRY="http://localhost:${REGISTRY_PORT}" +export SSL_CERT_FILE="${FLOW_LOCAL}/ca.crt" + +# A token for the local-stack system user signed against the local-stack +# supabase secret (super-secret-jwt-token-with-at-least-32-characters-long). +export FLOW_ACCESS_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJodHRwOi8vMTI3LjAuMC4xOjU0MzEvYXV0aC92MSIsInN1YiI6ImZmZmZmZmZmLWZmZmYtZmZmZi1mZmZmLWZmZmZmZmZmZmZmZiIsImF1ZCI6ImF1dGhlbnRpY2F0ZWQiLCJleHAiOjI3MDAwMDAwMDAsImlhdCI6MTcwMDAwMDAwMCwiZW1haWwiOiJzdXBwb3J0QGVzdHVhcnkuZGV2Iiwicm9sZSI6ImF1dGhlbnRpY2F0ZWQiLCJpc19hbm9ueW1vdXMiOmZhbHNlfQ.Nb-N4s_YnObBHGivSTe_8FEniVUUpehzrRkF5JgNWWU" + +TEST_EXIT_CODE=0 +if [ -n "${FILTER}" ]; then + PATH="${CARGO_TARGET_DIR}/debug:${PATH}" \ + cargo nextest run --profile dekaf-e2e -p dekaf --no-capture --failure-output=immediate --success-output=immediate -E "test(${FILTER})" || TEST_EXIT_CODE=$? 
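+    # Note: filtered runs stream all test output immediately (--no-capture) to ease debugging.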
+else + PATH="${CARGO_TARGET_DIR}/debug:${PATH}" \ + cargo nextest run --profile dekaf-e2e -p dekaf -j "${CONCURRENCY}" --no-fail-fast --retries=1 || TEST_EXIT_CODE=$? +fi + +# On failure, output service logs for debugging +if [ "${TEST_EXIT_CODE}" -ne 0 ]; then + echo "" + echo "========================================" + echo "Tests failed! Dumping service logs..." + echo "========================================" + echo "" + echo "--- Dekaf logs ---" + journalctl --user -u flow-dekaf --no-pager -n 200 || true + echo "" + echo "--- Agent logs ---" + journalctl --user -u flow-control-agent --no-pager -n 200 || true + echo "" + exit "${TEST_EXIT_CODE}" +fi diff --git a/mise/tasks/local/dekaf b/mise/tasks/local/dekaf new file mode 100755 index 00000000000..67f7685d92c --- /dev/null +++ b/mise/tasks/local/dekaf @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +set -euo pipefail + +#MISE description="Start Dekaf against local stack" +#MISE depends=["local:dekaf-kafka", "local:tls-cert"] + +FLOW_LOCAL="${HOME}/flow-local" + +# Generate Dekaf environment file +# The data plane access key must match what's in the data_planes table +# For local-cluster: echo -n "key-local-cluster" | base64 +DATA_PLANE_ACCESS_KEY=$(echo -n "key-local-cluster" | base64 | tr -d '\n') + +cat > "${FLOW_LOCAL}/env/dekaf.env" < /dev/null 2>&1; do + sleep 1 + ELAPSED=$((ELAPSED + 1)) + if [ "${ELAPSED}" -ge "${TIMEOUT}" ]; then + echo "Error: Kafka failed to start within ${TIMEOUT} seconds" + journalctl --user -u flow-dekaf-kafka --no-pager -n 50 + exit 1 + fi +done diff --git a/mise/tasks/local/test-tenant b/mise/tasks/local/test-tenant new file mode 100755 index 00000000000..2013e47e480 --- /dev/null +++ b/mise/tasks/local/test-tenant @@ -0,0 +1,31 @@ +#!/usr/bin/env bash +set -euo pipefail + +#MISE description="Provision test/ tenant for integration tests" + +PGURL="postgresql://postgres:postgres@localhost:5432/postgres" +DATA_PLANE_NAME="ops/dp/public/local-cluster" + +echo "Provisioning test/ tenant..." + +psql "${PGURL}" -c " +INSERT INTO public.storage_mappings (catalog_prefix, spec) +VALUES + ('test/', '{\"stores\": [{\"provider\": \"GCS\", \"bucket\": \"estuary-trial\", \"prefix\": \"collection-data/\"}], \"data_planes\": [\"${DATA_PLANE_NAME}\"]}'), + ('recovery/test/', '{\"stores\": [{\"provider\": \"GCS\", \"bucket\": \"estuary-trial\"}]}') +ON CONFLICT (catalog_prefix) DO NOTHING; +" + +# Grant admin access to support@estuary.dev for test/ prefix +psql "${PGURL}" -c " +INSERT INTO public.user_grants (user_id, object_role, capability) +VALUES ('ffffffff-ffff-ffff-ffff-ffffffffffff', 'test/', 'admin') +ON CONFLICT (user_id, object_role) DO NOTHING; +" + +# Grant role permissions so tasks within test/ can read/write to each other +psql "${PGURL}" -c " +INSERT INTO public.role_grants (subject_role, object_role, capability) +VALUES ('test/', 'test/', 'admin') +ON CONFLICT (subject_role, object_role) DO NOTHING; +" diff --git a/supabase/migrations/20260121180000_grant_dekaf_to_authenticated.sql b/supabase/migrations/20260121180000_grant_dekaf_to_authenticated.sql new file mode 100644 index 00000000000..31300906ef9 --- /dev/null +++ b/supabase/migrations/20260121180000_grant_dekaf_to_authenticated.sql @@ -0,0 +1,7 @@ +begin; + +-- These got dropped from the most recent migration rollup +grant dekaf to authenticator; +alter role dekaf nologin bypassrls; + +commit;
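
Usage sketch (not part of the patch): the pieces above could be exercised locally roughly as follows. The task and profile names are taken from the files in this change; the surrounding environment, namely a running local stack and the ports/paths used above, is assumed.

```bash
# Sketch of a local run; assumes the local stack (see mise/README.md tasks above)
# is already up, with the control plane agent and data plane running.

# Full Dekaf E2E suite. Task dependencies provision the test/ tenant and start
# Dekaf plus its upstream Kafka before the tests run.
mise run ci:dekaf-e2e

# Single test via the --filter flag declared in the task's USAGE spec.
# (Depending on the mise version, task flags may need to follow a `--` separator.)
mise run ci:dekaf-e2e --filter test_list_offsets_earliest_and_latest

# Invoking nextest directly also works, provided the environment the task normally
# exports (DEKAF_BROKER, DEKAF_REGISTRY, SSL_CERT_FILE, FLOW_ACCESS_TOKEN) is set.
cargo nextest run --profile dekaf-e2e -p dekaf
```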