Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.service.TopicPoliciesService.DEFAULT_GET_TOPIC_POLICY_TIMEOUT;
import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
Expand Down Expand Up @@ -77,6 +78,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
Expand All @@ -96,8 +98,11 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
Expand Down Expand Up @@ -3470,7 +3475,7 @@ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retenti
}
return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
TopicPolicies topicPolicies = op.map(TopicPolicies::new).orElseGet(TopicPolicies::new);
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
if (backlogQuota == null) {
Expand All @@ -3490,7 +3495,43 @@ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retenti
topicPolicies.setRetentionPolicies(retention);
topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}).thenCompose(__ -> isRetentionUpdateSuccess(isGlobal, retention));
}

private CompletableFuture<Void> isRetentionUpdateSuccess(boolean isGlobal, RetentionPolicies retention) {

CompletableFuture<Void> response = new CompletableFuture<>();
Backoff usedBackoff = new BackoffBuilder()
.setInitialTime(500, TimeUnit.MILLISECONDS)
.setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
.setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS)
.create();
log.info("start currentTime:{}", System.currentTimeMillis());
try {
RetryUtil.retryWithoutLogAsynchronously(() -> {
log.info("end currentTime:{}", System.currentTimeMillis());
CompletableFuture<Void> future = new CompletableFuture<>();
getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getRetentionPolicies))
.thenAccept(retentionPolicies -> {
if (retentionPolicies.isPresent() && retentionPolicies.get().equals(retention)) {
future.complete(null);
} else if (!retentionPolicies.isPresent() && retention == null) {
future.complete(null);
} else {
log.info("retentionPolicies:{}, retention:{}", retentionPolicies.orElse(null), retention);
future.completeExceptionally(new BrokerServiceException.TopicPoliciesCacheNotUpdateAfterSetException());
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}, usedBackoff, pulsar().getExecutor(), response);
} catch (Exception e) {
response.completeExceptionally(e);
}
return response;
}

protected CompletableFuture<Void> internalRemoveRetention(boolean isGlobal) {
Expand All @@ -3499,9 +3540,10 @@ protected CompletableFuture<Void> internalRemoveRetention(boolean isGlobal) {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setRetentionPolicies(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
TopicPolicies topicPolicies = op.map(TopicPolicies::new).get();
topicPolicies.setRetentionPolicies(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}).thenCompose(__ -> isRetentionUpdateSuccess(isGlobal, null));
}

protected CompletableFuture<PersistencePolicies> internalGetPersistence(boolean applied, boolean isGlobal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ public TopicPoliciesCacheNotInitException() {
}
}

public static class TopicPoliciesCacheNotUpdateAfterSetException extends BrokerServiceException {
public TopicPoliciesCacheNotUpdateAfterSetException() {
super("Topic policies cache have not update after set.");
}
}

public static class TopicBacklogQuotaExceededException extends BrokerServiceException {
@Getter
private final BacklogQuota.RetentionPolicy retentionPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,33 +615,33 @@ public void testCheckRetentionTimeBasedQuota() throws Exception {

@Test
public void testSetRetention() throws Exception {
RetentionPolicies retention = new RetentionPolicies(60, 1024);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);
for (int i = 0; i < 10; i++) {
RetentionPolicies retention = new RetentionPolicies(i, 1024);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);

admin.topicPolicies().setRetention(testTopic, retention);
log.info("Retention set success on topic: {}", testTopic);
admin.topicPolicies().setRetention(testTopic, retention);
log.info("Retention set success on topic: {}", testTopic);

Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention));
Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention);
}

admin.topics().deletePartitionedTopic(testTopic, true);
}

@Test
public void testRemoveRetention() throws Exception {
for (int i = 0; i < 10; i++) {
RetentionPolicies retention = new RetentionPolicies(i, 1024);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);

RetentionPolicies retention = new RetentionPolicies(60, 1024);
log.info("Retention: {} will set to the topic: {}", retention, testTopic);

admin.topicPolicies().setRetention(testTopic, retention);
log.info("Retention set success on topic: {}", testTopic);
admin.topicPolicies().setRetention(testTopic, retention);
log.info("Retention set success on topic: {}", testTopic);

Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention));
Assert.assertEquals(admin.topicPolicies().getRetention(testTopic), retention);

admin.topicPolicies().removeRetention(testTopic);
Awaitility.await()
.untilAsserted(() -> Assert.assertNull(admin.topicPolicies().getRetention(testTopic)));
admin.topicPolicies().removeRetention(testTopic);
Assert.assertNull(admin.topicPolicies().getRetention(testTopic));
}

admin.topics().deletePartitionedTopic(testTopic, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,39 @@ public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> suppli
throw new IllegalArgumentException("Illegal initial time");
}
scheduledExecutorService.execute(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback));
executeWithRetry(supplier, backoff, scheduledExecutorService, callback, true));
}

public static <T> void retryWithoutLogAsynchronously(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
ScheduledExecutorService scheduledExecutorService,
CompletableFuture<T> callback) {
if (backoff.getMax() <= 0) {
throw new IllegalArgumentException("Illegal max retry time");
}
if (backoff.getInitial() <= 0) {
throw new IllegalArgumentException("Illegal initial time");
}
scheduledExecutorService.schedule(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback, false),
backoff.next(), TimeUnit.MILLISECONDS);
}

private static <T> void executeWithRetry(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
ScheduledExecutorService scheduledExecutorService,
CompletableFuture<T> callback) {
CompletableFuture<T> callback,
boolean isLog) {
supplier.get().whenComplete((result, e) -> {
if (e != null) {
long next = backoff.next();
boolean isMandatoryStop = backoff.isMandatoryStopMade();
if (isMandatoryStop) {
callback.completeExceptionally(e);
} else {
log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next);
if (isLog) {
log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next);
}
scheduledExecutorService.schedule(() ->
executeWithRetry(supplier, backoff, scheduledExecutorService, callback),
executeWithRetry(supplier, backoff, scheduledExecutorService, callback, isLog),
next, TimeUnit.MILLISECONDS);
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,39 @@ public class TopicPolicies {
// If set, it will override the namespace settings for allowing auto subscription creation
private AutoSubscriptionCreationOverrideImpl autoSubscriptionCreationOverride;

public TopicPolicies(TopicPolicies policies) {
this.backLogQuotaMap = policies.backLogQuotaMap;
this.subscriptionTypesEnabled = policies.subscriptionTypesEnabled;
this.replicationClusters = policies.replicationClusters;
this.shadowTopics = policies.shadowTopics;
this.isGlobal = policies.isGlobal;
this.persistence = policies.persistence;
this.retentionPolicies = policies.retentionPolicies;
this.deduplicationEnabled = policies.deduplicationEnabled;
this.messageTTLInSeconds = policies.messageTTLInSeconds;
this.maxProducerPerTopic = policies.maxProducerPerTopic;
this.maxConsumerPerTopic = policies.maxConsumerPerTopic;
this.maxConsumersPerSubscription = policies.maxConsumersPerSubscription;
this.maxUnackedMessagesOnConsumer = policies.maxUnackedMessagesOnConsumer;
this.maxUnackedMessagesOnSubscription = policies.maxUnackedMessagesOnSubscription;
this.delayedDeliveryTickTimeMillis = policies.delayedDeliveryTickTimeMillis;
this.delayedDeliveryEnabled = policies.delayedDeliveryEnabled;
this.offloadPolicies = policies.offloadPolicies;
this.inactiveTopicPolicies = policies.inactiveTopicPolicies;
this.dispatchRate = policies.dispatchRate;
this.subscriptionDispatchRate = policies.subscriptionDispatchRate;
this.compactionThreshold = policies.compactionThreshold;
this.publishRate = policies.publishRate;
this.subscribeRate = policies.subscribeRate;
this.deduplicationSnapshotIntervalSeconds = policies.deduplicationSnapshotIntervalSeconds;
this.maxMessageSize = policies.maxMessageSize;
this.maxSubscriptionsPerTopic = policies.maxSubscriptionsPerTopic;
this.replicatorDispatchRate = policies.replicatorDispatchRate;
this.schemaCompatibilityStrategy = policies.schemaCompatibilityStrategy;
this.entryFilters = policies.entryFilters;
this.autoSubscriptionCreationOverride = policies.autoSubscriptionCreationOverride;
}

/**
* Subscription level policies for specific subscription.
*/
Expand Down