From 31bfded7d717afd3e76de4d839bdb3daebf704a0 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 17 May 2024 18:36:07 +0800 Subject: [PATCH 1/2] [fix][broker] The topic might reference a closed ledger --- .../apache/pulsar/broker/PulsarService.java | 5 ++ .../pulsar/broker/service/BrokerService.java | 5 ++ .../client/api/OrphanPersistentTopicTest.java | 67 +++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6ee35ad295fb5..8fb383cca4427 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception return new BrokerService(pulsar, ioEventLoopGroup); } + @VisibleForTesting + public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) { + this.transactionBufferProvider = transactionBufferProvider; + } + private CompactionServiceFactory loadCompactionServiceFactory() { String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName(); var compactionServiceFactory = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6603e240ee7d9..7acc15a0adee6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1007,6 +1007,9 @@ public CompletableFuture> getTopic(final TopicName topicName, bo CompletableFuture> topicFuture = topics.get(topicName.toString()); if (topicFuture != null) { if (topicFuture.isCompletedExceptionally() + && FutureUtil.getException(topicFuture).get().equals(FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION)) { + return FutureUtil.failedFuture(FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); + } else if (topicFuture.isCompletedExceptionally() || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { // Exceptional topics should be recreated. topics.remove(topicName.toString(), topicFuture); @@ -1744,6 +1747,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " topic", topic, FutureUtil.getException(topicFuture)); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, ex) -> { + topics.remove(topic, topicFuture); if (ex != null) { log.warn("[{}] Get an error when closing topic.", topic, ex); @@ -1760,6 +1764,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, closeEx) -> { + topics.remove(topic, topicFuture); if (closeEx != null) { log.warn("[{}] Get an error when closing topic.", topic, closeEx); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 7cd9da7574dbb..a399a6fd9f024 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.List; @@ -27,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -34,6 +37,9 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.compaction.CompactionServiceFactory; @@ -108,6 +114,67 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); } + @Test + public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { + // Make the topic loading timeout faster. + long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); + int topicLoadTimeoutSeconds = 1; + pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(true); + pulsar.getConfig().setTransactionCoordinatorEnabled(true); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); + + // Mock message deduplication recovery speed topicLoadTimeoutSeconds + String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; + mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> { + if (mlPath.equals(path)) { + log.info("Topic load timeout: " + path); + return true; + } + return false; + }); + + // First load topic will trigger timeout + // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. + // Here, we simulate a sleep to ensure that the ledger is not immediately closed. + TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() { + @Override + public TransactionBuffer newTransactionBuffer(Topic originTopic) { + return new TransactionBufferDisable(originTopic) { + @SneakyThrows + @Override + public CompletableFuture closeAsync() { + Thread.sleep(500); + return super.closeAsync(); + } + }; + } + }; + TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); + pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider); + CompletableFuture> firstLoad = pulsar.getBrokerService().getTopic(tpName, true); + Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + // assert first create topic timeout + .untilAsserted(() -> { + assertTrue(firstLoad.isCompletedExceptionally()); + }); + + // Once the first load topic times out, immediately to load the topic again. + Producer producer = pulsarClient.newProducer().topic(tpName).create(); + for (int i = 0; i < 100; i++) { + MessageId send = producer.send("msg".getBytes()); + assertNotNull(send); + } + + // set to back + pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider); + pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(false); + pulsar.getConfig().setTransactionCoordinatorEnabled(false); + } + @Test public void testNoOrphanTopicIfInitFailed() throws Exception { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); From a07f5a7391b34c420587d8c3095b75bdb2028ebc Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 20 May 2024 16:13:23 +0800 Subject: [PATCH 2/2] Fix unit test. --- .../pulsar/broker/service/ReplicatorTest.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 765727aeac319..6084ab06b48d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -19,12 +19,10 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception { // Ok } - final CompletableFuture> timedOutTopicFuture = topicFuture; - // timeout topic future should be removed from cache - retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5, - 1000); - - assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName)); - try { Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); @@ -1452,6 +1443,12 @@ public void testCleanupTopic() throws Exception { ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2"); mlFuture.complete(ml); + // Once ml is created, topic should be removed due timeout. + Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertNull(pulsar1.getBrokerService().getTopics().get(topicName)); + }); + + // Re-create topic will success. Consumer consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") .subscriptionType(SubscriptionType.Shared).subscribeAsync() .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);