From d333e48b0176abb0aba8eb17bc2b510842dba8fc Mon Sep 17 00:00:00 2001
From: yyj <1012293987@qq.com>
Date: Thu, 22 Feb 2024 14:23:36 +0800
Subject: [PATCH] [improve][broker]PIP-340 Optimization of Probe Implementation
for Automatic Failover
---
.../ClusterHealthStatusResources.java | 46 ++++++++++
.../broker/resources/PulsarResources.java | 5 ++
.../apache/pulsar/broker/PulsarService.java | 10 +++
.../broker/admin/impl/ClustersBase.java | 86 +++++++++++++++++++
.../pulsar/broker/service/ServerCnx.java | 75 ++++++----------
.../pulsar/broker/web/PulsarWebResource.java | 5 ++
.../apache/pulsar/client/admin/Clusters.java | 27 ++++++
.../client/admin/internal/ClustersImpl.java | 15 ++++
.../apache/pulsar/admin/cli/CmdClusters.java | 27 ++++++
.../client/impl/AutoClusterFailover.java | 9 +-
.../apache/pulsar/client/impl/ClientCnx.java | 44 +++-------
.../pulsar/common/protocol/Commands.java | 61 +++----------
.../pulsar/common/protocol/PulsarDecoder.java | 79 ++++-------------
pulsar-common/src/main/proto/PulsarApi.proto | 14 +++
14 files changed, 317 insertions(+), 186 deletions(-)
create mode 100644 pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java
new file mode 100644
index 0000000000000..4a17cda16b9c0
--- /dev/null
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+
+/**
+ * Access to the per-cluster health status record kept in the metadata store under {@link #BASE_PATH}.
+ */
+public class ClusterHealthStatusResources extends BaseResources<String> {
+    /** Path prefix of the health status records; note that it already ends with a slash. */
+    public static final String BASE_PATH = "/health-status/";
+
+    public ClusterHealthStatusResources(MetadataStore store, int operationTimeoutSec) {
+        super(store, String.class, operationTimeoutSec);
+    }
+
+    /**
+     * Updates the health status record of a cluster.
+     *
+     * @param clusterName name of the cluster whose record is updated
+     * @param modifyFunction computes the status to store from the status currently stored
+     * @throws MetadataStoreException if the metadata store operation fails
+     */
+    public void updateHealthStatus(String clusterName, Function<String, String> modifyFunction)
+            throws MetadataStoreException {
+        set(healthStatusPath(clusterName), modifyFunction);
+    }
+
+    /**
+     * Reads the health status record of a cluster.
+     *
+     * @param clusterName name of the cluster whose record is read
+     * @return the stored status, if any
+     * @throws MetadataStoreException if the metadata store operation fails
+     */
+    public Optional<String> getHealthStatus(String clusterName) throws MetadataStoreException {
+        return get(healthStatusPath(clusterName));
+    }
+
+    /**
+     * Builds the record path by plain concatenation, exactly as PulsarService and ServerCnx do, so that
+     * every reader and writer of the record addresses the same path.
+     */
+    private static String healthStatusPath(String clusterName) {
+        return BASE_PATH + clusterName;
+    }
+
+    /** Health states a cluster can be marked with. */
+    public enum Status {
+        available,
+        unavailable
+    }
+}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index fe7ffe0bc7b43..9491a603015a9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -34,6 +34,8 @@ public class PulsarResources {
@Getter
private final ClusterResources clusterResources;
@Getter
+ private final ClusterHealthStatusResources clusterHealthStatusResources;
+ @Getter
private final ResourceGroupResources resourcegroupResources;
@Getter
private final NamespaceResources namespaceResources;
@@ -63,11 +65,14 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
operationTimeoutSec);
+ clusterHealthStatusResources = new ClusterHealthStatusResources(localMetadataStore,
+ operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
clusterResources = null;
+ clusterHealthStatusResources = null;
namespaceResources = null;
resourcegroupResources = null;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 054411c49f6ef..cdee7b94c6836 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -34,6 +34,7 @@
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -96,6 +97,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
+import org.apache.pulsar.broker.resources.ClusterHealthStatusResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
@@ -752,6 +754,17 @@ public void start() throws PulsarServerException {
 localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer);
 localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
+            // Seed the health status record as "available" when none exists yet; an existing record is
+            // left untouched. The existence check waits up to 30 seconds (the value is in milliseconds,
+            // like the read in ServerCnx); the put itself is not awaited.
+            String healthStatusPath = ClusterHealthStatusResources.BASE_PATH
+                    + config.getClusterName();
+            if (!localMetadataStore.exists(healthStatusPath).get(30000, TimeUnit.MILLISECONDS)) {
+                localMetadataStore.put(healthStatusPath,
+                        ClusterHealthStatusResources.Status.available.name().getBytes(StandardCharsets.UTF_8),
+                        Optional.of(-1L));
+            }
+
coordinationService = new CoordinationServiceImpl(localMetadataStore);
if (config.isConfigurationStoreSeparated()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 2f064d7b37720..02dabb4d1c5c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -1017,6 +1017,103 @@ public void deleteFailureDomain(
 });
 }
+    @POST
+    @Path("/{cluster}/updateHealthStatus")
+    @ApiOperation(
+        value = "Update cluster health status.",
+        notes = "This operation requires Pulsar superuser privileges."
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission."),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist."),
+            @ApiResponse(code = 500, message = "Internal server error.")
+    })
+    public void updateHealthStatus(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
+        @PathParam("cluster") String cluster,
+        @ApiParam(value = "The status info", required = true) Map<String, String> status
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenAccept(__ -> {
+                    try {
+                        clusterHealthStatusResources().updateHealthStatus(cluster,
+                                old -> status.getOrDefault("status", ""));
+                        log.info("[{}] Successful update health status for cluster {}", clientAppId(), cluster);
+                        asyncResponse.resume(Response.noContent().build());
+                    } catch (MetadataStoreException e) {
+                        // The suspended response must always be resumed, otherwise the request hangs.
+                        log.error("Update cluster {} health status error", cluster, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                    }
+                })
+                .exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to update health status for cluster. clusters {} Does not exist",
+                                clientAppId(), cluster);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "cluster " + cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to update health status for cluster {}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @GET
+    @Path("/{cluster}/getHealthStatus")
+    @ApiOperation(
+        value = "Get cluster health status.",
+        notes = "This operation requires Pulsar superuser privileges."
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission."),
+            @ApiResponse(code = 404, message = "Cluster health status is not set."),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist."),
+            @ApiResponse(code = 500, message = "Internal server error.")
+    })
+    public void getHealthStatus(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true)
+        @PathParam("cluster") String cluster
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
+                .thenAccept(__ -> {
+                    try {
+                        String status = clusterHealthStatusResources().getHealthStatus(cluster).orElse(null);
+                        if (status == null) {
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                    "health status of cluster " + cluster + " is not set"));
+                            return;
+                        }
+                        log.info("[{}] Successful get health status for cluster {}", clientAppId(), cluster);
+                        // resume() needs the built Response; a bare ResponseBuilder would be sent as the entity.
+                        asyncResponse.resume(Response.ok(status).build());
+                    } catch (MetadataStoreException e) {
+                        // The suspended response must always be resumed, otherwise the request hangs.
+                        log.error("Get cluster {} health status error", cluster, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                    }
+                })
+                .exceptionally(ex -> {
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof NotFoundException) {
+                        log.warn("[{}] Failed to get health status for cluster. clusters {} Does not exist",
+                                clientAppId(), cluster);
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                "cluster " + cluster + " does not exist"));
+                        return null;
+                    }
+                    log.error("[{}] Failed to get health status for cluster {}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
private CompletableFuture validateBrokerExistsInOtherDomain(final String cluster,
final String inputDomainName,
final FailureDomainImpl inputDomain) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bd4917da3b119..0c899c920d806 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -53,11 +53,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
@@ -81,6 +77,7 @@
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.resources.ClusterHealthStatusResources;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -91,6 +88,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.internal.BaseResource;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -98,51 +96,10 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.api.proto.BaseCommand;
-import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
-import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
-import org.apache.pulsar.common.api.proto.CommandAuthResponse;
-import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
-import org.apache.pulsar.common.api.proto.CommandCloseProducer;
-import org.apache.pulsar.common.api.proto.CommandConnect;
-import org.apache.pulsar.common.api.proto.CommandConsumerStats;
-import org.apache.pulsar.common.api.proto.CommandEndTxn;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
-import org.apache.pulsar.common.api.proto.CommandFlow;
-import org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
-import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
-import org.apache.pulsar.common.api.proto.CommandGetSchema;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
-import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandNewTxn;
-import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
-import org.apache.pulsar.common.api.proto.CommandProducer;
-import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
-import org.apache.pulsar.common.api.proto.CommandSeek;
-import org.apache.pulsar.common.api.proto.CommandSend;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.*;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
-import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
-import org.apache.pulsar.common.api.proto.CompressionType;
-import org.apache.pulsar.common.api.proto.FeatureFlags;
-import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.api.proto.KeyValue;
-import org.apache.pulsar.common.api.proto.MessageIdData;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.api.proto.ProducerAccessMode;
-import org.apache.pulsar.common.api.proto.ProtocolVersion;
-import org.apache.pulsar.common.api.proto.Schema;
-import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
@@ -1114,6 +1071,42 @@ protected void handleConnect(CommandConnect connect) {
 }
 }
+
+    /**
+     * Answers a health probe with the availability recorded for this broker's cluster in the local
+     * metadata store; any failure to read the record is reported to the peer as an error.
+     * NOTE(review): the probe is only accepted in State.Start, while AutoClusterFailover sends it over a
+     * pooled connection - confirm such a connection is still in that state when the probe arrives.
+     */
+    @Override
+    protected void handleHealthCheck(CommandHealthCheck healthCheck) {
+        checkArgument(state == State.Start);
+
+        String clusterName = this.service.pulsar().getConfig().getClusterName();
+        try {
+            byte[] status = this.service.pulsar().getLocalMetadataStore()
+                    .get(ClusterHealthStatusResources.BASE_PATH + clusterName)
+                    .get(30000, TimeUnit.MILLISECONDS)
+                    .orElseThrow(() -> new IllegalStateException(
+                            "No health status record for cluster " + clusterName))
+                    .getValue();
+            // The record holds the status name as UTF-8 bytes, so it has to be decoded;
+            // String.valueOf(byte[]) only yields the array's identity string and never matches.
+            String statusName = new String(status, java.nio.charset.StandardCharsets.UTF_8);
+            boolean available =
+                    ClusterHealthStatusResources.Status.available.name().equalsIgnoreCase(statusName);
+            writeAndFlush(Commands.newHealthCheckResponse(available));
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Keep the interrupt visible to the calling thread.
+                Thread.currentThread().interrupt();
+            }
+            log.error("cluster health status check error.", e);
+            writeAndFlush(Commands.newError(-1L, ServerError.UnknownError,
+                    "cluster health status check error."));
+        }
+    }
+
@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
checkArgument(authResponse.hasResponse());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e23286ae4492e..b0bb340bad0f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -62,6 +62,7 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BookieResources;
+import org.apache.pulsar.broker.resources.ClusterHealthStatusResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LoadBalanceResources;
@@ -1104,6 +1105,11 @@ protected ClusterResources clusterResources() {
 return pulsar().getPulsarResources().getClusterResources();
 }
+ /** Returns the ClusterHealthStatusResources obtained from pulsar().getPulsarResources(). */
+ protected ClusterHealthStatusResources clusterHealthStatusResources() {
+ return pulsar().getPulsarResources().getClusterHealthStatusResources();
+ }
+
protected BookieResources bookieResources() {
return pulsar().getPulsarResources().getBookieResources();
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 53e6680946566..f928cc6b7a6a6 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -809,4 +809,31 @@ void updateFailureDomain(String cluster, String domainName, FailureDomain domain
*/
CompletableFuture getFailureDomainAsync(String cluster, String domainName);
+    /**
+     * Update the health status of a cluster asynchronously.
+     * <p>
+     * A broker answers client health probes positively only while the status is {@code available}.
+     *
+     * @param cluster
+     *            Cluster name
+     * @param status
+     *            Health status to record, either {@code available} or {@code unavailable}
+     *
+     * @return
+     *            a future that completes once the update request has been answered
+     */
+    CompletableFuture<Void> updateHealthStatusAsync(String cluster, String status);
+
+    /**
+     * Get the health status of a cluster asynchronously.
+     *
+     * @param cluster
+     *            Cluster name
+     *
+     * @return
+     *            a future holding the health status recorded for the cluster
+     *
+     */
+    CompletableFuture<String> getHealthStatusAsync(String cluster);
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 231d4506d6173..b975941af5396 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -319,6 +319,21 @@ public CompletableFuture getFailureDomainAsync(String cluster, St
.thenApply(failureDomain -> failureDomain);
}
+    @Override
+    public CompletableFuture<Void> updateHealthStatusAsync(String cluster, String status) {
+        WebTarget path = adminClusters.path(cluster).path("updateHealthStatus");
+        Map<String, String> statusMap = new HashMap<>();
+        statusMap.put("status", status);
+        return asyncPostRequest(path, Entity.entity(statusMap, MediaType.APPLICATION_JSON_TYPE));
+    }
+
+    @Override
+    public CompletableFuture<String> getHealthStatusAsync(String cluster) {
+        WebTarget path = adminClusters.path(cluster).path("getHealthStatus");
+        // No result mapping is needed: the callback already yields the status string.
+        return asyncGetRequest(path, new FutureCallback<String>() {});
+    }
+
private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
sync(() -> setDomainAsync(cluster, domainName, domain));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
index 0ea56e4430951..d7d1ad4967f2a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java
@@ -286,6 +286,54 @@ void run() throws PulsarAdminException {
 }
 }
+    @Parameters(commandDescription = "Update cluster health status, available or unavailable. default available.")
+    private class UpdateHealthStatus extends CliCommand {
+        @Parameter(description = "cluster-name", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--status", description = "status", required = true)
+        private String status;
+
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            // Block until the broker has answered so that a failed update is reported to the user.
+            awaitResult(getAdmin().clusters().updateHealthStatusAsync(cluster, status));
+        }
+    }
+
+    @Parameters(commandDescription = "Get cluster health status, available or unavailable.")
+    private class GetHealthStatus extends CliCommand {
+        @Parameter(description = "cluster-name", required = true)
+        private java.util.List<String> params;
+
+        void run() throws PulsarAdminException {
+            String cluster = getOneArgument(params);
+            // Print the status itself, not the pending future.
+            print(awaitResult(getAdmin().clusters().getHealthStatusAsync(cluster)));
+        }
+    }
+
+    /**
+     * Waits for an admin call to finish and returns its result.
+     *
+     * @throws PulsarAdminException if the call failed or the wait was interrupted
+     */
+    private static <T> T awaitResult(java.util.concurrent.CompletableFuture<T> future)
+            throws PulsarAdminException {
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (java.util.concurrent.ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof PulsarAdminException) {
+                throw (PulsarAdminException) cause;
+            }
+            throw new PulsarAdminException(cause);
+        }
+    }
+
/**
* Base command.
*/
@@ -493,6 +518,8 @@ public CmdClusters(Supplier admin) {
jcommander.addCommand("update-failure-domain", new UpdateFailureDomain());
jcommander.addCommand("delete-failure-domain", new DeleteFailureDomain());
jcommander.addCommand("list-failure-domains", new ListFailureDomains());
+ jcommander.addCommand("update-health-status", new UpdateHealthStatus());
+ jcommander.addCommand("get-health-status", new GetHealthStatus());
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 68b781e67d29c..ab809ee112486 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -38,6 +38,7 @@
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.protocol.Commands;
@Slf4j
@Data
@@ -131,7 +132,13 @@ boolean probeAvailable(String url) {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(endpoint.getHostName(), endpoint.getPort()), TIMEOUT);
socket.close();
- return true;
+
+ ClientCnx clientCnx = pulsarClient.getCnxPool()
+ .getConnection(new InetSocketAddress(endpoint.getHostName(), endpoint.getPort()))
+ .get();
+ clientCnx.ctx().writeAndFlush(Commands.newHealthCheck()).sync();
+
+ return clientCnx.isClusterAvailable();
} catch (Exception e) {
log.warn("Failed to probe available, url: {}", url, e);
return false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 75e84eeca3e6a..f209a7b7eef3a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -64,38 +64,8 @@
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.api.proto.BaseCommand;
-import org.apache.pulsar.common.api.proto.CommandAckResponse;
-import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
-import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
-import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
-import org.apache.pulsar.common.api.proto.CommandCloseProducer;
-import org.apache.pulsar.common.api.proto.CommandConnected;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandError;
-import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse;
-import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
-import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
-import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
-import org.apache.pulsar.common.api.proto.CommandMessage;
-import org.apache.pulsar.common.api.proto.CommandNewTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
-import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
-import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic;
-import org.apache.pulsar.common.api.proto.CommandSendError;
-import org.apache.pulsar.common.api.proto.CommandSendReceipt;
-import org.apache.pulsar.common.api.proto.CommandSuccess;
-import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
-import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
+import org.apache.pulsar.common.api.proto.*;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
-import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
@@ -120,6 +90,11 @@ public class ClientCnx extends PulsarHandler {
 private AtomicLong duplicatedResponseCounter = new AtomicLong(0);
+    // Result of the last health check answered on this connection. Kept per connection (not static) so
+    // that probes against different clusters cannot overwrite each other's result.
+    @Getter
+    private volatile boolean clusterAvailable = true;
+
@Getter
private final ConcurrentLongHashMap> pendingRequests =
ConcurrentLongHashMap.>newBuilder()
@@ -366,6 +339,15 @@ public long getDuplicatedResponseCount() {
return duplicatedResponseCounter.get();
}
+    /**
+     * Records the availability reported by the broker in reply to a health check.
+     */
+    @Override
+    protected void handleHealthCheckResponse(CommandHealthCheckResponse healthCheckResponse) {
+        // A reply that carries no availability flag counts as "not available".
+        clusterAvailable = healthCheckResponse.hasAvailable() && healthCheckResponse.isAvailable();
+    }
+
@Override
protected void handleConnected(CommandConnected connected) {
checkArgument(state == State.SentConnectFrame || state == State.Connecting);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 34d47e2836bb2..b1cc04c45a517 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -38,6 +38,7 @@
import java.util.Set;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
+import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.PulsarVersion;
@@ -47,61 +48,15 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
-import org.apache.pulsar.common.api.proto.AuthMethod;
-import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.*;
import org.apache.pulsar.common.api.proto.BaseCommand.Type;
-import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
-import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
-import org.apache.pulsar.common.api.proto.CommandAckResponse;
-import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
-import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
-import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
-import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
-import org.apache.pulsar.common.api.proto.CommandCloseProducer;
-import org.apache.pulsar.common.api.proto.CommandConnect;
-import org.apache.pulsar.common.api.proto.CommandConnected;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse;
-import org.apache.pulsar.common.api.proto.CommandGetSchema;
-import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
-import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
-import org.apache.pulsar.common.api.proto.CommandMessage;
-import org.apache.pulsar.common.api.proto.CommandNewTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
-import org.apache.pulsar.common.api.proto.CommandProducer;
-import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
-import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
-import org.apache.pulsar.common.api.proto.CommandSeek;
-import org.apache.pulsar.common.api.proto.CommandSend;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
-import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
-import org.apache.pulsar.common.api.proto.FeatureFlags;
-import org.apache.pulsar.common.api.proto.IntRange;
-import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.api.proto.KeySharedMode;
-import org.apache.pulsar.common.api.proto.KeyValue;
-import org.apache.pulsar.common.api.proto.MessageIdData;
-import org.apache.pulsar.common.api.proto.MessageMetadata;
-import org.apache.pulsar.common.api.proto.ProtocolVersion;
-import org.apache.pulsar.common.api.proto.Schema;
-import org.apache.pulsar.common.api.proto.ServerError;
-import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
-import org.apache.pulsar.common.api.proto.Subscription;
-import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -192,6 +147,18 @@ private static void setFeatureFlags(FeatureFlags flags) {
flags.setSupportsPartialProducer(true);
}
+    /**
+     * Builds a {@code HEALTH_CHECK} command frame.
+     *
+     * <p>The {@code healthCheck} sub-message is attached to the command but none of its fields are populated.
+     *
+     * @return the command as produced by {@code serializeWithSize}
+     */
+    public static ByteBuf newHealthCheck() {
+        BaseCommand probe = localCmd(Type.HEALTH_CHECK);
+        // Called only to attach the sub-message; whatever it returns is not needed here.
+        probe.setHealthCheck();
+        return serializeWithSize(probe);
+    }
+
+    /**
+     * Builds a {@code HEALTH_CHECK_RESPONSE} command frame.
+     *
+     * @param available value written to the response's {@code available} field
+     * @return the command as produced by {@code serializeWithSize}
+     */
+    public static ByteBuf newHealthCheckResponse(boolean available) {
+        BaseCommand reply = localCmd(Type.HEALTH_CHECK_RESPONSE);
+        reply.setHealthCheckResponse()
+                .setAvailable(available);
+        return serializeWithSize(reply);
+    }
+
public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion,
String targetBroker, String originalPrincipal, String originalAuthData,
String originalAuthMethod) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c1c1ebe355bb9..abd17f5555e94 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -24,66 +24,7 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.handler.codec.haproxy.HAProxyMessage;
-import org.apache.pulsar.common.api.proto.BaseCommand;
-import org.apache.pulsar.common.api.proto.CommandAck;
-import org.apache.pulsar.common.api.proto.CommandAckResponse;
-import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
-import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
-import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
-import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
-import org.apache.pulsar.common.api.proto.CommandAuthResponse;
-import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
-import org.apache.pulsar.common.api.proto.CommandCloseProducer;
-import org.apache.pulsar.common.api.proto.CommandConnect;
-import org.apache.pulsar.common.api.proto.CommandConnected;
-import org.apache.pulsar.common.api.proto.CommandConsumerStats;
-import org.apache.pulsar.common.api.proto.CommandConsumerStatsResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxn;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
-import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
-import org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandError;
-import org.apache.pulsar.common.api.proto.CommandFlow;
-import org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
-import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse;
-import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
-import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
-import org.apache.pulsar.common.api.proto.CommandGetSchema;
-import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
-import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
-import org.apache.pulsar.common.api.proto.CommandLookupTopic;
-import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
-import org.apache.pulsar.common.api.proto.CommandMessage;
-import org.apache.pulsar.common.api.proto.CommandNewTxn;
-import org.apache.pulsar.common.api.proto.CommandNewTxnResponse;
-import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
-import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
-import org.apache.pulsar.common.api.proto.CommandPing;
-import org.apache.pulsar.common.api.proto.CommandPong;
-import org.apache.pulsar.common.api.proto.CommandProducer;
-import org.apache.pulsar.common.api.proto.CommandProducerSuccess;
-import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic;
-import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
-import org.apache.pulsar.common.api.proto.CommandSeek;
-import org.apache.pulsar.common.api.proto.CommandSend;
-import org.apache.pulsar.common.api.proto.CommandSendError;
-import org.apache.pulsar.common.api.proto.CommandSendReceipt;
-import org.apache.pulsar.common.api.proto.CommandSubscribe;
-import org.apache.pulsar.common.api.proto.CommandSuccess;
-import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
-import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
-import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
-import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
-import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
-import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.api.proto.*;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.slf4j.Logger;
@@ -187,6 +128,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
handleConnected(cmd.getConnected());
break;
+ case HEALTH_CHECK:
+ checkArgument(cmd.hasHealthCheck());
+ handleHealthCheck(cmd.getHealthCheck());
+ break;
+
+ case HEALTH_CHECK_RESPONSE:
+ checkArgument(cmd.hasHealthCheckResponse());
+ handleHealthCheckResponse(cmd.getHealthCheckResponse());
+ break;
+
case ERROR:
checkArgument(cmd.hasError());
handleError(cmd.getError());
@@ -529,6 +480,14 @@ protected void handleConnected(CommandConnected connected) {
throw new UnsupportedOperationException();
}
+ /**
+ * Hook invoked by {@code channelRead} when a {@code HEALTH_CHECK} command has been decoded.
+ *
+ * <p>This base implementation always throws. A handler that expects the command presumably
+ * overrides it (the overriding classes are not visible from this file).
+ *
+ * @param healthCheck the decoded health-check payload
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ protected void handleHealthCheck(CommandHealthCheck healthCheck) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Hook invoked by {@code channelRead} when a {@code HEALTH_CHECK_RESPONSE} command has been decoded.
+ *
+ * <p>This base implementation always throws. A handler that expects the command presumably
+ * overrides it (the overriding classes are not visible from this file).
+ *
+ * @param healthCheckResponse the decoded health-check response payload
+ * @throws UnsupportedOperationException always, in this base implementation
+ */
+ protected void handleHealthCheckResponse(CommandHealthCheckResponse healthCheckResponse) {
+ throw new UnsupportedOperationException();
+ }
+
protected void handleSubscribe(CommandSubscribe subscribe) {
throw new UnsupportedOperationException();
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 387e4e3ff679d..183329047dd83 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -295,6 +295,14 @@ message CommandConnect {
optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy.
}
+// Payload of the HEALTH_CHECK command (carried in BaseCommand.healthCheck).
+message CommandHealthCheck {
+ // Zero or more key/value pairs attached to the probe. Their meaning is not defined in this file.
+ repeated KeyValue metadata = 1;
+}
+
+// Payload of the HEALTH_CHECK_RESPONSE command (carried in BaseCommand.healthCheckResponse).
+message CommandHealthCheckResponse {
+ // NOTE(review): because of [default = true], a response that omits this field reads as
+ // available. Confirm that this fail-open default is intended for a failover probe.
+ optional bool available = 1 [default = true];
+}
+
message FeatureFlags {
optional bool supports_auth_refresh = 1 [default = false];
optional bool supports_broker_entry_metadata = 2 [default = false];
@@ -1049,6 +1057,9 @@ message BaseCommand {
WATCH_TOPIC_LIST_CLOSE = 67;
TOPIC_MIGRATED = 68;
+
+ HEALTH_CHECK = 69;
+ HEALTH_CHECK_RESPONSE = 70;
}
@@ -1132,4 +1143,7 @@ message BaseCommand {
optional CommandWatchTopicListClose watchTopicListClose = 67;
optional CommandTopicMigrated topicMigrated = 68;
+
+ optional CommandHealthCheck healthCheck = 69;
+ optional CommandHealthCheckResponse healthCheckResponse = 70;
}