Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -56,6 +58,8 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -878,6 +882,53 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
}
}

private void validateLeaderBroker() {
if (!this.isLeaderBroker()) {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
String leaderBrokerUrl = leaderBroker.getServiceUrl();
CompletableFuture<LookupResult> result = pulsar().getNamespaceService()
.createLookupResult(leaderBrokerUrl, false, null);
try {
LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
: lookupResult.getLookupData().getHttpUrl();
if (redirectUrl == null) {
log.error("Redirected broker's service url is not configured");
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Redirected broker's service url is not configured.");
}
URL url = new URL(redirectUrl);
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
.port(url.getPort())
.replaceQueryParam("authoritative",
false).build();

// Redirect
if (log.isDebugEnabled()) {
log.debug("Redirecting the request call to leader - {}", redirect);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
} catch (MalformedURLException exception) {
log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
throw new RestException(exception);
} catch (ExecutionException | InterruptedException exception) {
log.error("Leader broker not found - {}", leaderBrokerUrl);
throw new RestException(exception.getCause());
} catch (TimeoutException exception) {
log.error("Leader broker not found within timeout - {}", leaderBrokerUrl);
throw new RestException(exception);
}
}
}

public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) {
if (StringUtils.isBlank(destinationBroker)) {
return;
}
validateLeaderBroker();
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
}

public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to update the doc in admin-api-topics.md file

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

validateNamespaceName(property, cluster, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception {

CompletableFuture<Set<String>> getAvailableBrokersAsync();

String setNamespaceBundleAffinity(String bundle, String broker);

void stop() throws PulsarServerException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,6 @@ default void writeBrokerDataOnZooKeeper(boolean force) {
* @return bundle data
*/
BundleData getBundleDataOrDefault(String bundle);

String setNamespaceBundleAffinity(String bundle, String broker);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
Expand All @@ -43,11 +46,13 @@ public class NoopLoadManager implements LoadManager {
private String lookupServiceAddress;
private ResourceUnit localResourceUnit;
private LockManager<LocalBrokerData> lockManager;
private Map<String, String> bundleBrokerAffinityMap;

@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -142,4 +147,12 @@ public void stop() throws PulsarServerException {
}
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {

private final Lock lock = new ReentrantLock();
private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
private Map<String, String> bundleBrokerAffinityMap;

/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
Expand All @@ -215,7 +216,7 @@ public ModularLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();

this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
Expand Down Expand Up @@ -1212,4 +1213,13 @@ public List<Metrics> getLoadBalancingMetrics() {

return metricsCollection;
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand Down Expand Up @@ -65,13 +66,13 @@ public LoadManagerReport generateLoadReport() {

@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null);
if (!StringUtils.isBlank(affinityBroker)) {
return Optional.of(buildBrokerResourceUnit(affinityBroker));
}
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
return leastLoadedBroker.map(s -> {
String webServiceUrl = getBrokerWebServiceUrl(s);
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
});
return leastLoadedBroker.map(this::buildBrokerResourceUnit);
}

private String getBrokerWebServiceUrl(String broker) {
Expand Down Expand Up @@ -146,4 +147,16 @@ public Set<String> getAvailableBrokers() throws Exception {
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return loadManager.getAvailableBrokersAsync();
}

private SimpleResourceUnit buildBrokerResourceUnit (String broker) {
String webServiceUrl = getBrokerWebServiceUrl(broker);
String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
return loadManager.setNamespaceBundleAffinity(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -186,6 +188,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification

private volatile Future<?> updateRankingHandle;

private Map<String, String> bundleBrokerAffinityMap;

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -251,6 +255,7 @@ public Long load(String key) throws Exception {
}
});
this.pulsar = pulsar;
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}

public SimpleLoadManagerImpl(PulsarService pulsar) {
Expand Down Expand Up @@ -1443,6 +1448,15 @@ public void doNamespaceBundleSplit() throws Exception {
}
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}

@Override
public void stop() throws PulsarServerException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -692,6 +692,7 @@ private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnit
String lookupAddress = leastLoadedBroker.get().getResourceId();
String advertisedAddr = (String) leastLoadedBroker.get()
.getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);

if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
newAuthoritative).build();

newAuthoritative).replaceQueryParam("destinationBroker",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if destinationBroker is optional then if we don't pass it then by default value will be null. so, is it necessary to pass null value for destinationBroker ? or can we just make any change here if we are any passing null value?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are replacing the param with null because if destination broker is not null in the input request, it will again try to call leader broker to add the input to bundle affinity map which is already done before this step.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

null).build();
log.debug("{} is not a service unit owned", bundle);
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ public void testNamespacesApiRedirects() throws Exception {
doReturn(uri).when(uriInfo).getRequestUri();

namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster,
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false);
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null);
captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
Expand Down Expand Up @@ -1053,7 +1053,7 @@ public void testUnloadNamespaceWithBundles() throws Exception {
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
AsyncResponse response = mock(AsyncResponse.class);
namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
false);
false, null);
verify(response, timeout(5000).times(1)).resume(any(RestException.class));

// cleanup
Expand Down
Loading