Skip to content

Conversation

@jshearer
Copy link
Contributor

@jshearer jshearer commented Jan 20, 2026

This PR adds test coverage for Dekaf's handling of cross-dataplane migrations.

Previously, the convention was that Dekaf addresses live at dekaf.{plane_fqdn}:9092. In order to test migration locally, I had to manually and temporarily change this logic. In order to support automated testing of migrations, we needed a first-party way to represent the address of a data-plane's Dekaf instance. We already have reactor_address and broker_address in the data_planes table, so I added dekaf_address and dekaf_registry_address.

One wrinkle is that unlike reactors and brokers, data planes do not always run Dekaf. In order to handle this properly, I updated data-plane-controller to actively manage the dekaf_address and dekaf_registry_address columns, such that they're only set if Dekaf is actually deployed in that plane, otherwise they're null. Dekaf will now emit a helpful error message when attempting to serve a redirect to a data-plane that does not have an instance deployed.

Note: As it turns out, our mapping of redirects onto the Kafka protocol isn't perfect. Specifically, since we reuse the same broker ID for the redirect target:

// If the session needs to be redirected, this causes the consumer to
// connect to the correct Dekaf instance by advertising it as the
// only broker in the response. Otherwise advertise ourselves as the broker.
let broker = if let Some((broker_host, broker_port)) = self.get_redirect_address().await? {
MetadataResponseBroker::default()
.with_node_id(messages::BrokerId(1))
.with_host(StrBytes::from_string(broker_host))
.with_port(broker_port)
} else {
MetadataResponseBroker::default()
.with_node_id(messages::BrokerId(1))
.with_host(StrBytes::from_string(self.app.advertise_host.clone()))
.with_port(self.app.advertise_kafka_port as i32)
};

librdkafka gets confused and shuts down its session rather than smoothly connecting to the newly advertised broker. The subsequent connection only sees the new broker and connects to it properly, but I believe this is the source of the periodic error logging we've been seeing. Fixing it is out of scope here, but it involves a mechanism for assigning each data-plane's Dekaf instance(s) a stable numeric identifier that we can use as the broker ID.

Note: I discovered a race condition in Gazette while working on these tests that causes them to have an unfortunately high upper bound on runtime: about 8 minutes.

Briefly, when a journal is migrated, one of the steps is suspending it in the source data-plane. When a journal is suspended, two things happen: its un-persisted fragments are scheduled for immediate upload, and its replication factor is reduced from 3 to 1. The race condition is: if the broker that was the primary for the journal being suspended ends up being picked as the sole remaining broker in the topology, the fragment persistence happens promptly. Otherwise, it falls back to the backup fragment upload pathway which takes up to 3 minutes.

If all of the necessary fragments aren't present when the journal is then published to the destination data-plane, we have to wait out the 5 minute default fragment refresh interval before they get picked up. Hence the 8 minute upper bound on runtime.

The following output is with a temporarily modified faster fragment refresh interval:

 Nextest run ID db351aec-39d7-4872-a37e-e4ef30c2f1fb with nextest profile: dekaf-e2e
    Starting 2 tests across 1 binary (14 tests and 3 binaries skipped)
        SLOW [> 60.000s] dekaf::e2e migration::test_rdkafka_handles_redirect
        SLOW [> 60.000s] dekaf::e2e migration::test_migration_protocol_responses
        SLOW [>120.000s] dekaf::e2e migration::test_migration_protocol_responses
        SLOW [>120.000s] dekaf::e2e migration::test_rdkafka_handles_redirect
        PASS [ 170.702s] dekaf::e2e migration::test_migration_protocol_responses
        SLOW [>180.000s] dekaf::e2e migration::test_rdkafka_handles_redirect
        PASS [ 187.367s] dekaf::e2e migration::test_rdkafka_handles_redirect

@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch 3 times, most recently from aa912fd to 43f4f43 Compare January 21, 2026 15:22
Dekaf previously required TLS and MSK IAM authentication for all upstream
Kafka connections, making local development and testing difficult. This adds
support for plaintext connections via URL scheme detection:

* `tcp://host:port` connects without TLS, `tls://host:port` uses TLS (default)
* `--upstream-auth=none` flag skips SASL authentication entirely
* `KafkaClientAuth::from_msk_region(None)` creates no-auth mode

Example local usage:
  dekaf --default-broker-urls tcp://localhost:29092 --upstream-auth=none ...
…ka errors

It's possible for a collection to exist in the control plane without having
any extant journals. This can happen either when the capture task is failing or
hasn't emitted any documents, and more frequently during a collection reset.
Previously, Dekaf treated this the same as a missing collection, causing
consumers to receive non-retryable errors or inconsistent behavior.

Introduces `CollectionStatus` enum to distinguish three states:

* `Ready`: binding exists and journals are available
* `NotFound`: binding doesn't exist in the materialization spec
* `NotReady`: binding exists but journals aren't available yet

For `NotReady`, we'll use `LeaderNotAvailable` (a retryable error) to cause
consumers to retry with backoff until the journals become available.
They will eventually give up.
This is mainly for e2e tests so we can set a low TTL and
avoid waiting around for too long for changes to propagate.
* Run Dekaf e2e tests as separate step because `nexttest-run` messes with local stack state
* Make `local:data-plane` idempotent
* `ci:dekaf-e2e` now assumes `local:stack` etc are up rather than explicitly depending on it
* mise: log systemd output if failure
* mise: also log agent logs on failure
* nexttest: exclude e2e tests by default, and run them with
`--profile dekaf-e2e` instead
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch 3 times, most recently from 585fc03 to 1fd7e66 Compare January 21, 2026 16:15
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch from 7823aab to 109edbf Compare January 21, 2026 16:15
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch from 1fd7e66 to cc80924 Compare January 21, 2026 16:58
@jshearer jshearer changed the base branch from dekaf/collection_reset_with_e2e_tests to master January 21, 2026 17:23
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch from cc80924 to a1ded99 Compare January 21, 2026 17:24
@jshearer jshearer force-pushed the dekaf/migrations_e2e_test branch from a1ded99 to bd16cf6 Compare January 21, 2026 18:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants