From 1f36ea2fb083f634ba3854146e04d5962e8055f5 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 8 Apr 2023 19:49:47 +0800 Subject: [PATCH 1/3] Add topic metrics for the number of production data requests to add a topic and the average number of messages per request. --- .../pulsar/broker/service/Producer.java | 21 ++++++++++++++----- .../prometheus/AggregatedProducerStats.java | 4 ++++ .../prometheus/NamespaceStatsAggregator.java | 4 ++++ .../broker/stats/prometheus/TopicStats.java | 13 ++++++++++++ .../data/stats/PublisherStatsImpl.java | 8 +++++++ 5 files changed, 45 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 5b62e3261e64f..0e1b1e58829f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -73,6 +73,7 @@ public class Producer { private final String appId; private final BrokerInterceptor brokerInterceptor; private Rate msgIn; + private Rate requestIn; private Rate chunkedMessageRate; // it records msg-drop rate only for non-persistent topic private final Rate msgDrop; @@ -117,6 +118,7 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.closeFuture = new CompletableFuture<>(); this.appId = appId; this.msgIn = new Rate(); + this.requestIn = new Rate(); this.chunkedMessageRate = new Rate(); this.isNonPersistentTopic = topic instanceof NonPersistentTopic; this.msgDrop = this.isNonPersistentTopic ? new Rate() : null; @@ -272,7 +274,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = - MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), + MessagePublishContext.get(this, sequenceId, msgIn, requestIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, position); if (brokerInterceptor != null) { brokerInterceptor @@ -284,7 +286,7 @@ private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, l private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, + highestSequenceId, msgIn, requestIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, position); if (brokerInterceptor != null) { brokerInterceptor @@ -377,6 +379,7 @@ private static final class MessagePublishContext implements PublishContext, Runn private long ledgerId; private long entryId; private Rate rateIn; + private Rate requestIn; private int msgSize; private long batchSize; private boolean chunked; @@ -537,6 +540,7 @@ public void run() { // stats rateIn.recordMultipleEvents(batchSize, msgSize); + requestIn.recordMultipleEvents(1L, batchSize); producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS); producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId); @@ -552,12 +556,13 @@ public void run() { recycle(); } - static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, + static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, Rate requestIn, int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; callback.rateIn = rateIn; + callback.requestIn = requestIn; callback.msgSize = msgSize; callback.batchSize = batchSize; callback.chunked = chunked; @@ -574,12 +579,14 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { + Rate requestIn, int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, + Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; callback.highestSequenceId = highestSequenceId; callback.rateIn = rateIn; + callback.requestIn = requestIn; callback.msgSize = msgSize; callback.batchSize = batchSize; callback.originalProducerName = null; @@ -629,6 +636,7 @@ public void recycle() { originalSequenceId = -1L; originalHighestSequenceId = -1L; rateIn = null; + requestIn = null; msgSize = 0; ledgerId = -1L; entryId = -1L; @@ -730,10 +738,13 @@ public void topicMigrated(Optional clusterUrl) { public void updateRates() { msgIn.calculateRate(); + requestIn.calculateRate(); chunkedMessageRate.calculateRate(); stats.msgRateIn = msgIn.getRate(); stats.msgThroughputIn = msgIn.getValueRate(); stats.averageMsgSize = msgIn.getAverageValue(); + stats.requestRateIn = requestIn.getRate(); + stats.averageMsgPerRequest = requestIn.getAverageValue(); stats.chunkedMessageRate = chunkedMessageRate.getRate(); if (chunkedMessageRate.getCount() > 0 && this.topic instanceof PersistentTopic) { ((PersistentTopic) this.topic).msgChunkPublished = true; @@ -813,7 +824,7 @@ public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, lon return; } MessagePublishContext messagePublishContext = - MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, + MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, requestIn, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); if (brokerInterceptor != null) { brokerInterceptor diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedProducerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedProducerStats.java index e9ac89da87bc6..a0934cd84c787 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedProducerStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedProducerStats.java @@ -28,4 +28,8 @@ public class AggregatedProducerStats { public double averageMsgSize; + public double requestRateIn; + + public double averageMsgPerRequest; + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 32fb06ea3ce8c..df5b1db941470 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -217,6 +217,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.producersCount++; stats.rateIn += producer.getStats().msgRateIn; stats.throughputIn += producer.getStats().msgThroughputIn; + stats.requestRateIn += producer.getStats().requestRateIn; + stats.averageMsgPerRequest += producer.getStats().averageMsgPerRequest; if (includeProducerMetrics) { AggregatedProducerStats producerStats = stats.producerStats.computeIfAbsent( @@ -225,6 +227,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include producerStats.msgRateIn = producer.getStats().msgRateIn; producerStats.msgThroughputIn = producer.getStats().msgThroughputIn; producerStats.averageMsgSize = producer.getStats().averageMsgSize; + producerStats.requestRateIn = producer.getStats().requestRateIn; + producerStats.averageMsgPerRequest = producer.getStats().averageMsgPerRequest; } } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index 3a2563a87587b..44ececaf6b288 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -37,6 +37,8 @@ class TopicStats { double rateOut; double throughputIn; double throughputOut; + double requestRateIn; + double averageMsgPerRequest; long msgInCounter; long bytesInCounter; long msgOutCounter; @@ -81,6 +83,8 @@ public void reset() { rateOut = 0; throughputIn = 0; throughputOut = 0; + requestRateIn = 0; + averageMsgPerRequest = 0; bytesInCounter = 0; msgInCounter = 0; bytesOutCounter = 0; @@ -134,6 +138,11 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st writeMetric(stream, "pulsar_average_msg_size", stats.averageMsgSize, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_request_rate_in", stats.requestRateIn, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_average_msg_per_request", stats.averageMsgPerRequest, + cluster, namespace, topic, splitTopicAndPartitionIndexLabel); + writeMetric(stream, "pulsar_txn_tb_active_total", stats.ongoingTxnCount, cluster, namespace, topic, splitTopicAndPartitionIndexLabel); writeMetric(stream, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount, @@ -258,6 +267,10 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); writeProducerMetric(stream, "pulsar_producer_msg_average_Size", producerStats.averageMsgSize, cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); + writeProducerMetric(stream, "pulsar_producer_request_in", producerStats.requestRateIn, + cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); + writeProducerMetric(stream, "pulsar_producer_avg_msg_per_request", producerStats.averageMsgPerRequest, + cluster, namespace, topic, p, producerStats.producerId, splitTopicAndPartitionIndexLabel); }); stats.subscriptionStats.forEach((sub, subsStats) -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java index 41407a37e7ca0..3e54a735e9065 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java @@ -43,6 +43,12 @@ public class PublisherStatsImpl implements PublisherStats { /** Average message size published by this publisher. */ public double averageMsgSize; + /** Total rate of request published by this publisher (request/s). */ + public double requestRateIn; + + /** Average number of messages per entry by this publisher (msg/request). */ + public double averageMsgPerRequest; + /** The total rate of chunked messages published by this publisher. **/ public double chunkedMessageRate; @@ -95,6 +101,8 @@ public PublisherStatsImpl add(PublisherStatsImpl stats) { this.msgThroughputIn += stats.msgThroughputIn; double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; this.averageMsgSize = newAverageMsgSize; + this.requestRateIn += stats.requestRateIn; + this.averageMsgPerRequest += stats.averageMsgPerRequest; return this; } From 538a73b5e146a76708c4e5d50212bdabdd542dec Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 8 Apr 2023 20:59:02 +0800 Subject: [PATCH 2/3] Add topic metrics for the number of production data requests to add a topic and the average number of messages per request. --- .../apache/pulsar/common/policies/data/PublisherStatsTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java index 8be02c58eb0bd..b139e640ae1f2 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PublisherStatsTest.java @@ -40,6 +40,8 @@ public void testPublisherStats() throws Exception { "msgRateIn", "msgThroughputIn", "averageMsgSize", + "requestRateIn", + "averageMsgPerRequest", "chunkedMessageRate", "producerId", "metadata", From 88c9667ae5e66eaaeffd4913c7a1b33172a780ae Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 13 May 2023 21:28:33 +0800 Subject: [PATCH 3/3] Add topic metrics for the number of production data requests to add a topic and the average number of messages per request. --- .../broker/service/persistent/PersistentTopic.java | 2 ++ .../stats/prometheus/NamespaceStatsAggregator.java | 2 +- .../policies/data/stats/PublisherStatsImpl.java | 4 +++- .../common/policies/data/stats/TopicStatsImpl.java | 12 ++++++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18a662c4b7a38..b215a059ac2a5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2168,6 +2168,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog PublisherStatsImpl publisherStats = producer.getStats(); stats.msgRateIn += publisherStats.msgRateIn; stats.msgThroughputIn += publisherStats.msgThroughputIn; + stats.requestRateIn += publisherStats.requestRateIn; if (producer.isRemote()) { remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); @@ -2176,6 +2177,7 @@ public CompletableFuture asyncGetStats(boolean getPreciseBacklog } }); + stats.averageMsgPerRequest = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgRateIn / stats.requestRateIn); stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn); stats.msgInCounter = getMsgInCounter(); stats.bytesInCounter = getBytesInCounter(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index df5b1db941470..d7679ea394587 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -218,7 +218,6 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.rateIn += producer.getStats().msgRateIn; stats.throughputIn += producer.getStats().msgThroughputIn; stats.requestRateIn += producer.getStats().requestRateIn; - stats.averageMsgPerRequest += producer.getStats().averageMsgPerRequest; if (includeProducerMetrics) { AggregatedProducerStats producerStats = stats.producerStats.computeIfAbsent( @@ -232,6 +231,7 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include } } }); + stats.averageMsgPerRequest = stats.rateIn / stats.requestRateIn; if (topic instanceof PersistentTopic) { tStatus.subscriptions.forEach((subName, subscriptionStats) -> { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java index 3e54a735e9065..9b1ceb5d0074a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/PublisherStatsImpl.java @@ -101,8 +101,10 @@ public PublisherStatsImpl add(PublisherStatsImpl stats) { this.msgThroughputIn += stats.msgThroughputIn; double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; this.averageMsgSize = newAverageMsgSize; + double newAverageMsgPerRequest = (this.averageMsgPerRequest * (this.count - 1) + stats.averageMsgPerRequest) + / this.count; + this.averageMsgPerRequest = newAverageMsgPerRequest; this.requestRateIn += stats.requestRateIn; - this.averageMsgPerRequest += stats.averageMsgPerRequest; return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index c9c4739b904f6..203a655c603e7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -51,6 +51,12 @@ public class TopicStatsImpl implements TopicStats { /** Total rate of messages published on the topic (msg/s). */ public double msgRateIn; + /** Total rate of request published on the topic (request/s). */ + public double requestRateIn; + + /** Average number of messages per entry on the topic (msg/request). */ + public double averageMsgPerRequest; + /** Total throughput of messages published on the topic (byte/s). */ public double msgThroughputIn; @@ -191,6 +197,8 @@ public TopicStatsImpl() { public void reset() { this.count = 0; this.msgRateIn = 0; + this.requestRateIn = 0; + this.averageMsgPerRequest = 0; this.msgThroughputIn = 0; this.msgRateOut = 0; this.msgThroughputOut = 0; @@ -238,6 +246,10 @@ public TopicStatsImpl add(TopicStats ts) { this.waitingPublishers += stats.waitingPublishers; double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count; this.averageMsgSize = newAverageMsgSize; + this.requestRateIn = stats.requestRateIn; + double newAverageMsgPerRequest = (this.averageMsgPerRequest * (this.count - 1) + stats.averageMsgPerRequest) + / this.count; + this.averageMsgPerRequest = newAverageMsgPerRequest; this.storageSize += stats.storageSize; this.backlogSize += stats.backlogSize; this.publishRateLimitedTimes += stats.publishRateLimitedTimes;