diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 69a38bc50de9d..9a115e6d1ca4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -48,6 +48,7 @@ import java.util.function.ToLongFunction; import javax.annotation.Nonnull; import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -96,6 +97,13 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener { protected final String topic; + // Reference to the CompletableFuture returned when creating this topic in BrokerService. + // Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact + // topic instance that was created. + @Getter + @Setter + protected volatile CompletableFuture> createFuture; + // Producers currently connected to this topic protected final ConcurrentHashMap producers; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 79e6fb2b02e31..ddd436b085493 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1326,6 +1326,7 @@ private CompletableFuture> createNonPersistentTopic(String topic NonPersistentTopic nonPersistentTopic; try { nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); + nonPersistentTopic.setCreateFuture(topicFuture); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); topicFuture.completeExceptionally(e); @@ -1800,6 +1801,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); + persistentTopic.setCreateFuture(topicFuture); persistentTopic .initialize() .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded()) @@ -2409,47 +2411,18 @@ public AuthorizationService getAuthorizationService() { return authorizationService; } - public CompletableFuture removeTopicFromCache(Topic topic) { - Optional>> createTopicFuture = findTopicFutureInCache(topic); - if (createTopicFuture.isEmpty()){ - return CompletableFuture.completedFuture(null); - } - return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get()); - } - - private Optional>> findTopicFutureInCache(Topic topic){ - if (topic == null){ - return Optional.empty(); - } - final CompletableFuture> createTopicFuture = topics.get(topic.getName()); - // If not exists in cache, do nothing. - if (createTopicFuture == null){ - return Optional.empty(); - } - // If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic. - if (!createTopicFuture.isDone()){ - return Optional.empty(); - } - // If the future in cache has exception complete, - // the topic instance in the cache is not the same with the topic. - if (createTopicFuture.isCompletedExceptionally()){ - return Optional.empty(); - } - Optional optionalTopic = createTopicFuture.join(); - Topic topicInCache = optionalTopic.orElse(null); - if (topicInCache == null || topicInCache != topic){ - return Optional.empty(); - } else { - return Optional.of(createTopicFuture); - } - } - - private CompletableFuture removeTopicFutureFromCache(String topic, - CompletableFuture> createTopicFuture) { - TopicName topicName = TopicName.get(topic); + /** + * Removes the topic from the cache only if the topicName and associated createFuture match exactly. + * The TopicEvent.UNLOAD event will be triggered before and after removal. + * + * @param topic The topic to be removed. + * @return A CompletableFuture that completes when the operation is done. + */ + public CompletableFuture removeTopicFromCache(AbstractTopic topic) { + TopicName topicName = TopicName.get(topic.getName()); return pulsar.getNamespaceService().getBundleAsync(topicName) .thenAccept(namespaceBundle -> { - removeTopicFromCache(topic, namespaceBundle, createTopicFuture); + removeTopicFromCache(topic.getName(), namespaceBundle, topic.getCreateFuture()); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 41977e6b61d88..c43f0ed7fb9c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; + private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) { + return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() + ? new SnapshotSegmentAbortedTxnProcessorImpl(topic) + : new SingleSnapshotAbortedTxnProcessorImpl(topic); + } + + private static AbortedTxnProcessor.SnapshotType determineSnapshotType(PersistentTopic topic) { + return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() + ? AbortedTxnProcessor.SnapshotType.Segment + : AbortedTxnProcessor.SnapshotType.Single; + } + public TopicTransactionBuffer(PersistentTopic topic) { + this(topic, createSnapshotProcessor(topic), determineSnapshotType(topic)); + } + + @VisibleForTesting + TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor snapshotAbortedTxnProcessor, + AbortedTxnProcessor.SnapshotType snapshotType) { super(State.None); this.topic = topic; this.timer = topic.getBrokerService().getPulsar().getTransactionTimer(); @@ -118,13 +136,8 @@ public TopicTransactionBuffer(PersistentTopic topic) { this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar() .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis(); this.maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry(); - if (topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()) { - snapshotAbortedTxnProcessor = new SnapshotSegmentAbortedTxnProcessorImpl(topic); - snapshotType = AbortedTxnProcessor.SnapshotType.Segment; - } else { - snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic); - snapshotType = AbortedTxnProcessor.SnapshotType.Single; - } + this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor; + this.snapshotType = snapshotType; this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack(); this.recover(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java new file mode 100644 index 0000000000000..508423adce4d8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.buffer.impl; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TopicFactory; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class TransactionPersistentTopicTest extends ProducerConsumerBase { + + private static CountDownLatch topicInitSuccessSignal = new CountDownLatch(1); + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + // Intercept when the `topicFuture` is about to complete and wait until the topic close operation finishes. + conf.setTopicFactoryClassName(MyTopicFactory.class.getName()); + conf.setTransactionCoordinatorEnabled(true); + conf.setBrokerDeduplicationEnabled(false); + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testNoOrphanClosedTopicIfTxnInternalFailed() { + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); + + BrokerService brokerService = pulsar.getBrokerService(); + + // 1. Mock close topic when create transactionBuffer + TransactionBufferProvider mockTransactionBufferProvider = originTopic -> { + AbortedTxnProcessor abortedTxnProcessor = mock(AbortedTxnProcessor.class); + doAnswer(invocation -> { + topicInitSuccessSignal.await(); + return CompletableFuture.failedFuture(new RuntimeException("Mock recovery failed")); + }).when(abortedTxnProcessor).recoverFromSnapshot(); + when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + return new TopicTransactionBuffer( + (PersistentTopic) originTopic, abortedTxnProcessor, AbortedTxnProcessor.SnapshotType.Single); + }; + TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); + pulsar.setTransactionBufferProvider(mockTransactionBufferProvider); + + // 2. Trigger create topic and assert topic load success. + CompletableFuture> firstLoad = brokerService.getTopic(tpName, true); + Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertTrue(firstLoad.isDone()); + assertFalse(firstLoad.isCompletedExceptionally()); + }); + + // 3. Assert topic removed from cache + Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(brokerService.getTopics().containsKey(tpName)); + }); + + // 4. Set txn provider to back + pulsar.setTransactionBufferProvider(originalTransactionBufferProvider); + } + + public static class MyTopicFactory implements TopicFactory { + @Override + public T create(String topic, ManagedLedger ledger, BrokerService brokerService, + Class topicClazz) { + try { + if (topicClazz == NonPersistentTopic.class) { + return (T) new NonPersistentTopic(topic, brokerService); + } else { + return (T) new MyPersistentTopic(topic, ledger, brokerService); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + @Override + public void close() throws IOException { + // No-op + } + } + + public static class MyPersistentTopic extends PersistentTopic { + + public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { + super(topic, ledger, brokerService); + } + + @SneakyThrows + @Override + public CompletableFuture checkDeduplicationStatus() { + topicInitSuccessSignal.countDown(); + // Sleep 1s pending txn buffer recover failed and close topic + Thread.sleep(1000); + return CompletableFuture.completedFuture(null); + } + } + +}