From 29e69c299c1d90295add445b30016fd6390a9e10 Mon Sep 17 00:00:00 2001 From: Alex Rhee Date: Wed, 21 Jan 2026 15:55:38 -0800 Subject: [PATCH 1/4] fix kafka delay metrics reporting --- .../sources/helpers/KafkaOffsetGen.java | 10 ++- .../sources/helpers/TestKafkaOffsetGen.java | 65 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 10bfd4adfe84f..3a0d9958f5ed0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -313,10 +313,15 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpoint, long } else { lastCheckpointStr = lastCheckpoint.isPresent() ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty(); } + + long kafkaDelayCount = 0L; + if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) { + kafkaDelayCount = delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer); + } + // Determine the offset ranges to read from if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) { fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions); - metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer)); } else { switch (autoResetValue) { case EARLIEST: @@ -341,6 +346,9 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpoint, long // Obtain the latest offsets. toOffsets = consumer.endOffsets(topicPartitions); + + // Always emit the Kafka delay count metric (even when 0) + metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, kafkaDelayCount); } return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index e630d62139747..1a23559933f50 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.time.Instant; @@ -76,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -605,4 +607,67 @@ void mockDescribeTopicConfigs(MockedStatic staticMock, Map kafkaPar when(mock.describeConfigs(Collections.singleton(resource))).thenReturn(mockResult); when(mockResult.all()).thenReturn(future); } + + @Test + public void testKafkaDelayCountMetricEmittedWithLag() { + testUtils.createTopic(testTopicName, 1); + String[] messages = new String[1000]; + for (int i = 0; i < 1000; i++) { + messages[i] = String.format("{\"id\":\"%d\"}", i); + } + testUtils.sendMessages(testTopicName, messages); + + HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); + + // Read first 250 messages, then simulate lag + String lastCheckpointString = testTopicName + ",0:250"; + kafkaOffsetGen.getNextOffsetRanges( + Option.of(new StreamerCheckpointV2(lastCheckpointString)), 500, mockMetrics); + + // Verify metric was called with lag count of 750 (1000 - 250) + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 750L); + } + + @Test + public void testKafkaDelayCountMetricEmittedWithoutCheckpoint() { + testUtils.createTopic(testTopicName, 1); + String[] messages = new String[1000]; + for (int i = 0; i < 1000; i++) { + messages[i] = String.format("{\"id\":\"%d\"}", i); + } + testUtils.sendMessages(testTopicName, messages); + + HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); + + // First run without checkpoint + kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, mockMetrics); + + // Verify metric was called with 0 (no checkpoint = no lag) + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 0L); + } + + @Test + public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() { + testUtils.createTopic(testTopicName, 2); + String[] messages = new String[1000]; + for (int i = 0; i < 1000; i++) { + messages[i] = String.format("{\"id\":\"%d\"}", i); + } + testUtils.sendMessages(testTopicName, messages); + + HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); + KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); + + // Checkpoint with some consumed messages, creating lag + String lastCheckpointString = testTopicName + ",0:250,1:249"; + kafkaOffsetGen.getNextOffsetRanges( + Option.of(new StreamerCheckpointV2(lastCheckpointString)), 300, mockMetrics); + + // Verify metric was called with a lag count > 0 (exact value depends on partition distribution) + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount(eq("kafkaDelayCount"), delayCaptor.capture()); + assertTrue(delayCaptor.getValue() > 0, "Delay count should be greater than 0 when there is lag"); + } } From 66d7d2b01765d8666a4f2fcb398b6b9e5fb94652 Mon Sep 17 00:00:00 2001 From: Alex Rhee Date: Thu, 29 Jan 2026 17:36:38 -0800 Subject: [PATCH 2/4] update tests --- .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 5 ++--- .../utilities/sources/helpers/TestKafkaOffsetGen.java | 10 ++++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 3a0d9958f5ed0..bc4ee0c0e0199 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -318,6 +318,8 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpoint, long if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) { kafkaDelayCount = delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer); } + // Always emit the Kafka delay count metric (even when 0) + metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, kafkaDelayCount); // Determine the offset ranges to read from if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) { @@ -346,9 +348,6 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpoint, long // Obtain the latest offsets. toOffsets = consumer.endOffsets(topicPartitions); - - // Always emit the Kafka delay count metric (even when 0) - metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, kafkaDelayCount); } return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index 1a23559933f50..34e1f58c994b0 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -52,7 +52,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.time.Instant; @@ -77,7 +76,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -661,13 +659,13 @@ public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() { KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); // Checkpoint with some consumed messages, creating lag + // Checkpoint shows 250 messages consumed from partition 0 and 249 from partition 1 (total 499) + // With 1000 total messages sent, the expected lag is 1000 - 499 = 501 String lastCheckpointString = testTopicName + ",0:250,1:249"; kafkaOffsetGen.getNextOffsetRanges( Option.of(new StreamerCheckpointV2(lastCheckpointString)), 300, mockMetrics); - // Verify metric was called with a lag count > 0 (exact value depends on partition distribution) - ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); - verify(mockMetrics, times(1)).updateStreamerSourceDelayCount(eq("kafkaDelayCount"), delayCaptor.capture()); - assertTrue(delayCaptor.getValue() > 0, "Delay count should be greater than 0 when there is lag"); + // Verify metric was called with exact delay count of 501 (1000 messages - 499 consumed) + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 501L); } } From 891ce7672d0b708edec7a1aa404d663fb803bf96 Mon Sep 17 00:00:00 2001 From: Alex Rhee Date: Mon, 2 Feb 2026 12:55:53 -0800 Subject: [PATCH 3/4] Fix non-deterministic multi-partition delay test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use range-based assertion instead of exact value because Kafka's message distribution across partitions is non-deterministic when messages don't have explicit partition keys. The test now validates: - Delay count is positive (lag exists) - Delay count is reasonable (≤ total messages) --- .../sources/helpers/TestKafkaOffsetGen.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index 34e1f58c994b0..54971754465da 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.time.Instant; @@ -76,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -658,14 +660,17 @@ public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() { HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); - // Checkpoint with some consumed messages, creating lag - // Checkpoint shows 250 messages consumed from partition 0 and 249 from partition 1 (total 499) - // With 1000 total messages sent, the expected lag is 1000 - 499 = 501 + // Checkpoint with some consumed messages, creating lag (0:250, 1:249 = 499 consumed) + // Note: Cannot assert exact delay count because Kafka's message distribution across + // partitions is non-deterministic when messages don't have explicit partition keys String lastCheckpointString = testTopicName + ",0:250,1:249"; kafkaOffsetGen.getNextOffsetRanges( Option.of(new StreamerCheckpointV2(lastCheckpointString)), 300, mockMetrics); - // Verify metric was called with exact delay count of 501 (1000 messages - 499 consumed) - verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 501L); + // Verify metric was called with a reasonable lag count + ArgumentCaptor delayCaptor = ArgumentCaptor.forClass(Long.class); + verify(mockMetrics, times(1)).updateStreamerSourceDelayCount(eq("kafkaDelayCount"), delayCaptor.capture()); + assertTrue(delayCaptor.getValue() > 0, "Delay count should be greater than 0 when there is lag"); + assertTrue(delayCaptor.getValue() <= 1000, "Delay count should not exceed total messages sent"); } } From 02ee843aa43f0403018cda4c729b8fa79ef9351a Mon Sep 17 00:00:00 2001 From: Alex Rhee Date: Wed, 4 Feb 2026 17:20:27 -0800 Subject: [PATCH 4/4] retrigger CI --- .../hudi/utilities/sources/helpers/TestKafkaOffsetGen.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java index 54971754465da..87d035a1db355 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java @@ -660,7 +660,7 @@ public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() { HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class); KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING)); - // Checkpoint with some consumed messages, creating lag (0:250, 1:249 = 499 consumed) + // Checkpoint with some consumed messages, creating lag (0:250, 1:249 = 499) // Note: Cannot assert exact delay count because Kafka's message distribution across // partitions is non-deterministic when messages don't have explicit partition keys String lastCheckpointString = testTopicName + ",0:250,1:249";