Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1836,4 +1836,9 @@ public void shutdownNow() {
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}

@VisibleForTesting
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
this.transactionBufferProvider = transactionBufferProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,17 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
try {
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()) {
try {
topicFuture.join();
} catch (Exception ex) {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx == FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION) {
return CompletableFuture.failedFuture(new TimeoutException("The previous loading task"
+ " has not finished yet even through it has timeout, please retry again."));
}
}
}
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
Expand Down Expand Up @@ -1608,6 +1619,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ " topic", topic, FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic, topicFuture);
if (ex != null) {
log.warn("[{}] Get an error when closing topic.",
topic, ex);
Expand Down Expand Up @@ -1645,6 +1657,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the topic doesn't exist
topicFuture.complete(Optional.empty());
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
} else {
log.warn("Failed to create topic {}", topic, exception);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,27 @@
*/
package org.apache.pulsar.client.api;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
import static org.junit.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -114,4 +121,68 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception {
admin.topics().delete(tpName, false);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}

@Test
public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
// Make the topic loading timeout faster.
long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds();
int topicLoadTimeoutSeconds = 1;
pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
pulsar.getConfig().setBrokerDeduplicationEnabled(true);
pulsar.getConfig().setTransactionCoordinatorEnabled(true);
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

// Mock message deduplication recovery speed topicLoadTimeoutSeconds
AtomicBoolean stopDelay = new AtomicBoolean();
String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME;
mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
if (mlPath.equals(path) && !stopDelay.get()) {
log.info("Topic load timeout: " + path);
return true;
}
return false;
});

// First load topic will trigger timeout
// The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close.
// Here, we simulate a sleep to ensure that the ledger is not immediately closed.
TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() {
@Override
public TransactionBuffer newTransactionBuffer(Topic originTopic) {
return new TransactionBufferDisable(originTopic) {
@SneakyThrows
@Override
public CompletableFuture<Void> closeAsync() {
Thread.sleep(500);
return super.closeAsync();
}
};
}
};
TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);
CompletableFuture<Optional<Topic>> firstLoad = pulsar.getBrokerService().getTopic(tpName, true);
Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
// assert first create topic timeout
.untilAsserted(() -> {
assertTrue(firstLoad.isCompletedExceptionally());
});

// Once the first load topic times out, immediately to load the topic again.
stopDelay.set(true);
Producer<byte[]> producer = pulsarClient.newProducer().topic(tpName).create();
for (int i = 0; i < 10; i++) {
MessageId send = producer.send("msg".getBytes());
Thread.sleep(100);
assertNotNull(send);
}

// set to back
pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
pulsar.getConfig().setBrokerDeduplicationEnabled(false);
pulsar.getConfig().setTransactionCoordinatorEnabled(false);
}
}