diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index c4b06a882623d..37a62a0691151 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -528,9 +528,9 @@ The Apache Software License, Version 2.0 - io.vertx-vertx-web-3.9.8.jar - io.vertx-vertx-web-common-3.9.8.jar * Apache ZooKeeper - - org.apache.zookeeper-zookeeper-3.8.1.jar - - org.apache.zookeeper-zookeeper-jute-3.8.1.jar - - org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.1.jar + - org.apache.zookeeper-zookeeper-3.8.3.jar + - org.apache.zookeeper-zookeeper-jute-3.8.3.jar + - org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.3.jar * Snappy Java - org.xerial.snappy-snappy-java-1.1.10.1.jar * Google HTTP Client diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f90795e8ff006..45c3fb3ac8594 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1221,6 +1221,10 @@ public CompletableFuture getEarliestMessagePublishTimeOfPos(PositionImpl p } PositionImpl nextPos = getNextValidPosition(pos); + if (nextPos.compareTo(lastConfirmedEntry) > 0) { + return CompletableFuture.completedFuture(-1L); + } + asyncReadEntry(nextPos, new ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index 9026c0f6ac4e7..b767cd8e91013 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -117,7 +118,7 @@ public void testBookieFailure() throws Exception { metadataStore.unsetAlwaysFail(); bkc = new BookKeeperTestClient(baseClientConf); - startNewBookie(); + int port = startNewBookie(); // Reconnect a new bk client factory.shutdown(); @@ -147,6 +148,7 @@ public void testBookieFailure() throws Exception { assertEquals("entry-2", new String(entries.get(0).getData())); entries.forEach(Entry::release); factory.shutdown(); + releaseLockedPort(port); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 39f9dc9ba7d84..d6f8d19421f20 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -24,6 +24,7 @@ package org.apache.bookkeeper.test; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; @@ -62,7 +63,7 @@ import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.replication.Auditor; import org.apache.bookkeeper.replication.ReplicationWorker; -import org.apache.bookkeeper.util.PortManager; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; @@ -113,6 +114,7 @@ public void handleTestMethodName(Method method) { private boolean isAutoRecoveryEnabled; protected ExecutorService executor; + private final List bookiePorts = new ArrayList<>(); SynchronousQueue asyncExceptions = new SynchronousQueue<>(); protected void captureThrowable(Runnable c) { @@ -264,7 +266,7 @@ protected void startBKCluster(String metadataServiceUri) throws Exception { // Create Bookie Servers (B1, B2, B3) for (int i = 0; i < numBookies; i++) { - startNewBookie(); + bookiePorts.add(startNewBookie()); } } @@ -283,6 +285,7 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { @@ -290,7 +293,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = PortManager.nextFreePort(); + port = nextLockedFreePort(); } else { port = 0; } diff --git a/pom.xml b/pom.xml index 0faf53ecc856f..8b09ebbc94620 100644 --- a/pom.xml +++ b/pom.xml @@ -118,7 +118,7 @@ flexible messaging model and an intuitive client API. 1.21 4.15.4 - 3.8.1 + 3.8.3 1.5.0 1.10.0 1.1.10.1 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 3654b7b54c3e9..7a8ec103e9fe3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -120,8 +120,6 @@ public synchronized void setConf(Configuration conf) { store.registerListener(this::handleUpdates); racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() .orElseGet(BookiesRackConfiguration::new); - updateRacksWithHost(racksWithHost); - watchAvailableBookies(); for (Map bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); @@ -131,6 +129,8 @@ public synchronized void setConf(Configuration conf) { bookieAddressListLastTime); } } + updateRacksWithHost(racksWithHost); + watchAvailableBookies(); } catch (InterruptedException | ExecutionException | MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java index ac97db71e8907..ce84034687aad 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java @@ -254,6 +254,7 @@ public void testWithPulsarRegistrationClient() throws Exception { bkClientConf.getTimeoutTimerNumTicks()); RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy(); + mapping.registerRackChangeListener(repp); Class clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy"); Field field1 = clazz1.getDeclaredField("knownBookies"); field1.setAccessible(true); @@ -323,6 +324,22 @@ public void testWithPulsarRegistrationClient() throws Exception { assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1"); assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); + //remove bookie2 rack, the bookie2 rack should be /default-rack + data = "{\"group1\": {\"" + BOOKIE1 + + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}}}"; + store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join(); + Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1); + + racks = mapping + .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName())) + .stream().filter(Objects::nonNull).toList(); + assertEquals(racks.size(), 1); + assertEquals(racks.get(0), "/rack0"); + assertEquals(knownBookies.size(), 3); + assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0"); + assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/default-rack"); + assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack"); + timer.stop(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index f3d69202a080b..2d85d582a37ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -221,7 +221,7 @@ static void setDefaultEnsemblePlacementPolicy( } } - private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, MetadataStore store, + static void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfiguration conf, MetadataStore store, Class policyClass) { bkConf.setEnsemblePlacementPolicy(policyClass); bkConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); @@ -229,6 +229,9 @@ private void setEnsemblePlacementPolicy(ClientConfiguration bkConf, ServiceConfi bkConf.setProperty(REPP_DNS_RESOLVER_CLASS, conf.getProperties().getProperty(REPP_DNS_RESOLVER_CLASS, BookieRackAffinityMapping.class.getName())); + bkConf.setMinNumRacksPerWriteQuorum(conf.getBookkeeperClientMinNumRacksPerWriteQuorum()); + bkConf.setEnforceMinNumRacksPerWriteQuorum(conf.isBookkeeperClientEnforceMinNumRacksPerWriteQuorum()); + bkConf.setProperty(NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, conf.getProperties().getProperty( NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index d9d1629cc103c..4f7fcc79a0b2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -36,9 +36,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -176,8 +176,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // Pulsar service used to initialize this. private PulsarService pulsar; - // Executor service used to regularly update broker data. - private final ScheduledExecutorService scheduler; + // Executor service used to update broker data. + private final ExecutorService executors; // check if given broker can load persistent/non-persistent topic private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate; @@ -216,7 +216,7 @@ public ModularLoadManagerImpl() { loadData = new LoadData(); loadSheddingPipeline = new ArrayList<>(); preallocatedBundleToBroker = new ConcurrentHashMap<>(); - scheduler = Executors.newSingleThreadScheduledExecutor( + executors = Executors.newSingleThreadExecutor( new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager")); this.brokerToFailureDomainMap = new HashMap<>(); @@ -277,7 +277,7 @@ public void initialize(final PulsarService pulsar) { // register listeners for domain changes pulsar.getPulsarResources().getClusterResources().getFailureDomainResources() .registerListener(__ -> { - scheduler.execute(() -> refreshBrokerToFailureDomainMap()); + executors.execute(() -> refreshBrokerToFailureDomainMap()); }); loadSheddingPipeline.add(createLoadSheddingStrategy()); @@ -291,7 +291,7 @@ public void handleDataNotification(Notification t) { }); try { - scheduler.execute(ModularLoadManagerImpl.this::updateAll); + executors.execute(ModularLoadManagerImpl.this::updateAll); } catch (RejectedExecutionException e) { // Executor is shutting down } @@ -1019,7 +1019,7 @@ public void start() throws PulsarServerException { */ @Override public void stop() throws PulsarServerException { - scheduler.shutdownNow(); + executors.shutdownNow(); try { brokersData.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 2744469ea8dba..179c18ab19893 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -103,7 +103,10 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ break; case producer_exception: case producer_request_hold: - disconnectProducers(persistentTopic); + if (!advanceSlowestSystemCursor(persistentTopic)) { + // The slowest is not a system cursor. Disconnecting producers to put backpressure. + disconnectProducers(persistentTopic); + } break; default: break; @@ -268,4 +271,27 @@ private void disconnectProducers(PersistentTopic persistentTopic) { }); } + + /** + * Advances the slowest cursor if that is a system cursor. + * + * @param persistentTopic + * @return true if the slowest cursor is a system cursor + */ + private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) { + + ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursor slowestConsumer = mLedger.getSlowestConsumer(); + if (slowestConsumer == null) { + return false; + } + + if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) { + persistentTopic.getMessageDeduplication().takeSnapshot(); + return true; + } + + // We may need to check other system cursors here : replicator, compaction + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 8d25603ccc58c..0f13718fb9a6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -92,20 +92,23 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { @Override public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return CompletableFuture.completedFuture(null); + } return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); } @Override public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException( + "Not allowed to update topic policy for the heartbeat topic")); + } return sendTopicPolicyEvent(topicName, ActionType.UPDATE, policies); } private CompletableFuture sendTopicPolicyEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { - if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { - return CompletableFuture.failedFuture( - new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic")); - } return pulsarService.getPulsarResources().getNamespaceResources() .getPoliciesAsync(topicName.getNamespaceObject()) .thenCompose(namespacePolicies -> { @@ -217,6 +220,9 @@ public TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesC @Override public TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException { + if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + return null; + } if (!policyCacheInitMap.containsKey(topicName.getNamespaceObject())) { NamespaceName namespace = topicName.getNamespaceObject(); prepareInitPoliciesCache(namespace, new CompletableFuture<>()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 4c3d8e1a467c5..ba2600d9134d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; @@ -132,6 +133,9 @@ public MessageDupUnknownException() { private final String replicatorPrefix; + + private final AtomicBoolean snapshotTaking = new AtomicBoolean(false); + public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) { this.pulsar = pulsar; this.topic = topic; @@ -432,6 +436,11 @@ private void takeSnapshot(Position position) { if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } + + if (!snapshotTaking.compareAndSet(false, true)) { + return; + } + Map snapshot = new TreeMap<>(); highestSequencedPersisted.forEach((producerName, sequenceId) -> { if (snapshot.size() < maxNumberOfProducers) { @@ -446,11 +455,13 @@ public void markDeleteComplete(Object ctx) { log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); } lastSnapshotTimestamp = System.currentTimeMillis(); + snapshotTaking.set(false); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position); + snapshotTaking.set(false); } }, null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index e2e5c4aa05371..7d494b153c47c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1154,16 +1154,20 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri subStats.backlogSize = ((ManagedLedgerImpl) topic.getManagedLedger()) .getEstimatedBacklogSize((PositionImpl) cursor.getMarkDeletedPosition()); } - if (getEarliestTimeInBacklog && subStats.msgBacklog > 0) { - ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); - PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition(); - long result = 0; - try { - result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get(); - } catch (InterruptedException | ExecutionException e) { - result = -1; + if (getEarliestTimeInBacklog) { + if (subStats.msgBacklog > 0) { + ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); + PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition(); + long result = 0; + try { + result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get(); + } catch (InterruptedException | ExecutionException e) { + result = -1; + } + subStats.earliestMsgPublishTimeInBacklog = result; + } else { + subStats.earliestMsgPublishTimeInBacklog = -1; } - subStats.earliestMsgPublishTimeInBacklog = result; } subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed; subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index cdd37f9795256..f5c0ecad64dd6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -177,6 +177,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final ConcurrentOpenHashMap replicators; static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; + + public static boolean isDedupCursorName(String name) { + return DEDUPLICATION_CURSOR_NAME.equals(name); + } private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch"; private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5; @@ -291,7 +295,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS checkReplicatedSubscriptionControllerState(); TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() - && !isEventSystemTopic(topicName)) { + && !isEventSystemTopic(topicName) + && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { @@ -2370,7 +2375,9 @@ public void checkInactiveSubscriptions() { .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { subscriptions.forEach((subName, sub) -> { - if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) { + if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() + || sub.isReplicated() + || isCompactionSubscription(subName)) { return; } if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index e26b0aa756162..428684de7a821 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -41,6 +41,7 @@ import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.powermock.reflect.Whitebox; +import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy; import org.testng.annotations.Test; /** @@ -152,6 +153,24 @@ public void testSetDefaultEnsemblePlacementPolicyRackAwareEnabledChangedValues() assertEquals(20, bkConf.getMinNumRacksPerWriteQuorum()); } + @Test + public void testSetEnsemblePlacementPolicys() { + ClientConfiguration bkConf = new ClientConfiguration(); + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setBookkeeperClientMinNumRacksPerWriteQuorum(3); + conf.setBookkeeperClientEnforceMinNumRacksPerWriteQuorum(true); + + MetadataStore store = mock(MetadataStore.class); + + BookKeeperClientFactoryImpl.setEnsemblePlacementPolicy( + bkConf, + conf, + store, + ZkIsolatedBookieEnsemblePlacementPolicy.class); + assertEquals(bkConf.getMinNumRacksPerWriteQuorum(), 3); + assertTrue(bkConf.getEnforceMinNumRacksPerWriteQuorum()); + } + @Test public void testSetDiskWeightBasedPlacementEnabled() { BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java index c00ae8cd0d39d..5b78a32dc37e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MultiBrokerBaseTest.java @@ -21,12 +21,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; + import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.impl.ZKMetadataStore; @@ -124,6 +126,9 @@ protected void additionalBrokersCleanup() { try { pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L); pulsarService.close(); + pulsarService.getConfiguration().getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarService.getConfiguration().getWebServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarService.getConfiguration().getWebServicePortTls().ifPresent(PortManager::releaseLockedPort); } catch (PulsarServerException e) { // ignore } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index ab076b3e2fe85..09544ee957e2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -2535,6 +2535,32 @@ public void testFailedUpdatePartitionedTopic() throws Exception { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, newPartitions); } + /** + * Validate retring failed partitioned topic should succeed. + * @throws Exception + */ + @Test + public void testTopicStatsWithEarliestTimeInBacklogIfNoBacklog() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://prop-xyz/ns1/tp_"); + final String subscriptionName = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + // Send one message. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false) + .create(); + MessageIdImpl messageId = (MessageIdImpl) producer.send("123"); + // Catch up. + admin.topics().skipAllMessages(topicName, subscriptionName); + // Get topic stats with earliestTimeInBacklog + TopicStats topicStats = admin.topics().getStats(topicName, false, false, true); + assertEquals(topicStats.getSubscriptions().get(subscriptionName).getEarliestMsgPublishTimeInBacklog(), -1L); + + // cleanup. + producer.close(); + admin.topics().delete(topicName); + } + @Test(dataProvider = "topicType") public void testPartitionedStatsAggregationByProducerName(String topicType) throws Exception { conf.setAggregatePublisherStatsByProducerName(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c349e902882e1..341842f19811f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1254,7 +1254,7 @@ public void testGetStats() throws Exception { TopicStats topicStats = admin.topics().getStats(topic, false, false, true); assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0); - assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0); + assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), -1); // publish several messages publishMessagesOnPersistentTopic(topic, 10); @@ -1272,7 +1272,7 @@ public void testGetStats() throws Exception { topicStats = admin.topics().getStats(topic, false, false, true); assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0); - assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0); + assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), -1); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java index 489efa5755ba8..9ca4510a209b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -25,7 +26,6 @@ import java.util.Optional; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; @@ -66,9 +66,9 @@ protected ServiceConfiguration createConfForAdditionalBroker(int additionalBroke } private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { - int pulsarPort = PortManager.nextFreePort(); - int httpPort = PortManager.nextFreePort(); - int httpsPort = PortManager.nextFreePort(); + int pulsarPort = nextLockedFreePort(); + int httpPort = nextLockedFreePort(); + int httpsPort = nextLockedFreePort(); // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended conf.setAdvertisedAddress(advertisedAddress); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java index 639ccf7ecd01b..f08771934f666 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/protocol/SimpleProtocolHandlerTestsBase.java @@ -22,12 +22,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.util.PortManager; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -39,11 +39,14 @@ import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; @Slf4j @@ -54,6 +57,8 @@ public static final class MyProtocolHandler implements ProtocolHandler { private ServiceConfiguration conf; + private final List ports = new ArrayList<>(); + @Override public String protocolName() { return "test"; @@ -81,7 +86,9 @@ public void start(BrokerService service) { @Override public Map> newChannelInitializers() { - return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()), + int port = nextLockedFreePort(); + this.ports.add(port); + return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { @@ -106,7 +113,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - + ports.removeIf(PortManager::releaseLockedPort); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index d3452b4064fd9..9e1acf1371c3c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -117,8 +117,8 @@ void setup() throws Exception { config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType("non-partitioned"); - config.setSystemTopicEnabled(false); - config.setTopicLevelPoliciesEnabled(false); + config.setSystemTopicEnabled(true); + config.setTopicLevelPoliciesEnabled(true); config.setForceDeleteNamespaceAllowed(true); pulsar = new PulsarService(config); @@ -1159,8 +1159,13 @@ public void testProducerException() throws Exception { assertTrue(gotException, "backlog exceeded exception did not occur"); } - @Test - public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { + @DataProvider(name = "dedupTestSet") + public static Object[][] dedupTestSet() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + @Test(dataProvider = "dedupTestSet") + public void testProducerExceptionAndThenUnblockSizeQuota(boolean dedupTestSet) throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/quotahold", @@ -1176,9 +1181,12 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { boolean gotException = false; Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); - byte[] content = new byte[1024]; Producer producer = createProducer(client, topic1); + + admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + for (int i = 0; i < 10; i++) { producer.send(content); } @@ -1197,6 +1205,7 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { } assertTrue(gotException, "backlog exceeded exception did not occur"); + assertFalse(producer.isConnected()); // now remove backlog and ensure that producer is unblocked; TopicStats stats = getTopicStats(topic1); @@ -1213,14 +1222,33 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { Exception sendException = null; gotException = false; try { - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { producer.send(content); + Message msg = consumer.receive(); + consumer.acknowledge(msg); } } catch (Exception e) { gotException = true; sendException = e; } + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); assertFalse(gotException, "unable to publish due to " + sendException); + + gotException = false; + long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp(); + try { + // try to send over backlog quota and make sure it passes + producer.send(content); + producer.send(content); + } catch (PulsarClientException ce) { + assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException + || ce instanceof PulsarClientException.TimeoutException, ce.getMessage()); + gotException = true; + sendException = ce; + } + assertFalse(gotException, "unable to publish due to " + sendException); + assertEquals(lastDisconnectedTimestamp, producer.getLastDisconnectedTimestamp()); + } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index d4ae0da3617e3..0345ca8080e55 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -67,6 +67,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.http.HttpResponse; @@ -76,6 +77,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; @@ -106,7 +108,9 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; +import org.apache.pulsar.compaction.Compactor; import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1222,6 +1226,69 @@ public void testConcurrentLoadTopicExceedLimitShouldNotBeAutoCreated() throws Ex } } + @Test + public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() throws Exception { + String namespace = "prop/test"; + + // set up broker set compaction threshold. + cleanup(); + conf.setBrokerServiceCompactionThresholdInBytes(8); + setup(); + + try { + admin.namespaces().createNamespace(namespace); + } catch (PulsarAdminException.ConflictException e) { + // Ok.. (if test fails intermittently and namespace is already created) + } + + // set enable subscription expiration. + admin.namespaces().setSubscriptionExpirationTime(namespace, 1); + + String compactionInactiveTestTopic = "persistent://prop/test/testCompactionCursorShouldNotDelete"; + + admin.topics().createNonPartitionedTopic(compactionInactiveTestTopic); + + CompletableFuture> topicCf = + pulsar.getBrokerService().getTopic(compactionInactiveTestTopic, true); + + Optional topicOptional = topicCf.get(); + assertTrue(topicOptional.isPresent()); + + PersistentTopic topic = (PersistentTopic) topicOptional.get(); + + PersistentSubscription sub = (PersistentSubscription) topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION); + assertNotNull(sub); + + topic.checkCompaction(); + + Field currentCompaction = PersistentTopic.class.getDeclaredField("currentCompaction"); + currentCompaction.setAccessible(true); + CompletableFuture compactionFuture = (CompletableFuture)currentCompaction.get(topic); + + compactionFuture.get(); + + ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor(); + + // make cursor last active time to very small to check if it will be deleted + Field cursorLastActiveField = ManagedCursorImpl.class.getDeclaredField("lastActive"); + cursorLastActiveField.setAccessible(true); + cursorLastActiveField.set(cursor, 0); + + // replace origin object. so we can check if subscription is deleted. + PersistentSubscription spySubscription = Mockito.spy(sub); + topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, spySubscription); + + // trigger inactive check. + topic.checkInactiveSubscriptions(); + + // Compaction subscription should not call delete method. + Mockito.verify(spySubscription, Mockito.never()).delete(); + + // check if the subscription exist. + assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)); + + } + /** * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and * it should not introduce deadlock while performing it. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index 3cd25d3b00c67..1461f05463ddb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -76,6 +76,7 @@ protected void setup() throws Exception { conf.setDefaultNumPartitions(PARTITIONS); conf.setManagedLedgerMaxEntriesPerLedger(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); + conf.setTransactionCoordinatorEnabled(true); super.baseSetup(); } @@ -185,6 +186,13 @@ public void testSystemNamespaceNotCreateChangeEventsTopic() throws Exception { Optional optionalTopic = pulsar.getBrokerService() .getTopic(topicName.getPartition(1).toString(), false).join(); Assert.assertTrue(optionalTopic.isEmpty()); + + TopicName heartbeatTopicName = TopicName.get("persistent", + namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + admin.topics().getRetention(heartbeatTopicName.toString()); + optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); } @Test @@ -202,6 +210,40 @@ public void testHeartbeatTopicNotAllowedToSendEvent() throws Exception { }); } + @Test + public void testHeartbeatTopicBeDeleted() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), + pulsar.getConfig()); + TopicName heartbeatTopicName = TopicName.get("persistent", namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + + List topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 1); + Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); + + admin.topics().delete(heartbeatTopicName.toString(), true); + topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 0); + } + + @Test + public void testHeartbeatNamespaceNotCreateTransactionInternalTopic() throws Exception { + admin.brokers().healthcheck(TopicVersion.V2); + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), + pulsar.getConfig()); + TopicName topicName = TopicName.get("persistent", + namespaceName, SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); + Optional optionalTopic = pulsar.getBrokerService() + .getTopic(topicName.getPartition(1).toString(), false).join(); + Assert.assertTrue(optionalTopic.isEmpty()); + + List topics = getPulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); + Assert.assertEquals(topics.size(), 1); + TopicName heartbeatTopicName = TopicName.get("persistent", + namespaceName, BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX); + Assert.assertEquals(topics.get(0), heartbeatTopicName.toString()); + } + @Test public void testSetBacklogCausedCreatingProducerFailure() throws Exception { final String ns = "prop/ns-test"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java index e7c6dddc9b2cf..818080be32d4b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertEquals; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,7 +34,6 @@ import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.util.ClassLoaderUtils; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; @@ -86,8 +87,8 @@ void setup() throws Exception { // start brokers for (int i = 0; i < BROKER_COUNT; i++) { - int brokerPort = PortManager.nextFreePort(); - int webPort = PortManager.nextFreePort(); + int brokerPort = nextLockedFreePort(); + int webPort = nextLockedFreePort(); ServiceConfiguration config = new ServiceConfiguration(); config.setBrokerShutdownTimeoutMs(0L); @@ -187,6 +188,10 @@ void tearDown() throws Exception { } if (pulsarServices[i] != null) { pulsarServices[i].close(); + pulsarServices[i].getConfiguration(). + getBrokerServicePort().ifPresent(PortManager::releaseLockedPort); + pulsarServices[i].getConfiguration() + .getWebServicePort().ifPresent(PortManager::releaseLockedPort); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java new file mode 100644 index 0000000000000..b9df071fdbc7e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/PortManager.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.net.ServerSocket; +import java.util.HashSet; +import java.util.Set; + +public class PortManager { + + private static final Set PORTS = new HashSet<>(); + + /** + * Return a locked available port. + * + * @return locked available port. + */ + public static synchronized int nextLockedFreePort() { + int exceptionCount = 0; + while (true) { + try (ServerSocket ss = new ServerSocket(0)) { + int port = ss.getLocalPort(); + if (!checkPortIfLocked(port)) { + PORTS.add(port); + return port; + } + } catch (Exception e) { + exceptionCount++; + if (exceptionCount > 100) { + throw new RuntimeException("Unable to allocate socket port", e); + } + } + } + } + + /** + * Returns whether the port was released successfully. + * + * @return whether the release is successful. + */ + public static synchronized boolean releaseLockedPort(int lockedPort) { + return PORTS.remove(lockedPort); + } + + /** + * Check port if locked. + * + * @return whether the port is locked. + */ + public static synchronized boolean checkPortIfLocked(int lockedPort) { + return PORTS.contains(lockedPort); + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java new file mode 100644 index 0000000000000..88057ba943ab2 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/PortManagerTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import org.testng.annotations.Test; + +import static org.apache.pulsar.common.util.PortManager.checkPortIfLocked; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; +import static org.apache.pulsar.common.util.PortManager.releaseLockedPort; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +public class PortManagerTest { + @Test + public void testCheckPortIfLockedAndRemove() { + int port = nextLockedFreePort(); + assertTrue(checkPortIfLocked(port)); + assertTrue(releaseLockedPort(port)); + assertFalse(checkPortIfLocked(port)); + } +} diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index 3444b0287d913..266c22c6d81e8 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -27,6 +27,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_INDEX_SCOPE; import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_LEDGER_SCOPE; import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.testng.Assert.assertFalse; import com.google.common.base.Stopwatch; import java.io.File; @@ -84,7 +85,7 @@ import org.apache.bookkeeper.test.ZooKeeperClusterUtil; import org.apache.bookkeeper.test.ZooKeeperUtil; import org.apache.bookkeeper.util.DiskChecker; -import org.apache.bookkeeper.util.PortManager; +import org.apache.pulsar.common.util.PortManager; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -130,6 +131,8 @@ public void handleTestMethodName(Method method) { private boolean isAutoRecoveryEnabled; + private final List bookiePorts = new ArrayList<>(); + SynchronousQueue asyncExceptions = new SynchronousQueue<>(); protected void captureThrowable(Runnable c) { try { @@ -283,6 +286,7 @@ protected void stopBKCluster() throws Exception { t.shutdown(); } servers.clear(); + bookiePorts.removeIf(PortManager::releaseLockedPort); } protected ServerConfiguration newServerConfiguration() throws Exception { @@ -290,7 +294,7 @@ protected ServerConfiguration newServerConfiguration() throws Exception { int port; if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) { - port = PortManager.nextFreePort(); + port = nextLockedFreePort(); } else { port = 0; } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java index 0cc4bbb6bb500..c779acb6ebe90 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/extensions/SimpleProxyExtensionTestBase.java @@ -22,12 +22,12 @@ import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.util.PortManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.util.PortManager; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.proxy.server.ProxyConfiguration; import org.apache.pulsar.proxy.server.ProxyService; @@ -43,12 +43,15 @@ import java.net.SocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; @@ -60,6 +63,8 @@ public static final class MyProxyExtension implements ProxyExtension { private ProxyConfiguration conf; + private final List ports = new ArrayList<>(); + @Override public String extensionName() { return "test"; @@ -81,7 +86,9 @@ public void start(ProxyService service) { @Override public Map> newChannelInitializers() { - return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), PortManager.nextFreePort()), + int port = nextLockedFreePort(); + this.ports.add(port); + return Collections.singletonMap(new InetSocketAddress(conf.getBindAddress(), port), new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { @@ -106,7 +113,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { @Override public void close() { - + ports.removeIf(PortManager::releaseLockedPort); } } diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE index d1ab7ed196c35..b67249d0ea5ae 100644 --- a/pulsar-sql/presto-distribution/LICENSE +++ b/pulsar-sql/presto-distribution/LICENSE @@ -479,8 +479,8 @@ The Apache Software License, Version 2.0 - memory-0.8.3.jar - sketches-core-0.8.3.jar * Apache Zookeeper - - zookeeper-3.8.1.jar - - zookeeper-jute-3.8.1.jar + - zookeeper-3.8.3.jar + - zookeeper-jute-3.8.3.jar * Apache Yetus Audience Annotations - audience-annotations-0.12.0.jar * Swagger