Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,17 @@
package org.apache.beam.sdk.io.kafka;

import com.google.auto.value.AutoValue;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Stores and exports metrics for a batch of Kafka Client RPCs. */
public interface KafkaMetrics {

void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime);

void updateBacklogBytes(String topic, int partitionId, long backlog);

/*Flushes the buffered metrics to the current metric container for this thread.*/
Expand All @@ -44,9 +38,6 @@ public interface KafkaMetrics {
class NoOpKafkaMetrics implements KafkaMetrics {
private NoOpKafkaMetrics() {}

@Override
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}

@Override
public void updateBacklogBytes(String topic, int partitionId, long backlog) {}

Expand Down Expand Up @@ -75,43 +66,13 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class);

private static final Map<String, Histogram> LATENCY_HISTOGRAMS =
new ConcurrentHashMap<String, Histogram>();

abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();;

abstract ConcurrentHashMap<MetricName, Long> perTopicPartitionBacklogs();

abstract AtomicBoolean isWritable();

public static KafkaMetricsImpl create() {
return new AutoValue_KafkaMetrics_KafkaMetricsImpl(
new ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>(),
new ConcurrentHashMap<MetricName, Long>(),
new AtomicBoolean(true));
}

/**
* Record the rpc status and latency of a successful Kafka poll RPC call.
*
* <p>TODO(naireenhussain): It's possible that `isWritable().get()` is called before it's set to
* false in another thread, allowing an extraneous measurement to slip in, so
* perTopicRpcLatencies() isn't necessarily thread safe. One way to address this would be to add
* synchronized blocks to ensure that there is only one thread ever reading/modifying the
* perTopicRpcLatencies() map.
*/
@Override
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
if (isWritable().get()) {
ConcurrentLinkedQueue<Duration> latencies = perTopicRpcLatencies().get(topic);
if (latencies == null) {
latencies = new ConcurrentLinkedQueue<Duration>();
latencies.add(elapsedTime);
perTopicRpcLatencies().putIfAbsent(topic, latencies);
} else {
latencies.add(elapsedTime);
}
}
new ConcurrentHashMap<MetricName, Long>(), new AtomicBoolean(true));
}

/**
Expand All @@ -129,27 +90,6 @@ public void updateBacklogBytes(String topicName, int partitionId, long backlog)
}
}

/**
 * Record rpc latency histogram metrics for all recorded topics.
 *
 * <p>For each topic that has buffered poll latencies, fetches (or lazily creates) the
 * per-topic histogram and flushes every buffered {@link Duration} into it as milliseconds.
 */
private void recordRpcLatencyMetrics() {
for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies :
perTopicRpcLatencies().entrySet()) {
// Atomically create-or-fetch the per-topic histogram. The previous
// containsKey/get/put sequence was not atomic on the shared map and could
// construct duplicate histograms when two threads flushed concurrently.
Histogram topicHistogram =
LATENCY_HISTOGRAMS.computeIfAbsent(
topicLatencies.getKey(),
topic ->
KafkaSinkMetrics.createRPCLatencyHistogram(
KafkaSinkMetrics.RpcMethod.POLL, topic));
// computeIfAbsent with a non-null mapping function never returns null, so the
// per-element null check is no longer needed. Update all buffered latencies.
for (Duration d : topicLatencies.getValue()) {
topicHistogram.update(d.toMillis());
}
}
}

/** This is for creating gauges from backlog bytes recorded previously. */
private void recordBacklogBytesInternal() {
for (Map.Entry<MetricName, Long> backlog : perTopicPartitionBacklogs().entrySet()) {
Expand All @@ -170,7 +110,6 @@ public void flushBufferedMetrics() {
return;
}
recordBacklogBytesInternal();
recordRpcLatencyMetrics();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
package org.apache.beam.sdk.io.kafka;

import org.apache.beam.sdk.metrics.DelegatingGauge;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.LabeledMetricNameUtils;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -43,41 +40,12 @@ public class KafkaSinkMetrics {
public static final String METRICS_NAMESPACE = "KafkaSink";

// Base Metric names
private static final String RPC_LATENCY = "RpcLatency";
private static final String ESTIMATED_BACKLOG_SIZE = "EstimatedBacklogSize";

// Kafka Consumer Method names
enum RpcMethod {
POLL,
}

// Metric labels
private static final String TOPIC_LABEL = "topic_name";
private static final String RPC_METHOD = "rpc_method";
private static final String PARTITION_ID = "partition_id";

/**
 * Builds a {@link Histogram} for recording RPC latency, named
 *
 * <p>'RpcLatency*rpc_method:{method};topic_name:{topic};'.
 *
 * @param method the Kafka RPC method this histogram tracks.
 * @param topic the Kafka topic this histogram tracks.
 * @return a histogram using exponential buckets with a sqrt(2) growth factor.
 */
public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic) {
LabeledMetricNameUtils.MetricNameBuilder builder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
builder.addLabel(RPC_METHOD, method.toString());
builder.addLabel(TOPIC_LABEL, topic);
builder.addMetricLabel("PER_WORKER_METRIC", "true");

MetricName name = builder.build(METRICS_NAMESPACE);
HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 17);
return new DelegatingHistogram(name, bucketType, false);
}

/**
* Creates a {@link Gauge} metric to record per partition backlog with the name
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
Expand Down Expand Up @@ -411,7 +410,6 @@ public boolean offsetBasedDeduplicationSupported() {
private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

private KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
private Stopwatch stopwatch = Stopwatch.createUnstarted();

private Set<String> kafkaTopics;

Expand Down Expand Up @@ -580,14 +578,7 @@ private void consumerPollLoop() {
while (!closed.get()) {
try {
if (records.isEmpty()) {
stopwatch.start();
records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
stopwatch.stop();
for (String kafkaTopic : kafkaTopics) {
kafkaResults.updateSuccessfulRpcMetrics(
kafkaTopic,
java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
}
} else if (availableRecordsQueue.offer(
records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
records = ConsumerRecords.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -567,25 +566,19 @@ public ProcessContinuation processElement(
long expectedOffset = tracker.currentRestriction().getFrom();
consumer.resume(Collections.singleton(topicPartition));
consumer.seek(topicPartition, expectedOffset);
final Stopwatch pollTimer = Stopwatch.createUnstarted();

final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
try {
while (Duration.ZERO.compareTo(remainingTimeout) < 0) {
// TODO: Remove this timer and use the existing fetch-latency-avg metric.
// A consumer will often have prefetches waiting to be returned immediately in which case
// this timer may contribute more latency than it measures.
// See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
pollTimer.reset().start();
long startMillis = System.currentTimeMillis();
// Fetch the next records.
final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(remainingTimeout);
final Duration elapsed = pollTimer.elapsed();
final Duration elapsed = Duration.ofMillis(System.currentTimeMillis() - startMillis);
try {
remainingTimeout = remainingTimeout.minus(elapsed);
} catch (ArithmeticException e) {
remainingTimeout = Duration.ZERO;
}
kafkaMetrics.updateSuccessfulRpcMetrics(topicPartition.topic(), elapsed);

// No progress when the polling timeout expired.
// Self-checkpoint and move to process the next element.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
package org.apache.beam.sdk.io.kafka;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.runners.core.metrics.GaugeCell;
Expand Down Expand Up @@ -98,7 +96,6 @@ public void testNoOpKafkaMetrics() throws Exception {
MetricsEnvironment.setCurrentContainer(testContainer);

KafkaMetrics results = KafkaMetrics.NoOpKafkaMetrics.getInstance();
results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
results.updateBacklogBytes("test-topic", 0, 10);
results.flushBufferedMetrics();

Expand All @@ -107,30 +104,17 @@ public void testNoOpKafkaMetrics() throws Exception {
}

@Test
public void testKafkaRPCLatencyMetrics() throws Exception {
public void testKafkaBacklogMetrics() throws Exception {
TestMetricsContainer testContainer = new TestMetricsContainer();
MetricsEnvironment.setCurrentContainer(testContainer);

KafkaSinkMetrics.setSupportKafkaMetrics(true);

KafkaMetrics results = KafkaSinkMetrics.kafkaMetrics();

results.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
results.updateBacklogBytes("test-topic", 0, 10);

results.flushBufferedMetrics();
// RpcLatency*rpc_method:POLL;topic_name:test-topic
MetricName histogramName =
MetricName.named(
"KafkaSink",
"RpcLatency*rpc_method:POLL;topic_name:test-topic;",
ImmutableMap.of("PER_WORKER_METRIC", "true"));
HistogramData.BucketType bucketType = HistogramData.ExponentialBuckets.of(1, 17);

assertThat(testContainer.histograms.size(), equalTo(1));
assertThat(
testContainer.histograms.get(KV.of(histogramName, bucketType)).values,
containsInAnyOrder(Double.valueOf(10.0)));

MetricName gaugeName =
MetricName.named(
Expand All @@ -140,19 +124,4 @@ public void testKafkaRPCLatencyMetrics() throws Exception {
assertThat(testContainer.gauges.size(), equalTo(1));
assertThat(testContainer.gauges.get(gaugeName).getCumulative().value(), equalTo(10L));
}

@Test
public void testKafkaRPCLatencyMetricsAreNotRecorded() throws Exception {
// Install a fresh container so assertions only see metrics from this test.
TestMetricsContainer container = new TestMetricsContainer();
MetricsEnvironment.setCurrentContainer(container);

// With Kafka metrics support disabled, latency updates must be dropped silently.
KafkaSinkMetrics.setSupportKafkaMetrics(false);

KafkaMetrics metrics = KafkaSinkMetrics.kafkaMetrics();
metrics.updateSuccessfulRpcMetrics("test-topic", Duration.ofMillis(10));
metrics.flushBufferedMetrics();

assertThat(container.histograms.size(), equalTo(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Test;
Expand All @@ -34,24 +33,6 @@
// TODO:Naireen - Refactor to remove duplicate code between the Kafka and BigQuery sinks
@RunWith(JUnit4.class)
public class KafkaSinkMetricsTest {
@Test
public void testCreatingHistogram() throws Exception {
Histogram latencyHistogram =
KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, "topic1");

// Expected fully-labelled metric name for a POLL RPC against "topic1".
MetricName expectedName =
MetricName.named(
"KafkaSink",
"RpcLatency*rpc_method:POLL;topic_name:topic1;",
ImmutableMap.of("PER_WORKER_METRIC", "true"));
assertThat(latencyHistogram.getName(), equalTo(expectedName));
assertTrue(
latencyHistogram
.getName()
.getLabels()
.containsKey(MonitoringInfoConstants.Labels.PER_WORKER_METRIC));
}

@Test
public void testCreatingBacklogGauge() throws Exception {
Gauge gauge =
Expand Down
Loading