diff --git a/conf/broker.conf b/conf/broker.conf index ea98ad4a9b5d2..94b1d76394d66 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1538,6 +1538,9 @@ authenticateMetricsEndpoint=false # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Enable consumer level metrics. default is false exposeConsumerLevelMetricsInPrometheus=false diff --git a/conf/standalone.conf b/conf/standalone.conf index a916a2f477e8f..0801a15dc3dc6 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -968,6 +968,9 @@ webSocketMaxTextFrameSize=1048576 # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Time in milliseconds that metrics endpoint would time out. Default is 30s. # Increase it if there are a lot of topics to expose topic-level metrics. # Set it to 0 to disable timeout. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e088f50a05c88..a5ac981a896f9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2983,6 +2983,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean exposeBundlesMetricsInPrometheus = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Enable expose per rest endpoint metrics of the broker.") + private boolean exposePerRestEndpointMetricsInPrometheus = false; + /**** --- Functions. --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c1137bcfc25b7..089c679170bfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -761,7 +761,6 @@ public void start() throws PulsarServerException { config.getBacklogQuotaDefaultLimitSecond(), config.getDefaultRetentionTimeInMinutes() * 60)); } - localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java new file mode 100644 index 0000000000000..db6a51c63dc77 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.semconv.SemanticAttributes; +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Stack; +import javax.validation.constraints.NotNull; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.Resource; +import org.glassfish.jersey.server.model.ResourceMethod; + +public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { + private final LoadingCache cache = CacheBuilder + .newBuilder() + .maximumSize(100) + .expireAfterAccess(Duration.ofMinutes(1)) + .build(new CacheLoader<>() { + @Override + public @NotNull Attributes load(@NotNull ResourceMethod method) throws Exception { + return Attributes.of(PATH, getRestPath(method), METHOD, method.getHttpMethod()); + } + }); + + private static final String REQUEST_START_TIME = "requestStartTime"; + private static final AttributeKey PATH = SemanticAttributes.URL_PATH; + private static final AttributeKey METHOD = SemanticAttributes.HTTP_REQUEST_METHOD; + private static final AttributeKey CODE = SemanticAttributes.HTTP_RESPONSE_STATUS_CODE; + + private final DoubleHistogram latency; + private final LongCounter failed; + + private RestEndpointMetricsFilter(PulsarBrokerOpenTelemetry openTelemetry) { + Meter meter = openTelemetry.getMeter(); + latency = meter.histogramBuilder("pulsar_broker_rest_endpoint_latency") + .setDescription("Latency of REST endpoints in Pulsar broker") + .setUnit("ms") + .setExplicitBucketBoundariesAdvice(List.of(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)) + .build(); + failed = meter.counterBuilder("pulsar_broker_rest_endpoint_failed") + .setDescription("Number of failed REST endpoints in Pulsar broker") + .build(); + } + + private static volatile RestEndpointMetricsFilter instance; + + public static synchronized RestEndpointMetricsFilter create(PulsarBrokerOpenTelemetry openTelemetry) { + if (instance == null) { + instance = new RestEndpointMetricsFilter(openTelemetry); + } + return instance; + } + + @Override + public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { + Attributes attrs; + try { + UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); + ResourceMethod rm = info.getMatchedResourceMethod(); + attrs = cache.get(rm); + } catch (Throwable ex) { + attrs = Attributes.of(PATH, "UNKNOWN", METHOD, req.getMethod()); + } + + Response.StatusType status = resp.getStatusInfo(); + // record failure + if (status.getStatusCode() >= Response.Status.BAD_REQUEST.getStatusCode()) { + recordFailure(attrs, status.getStatusCode()); + return; + } + // record success + Object o = req.getProperty(REQUEST_START_TIME); + if (o instanceof Long start) { + recordSuccess(attrs, System.currentTimeMillis() - start); + } + } + + @Override + public void filter(ContainerRequestContext req) throws IOException { + // Set the request start time into properties. + req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); + } + + + private void recordSuccess(Attributes attrs, long duration) { + latency.record(duration, attrs); + } + + private void recordFailure(Attributes attrs, long code) { + Attributes attributes = attrs.toBuilder().put(CODE, code).build(); + failed.add(1, attributes); + } + + private static String getRestPath(ResourceMethod method) { + try { + StringBuilder fullPath = new StringBuilder(); + Stack pathStack = new Stack<>(); + Resource parent = method.getParent(); + + while (true) { + String path = parent.getPath(); + parent = parent.getParent(); + if (parent == null) { + if (!path.endsWith("/") && !pathStack.peek().startsWith("/")) { + pathStack.push("/"); + } + pathStack.push(path); + break; + } + pathStack.push(path); + + } + while (!pathStack.isEmpty()) { + fullPath.append(pathStack.pop().replace("{", ":").replace("}", "")); + } + return fullPath.toString(); + } catch (Exception ex) { + return "UNKNOWN"; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 902593b7bf678..eeeee2bf4f1ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -192,6 +192,9 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); + if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { + config.register(RestEndpointMetricsFilter.create(pulsar.getOpenTelemetry())); + } ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java new file mode 100644 index 0000000000000..53d618429a28a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import java.util.Set; +import java.util.UUID; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerRestEndpointMetricsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setExposePerRestEndpointMetricsInPrometheus(true); + baseSetup(); + } + + @BeforeMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testMetrics() throws Exception { + admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); + admin.namespaces().createNamespace("test/test"); + String topic = "persistent://test/test/test_" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().getList("test/test"); + + // This request will be failed + try { + admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); + } catch (Exception e) { + // ignore + } + + admin.topics().delete(topic, true); + admin.namespaces().deleteNamespace("test/test"); + admin.tenants().deleteTenant("test"); + + // TODO: add more test cases + PrometheusMetricsClient client = new PrometheusMetricsClient("127.0.0.1", pulsar.getListenPortHTTP().get()); + PrometheusMetricsClient.Metrics metrics = client.getMetrics(); + System.out.println(); + +// Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); +// Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); +// +// Assert.assertTrue(latency.size() > 0); +// Assert.assertTrue(failed.size() > 0); +// +// for (PrometheusMetricsClient.Metric m : latency) { +// Assert.assertNotNull(m.tags.get("cluster")); +// Assert.assertNotNull(m.tags.get("path")); +// Assert.assertNotNull(m.tags.get("method")); +// } +// +// for (PrometheusMetricsClient.Metric m : failed) { +// Assert.assertNotNull(m.tags.get("cluster")); +// Assert.assertNotNull(m.tags.get("path")); +// Assert.assertNotNull(m.tags.get("method")); +// Assert.assertNotNull(m.tags.get("code")); +// } + } +} \ No newline at end of file