From 67997a3d330a2c58f993dfb7bcc8bd62d3c5e28c Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Wed, 25 Feb 2026 13:51:43 +0500 Subject: [PATCH] Remove Guava Stopwatch from Kafka consumer.poll() loop Replace Stopwatch (backed by System.nanoTime()) with lightweight System.currentTimeMillis() for timing the hot consumer.poll() loops: the elapsed time feeds updateSuccessfulRpcMetrics() in KafkaUnboundedReader and the remaining-timeout calculation in ReadFromKafkaDoFn. Note that System.currentTimeMillis() is wall-clock time rather than a monotonic clock, so a system clock adjustment during a poll can skew the measured elapsed time. The Stopwatch was adding more latency than it measured when Kafka had prefetched records ready to return immediately. This change keeps the updateSuccessfulRpcMetrics() calls and all shared metric infrastructure (KafkaMetrics, KafkaSinkMetrics) intact. --- .../apache/beam/sdk/io/kafka/KafkaUnboundedReader.java | 9 +++------ .../apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 10 ++-------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 866dfd487108..fa173c76a946 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; @@ -411,7 +410,6 @@ public boolean offsetBasedDeduplicationSupported() { private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; private KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics(); - 
private Stopwatch stopwatch = Stopwatch.createUnstarted(); private Set kafkaTopics; @@ -580,13 +578,12 @@ private void consumerPollLoop() { while (!closed.get()) { try { if (records.isEmpty()) { - stopwatch.start(); + long startMillis = System.currentTimeMillis(); records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); - stopwatch.stop(); + long elapsedMillis = System.currentTimeMillis() - startMillis; for (String kafkaTopic : kafkaTopics) { kafkaResults.updateSuccessfulRpcMetrics( - kafkaTopic, - java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS))); + kafkaTopic, java.time.Duration.ofMillis(elapsedMillis)); } } else if (availableRecordsQueue.offer( records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index a05abba06e75..8894cbf59520 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -58,7 +58,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; @@ -567,19 +566,14 @@ public ProcessContinuation processElement( long expectedOffset = tracker.currentRestriction().getFrom(); consumer.resume(Collections.singleton(topicPartition)); consumer.seek(topicPartition, 
expectedOffset); - final Stopwatch pollTimer = Stopwatch.createUnstarted(); final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics(); try { while (Duration.ZERO.compareTo(remainingTimeout) < 0) { - // TODO: Remove this timer and use the existing fetch-latency-avg metric. - // A consumer will often have prefetches waiting to be returned immediately in which case - // this timer may contribute more latency than it measures. - // See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information. - pollTimer.reset().start(); + long startMillis = System.currentTimeMillis(); // Fetch the next records. final ConsumerRecords rawRecords = consumer.poll(remainingTimeout); - final Duration elapsed = pollTimer.elapsed(); + final Duration elapsed = Duration.ofMillis(System.currentTimeMillis() - startMillis); try { remainingTimeout = remainingTimeout.minus(elapsed); } catch (ArithmeticException e) {