diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 79a74810e9719..3af6d7e2645ab 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -263,7 +263,7 @@ The Apache Software License, Version 2.0
* Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Fastutil -- it.unimi.dsi-fastutil-8.5.16.jar
- * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.51.0.jar
+ * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.59.2.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
- com.google.code.gson-gson-2.8.9.jar
@@ -431,26 +431,26 @@ The Apache Software License, Version 2.0
- org.jetbrains.kotlin-kotlin-stdlib-jdk8-1.8.20.jar
- org.jetbrains-annotations-13.0.jar
* gRPC
- - io.grpc-grpc-all-1.72.0.jar
- - io.grpc-grpc-auth-1.72.0.jar
- - io.grpc-grpc-context-1.72.0.jar
- - io.grpc-grpc-core-1.72.0.jar
- - io.grpc-grpc-protobuf-1.72.0.jar
- - io.grpc-grpc-protobuf-lite-1.72.0.jar
- - io.grpc-grpc-stub-1.72.0.jar
- - io.grpc-grpc-alts-1.72.0.jar
- - io.grpc-grpc-api-1.72.0.jar
- - io.grpc-grpc-grpclb-1.72.0.jar
- - io.grpc-grpc-netty-shaded-1.72.0.jar
- - io.grpc-grpc-services-1.72.0.jar
- - io.grpc-grpc-xds-1.72.0.jar
- - io.grpc-grpc-rls-1.72.0.jar
- - io.grpc-grpc-servlet-1.72.0.jar
- - io.grpc-grpc-servlet-jakarta-1.72.0.jar
+ - io.grpc-grpc-all-1.75.0.jar
+ - io.grpc-grpc-auth-1.75.0.jar
+ - io.grpc-grpc-context-1.75.0.jar
+ - io.grpc-grpc-core-1.75.0.jar
+ - io.grpc-grpc-protobuf-1.75.0.jar
+ - io.grpc-grpc-protobuf-lite-1.75.0.jar
+ - io.grpc-grpc-stub-1.75.0.jar
+ - io.grpc-grpc-alts-1.75.0.jar
+ - io.grpc-grpc-api-1.75.0.jar
+ - io.grpc-grpc-grpclb-1.75.0.jar
+ - io.grpc-grpc-netty-shaded-1.75.0.jar
+ - io.grpc-grpc-services-1.75.0.jar
+ - io.grpc-grpc-xds-1.75.0.jar
+ - io.grpc-grpc-rls-1.75.0.jar
+ - io.grpc-grpc-servlet-1.75.0.jar
+ - io.grpc-grpc-servlet-jakarta-1.75.0.jar
- io.grpc-grpc-util-1.60.0.jar
- - io.grpc-grpc-opentelemetry-1.72.0.jar
- - io.grpc-grpc-gcp-csm-observability-1.72.0.jar
- - io.grpc-grpc-inprocess-1.72.0.jar
+ - io.grpc-grpc-opentelemetry-1.75.0.jar
+ - io.grpc-grpc-gcp-csm-observability-1.75.0.jar
+ - io.grpc-grpc-inprocess-1.75.0.jar
* Perfmark
- io.perfmark-perfmark-api-0.26.0.jar
* OpenCensus
@@ -484,8 +484,8 @@ The Apache Software License, Version 2.0
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- - io.github.oxia-db-oxia-client-api-0.6.2.jar
- - io.github.oxia-db-oxia-client-0.6.2.jar
+ - io.github.oxia-db-oxia-client-api-0.7.0.jar
+ - io.github.oxia-db-oxia-client-0.7.0.jar
* OpenHFT
- net.openhft-zero-allocation-hashing-0.16.jar
* Java JSON WebTokens
@@ -541,7 +541,7 @@ The Apache Software License, Version 2.0
- io.opentelemetry.instrumentation-opentelemetry-runtime-telemetry-java8-1.33.6-alpha.jar
- io.opentelemetry.semconv-opentelemetry-semconv-1.29.0-alpha.jar
- com.google.cloud.opentelemetry-detector-resources-support-0.33.0.jar
- - io.opentelemetry.contrib-opentelemetry-gcp-resources-1.43.0-alpha.jar
+ - io.opentelemetry.contrib-opentelemetry-gcp-resources-1.48.0-alpha.jar
* Spotify completable-futures
- com.spotify-completable-futures-0.3.6.jar
* JSpecify
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
index c3759a533a571..b1d777dbbf287 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java
@@ -28,6 +28,7 @@
public class ManagedLedgerAttributes {
private final Attributes attributes;
+ private final Attributes attributesOnlyNamespace;
private final Attributes attributesOperationSucceed;
private final Attributes attributesOperationFailure;
@@ -37,6 +38,9 @@ public ManagedLedgerAttributes(ManagedLedger ml) {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
);
+ attributesOnlyNamespace = Attributes.of(
+ OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
+ );
attributesOperationSucceed = Attributes.builder()
.putAll(attributes)
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 8a4f0fa3a3c92..a452c6682a53b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetadataStore metadataStore;
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+ @Getter
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 634afccf6ac39..92e3052588af8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -87,6 +87,8 @@ public void refreshStats(long period, TimeUnit unit) {
public void addAddEntrySample(long size) {
addEntryOps.recordEvent(size);
entryStats.addValue(size);
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordEntrySize(size, managedLedger);
addEntryWithReplicasOps.recordEvent(size * managedLedger.getConfig().getWriteQuorumSize());
}
@@ -108,14 +110,20 @@ public void recordReadEntriesOpsCacheMisses(int count, long totalSize) {
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordAddEntryLatency(latency, unit, managedLedger);
}
public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordLedgerAddEntryLatency(latency, unit, managedLedger);
}
public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
+ managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
+ .recordLedgerSwitchLatency(latency, unit, managedLedger);
}
public void addReadEntriesSample(int count, long totalSize) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
index 26c4b62cf7694..6e86532bdd864 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java
@@ -20,12 +20,18 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongHistogram;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.opentelemetry.Constants;
public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
+ // ml-level metrics
+
// Replaces pulsar_ml_AddEntryMessagesRate
public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count";
private final ObservableLongMeasurement addEntryCounter;
@@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
private final BatchCallback batchCallback;
+ // namespace-level metrics
+
+ // Histograms support only synchronous mode, so record measurements directly.
+ // Synchronous histograms currently do not support delete operations.
+ // Therefore, use only namespace-level attributes to avoid leaking high-cardinality attributes (e.g. topic name).
+ // See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md
+
+ // Replaces ['pulsar_ml_AddEntryLatencyBuckets', 'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
+ // 'pulsar_storage_write_latency_*']
+ public static final String ADD_ENTRY_LATENCY_HISTOGRAM = "pulsar.broker.managed_ledger.message.outgoing.latency";
+ private final DoubleHistogram addEntryLatencyHistogram;
+
+ // Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
+ // 'pulsar_storage_ledger_write_latency_*']
+ public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
+ "pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
+ private final DoubleHistogram ledgerAddEntryLatencyHistogram;
+
+ // Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
+ public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
+ "pulsar.broker.managed_ledger.ledger.switch.latency";
+ private final DoubleHistogram ledgerSwitchLatencyHistogram;
+
+ // Replaces ['pulsar_ml_EntrySizeBuckets', 'pulsar_ml_EntrySizeBuckets_OVERFLOW',
+ // 'pulsar_entry_size_*']
+ public static final String ENTRY_SIZE_HISTOGRAM = "pulsar.broker.managed_ledger.entry.size";
+ private final LongHistogram entrySizeHistogram;
+
public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
@@ -124,6 +158,39 @@ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedge
bytesInCounter,
readEntryCacheMissCounter,
markDeleteCounter);
+
+ addEntryLatencyHistogram = meter
+ .histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
+ .setDescription("End-to-end write latency, including time spent in the executor queue.")
+ .setUnit("s")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
+
+ ledgerAddEntryLatencyHistogram = meter
+ .histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
+ .setDescription("End-to end write latency.")
+ .setUnit("s")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
+
+ ledgerSwitchLatencyHistogram = meter
+ .histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
+ .setDescription("Time taken to switch to a new ledger.")
+ .setUnit("s")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
+
+ entrySizeHistogram = meter
+ .histogramBuilder(ENTRY_SIZE_HISTOGRAM)
+ .ofLongs()
+ .setDescription("Size of entries written to the ledger.")
+ .setUnit("By")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 1024L, 2048L, 4096L, 16_384L,
+ 102_400L, 1_048_576L))
+ .build();
}
@Override
@@ -151,4 +218,24 @@ private void recordMetrics(ManagedLedger ml) {
markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
}
+
+ void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.addEntryLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
+ }
+
+ void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.ledgerAddEntryLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
+ }
+
+ void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.ledgerSwitchLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
+ }
+
+ void recordEntrySize(long entrySize, ManagedLedger ml) {
+ final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
+ this.entrySizeHistogram.record(entrySize, attributes);
+ }
}
diff --git a/pom.xml b/pom.xml
index 408be82ce3e3e..b437518c3aa18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -221,7 +221,7 @@ flexible messaging model and an intuitive client API.
1.17
3.25.5
${protobuf3.version}
- 1.72.0
+ 1.75.0
1.41.0
0.26.0
${grpc.version}
@@ -296,7 +296,7 @@ flexible messaging model and an intuitive client API.
4.5.13
4.4.15
0.7.7
- 0.6.2
+ 0.7.0
2.0
1.10.12
5.5.0
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index e8253771eded4..3ec6f5a0cd5e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -67,7 +67,6 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
-import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -899,7 +898,7 @@ private CompletableFuture> handleTopicEpochForExclusiveProducer(P
public void recordAddLatency(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
- PUBLISH_LATENCY.observe(latency, unit);
+ brokerService.getPulsarStats().recordPublishLatency(latency, unit);
}
@Override
@@ -908,15 +907,6 @@ public long increasePublishLimitedTimes() {
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}
- private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
- .quantile(0.0)
- .quantile(0.50)
- .quantile(0.95)
- .quantile(0.99)
- .quantile(0.999)
- .quantile(0.9999)
- .quantile(1.0)
- .register();
@Override
public void incrementPublishCount(Producer producer, int numOfMessages, long msgSizeInBytes) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
index b96e00a8909d6..45a0e8b42f0cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
@@ -280,4 +281,8 @@ public void recordConnectionCreateSuccess() {
public void recordConnectionCreateFail() {
brokerOperabilityMetrics.recordConnectionCreateFail();
}
+
+ public void recordPublishLatency(long latency, TimeUnit unit) {
+ brokerOperabilityMetrics.recordPublishLatency(latency, unit);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
index 1855e1798b465..310c14d4afa33 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java
@@ -18,18 +18,22 @@
*/
package org.apache.pulsar.broker.stats;
+import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Counter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
+import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
/**
*/
@@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements AutoCloseable {
"pulsar.broker.connection.create.operation.count";
private final ObservableLongCounter connectionCreateCounter;
+ public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME = "pulsar.broker.topic.publish.latency";
+ private final DoubleHistogram topicPublishLatencyHistogram;
+ @PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME)
+ private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
+ .quantile(0.0)
+ .quantile(0.50)
+ .quantile(0.95)
+ .quantile(0.99)
+ .quantile(0.999)
+ .quantile(0.9999)
+ .quantile(1.0)
+ .register();
+
public BrokerOperabilityMetrics(PulsarService pulsar) {
this.metricsList = new ArrayList<>();
this.localCluster = pulsar.getConfiguration().getClusterName();
@@ -87,6 +104,14 @@ public BrokerOperabilityMetrics(PulsarService pulsar) {
measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes);
measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes);
});
+
+ this.topicPublishLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
+ .histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME)
+ .setUnit("s")
+ .setDescription("The latency in seconds for publishing messages")
+ .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
+ 0.2, 0.5, 1.0, 5.0, 30.0))
+ .build();
}
@Override
@@ -195,4 +220,9 @@ public void recordHealthCheckStatusSuccess() {
public void recordHealthCheckStatusFail() {
this.healthCheckStatus = 0;
}
+
+ public void recordPublishLatency(long latency, TimeUnit unit) {
+ this.topicPublishLatencyHistogram.record(unit.toMillis(latency) / 1000.0);
+ PUBLISH_LATENCY.observe(latency, unit);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
index 3bfbf2064e156..76cad804e1ed3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java
@@ -121,4 +121,19 @@ public static void assertMetricDoubleGaugeValue(Collection metrics,
valueConsumer.accept(point.getValue());
}))));
}
+
+ public static void assertMetricHistogramValue(Collection metrics, String metricName,
+ Attributes attributes, Consumer countConsumer,
+ Consumer sumConsumer) {
+ final Map, Object> attributesMap = attributes.asMap();
+ assertThat(metrics).anySatisfy(metric -> assertThat(metric)
+ .hasName(metricName)
+ .hasHistogramSatisfying(histogram -> histogram.satisfies(
+ histoData -> assertThat(histoData.getPoints()).anySatisfy(
+ point -> {
+ assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap);
+ countConsumer.accept(point.getCount());
+ sumConsumer.accept(point.getSum());
+ }))));
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
index 87e751fcc5952..394af363d99b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,6 +136,9 @@ public void testManagedLedgerMetrics() throws Exception {
OpenTelemetryAttributes.ML_NAME, mlName,
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
);
+ final var attribOnlyNamespace = Attributes.of(
+ OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
+ );
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
Awaitility.await().untilAsserted(() -> {
@@ -189,6 +193,16 @@ public void testManagedLedgerMetrics() throws Exception {
value -> assertThat(value).isGreaterThanOrEqualTo(0));
assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
attribCommon, value -> assertThat(value).isGreaterThanOrEqualTo(0));
+
+ assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
+ attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
+ assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
+ attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
+ assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
+ attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
index 4378e6b05b3ee..e197f3bc62192 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;
+import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -101,4 +104,27 @@ public void testBrokerConnection() throws Exception {
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
ConnectionCreateStatus.FAILURE.attributes, 1);
}
+
+ @Test
+ public void testPublishLatency() throws Exception {
+ final var topicName = BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testPublishLatency");
+ @Cleanup
+ final var producer = pulsarClient.newProducer().topic(topicName).create();
+
+ producer.send(("msg").getBytes());
+
+ var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricHistogramValue(metrics, BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME,
+ Attributes.empty(), count -> assertThat(count).isEqualTo(1L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
+
+ for (int i = 0; i < 9; i++) {
+ producer.send(("msg-" + i).getBytes());
+ }
+
+ metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricHistogramValue(metrics, BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME,
+ Attributes.empty(), count -> assertThat(count).isEqualTo(10L),
+ sum -> assertThat(sum).isGreaterThan(0.0));
+ }
}
diff --git a/pulsar-client-all/pom.xml b/pulsar-client-all/pom.xml
index d0cbb7b70f4c5..2b6b9a664b447 100644
--- a/pulsar-client-all/pom.xml
+++ b/pulsar-client-all/pom.xml
@@ -494,6 +494,17 @@
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+
+ attach-sources
+ none
+
+
+
org.apache.maven.plugins
maven-enforcer-plugin
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index d2e994efc53f4..34c1d3a2aa9f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
-import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabled;
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@@ -53,9 +53,9 @@ public class AvroSchema extends AvroBaseStructSchema {
private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
this.pojoClassLoader = pojoClassLoader;
- boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
+ boolean jsr310ConversionEnabled = getJsr310ConversionEnabled(schemaInfo);
setReader(new MultiVersionAvroReader<>(schema, pojoClassLoader,
- getJsr310ConversionEnabledFromSchemaInfo(schemaInfo)));
+ getJsr310ConversionEnabled(schemaInfo)));
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
index 85d4d63a1b136..300e71991778e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema.reader;
-import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabled;
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.SchemaReader;
@@ -49,7 +49,7 @@ protected SchemaReader loadReader(BytesSchemaVersion schemaVersion) {
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition(), schemaInfo);
}
- boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
+ boolean jsr310ConversionEnabled = getJsr310ConversionEnabled(schemaInfo);
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()),
readerSchema, pojoClassLoader, jsr310ConversionEnabled);
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
index 7af45734b89a3..99fe54ae7b5bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
@@ -33,7 +33,16 @@
public class SchemaUtil {
- public static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
+ private static Boolean globalJsr310ConversionEnabled = null;
+
+ public static void setGlobalJsr310ConversionEnabled(Boolean globalJsr310ConversionEnabled) {
+ SchemaUtil.globalJsr310ConversionEnabled = globalJsr310ConversionEnabled;
+ }
+
+ public static boolean getJsr310ConversionEnabled(SchemaInfo schemaInfo) {
+ if (globalJsr310ConversionEnabled != null) {
+ return globalJsr310ConversionEnabled;
+ }
if (schemaInfo != null) {
return Boolean.parseBoolean(schemaInfo.getProperties()
.getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/util/SchemaUtilTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/util/SchemaUtilTest.java
new file mode 100644
index 0000000000000..d80a25873a8ee
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/util/SchemaUtilTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.util;
+
+import static org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.Test;
+
+public class SchemaUtilTest {
+
+ @Test
+ public void schemaWithoutJsr310EnabledPropertyReturnsFalse() {
+ SchemaUtil.setGlobalJsr310ConversionEnabled(null);
+ SchemaInfo schemaInfo = emptyPropertiesSchema();
+ boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(schemaInfo);
+ assertFalse(isJsr310Enabled);
+ }
+
+ @Test
+ public void schemaWithJsr310DisabledPropertyReturnsFalse() {
+ SchemaUtil.setGlobalJsr310ConversionEnabled(null);
+ SchemaInfo schemaInfo = disabledJsr310PropertiesSchema();
+ boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(schemaInfo);
+ assertFalse(isJsr310Enabled);
+ }
+
+ @Test
+ public void schemaWithJsr310EnabledPropertyReturnsTrue() {
+ SchemaUtil.setGlobalJsr310ConversionEnabled(null);
+ SchemaInfo schemaInfo = enabledJsr310PropertiesSchema();
+ boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(schemaInfo);
+ assertTrue(isJsr310Enabled);
+ }
+
+ @Test
+ public void globalJsr310DisabledAlwaysReturnsFalse() {
+ SchemaUtil.setGlobalJsr310ConversionEnabled(false);
+ boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(emptyPropertiesSchema());
+ assertFalse(isJsr310Enabled);
+ isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(disabledJsr310PropertiesSchema());
+ assertFalse(isJsr310Enabled);
+ isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(enabledJsr310PropertiesSchema());
+ assertFalse(isJsr310Enabled);
+ }
+
+ @Test
+ public void globalJsr310EnabledAlwaysReturnsTrue() {
+ SchemaUtil.setGlobalJsr310ConversionEnabled(true);
+ boolean isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(emptyPropertiesSchema());
+ assertTrue(isJsr310Enabled);
+ isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(disabledJsr310PropertiesSchema());
+ assertTrue(isJsr310Enabled);
+ isJsr310Enabled = SchemaUtil.getJsr310ConversionEnabled(enabledJsr310PropertiesSchema());
+ assertTrue(isJsr310Enabled);
+ }
+
+ private static SchemaInfo emptyPropertiesSchema() {
+ return SchemaInfo.builder()
+ .schema("{\"type\": \"string\"}".getBytes())
+ .type(SchemaType.AVRO)
+ .name("unitTest")
+ .properties(new HashMap<>())
+ .build();
+ }
+
+ private static SchemaInfo disabledJsr310PropertiesSchema() {
+ Map properties = new HashMap<>();
+ properties.put(JSR310_CONVERSION_ENABLED, "false");
+
+ return SchemaInfo.builder()
+ .schema("{\"type\": \"string\"}".getBytes())
+ .type(SchemaType.AVRO)
+ .name("unitTest")
+ .properties(properties)
+ .build();
+ }
+
+ private static SchemaInfo enabledJsr310PropertiesSchema() {
+ Map properties = new HashMap<>();
+ properties.put(JSR310_CONVERSION_ENABLED, "true");
+
+ return SchemaInfo.builder()
+ .schema("{\"type\": \"string\"}".getBytes())
+ .type(SchemaType.AVRO)
+ .name("unitTest")
+ .properties(properties)
+ .build();
+ }
+
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
index 78ceffb79c4b9..9e00cf61dccd6 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
@@ -477,9 +477,9 @@ public void testPeriodicTaskCancellation() throws Exception {
assertTrue(cancelled);
assertTrue(future.isCancelled());
- // Wait and verify no more executions
+ // Wait and verify no more executions (allowing one execution that was in progress during cancellation)
Thread.sleep(200);
- assertEquals(executionCount.get(), countBeforeCancel);
+ assertTrue(executionCount.get() - countBeforeCancel <= 1);
}
@Test
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
index 7f0dc6fba10f3..058e447d49a95 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java
@@ -20,14 +20,14 @@
import io.opentelemetry.api.OpenTelemetry;
import io.oxia.client.api.AsyncOxiaClient;
-import io.oxia.client.api.DeleteOption;
import io.oxia.client.api.Notification;
import io.oxia.client.api.OxiaClientBuilder;
-import io.oxia.client.api.PutOption;
import io.oxia.client.api.PutResult;
import io.oxia.client.api.Version;
import io.oxia.client.api.exceptions.KeyAlreadyExistsException;
import io.oxia.client.api.exceptions.UnexpectedVersionIdException;
+import io.oxia.client.api.options.DeleteOption;
+import io.oxia.client.api.options.PutOption;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
@@ -133,7 +133,7 @@ Optional convertGetResult(
return Optional.of(result)
.map(
oxiaResult ->
- new GetResult(oxiaResult.getValue(), convertStat(path, oxiaResult.getVersion())));
+ new GetResult(oxiaResult.value(), convertStat(path, oxiaResult.version())));
}
Stat convertStat(String path, Version version) {
diff --git a/src/settings.xml b/src/settings.xml
index 80ec2f40620e7..b27c935634037 100644
--- a/src/settings.xml
+++ b/src/settings.xml
@@ -26,7 +26,7 @@
${env.APACHE_USER}
${env.APACHE_PASSWORD}
-
+
apache.releases.https
@@ -34,17 +34,4 @@
${env.APACHE_PASSWORD}
-
-
-
- apache
-
- true
-
-
- ${env.GPG_EXECUTABLE}
- ${env.GPG_PASSPHRASE}
-
-
-