diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 10bfd4adfe84f..bc4ee0c0e0199 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -313,10 +313,17 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpoint, long } else { lastCheckpointStr = lastCheckpoint.isPresent() ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty(); } + + long kafkaDelayCount = 0L; + if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) { + kafkaDelayCount = delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer); + } + // Always emit the Kafka delay count metric (even when 0) + metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, kafkaDelayCount); + // Determine the offset ranges to read from if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) { fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions); - metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer)); } else { switch (autoResetValue) { case EARLIEST: diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index e630d62139747..87d035a1db355 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.time.Instant; @@ -76,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -605,4 +607,70 @@ void mockDescribeTopicConfigs(MockedStatic staticMock, Map kafkaPar when(mock.describeConfigs(Collections.singleton(resource))).thenReturn(mockResult); when(mockResult.all()).thenReturn(future); } + + @Test + public void testKafkaDelayCountMetricEmittedWithLag() { + testUtils.createTopic(testTopicName, 1); + String[] messages = new String[1000]; + for (int i = 0; i < 1000; i++) { + messages[i] = String.format("{\"id\":\"%d\"}", i); + } + testUtils.sendMessages(testTopicName, messages); + + HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); + + // Read first 250 messages, then simulate lag + String lastCheckpointString = testTopicName + ",0:250"; + kafkaOffsetGen.getNextOffsetRanges( + Option.of(new StreamerCheckpointV2(lastCheckpointString)), 500, mockMetrics); + + // Verify metric was called with lag count of 750 (1000 - 250) + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 750L); + } + + @Test + public void testKafkaDelayCountMetricEmittedWithoutCheckpoint() { + testUtils.createTopic(testTopicName, 1); + String[] messages = new String[1000]; + for (int i = 0; i < 1000; i++) { + messages[i] = String.format("{\"id\":\"%d\"}", i); + } + testUtils.sendMessages(testTopicName, messages); + + HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); + + // First run without checkpoint + kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, mockMetrics); + + // Verify metric was called with 0 (no checkpoint = no lag) + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 0L); + } + + @Test + public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() { + testUtils.createTopic(testTopicName, 2); + String[] messages = new String[1000]; + for (int i = 0; i < 1000; i++) { + messages[i] = String.format("{\"id\":\"%d\"}", i); + } + testUtils.sendMessages(testTopicName, messages); + + HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); + + // Checkpoint with some consumed messages, creating lag (0:250, 1:249 = 499) + // Note: Cannot assert exact delay count because Kafka's message distribution across + // partitions is non-deterministic when messages don't have explicit partition keys + String lastCheckpointString = testTopicName + ",0:250,1:249"; + kafkaOffsetGen.getNextOffsetRanges( + Option.of(new StreamerCheckpointV2(lastCheckpointString)), 300, mockMetrics); + + // Verify metric was called with a reasonable lag count + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount(eq("kafkaDelayCount"), delayCaptor.capture()); + assertTrue(delayCaptor.getValue() > 0, "Delay count should be greater than 0 when there is lag"); + assertTrue(delayCaptor.getValue() <= 1000, "Delay count should not exceed total messages sent"); + } }