Skip to content

Conversation

@jshearer
Copy link
Contributor

@jshearer jshearer commented Dec 18, 2025

Summary

  • E2E test framework:
    • publishes test specs into namespaced prefixes
    • uses source-http-ingest to inject documents
    • uses existing Dekaf Kafka API client to interact with low-level Kafka APIs
    • uses rdkafka Rust crate to interact with Dekaf via high-level librdkafka consumer
  • During testing I found some poorly specified behavior around collections with missing journals, so I added CollectionStatus in order to return a retryable error to consumers and reduce connection churn.
  • I added --spec-ttl to allow tests to shorten the TaskManager spec refresh cycle and speed up tests that depend on changes to specs.
  • A few changes to KafkaApiClient:
    • Plaintext connections are supported via tcp:// URL scheme
    • SASL PLAIN authentication support
    • --upstream-auth=none flag to skip authenticating to upstream Kafka broker if it doesn't require auth (such as when running tests)
  • Added Dekaf to CI:
    • dekaf-test job in platform-test.yaml
    • Exclude Dekaf tests in default nextest profile as they require things like a local Kafka broker to be running as a prerequisite. Add dekaf-e2e profile to run e2e tests explicitly
    • Mise tasks (local:dekaf, local:dekaf-kafka, local:test-tenant) for local Dekaf + Kafka services, provision test/ tenant etc

Test scenarios

Test file Scenarios
collection_reset.rs Collection-reset behaviors such as dealing with leader epochs in Metadata, ListOffsets, Fetch, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH, OffsetForLeaderEpoch, etc
not_ready.rs LeaderNotAvailable when journals don't exist
list_offsets.rs Earliest/latest offset queries, unknown partition handling
empty_fetch.rs Empty fetch responses don't break subsequent fetches
basic.rs Document roundtrip

I intentionally left out more complex tests from here such as higher volume load testing, testing with other high-level consumers such as sarama, franz-go, etc. This PR is already getting fairly large, and I wanted to get it reviewed and out the door rather than add more heft to it.


Additional PRs adding more E2E tests based ontop of this branch:

Note: Some of those PRs are targeting master instead of dekaf/collection_reset_with_e2e_tests in order to get CI to run the tests. Once this PR is merged I'll mark them ready for review

@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 4 times, most recently from 6490a0d to 8c53dd4 Compare December 19, 2025 16:06
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 9 times, most recently from b44dbee to 0aabc2e Compare December 29, 2025 20:30
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch from 3663f1c to d0c51a8 Compare December 29, 2025 22:00
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 6 times, most recently from c291e83 to 134f474 Compare January 5, 2026 23:17
@jshearer jshearer changed the title WIP: Dekaf collection reset with e2e tests dekaf: e2e testing Jan 6, 2026
@jshearer jshearer self-assigned this Jan 6, 2026
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 2 times, most recently from 6ae3375 to a06a0a3 Compare January 6, 2026 18:20
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch from a06a0a3 to d86adf3 Compare January 6, 2026 18:53
@jshearer jshearer marked this pull request as ready for review January 6, 2026 18:57
@jshearer jshearer requested a review from a team January 6, 2026 18:58
@jshearer jshearer requested a review from jgraettinger January 6, 2026 21:08
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 5 times, most recently from e1c6baf to e819733 Compare January 8, 2026 16:11
@jshearer jshearer requested a review from psFried January 8, 2026 17:35
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch from e819733 to 7823aab Compare January 8, 2026 20:14
@jshearer jshearer changed the title dekaf: e2e testing dekaf: Implement e2e testing framework Jan 8, 2026
Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great! Nice job and great to see this level of test coverage. A few comments below for discussion but nothing big.

anyhow::bail!("Collection '{}' not found or not accessible", name)
}
CollectionStatus::NotReady => {
// Collection exists but journals aren't available - return LeaderNotAvailable so clients will retry
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume you thought about this carefully, but: the "obvious" approach would represent such a collection as a topic with zero partitions. That breaks at a protocol level?

Or, it looks like we previously returned a single partition. I'm guessing this was a placeholder which was semantically empty, and then popped into actual existence with the first created journal? It looks like you junked this tactic to make the E2E tests work better (or were there other reasons, too?).

FYI a sentinel partition approach would fit well with the tokens work and the task-manager sketch, which re-frames journal listing as a long-lived watch. Such a sentinel could block awaiting a journal listing update, be notified immediately when it's ready, and then transition to doing actual reads.

Copy link
Contributor Author

@jshearer jshearer Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it from returning a single partition because consumers would then try to ListOffsets/Fetch from that partition and get UnknownTopicOrPartition, which is a permanent error, so they wouldn't retry. In practice they (usually) eventually do retry when the application logic times out and restarts the consumer, but it felt smelly to have that in the tests.

I tried returning no partitions instead, but the consumer also treated that as a "permanent" situation for the lifetime of the session. LeaderNotAvailable is supposed to be a transient condition and it causes the consumer to retry according to its retry policy.

return Ok(OffsetForLeaderTopicResult::default()
.with_topic(topic.topic)
.with_partitions(partitions));
let collection = match Collection::new(auth, &collection_name).await? {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The quantity of new code, here, matching on CollectionStatus is a little alarming / a smell. Do these all truly need to be such distinct cases? Is there no factoring that could be applied, to collapse some of the commonality of these paths?

Nothing here looks incorrect, per se, but the change is violating my mental yardstick re: how much churn such a change should result in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, agreed that the size of the diff is big. Some of it is real behavior improvements that I noticed during testing, for example metadata now returns topic-specific errors instead of failing the whole request, the group management APIs now validate that collections exist and are available whereas before they would let you interact with groups (join/sync/commit offset etc) with topics that didn't map to any extant collection at all, etc.

Still, I took another pass over it and ended up moving the Kafka error code matching logic to a couple of helpers on CollectionStatus. So for example instead of having the error codes spread throughout session, like this:

let error_code = match status {
CollectionStatus::Ready(_) => continue,
CollectionStatus::NotFound => {
tracing::warn!(topic = ?topic_name, "Collection not found");
ResponseError::UnknownTopicOrPartition.code()
}
CollectionStatus::NotReady => {
tracing::warn!(
topic = ?topic_name,
"Collection exists but has no journals available"
);
ResponseError::LeaderNotAvailable.code()
}
};

You can instead just do this:

let Some(error_code) = status.error_code() else {
continue;
};


- uses: mozilla-actions/sccache-action@v0.0.9
- run: echo 'SCCACHE_GHA_ENABLED=true' >> $GITHUB_ENV
- run: mise run build:rocksdb
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Break this out into a separate workflow, dekaf-test.yaml ?

Also please add a mise task ci:dekaf-test paralleling ci:platform-test so that there's a one-shot way to run these in a dev VM.

Copy link
Contributor Author

@jshearer jshearer Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I broke the gh action into a separate file. WRT ci:dekaf-test, do you mean something other than ci:dekaf-e2e?

@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 5 times, most recently from 8630748 to fe70d7b Compare January 21, 2026 22:35
Dekaf previously required TLS and MSK IAM authentication for all upstream
Kafka connections, making local development and testing difficult. This adds
support for plaintext connections via URL scheme detection:

* `tcp://host:port` connects without TLS, `tls://host:port` uses TLS (default)
* `--upstream-auth=none` flag skips SASL authentication entirely
* `KafkaClientAuth::from_msk_region(None)` creates no-auth mode

Example local usage:
  dekaf --default-broker-urls tcp://localhost:29092 --upstream-auth=none ...
…ka errors

It's possible for a collection to exist in the control plane without having
any extant journals. This can happen either when the capture task is failing or
hasn't emitted any documents, and more frequently during a collection reset.
Previously, Dekaf treated this the same as a missing collection, causing
consumers to receive non-retryable errors or inconsistent behavior.

Introduces `CollectionStatus` enum to distinguish three states:

* `Ready`: binding exists and journals are available
* `NotFound`: binding doesn't exist in the materialization spec
* `NotReady`: binding exists but journals aren't available yet

For `NotReady`, we'll use `LeaderNotAvailable` (a retryable error) to cause
consumers to retry with backoff until the journals become available.
They will eventually give up.
This is mainly for e2e tests so we can set a low TTL and
avoid waiting around for too long for changes to propagate.
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch 2 times, most recently from 76e90af to 00c17dc Compare January 21, 2026 23:28
* Run Dekaf e2e tests as separate step because `nexttest-run` messes with local stack state
* Make `local:data-plane` idempotent
* `ci:dekaf-e2e` now assumes `local:stack` etc are up rather than explicitly depending on it
* mise: log systemd output if failure
* mise: also log agent logs on failure
* nexttest: exclude e2e tests by default, and run them with
`--profile dekaf-e2e` instead
@jshearer jshearer force-pushed the dekaf/collection_reset_with_e2e_tests branch from 00c17dc to 40d0bd5 Compare January 21, 2026 23:29
@jshearer jshearer requested a review from jgraettinger January 21, 2026 23:29
Copy link
Member

@psFried psFried left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 🎉 Nice work on the tests. This looks like a huge improvement


let mut state_buf = BufWriter::new(Vec::new());
let mut state = session.step(None, &mut state_buf)?;
// Flush the BufWriter to ensure all data is written to the underlying Vec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'm supposing maybe into_inner() does a flush internally? Either way, this comment seems a bit confusing, as it says "flush" but then does not call flush

)
}

if !state.is_running() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why do we need to repeat this check here? Seems maybe worthy of a comment

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants