diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bd4917da3b119..4359158dc9fb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -41,6 +41,7 @@ import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; +import io.prometheus.client.Gauge; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -293,6 +294,12 @@ enum State { private final ServerCnxThrottleTracker throttleTracker = new ServerCnxThrottleTracker(this); + private static final Gauge clientIpAndRoleConnections = Gauge.build() + .name("pulsar_broker_client_ip_role_connections") + .labelNames("ip", "role") + .help("The number of client IP and role connections") + .register(); + public ServerCnx(PulsarService pulsar) { this(pulsar, null); @@ -369,6 +376,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); connectionController.decreaseConnection(ctx.channel().remoteAddress()); + clientIpAndRoleConnections.labels(clientSourceAddress(), getAuthRole()).dec(); isActive = false; log.info("Closed connection from {}", remoteAddress); if (brokerInterceptor != null) { @@ -740,6 +748,7 @@ private void completeConnect(int clientProtoVersion, String clientVersion) { } maybeScheduleAuthenticationCredentialsRefresh(); } + clientIpAndRoleConnections.labels(clientSourceAddress(), getAuthRole()).inc(); writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableSubscriptionPatternEvaluation)); state = State.Connected; service.getPulsarStats().recordConnectionCreateSuccess(); @@ -3411,7 +3420,11 @@ public AuthenticationProvider getAuthenticationProvider() { @Override public String getAuthRole() { - return authRole; + if (authRole == null) { + return ""; + } else { + return authRole; + } } public String getAuthMethod() {