diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index b33b84e5aed41..756cc9b5c85a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -38,7 +38,9 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -56,6 +58,8 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.loadbalance.LeaderBroker; +import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; @@ -878,6 +882,53 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() { } } + private void validateLeaderBroker() { + if (!this.isLeaderBroker()) { + LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); + String leaderBrokerUrl = leaderBroker.getServiceUrl(); + CompletableFuture result = pulsar().getNamespaceService() + .createLookupResult(leaderBrokerUrl, false, null); + try { + LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS); + String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls() + : lookupResult.getLookupData().getHttpUrl(); + if (redirectUrl == null) { + log.error("Redirected broker's service url is not configured"); + throw new RestException(Response.Status.PRECONDITION_FAILED, + "Redirected broker's service url is not configured."); + } + URL url = new URL(redirectUrl); + URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost()) + .port(url.getPort()) + .replaceQueryParam("authoritative", + false).build(); + + // Redirect + if (log.isDebugEnabled()) { + log.debug("Redirecting the request call to leader - {}", redirect); + } + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } catch (MalformedURLException exception) { + log.error("The leader broker url is malformed - {}", leaderBrokerUrl); + throw new RestException(exception); + } catch (ExecutionException | InterruptedException exception) { + log.error("Leader broker not found - {}", leaderBrokerUrl); + throw new RestException(exception.getCause()); + } catch (TimeoutException exception) { + log.error("Leader broker not found within timeout - {}", leaderBrokerUrl); + throw new RestException(exception); + } + } + } + + public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) { + if (StringUtils.isBlank(destinationBroker)) { + return; + } + validateLeaderBroker(); + pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker); + } + public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) { return validateSuperUserAccessAsync() .thenAccept(__ -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index ffb0e49d36502..fd67de411728c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -887,8 +887,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathP public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("destinationBroker") String destinationBroker) { validateNamespaceName(property, cluster, namespace); + setNamespaceBundleAffinity(bundleRange, destinationBroker); internalUnloadNamespaceBundleAsync(bundleRange, authoritative) .thenAccept(__ -> { log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index b6bf1f0927cc6..f0a4dbb01dcf6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -813,8 +813,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("destinationBroker") String destinationBroker) { validateNamespaceName(tenant, namespace); + setNamespaceBundleAffinity(bundleRange, destinationBroker); internalUnloadNamespaceBundleAsync(bundleRange, authoritative) .thenAccept(__ -> { log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index e34215d199648..b4df5d31968dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -119,6 +119,8 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception { CompletableFuture> getAvailableBrokersAsync(); + String setNamespaceBundleAffinity(String bundle, String broker); + void stop() throws PulsarServerException; /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java index fa6895568e918..d608bd6784f29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java @@ -140,4 +140,6 @@ default void writeBrokerDataOnZooKeeper(boolean force) { * @return bundle data */ BundleData getBundleDataOrDefault(String bundle); + + String setNamespaceBundleAffinity(String bundle, String broker); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java index 1ab56b50cdef4..e8c0567fd0c8c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/NoopLoadManager.java @@ -20,11 +20,14 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription; @@ -43,11 +46,13 @@ public class NoopLoadManager implements LoadManager { private String lookupServiceAddress; private ResourceUnit localResourceUnit; private LockManager lockManager; + private Map bundleBrokerAffinityMap; @Override public void initialize(PulsarService pulsar) { this.pulsar = pulsar; this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class); + this.bundleBrokerAffinityMap = new ConcurrentHashMap<>(); } @Override @@ -142,4 +147,12 @@ public void stop() throws PulsarServerException { } } + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (StringUtils.isBlank(broker)) { + return this.bundleBrokerAffinityMap.remove(bundle); + } + broker = broker.replaceFirst("http[s]?://", ""); + return this.bundleBrokerAffinityMap.put(bundle, broker); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 7d1a21a8c900e..6e63643a859d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { private final Lock lock = new ReentrantLock(); private Set knownBrokers = ConcurrentHashMap.newKeySet(); + private Map bundleBrokerAffinityMap; /** * Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called. @@ -215,7 +216,7 @@ public ModularLoadManagerImpl() { scheduler = Executors.newSingleThreadScheduledExecutor( new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager")); this.brokerToFailureDomainMap = new HashMap<>(); - + this.bundleBrokerAffinityMap = new ConcurrentHashMap<>(); this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() { @Override public boolean isEnablePersistentTopics(String brokerUrl) { @@ -1212,4 +1213,13 @@ public List getLoadBalancingMetrics() { return metricsCollection; } + + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (StringUtils.isBlank(broker)) { + return this.bundleBrokerAffinityMap.remove(bundle); + } + broker = broker.replaceFirst("http[s]?://", ""); + return this.bundleBrokerAffinityMap.put(bundle, broker); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 5f7cd5b8c38fa..c61d39cf3159a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -65,13 +66,13 @@ public LoadManagerReport generateLoadReport() { @Override public Optional getLeastLoaded(final ServiceUnitId serviceUnit) { + String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString()); + String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null); + if (!StringUtils.isBlank(affinityBroker)) { + return Optional.of(buildBrokerResourceUnit(affinityBroker)); + } Optional leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit); - return leastLoadedBroker.map(s -> { - String webServiceUrl = getBrokerWebServiceUrl(s); - String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl); - return new SimpleResourceUnit(webServiceUrl, - new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName)); - }); + return leastLoadedBroker.map(this::buildBrokerResourceUnit); } private String getBrokerWebServiceUrl(String broker) { @@ -146,4 +147,16 @@ public Set getAvailableBrokers() throws Exception { public CompletableFuture> getAvailableBrokersAsync() { return loadManager.getAvailableBrokersAsync(); } + + private SimpleResourceUnit buildBrokerResourceUnit (String broker) { + String webServiceUrl = getBrokerWebServiceUrl(broker); + String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl); + return new SimpleResourceUnit(webServiceUrl, + new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName)); + } + + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + return loadManager.setNamespaceBundleAffinity(bundle, broker); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index c2c0d1947c93e..c98857b2fc44c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -40,12 +40,14 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -186,6 +188,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer updateRankingHandle; + private Map bundleBrokerAffinityMap; + // Perform initializations which may be done without a PulsarService. public SimpleLoadManagerImpl() { scheduler = Executors.newSingleThreadScheduledExecutor( @@ -251,6 +255,7 @@ public Long load(String key) throws Exception { } }); this.pulsar = pulsar; + this.bundleBrokerAffinityMap = new ConcurrentHashMap<>(); } public SimpleLoadManagerImpl(PulsarService pulsar) { @@ -1443,6 +1448,15 @@ public void doNamespaceBundleSplit() throws Exception { } } + @Override + public String setNamespaceBundleAffinity(String bundle, String broker) { + if (StringUtils.isBlank(broker)) { + return this.bundleBrokerAffinityMap.remove(bundle); + } + broker = broker.replaceFirst("http[s]?://", ""); + return this.bundleBrokerAffinityMap.put(bundle, broker); + } + @Override public void stop() throws PulsarServerException { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 4be64f4b7b50c..27df77e815dfe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -601,7 +601,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle, } } - protected CompletableFuture createLookupResult(String candidateBroker, boolean authoritativeRedirect, + public CompletableFuture createLookupResult(String candidateBroker, boolean authoritativeRedirect, final String advertisedListenerName) { CompletableFuture lookupFuture = new CompletableFuture<>(); @@ -692,6 +692,7 @@ private Optional> getLeastLoadedFromLoadManager(ServiceUnit String lookupAddress = leastLoadedBroker.get().getResourceId(); String advertisedAddr = (String) leastLoadedBroker.get() .getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME); + if (LOG.isDebugEnabled()) { LOG.debug("{} : redirecting to the least loaded broker, lookup address={}", pulsar.getSafeWebServiceAddress(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 8f238614fe3d8..7c1d2899758a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -710,8 +710,8 @@ public CompletableFuture validateBundleOwnershipAsync(NamespaceBundle bund // Replace the host and port of the current request and redirect URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost()) .port(webUrl.get().getPort()).replaceQueryParam("authoritative", - newAuthoritative).build(); - + newAuthoritative).replaceQueryParam("destinationBroker", + null).build(); log.debug("{} is not a service unit owned", bundle); // Redirect log.debug("Redirecting the rest call to {}", redirect); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 0f0ee8df3fa1f..0a92e0cc8bcaa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -726,7 +726,7 @@ public void testNamespacesApiRedirects() throws Exception { doReturn(uri).when(uriInfo).getRequestUri(); namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster, - this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false); + this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null); captor = ArgumentCaptor.forClass(WebApplicationException.class); verify(response, timeout(5000).atLeast(1)).resume(captor.capture()); assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode()); @@ -1053,7 +1053,7 @@ public void testUnloadNamespaceWithBundles() throws Exception { doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle); AsyncResponse response = mock(AsyncResponse.class); namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000", - false); + false, null); verify(response, timeout(5000).times(1)).resume(any(RestException.class)); // cleanup diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index 05e82484226ca..4b9f679f19d1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static java.lang.Thread.sleep; import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -85,6 +87,7 @@ import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -102,6 +105,8 @@ public class ModularLoadManagerImplTest { private PulsarService pulsar2; private PulsarAdmin admin2; + private PulsarService pulsar3; + private String primaryHost; private String secondaryHost; @@ -181,6 +186,20 @@ void setup() throws Exception { pulsar2 = new PulsarService(config2); pulsar2.start(); + ServiceConfiguration config = new ServiceConfiguration(); + config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder"); + config.setClusterName("use"); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort()); + config.setAdvertisedAddress("localhost"); + config.setBrokerShutdownTimeoutMs(0L); + config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + config.setBrokerServicePort(Optional.of(0)); + config.setBrokerServicePortTls(Optional.of(0)); + config.setWebServicePortTls(Optional.of(0)); + pulsar3 = new PulsarService(config); + secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get()); url2 = new URL(pulsar2.getWebServiceAddress()); admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build(); @@ -188,7 +207,7 @@ void setup() throws Exception { primaryLoadManager = (ModularLoadManagerImpl) getField(pulsar1.getLoadManager().get(), "loadManager"); secondaryLoadManager = (ModularLoadManagerImpl) getField(pulsar2.getLoadManager().get(), "loadManager"); nsFactory = new NamespaceBundleFactory(pulsar1, Hashing.crc32()); - Thread.sleep(100); + sleep(100); } @AfterMethod(alwaysRun = true) @@ -201,6 +220,10 @@ void shutdown() throws Exception { pulsar2.close(); pulsar1.close(); + + if (pulsar3.isRunning()) { + pulsar3.close(); + } bkEnsemble.stop(); } @@ -284,6 +307,59 @@ public void testEvenBundleDistribution() throws Exception { } } + + + @Test + public void testBrokerAffinity() throws Exception { + // Start broker 3 + pulsar3.start(); + + final String tenant = "test"; + final String cluster = "test"; + String namespace = tenant + "/" + cluster + "/" + "test"; + String topic = "persistent://" + namespace + "/my-topic1"; + admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build()); + admin1.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(namespace, 16); + + String topicLookup = admin1.lookups().lookupTopic(topic); + String bundleRange = admin1.lookups().getBundleRange(topic); + + String brokerServiceUrl = pulsar1.getBrokerServiceUrl(); + String brokerUrl = pulsar1.getSafeWebServiceAddress(); + log.debug("initial broker service url - {}", topicLookup); + Random rand=new Random(); + + if (topicLookup.equals(brokerServiceUrl)) { + int x = rand.nextInt(2); + if (x == 0) { + brokerUrl = pulsar2.getSafeWebServiceAddress(); + brokerServiceUrl = pulsar2.getBrokerServiceUrl(); + } + else { + brokerUrl = pulsar3.getSafeWebServiceAddress(); + brokerServiceUrl = pulsar3.getBrokerServiceUrl(); + } + } + brokerUrl = brokerUrl.replaceFirst("http[s]?://", ""); + log.debug("destination broker service url - {}, broker url - {}", brokerServiceUrl, brokerUrl); + String leaderServiceUrl = admin1.brokers().getLeaderBroker().getServiceUrl(); + log.debug("leader serviceUrl - {}, broker1 service url - {}", leaderServiceUrl, pulsar1.getSafeWebServiceAddress()); + //Make a call to broker which is not a leader + if (!leaderServiceUrl.equals(pulsar1.getSafeWebServiceAddress())) { + admin1.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl); + } + else { + admin2.namespaces().unloadNamespaceBundle(namespace, bundleRange, brokerUrl); + } + + sleep(2000); + String topicLookupAfterUnload = admin1.lookups().lookupTopic(topic); + log.debug("final broker service url - {}", topicLookupAfterUnload); + Assert.assertEquals(brokerServiceUrl, topicLookupAfterUnload); + } + /** * It verifies that once broker owns max-number of topics: load-manager doesn't allocates new bundles to that broker * unless all the brokers are in same state. @@ -345,7 +421,7 @@ public void testLoadShedding() throws Exception { // Need to update all the bundle data for the shredder to see the spy. primaryLoadManager.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080")); - Thread.sleep(100); + sleep(100); localBrokerData.setCpu(new ResourceUsage(80, 100)); primaryLoadManager.doLoadShedding(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index f4c284bb48434..2690df658b7be 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -2090,6 +2090,20 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi */ void unloadNamespaceBundle(String namespace, String bundle) throws PulsarAdminException; + /** + * Unload namespace bundle and assign the bundle to specified broker. + * + * @param namespace + * @param bundle + * range of bundle to unload + * @param destinationBroker + * Target broker url to which the bundle should be assigned to + * @throws PulsarAdminException + * Unexpected error + */ + void unloadNamespaceBundle(String namespace, String bundle, String destinationBroker) throws PulsarAdminException; + + /** * Unload namespace bundle asynchronously. * @@ -2101,6 +2115,19 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi */ CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle); + /** + * Unload namespace bundle asynchronously. + * + * @param namespace + * @param bundle + * range of bundle to unload + * @param destinationBroker + * Target broker url to which the bundle should be assigned to + * + * @return a future that can be used to track when the bundle is unloaded + */ + CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle, String destinationBroker); + /** * Split namespace bundle. * diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 6d4889a751d37..fa3155c59d601 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -814,6 +814,12 @@ public void unloadNamespaceBundle(String namespace, String bundle) throws Pulsar sync(() -> unloadNamespaceBundleAsync(namespace, bundle)); } + @Override + public void unloadNamespaceBundle(String namespace, + String bundle, String destinationBroker) throws PulsarAdminException { + sync(() -> unloadNamespaceBundleAsync(namespace, bundle, destinationBroker)); + } + @Override public CompletableFuture unloadNamespaceBundleAsync(String namespace, String bundle) { NamespaceName ns = NamespaceName.get(namespace); @@ -821,6 +827,14 @@ public CompletableFuture unloadNamespaceBundleAsync(String namespace, Stri return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); } + @Override + public CompletableFuture unloadNamespaceBundleAsync(String namespace, + String bundle, String destinationBroker) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, bundle, "unload").queryParam("destinationBroker", destinationBroker); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 940e96911165f..d715c6b4e06c7 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -442,7 +442,7 @@ public void namespaces() throws Exception { namespaces = new CmdNamespaces(() -> admin); namespaces.run(split("unload myprop/clust/ns1 -b 0x80000000_0xffffffff")); - verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff"); + verify(mockNamespaces).unloadNamespaceBundle("myprop/clust/ns1", "0x80000000_0xffffffff", null); namespaces.run(split("split-bundle myprop/clust/ns1 -b 0x00000000_0xffffffff")); verify(mockNamespaces).splitNamespaceBundle("myprop/clust/ns1", "0x00000000_0xffffffff", false, null); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index b64df272b4468..f673ff40b79c3 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -880,13 +880,17 @@ private class Unload extends CliCommand { @Parameter(names = { "--bundle", "-b" }, description = "{start-boundary}_{end-boundary}") private String bundle; + @Parameter(names = { "--destinationBroker", "-d" }, + description = "Target brokerWebServiceAddress to which the bundle has to be allocated to") + private String destinationBroker; + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); if (bundle == null) { getAdmin().namespaces().unload(namespace); } else { - getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle); + getAdmin().namespaces().unloadNamespaceBundle(namespace, bundle, destinationBroker); } } }