From 3b8be736ee11525c5cee342ea79d0adcd9653d6c Mon Sep 17 00:00:00 2001 From: fanjianye Date: Thu, 17 Aug 2023 18:10:26 +0800 Subject: [PATCH] response only when topicPolicy is actually updated success --- .../admin/impl/PersistentTopicsBase.java | 52 +++++++++++++++++-- .../service/BrokerServiceException.java | 6 +++ .../broker/admin/TopicPoliciesTest.java | 32 ++++++------ .../apache/pulsar/client/util/RetryUtil.java | 25 +++++++-- .../common/policies/data/TopicPolicies.java | 33 ++++++++++++ 5 files changed, 123 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 56598f5cc451e..5e476b5f93ac8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; +import static org.apache.pulsar.broker.service.TopicPoliciesService.DEFAULT_GET_TOPIC_POLICY_TIMEOUT; import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; @@ -77,6 +78,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.service.AnalyzeBacklogResult; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition; @@ -96,8 +98,11 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -3470,7 +3475,7 @@ protected CompletableFuture internalSetRetention(RetentionPolicies retenti } return getTopicPoliciesAsyncWithRetry(topicName, isGlobal) .thenCompose(op -> { - TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + TopicPolicies topicPolicies = op.map(TopicPolicies::new).orElseGet(TopicPolicies::new); for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) { BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name()); if (backlogQuota == null) { @@ -3490,7 +3495,43 @@ protected CompletableFuture internalSetRetention(RetentionPolicies retenti topicPolicies.setRetentionPolicies(retention); topicPolicies.setIsGlobal(isGlobal); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); - }); + }).thenCompose(__ -> isRetentionUpdateSuccess(isGlobal, retention)); + } + + private CompletableFuture isRetentionUpdateSuccess(boolean isGlobal, RetentionPolicies retention) { + + CompletableFuture response = new CompletableFuture<>(); + Backoff usedBackoff = new BackoffBuilder() + .setInitialTime(500, TimeUnit.MILLISECONDS) + .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) + .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) + .create(); + log.info("start currentTime:{}", System.currentTimeMillis()); + try { + RetryUtil.retryWithoutLogAsynchronously(() -> { + log.info("end currentTime:{}", System.currentTimeMillis()); + CompletableFuture future = new CompletableFuture<>(); + getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getRetentionPolicies)) + .thenAccept(retentionPolicies -> { + if (retentionPolicies.isPresent() && retentionPolicies.get().equals(retention)) { + future.complete(null); + } else if (!retentionPolicies.isPresent() && retention == null) { + future.complete(null); + } else { + log.info("retentionPolicies:{}, retention:{}", retentionPolicies.orElse(null), retention); + future.completeExceptionally(new BrokerServiceException.TopicPoliciesCacheNotUpdateAfterSetException()); + } + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); + return future; + }, usedBackoff, pulsar().getExecutor(), response); + } catch (Exception e) { + response.completeExceptionally(e); + } + return response; } protected CompletableFuture internalRemoveRetention(boolean isGlobal) { @@ -3499,9 +3540,10 @@ protected CompletableFuture internalRemoveRetention(boolean isGlobal) { if (!op.isPresent()) { return CompletableFuture.completedFuture(null); } - op.get().setRetentionPolicies(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); - }); + TopicPolicies topicPolicies = op.map(TopicPolicies::new).get(); + topicPolicies.setRetentionPolicies(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + }).thenCompose(__ -> isRetentionUpdateSuccess(isGlobal, null)); } protected CompletableFuture internalGetPersistence(boolean applied, boolean isGlobal) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 3e77588b2459f..f32ac5330ed8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -220,6 +220,12 @@ public TopicPoliciesCacheNotInitException() { } } + public static class TopicPoliciesCacheNotUpdateAfterSetException extends BrokerServiceException { + public TopicPoliciesCacheNotUpdateAfterSetException() { + super("Topic policies cache have not update after set."); + } + } + public static class TopicBacklogQuotaExceededException extends BrokerServiceException { @Getter private final BacklogQuota.RetentionPolicy retentionPolicy; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 87471f4972f8d..2e8969a5feec5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -615,33 +615,33 @@ public void testCheckRetentionTimeBasedQuota() throws Exception { @Test public void testSetRetention() throws Exception { - RetentionPolicies retention = new RetentionPolicies(60, 1024); - log.info("Retention: {} will set to the topic: {}", retention, testTopic); + for (int i = 0; i < 10; i++) { + RetentionPolicies retention = new RetentionPolicies(i, 1024); + log.info("Retention: {} will set to the topic: {}", retention, testTopic); - admin.topicPolicies().setRetention(testTopic, retention); - log.info("Retention set success on topic: {}", testTopic); + admin.topicPolicies().setRetention(testTopic, retention); + log.info("Retention set success on topic: {}", testTopic); - Awaitility.await() - .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention)); + Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention); + } admin.topics().deletePartitionedTopic(testTopic, true); } @Test public void testRemoveRetention() throws Exception { + for (int i = 0; i < 10; i++) { + RetentionPolicies retention = new RetentionPolicies(i, 1024); + log.info("Retention: {} will set to the topic: {}", retention, testTopic); - RetentionPolicies retention = new RetentionPolicies(60, 1024); - log.info("Retention: {} will set to the topic: {}", retention, testTopic); - - admin.topicPolicies().setRetention(testTopic, retention); - log.info("Retention set success on topic: {}", testTopic); + admin.topicPolicies().setRetention(testTopic, retention); + log.info("Retention set success on topic: {}", testTopic); - Awaitility.await() - .untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention)); + Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention); - admin.topicPolicies().removeRetention(testTopic); - Awaitility.await() - .untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getRetention(testTopic))); + admin.topicPolicies().removeRetention(testTopic); + Assert.assertNull(admin.topicPolicies().getRetention(testTopic)); + } admin.topics().deletePartitionedTopic(testTopic, true); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java index 93501d7b6c18b..5ce0bb9541779 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java @@ -39,12 +39,27 @@ public static void retryAsynchronously(Supplier> suppli throw new IllegalArgumentException("Illegal initial time"); } scheduledExecutorService.execute(() -> - executeWithRetry(supplier, backoff, scheduledExecutorService, callback)); + executeWithRetry(supplier, backoff, scheduledExecutorService, callback, true)); + } + + public static void retryWithoutLogAsynchronously(Supplier> supplier, Backoff backoff, + ScheduledExecutorService scheduledExecutorService, + CompletableFuture callback) { + if (backoff.getMax() <= 0) { + throw new IllegalArgumentException("Illegal max retry time"); + } + if (backoff.getInitial() <= 0) { + throw new IllegalArgumentException("Illegal initial time"); + } + scheduledExecutorService.schedule(() -> + executeWithRetry(supplier, backoff, scheduledExecutorService, callback, false), + backoff.next(), TimeUnit.MILLISECONDS); } private static void executeWithRetry(Supplier> supplier, Backoff backoff, ScheduledExecutorService scheduledExecutorService, - CompletableFuture callback) { + CompletableFuture callback, + boolean isLog) { supplier.get().whenComplete((result, e) -> { if (e != null) { long next = backoff.next(); @@ -52,9 +67,11 @@ private static void executeWithRetry(Supplier> supplier if (isMandatoryStop) { callback.completeExceptionally(e); } else { - log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next); + if (isLog) { + log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next); + } scheduledExecutorService.schedule(() -> - executeWithRetry(supplier, backoff, scheduledExecutorService, callback), + executeWithRetry(supplier, backoff, scheduledExecutorService, callback, isLog), next, TimeUnit.MILLISECONDS); } return; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 4a76170d116a3..ac3a90e23cc16 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -78,6 +78,39 @@ public class TopicPolicies { // If set, it will override the namespace settings for allowing auto subscription creation private AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride; + public TopicPolicies(TopicPolicies policies) { + this.backLogQuotaMap = policies.backLogQuotaMap; + this.subscriptionTypesEnabled = policies.subscriptionTypesEnabled; + this.replicationClusters = policies.replicationClusters; + this.shadowTopics = policies.shadowTopics; + this.isGlobal = policies.isGlobal; + this.persistence = policies.persistence; + this.retentionPolicies = policies.retentionPolicies; + this.deduplicationEnabled = policies.deduplicationEnabled; + this.messageTTLInSeconds = policies.messageTTLInSeconds; + this.maxProducerPerTopic = policies.maxProducerPerTopic; + this.maxConsumerPerTopic = policies.maxConsumerPerTopic; + this.maxConsumersPerSubscription = policies.maxConsumersPerSubscription; + this.maxUnackedMessagesOnConsumer = policies.maxUnackedMessagesOnConsumer; + this.maxUnackedMessagesOnSubscription = policies.maxUnackedMessagesOnSubscription; + this.delayedDeliveryTickTimeMillis = policies.delayedDeliveryTickTimeMillis; + this.delayedDeliveryEnabled = policies.delayedDeliveryEnabled; + this.offloadPolicies = policies.offloadPolicies; + this.inactiveTopicPolicies = policies.inactiveTopicPolicies; + this.dispatchRate = policies.dispatchRate; + this.subscriptionDispatchRate = policies.subscriptionDispatchRate; + this.compactionThreshold = policies.compactionThreshold; + this.publishRate = policies.publishRate; + this.subscribeRate = policies.subscribeRate; + this.deduplicationSnapshotIntervalSeconds = policies.deduplicationSnapshotIntervalSeconds; + this.maxMessageSize = policies.maxMessageSize; + this.maxSubscriptionsPerTopic = policies.maxSubscriptionsPerTopic; + this.replicatorDispatchRate = policies.replicatorDispatchRate; + this.schemaCompatibilityStrategy = policies.schemaCompatibilityStrategy; + this.entryFilters = policies.entryFilters; + this.autoSubscriptionCreationOverride = policies.autoSubscriptionCreationOverride; + } + /** * Subscription level policies for specific subscription. */