From 10a866b27f6c6837769099edaf6e934ffe57fd29 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 19 Oct 2022 09:07:01 -0700 Subject: [PATCH 01/13] [improve][pulsar-broker] Provide option to unloadNamespaceBundle with bundle Affinity broker url --- .../broker/admin/impl/NamespacesBase.java | 29 ++++++- .../pulsar/broker/admin/v1/Namespaces.java | 4 +- .../pulsar/broker/admin/v2/Namespaces.java | 4 +- .../broker/loadbalance/LoadManager.java | 4 + .../loadbalance/ModularLoadManager.java | 4 + .../broker/loadbalance/NoopLoadManager.java | 14 ++++ .../impl/ModularLoadManagerImpl.java | 14 +++- .../impl/ModularLoadManagerWrapper.java | 29 +++++-- .../impl/SimpleLoadManagerImpl.java | 15 ++++ .../broker/namespace/NamespaceService.java | 1 + .../pulsar/broker/web/PulsarWebResource.java | 4 +- .../pulsar/broker/admin/NamespacesTest.java | 4 +- .../ModularLoadManagerImplTest.java | 79 ++++++++++++++++++- .../pulsar/client/admin/Namespaces.java | 27 +++++++ .../client/admin/internal/NamespacesImpl.java | 14 ++++ .../pulsar/admin/cli/CmdNamespaces.java | 11 ++- 16 files changed, 239 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index b33b84e5aed41..d2d49e2dd649c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -56,6 +56,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -878,6 +879,29 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() { } } + public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) { + if (destinationBroker != null) { + if (!this.isLeaderBroker()) { + LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); + String leaderBrokerUrl = leaderBroker.getServiceUrl(); + try { + URL redirectUrl = new URL(leaderBrokerUrl); + URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(redirectUrl.getHost()) + .port(redirectUrl.getPort()).replaceQueryParam("authoritative", + false).build(); + + // Redirect + log.debug("Redirecting the rest call to leader - {}, bundleRange - {}", redirect, bundleRange); + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } catch (MalformedURLException exception) { + log.error("The leader broker url is malformed - {}", leaderBrokerUrl); + throw new RestException(exception); + } + } + pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); + } + } + public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) { return validateSuperUserAccessAsync() .thenAccept(__ -> { @@ -924,8 +948,9 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR } return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, true) - .thenCompose(nsBundle -> - pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); + .thenCompose(nsBundle -> { + return pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle); + }); })); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index ffb0e49d36502..fd67de411728c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -887,8 +887,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathP public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("destinationBroker") String destinationBroker) { validateNamespaceName(property, cluster, namespace); + setNamespaceBundleAffinity(bundleRange, destinationBroker); internalUnloadNamespaceBundleAsync(bundleRange, authoritative) .thenAccept(__ -> { log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index b6bf1f0927cc6..f0a4dbb01dcf6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -813,8 +813,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("destinationBroker") String destinationBroker) { validateNamespaceName(tenant, namespace); + setNamespaceBundleAffinity(bundleRange, destinationBroker); internalUnloadNamespaceBundleAsync(bundleRange, authoritative) .thenAccept(__ -> { log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index e34215d199648..45a5cffc3f027 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -119,6 +119,10 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception { CompletableFuture> getAvailableBrokersAsync(); + void setNamespaceBundleAffinity(String bundle, String broker); + + String removeNamespaceBundleAffinity(String bundle); + void stop() throws PulsarServerException; /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java index fa6895568e918..55754c0a0a672 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java @@ -140,4 +140,8 @@ default void writeBrokerDataOnZooKeeper(boolean force) { * @return bundle data */ BundleData getBundleDataOrDefault(String bundle); + + void setNamespaceBundleAffinity(String bundle, String broker); + + String removeNamespaceBundleAffinity(String bundle); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 1ab56b50cdef4..4d2d86b95b2a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -20,10 +20,12 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -43,11 +45,13 @@ public class NoopLoadManager implements LoadManager { private String lookupServiceAddress; private ResourceUnit localResourceUnit; private LockManager lockManager; + private Map bundleBrokerAffinityMap; @Override public void initialize(PulsarService pulsar) { this.pulsar = pulsar; this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class); + this.bundleBrokerAffinityMap = new ConcurrentHashMap<>(); } @Override @@ -142,4 +146,14 @@ public void stop() throws PulsarServerException { } } + @Override + public void setNamespaceBundleAffinity(String bundle, String broker) { + broker = broker.replaceFirst("http[s]?://", ""); + this.bundleBrokerAffinityMap.put(bundle, broker); + } + + @Override + public String removeNamespaceBundleAffinity(String bundle) { + return this.bundleBrokerAffinityMap.remove(bundle); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 7d1a21a8c900e..a87258337ac76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { private final Lock lock = new ReentrantLock(); private Set knownBrokers = ConcurrentHashMap.newKeySet(); + private Map bundleBrokerAffinityMap; /** * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called. @@ -215,7 +216,7 @@ public ModularLoadManagerImpl() { scheduler = Executors.newSingleThreadScheduledExecutor( new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager")); this.brokerToFailureDomainMap = new HashMap<>(); - + this.bundleBrokerAffinityMap = new ConcurrentHashMap<>(); this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { @Override public boolean isEnablePersistentTopics(String brokerUrl) { @@ -1212,4 +1213,15 @@ public List getLoadBalancingMetrics() { return metricsCollection; } + + @Override + public void setNamespaceBundleAffinity(String bundle, String broker) { + broker = broker.replaceFirst("http[s]?://", ""); + this.bundleBrokerAffinityMap.put(bundle, broker); + } + + @Override + public String removeNamespaceBundleAffinity(String bundle) { + return this.bundleBrokerAffinityMap.remove(bundle); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 5f7cd5b8c38fa..80814ee8186fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -65,13 +65,13 @@ public LoadManagerReport generateLoadReport() { @Override public Optional getLeastLoaded(final ServiceUnitId serviceUnit) { + String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); + String affinityBroker = loadManager.removeNamespaceBundleAffinity(bundleRange); + if (affinityBroker != null) { + return Optional.of(buildBrokerResourceUnit(affinityBroker)); + } Optional leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit); - return leastLoadedBroker.map(s -> { - String webServiceUrl = getBrokerWebServiceUrl(s); - String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl); - return new SimpleResourceUnit(webServiceUrl, - new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName)); - }); + return leastLoadedBroker.map(this::buildBrokerResourceUnit); } private String getBrokerWebServiceUrl(String broker) { @@ -146,4 +146,21 @@ public Set getAvailableBrokers() throws Exception { public CompletableFuture> getAvailableBrokersAsync() { return loadManager.getAvailableBrokersAsync(); } + + private SimpleResourceUnit buildBrokerResourceUnit (String broker) { + String webServiceUrl = getBrokerWebServiceUrl(broker); + String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl); + return new SimpleResourceUnit(webServiceUrl, + new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName)); + } + + @Override + public void setNamespaceBundleAffinity(String bundle, String broker) { + loadManager.setNamespaceBundleAffinity(bundle, broker); + } + + @Override + public String removeNamespaceBundleAffinity(String bundle) { + return loadManager.removeNamespaceBundleAffinity(bundle); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index c2c0d1947c93e..b457c3653fb0e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -40,6 +40,7 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -186,6 +187,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer updateRankingHandle; + private Map bundleBrokerAffinityMap; + // Perform initializations which may be done without a PulsarService. public SimpleLoadManagerImpl() { scheduler = Executors.newSingleThreadScheduledExecutor( @@ -251,6 +254,7 @@ public Long load(String key) throws Exception { } }); this.pulsar = pulsar; + this.bundleBrokerAffinityMap = new ConcurrentHashMap<>(); } public SimpleLoadManagerImpl(PulsarService pulsar) { @@ -1443,6 +1447,17 @@ public void doNamespaceBundleSplit() throws Exception { } } + @Override + public void setNamespaceBundleAffinity(String bundle, String broker) { + broker = broker.replaceFirst("http[s]?://", ""); + this.bundleBrokerAffinityMap.put(bundle, broker); + } + + @Override + public String removeNamespaceBundleAffinity(String bundle) { + return this.bundleBrokerAffinityMap.remove(bundle); + } + @Override public void stop() throws PulsarServerException { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4be64f4b7b50c..c6f61443d7741 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -692,6 +692,7 @@ private Optional> getLeastLoadedFromLoadManager(ServiceUnit String lookupAddress = leastLoadedBroker.get().getResourceId(); String advertisedAddr = (String) leastLoadedBroker.get() .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME); + if (LOG.isDebugEnabled()) { LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getSafeWebServiceAddress(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 58b9436e6bf5c..14c3c856974dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -710,8 +710,8 @@ public CompletableFuture validateBundleOwnershipAsync(NamespaceBundle bund // Replace the host and port of the current request and redirect URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost()) .port(webUrl.get().getPort()).replaceQueryParam("authoritative", - newAuthoritative).build(); - + newAuthoritative).replaceQueryParam("destinationBroker", + null).build(); log.debug("{} is not a service unit owned", bundle); // Redirect log.debug("Redirecting the rest call to {}", redirect); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 0f0ee8df3fa1f..0a92e0cc8bcaa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -726,7 +726,7 @@ public void testNamespacesApiRedirects() throws Exception { doReturn(uri).when(uriInfo).getRequestUri(); namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster, - this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false); + this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null); captor = ArgumentCaptor.forClass(WebApplicationException.class); verify(response, timeout(5000).atLeast(1)).resume(captor.capture()); assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); @@ -1053,7 +1053,7 @@ public void testUnloadNamespaceWithBundles() throws Exception { doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle); AsyncResponse response = mock(AsyncResponse.class); namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000", - false); + false, null); verify(response, timeout(5000).times(1)).resume(any(RestException.class)); // cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 05e82484226ca..e283d7cd87064 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static java.lang.Thread.sleep; import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -85,6 +87,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -102,6 +105,8 @@ public class ModularLoadManagerImplTest { private PulsarService pulsar2; private PulsarAdmin admin2; + private PulsarService pulsar3; + private String primaryHost; private String secondaryHost; @@ -181,6 +186,20 @@ void setup() throws Exception { pulsar2 = new PulsarService(config2); pulsar2.start(); + ServiceConfiguration config = new ServiceConfiguration(); + config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder"); + config.setClusterName("use"); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + config.setAdvertisedAddress("localhost"); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + config.setBrokerServicePortTls(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + pulsar3 = new PulsarService(config); + secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get()); url2 = new URL(pulsar2.getWebServiceAddress()); admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); @@ -188,7 +207,7 @@ void setup() throws Exception { primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager"); secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager"); nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32()); - Thread.sleep(100); + sleep(100); } @AfterMethod(alwaysRun = true) @@ -201,6 +220,10 @@ void shutdown() throws Exception { pulsar2.close(); pulsar1.close(); + + if (pulsar3.isRunning()) { + pulsar3.close(); + } bkEnsemble.stop(); } @@ -284,6 +307,58 @@ public void testEvenBundleDistribution() throws Exception { } } + + + @Test + public void testBrokerAffinity() throws Exception { + // Start broker 3 + pulsar3.start(); + + final String tenant = "test"; + final String cluster = "test"; + String namespace = tenant + "/" + cluster + "/" + "test"; + String topic = "persistent://" + namespace + "/my-topic1"; + admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build()); + admin1.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(namespace, 16); + + String topicLookup = admin1.lookups().lookupTopic(topic); + String bundleRange = admin1.lookups().getBundleRange(topic); + + String brokerServiceUrl = pulsar1.getBrokerServiceUrl(); + String brokerUrl = pulsar1.getSafeWebServiceAddress(); + log.debug("initial broker service url - {}", topicLookup); + Random rand=new Random(); + + if (topicLookup.equals(brokerServiceUrl)) { + int x = rand.nextInt(2); + if (x == 0) { + brokerUrl = pulsar2.getSafeWebServiceAddress(); + brokerServiceUrl = pulsar2.getBrokerServiceUrl(); + } + else { + brokerUrl = pulsar3.getSafeWebServiceAddress(); + brokerServiceUrl = pulsar3.getBrokerServiceUrl(); + } + } + log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl); + String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl(); + log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress()); + //Make a call to broker which is not a leader + if (!leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress())) { + admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl); + } + else { + admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl); + } + + sleep(2000); + String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic); + log.debug("final broker service url - {}", topicLookupAfterUnload); + Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload); + } + /** * It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker * unless all the brokers are in same state. @@ -345,7 +420,7 @@ public void testLoadShedding() throws Exception { // Need to update all the bundle data for the shredder to see the spy. primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); - Thread.sleep(100); + sleep(100); localBrokerData.setCpu(new ResourceUsage(80, 100)); primaryLoadManager.doLoadShedding(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index f4c284bb48434..2690df658b7be 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -2090,6 +2090,20 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi */ void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException; + /** + * Unload namespace bundle and assign the bundle to specified broker. + * + * @param namespace + * @param bundle + * range of bundle to unload + * @param destinationBroker + * Target broker url to which the bundle should be assigned to + * @throws PulsarAdminException + * Unexpected error + */ + void unloadNamespaceBundle(String namespace, String bundle, String destinationBroker) throws PulsarAdminException; + + /** * Unload namespace bundle asynchronously. * @@ -2101,6 +2115,19 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi */ CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle); + /** + * Unload namespace bundle asynchronously. + * + * @param namespace + * @param bundle + * range of bundle to unload + * @param destinationBroker + * Target broker url to which the bundle should be assigned to + * + * @return a future that can be used to track when the bundle is unloaded + */ + CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle, String destinationBroker); + /** * Split namespace bundle. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 6d4889a751d37..fa3155c59d601 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -814,6 +814,12 @@ public void unloadNamespaceBundle(String namespace, String bundle) throws Pulsar sync(() -> unloadNamespaceBundleAsync(namespace, bundle)); } + @Override + public void unloadNamespaceBundle(String namespace, + String bundle, String destinationBroker) throws PulsarAdminException { + sync(() -> unloadNamespaceBundleAsync(namespace, bundle, destinationBroker)); + } + @Override public CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle) { NamespaceName ns = NamespaceName.get(namespace); @@ -821,6 +827,14 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } + @Override + public CompletableFuture unloadNamespaceBundleAsync(String namespace, + String bundle, String destinationBroker) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "unload").queryParam("destinationBroker", destinationBroker); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index b64df272b4468..080380d549e62 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -880,13 +880,22 @@ private class Unload extends CliCommand { @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}") private String bundle; + @Parameter(names = { "--destinationBroker", "-d" }, + description = "Target brokerWebServiceAddress to which the bundle has to be allocated to") + private String destinationBroker; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); if (bundle == null) { getAdmin().namespaces().unload(namespace); } else { - getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle); + if (destinationBroker == null) { + getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle); + } else { + getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle, destinationBroker); + } + } } } From 0a7b068c924d114191a6c687cb560c6e85b08bd9 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Tue, 8 Nov 2022 11:13:22 -0500 Subject: [PATCH 02/13] [improve][admin,broker] Provide option to unloadNamespaceBundle with bundle Affinity broker url --- .../pulsar/broker/loadbalance/ModularLoadManagerImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index e283d7cd87064..4b9f679f19d1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -342,6 +342,7 @@ public void testBrokerAffinity() throws Exception { brokerServiceUrl = pulsar3.getBrokerServiceUrl(); } } + brokerUrl = brokerUrl.replaceFirst("http[s]?://", ""); log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl); String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl(); log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress()); From e43fc50e61721b3a022616a54275470174d763ae Mon Sep 17 00:00:00 2001 From: Vineeth Date: Mon, 14 Nov 2022 13:33:53 -0500 Subject: [PATCH 03/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../broker/admin/impl/NamespacesBase.java | 45 +++++++++++-------- site2/docs/admin-api-namespaces.md | 4 +- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d2d49e2dd649c..ac40da5e7eef5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -878,30 +878,37 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() { throw new RestException(e); } } - - public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) { - if (destinationBroker != null) { - if (!this.isLeaderBroker()) { - LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); - String leaderBrokerUrl = leaderBroker.getServiceUrl(); - try { - URL redirectUrl = new URL(leaderBrokerUrl); - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(redirectUrl.getHost()) - .port(redirectUrl.getPort()).replaceQueryParam("authoritative", - false).build(); - - // Redirect - log.debug("Redirecting the rest call to leader - {}, bundleRange - {}", redirect, bundleRange); - throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); - } catch (MalformedURLException exception) { - log.error("The leader broker url is malformed - {}", leaderBrokerUrl); - throw new RestException(exception); + + private void validateLeaderBroker() { + if (!this.isLeaderBroker()) { + LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); + String leaderBrokerUrl = leaderBroker.getServiceUrl(); + try { + URL redirectUrl = new URL(leaderBrokerUrl); + URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(redirectUrl.getHost()) + .port(redirectUrl.getPort()).replaceQueryParam("authoritative", + false).build(); + + // Redirect + if (log.isDebugEnabled()) { + log.debug("Redirecting the rest call to leader - {}", redirect); } + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } catch (MalformedURLException exception) { + log.error("The leader broker url is malformed - {}", leaderBrokerUrl); + throw new RestException(exception); } - pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); } } + public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) { + if (StringUtils.isBlank(destinationBroker)) { + return; + } + validateLeaderBroker(); + pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); + } + public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) { return validateSuperUserAccessAsync() .thenAccept(__ -> { diff --git a/site2/docs/admin-api-namespaces.md b/site2/docs/admin-api-namespaces.md index e74117256d036..a8ab9be79a5ac 100644 --- a/site2/docs/admin-api-namespaces.md +++ b/site2/docs/admin-api-namespaces.md @@ -479,12 +479,14 @@ A namespace bundle is a virtual group of topics that belong to the same namespac ```shell pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/namespace1 +pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/namespace1 +pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff --destinationBroker broker1.use.org.com:8080 test-tenant/namespace1 ``` -{@inject: endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace/:bundle/unload|operation/unloadNamespaceBundle?version=@pulsar:version_number@} +{@inject: endpoint|PUT|/admin/v2/namespaces/:tenant/:namespace/:bundle/unload|operation/unloadNamespaceBundle?version=@pulsar:version_number@&destinationBroker=broker1.use.org.com:8080} From 1d2a97157c08441254bdca4ebe145d63e9b5fd72 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Mon, 14 Nov 2022 16:14:48 -0500 Subject: [PATCH 04/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ac40da5e7eef5..61090d015cec5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -878,7 +878,7 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() { throw new RestException(e); } } - + private void validateLeaderBroker() { if (!this.isLeaderBroker()) { LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); From 94def5333f6d93d570afc120eff1fe21af3ba47b Mon Sep 17 00:00:00 2001 From: Vineeth Date: Mon, 14 Nov 2022 23:38:01 -0500 Subject: [PATCH 05/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../pulsar/broker/loadbalance/LoadManager.java | 4 +--- .../broker/loadbalance/ModularLoadManager.java | 4 +--- .../pulsar/broker/loadbalance/NoopLoadManager.java | 12 +++++------- .../loadbalance/impl/ModularLoadManagerImpl.java | 12 +++++------- .../loadbalance/impl/ModularLoadManagerWrapper.java | 11 +++-------- .../loadbalance/impl/SimpleLoadManagerImpl.java | 12 +++++------- 6 files changed, 20 insertions(+), 35 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 45a5cffc3f027..b4df5d31968dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -119,9 +119,7 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception { CompletableFuture> getAvailableBrokersAsync(); - void setNamespaceBundleAffinity(String bundle, String broker); - - String removeNamespaceBundleAffinity(String bundle); + String setNamespaceBundleAffinity(String bundle, String broker); void stop() throws PulsarServerException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java index 55754c0a0a672..d608bd6784f29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java @@ -141,7 +141,5 @@ default void writeBrokerDataOnZooKeeper(boolean force) { */ BundleData getBundleDataOrDefault(String bundle); - void setNamespaceBundleAffinity(String bundle, String broker); - - String removeNamespaceBundleAffinity(String bundle); + String setNamespaceBundleAffinity(String bundle, String broker); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 4d2d86b95b2a0..f460cd18f6b57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -147,13 +147,11 @@ public void stop() throws PulsarServerException { } @Override - public void setNamespaceBundleAffinity(String bundle, String broker) { + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (broker == null) { + return this.bundleBrokerAffinityMap.remove(bundle); + } broker = broker.replaceFirst("http[s]?://", ""); - this.bundleBrokerAffinityMap.put(bundle, broker); - } - - @Override - public String removeNamespaceBundleAffinity(String bundle) { - return this.bundleBrokerAffinityMap.remove(bundle); + return this.bundleBrokerAffinityMap.put(bundle, broker); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index a87258337ac76..614a9e3d16c13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -1215,13 +1215,11 @@ public List getLoadBalancingMetrics() { } @Override - public void setNamespaceBundleAffinity(String bundle, String broker) { + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (broker == null) { + return this.bundleBrokerAffinityMap.remove(bundle); + } broker = broker.replaceFirst("http[s]?://", ""); - this.bundleBrokerAffinityMap.put(bundle, broker); - } - - @Override - public String removeNamespaceBundleAffinity(String bundle) { - return this.bundleBrokerAffinityMap.remove(bundle); + return this.bundleBrokerAffinityMap.put(bundle, broker); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 80814ee8186fb..47b93ff45b1ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -66,7 +66,7 @@ public LoadManagerReport generateLoadReport() { @Override public Optional getLeastLoaded(final ServiceUnitId serviceUnit) { String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); - String affinityBroker = loadManager.removeNamespaceBundleAffinity(bundleRange); + String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); if (affinityBroker != null) { return Optional.of(buildBrokerResourceUnit(affinityBroker)); } @@ -155,12 +155,7 @@ private SimpleResourceUnit buildBrokerResourceUnit (String broker) { } @Override - public void setNamespaceBundleAffinity(String bundle, String broker) { - loadManager.setNamespaceBundleAffinity(bundle, broker); - } - - @Override - public String removeNamespaceBundleAffinity(String bundle) { - return loadManager.removeNamespaceBundleAffinity(bundle); + public String setNamespaceBundleAffinity(String bundle, String broker) { + return loadManager.setNamespaceBundleAffinity(bundle, broker); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index b457c3653fb0e..c44480b76e0f1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -1448,14 +1448,12 @@ public void doNamespaceBundleSplit() throws Exception { } @Override - public void setNamespaceBundleAffinity(String bundle, String broker) { + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (broker == null) { + return this.bundleBrokerAffinityMap.remove(bundle); + } broker = broker.replaceFirst("http[s]?://", ""); - this.bundleBrokerAffinityMap.put(bundle, broker); - } - - @Override - public String removeNamespaceBundleAffinity(String bundle) { - return this.bundleBrokerAffinityMap.remove(bundle); + return this.bundleBrokerAffinityMap.put(bundle, broker); } @Override From ef8013a88cac3680629efde0a2fa1b282485a391 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Fri, 18 Nov 2022 10:06:20 -0500 Subject: [PATCH 06/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../broker/admin/impl/NamespacesBase.java | 22 ++++++++++++++++--- .../broker/namespace/NamespaceService.java | 2 +- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 61090d015cec5..382d3ae8f4f54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -38,6 +38,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -57,6 +58,7 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.loadbalance.LeaderBroker; +import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -883,10 +885,21 @@ private void validateLeaderBroker() { if (!this.isLeaderBroker()) { LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); String leaderBrokerUrl = leaderBroker.getServiceUrl(); + CompletableFuture result = pulsar().getNamespaceService() + .createLookupResult(leaderBrokerUrl, false, null); try { - URL redirectUrl = new URL(leaderBrokerUrl); - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(redirectUrl.getHost()) - .port(redirectUrl.getPort()).replaceQueryParam("authoritative", + LookupResult lookupResult = result.get(); + String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls() + : lookupResult.getLookupData().getHttpUrl(); + if (redirectUrl == null) { + log.error("Redirected broker's service url is not configured"); + throw new RestException(Response.Status.PRECONDITION_FAILED, + "Redirected broker's service url is not configured."); + } + URL url = new URL(redirectUrl); + URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost()) + .port(url.getPort()) + .replaceQueryParam("authoritative", false).build(); // Redirect @@ -897,6 +910,9 @@ private void validateLeaderBroker() { } catch (MalformedURLException exception) { log.error("The leader broker url is malformed - {}", leaderBrokerUrl); throw new RestException(exception); + } catch (ExecutionException | InterruptedException exception) { + log.error("Leader broker not found - {}", leaderBrokerUrl); + throw new RestException(exception); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index c6f61443d7741..27df77e815dfe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -601,7 +601,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle, } } - protected CompletableFuture createLookupResult(String candidateBroker, boolean authoritativeRedirect, + public CompletableFuture createLookupResult(String candidateBroker, boolean authoritativeRedirect, final String advertisedListenerName) { CompletableFuture lookupFuture = new CompletableFuture<>(); From ab7130ba9daff461a7443c36372751bb9ad039d5 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Fri, 18 Nov 2022 12:26:47 -0500 Subject: [PATCH 07/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 382d3ae8f4f54..ba1bc8ea2c2db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -971,9 +971,8 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR } return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, true) - .thenCompose(nsBundle -> { - return pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle); - }); + .thenCompose(nsBundle -> + pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); })); } From d660951a63c24defc88edacdafbf19ce06e9f6a7 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Mon, 28 Nov 2022 13:31:31 -0500 Subject: [PATCH 08/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ba1bc8ea2c2db..081738697c58f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -971,7 +971,7 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR } return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, authoritative, true) - .thenCompose(nsBundle -> + .thenCompose(nsBundle -> pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)); })); } From a75ff4472710b3558e4e7193f09a4d1878081df9 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Mon, 28 Nov 2022 16:13:50 -0500 Subject: [PATCH 09/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- .../java/org/apache/pulsar/admin/cli/CmdNamespaces.java | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 081738697c58f..9cdc585bd83fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -904,7 +904,7 @@ private void validateLeaderBroker() { // Redirect if (log.isDebugEnabled()) { - log.debug("Redirecting the rest call to leader - {}", redirect); + log.debug("Redirecting the request call to leader - {}", redirect); } throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } catch (MalformedURLException exception) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 080380d549e62..f673ff40b79c3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -890,12 +890,7 @@ void run() throws PulsarAdminException { if (bundle == null) { getAdmin().namespaces().unload(namespace); } else { - if (destinationBroker == null) { - getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle); - } else { - getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle, destinationBroker); - } - + getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle, destinationBroker); } } } From 09d4f768837682195f466671729aeba131f8ddbb Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 30 Nov 2022 11:18:51 -0500 Subject: [PATCH 10/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 940e96911165f..d715c6b4e06c7 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -442,7 +442,7 @@ public void namespaces() throws Exception { namespaces = new CmdNamespaces(() -> admin); namespaces.run(split("unload myprop/clust/ns1 -b 0x80000000_0xffffffff")); - verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); + verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", null); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null); From f2b5759e6f9e68dac93e053c8a395a47aed5527f Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 14 Dec 2022 13:29:46 -0800 Subject: [PATCH 11/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../org/apache/pulsar/broker/loadbalance/NoopLoadManager.java | 4 +++- .../broker/loadbalance/impl/ModularLoadManagerImpl.java | 2 +- .../broker/loadbalance/impl/ModularLoadManagerWrapper.java | 4 +++- .../pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java | 4 +++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index f460cd18f6b57..7f2baf2f3604d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -27,8 +27,10 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.common.naming.ServiceUnitId; @@ -148,7 +150,7 @@ public void stop() throws PulsarServerException { @Override public String setNamespaceBundleAffinity(String bundle, String broker) { - if (broker == null) { + if (StringUtils.isBlank(broker)) { return this.bundleBrokerAffinityMap.remove(bundle); } broker = broker.replaceFirst("http[s]?://", ""); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 614a9e3d16c13..6e63643a859d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -1216,7 +1216,7 @@ public List getLoadBalancingMetrics() { @Override public String setNamespaceBundleAffinity(String bundle, String broker) { - if (broker == null) { + if (StringUtils.isBlank(broker)) { return this.bundleBrokerAffinityMap.remove(bundle); } broker = broker.replaceFirst("http[s]?://", ""); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 47b93ff45b1ba..382ae7bcf4880 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -23,6 +23,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; + +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -67,7 +69,7 @@ public LoadManagerReport generateLoadReport() { public Optional getLeastLoaded(final ServiceUnitId serviceUnit) { String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); - if (affinityBroker != null) { + if (StringUtils.isBlank(affinityBroker)) { return Optional.of(buildBrokerResourceUnit(affinityBroker)); } Optional leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index c44480b76e0f1..ffaa231e9c8a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -47,6 +47,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; + +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -1449,7 +1451,7 @@ public void doNamespaceBundleSplit() throws Exception { @Override public String setNamespaceBundleAffinity(String bundle, String broker) { - if (broker == null) { + if (StringUtils.isBlank(broker)) { return this.bundleBrokerAffinityMap.remove(bundle); } broker = broker.replaceFirst("http[s]?://", ""); From fc9be0ac9e6fe70d9d54a790d5cc9d5c1dd529b8 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 14 Dec 2022 13:41:30 -0800 Subject: [PATCH 12/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../org/apache/pulsar/broker/loadbalance/NoopLoadManager.java | 1 - .../broker/loadbalance/impl/ModularLoadManagerWrapper.java | 1 - .../pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java | 1 - 3 files changed, 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 7f2baf2f3604d..e8c0567fd0c8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -30,7 +30,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription; import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit; import org.apache.pulsar.common.naming.ServiceUnitId; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 382ae7bcf4880..2fd2610ab1e7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -23,7 +23,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; - import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index ffaa231e9c8a2..c98857b2fc44c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.PulsarServerException; From e86bdb1a2fcb4e2ef7c0a19f68c031506985ec19 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 21 Dec 2022 15:05:23 -0800 Subject: [PATCH 13/13] [improve][admin,broker] Add option to unloadNamespaceBundle with bundle Affinity broker url --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 6 +++++- .../broker/loadbalance/impl/ModularLoadManagerWrapper.java | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 9cdc585bd83fd..756cc9b5c85a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -40,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -888,7 +889,7 @@ private void validateLeaderBroker() { CompletableFuture result = pulsar().getNamespaceService() .createLookupResult(leaderBrokerUrl, false, null); try { - LookupResult lookupResult = result.get(); + LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS); String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls() : lookupResult.getLookupData().getHttpUrl(); if (redirectUrl == null) { @@ -912,6 +913,9 @@ private void validateLeaderBroker() { throw new RestException(exception); } catch (ExecutionException | InterruptedException exception) { log.error("Leader broker not found - {}", leaderBrokerUrl); + throw new RestException(exception.getCause()); + } catch (TimeoutException exception) { + log.error("Leader broker not found within timeout - {}", leaderBrokerUrl); throw new RestException(exception); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 2fd2610ab1e7b..c61d39cf3159a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -68,7 +68,7 @@ public LoadManagerReport generateLoadReport() { public Optional getLeastLoaded(final ServiceUnitId serviceUnit) { String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); - if (StringUtils.isBlank(affinityBroker)) { + if (!StringUtils.isBlank(affinityBroker)) { return Optional.of(buildBrokerResourceUnit(affinityBroker)); } Optional leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);