From 8dcb28fdf150ec9a4caf130ac96b6795f40e19a2 Mon Sep 17 00:00:00 2001 From: daojun Date: Mon, 9 Jan 2023 11:54:42 +0800 Subject: [PATCH 1/3] Close the transactionBuffer after MessageDeduplication#checkStatus failed --- .../org/apache/pulsar/broker/service/BrokerService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ee0ad6e103ba6..99059180578db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1599,6 +1599,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { .exceptionally((ex) -> { log.warn("Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); topicFuture.completeExceptionally(ex); From 0dcc20393684135aa95d715e8abdc9055b0243a0 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 10 Jan 2023 19:32:06 +0800 Subject: [PATCH 2/3] add test --- .../buffer/TopicTransactionBufferTest.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index fe964718c3d56..47a9c7514a747 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,10 +18,18 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.TopicName; @@ -30,10 +38,14 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class TopicTransactionBufferTest extends TransactionTestBase { @@ -86,4 +98,50 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { FieldUtils.writeField(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed, true); txn.commit().get(); } + + @Test + public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + TopicName topicName = TopicName.get(topic); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService = pulsar.getBrokerService(); + + CompletableFuture configFuture = brokerService.getManagedLedgerConfig(topicName); + + CompletableFuture ledgerFuture = new CompletableFuture<>(); + pulsar.getManagedLedgerFactory().asyncOpen(topicName.getPersistenceNamingEncoding(), configFuture.get(), + new AsyncCallbacks.OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledgerFuture.complete(ledger); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + ledgerFuture.completeExceptionally(exception); + } + }, () -> brokerService.isTopicNsOwnedByBroker(topicName), null); + + PersistentTopic persistentTopic0 = new PersistentTopic(topic, ledgerFuture.get(), brokerService); + PersistentTopic persistentTopic = Mockito.spy(persistentTopic0); + Mockito.doReturn(CompletableFuture.failedFuture(new ManagedLedgerException("This is an exception"))) + .when(persistentTopic).checkDeduplicationStatus(); + + persistentTopic.initialize() + .thenCompose(v -> persistentTopic.checkDeduplicationStatus()) + .exceptionally(ex -> { + persistentTopic.getTransactionBuffer().closeAsync(); + persistentTopic.stopReplProducers(); + return null; + }); + + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + + TopicTransactionBuffer ttb = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Awaitility.waitAtMost(1, TimeUnit.MINUTES) + .until(() -> ttb.getState().equals(expectState)); + } + } From 821604f0c9cc57c7cec1e1508969ebff2c830d12 Mon Sep 17 00:00:00 2001 From: daojun Date: Sat, 14 Jan 2023 13:47:57 +0800 Subject: [PATCH 3/3] fix test --- .../pulsar/broker/service/BrokerService.java | 11 +++- .../buffer/TopicTransactionBufferTest.java | 66 +++++++++---------- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 481aebe8e891a..5c272a97f2ec9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1516,6 +1516,14 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean }); } + + @VisibleForTesting + public void createPersistentTopic0(final String topic, boolean createIfMissing, + CompletableFuture> topicFuture, + Map properties) { + createPersistentTopic(topic, createIfMissing, topicFuture, properties); + } + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { @@ -3366,7 +3374,8 @@ public long getPausedConnections() { } @SuppressWarnings("unchecked") - private T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService, + @VisibleForTesting + public T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService, Class topicClazz) throws PulsarServerException { if (topicFactory != null) { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 47a9c7514a747..5a9c928ca3c90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,14 +18,13 @@ */ package org.apache.pulsar.broker.transaction.buffer; -import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; @@ -43,10 +42,12 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class TopicTransactionBufferTest extends TransactionTestBase { @@ -102,46 +103,41 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { @Test public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception { String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); - TopicName topicName = TopicName.get(topic); PulsarService pulsar = pulsarServiceList.get(0); - BrokerService brokerService = pulsar.getBrokerService(); - - CompletableFuture configFuture = brokerService.getManagedLedgerConfig(topicName); - - CompletableFuture ledgerFuture = new CompletableFuture<>(); - pulsar.getManagedLedgerFactory().asyncOpen(topicName.getPersistenceNamingEncoding(), configFuture.get(), - new AsyncCallbacks.OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - ledgerFuture.complete(ledger); - } - - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - ledgerFuture.completeExceptionally(exception); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + + Mockito + .doAnswer(inv -> { + String topic1 = inv.getArgument(0); + ManagedLedger ledger = inv.getArgument(1); + BrokerService service = inv.getArgument(2); + Class topicKlass = inv.getArgument(3); + if (topicKlass.equals(PersistentTopic.class)) { + PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); + CompletableFuture f =CompletableFuture + .failedFuture(new ManagedLedgerException("This is an exception")); + Mockito.doReturn(f).when(pt).checkDeduplicationStatus(); + reference.set(pt); + return pt; + } else { + return new NonPersistentTopic(topic1, service); } - }, () -> brokerService.isTopicNsOwnedByBroker(topicName), null); - - PersistentTopic persistentTopic0 = new PersistentTopic(topic, ledgerFuture.get(), brokerService); - PersistentTopic persistentTopic = Mockito.spy(persistentTopic0); - Mockito.doReturn(CompletableFuture.failedFuture(new ManagedLedgerException("This is an exception"))) - .when(persistentTopic).checkDeduplicationStatus(); + }) + .when(brokerService) + .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), + Mockito.eq(PersistentTopic.class)); - persistentTopic.initialize() - .thenCompose(v -> persistentTopic.checkDeduplicationStatus()) - .exceptionally(ex -> { - persistentTopic.getTransactionBuffer().closeAsync(); - persistentTopic.stopReplProducers(); - return null; - }); + brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Assert.assertTrue(buffer instanceof TopicTransactionBuffer); - - TopicTransactionBuffer ttb = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; - Awaitility.waitAtMost(1, TimeUnit.MINUTES) - .until(() -> ttb.getState().equals(expectState)); + Assert.assertEquals(ttb.getState(), expectState); } }