Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ The Apache Software License, Version 2.0
* Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Fastutil -- it.unimi.dsi-fastutil-8.5.16.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.51.0.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.59.2.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
- com.google.code.gson-gson-2.8.9.jar
Expand Down Expand Up @@ -431,26 +431,26 @@ The Apache Software License, Version 2.0
- org.jetbrains.kotlin-kotlin-stdlib-jdk8-1.8.20.jar
- org.jetbrains-annotations-13.0.jar
* gRPC
- io.grpc-grpc-all-1.72.0.jar
- io.grpc-grpc-auth-1.72.0.jar
- io.grpc-grpc-context-1.72.0.jar
- io.grpc-grpc-core-1.72.0.jar
- io.grpc-grpc-protobuf-1.72.0.jar
- io.grpc-grpc-protobuf-lite-1.72.0.jar
- io.grpc-grpc-stub-1.72.0.jar
- io.grpc-grpc-alts-1.72.0.jar
- io.grpc-grpc-api-1.72.0.jar
- io.grpc-grpc-grpclb-1.72.0.jar
- io.grpc-grpc-netty-shaded-1.72.0.jar
- io.grpc-grpc-services-1.72.0.jar
- io.grpc-grpc-xds-1.72.0.jar
- io.grpc-grpc-rls-1.72.0.jar
- io.grpc-grpc-servlet-1.72.0.jar
- io.grpc-grpc-servlet-jakarta-1.72.0.jar
- io.grpc-grpc-all-1.75.0.jar
- io.grpc-grpc-auth-1.75.0.jar
- io.grpc-grpc-context-1.75.0.jar
- io.grpc-grpc-core-1.75.0.jar
- io.grpc-grpc-protobuf-1.75.0.jar
- io.grpc-grpc-protobuf-lite-1.75.0.jar
- io.grpc-grpc-stub-1.75.0.jar
- io.grpc-grpc-alts-1.75.0.jar
- io.grpc-grpc-api-1.75.0.jar
- io.grpc-grpc-grpclb-1.75.0.jar
- io.grpc-grpc-netty-shaded-1.75.0.jar
- io.grpc-grpc-services-1.75.0.jar
- io.grpc-grpc-xds-1.75.0.jar
- io.grpc-grpc-rls-1.75.0.jar
- io.grpc-grpc-servlet-1.75.0.jar
- io.grpc-grpc-servlet-jakarta-1.75.0.jar
- io.grpc-grpc-util-1.60.0.jar
- io.grpc-grpc-opentelemetry-1.72.0.jar
- io.grpc-grpc-gcp-csm-observability-1.72.0.jar
- io.grpc-grpc-inprocess-1.72.0.jar
- io.grpc-grpc-opentelemetry-1.75.0.jar
- io.grpc-grpc-gcp-csm-observability-1.75.0.jar
- io.grpc-grpc-inprocess-1.75.0.jar
* Perfmark
- io.perfmark-perfmark-api-0.26.0.jar
* OpenCensus
Expand Down Expand Up @@ -484,8 +484,8 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- io.github.oxia-db-oxia-client-api-0.6.2.jar
- io.github.oxia-db-oxia-client-0.6.2.jar
- io.github.oxia-db-oxia-client-api-0.7.0.jar
- io.github.oxia-db-oxia-client-0.7.0.jar
* OpenHFT
- net.openhft-zero-allocation-hashing-0.16.jar
* Java JSON WebTokens
Expand Down Expand Up @@ -541,7 +541,7 @@ The Apache Software License, Version 2.0
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java8-1.33.6-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.29.0-alpha.jar
- com.google.cloud.opentelemetry-detector-resources-support-0.33.0.jar
- io.opentelemetry.contrib-opentelemetry-gcp-resources-1.43.0-alpha.jar
- io.opentelemetry.contrib-opentelemetry-gcp-resources-1.48.0-alpha.jar
* Spotify completable-futures
- com.spotify-completable-futures-0.3.6.jar
* JSpecify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class ManagedLedgerAttributes {

private final Attributes attributes;
private final Attributes attributesOnlyNamespace;
private final Attributes attributesOperationSucceed;
private final Attributes attributesOperationFailure;

Expand All @@ -37,6 +38,9 @@ public ManagedLedgerAttributes(ManagedLedger ml) {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
attributesOnlyNamespace = Attributes.of(
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
attributesOperationSucceed = Attributes.builder()
.putAll(attributes)
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetadataStore metadataStore;

private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
@Getter
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void refreshStats(long period, TimeUnit unit) {
/**
 * Registers one add-entry sample of the given size.
 *
 * <p>The size feeds the add-entry rate, the entry-size stats, the OpenTelemetry entry-size
 * histogram owned by the managed ledger factory, and the replica-adjusted rate (size multiplied
 * by the configured write quorum size).
 *
 * @param size the size of the added entry
 */
public void addAddEntrySample(long size) {
    addEntryOps.recordEvent(size);
    entryStats.addValue(size);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordEntrySize(size, managedLedger);

    final long sizeWithReplicas = size * managedLedger.getConfig().getWriteQuorumSize();
    addEntryWithReplicasOps.recordEvent(sizeWithReplicas);
}

Expand All @@ -108,14 +110,20 @@ public void recordReadEntriesOpsCacheMisses(int count, long totalSize) {

/**
 * Registers one add-entry latency sample.
 *
 * <p>The sample is stored in microseconds in the local latency stats and forwarded, in its
 * original unit, to the OpenTelemetry stats owned by the managed ledger factory.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
    final long latencyMicros = unit.toMicros(latency);
    addEntryLatencyStatsUsec.addValue(latencyMicros);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordAddEntryLatency(latency, unit, managedLedger);
}

/**
 * Registers one ledger-level add-entry latency sample.
 *
 * <p>The sample is stored in microseconds in the local latency stats and forwarded, in its
 * original unit, to the OpenTelemetry stats owned by the managed ledger factory.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
    final long latencyMicros = unit.toMicros(latency);
    ledgerAddEntryLatencyStatsUsec.addValue(latencyMicros);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordLedgerAddEntryLatency(latency, unit, managedLedger);
}

/**
 * Registers one ledger-switch latency sample.
 *
 * <p>The sample is stored in microseconds in the local latency stats and forwarded, in its
 * original unit, to the OpenTelemetry stats owned by the managed ledger factory.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
    final long latencyMicros = unit.toMicros(latency);
    ledgerSwitchLatencyStatsUsec.addValue(latencyMicros);

    final var otelStats = managedLedger.getFactory().getOpenTelemetryManagedLedgerStats();
    otelStats.recordLedgerSwitchLatency(latency, unit, managedLedger);
}

public void addReadEntriesSample(int count, long totalSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.opentelemetry.Constants;

public class OpenTelemetryManagedLedgerStats implements AutoCloseable {

// ml-level metrics

// Replaces pulsar_ml_AddEntryMessagesRate
public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count";
private final ObservableLongMeasurement addEntryCounter;
Expand Down Expand Up @@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {

private final BatchCallback batchCallback;

// namespace-level metrics

// Histograms support only synchronous mode, so record measurements directly.
// Synchronous histograms currently do not support delete operations.
// Therefore, use only namespace-level attributes to avoid leaking high-cardinality attributes (e.g. topic name).
// See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md

// Replaces ['pulsar_ml_AddEntryLatencyBuckets', 'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
// 'pulsar_storage_write_latency_*']
public static final String ADD_ENTRY_LATENCY_HISTOGRAM = "pulsar.broker.managed_ledger.message.outgoing.latency";
private final DoubleHistogram addEntryLatencyHistogram;

// Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
// 'pulsar_storage_ledger_write_latency_*']
public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
"pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
private final DoubleHistogram ledgerAddEntryLatencyHistogram;

// Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
"pulsar.broker.managed_ledger.ledger.switch.latency";
private final DoubleHistogram ledgerSwitchLatencyHistogram;

// Replaces ['pulsar_ml_EntrySizeBuckets', 'pulsar_ml_EntrySizeBuckets_OVERFLOW',
// 'pulsar_entry_size_*']
public static final String ENTRY_SIZE_HISTOGRAM = "pulsar.broker.managed_ledger.entry.size";
private final LongHistogram entrySizeHistogram;

public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);

Expand Down Expand Up @@ -124,6 +158,39 @@ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedge
bytesInCounter,
readEntryCacheMissCounter,
markDeleteCounter);

addEntryLatencyHistogram = meter
.histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
.setDescription("End-to-end write latency, including time spent in the executor queue.")
.setUnit("s")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();

// Histogram for the ledger-level add-entry latency, reported in seconds.
// The bucket boundaries are advisory and range from 1 ms to 30 s.
ledgerAddEntryLatencyHistogram = meter
        .histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
        .setDescription("End-to-end write latency.")
        .setUnit("s")
        .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
                0.2, 0.5, 1.0, 5.0, 30.0))
        .build();

ledgerSwitchLatencyHistogram = meter
.histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
.setDescription("Time taken to switch to a new ledger.")
.setUnit("s")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();

entrySizeHistogram = meter
.histogramBuilder(ENTRY_SIZE_HISTOGRAM)
.ofLongs()
.setDescription("Size of entries written to the ledger.")
.setUnit("By")
.setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 1024L, 2048L, 4096L, 16_384L,
102_400L, 1_048_576L))
.build();
}

@Override
Expand Down Expand Up @@ -151,4 +218,24 @@ private void recordMetrics(ManagedLedger ml) {
markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
}

/**
 * Records an add-entry latency sample on the add-entry latency histogram.
 *
 * <p>The value is recorded in seconds. The conversion goes through nanoseconds instead of
 * milliseconds because {@link TimeUnit#toMillis(long)} truncates, which would report every
 * sub-millisecond latency as {@code 0.0} seconds.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 * @param ml the managed ledger whose namespace-only attributes tag the measurement
 */
void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
    final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
    this.addEntryLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0, attributes);
}

/**
 * Records a ledger-level add-entry latency sample on the ledger add-entry latency histogram.
 *
 * <p>The value is recorded in seconds. The conversion goes through nanoseconds instead of
 * milliseconds because {@link TimeUnit#toMillis(long)} truncates, which would report every
 * sub-millisecond latency as {@code 0.0} seconds.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 * @param ml the managed ledger whose namespace-only attributes tag the measurement
 */
void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
    final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
    this.ledgerAddEntryLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0, attributes);
}

/**
 * Records a ledger-switch latency sample on the ledger-switch latency histogram.
 *
 * <p>The value is recorded in seconds. The conversion goes through nanoseconds instead of
 * milliseconds because {@link TimeUnit#toMillis(long)} truncates, which would report every
 * sub-millisecond latency as {@code 0.0} seconds.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 * @param ml the managed ledger whose namespace-only attributes tag the measurement
 */
void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
    final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
    this.ledgerSwitchLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0, attributes);
}

/**
 * Records the size of a written entry on the entry-size histogram.
 *
 * @param entrySize the size of the entry
 * @param ml the managed ledger whose namespace-only attributes tag the measurement
 */
void recordEntrySize(long entrySize, ManagedLedger ml) {
    entrySizeHistogram.record(entrySize, ml.getManagedLedgerAttributes().getAttributesOnlyNamespace());
}
}
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ flexible messaging model and an intuitive client API.</description>
<zt-zip.version>1.17</zt-zip.version>
<protobuf3.version>3.25.5</protobuf3.version>
<protoc3.version>${protobuf3.version}</protoc3.version>
<grpc.version>1.72.0</grpc.version>
<grpc.version>1.75.0</grpc.version>
<google-http-client.version>1.41.0</google-http-client.version>
<perfmark.version>0.26.0</perfmark.version>
<protoc-gen-grpc-java.version>${grpc.version}</protoc-gen-grpc-java.version>
Expand Down Expand Up @@ -296,7 +296,7 @@ flexible messaging model and an intuitive client API.</description>
<apache-http-client.version>4.5.13</apache-http-client.version>
<apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
<jetcd.version>0.7.7</jetcd.version>
<oxia.version>0.6.2</oxia.version>
<oxia.version>0.7.0</oxia.version>
<snakeyaml.version>2.0</snakeyaml.version>
<ant.version>1.10.12</ant.version>
<seancfoley.ipaddress.version>5.5.0</seancfoley.ipaddress.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -899,7 +898,7 @@ private CompletableFuture<Optional<Long>> handleTopicEpochForExclusiveProducer(P
/**
 * Records one add (publish) latency sample for this topic.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordAddLatency(long latency, TimeUnit unit) {
// Local stats keep the sample in microseconds.
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));

// NOTE(review): the sample is observed directly on PUBLISH_LATENCY and also forwarded to the
// broker-level stats below. If the delegate feeds a summary with the same metric name, the
// sample may be reported twice - confirm that both calls are intended.
PUBLISH_LATENCY.observe(latency, unit);
brokerService.getPulsarStats().recordPublishLatency(latency, unit);
}

@Override
Expand All @@ -908,15 +907,6 @@ public long increasePublishLimitedTimes() {
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}

private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
.quantile(0.95)
.quantile(0.99)
.quantile(0.999)
.quantile(0.9999)
.quantile(1.0)
.register();

@Override
public void incrementPublishCount(Producer producer, int numOfMessages, long msgSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
Expand Down Expand Up @@ -280,4 +281,8 @@ public void recordConnectionCreateSuccess() {
/**
 * Forwards a failed connection-creation event to the broker operability metrics.
 */
public void recordConnectionCreateFail() {
    this.brokerOperabilityMetrics.recordConnectionCreateFail();
}

/**
 * Forwards a publish latency sample to the broker operability metrics.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordPublishLatency(long latency, TimeUnit unit) {
    this.brokerOperabilityMetrics.recordPublishLatency(latency, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;

/**
*/
Expand All @@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements AutoCloseable {
"pulsar.broker.connection.create.operation.count";
private final ObservableLongCounter connectionCreateCounter;

public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME = "pulsar.broker.topic.publish.latency";
private final DoubleHistogram topicPublishLatencyHistogram;
@PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME)
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
.quantile(0.95)
.quantile(0.99)
.quantile(0.999)
.quantile(0.9999)
.quantile(1.0)
.register();

public BrokerOperabilityMetrics(PulsarService pulsar) {
this.metricsList = new ArrayList<>();
this.localCluster = pulsar.getConfiguration().getClusterName();
Expand Down Expand Up @@ -87,6 +104,14 @@ public BrokerOperabilityMetrics(PulsarService pulsar) {
measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes);
measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes);
});

this.topicPublishLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
.histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME)
.setUnit("s")
.setDescription("The latency in seconds for publishing messages")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();
}

@Override
Expand Down Expand Up @@ -195,4 +220,9 @@ public void recordHealthCheckStatusSuccess() {
/**
 * Marks the health check as failed by setting the health-check status value to {@code 0}.
 */
public void recordHealthCheckStatusFail() {
    healthCheckStatus = 0;
}

/**
 * Records a publish latency sample on both the OpenTelemetry histogram and the Prometheus
 * summary.
 *
 * <p>The histogram value is recorded in seconds. The conversion goes through nanoseconds instead
 * of milliseconds because {@link TimeUnit#toMillis(long)} truncates, which would report every
 * sub-millisecond latency as {@code 0.0} seconds. The summary receives the sample in its
 * original unit.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordPublishLatency(long latency, TimeUnit unit) {
    this.topicPublishLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0);
    PUBLISH_LATENCY.observe(latency, unit);
}
}
Loading
Loading