
Bugfix/24646 kafka connect source flush#2

Open
harangozop wants to merge 4 commits into master from
bugfix/24646-kafka-connect-source-flush


@harangozop
Owner

Fixes apache#24646

Motivation

When using Kafka Connect source connectors with transform filters that drop records (e.g., Debezium SMT Filter or Kafka Connect Filter + Predicate), the Pulsar Kafka Connect source can hang if a polled batch contains only records that are filtered out.
In this scenario, no records are emitted and the source waits indefinitely for an offset flush that never occurs.

This change ensures that a batch is finalized even when all of its records are filtered out. Once the last record in the batch has been processed and none were emitted (so there is nothing to ack), the source still triggers and completes the offset flush, treating "nothing to flush" as success. This prevents read() from blocking and lets the source continue polling for subsequent data.
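
To illustrate the flush condition described above, here is a minimal sketch. It is not the code in this patch: `BatchFlushTracker` and all of its methods are hypothetical names chosen for the example, and it assumes a batch is tracked by counting emitted and acked records plus a flag that is set once the last polled record has been processed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper for illustration only; not the class used in the actual patch. */
final class BatchFlushTracker {
    private final AtomicInteger emitted = new AtomicInteger();
    private final AtomicInteger acked = new AtomicInteger();
    private final CompletableFuture<Void> flushFuture = new CompletableFuture<>();
    private volatile boolean batchFullyProcessed;

    /** Called for each record that survives the transforms and is emitted. */
    void recordEmitted() {
        emitted.incrementAndGet();
    }

    /** Called when an emitted record is acknowledged. */
    void recordAcked() {
        acked.incrementAndGet();
        maybeFlush();
    }

    /** Called after the last record of the polled batch was processed (emitted or dropped). */
    void markBatchProcessed() {
        batchFullyProcessed = true;
        maybeFlush();
    }

    /** A batch with zero emitted records satisfies acked == emitted, so it flushes immediately. */
    boolean shouldFlush() {
        return batchFullyProcessed && acked.get() == emitted.get();
    }

    private void maybeFlush() {
        if (shouldFlush()) {
            // "Nothing to flush" is treated as success; completing twice is a no-op.
            flushFuture.complete(null);
        }
    }

    /** The future a reader would wait on before polling the next batch. */
    CompletableFuture<Void> flushFuture() {
        return flushFuture;
    }
}
```

Under these assumptions, a batch in which every record is dropped completes its future as soon as `markBatchProcessed()` is called, while a batch with emitted records completes only after the last of them has been acked.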

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:


  • Added an integration test that expects a flush when a filter drops every message in a given batch.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

…thread safety

Added more test cases to cover:
- partially acked -> shouldn't flush
- all acked -> should flush
- no ack -> shouldn't flush
…r into a helper method for better readability
@harangozop harangozop force-pushed the bugfix/24646-kafka-connect-source-flush branch from 9d8facc to d9517c7 Compare September 11, 2025 17:40

Development

Successfully merging this pull request may close these issues.

[Bug] Pulsar Kafka Connect adaptor: read() blocks indefinitely if all records in a batch are filtered
