Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test to cover this new behavior?

From a quick search, maybe reuse (or write something similar to) sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -79,6 +80,9 @@
*/
class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {

// Track last successfully committed offsets to suppress no-op commits for idle partitions.
private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could also track the last commit time per partition, and force a commit if a partition has been idle for more than some threshold (for example, 10 minutes)?

I think this would also help customers who use time-lag monitoring (tracking the time since the last commit).


///////////////////// Reader API ////////////////////////////////////////////////////////////
@SuppressWarnings("FutureReturnValueIgnored")
@Override
Expand Down Expand Up @@ -611,37 +615,61 @@ private void consumerPollLoop() {

private void commitCheckpointMark() {
KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
if (checkpointMark != null) {
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
Instant now = Instant.now();
if (checkpointMark == null) {
return;
}

try {
consumer.commitSync(
checkpointMark.getPartitions().stream()
.filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
.collect(
Collectors.toMap(
p -> new TopicPartition(p.getTopic(), p.getPartition()),
p -> new OffsetAndMetadata(p.getNextOffset()))));
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
} catch (Exception e) {
// Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
// KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
if (now.isAfter(nextAllowedCommitFailLogTime)) {
LOG.warn(
String.format(
"%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
e);
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
} else {
LOG.info(
String.format(
"%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
this, checkpointMark),
e);
LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
Instant now = Instant.now();

try {
// Commit only partitions whose offsets have advanced since the last successful commit
// for this reader. This suppresses no-op commits for idle partitions.
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (KafkaCheckpointMark.PartitionMark p : checkpointMark.getPartitions()) {
long next = p.getNextOffset();
if (next == UNINITIALIZED_OFFSET) {
continue;
}

TopicPartition tp = new TopicPartition(p.getTopic(), p.getPartition());
Long prev = lastCommittedOffsets.get(tp);

if (prev == null || next > prev) {
toCommit.put(tp, new OffsetAndMetadata(next));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of this change. Can we keep the existing java streams logic, but simply add a new filter step?

}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could we add a debug log for # idle partitions?

if (toCommit.isEmpty()) {
// Nothing advanced since last successful commit; avoid noisy commitSync().
return;
}

consumer.commitSync(toCommit);

// Only update after a successful commit.
for (Map.Entry<TopicPartition, OffsetAndMetadata> e : toCommit.entrySet()) {
lastCommittedOffsets.put(e.getKey(), e.getValue().offset());
}

nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
} catch (Exception e) {
// Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
// KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
if (now.isAfter(nextAllowedCommitFailLogTime)) {
LOG.warn(
String.format(
"%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
e);
nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
} else {
LOG.info(
String.format(
"%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
this, checkpointMark),
e);
}
}
}
Expand Down
Loading