@@ -313,10 +313,17 @@ public OffsetRange[] getNextOffsetRanges(Option<Checkpoint> lastCheckpoint, long
} else {
lastCheckpointStr = lastCheckpoint.isPresent() ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty();
}

long kafkaDelayCount = 0L;
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
kafkaDelayCount = delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer);
}
// Always emit the Kafka delay count metric (even when 0)
metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, kafkaDelayCount);

A contributor commented on lines +321 to +323:

nit: do we want to handle the else branch below as well, so that the delay count is also calculated for the EARLIEST and GROUP cases?

Another nit: if the checkpoint string does not follow the expected format (i.e., checkTopicCheckpoint(lastCheckpointStr) returns false), the delay calculation can now fail after this change.
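
A minimal sketch of one way to address both nits, assuming the existing helpers (checkTopicCheckpoint, delayOffsetCalculation) keep their current signatures; this is an illustration, not the change proposed in this PR:

// Sketch only: reuse the format check so a malformed checkpoint falls back to 0
// instead of failing the delay calculation.
long kafkaDelayCount = 0L;
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()
    && checkTopicCheckpoint(lastCheckpointStr)) {
  kafkaDelayCount = delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer);
}
// For the EARLIEST/GROUP paths, the delay could instead be derived once the starting
// offsets are resolved, e.g. as the sum of (latest offset - starting offset) per partition.
metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, kafkaDelayCount);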

// Determine the offset ranges to read from
if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty() && checkTopicCheckpoint(lastCheckpointStr)) {
fromOffsets = fetchValidOffsets(consumer, lastCheckpointStr, topicPartitions);
metrics.updateStreamerSourceDelayCount(METRIC_NAME_KAFKA_DELAY_COUNT, delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer));
} else {
switch (autoResetValue) {
case EARLIEST:
@@ -52,6 +52,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

import java.time.Instant;
@@ -76,6 +77,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@@ -605,4 +607,70 @@ void mockDescribeTopicConfigs(MockedStatic<AdminClient> staticMock, Map kafkaPar
when(mock.describeConfigs(Collections.singleton(resource))).thenReturn(mockResult);
when(mockResult.all()).thenReturn(future);
}

@Test
public void testKafkaDelayCountMetricEmittedWithLag() {
testUtils.createTopic(testTopicName, 1);
String[] messages = new String[1000];
for (int i = 0; i < 1000; i++) {
messages[i] = String.format("{\"id\":\"%d\"}", i);
}
testUtils.sendMessages(testTopicName, messages);

HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class);
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));

// Read first 250 messages, then simulate lag
String lastCheckpointString = testTopicName + ",0:250";
kafkaOffsetGen.getNextOffsetRanges(
Option.of(new StreamerCheckpointV2(lastCheckpointString)), 500, mockMetrics);

// Verify metric was called with lag count of 750 (1000 - 250)
verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 750L);
}

@Test
public void testKafkaDelayCountMetricEmittedWithoutCheckpoint() {
testUtils.createTopic(testTopicName, 1);
String[] messages = new String[1000];
for (int i = 0; i < 1000; i++) {
messages[i] = String.format("{\"id\":\"%d\"}", i);
}
testUtils.sendMessages(testTopicName, messages);

HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class);
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));

// First run without checkpoint
kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, mockMetrics);

// Verify metric was called with 0 (no checkpoint = no lag)
verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 0L);
}

@Test
public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() {
testUtils.createTopic(testTopicName, 2);
String[] messages = new String[1000];
for (int i = 0; i < 1000; i++) {
messages[i] = String.format("{\"id\":\"%d\"}", i);
}
testUtils.sendMessages(testTopicName, messages);

HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class);
KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));

// Checkpoint with some consumed messages, creating lag (0:250, 1:249 = 499)
// Note: Cannot assert exact delay count because Kafka's message distribution across
// partitions is non-deterministic when messages don't have explicit partition keys
String lastCheckpointString = testTopicName + ",0:250,1:249";
kafkaOffsetGen.getNextOffsetRanges(
Option.of(new StreamerCheckpointV2(lastCheckpointString)), 300, mockMetrics);

// Verify metric was called with a reasonable lag count
ArgumentCaptor<Long> delayCaptor = ArgumentCaptor.forClass(Long.class);
verify(mockMetrics, times(1)).updateStreamerSourceDelayCount(eq("kafkaDelayCount"), delayCaptor.capture());
assertTrue(delayCaptor.getValue() > 0, "Delay count should be greater than 0 when there is lag");
assertTrue(delayCaptor.getValue() <= 1000, "Delay count should not exceed total messages sent");
}
}