From e0ff806cd85c60d67508d2376cc2e37cf119e6b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B5=A9?= Date: Fri, 27 Feb 2026 17:48:47 +0800 Subject: [PATCH] Fix backlog clearing for unloaded namespace bundles. --- .../broker/admin/impl/NamespacesBase.java | 265 ++++++++---------- .../pulsar/broker/admin/v1/Namespaces.java | 57 ++-- .../pulsar/broker/admin/v2/Namespaces.java | 57 ++-- .../pulsar/broker/admin/AdminApiTest.java | 126 +++++++++ 4 files changed, 324 insertions(+), 181 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bfbd65234813e..cd4ec29d4a9d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1537,134 +1537,103 @@ private CompletableFuture doUpdatePersistenceAsync(PersistencePolicies per ); } - protected void internalClearNamespaceBacklog(AsyncResponse asyncResponse, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); - - final List> futures = new ArrayList<>(); - try { - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear - if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { - futures.add(pulsar().getAdminClient().namespaces() - .clearNamespaceBundleBacklogAsync(namespaceName.toString(), nsBundle.getBundleRange())); - } - } - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - log.warn("[{}] Failed to clear backlog on the bundles for namespace {}: {}", clientAppId(), - namespaceName, exception.getCause().getMessage()); - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - return null; - } else { - asyncResponse.resume(new RestException(exception.getCause())); - return null; - } - } - log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", clientAppId(), - namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }); + protected CompletableFuture internalClearNamespaceBacklogAsync(boolean authoritative) { + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespaceName)) + .thenCompose(bundles -> { + final List> futures = new ArrayList<>(); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + try { + futures.add(pulsar().getAdminClient().namespaces() + .clearNamespaceBundleBacklogAsync(namespaceName.toString(), + nsBundle.getBundleRange())); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + return FutureUtil.waitForAll(futures); + }).thenRun(() -> log.info("[{}] Successfully cleared backlog on all the bundles for namespace {}", + clientAppId(), namespaceName)); } @SuppressWarnings("deprecation") - protected void internalClearNamespaceBundleBacklog(String bundleRange, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + protected CompletableFuture internalClearNamespaceBundleBacklogAsync(String bundleRange, + boolean authoritative) { checkNotNull(bundleRange, "BundleRange should not be null"); - Policies policies = getNamespacePolicies(namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - - validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - - clearBacklog(namespaceName, bundleRange, null); - log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), namespaceName, - bundleRange); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> { + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return validateClusterOwnershipAsync(namespaceName.getCluster()) + .thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(), + namespaceName.getCluster())); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> + // Allow acquiring ownership for an unassigned bundle so backlog can be cleared + // even if not loaded. + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) + .thenCompose(__ -> clearBacklogAsync(namespaceName, bundleRange, null)) + .thenRun(() -> log.info("[{}] Successfully cleared backlog on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange)); } - protected void internalClearNamespaceBacklogForSubscription(AsyncResponse asyncResponse, String subscription, - boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + protected CompletableFuture internalClearNamespaceBacklogForSubscriptionAsync(String subscription, + boolean authoritative) { checkNotNull(subscription, "Subscription should not be null"); - final List> futures = new ArrayList<>(); - try { - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then there is no backlog on this bundle to clear - if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { - futures.add(pulsar().getAdminClient().namespaces().clearNamespaceBundleBacklogForSubscriptionAsync( - namespaceName.toString(), nsBundle.getBundleRange(), subscription)); - } - } - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - log.warn("[{}] Failed to clear backlog for subscription {} on the bundles for namespace {}: {}", - clientAppId(), subscription, namespaceName, exception.getCause().getMessage()); - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - return null; - } else { - asyncResponse.resume(new RestException(exception.getCause())); - return null; - } - } - log.info("[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", - clientAppId(), subscription, namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespaceName)) + .thenCompose(bundles -> { + final List> futures = new ArrayList<>(); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + try { + futures.add(pulsar().getAdminClient().namespaces() + .clearNamespaceBundleBacklogForSubscriptionAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + return FutureUtil.waitForAll(futures); + }).thenRun(() -> log.info( + "[{}] Successfully cleared backlog for subscription {} on all the bundles for namespace {}", + clientAppId(), subscription, namespaceName)); } @SuppressWarnings("deprecation") - protected void internalClearNamespaceBundleBacklogForSubscription(String subscription, String bundleRange, - boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.CLEAR_BACKLOG); + protected CompletableFuture internalClearNamespaceBundleBacklogForSubscriptionAsync(String subscription, + String bundleRange, + boolean authoritative) { checkNotNull(subscription, "Subscription should not be null"); checkNotNull(bundleRange, "BundleRange should not be null"); - Policies policies = getNamespacePolicies(namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - - validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - - clearBacklog(namespaceName, bundleRange, subscription); - log.info("[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", clientAppId(), - subscription, namespaceName, bundleRange); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.CLEAR_BACKLOG) + .thenCompose(__ -> { + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return validateClusterOwnershipAsync(namespaceName.getCluster()) + .thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(), + namespaceName.getCluster())); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> + // Allow acquiring ownership for an unassigned bundle so backlog can be cleared + // even if not loaded. + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) + .thenCompose(__ -> clearBacklogAsync(namespaceName, bundleRange, subscription)) + .thenRun(() -> log.info( + "[{}] Successfully cleared backlog for subscription {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange)); } protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription, @@ -1885,39 +1854,53 @@ private boolean checkQuotas(Policies policies, RetentionPolicies retention) { return checkBacklogQuota(quota, retention); } - private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) { - try { - List topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), - nsName.toString() + "/" + bundleRange); - - List> futures = new ArrayList<>(); - if (subscription != null) { - if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { - subscription = PersistentReplicator.getRemoteCluster(subscription); - } - for (Topic topic : topicList) { - if (topic instanceof PersistentTopic - && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) { - futures.add(((PersistentTopic) topic).clearBacklog(subscription)); + private CompletableFuture clearBacklogAsync(NamespaceName nsName, String bundleRange, String subscription) { + final NamespaceBundleFactory bundleFactory = pulsar().getNamespaceService().getNamespaceBundleFactory(); + final NamespaceBundle targetBundle = bundleFactory.getBundle(nsName.toString(), bundleRange); + return pulsar().getNamespaceService().getListOfPersistentTopics(nsName) + .thenCompose(topicsInNamespace -> { + List> futures = new ArrayList<>(); + String effectiveSubscription = subscription; + if (effectiveSubscription != null + && effectiveSubscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { + effectiveSubscription = PersistentReplicator.getRemoteCluster(effectiveSubscription); } - } - } else { - for (Topic topic : topicList) { - if (topic instanceof PersistentTopic - && !pulsar().getBrokerService().isSystemTopic(TopicName.get(topic.getName()))) { - futures.add(((PersistentTopic) topic).clearBacklog()); + final String finalSubscription = effectiveSubscription; + + for (String topic : topicsInNamespace) { + TopicName topicName = TopicName.get(topic); + if (pulsar().getBrokerService().isSystemTopic(topicName)) { + continue; + } + NamespaceBundle bundle = bundleFactory.getBundle(topicName); + if (bundle == null || !bundle.equals(targetBundle)) { + continue; + } + futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + Topic loaded = optTopic.get(); + if (!(loaded instanceof PersistentTopic persistentTopic)) { + return CompletableFuture.completedFuture(null); + } + return finalSubscription != null + ? persistentTopic.clearBacklog(finalSubscription) + : persistentTopic.clearBacklog(); + })); } - } - } - FutureUtil.waitForAll(futures).get(); - } catch (Exception e) { - log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(), - nsName.toString(), bundleRange, subscription, e); - throw new RestException(e); - } + return FutureUtil.waitForAll(futures); + }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to clear backlog for namespace {}/{}, subscription: {}", clientAppId(), + nsName, bundleRange, subscription, cause); + throw new RestException(cause); + }); } + private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) { try { List topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index cad6899c8a290..0d95c554c5832 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1376,14 +1376,14 @@ public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBacklog(asyncResponse, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(property, cluster, namespace); + internalClearNamespaceBacklogAsync(authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1393,12 +1393,20 @@ public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklog(@PathParam("property") String property, + public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBundleBacklog(bundleRange, authoritative); + internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1411,14 +1419,15 @@ public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(property, cluster, namespace); + internalClearNamespaceBacklogForSubscriptionAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1428,12 +1437,20 @@ public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklogForSubscription(@PathParam("property") String property, + public void clearNamespaceBundleBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(property, cluster, namespace); - internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative); + internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 90f4b087bfe85..728c6a4569012 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1515,14 +1515,14 @@ public void getPersistence( public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklog(asyncResponse, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklogAsync(authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace {}", clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1533,11 +1533,19 @@ public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant, + public void clearNamespaceBundleBacklog(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalClearNamespaceBundleBacklog(bundleRange, authoritative); + internalClearNamespaceBundleBacklogAsync(bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog on namespace bundle {}/{}", clientAppId(), + namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1551,14 +1559,15 @@ public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(tenant, namespace); - internalClearNamespaceBacklogForSubscription(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalClearNamespaceBacklogForSubscriptionAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1569,12 +1578,20 @@ public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant, + public void clearNamespaceBundleBacklogForSubscription(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative); + internalClearNamespaceBundleBacklogForSubscriptionAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to clear backlog for subscription {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 663a6f0a41900..12df910a4045c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2285,6 +2285,132 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception { assertEquals(backlog, 0); } + @Test(dataProvider = "numBundles") + public void testClearNamespaceBundleBacklogOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns1-bundles"; + admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + + Consumer consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + for (int i = 0; i < 10; i++) { + producer.send(("message-" + i).getBytes()); + } + producer.close(); + consumer.close(); + + long backlog = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlog, 10); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().clearNamespaceBundleBacklog(namespace, bundle.getBundleRange()); + + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + long backlogAfter = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlogAfter, 0); + } + + @Test(dataProvider = "numBundles") + public void testClearNamespaceBundleBacklogForSubscriptionOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns1-bundles"; + admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + for (int i = 0; i < 10; i++) { + producer.send(("message-" + i).getBytes()); + } + producer.close(); + consumer1.close(); + consumer2.close(); + + long backlog = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlog, 10); + long otherBacklog = admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog(); + assertEquals(otherBacklog, 10); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle.getBundleRange(), + subscription); + + @Cleanup + Consumer consumer3 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + @Cleanup + Consumer consumer4 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + long backlogAfter = admin.topics().getStats(topic).getSubscriptions().get(subscription).getMsgBacklog(); + assertEquals(backlogAfter, 0); + long otherBacklogAfter = + admin.topics().getStats(topic).getSubscriptions().get(otherSubscription).getMsgBacklog(); + assertEquals(otherBacklogAfter, 10); + } + + @Test(dataProvider = "bundling") public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);