From 61805566297d2bb47c5decbe8fe8279048e8b89e Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 6 Jun 2023 20:15:02 +0800 Subject: [PATCH] resolve serviceUrl host separately when getConnection of each topic partition --- .../auth/MockedPulsarServiceBaseTest.java | 6 ++ .../client/api/ProducerCreationTest.java | 22 +++++ .../client/impl/BinaryProtoLookupService.java | 9 ++ .../pulsar/client/impl/ConnectionHandler.java | 14 ++- .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../apache/pulsar/client/impl/HttpClient.java | 86 +++++++++++++++++++ .../pulsar/client/impl/HttpLookupService.java | 35 ++++++++ .../pulsar/client/impl/LookupService.java | 20 +++++ .../pulsar/client/impl/ProducerImpl.java | 2 +- .../pulsar/client/impl/PulsarClientImpl.java | 6 ++ .../impl/PulsarServiceNameResolver.java | 21 +++++ .../client/impl/ServiceNameResolver.java | 22 +++++ .../pulsar/client/impl/TopicListWatcher.java | 2 +- .../impl/TransactionMetaStoreHandler.java | 2 +- 14 files changed, 243 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index b688d5fbf24d8..f7a4a4b00da84 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -119,10 +119,13 @@ protected static String getTlsFileForClient(String name) { protected PulsarService pulsar; protected PulsarAdmin admin; protected PulsarClient pulsarClient; + protected PulsarClient pulsarClientHttpUrlNotAllAvailable; + protected PulsarClient pulsarClientserviceUrlNotAllAvailable; protected PortForwarder brokerGateway; protected boolean enableBrokerGateway = false; protected URL brokerUrl; protected URL brokerUrlTls; + protected String brokerServiceUrl; protected URI lookupUrl; @@ -164,6 +167,8 @@ protected final void internalSetup() throws Exception { } } pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + pulsarClientHttpUrlNotAllAvailable = newPulsarClient(brokerUrl.toString() + ",localhost:5678,localhost:5677,localhost:5676", 0); + pulsarClientserviceUrlNotAllAvailable = newPulsarClient(brokerServiceUrl + ",localhost:5678,localhost:5677,localhost:5676", 0); } protected final void internalSetup(ServiceConfiguration serviceConfiguration) throws Exception { @@ -319,6 +324,7 @@ protected void startBroker() throws Exception { brokerUrl = pulsar.getWebServiceAddress() != null ? new URL(pulsar.getWebServiceAddress()) : null; brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new URL(pulsar.getWebServiceAddressTls()) : null; + brokerServiceUrl = pulsar.getBrokerServiceUrl() != null ? pulsar.getBrokerServiceUrl() : null; if (admin != null) { admin.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java index d5734588288f3..ea7c936b481ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCreationTest.java @@ -191,4 +191,26 @@ public void testInitialSubscriptionCreationWithAutoCreationDisable() Assert.assertFalse(admin.topics().getSubscriptions(topic.toString()).contains(initialSubscriptionName)); } + + @Test + public void testCreateWhenServiceUrlNotAllAvailable() throws Exception { + + final TopicName topic = + TopicName.get("persistent", "public", "default", "testCreateInitialSubscriptionOnPartitionedTopic"); + admin.topics().createPartitionedTopic(topic.toString(), 20); + + // use pulsar serviceUrl with unavailable host to new producer + Producer producer = pulsarClientserviceUrlNotAllAvailable.newProducer() + .topic(topic.toString()) + .create(); + + producer.close(); + + // use pulsar httpServiceUrl with unavailable host to new producer + Producer producer2 = pulsarClientHttpUrlNotAllAvailable.newProducer() + .topic(topic.toString()) + .create(); + + producer2.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index d5ce9213211dd..54ae04156f190 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -95,6 +95,11 @@ public CompletableFuture> getBroker(T return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0); } + public CompletableFuture> getBroker(TopicName topicName, int currentIndex) { + InetSocketAddress socketAddress = serviceNameResolver.resolveHost(currentIndex); + return findBroker(socketAddress, false, topicName, 0); + } + /** * calls broker binaryProto-lookup api to get metadata of partitioned-topic. * @@ -259,6 +264,10 @@ public String getServiceUrl() { return serviceNameResolver.getServiceUrl(); } + public List getAddressList() { + return serviceNameResolver.getAddressList(); + } + @Override public InetSocketAddress resolveHost() { return serviceNameResolver.resolveHost(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 046cb90643a23..ad96f9c095b17 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -19,8 +19,10 @@ package org.apache.pulsar.client.impl; import java.net.InetSocketAddress; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.pulsar.client.api.PulsarClientException; @@ -41,6 +43,7 @@ public class ConnectionHandler { // Start with -1L because it gets incremented before sending on the first connection private volatile long epoch = -1L; protected volatile long lastConnectionClosedTimestamp = 0L; + private AtomicInteger serviceResolverIndex; interface Connection { void connectionFailed(PulsarClientException exception); @@ -49,11 +52,18 @@ interface Connection { protected Connection connection; - protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) { + protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection, List addressList) { this.state = state; this.connection = connection; this.backoff = backoff; CLIENT_CNX_UPDATER.set(this, null); + this.serviceResolverIndex = new AtomicInteger(randomIndex(addressList.size())); + } + + private static int randomIndex(int numAddresses) { + return numAddresses == 1 + ? + 0 : io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses); } protected void grabCnx() { @@ -79,7 +89,7 @@ protected void grabCnx() { } else if (state.topic == null) { cnxFuture = state.client.getConnectionToServiceUrl(); } else { - cnxFuture = state.client.getConnection(state.topic); // + cnxFuture = state.client.getConnection(state.topic, serviceResolverIndex.getAndIncrement()); } cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) // .exceptionally(this::handleConnectionError); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4a84e765065f2..bf252fc4fb6df 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -332,7 +332,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(), - this); + this, client.getLookup().getAddressList()); this.topicName = TopicName.get(topic); if (this.topicName.isPersistent()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 38b8954377957..f46e2d086065d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -264,4 +264,90 @@ public CompletableFuture get(String path, Class clazz) { return future; } + + public CompletableFuture get(String path, Class clazz, int currentIndex) { + final CompletableFuture future = new CompletableFuture<>(); + try { + URI hostUri = serviceNameResolver.resolveHostUri(currentIndex); + String requestUrl = new URL(hostUri.toURL(), path).toString(); + String remoteHostName = hostUri.getHost(); + AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); + + CompletableFuture> authFuture = new CompletableFuture<>(); + + // bring a authenticationStage for sasl auth. + if (authData.hasDataForHttp()) { + authentication.authenticationStage(requestUrl, authData, null, authFuture); + } else { + authFuture.complete(null); + } + + // auth complete, do real request + authFuture.whenComplete((respHeaders, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to perform http request at authentication stage: {}", + requestUrl, ex.getMessage()); + future.completeExceptionally(new PulsarClientException(ex)); + return; + } + + // auth complete, use a new builder + BoundRequestBuilder builder = httpClient.prepareGet(requestUrl) + .setHeader("Accept", "application/json"); + + if (authData.hasDataForHttp()) { + Set> headers; + try { + headers = authentication.newRequestHeader(requestUrl, authData, respHeaders); + } catch (Exception e) { + log.warn("[{}] Error during HTTP get headers: {}", requestUrl, e.getMessage()); + future.completeExceptionally(new PulsarClientException(e)); + return; + } + if (headers != null) { + headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue())); + } + } + + builder.execute().toCompletableFuture().whenComplete((response2, t) -> { + if (t != null) { + log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage()); + future.completeExceptionally(new PulsarClientException(t)); + return; + } + + // request not success + if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) { + log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText()); + Exception e; + if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { + e = new NotFoundException("Not found: " + response2.getStatusText()); + } else { + e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText()); + } + future.completeExceptionally(e); + return; + } + + try { + T data = ObjectMapperFactory.getMapper().reader().readValue( + response2.getResponseBodyAsBytes(), clazz); + future.complete(data); + } catch (Exception e) { + log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage()); + future.completeExceptionally(new PulsarClientException(e)); + } + }); + }); + } catch (Exception e) { + log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage()); + if (e instanceof PulsarClientException) { + future.completeExceptionally(e); + } else { + future.completeExceptionally(new PulsarClientException(e)); + } + } + + return future; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 7969ce402363f..1f1308b62d831 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -111,6 +111,41 @@ public CompletableFuture> getBroker(T }); } + @Override + public CompletableFuture> getBroker(TopicName topicName, int currentIndex) { + String basePath = topicName.isV2() ? BasePathV2 : BasePathV1; + String path = basePath + topicName.getLookupName(); + path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName); + return httpClient.get(path, LookupData.class, currentIndex) + .thenCompose(lookupData -> { + // Convert LookupData into as SocketAddress, handling exceptions + URI uri = null; + try { + if (useTls) { + uri = new URI(lookupData.getBrokerUrlTls()); + } else { + String serviceUrl = lookupData.getBrokerUrl(); + if (serviceUrl == null) { + serviceUrl = lookupData.getNativeUrl(); + } + uri = new URI(serviceUrl); + } + + InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort()); + return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress)); + } catch (Exception e) { + // Failed to parse url + log.warn("[{}] Lookup Failed due to invalid url {}, {}", topicName, uri, e.getMessage()); + return FutureUtil.failedFuture(e); + } + }); + } + + @Override + public List getAddressList() { + return httpClient.serviceNameResolver.getAddressList(); + } + @Override public CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 48ef67eae2047..c2f99f23330af 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import java.net.InetSocketAddress; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.tuple.Pair; @@ -58,6 +59,18 @@ public interface LookupService extends AutoCloseable { */ CompletableFuture> getBroker(TopicName topicName); + /** + * Calls broker lookup-api to get broker {@link InetSocketAddress} which serves namespace bundle that contains given + * topic. use currentIndex to select the indexed address of the serviceUrl address list + * + * @param topicName + * topic-name + * @param currentIndex + * index of multi-serviceUrl + * @return a pair of addresses, representing the logical and physical address of the broker that serves given topic + */ + CompletableFuture> getBroker(TopicName topicName, int currentIndex); + /** * Returns {@link PartitionedTopicMetadata} for a given topic. * @@ -90,6 +103,13 @@ public interface LookupService extends AutoCloseable { */ String getServiceUrl(); + /** + * Returns serviceUrl address list. + * + * @return + */ + List getAddressList(); + /** * Resolves pulsar service url. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 2192ebfb64e75..6cc827a48b6f5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -273,7 +273,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS) .create(), - this); + this, client.getLookup().getAddressList()); grabCnx(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 6c749a8cf4354..969c3c3d98f09 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -950,6 +950,12 @@ public CompletableFuture getConnection(final String topic) { .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight())); } + public CompletableFuture getConnection(final String topic, int currentIndex) { + TopicName topicName = TopicName.get(topic); + return lookup.getBroker(topicName, currentIndex) + .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight())); + } + public CompletableFuture getConnectionToServiceUrl() { if (!(lookup instanceof BinaryProtoLookupService)) { return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL( diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java index e47750be46219..6dab279d31713 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java @@ -57,6 +57,15 @@ public InetSocketAddress resolveHost() { } } + public InetSocketAddress resolveHost(int index) { + List list = addressList; + checkState( + list != null, "No service url is provided yet"); + checkState( + !list.isEmpty(), "No hosts found for service url : " + serviceUrl); + return list.get(index % list.size()); + } + @Override public URI resolveHostUri() { InetSocketAddress host = resolveHost(); @@ -64,11 +73,23 @@ public URI resolveHostUri() { return URI.create(hostUrl); } + @Override + public URI resolveHostUri(int currentIndex) { + InetSocketAddress host = resolveHost(currentIndex); + String hostUrl = serviceUri.getServiceScheme() + "://" + host.getHostString() + ":" + host.getPort(); + return URI.create(hostUrl); + } + @Override public String getServiceUrl() { return serviceUrl; } + @Override + public List getAddressList() { + return new ArrayList<>(addressList); + } + @Override public ServiceURI getServiceUri() { return serviceUri; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java index 95f81d45bd755..74f54cdf03138 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ServiceNameResolver.java @@ -20,6 +20,8 @@ import java.net.InetSocketAddress; import java.net.URI; +import java.util.List; + import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; import org.apache.pulsar.common.net.ServiceURI; @@ -35,12 +37,25 @@ public interface ServiceNameResolver { */ InetSocketAddress resolveHost(); + /** + * Resolve pulsar service url by using given index + * + * @return resolve the service url to return a socket address + */ + InetSocketAddress resolveHost(int index); + /** * Resolve pulsar service url. * @return */ URI resolveHostUri(); + /** + * Resolve pulsar service url by using given index + * @return + */ + URI resolveHostUri(int currentIndex); + /** * Get service url. * @@ -48,6 +63,13 @@ public interface ServiceNameResolver { */ String getServiceUrl(); + /** + * Get service url address list. + * + * @return address list + */ + List getAddressList(); + /** * Get service uri. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java index 384d1b688b8d5..71d544af67ef2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java @@ -71,7 +71,7 @@ public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener top .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(), - this); + this, client.getLookup().getAddressList()); this.topicsPattern = topicsPattern; this.watcherId = watcherId; this.namespace = namespace; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 601fa2b8f815a..6ca3e51ea33bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -103,7 +103,7 @@ public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientIm .setMax(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .setMandatoryStop(100, TimeUnit.MILLISECONDS) .create(), - this); + this, pulsarClient.getLookup().getAddressList()); this.connectFuture = connectFuture; this.internalPinnedExecutor = pulsarClient.getInternalExecutorService(); this.timer = pulsarClient.timer();