Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -96,6 +97,13 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {

protected final String topic;

// Reference to the CompletableFuture returned when creating this topic in BrokerService.
// Used to safely remove the topic from BrokerService's cache by ensuring we remove the exact
// topic instance that was created.
@Getter
@Setter
protected volatile CompletableFuture<Optional<Topic>> createFuture;

// Producers currently connected to this topic
protected final ConcurrentHashMap<String, Producer> producers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,7 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
nonPersistentTopic.setCreateFuture(topicFuture);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
topicFuture.completeExceptionally(e);
Expand Down Expand Up @@ -1800,6 +1801,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: newTopic(topic, ledger, BrokerService.this, PersistentTopic.class);
persistentTopic.setCreateFuture(topicFuture);
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
Expand Down Expand Up @@ -2409,47 +2411,18 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
if (createTopicFuture.isEmpty()){
return CompletableFuture.completedFuture(null);
}
return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get());
}

private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic){
if (topic == null){
return Optional.empty();
}
final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topic.getName());
// If not exists in cache, do nothing.
if (createTopicFuture == null){
return Optional.empty();
}
// If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic.
if (!createTopicFuture.isDone()){
return Optional.empty();
}
// If the future in cache has exception complete,
// the topic instance in the cache is not the same with the topic.
if (createTopicFuture.isCompletedExceptionally()){
return Optional.empty();
}
Optional<Topic> optionalTopic = createTopicFuture.join();
Topic topicInCache = optionalTopic.orElse(null);
if (topicInCache == null || topicInCache != topic){
return Optional.empty();
} else {
return Optional.of(createTopicFuture);
}
}

private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
CompletableFuture<Optional<Topic>> createTopicFuture) {
TopicName topicName = TopicName.get(topic);
/**
* Removes the topic from the cache only if the topicName and associated createFuture match exactly.
* The TopicEvent.UNLOAD event will be triggered before and after removal.
*
* @param topic The topic to be removed.
* @return A CompletableFuture that completes when the operation is done.
*/
public CompletableFuture<Void> removeTopicFromCache(AbstractTopic topic) {
TopicName topicName = TopicName.get(topic.getName());
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
removeTopicFromCache(topic, namespaceBundle, createTopicFuture);
removeTopicFromCache(topic.getName(), namespaceBundle, topic.getCreateFuture());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;

private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) {
return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
? new SnapshotSegmentAbortedTxnProcessorImpl(topic)
: new SingleSnapshotAbortedTxnProcessorImpl(topic);
}

private static AbortedTxnProcessor.SnapshotType determineSnapshotType(PersistentTopic topic) {
return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
? AbortedTxnProcessor.SnapshotType.Segment
: AbortedTxnProcessor.SnapshotType.Single;
}

public TopicTransactionBuffer(PersistentTopic topic) {
this(topic, createSnapshotProcessor(topic), determineSnapshotType(topic));
}

@VisibleForTesting
TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor snapshotAbortedTxnProcessor,
AbortedTxnProcessor.SnapshotType snapshotType) {
super(State.None);
this.topic = topic;
this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
Expand All @@ -118,13 +136,8 @@ public TopicTransactionBuffer(PersistentTopic topic) {
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
this.maxReadPosition = topic.getManagedLedger().getLastConfirmedEntry();
if (topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()) {
snapshotAbortedTxnProcessor = new SnapshotSegmentAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
} else {
snapshotAbortedTxnProcessor = new SingleSnapshotAbortedTxnProcessorImpl(topic);
snapshotType = AbortedTxnProcessor.SnapshotType.Single;
}
this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor;
this.snapshotType = snapshotType;
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicFactory;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class TransactionPersistentTopicTest extends ProducerConsumerBase {

private static CountDownLatch topicInitSuccessSignal = new CountDownLatch(1);

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
// Intercept when the `topicFuture` is about to complete and wait until the topic close operation finishes.
conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
conf.setTransactionCoordinatorEnabled(true);
conf.setBrokerDeduplicationEnabled(false);
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testNoOrphanClosedTopicIfTxnInternalFailed() {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");

BrokerService brokerService = pulsar.getBrokerService();

// 1. Mock close topic when create transactionBuffer
TransactionBufferProvider mockTransactionBufferProvider = originTopic -> {
AbortedTxnProcessor abortedTxnProcessor = mock(AbortedTxnProcessor.class);
doAnswer(invocation -> {
topicInitSuccessSignal.await();
return CompletableFuture.failedFuture(new RuntimeException("Mock recovery failed"));
}).when(abortedTxnProcessor).recoverFromSnapshot();
when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
return new TopicTransactionBuffer(
(PersistentTopic) originTopic, abortedTxnProcessor, AbortedTxnProcessor.SnapshotType.Single);
};
TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider();
pulsar.setTransactionBufferProvider(mockTransactionBufferProvider);

// 2. Trigger create topic and assert topic load success.
CompletableFuture<Optional<Topic>> firstLoad = brokerService.getTopic(tpName, true);
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
.pollInterval(200, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertTrue(firstLoad.isDone());
assertFalse(firstLoad.isCompletedExceptionally());
});

// 3. Assert topic removed from cache
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertFalse(brokerService.getTopics().containsKey(tpName));
});

// 4. Set txn provider to back
pulsar.setTransactionBufferProvider(originalTransactionBufferProvider);
}

public static class MyTopicFactory implements TopicFactory {
@Override
public <T extends Topic> T create(String topic, ManagedLedger ledger, BrokerService brokerService,
Class<T> topicClazz) {
try {
if (topicClazz == NonPersistentTopic.class) {
return (T) new NonPersistentTopic(topic, brokerService);
} else {
return (T) new MyPersistentTopic(topic, ledger, brokerService);
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

@Override
public void close() throws IOException {
// No-op
}
}

public static class MyPersistentTopic extends PersistentTopic {

public MyPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
super(topic, ledger, brokerService);
}

@SneakyThrows
@Override
public CompletableFuture<Void> checkDeduplicationStatus() {
topicInitSuccessSignal.countDown();
// Sleep 1s pending txn buffer recover failed and close topic
Thread.sleep(1000);
return CompletableFuture.completedFuture(null);
}
}

}
Loading