Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.prometheus.client.Gauge;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand Down Expand Up @@ -293,6 +294,12 @@ enum State {

private final ServerCnxThrottleTracker throttleTracker = new ServerCnxThrottleTracker(this);

private static final Gauge clientIpAndRoleConnections = Gauge.build()
.name("pulsar_broker_client_ip_role_connections")
.labelNames("ip", "role")
.help("The number of client IP and role connections")
.register();


public ServerCnx(PulsarService pulsar) {
this(pulsar, null);
Expand Down Expand Up @@ -369,6 +376,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
connectionController.decreaseConnection(ctx.channel().remoteAddress());
clientIpAndRoleConnections.labels(clientSourceAddress(), getAuthRole()).dec();
isActive = false;
log.info("Closed connection from {}", remoteAddress);
if (brokerInterceptor != null) {
Expand Down Expand Up @@ -740,6 +748,7 @@ private void completeConnect(int clientProtoVersion, String clientVersion) {
}
maybeScheduleAuthenticationCredentialsRefresh();
}
clientIpAndRoleConnections.labels(clientSourceAddress(), getAuthRole()).inc();
writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, enableSubscriptionPatternEvaluation));
state = State.Connected;
service.getPulsarStats().recordConnectionCreateSuccess();
Expand Down Expand Up @@ -3411,7 +3420,11 @@ public AuthenticationProvider getAuthenticationProvider() {

@Override
public String getAuthRole() {
return authRole;
if (authRole == null) {
return "";
} else {
return authRole;
}
}

public String getAuthMethod() {
Expand Down