From 21596227415968dd9b9219779657ff74decfa45b Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 6 Jun 2024 22:06:40 +0800 Subject: [PATCH 1/4] [fix][broker] The topic might reference a closed ledger --- .../apache/pulsar/broker/PulsarService.java | 5 + .../pulsar/broker/service/BrokerService.java | 104 ++++++++++-------- .../pulsar/broker/service/ReplicatorTest.java | 10 +- .../client/api/OrphanPersistentTopicTest.java | 67 +++++++++++ 4 files changed, 130 insertions(+), 56 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 6ee35ad295fb5..8fb383cca4427 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1920,6 +1920,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception return new BrokerService(pulsar, ioEventLoopGroup); } + @VisibleForTesting + public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) { + this.transactionBufferProvider = transactionBufferProvider; + } + private CompactionServiceFactory loadCompactionServiceFactory() { String compactionServiceFactoryClassName = config.getCompactionServiceFactoryClassName(); var compactionServiceFactory = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6603e240ee7d9..3551bba230bb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1001,35 +1001,27 @@ public CompletableFuture> getTopic(final String topic, boolean c return getTopic(TopicName.get(topic), createIfMissing, properties); } + /** + * Retrieves or creates a topic based on the specified 
parameters. + * 0. If topic future exists in the cache returned directly regardless of whether it fails or timeout. + * 1. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}. + * 2. If the topic metadata not exists, and {@code createIfMissing} is false, + * returns an empty Optional in a CompletableFuture. + * 3. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture. + * Any exceptions will remove the topicFuture from the map. + * + * @param topicName The name of the topic, potentially including partition information. + * @param createIfMissing If true, creates the topic if it does not exist. + * @param properties Topic configuration properties used during creation. + * @return CompletableFuture with an Optional of the topic if found or created, otherwise empty. + */ public CompletableFuture> getTopic(final TopicName topicName, boolean createIfMissing, Map properties) { try { - CompletableFuture> topicFuture = topics.get(topicName.toString()); - if (topicFuture != null) { - if (topicFuture.isCompletedExceptionally() - || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { - // Exceptional topics should be recreated. - topics.remove(topicName.toString(), topicFuture); - } else { - // a non-existing topic in the cache shouldn't prevent creating a topic - if (createIfMissing) { - if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) { - return topicFuture; - } else { - return topicFuture.thenCompose(value -> { - if (!value.isPresent()) { - // retry and create topic - return getTopic(topicName, createIfMissing, properties); - } else { - // in-progress future completed successfully - return CompletableFuture.completedFuture(value); - } - }); - } - } else { - return topicFuture; - } - } + // If topic future exists in the cache returned directly regardless of whether it fails or timeout. 
+ CompletableFuture> tp = topics.get(topicName.toString()); + if (tp != null) { + return tp; } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { @@ -1062,6 +1054,7 @@ public CompletableFuture> getTopic(final TopicName topicName, bo final String errorMsg = String.format("Illegal topic partition name %s with max allowed " + "%d partitions", topicName, metadata.partitions); + pulsar.getExecutor().execute(() -> topics.remove(topicName.toString())); log.warn(errorMsg); return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException(errorMsg)); @@ -1072,6 +1065,8 @@ public CompletableFuture> getTopic(final TopicName topicName, bo if (!optionalTopic.isPresent() && createIfMissing) { log.warn("[{}] Try to recreate the topic with createIfMissing=true " + "but the returned topic is empty", topicName); + // Before retry create topic, need remove it from topics. + topics.remove(topicName.toString()); return getTopic(topicName, createIfMissing, properties); } return CompletableFuture.completedFuture(optionalTopic); @@ -1079,12 +1074,14 @@ public CompletableFuture> getTopic(final TopicName topicName, bo }); }); } else { - return topics.computeIfAbsent(topicName.toString(), (name) -> { + if (!topics.containsKey(topicName.toString())) { topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); - if (topicName.isPartitioned()) { - final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); - return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { - if (topicName.getPartitionIndex() < metadata.partitions) { + } + if (topicName.isPartitioned()) { + final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); + return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { + if (topicName.getPartitionIndex() < metadata.partitions) { + return 
topics.computeIfAbsent(topicName.toString(), (name) -> { topicEventsDispatcher .notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); @@ -1095,11 +1092,13 @@ public CompletableFuture> getTopic(final TopicName topicName, bo topicEventsDispatcher .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); return res; - } - topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); - return CompletableFuture.completedFuture(Optional.empty()); - }); - } else if (createIfMissing) { + }); + } + topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); + return CompletableFuture.completedFuture(Optional.empty()); + }); + } else if (createIfMissing) { + return topics.computeIfAbsent(topicName.toString(), (name) -> { topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE); CompletableFuture> res = createNonPersistentTopic(name); @@ -1109,11 +1108,15 @@ public CompletableFuture> getTopic(final TopicName topicName, bo topicEventsDispatcher .notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD); return res; - } else { + }); + } else { + CompletableFuture> topicFuture = topics.get(topicName.toString()); + if (topicFuture == null) { topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE); - return CompletableFuture.completedFuture(Optional.empty()); + topicFuture = CompletableFuture.completedFuture(Optional.empty()); } - }); + return topicFuture; + } } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topicName, e); @@ -1258,6 +1261,7 @@ private CompletableFuture> createNonPersistentTopic(String topic if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); } + pulsar.getExecutor().execute(() -> topics.remove(topic)); return FutureUtil.failedFuture( new NotAllowedException("Broker is not unable to load non-persistent 
topic")); } @@ -1267,6 +1271,7 @@ private CompletableFuture> createNonPersistentTopic(String topic nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(e); return topicFuture; } @@ -1283,7 +1288,7 @@ private CompletableFuture> createNonPersistentTopic(String topic }).exceptionally(ex -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex); }); return null; @@ -1299,7 +1304,7 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.complete(Optional.of(nonPersistentTopic)); // after get metadata return success, we should delete this topic from this broker, because this topic not // owner by this broker and it don't initialize and checkReplication - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); return null; }); @@ -1538,6 +1543,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topic); } + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new NotAllowedException( "Broker is unable to load persistent topic")); return topicFuture; @@ -1556,6 +1562,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S // do not recreate topic if topic is already migrated and deleted by broker // so, avoid creating a new topic if migration is already started if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { + 
pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex.getCause()); return null; } @@ -1570,6 +1577,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S } } }).exceptionally(ex -> { + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex.getCause()); return null; }); @@ -1623,7 +1631,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean finalProperties, topicPolicies) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(throwable); return null; }); @@ -1631,7 +1639,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } }).exceptionally(ex -> { @@ -1662,7 +1670,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new NotAllowedException(msg)); return; } @@ -1744,6 +1752,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " topic", topic, FutureUtil.getException(topicFuture)); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, ex) -> { + topics.remove(topic); if (ex != null) { log.warn("[{}] Get 
an error when closing topic.", topic, ex); @@ -1760,6 +1769,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, closeEx) -> { + topics.remove(topic); if (closeEx != null) { log.warn("[{}] Get an error when closing topic.", topic, closeEx); @@ -1771,7 +1781,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } catch (PulsarServerException e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(e); } } @@ -1784,7 +1794,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(new PersistenceException(exception)); } } @@ -1794,7 +1804,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(exception); return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 765727aeac319..b58f416ea1a57 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -19,12 +19,10 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -1434,13 +1432,6 @@ public void testCleanupTopic() throws Exception { // Ok } - final CompletableFuture> timedOutTopicFuture = topicFuture; - // timeout topic future should be removed from cache - retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5, - 1000); - - assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName)); - try { Consumer consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS); @@ -1452,6 +1443,7 @@ public void testCleanupTopic() throws Exception { ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2"); mlFuture.complete(ml); + // Re-creating the topic will succeed. 
Consumer consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") .subscriptionType(SubscriptionType.Shared).subscribeAsync() .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index 7cd9da7574dbb..a399a6fd9f024 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.client.api; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.List; @@ -27,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -34,6 +37,9 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.TopicPoliciesService; import org.apache.pulsar.broker.service.TopicPolicyListener; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; +import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.compaction.CompactionServiceFactory; @@ -108,6 +114,67 @@ public void testNoOrphanTopicAfterCreateTimeout() throws Exception { 
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); } + @Test + public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception { + // Make the topic loading timeout faster. + long originalTopicLoadTimeoutSeconds = pulsar.getConfig().getTopicLoadTimeoutSeconds(); + int topicLoadTimeoutSeconds = 1; + pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(true); + pulsar.getConfig().setTransactionCoordinatorEnabled(true); + String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2"); + + // Mock message deduplication recovery speed topicLoadTimeoutSeconds + String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + DEDUPLICATION_CURSOR_NAME; + mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> { + if (mlPath.equals(path)) { + log.info("Topic load timeout: " + path); + return true; + } + return false; + }); + + // First load topic will trigger timeout + // The first topic load will trigger a timeout. When the topic closes, it will call transactionBuffer.close. + // Here, we simulate a sleep to ensure that the ledger is not immediately closed. 
+ TransactionBufferProvider mockTransactionBufferProvider = new TransactionBufferProvider() { + @Override + public TransactionBuffer newTransactionBuffer(Topic originTopic) { + return new TransactionBufferDisable(originTopic) { + @SneakyThrows + @Override + public CompletableFuture closeAsync() { + Thread.sleep(500); + return super.closeAsync(); + } + }; + } + }; + TransactionBufferProvider originalTransactionBufferProvider = pulsar.getTransactionBufferProvider(); + pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider); + CompletableFuture> firstLoad = pulsar.getBrokerService().getTopic(tpName, true); + Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + // assert first create topic timeout + .untilAsserted(() -> { + assertTrue(firstLoad.isCompletedExceptionally()); + }); + + // Once the first load topic times out, immediately to load the topic again. + Producer producer = pulsarClient.newProducer().topic(tpName).create(); + for (int i = 0; i < 100; i++) { + MessageId send = producer.send("msg".getBytes()); + assertNotNull(send); + } + + // set to back + pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider); + pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds); + pulsar.getConfig().setBrokerDeduplicationEnabled(false); + pulsar.getConfig().setTransactionCoordinatorEnabled(false); + } + @Test public void testNoOrphanTopicIfInitFailed() throws Exception { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); From 17fb913cc91d8f8f28b70b923af51fd72adb3bb2 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 11 Jun 2024 14:40:50 +0800 Subject: [PATCH 2/4] make test not flaky --- .../apache/pulsar/client/api/OrphanPersistentTopicTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java index a399a6fd9f024..d6473efd788d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java @@ -163,8 +163,9 @@ public CompletableFuture closeAsync() { // Once the first load topic times out, immediately to load the topic again. Producer producer = pulsarClient.newProducer().topic(tpName).create(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 10; i++) { MessageId send = producer.send("msg".getBytes()); + Thread.sleep(100); assertNotNull(send); } From 1352e1f649b325b622c4239b87e2319ca6968b91 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 13 Jun 2024 11:09:43 +0800 Subject: [PATCH 3/4] Add topicFuture to topics.remove --- .../pulsar/broker/service/BrokerService.java | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 3551bba230bb8..cc61f7e39272b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1006,7 +1006,7 @@ public CompletableFuture> getTopic(final String topic, boolean c * 0. If topic future exists in the cache returned directly regardless of whether it fails or timeout. * 1. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}. * 2. If the topic metadata not exists, and {@code createIfMissing} is false, - * returns an empty Optional in a CompletableFuture. + * returns an empty Optional in a CompletableFuture. And this empty future not be added to the map. * 3. Otherwise, use computeIfAbsent. 
It returns the existing topic or creates and adds a new topicFuture. * Any exceptions will remove the topicFuture from the map. * @@ -1039,7 +1039,8 @@ public CompletableFuture> getTopic(final TopicName topicName, bo throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); }).thenCompose(optionalTopicPolicies -> { final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - return topics.computeIfAbsent(topicName.toString(), (tpName) -> { + CompletableFuture> topicFuture = topics.computeIfAbsent(topicName.toString(), + (tpName) -> { if (topicName.isPartitioned()) { final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); return fetchPartitionedTopicMetadataAsync(topicNameEntity) @@ -1054,23 +1055,17 @@ public CompletableFuture> getTopic(final TopicName topicName, bo final String errorMsg = String.format("Illegal topic partition name %s with max allowed " + "%d partitions", topicName, metadata.partitions); - pulsar.getExecutor().execute(() -> topics.remove(topicName.toString())); log.warn(errorMsg); return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException(errorMsg)); }); } return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); - }).thenCompose(optionalTopic -> { - if (!optionalTopic.isPresent() && createIfMissing) { - log.warn("[{}] Try to recreate the topic with createIfMissing=true " - + "but the returned topic is empty", topicName); - // Before retry create topic, need remove it from topics. - topics.remove(topicName.toString()); - return getTopic(topicName, createIfMissing, properties); - } - return CompletableFuture.completedFuture(optionalTopic); + // Tips: Do not convert the `topicFuture` here; we need to ensure that the future + // placed in the map is consistent with the one returned by `loadOrCreatePersistentTopic`. + // Otherwise, if any exceptions occur, it will not be correctly removed from the topics. 
}); + return topicFuture; }); }); } else { @@ -1255,15 +1250,16 @@ private CompletableFuture> createNonPersistentTopic(String topic CompletableFuture> topicFuture = new CompletableFuture<>(); topicFuture.exceptionally(t -> { pulsarStats.recordTopicLoadFailed(); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); } - pulsar.getExecutor().execute(() -> topics.remove(topic)); - return FutureUtil.failedFuture( + topicFuture.completeExceptionally( new NotAllowedException("Broker is not unable to load non-persistent topic")); + return topicFuture; } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic; @@ -1271,7 +1267,6 @@ private CompletableFuture> createNonPersistentTopic(String topic nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); - pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(e); return topicFuture; } @@ -1288,7 +1283,6 @@ private CompletableFuture> createNonPersistentTopic(String topic }).exceptionally(ex -> { log.warn("Replication check failed. 
Removing topic from topics list {}, {}", topic, ex.getCause()); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().execute(() -> topics.remove(topic)); topicFuture.completeExceptionally(ex); }); return null; @@ -1304,7 +1298,7 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.complete(Optional.of(nonPersistentTopic)); // after get metadata return success, we should delete this topic from this broker, because this topic not // owner by this broker and it don't initialize and checkReplication - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); @@ -1543,7 +1537,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S if (log.isDebugEnabled()) { log.debug("Broker is unable to load persistent topic {}", topic); } - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new NotAllowedException( "Broker is unable to load persistent topic")); return topicFuture; @@ -1562,7 +1556,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S // do not recreate topic if topic is already migrated and deleted by broker // so, avoid creating a new topic if migration is already started if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex.getCause()); return null; } @@ -1577,7 +1571,7 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S } } }).exceptionally(ex -> { - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(ex.getCause()); return null; }); @@ -1631,7 +1625,7 @@ 
private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean finalProperties, topicPolicies) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(throwable); return null; }); @@ -1639,7 +1633,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); } }).exceptionally(ex -> { @@ -1670,7 +1664,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new NotAllowedException(msg)); return; } @@ -1752,7 +1746,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " topic", topic, FutureUtil.getException(topicFuture)); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, ex) -> { - topics.remove(topic); + topics.remove(topic, topicFuture); if (ex != null) { log.warn("[{}] Get an error when closing topic.", topic, ex); @@ -1769,7 +1763,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + " Removing topic from topics list {}, {}", topic, ex); executor().submit(() -> { persistentTopic.close().whenComplete((ignore, closeEx) -> { - topics.remove(topic); + topics.remove(topic, topicFuture); if (closeEx != 
null) { log.warn("[{}] Get an error when closing topic.", topic, closeEx); @@ -1781,7 +1775,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } catch (PulsarServerException e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(e); } } @@ -1794,7 +1788,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(new PersistenceException(exception)); } } @@ -1804,7 +1798,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic)); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); topicFuture.completeExceptionally(exception); return null; }); From 40ce065f235070a8ba67d821be3ee6ebd632d5fe Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 13 Jun 2024 12:26:07 +0800 Subject: [PATCH 4/4] optimize --- .../pulsar/broker/service/BrokerService.java | 79 +++++++++---------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index cc61f7e39272b..65c397e82fada 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1003,11 +1003,12 @@ public CompletableFuture> getTopic(final String topic, boolean c /** * Retrieves or creates a topic based on the specified parameters. - * 0. If topic future exists in the cache returned directly regardless of whether it fails or timeout. - * 1. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}. - * 2. If the topic metadata not exists, and {@code createIfMissing} is false, + * 0. If the topic's domain is disabled in the broker config, returns a future failed with NotAllowedException. + * 1. If a topic future exists in the cache, it is returned directly, even if it failed or timed out. + * 2. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}. + * 3. If the topic metadata does not exist and {@code createIfMissing} is false, * returns an empty Optional in a CompletableFuture. And this empty future not be added to the map. - * 3. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture. + * 4. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture. * Any exceptions will remove the topicFuture from the map. * * @param topicName The name of the topic, potentially including partition information. 
@@ -1025,6 +1026,13 @@ public CompletableFuture> getTopic(final TopicName topicName, bo } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { + if (!pulsar.getConfiguration().isEnablePersistentTopics()) { + if (log.isDebugEnabled()) { + log.debug("Broker is unable to load persistent topic {}", topicName); + } + return FutureUtil.failedFuture(new NotAllowedException( + "Broker is unable to load persistent topic")); + } return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName) .thenCompose(exists -> { if (!exists && !createIfMissing) { @@ -1039,36 +1047,40 @@ public CompletableFuture> getTopic(final TopicName topicName, bo throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo)); }).thenCompose(optionalTopicPolicies -> { final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null); - CompletableFuture> topicFuture = topics.computeIfAbsent(topicName.toString(), - (tpName) -> { - if (topicName.isPartitioned()) { - final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); - return fetchPartitionedTopicMetadataAsync(topicNameEntity) - .thenCompose((metadata) -> { - // Allow crate non-partitioned persistent topic that name includes - // `partition` - if (metadata.partitions == 0 - || topicName.getPartitionIndex() < metadata.partitions) { - return loadOrCreatePersistentTopic(tpName, createIfMissing, - properties, topicPolicies); - } + if (topicName.isPartitioned()) { + final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName()); + return fetchPartitionedTopicMetadataAsync(topicNameEntity) + .thenCompose((metadata) -> { + // Allow creating a non-partitioned persistent topic whose name includes + // `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return topics.computeIfAbsent(topicName.toString(), (tpName) -> + 
loadOrCreatePersistentTopic(tpName, + createIfMissing, properties, topicPolicies)); + } else { final String errorMsg = String.format("Illegal topic partition name %s with max allowed " + "%d partitions", topicName, metadata.partitions); log.warn(errorMsg); return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException(errorMsg)); - }); - } - return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies); - // Tips: Do not convert the `topicFuture` here; we need to ensure that the future - // placed in the map is consistent with the one returned by `loadOrCreatePersistentTopic`. - // Otherwise, if any exceptions occur, it will not be correctly removed from the topics. - }); - return topicFuture; + } + }); + } else { + return topics.computeIfAbsent(topicName.toString(), (tpName) -> + loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies)); + } }); }); } else { + if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { + if (log.isDebugEnabled()) { + log.debug("Broker is unable to load non-persistent topic {}", topicName); + } + return FutureUtil.failedFuture(new NotAllowedException( + "Broker is unable to load non-persistent topic")); + } if (!topics.containsKey(topicName.toString())) { topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE); } @@ -1253,14 +1265,6 @@ private CompletableFuture> createNonPersistentTopic(String topic pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); - if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { - if (log.isDebugEnabled()) { - log.debug("Broker is unable to load non-persistent topic {}", topic); - } - topicFuture.completeExceptionally( - new NotAllowedException("Broker is not unable to load non-persistent topic")); - return topicFuture; - } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic; try { @@ -1533,15 +1537,6 
@@ protected CompletableFuture> loadOrCreatePersistentTopic(final S final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); - if (!pulsar.getConfiguration().isEnablePersistentTopics()) { - if (log.isDebugEnabled()) { - log.debug("Broker is unable to load persistent topic {}", topic); - } - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new NotAllowedException( - "Broker is unable to load persistent topic")); - return topicFuture; - } checkTopicNsOwnership(topic) .thenRun(() -> {