diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 866dfd487108..56bb057a4af7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +80,9 @@
  */
 class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
 
+  // Track last successfully committed offsets to suppress no-op commits for idle partitions.
+  private final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<>();
+
   ///////////////////// Reader API ////////////////////////////////////////////////////////////
   @SuppressWarnings("FutureReturnValueIgnored")
   @Override
@@ -611,37 +615,61 @@ private void consumerPollLoop() {
   private void commitCheckpointMark() {
     KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
 
-    if (checkpointMark != null) {
-      LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
-      Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
-      Instant now = Instant.now();
+    if (checkpointMark == null) {
+      return;
+    }
 
-      try {
-        consumer.commitSync(
-            checkpointMark.getPartitions().stream()
-                .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-                .collect(
-                    Collectors.toMap(
-                        p -> new TopicPartition(p.getTopic(), p.getPartition()),
-                        p -> new OffsetAndMetadata(p.getNextOffset()))));
-        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-      } catch (Exception e) {
-        // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
-        // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
-        if (now.isAfter(nextAllowedCommitFailLogTime)) {
-          LOG.warn(
-              String.format(
-                  "%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
-                  this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
-              e);
-          nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
-        } else {
-          LOG.info(
-              String.format(
-                  "%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
-                  this, checkpointMark),
-              e);
+    LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
+    Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
+    Instant now = Instant.now();
+
+    try {
+      // Commit only partitions whose offsets have advanced since the last successful commit
+      // for this reader. This suppresses no-op commits for idle partitions.
+      Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
+      for (KafkaCheckpointMark.PartitionMark p : checkpointMark.getPartitions()) {
+        long next = p.getNextOffset();
+        if (next == UNINITIALIZED_OFFSET) {
+          continue;
         }
+
+        TopicPartition tp = new TopicPartition(p.getTopic(), p.getPartition());
+        Long prev = lastCommittedOffsets.get(tp);
+
+        if (prev == null || next > prev) {
+          toCommit.put(tp, new OffsetAndMetadata(next));
+        }
+      }
+
+      if (!toCommit.isEmpty()) {
+        consumer.commitSync(toCommit);
+
+        // Only update after a successful commit.
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : toCommit.entrySet()) {
+          lastCommittedOffsets.put(entry.getKey(), entry.getValue().offset());
+        }
+      }
+
+      // Nothing is pending here: the commit succeeded, or no partition advanced past its last
+      // committed offset. Refresh the failure-log window in both cases so that a failure right
+      // after an idle stretch is not reported as having persisted for the whole interval.
+      nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
+    } catch (Exception e) {
+      // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
+      // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
+      if (now.isAfter(nextAllowedCommitFailLogTime)) {
+        LOG.warn(
+            String.format(
+                "%s: Did not successfully commit finalized checkpoint for > %s. Current checkpoint: %s",
+                this, MIN_COMMIT_FAIL_LOG_INTERVAL, checkpointMark),
+            e);
+        nextAllowedCommitFailLogTime = now.plus(MIN_COMMIT_FAIL_LOG_INTERVAL);
+      } else {
+        LOG.info(
+            String.format(
+                "%s: Could not commit finalized checkpoint. Commit will be retried with subsequent reads. Current checkpoint: %s",
+                this, checkpointMark),
+            e);
       }
     }
   }