diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java new file mode 100644 index 0000000000000..4a17cda16b9c0 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterHealthStatusResources.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resources; + +import java.util.Optional; +import java.util.function.Function; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreException; + +public class ClusterHealthStatusResources extends BaseResources { + public static final String BASE_PATH = "/health-status/"; + + public ClusterHealthStatusResources(MetadataStore store, int operationTimeoutSec) { + super(store, String.class, operationTimeoutSec); + } + + public void updateHealthStatus(String clusterName, Function modifyFunction) + throws MetadataStoreException { + set(joinPath(BASE_PATH, clusterName), modifyFunction); + } + + public Optional getHealthStatus(String clusterName) throws MetadataStoreException { + return get(joinPath(BASE_PATH, clusterName)); + } + + public enum Status { + available, + unavailable + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index fe7ffe0bc7b43..9491a603015a9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -34,6 +34,8 @@ public class PulsarResources { @Getter private final ClusterResources clusterResources; @Getter + private final ClusterHealthStatusResources clusterHealthStatusResources; + @Getter private final ResourceGroupResources resourcegroupResources; @Getter private final NamespaceResources namespaceResources; @@ -63,11 +65,14 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec); clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore, operationTimeoutSec); + clusterHealthStatusResources = new ClusterHealthStatusResources(localMetadataStore, + operationTimeoutSec); namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec); resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec); } else { tenantResources = null; clusterResources = null; + clusterHealthStatusResources = null; namespaceResources = null; resourcegroupResources = null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 054411c49f6ef..cdee7b94c6836 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -34,6 +34,7 @@ import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -96,6 +97,7 @@ import org.apache.pulsar.broker.resourcegroup.ResourceGroupService; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager; import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager; +import org.apache.pulsar.broker.resources.ClusterHealthStatusResources; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.rest.Topics; @@ -752,6 +754,14 @@ public void start() throws PulsarServerException { localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer); localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent); + String healthStatusPath = ClusterHealthStatusResources.BASE_PATH + + config.getClusterName(); + if (!localMetadataStore.exists(healthStatusPath).get(30000, TimeUnit.MICROSECONDS)) { + localMetadataStore.put(healthStatusPath, + ClusterHealthStatusResources.Status.available.name().getBytes(StandardCharsets.UTF_8), + Optional.of(-1L)); + } + coordinationService = new CoordinationServiceImpl(localMetadataStore); if (config.isConfigurationStoreSeparated()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 2f064d7b37720..02dabb4d1c5c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -1017,6 +1017,92 @@ public void deleteFailureDomain( }); } + @POST + @Path("/{cluster}/updateHealthStatus}") + @ApiOperation( + value = "Update cluster health status.", + notes = "This operation requires Pulsar superuser privileges." + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission."), + @ApiResponse(code = 412, message = "Cluster doesn't exist."), + @ApiResponse(code = 500, message = "Internal server error.") + }) + public void updateHealthStatus( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster, + @ApiParam(value = "The status info", required = true) Map status + ) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) + .thenAccept(__ -> { + try { + clusterHealthStatusResources().updateHealthStatus(cluster, + old -> status.getOrDefault("status", "")); + log.info("[{}] Successful update health status for cluster {}", clientAppId(), cluster); + asyncResponse.resume(Response.noContent().build()); + } catch (MetadataStoreException e) { + log.error("Update cluster {} health status error", cluster, e); + } + }) + .exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotFoundException) { + log.warn("[{}] Failed to update health status for cluster. clusters {} Does not exist", + clientAppId(), cluster); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "cluster " + cluster + " does not exist")); + return null; + } + log.error("[{}] Failed to update clusters/{}/{}", clientAppId(), cluster, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @GET + @Path("/{cluster}/getHealthStatus}") + @ApiOperation( + value = "Get cluster health status.", + notes = "This operation requires Pulsar superuser privileges." + ) + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission."), + @ApiResponse(code = 412, message = "Cluster doesn't exist."), + @ApiResponse(code = 500, message = "Internal server error.") + }) + public void getHealthStatus( + @Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster + ) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) + .thenAccept(__ -> { + try { + String status = clusterHealthStatusResources().getHealthStatus(cluster).get(); + log.info("[{}] Successful get health status for cluster {}", clientAppId(), cluster); + asyncResponse.resume(Response.ok(status)); + } catch (MetadataStoreException e) { + log.error("Update cluster {} health status error", cluster, e); + } + }) + .exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotFoundException) { + log.warn("[{}] Failed to get health status for cluster. clusters {} Does not exist", + clientAppId(), cluster); + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "cluster " + cluster + " does not exist")); + return null; + } + log.error("[{}] Failed to get clusters/{}/{}", clientAppId(), cluster, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + private CompletableFuture validateBrokerExistsInOtherDomain(final String cluster, final String inputDomainName, final FailureDomainImpl inputDomain) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index bd4917da3b119..0c899c920d806 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -53,11 +53,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.naming.AuthenticationException; @@ -81,6 +77,7 @@ import org.apache.pulsar.broker.limiter.ConnectionController; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.broker.resources.ClusterHealthStatusResources; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -91,6 +88,7 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.internal.BaseResource; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.BatchMessageIdImpl; @@ -98,51 +96,10 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.common.api.AuthData; -import org.apache.pulsar.common.api.proto.BaseCommand; -import org.apache.pulsar.common.api.proto.CommandAck; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn; -import org.apache.pulsar.common.api.proto.CommandAuthResponse; -import org.apache.pulsar.common.api.proto.CommandCloseConsumer; -import org.apache.pulsar.common.api.proto.CommandCloseProducer; -import org.apache.pulsar.common.api.proto.CommandConnect; -import org.apache.pulsar.common.api.proto.CommandConsumerStats; -import org.apache.pulsar.common.api.proto.CommandEndTxn; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription; -import org.apache.pulsar.common.api.proto.CommandFlow; -import org.apache.pulsar.common.api.proto.CommandGetLastMessageId; -import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema; -import org.apache.pulsar.common.api.proto.CommandGetSchema; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; -import org.apache.pulsar.common.api.proto.CommandLookupTopic; -import org.apache.pulsar.common.api.proto.CommandNewTxn; -import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; -import org.apache.pulsar.common.api.proto.CommandProducer; -import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages; -import org.apache.pulsar.common.api.proto.CommandSeek; -import org.apache.pulsar.common.api.proto.CommandSend; -import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.*; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; -import org.apache.pulsar.common.api.proto.CommandUnsubscribe; -import org.apache.pulsar.common.api.proto.CommandWatchTopicList; -import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; -import org.apache.pulsar.common.api.proto.CompressionType; -import org.apache.pulsar.common.api.proto.FeatureFlags; -import org.apache.pulsar.common.api.proto.KeySharedMeta; -import org.apache.pulsar.common.api.proto.KeySharedMode; -import org.apache.pulsar.common.api.proto.KeyValue; -import org.apache.pulsar.common.api.proto.MessageIdData; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.api.proto.ProducerAccessMode; -import org.apache.pulsar.common.api.proto.ProtocolVersion; -import org.apache.pulsar.common.api.proto.Schema; -import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.api.proto.SingleMessageMetadata; -import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.intercept.InterceptException; @@ -1114,6 +1071,30 @@ protected void handleConnect(CommandConnect connect) { } } + + @Override + protected void handleHealthCheck(CommandHealthCheck healthCheck) { + checkArgument(state == State.Start); + + String clusterName = this.service.pulsar().getConfig().getClusterName(); + try { + byte[] status = this.service.pulsar().getLocalMetadataStore() + .get(ClusterHealthStatusResources.BASE_PATH + clusterName) + .get(30000, TimeUnit.MILLISECONDS) + .get() + .getValue(); + if (String.valueOf(status).equalsIgnoreCase(ClusterHealthStatusResources.Status.available.name())) { + writeAndFlush(Commands.newHealthCheckResponse(true)); + } else { + writeAndFlush(Commands.newHealthCheckResponse(false)); + } + } catch (Exception e) { + log.error("cluster health status check error.", e); + writeAndFlush(Commands.newError(-1L, ServerError.UnknownError, + "cluster health status check error.")); + } + } + @Override protected void handleAuthResponse(CommandAuthResponse authResponse) { checkArgument(authResponse.hasResponse()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index e23286ae4492e..b0bb340bad0f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.BookieResources; +import org.apache.pulsar.broker.resources.ClusterHealthStatusResources; import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.DynamicConfigurationResources; import org.apache.pulsar.broker.resources.LoadBalanceResources; @@ -1104,6 +1105,10 @@ protected ClusterResources clusterResources() { return pulsar().getPulsarResources().getClusterResources(); } + protected ClusterHealthStatusResources clusterHealthStatusResources() { + return pulsar().getPulsarResources().getClusterHealthStatusResources(); + } + protected BookieResources bookieResources() { return pulsar().getPulsarResources().getBookieResources(); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 53e6680946566..f928cc6b7a6a6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -809,4 +809,31 @@ void updateFailureDomain(String cluster, String domainName, FailureDomain domain */ CompletableFuture getFailureDomainAsync(String cluster, String domainName); + /** + * Update health status for a cluster. + *

+ * + * @param cluster + * Cluster name + * + * @param status + * health status + * + * @return + * + */ + CompletableFuture updateHealthStatusAsync(String cluster, String status); + + /** + * Get health status for a cluster. + *

+ * + * @param cluster + * Cluster name + * + * @return + * + */ + CompletableFuture getHealthStatusAsync(String cluster); + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 231d4506d6173..b975941af5396 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -319,6 +319,21 @@ public CompletableFuture getFailureDomainAsync(String cluster, St .thenApply(failureDomain -> failureDomain); } + @Override + public CompletableFuture updateHealthStatusAsync(String cluster, String status) { + WebTarget path = adminClusters.path(cluster).path("updateHealthStatus"); + Map statusMap = new HashMap<>(); + statusMap.put("status", status); + return asyncPostRequest(path, Entity.entity(statusMap, MediaType.APPLICATION_JSON_TYPE)); + } + + @Override + public CompletableFuture getHealthStatusAsync(String cluster) { + WebTarget path = adminClusters.path(cluster).path("getHealthStatus"); + return asyncGetRequest(path, new FutureCallback() {}) + .thenApply(status -> status); + } + private void setDomain(String cluster, String domainName, FailureDomain domain) throws PulsarAdminException { sync(() -> setDomainAsync(cluster, domainName, domain)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 0ea56e4430951..d7d1ad4967f2a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -286,6 +286,31 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Update cluster health status, available or unavailable. default available.") + private class UpdateHealthStatus extends CliCommand { + @Parameter(description = "cluster-name", required = true) + private java.util.List params; + + @Parameter(names = "--status", description = "status", required = true) + private String status; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + getAdmin().clusters().updateHealthStatusAsync(cluster, status); + } + } + + @Parameters(commandDescription = "Get cluster health status, available or unavailable.") + private class GetHealthStatus extends CliCommand { + @Parameter(description = "cluster-name", required = true) + private java.util.List params; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + print(getAdmin().clusters().getHealthStatusAsync(cluster)); + } + } + /** * Base command. */ @@ -493,6 +518,8 @@ public CmdClusters(Supplier admin) { jcommander.addCommand("update-failure-domain", new UpdateFailureDomain()); jcommander.addCommand("delete-failure-domain", new DeleteFailureDomain()); jcommander.addCommand("list-failure-domains", new ListFailureDomains()); + jcommander.addCommand("update-health-status", new UpdateHealthStatus()); + jcommander.addCommand("get-health-status", new GetHealthStatus()); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java index 68b781e67d29c..ab809ee112486 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.protocol.Commands; @Slf4j @Data @@ -131,7 +132,13 @@ boolean probeAvailable(String url) { Socket socket = new Socket(); socket.connect(new InetSocketAddress(endpoint.getHostName(), endpoint.getPort()), TIMEOUT); socket.close(); - return true; + + ClientCnx clientCnx = pulsarClient.getCnxPool() + .getConnection(new InetSocketAddress(endpoint.getHostName(), endpoint.getPort())) + .get(); + clientCnx.ctx().writeAndFlush(Commands.newHealthCheck()).sync(); + + return clientCnx.isClusterAvailable(); } catch (Exception e) { log.warn("Failed to probe available, url: {}", url, e); return false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 75e84eeca3e6a..f209a7b7eef3a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -64,38 +64,8 @@ import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.AuthData; -import org.apache.pulsar.common.api.proto.BaseCommand; -import org.apache.pulsar.common.api.proto.CommandAckResponse; -import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAuthChallenge; -import org.apache.pulsar.common.api.proto.CommandCloseConsumer; -import org.apache.pulsar.common.api.proto.CommandCloseProducer; -import org.apache.pulsar.common.api.proto.CommandConnected; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; -import org.apache.pulsar.common.api.proto.CommandError; -import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse; -import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse; -import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; -import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; -import org.apache.pulsar.common.api.proto.CommandMessage; -import org.apache.pulsar.common.api.proto.CommandNewTxnResponse; -import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; -import org.apache.pulsar.common.api.proto.CommandProducerSuccess; -import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic; -import org.apache.pulsar.common.api.proto.CommandSendError; -import org.apache.pulsar.common.api.proto.CommandSendReceipt; -import org.apache.pulsar.common.api.proto.CommandSuccess; -import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; -import org.apache.pulsar.common.api.proto.CommandTopicMigrated; +import org.apache.pulsar.common.api.proto.*; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; -import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; -import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; -import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; @@ -120,6 +90,9 @@ public class ClientCnx extends PulsarHandler { private AtomicLong duplicatedResponseCounter = new AtomicLong(0); + @Getter + private static volatile boolean clusterAvailable = true; + @Getter private final ConcurrentLongHashMap> pendingRequests = ConcurrentLongHashMap.>newBuilder() @@ -366,6 +339,15 @@ public long getDuplicatedResponseCount() { return duplicatedResponseCounter.get(); } + @Override + protected void handleHealthCheckResponse(CommandHealthCheckResponse healthCheckResponse) { + if (healthCheckResponse.hasAvailable()) { + clusterAvailable = healthCheckResponse.isAvailable(); + } else { + clusterAvailable = false; + } + } + @Override protected void handleConnected(CommandConnected connected) { checkArgument(state == State.SentConnectFrame || state == State.Connecting); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 34d47e2836bb2..b1cc04c45a517 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -38,6 +38,7 @@ import java.util.Set; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; +import lombok.val; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.PulsarVersion; @@ -47,61 +48,15 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; -import org.apache.pulsar.common.api.proto.AuthMethod; -import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.*; import org.apache.pulsar.common.api.proto.BaseCommand.Type; -import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; -import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandAck.ValidationError; -import org.apache.pulsar.common.api.proto.CommandAckResponse; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAuthChallenge; -import org.apache.pulsar.common.api.proto.CommandCloseConsumer; -import org.apache.pulsar.common.api.proto.CommandCloseProducer; -import org.apache.pulsar.common.api.proto.CommandConnect; -import org.apache.pulsar.common.api.proto.CommandConnected; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; -import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse; -import org.apache.pulsar.common.api.proto.CommandGetSchema; -import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; -import org.apache.pulsar.common.api.proto.CommandLookupTopic; -import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; -import org.apache.pulsar.common.api.proto.CommandMessage; -import org.apache.pulsar.common.api.proto.CommandNewTxnResponse; -import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; -import org.apache.pulsar.common.api.proto.CommandProducer; -import org.apache.pulsar.common.api.proto.CommandProducerSuccess; -import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages; -import org.apache.pulsar.common.api.proto.CommandSeek; -import org.apache.pulsar.common.api.proto.CommandSend; -import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; -import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; -import org.apache.pulsar.common.api.proto.FeatureFlags; -import org.apache.pulsar.common.api.proto.IntRange; -import org.apache.pulsar.common.api.proto.KeySharedMeta; -import org.apache.pulsar.common.api.proto.KeySharedMode; -import org.apache.pulsar.common.api.proto.KeyValue; -import org.apache.pulsar.common.api.proto.MessageIdData; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.api.proto.ProtocolVersion; -import org.apache.pulsar.common.api.proto.Schema; -import org.apache.pulsar.common.api.proto.ServerError; -import org.apache.pulsar.common.api.proto.SingleMessageMetadata; -import org.apache.pulsar.common.api.proto.Subscription; -import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; @@ -192,6 +147,18 @@ private static void setFeatureFlags(FeatureFlags flags) { flags.setSupportsPartialProducer(true); } + public static ByteBuf newHealthCheck() { + BaseCommand cmd = localCmd(Type.HEALTH_CHECK); + cmd.setHealthCheck(); + return serializeWithSize(cmd); + } + + public static ByteBuf newHealthCheckResponse(boolean available) { + BaseCommand cmd = localCmd(Type.HEALTH_CHECK_RESPONSE); + cmd.setHealthCheckResponse().setAvailable(available); + return serializeWithSize(cmd); + } + public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, String targetBroker, String originalPrincipal, String originalAuthData, String originalAuthMethod) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index c1c1ebe355bb9..abd17f5555e94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -24,66 +24,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundInvoker; import io.netty.handler.codec.haproxy.HAProxyMessage; -import org.apache.pulsar.common.api.proto.BaseCommand; -import org.apache.pulsar.common.api.proto.CommandAck; -import org.apache.pulsar.common.api.proto.CommandAckResponse; -import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn; -import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn; -import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; -import org.apache.pulsar.common.api.proto.CommandAuthChallenge; -import org.apache.pulsar.common.api.proto.CommandAuthResponse; -import org.apache.pulsar.common.api.proto.CommandCloseConsumer; -import org.apache.pulsar.common.api.proto.CommandCloseProducer; -import org.apache.pulsar.common.api.proto.CommandConnect; -import org.apache.pulsar.common.api.proto.CommandConnected; -import org.apache.pulsar.common.api.proto.CommandConsumerStats; -import org.apache.pulsar.common.api.proto.CommandConsumerStatsResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxn; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription; -import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; -import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; -import org.apache.pulsar.common.api.proto.CommandError; -import org.apache.pulsar.common.api.proto.CommandFlow; -import org.apache.pulsar.common.api.proto.CommandGetLastMessageId; -import org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse; -import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema; -import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse; -import org.apache.pulsar.common.api.proto.CommandGetSchema; -import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; -import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; -import org.apache.pulsar.common.api.proto.CommandLookupTopic; -import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; -import org.apache.pulsar.common.api.proto.CommandMessage; -import org.apache.pulsar.common.api.proto.CommandNewTxn; -import org.apache.pulsar.common.api.proto.CommandNewTxnResponse; -import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; -import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; -import org.apache.pulsar.common.api.proto.CommandPing; -import org.apache.pulsar.common.api.proto.CommandPong; -import org.apache.pulsar.common.api.proto.CommandProducer; -import org.apache.pulsar.common.api.proto.CommandProducerSuccess; -import org.apache.pulsar.common.api.proto.CommandReachedEndOfTopic; -import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages; -import org.apache.pulsar.common.api.proto.CommandSeek; -import org.apache.pulsar.common.api.proto.CommandSend; -import org.apache.pulsar.common.api.proto.CommandSendError; -import org.apache.pulsar.common.api.proto.CommandSendReceipt; -import org.apache.pulsar.common.api.proto.CommandSubscribe; -import org.apache.pulsar.common.api.proto.CommandSuccess; -import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; -import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; -import org.apache.pulsar.common.api.proto.CommandTopicMigrated; -import org.apache.pulsar.common.api.proto.CommandUnsubscribe; -import org.apache.pulsar.common.api.proto.CommandWatchTopicList; -import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; -import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; -import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; -import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.api.proto.*; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.util.netty.NettyChannelUtil; import org.slf4j.Logger; @@ -187,6 +128,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception handleConnected(cmd.getConnected()); break; + case HEALTH_CHECK: + checkArgument(cmd.hasHealthCheck()); + handleHealthCheck(cmd.getHealthCheck()); + break; + + case HEALTH_CHECK_RESPONSE: + checkArgument(cmd.hasHealthCheckResponse()); + handleHealthCheckResponse(cmd.getHealthCheckResponse()); + break; + case ERROR: checkArgument(cmd.hasError()); handleError(cmd.getError()); @@ -529,6 +480,14 @@ protected void handleConnected(CommandConnected connected) { throw new UnsupportedOperationException(); } + protected void handleHealthCheck(CommandHealthCheck healthCheck) { + throw new UnsupportedOperationException(); + } + + protected void handleHealthCheckResponse(CommandHealthCheckResponse healthCheckResponse) { + throw new UnsupportedOperationException(); + } + protected void handleSubscribe(CommandSubscribe subscribe) { throw new UnsupportedOperationException(); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 387e4e3ff679d..183329047dd83 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -295,6 +295,14 @@ message CommandConnect { optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy. } +message CommandHealthCheck { + repeated KeyValue metadata = 1; +} + +message CommandHealthCheckResponse { + optional bool available = 1 [default = true]; +} + message FeatureFlags { optional bool supports_auth_refresh = 1 [default = false]; optional bool supports_broker_entry_metadata = 2 [default = false]; @@ -1049,6 +1057,9 @@ message BaseCommand { WATCH_TOPIC_LIST_CLOSE = 67; TOPIC_MIGRATED = 68; + + HEALTH_CHECK = 69; + HEALTH_CHECK_RESPONSE = 70; } @@ -1132,4 +1143,7 @@ message BaseCommand { optional CommandWatchTopicListClose watchTopicListClose = 67; optional CommandTopicMigrated topicMigrated = 68; + + optional CommandHealthCheck healthCheck = 69; + optional CommandHealthCheckResponse healthCheckResponse = 70; }