diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index ec1e15cda7ebc..38ad9e2cb6bb1 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -97,11 +97,26 @@ public void start(Map props) { consumer.assign(topicPartitionOffsets.keySet()); log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream() .filter(x -> x.getValue() == 0L).count()); - log.trace("Seeking offsets: {}", topicPartitionOffsets); - topicPartitionOffsets.forEach(consumer::seek); + + maybeSeek(topicPartitionOffsets); + log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(), taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions); } + // when offset to seek is 0, mm2 does not have to seek, as 'seek' action may override + // consumer.auto.offset.reset=latest + private void maybeSeek(Map topicPartitionOffsets) { + for (Map.Entry entry : topicPartitionOffsets.entrySet()) { + TopicPartition tp = entry.getKey(); + Long offset = entry.getValue(); + if (offset != 0) { + consumer.seek(tp, offset); + log.debug("Seeking TopicPartition {} to offset: {}", tp, offset); + } else { + log.debug("Skip seeking TopicPartition {} ", tp); + } + } + } @Override public void commit() { diff --git a/gradle.properties b/gradle.properties index 89bd159a29848..4e3aed37cd58a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,7 +20,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py -version=3.3.2 +version=3.3.2-remerge1 scalaVersion=2.13.8 task=build org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC