diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java index 866dfd487108..fa173c76a946 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; @@ -411,7 +410,6 @@ public boolean offsetBasedDeduplicationSupported() { private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; private KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics(); - private Stopwatch stopwatch = Stopwatch.createUnstarted(); private Set kafkaTopics; @@ -580,13 +578,12 @@ private void consumerPollLoop() { while (!closed.get()) { try { if (records.isEmpty()) { - stopwatch.start(); + long startMillis = System.currentTimeMillis(); records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis()); - stopwatch.stop(); + long elapsedMillis = System.currentTimeMillis() - startMillis; for (String kafkaTopic : kafkaTopics) { kafkaResults.updateSuccessfulRpcMetrics( - kafkaTopic, - java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS))); + kafkaTopic, java.time.Duration.ofMillis(elapsedMillis)); } } else if (availableRecordsQueue.offer( records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index a05abba06e75..8894cbf59520 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -58,7 +58,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; @@ -567,19 +566,14 @@ public ProcessContinuation processElement( long expectedOffset = tracker.currentRestriction().getFrom(); consumer.resume(Collections.singleton(topicPartition)); consumer.seek(topicPartition, expectedOffset); - final Stopwatch pollTimer = Stopwatch.createUnstarted(); final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics(); try { while (Duration.ZERO.compareTo(remainingTimeout) < 0) { - // TODO: Remove this timer and use the existing fetch-latency-avg metric. - // A consumer will often have prefetches waiting to be returned immediately in which case - // this timer may contribute more latency than it measures. - // See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information. - pollTimer.reset().start(); + long startMillis = System.currentTimeMillis(); // Fetch the next records. final ConsumerRecords rawRecords = consumer.poll(remainingTimeout); - final Duration elapsed = pollTimer.elapsed(); + final Duration elapsed = Duration.ofMillis(System.currentTimeMillis() - startMillis); try { remainingTimeout = remainingTimeout.minus(elapsed); } catch (ArithmeticException e) {