diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index 5e95df5a9c..d2415f14f0 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -76,6 +76,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -465,13 +466,15 @@ private OffsetsMaps fetchTopicPartitionsMetadata(List topicParti // For normal case, the newest offset will correspond to the offset of the newest message in the stream; // But for the big message, it is not the case. Seeking on the newest offset gives nothing for the newest big message. // For now, we keep it as is for newest offsets the same as historical metadata structure. - if (offset <= 0) { - LOG.warn( - "Empty Kafka topic partition {} with upcoming offset {}. Skipping newest offset and setting oldest offset to 0 to consume from beginning", - topicPartition, offset); - oldestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), "0"); + long beginOffset = oldestOffsetsWithLong.get(topicPartition); + if (offset <= 0L) { + LOG.warn("Empty Kafka topic partition {} with upcoming offset {}. 
Skipping newest offset and setting oldest offset to 0 to consume from beginning", topicPartition, offset); + oldestOffsets.put(KafkaUtil.toSystemStreamPartition(this.systemName, topicPartition), "0"); + } else if (Objects.equals(beginOffset, offset)) { + LOG.warn("Empty Kafka topic partition {} with upcoming offset {}", topicPartition, offset); + newestOffsets.put(KafkaUtil.toSystemStreamPartition(this.systemName, topicPartition), String.valueOf(offset)); } else { - newestOffsets.put(KafkaUtil.toSystemStreamPartition(systemName, topicPartition), String.valueOf(offset - 1)); + newestOffsets.put(KafkaUtil.toSystemStreamPartition(this.systemName, topicPartition), String.valueOf(offset - 1L)); } }); return new OffsetsMaps(oldestOffsets, newestOffsets, upcomingOffsets); diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java index cd1e707b89..7b3919925b 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminWithMock.java @@ -62,6 +62,7 @@ public class TestKafkaSystemAdminWithMock { private PartitionInfo mockPartitionInfo1; private TopicPartition testTopicPartition0; private TopicPartition testTopicPartition1; + private TopicPartition testTopicPartition2; private ConcurrentHashMap consumersReference; @@ -100,6 +101,7 @@ public void setUp() throws Exception { // mock LinkedInKafkaConsumerImpl other behaviors testTopicPartition0 = new TopicPartition(VALID_TOPIC, 0); testTopicPartition1 = new TopicPartition(VALID_TOPIC, 1); + testTopicPartition2 = new TopicPartition(VALID_TOPIC, 2); Map testBeginningOffsets = ImmutableMap.of(testTopicPartition0, KAFKA_BEGINNING_OFFSET_FOR_PARTITION0, testTopicPartition1, KAFKA_BEGINNING_OFFSET_FOR_PARTITION1); @@ -176,9 +178,9 @@ public void 
testGetSystemStreamMetaDataWithNoTopic() { public void testGetSystemStreamMetaDataForTopicWithNoMessage() { // The topic with no messages will have beginningOffset = 0 and endOffset = 0 when(mockKafkaConsumer.beginningOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn( - ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L)); + ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L, testTopicPartition2, 10L)); when(mockKafkaConsumer.endOffsets(ImmutableList.of(testTopicPartition0, testTopicPartition1))).thenReturn( - ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L)); + ImmutableMap.of(testTopicPartition0, 0L, testTopicPartition1, 0L, testTopicPartition2, 10L)); Map metadataMap = kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC)); @@ -190,7 +192,7 @@ public void testGetSystemStreamMetaDataForTopicWithNoMessage() { // verify the offset for each partition Map systemStreamPartitionMetadata = metadataMap.get(VALID_TOPIC).getSystemStreamPartitionMetadata(); - assertEquals("there are 2 partitions", systemStreamPartitionMetadata.size(), 2); + assertEquals("there are 3 partitions", systemStreamPartitionMetadata.size(), 3); SystemStreamMetadata.SystemStreamPartitionMetadata partition0Metadata = systemStreamPartitionMetadata.get(new Partition(0)); @@ -205,6 +207,13 @@ public void testGetSystemStreamMetaDataForTopicWithNoMessage() { assertEquals("upcoming offset for partition 1", partition1Metadata.getUpcomingOffset(), "0"); assertEquals("newest offset is not set due to abnormal upcoming offset", partition1Metadata.getNewestOffset(), null); + + SystemStreamMetadata.SystemStreamPartitionMetadata partition2Metadata = + systemStreamPartitionMetadata.get(new Partition(2)); + assertEquals("oldest offset for partition 2", partition2Metadata.getOldestOffset(), "10"); + assertEquals("upcoming offset for partition 2", partition2Metadata.getUpcomingOffset(), "10"); + assertEquals("newest offset is 
set to upcoming offset when beginning offset equals end offset", partition2Metadata.getNewestOffset(), + "10"); } @Test