From 79069f38428f3b87817a0060ed51546173abccc0 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Sat, 7 Oct 2023 20:24:45 +0800 Subject: [PATCH 01/11] [fix][broker] rackaware policy is ineffective when delete zk rack info after bkclient initialize (#20944) (cherry picked from commit d9ebaf5bf6fda44d21ac24cec7dbe208b59dc597) --- .../BookieRackAffinityMapping.java | 4 ++-- .../BookieRackAffinityMappingTest.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 3654b7b54c3e9..7a8ec103e9fe3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -120,8 +120,6 @@ public synchronized void setConf(Configuration conf) { store.registerListener(this::handleUpdates); racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() .orElseGet(BookiesRackConfiguration::new); - updateRacksWithHost(racksWithHost); - watchAvailableBookies(); for (Map bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); @@ -131,6 +129,8 @@ public synchronized void setConf(Configuration conf) { bookieAddressListLastTime); } } + updateRacksWithHost(racksWithHost); + watchAvailableBookies(); } catch (InterruptedException | ExecutionException | MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index ac97db71e8907..ce84034687aad 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -254,6 +254,7 @@ public void testWithPulsarRegistrationClient() throws Exception { bkClientConf.getTimeoutTimerNumTicks()); RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy(); + mapping.registerRackChangeListener(repp); Class clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy"); Field field1 = clazz1.getDeclaredField("knownBookies"); field1.setAccessible(true); @@ -323,6 +324,22 @@ public void testWithPulsarRegistrationClient() throws Exception { assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1"); assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); + //remove bookie2 rack, the bookie2 rack should be /default-rack + data = "{\"group1\": {\"" + BOOKIE1 + + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}"; + store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1); + + racks = mapping + .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName())) + .stream().filter(Objects::nonNull).toList(); + assertEquals(racks.size(), 1); + assertEquals(racks.get(0), "/rack0"); + assertEquals(knownBookies.size(), 3); + assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0"); + assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack"); + assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); + timer.stop(); } } From ebdf7dca42ee861a894ed7f9b9f78b079f765b88 Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Wed, 23 Aug 2023 18:03:29 -0700 Subject: [PATCH 02/11] [fix] [broker] Producer is blocked on creation because backlog exceeded on topic, when dedup is enabled and no producer is there (#20951) (cherry picked from commit 30073dbac0e941869b43e090d2682935e8f094e5) --- .../broker/service/BacklogQuotaManager.java | 28 ++++++++++++- .../persistent/MessageDeduplication.java | 11 +++++ .../service/persistent/PersistentTopic.java | 4 ++ .../service/BacklogQuotaManagerTest.java | 40 ++++++++++++++++--- 4 files changed, 76 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 2744469ea8dba..179c18ab19893 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -103,7 +103,10 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ break; case producer_exception: case producer_request_hold: - disconnectProducers(persistentTopic); + if (!advanceSlowestSystemCursor(persistentTopic)) { + // The slowest is not a system cursor. Disconnecting producers to put backpressure. + disconnectProducers(persistentTopic); + } break; default: break; @@ -268,4 +271,27 @@ private void disconnectProducers(PersistentTopic persistentTopic) { }); } + + /** + * Advances the slowest cursor if that is a system cursor. + * + * @param persistentTopic + * @return true if the slowest cursor is a system cursor + */ + private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) { + + ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursor slowestConsumer = mLedger.getSlowestConsumer(); + if (slowestConsumer == null) { + return false; + } + + if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) { + persistentTopic.getMessageDeduplication().takeSnapshot(); + return true; + } + + // We may need to check other system cursors here : replicator, compaction + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 4c3d8e1a467c5..ba2600d9134d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; @@ -132,6 +133,9 @@ public MessageDupUnknownException() { private final String replicatorPrefix; + + private final AtomicBoolean snapshotTaking = new AtomicBoolean(false); + public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) { this.pulsar = pulsar; this.topic = topic; @@ -432,6 +436,11 @@ private void takeSnapshot(Position position) { if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } + + if (!snapshotTaking.compareAndSet(false, true)) { + return; + } + Map snapshot = new TreeMap<>(); highestSequencedPersisted.forEach((producerName, sequenceId) -> { if (snapshot.size() < maxNumberOfProducers) { @@ -446,11 +455,13 @@ public void markDeleteComplete(Object ctx) { log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); } lastSnapshotTimestamp = System.currentTimeMillis(); + snapshotTaking.set(false); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position); + snapshotTaking.set(false); } }, null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cdd37f9795256..6f3a704c4f378 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -177,6 +177,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ConcurrentOpenHashMap replicators; static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; + + public static boolean isDedupCursorName(String name) { + return DEDUPLICATION_CURSOR_NAME.equals(name); + } private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch"; private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index d3452b4064fd9..9e1acf1371c3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -117,8 +117,8 @@ void setup() throws Exception { config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType("non-partitioned"); - config.setSystemTopicEnabled(false); - config.setTopicLevelPoliciesEnabled(false); + config.setSystemTopicEnabled(true); + config.setTopicLevelPoliciesEnabled(true); config.setForceDeleteNamespaceAllowed(true); pulsar = new PulsarService(config); @@ -1159,8 +1159,13 @@ public void testProducerException() throws Exception { assertTrue(gotException, "backlog exceeded exception did not occur"); } - @Test - public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { + @DataProvider(name = "dedupTestSet") + public static Object[][] dedupTestSet() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + @Test(dataProvider = "dedupTestSet") + public void testProducerExceptionAndThenUnblockSizeQuota(boolean dedupTestSet) throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/quotahold", @@ -1176,9 +1181,12 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { boolean gotException = false; Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); - byte[] content = new byte[1024]; Producer producer = createProducer(client, topic1); + + admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + for (int i = 0; i < 10; i++) { producer.send(content); } @@ -1197,6 +1205,7 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { } assertTrue(gotException, "backlog exceeded exception did not occur"); + assertFalse(producer.isConnected()); // now remove backlog and ensure that producer is unblocked; TopicStats stats = getTopicStats(topic1); @@ -1213,14 +1222,33 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { Exception sendException = null; gotException = false; try { - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { producer.send(content); + Message msg = consumer.receive(); + consumer.acknowledge(msg); } } catch (Exception e) { gotException = true; sendException = e; } + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); assertFalse(gotException, "unable to publish due to " + sendException); + + gotException = false; + long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp(); + try { + // try to send over backlog quota and make sure it passes + producer.send(content); + producer.send(content); + } catch (PulsarClientException ce) { + assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException + || ce instanceof PulsarClientException.TimeoutException, ce.getMessage()); + gotException = true; + sendException = ce; + } + assertFalse(gotException, "unable to publish due to " + sendException); + assertEquals(lastDisconnectedTimestamp, producer.getLastDisconnectedTimestamp()); + } @Test From 78a42caf0fff0e899ad938fabb35b5ef47f7a1d9 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 21 Aug 2023 04:49:30 +0800 Subject: [PATCH 03/11] [fix] [admin] Fix get topic stats fail if a subscription catch up concurrently (#20971) **Background**: when calling `pulsar-admin topics stats --get-earliest-time-in-backlog `, Pulsar will read the first entry which is not acknowledged, and respond with the entry write time. The flow is like this: - get the mark deleted position of the subscription - if no backlog, response `-1` - else read the next position of the mark deleted position, and respond with the entry write time. **Issue**: if the command `pulsar-admin topics stats --get-earliest-time-in-backlog ` and `consumer.acknowledge` are executed at the same time, the step 2 in above flow will get a position which is larger than the last confirmed position, lead a read entry error. | time | `pulsar-admin topics stats --get-earliest-time-in-backlog ` | `consumer.acknowledge` | | --- | --- | --- | | 1 | mark deleted position is `3:1` and LAC is `3:2` now | | 2 | the check `whether has backlog` is passed | | 3 | | acknowledged `3:2`, mark deleted position is `3:2` now | | 4 | calculate next position: `3:3` | | 5 | Read `3:3` and get an error: `read entry failed` | Note: the test in PR is not intended to reproduce the issue. Respond `-1` if the next position of the mark deleted position is larger than the LAC (cherry picked from commit 7c96a36c58768e71cc445371bb3d98c5ac6e05cd) --- .../mledger/impl/ManagedLedgerImpl.java | 4 +++ .../persistent/PersistentSubscription.java | 22 +++++++++------- .../pulsar/broker/admin/AdminApi2Test.java | 26 +++++++++++++++++++ .../pulsar/broker/admin/AdminApiTest.java | 4 +-- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f90795e8ff006..45c3fb3ac8594 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1221,6 +1221,10 @@ public CompletableFuture getEarliestMessagePublishTimeOfPos(PositionImpl p } PositionImpl nextPos = getNextValidPosition(pos); + if (nextPos.compareTo(lastConfirmedEntry) > 0) { + return CompletableFuture.completedFuture(-1L); + } + asyncReadEntry(nextPos, new ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e2e5c4aa05371..7d494b153c47c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1154,16 +1154,20 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.backlogSize = ((ManagedLedgerImpl) topic.getManagedLedger()) .getEstimatedBacklogSize((PositionImpl) cursor.getMarkDeletedPosition()); } - if (getEarliestTimeInBacklog && subStats.msgBacklog > 0) { - ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); - PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition(); - long result = 0; - try { - result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get(); - } catch (InterruptedException | ExecutionException e) { - result = -1; + if (getEarliestTimeInBacklog) { + if (subStats.msgBacklog > 0) { + ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); + PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition(); + long result = 0; + try { + result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get(); + } catch (InterruptedException | ExecutionException e) { + result = -1; + } + subStats.earliestMsgPublishTimeInBacklog = result; + } else { + subStats.earliestMsgPublishTimeInBacklog = -1; } - subStats.earliestMsgPublishTimeInBacklog = result; } subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed; subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index ab076b3e2fe85..09544ee957e2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -2535,6 +2535,32 @@ public void testFailedUpdatePartitionedTopic() throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, newPartitions); } + /** + * Validate retring failed partitioned topic should succeed. + * @throws Exception + */ + @Test + public void testTopicStatsWithEarliestTimeInBacklogIfNoBacklog() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://prop-xyz/ns1/tp_"); + final String subscriptionName = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + // Send one message. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false) + .create(); + MessageIdImpl messageId = (MessageIdImpl) producer.send("123"); + // Catch up. + admin.topics().skipAllMessages(topicName, subscriptionName); + // Get topic stats with earliestTimeInBacklog + TopicStats topicStats = admin.topics().getStats(topicName, false, false, true); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getEarliestMsgPublishTimeInBacklog(), -1L); + + // cleanup. + producer.close(); + admin.topics().delete(topicName); + } + @Test(dataProvider = "topicType") public void testPartitionedStatsAggregationByProducerName(String topicType) throws Exception { conf.setAggregatePublisherStatsByProducerName(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c349e902882e1..341842f19811f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1254,7 +1254,7 @@ public void testGetStats() throws Exception { TopicStats topicStats = admin.topics().getStats(topic, false, false, true); assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0); - assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0); + assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), -1); // publish several messages publishMessagesOnPersistentTopic(topic, 10); @@ -1272,7 +1272,7 @@ public void testGetStats() throws Exception { topicStats = admin.topics().getStats(topic, false, false, true); assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0); - assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0); + assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), -1); } @Test From 47f0d97e5bd40bcb06f96e96e0a6f8a5809fcd43 Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Mon, 21 Aug 2023 13:44:39 +0800 Subject: [PATCH 04/11] [fix][broker] Fix compaction subscription delete by inactive subscription check. (#20983) (cherry picked from commit 43cd86d6835b4b096739e510f9fa928a64e46ac9) --- .../service/persistent/PersistentTopic.java | 4 +- .../broker/service/BrokerServiceTest.java | 66 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6f3a704c4f378..4a9d0069c9e0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2374,7 +2374,9 @@ public void checkInactiveSubscriptions() { .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { subscriptions.forEach((subName, sub) -> { - if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) { + if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() + || sub.isReplicated() + || isCompactionSubscription(subName)) { return; } if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index d4ae0da3617e3..b91e9c1aff994 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,6 +67,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.http.HttpResponse; @@ -76,6 +77,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; @@ -106,6 +108,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.apache.pulsar.compaction.Compactor; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -1222,6 +1225,69 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex } } + @Test + public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception { + String namespace = "prop/test"; + + // set up broker set compaction threshold. + cleanup(); + conf.setBrokerServiceCompactionThresholdInBytes(8); + setup(); + + try { + admin.namespaces().createNamespace(namespace); + } catch (PulsarAdminException.ConflictException e) { + // Ok.. (if test fails intermittently and namespace is already created) + } + + // set enable subscription expiration. + admin.namespaces().setSubscriptionExpirationTime(namespace, 1); + + String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete"; + + admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic); + + CompletableFuture> topicCf = + pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true); + + Optional topicOptional = topicCf.get(); + assertTrue(topicOptional.isPresent()); + + PersistentTopic topic = (PersistentTopic) topicOptional.get(); + + PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION); + assertNotNull(sub); + + topic.checkCompaction(); + + Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction"); + currentCompaction.setAccessible(true); + CompletableFuture compactionFuture = (CompletableFuture)currentCompaction.get(topic); + + compactionFuture.get(); + + ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor(); + + // make cursor last active time to very small to check if it will be deleted + Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive"); + cursorLastActiveField.setAccessible(true); + cursorLastActiveField.set(cursor, 0); + + // replace origin object. so we can check if subscription is deleted. + PersistentSubscription spySubscription = Mockito.spy(sub); + topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription); + + // trigger inactive check. + topic.checkInactiveSubscriptions(); + + // Compaction subscription should not call delete method. + Mockito.verify(spySubscription, Mockito.never()).delete(); + + // check if the subscription exist. + assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)); + + } + /** * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and * it should not introduce deadlock while performing it. From a9e41c11f5a8292629d54b0a8487a8b4783c9f4d Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Wed, 11 Oct 2023 11:07:50 +0800 Subject: [PATCH 05/11] [fix] [bk-client] Fix bk client MinNumRacksPerWriteQuorum and EnforceMinNumRacksPerWriteQuorum not work problem. (#21327) (cherry picked from commit 61a7adf08b14067500f9bd17b6da824ba58e9707) --- .../broker/BookKeeperClientFactoryImpl.java | 5 ++++- .../BookKeeperClientFactoryImplTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index f3d69202a080b..2d85d582a37ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -221,7 +221,7 @@ static void setDefaultEnsemblePlacementPolicy( } } - private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, MetadataStore store, + static void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, MetadataStore store, Class policyClass) { bkConf.setEnsemblePlacementPolicy(policyClass); bkConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); @@ -229,6 +229,9 @@ private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfi bkConf.setProperty(REPP_DNS_RESOLVER_CLASS, conf.getProperties().getProperty(REPP_DNS_RESOLVER_CLASS, BookieRackAffinityMapping.class.getName())); + bkConf.setMinNumRacksPerWriteQuorum(conf.getBookkeeperClientMinNumRacksPerWriteQuorum()); + bkConf.setEnforceMinNumRacksPerWriteQuorum(conf.isBookkeeperClientEnforceMinNumRacksPerWriteQuorum()); + bkConf.setProperty(NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, conf.getProperties().getProperty( NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index e26b0aa756162..428684de7a821 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -41,6 +41,7 @@ import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.powermock.reflect.Whitebox; +import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy; import org.testng.annotations.Test; /** @@ -152,6 +153,24 @@ public void testSetDefaultEnsemblePlacementPolicyRackAwareEnabledChangedValues() assertEquals(20, bkConf.getMinNumRacksPerWriteQuorum()); } + @Test + public void testSetEnsemblePlacementPolicys() { + ClientConfiguration bkConf = new ClientConfiguration(); + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setBookkeeperClientMinNumRacksPerWriteQuorum(3); + conf.setBookkeeperClientEnforceMinNumRacksPerWriteQuorum(true); + + MetadataStore store = mock(MetadataStore.class); + + BookKeeperClientFactoryImpl.setEnsemblePlacementPolicy( + bkConf, + conf, + store, + ZkIsolatedBookieEnsemblePlacementPolicy.class); + assertEquals(bkConf.getMinNumRacksPerWriteQuorum(), 3); + assertTrue(bkConf.getEnforceMinNumRacksPerWriteQuorum()); + } + @Test public void testSetDiskWeightBasedPlacementEnabled() { BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); From b272a2a9142c5185c8ed0ded42d448809a9f06d6 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Thu, 19 Oct 2023 10:18:12 +0800 Subject: [PATCH 06/11] [fix][broker] Fix heartbeat namespace create transaction internal topic (#21348) (cherry picked from commit c8a2f49c6c6edaf6b5667f9ac7df65b815aefe58) --- .../service/persistent/PersistentTopic.java | 3 ++- .../systopic/PartitionedSystemTopicTest.java | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4a9d0069c9e0d..f5c0ecad64dd6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -295,7 +295,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS checkReplicatedSubscriptionControllerState(); TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() - && !isEventSystemTopic(topicName)) { + && !isEventSystemTopic(topicName) + && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 3cd25d3b00c67..f593edc20956e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -76,6 +76,7 @@ protected void setup() throws Exception { conf.setDefaultNumPartitions(PARTITIONS); conf.setManagedLedgerMaxEntriesPerLedger(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setTransactionCoordinatorEnabled(true); super.baseSetup(); } @@ -202,6 +203,24 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { }); } + @Test + public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", + namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + Optional optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); + + List topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 1); + TopicName heartbeatTopicName = TopicName.get("persistent", + namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); + } + @Test public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test"; From 9106cca50083bc5f038326447dd477b4819e65d9 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Thu, 19 Oct 2023 18:39:41 +0800 Subject: [PATCH 07/11] [fix][broker] Fix heartbeat namespace create event topic and cannot delete heartbeat topic (#21360) Co-authored-by: fanjianye Co-authored-by: Jiwei Guo (cherry picked from commit 700a29d5c877dcde5f3c8c1e946b00a8296b8d4f) --- .../SystemTopicBasedTopicPoliciesService.java | 14 +++++++---- .../systopic/PartitionedSystemTopicTest.java | 23 +++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 8d25603ccc58c..0f13718fb9a6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -92,20 +92,23 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { @Override public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return CompletableFuture.completedFuture(null); + } return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); } @Override public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException( + "Not allowed to update topic policy for the heartbeat topic")); + } return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies); } private CompletableFuture sendTopicPolicyEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { - if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { - return CompletableFuture.failedFuture( - new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic")); - } return pulsarService.getPulsarResources().getNamespaceResources() .getPoliciesAsync(topicName.getNamespaceObject()) .thenCompose(namespacePolicies -> { @@ -217,6 +220,9 @@ public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesC @Override public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return null; + } if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { NamespaceName namespace = topicName.getNamespaceObject(); prepareInitPoliciesCache(namespace, new CompletableFuture<>()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index f593edc20956e..8d3fd67c35a5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -186,6 +186,13 @@ public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception { Optional optionalTopic = pulsar.getBrokerService() .getTopic(topicName.getPartition(1).toString(), false).join(); Assert.assertTrue(optionalTopic.isEmpty()); + + TopicName heartbeatTopicName = TopicName.get("persistent", + namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + admin.topics().getRetention(heartbeatTopicName.toString()); + optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); } @Test @@ -203,6 +210,22 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { }); } + @Test + public void testHeartbeatTopicBeDeleted() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), + pulsar.getConfig()); + TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + + List topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 1); + Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); + + admin.topics().delete(heartbeatTopicName.toString(), true); + topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 0); + } + @Test public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception { admin.brokers().healthcheck(TopicVersion.V2); From 88fa4578b4914655c118d5282dd9d7edfbbaf9eb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 19 Oct 2023 16:48:37 +0300 Subject: [PATCH 08/11] [fix][sec] Upgrade Zookeeper to 3.8.3 to address CVE-2023-44981 (#21398) (cherry picked from commit e5120ec68907525177f5add5c95b022f3106da1a) --- distribution/server/src/assemble/LICENSE.bin.txt | 6 +++--- pom.xml | 2 +- pulsar-sql/presto-distribution/LICENSE | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index c4b06a882623d..37a62a0691151 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -528,9 +528,9 @@ The Apache Software License, Version 2.0 - io.vertx-vertx-web-3.9.8.jar - io.vertx-vertx-web-common-3.9.8.jar * Apache ZooKeeper - - org.apache.zookeeper-zookeeper-3.8.1.jar - - org.apache.zookeeper-zookeeper-jute-3.8.1.jar - - org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.1.jar + - org.apache.zookeeper-zookeeper-3.8.3.jar + - org.apache.zookeeper-zookeeper-jute-3.8.3.jar + - org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.3.jar * Snappy Java - org.xerial.snappy-snappy-java-1.1.10.1.jar * Google HTTP Client diff --git a/pom.xml b/pom.xml index 0faf53ecc856f..8b09ebbc94620 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ flexible messaging model and an intuitive client API. 1.21 4.15.4 - 3.8.1 + 3.8.3 1.5.0 1.10.0 1.1.10.1 diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index d1ab7ed196c35..b67249d0ea5ae 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -479,8 +479,8 @@ The Apache Software License, Version 2.0 - memory-0.8.3.jar - sketches-core-0.8.3.jar * Apache Zookeeper - - zookeeper-3.8.1.jar - - zookeeper-jute-3.8.1.jar + - zookeeper-3.8.3.jar + - zookeeper-jute-3.8.3.jar * Apache Yetus Audience Annotations - audience-annotations-0.12.0.jar * Swagger From b45a8dfb53200fb22dade6fe6bf17b027984465e Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Mon, 17 Oct 2022 09:44:11 +0800 Subject: [PATCH 09/11] [fix][test] AdvertisedListenersTest.setup (#17869) --- .../mledger/impl/ManagedLedgerBkTest.java | 4 +- .../test/BookKeeperClusterTestCase.java | 9 ++- .../pulsar/broker/MultiBrokerBaseTest.java | 5 ++ .../loadbalance/AdvertisedListenersTest.java | 8 +-- .../SimpleProtocolHandlerTestsBase.java | 13 +++- .../worker/PulsarFunctionTlsTest.java | 11 ++- .../pulsar/common/util/PortManager.java | 69 +++++++++++++++++++ .../pulsar/common/util/PortManagerTest.java | 37 ++++++++++ .../test/BookKeeperClusterTestCase.java | 8 ++- .../SimpleProxyExtensionTestBase.java | 13 +++- 10 files changed, 158 insertions(+), 19 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 9026c0f6ac4e7..b767cd8e91013 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -117,7 +118,7 @@ public void testBookieFailure() throws Exception { metadataStore.unsetAlwaysFail(); bkc = new BookKeeperTestClient(baseClientConf); - startNewBookie(); + int port = startNewBookie(); // Reconnect a new bk client factory.shutdown(); @@ -147,6 +148,7 @@ public void testBookieFailure() throws Exception { assertEquals("entry-2", new String(entries.get(0).getData())); entries.forEach(Entry::release); factory.shutdown(); + releaseLockedPort(port); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 39f9dc9ba7d84..d6f8d19421f20 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -24,6 +24,7 @@ package org.apache.bookkeeper.test; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; @@ -62,7 +63,7 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.ReplicationWorker; -import org.apache.bookkeeper.util.PortManager; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -113,6 +114,7 @@ public void handleTestMethodName(Method method) { private boolean isAutoRecoveryEnabled; protected ExecutorService executor; + private final List bookiePorts = new ArrayList<>(); SynchronousQueue asyncExceptions = new SynchronousQueue<>(); protected void captureThrowable(Runnable c) { @@ -264,7 +266,7 @@ protected void startBKCluster(String metadataServiceUri) throws Exception { // Create Bookie Servers (B1, B2, B3) for (int i = 0; i < numBookies; i++) { - startNewBookie(); + bookiePorts.add(startNewBookie()); } } @@ -283,6 +285,7 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { @@ -290,7 +293,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = PortManager.nextFreePort(); + port = nextLockedFreePort(); } else { port = 0; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java index c00ae8cd0d39d..5b78a32dc37e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; + import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -124,6 +126,9 @@ protected void additionalBrokersCleanup() { try { pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L); pulsarService.close(); + pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); } catch (PulsarServerException e) { // ignore } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java index 489efa5755ba8..9ca4510a209b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -25,7 +26,6 @@ import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; @@ -66,9 +66,9 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke } private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { - int pulsarPort = PortManager.nextFreePort(); - int httpPort = PortManager.nextFreePort(); - int httpsPort = PortManager.nextFreePort(); + int pulsarPort = nextLockedFreePort(); + int httpPort = nextLockedFreePort(); + int httpsPort = nextLockedFreePort(); // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended conf.setAdvertisedAddress(advertisedAddress); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index 639ccf7ecd01b..f08771934f666 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -22,12 +22,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -39,11 +39,14 @@ import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; @Slf4j @@ -54,6 +57,8 @@ public static final class MyProtocolHandler implements ProtocolHandler { private ServiceConfiguration conf; + private final List ports = new ArrayList<>(); + @Override public String protocolName() { return "test"; @@ -81,7 +86,9 @@ public void start(BrokerService service) { @Override public Map> newChannelInitializers() { - return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()), + int port = nextLockedFreePort(); + this.ports.add(port); + return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { @@ -106,7 +113,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - + ports.removeIf(PortManager::releaseLockedPort); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index e7c6dddc9b2cf..818080be32d4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,7 +34,6 @@ import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; @@ -86,8 +87,8 @@ void setup() throws Exception { // start brokers for (int i = 0; i < BROKER_COUNT; i++) { - int brokerPort = PortManager.nextFreePort(); - int webPort = PortManager.nextFreePort(); + int brokerPort = nextLockedFreePort(); + int webPort = nextLockedFreePort(); ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); @@ -187,6 +188,10 @@ void tearDown() throws Exception { } if (pulsarServices[i] != null) { pulsarServices[i].close(); + pulsarServices[i].getConfiguration(). + getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarServices[i].getConfiguration() + .getWebServicePort().ifPresent(PortManager::releaseLockedPort); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java new file mode 100644 index 0000000000000..b9df071fdbc7e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.net.ServerSocket; +import java.util.HashSet; +import java.util.Set; + +public class PortManager { + + private static final Set PORTS = new HashSet<>(); + + /** + * Return a locked available port. + * + * @return locked available port. + */ + public static synchronized int nextLockedFreePort() { + int exceptionCount = 0; + while (true) { + try (ServerSocket ss = new ServerSocket(0)) { + int port = ss.getLocalPort(); + if (!checkPortIfLocked(port)) { + PORTS.add(port); + return port; + } + } catch (Exception e) { + exceptionCount++; + if (exceptionCount > 100) { + throw new RuntimeException("Unable to allocate socket port", e); + } + } + } + } + + /** + * Returns whether the port was released successfully. + * + * @return whether the release is successful. + */ + public static synchronized boolean releaseLockedPort(int lockedPort) { + return PORTS.remove(lockedPort); + } + + /** + * Check port if locked. + * + * @return whether the port is locked. + */ + public static synchronized boolean checkPortIfLocked(int lockedPort) { + return PORTS.contains(lockedPort); + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java new file mode 100644 index 0000000000000..88057ba943ab2 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import org.testng.annotations.Test; + +import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class PortManagerTest { + @Test + public void testCheckPortIfLockedAndRemove() { + int port = nextLockedFreePort(); + assertTrue(checkPortIfLocked(port)); + assertTrue(releaseLockedPort(port)); + assertFalse(checkPortIfLocked(port)); + } +} diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index 3444b0287d913..266c22c6d81e8 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -27,6 +27,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -84,7 +85,7 @@ import org.apache.bookkeeper.test.ZooKeeperClusterUtil; import org.apache.bookkeeper.test.ZooKeeperUtil; import org.apache.bookkeeper.util.DiskChecker; -import org.apache.bookkeeper.util.PortManager; +import org.apache.pulsar.common.util.PortManager; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -130,6 +131,8 @@ public void handleTestMethodName(Method method) { private boolean isAutoRecoveryEnabled; + private final List bookiePorts = new ArrayList<>(); + SynchronousQueue asyncExceptions = new SynchronousQueue<>(); protected void captureThrowable(Runnable c) { try { @@ -283,6 +286,7 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { @@ -290,7 +294,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = PortManager.nextFreePort(); + port = nextLockedFreePort(); } else { port = 0; } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 0cc4bbb6bb500..c779acb6ebe90 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -22,12 +22,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -43,12 +43,15 @@ import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; @@ -60,6 +63,8 @@ public static final class MyProxyExtension implements ProxyExtension { private ProxyConfiguration conf; + private final List ports = new ArrayList<>(); + @Override public String extensionName() { return "test"; @@ -81,7 +86,9 @@ public void start(ProxyService service) { @Override public Map> newChannelInitializers() { - return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()), + int port = nextLockedFreePort(); + this.ports.add(port); + return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { @@ -106,7 +113,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - + ports.removeIf(PortManager::releaseLockedPort); } } From 2f5e75347115710dee63472c38d15a4856b5d3cf Mon Sep 17 00:00:00 2001 From: houxiaoyu Date: Wed, 1 Mar 2023 09:43:13 +0800 Subject: [PATCH 10/11] [improve][broker] Replace ScheduledExecutorService to ExecutorService in ModularLoadManagerImpl (#19656) (cherry picked from commit 145e985f7b7ef981d33036b79b24fd5a4e27d43c) --- .../loadbalance/impl/ModularLoadManagerImpl.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index d9d1629cc103c..4f7fcc79a0b2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -36,9 +36,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -176,8 +176,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // Pulsar service used to initialize this. private PulsarService pulsar; - // Executor service used to regularly update broker data. - private final ScheduledExecutorService scheduler; + // Executor service used to update broker data. + private final ExecutorService executors; // check if given broker can load persistent/non-persistent topic private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate; @@ -216,7 +216,7 @@ public ModularLoadManagerImpl() { loadData = new LoadData(); loadSheddingPipeline = new ArrayList<>(); preallocatedBundleToBroker = new ConcurrentHashMap<>(); - scheduler = Executors.newSingleThreadScheduledExecutor( + executors = Executors.newSingleThreadExecutor( new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager")); this.brokerToFailureDomainMap = new HashMap<>(); @@ -277,7 +277,7 @@ public void initialize(final PulsarService pulsar) { // register listeners for domain changes pulsar.getPulsarResources().getClusterResources().getFailureDomainResources() .registerListener(__ -> { - scheduler.execute(() -> refreshBrokerToFailureDomainMap()); + executors.execute(() -> refreshBrokerToFailureDomainMap()); }); loadSheddingPipeline.add(createLoadSheddingStrategy()); @@ -291,7 +291,7 @@ public void handleDataNotification(Notification t) { }); try { - scheduler.execute(ModularLoadManagerImpl.this::updateAll); + executors.execute(ModularLoadManagerImpl.this::updateAll); } catch (RejectedExecutionException e) { // Executor is shutting down } @@ -1019,7 +1019,7 @@ public void start() throws PulsarServerException { */ @Override public void stop() throws PulsarServerException { - scheduler.shutdownNow(); + executors.shutdownNow(); try { brokersData.close(); From e127ca298a8b5ef430bcad1590d7e46ad69625c8 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 24 Oct 2023 10:06:02 +0800 Subject: [PATCH 11/11] Fix compilation error for unit test. --- .../org/apache/pulsar/broker/service/BrokerServiceTest.java | 1 + .../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index b91e9c1aff994..0345ca8080e55 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -110,6 +110,7 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.Compactor; import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 8d3fd67c35a5d..1461f05463ddb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -213,7 +213,7 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { @Test public void testHeartbeatTopicBeDeleted() throws Exception { admin.brokers().healthcheck(TopicVersion.V2); - NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfig()); TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); @@ -229,7 +229,7 @@ public void testHeartbeatTopicBeDeleted() throws Exception { @Test public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception { admin.brokers().healthcheck(TopicVersion.V2); - NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getLookupServiceAddress(), + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfig()); TopicName topicName = TopicName.get("persistent", namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);