Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1537,134 +1537,103 @@ private CompletableFuture<Void> doUpdatePersistenceAsync(PersistencePolicies per
);
}

protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);

final List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear
if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces()
.clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange()));
}
}
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
log.warn("[{}] Failed to clear backlog on the bundles for namespace {}: {}", clientAppId(),
namespaceName, exception.getCause().getMessage());
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
return null;
} else {
asyncResponse.resume(new RestException(exception.getCause()));
return null;
}
}
log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
return null;
});
protected CompletableFuture<Void> internalClearNamespaceBacklogAsync(boolean authoritative) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG)
.thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundlesAsync(namespaceName))
.thenCompose(bundles -> {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
for (NamespaceBundle nsBundle : bundles.getBundles()) {
try {
futures.add(pulsar().getAdminClient().namespaces()
.clearNamespaceBundleBacklogAsync(namespaceName.toString(),
nsBundle.getBundleRange()));
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}
}
return FutureUtil.waitForAll(futures);
}).thenRun(() -> log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}",
clientAppId(), namespaceName));
}

@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
protected CompletableFuture<Void> internalClearNamespaceBundleBacklogAsync(String bundleRange,
boolean authoritative) {
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}

validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true);

clearBacklog(namespaceName, bundleRange, null);
log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), namespaceName,
bundleRange);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG)
.thenCompose(__ -> {
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return validateClusterOwnershipAsync(namespaceName.getCluster())
.thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
})
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
// Allow acquiring ownership for an unassigned bundle so backlog can be cleared
// even if not loaded.
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(__ -> clearBacklogAsync(namespaceName, bundleRange, null))
.thenRun(() -> log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange));
}

protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
protected CompletableFuture<Void> internalClearNamespaceBacklogForSubscriptionAsync(String subscription,
boolean authoritative) {
checkNotNull(subscription, "Subscription should not be null");

final List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear
if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync(
namespaceName.toString(), nsBundle.getBundleRange(), subscription));
}
}
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
log.warn("[{}] Failed to clear backlog for subscription {} on the bundles for namespace {}: {}",
clientAppId(), subscription, namespaceName, exception.getCause().getMessage());
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
return null;
} else {
asyncResponse.resume(new RestException(exception.getCause()));
return null;
}
}
log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}",
clientAppId(), subscription, namespaceName);
asyncResponse.resume(Response.noContent().build());
return null;
});
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG)
.thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundlesAsync(namespaceName))
.thenCompose(bundles -> {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
for (NamespaceBundle nsBundle : bundles.getBundles()) {
try {
futures.add(pulsar().getAdminClient().namespaces()
.clearNamespaceBundleBacklogForSubscriptionAsync(
namespaceName.toString(), nsBundle.getBundleRange(), subscription));
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}
}
return FutureUtil.waitForAll(futures);
}).thenRun(() -> log.info(
"[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}",
clientAppId(), subscription, namespaceName));
}

@SuppressWarnings("deprecation")
protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG);
protected CompletableFuture<Void> internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription,
String bundleRange,
boolean authoritative) {
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}

validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true);

clearBacklog(namespaceName, bundleRange, subscription);
log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(),
subscription, namespaceName, bundleRange);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG)
.thenCompose(__ -> {
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return validateClusterOwnershipAsync(namespaceName.getCluster())
.thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
})
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
// Allow acquiring ownership for an unassigned bundle so backlog can be cleared
// even if not loaded.
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(__ -> clearBacklogAsync(namespaceName, bundleRange, subscription))
.thenRun(() -> log.info(
"[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}",
clientAppId(), subscription, namespaceName, bundleRange));
}

protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
Expand Down Expand Up @@ -1885,39 +1854,53 @@ private boolean checkQuotas(Policies policies, RetentionPolicies retention) {
return checkBacklogQuota(quota, retention);
}

private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
nsName.toString() + "/" + bundleRange);

List<CompletableFuture<Void>> futures = new ArrayList<>();
if (subscription != null) {
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
subscription = PersistentReplicator.getRemoteCluster(subscription);
}
for (Topic topic : topicList) {
if (topic instanceof PersistentTopic
&& !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
futures.add(((PersistentTopic) topic).clearBacklog(subscription));
private CompletableFuture<Void> clearBacklogAsync(NamespaceName nsName, String bundleRange, String subscription) {
final NamespaceBundleFactory bundleFactory = pulsar().getNamespaceService().getNamespaceBundleFactory();
final NamespaceBundle targetBundle = bundleFactory.getBundle(nsName.toString(), bundleRange);
return pulsar().getNamespaceService().getListOfPersistentTopics(nsName)
.thenCompose(topicsInNamespace -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
String effectiveSubscription = subscription;
if (effectiveSubscription != null
&& effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
effectiveSubscription = PersistentReplicator.getRemoteCluster(effectiveSubscription);
}
}
} else {
for (Topic topic : topicList) {
if (topic instanceof PersistentTopic
&& !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) {
futures.add(((PersistentTopic) topic).clearBacklog());
final String finalSubscription = effectiveSubscription;

for (String topic : topicsInNamespace) {
TopicName topicName = TopicName.get(topic);
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
continue;
}
NamespaceBundle bundle = bundleFactory.getBundle(topicName);
if (bundle == null || !bundle.equals(targetBundle)) {
continue;
}
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
Topic loaded = optTopic.get();
if (!(loaded instanceof PersistentTopic persistentTopic)) {
return CompletableFuture.completedFuture(null);
}
return finalSubscription != null
? persistentTopic.clearBacklog(finalSubscription)
: persistentTopic.clearBacklog();
}));
}
}
}

FutureUtil.waitForAll(futures).get();
} catch (Exception e) {
log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(),
nsName.toString(), bundleRange, subscription, e);
throw new RestException(e);
}
return FutureUtil.waitForAll(futures);
}).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(),
nsName, bundleRange, subscription, cause);
throw new RestException(cause);
});
}


private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
Expand Down
Loading