diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 61f9d5c86b32f..5d56bd02a8cc3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -448,8 +448,9 @@ public Future sendMessages(final List entries, } private void incrementUnackedMessages(int unackedMessages) { - if (Subscription.isIndividualAckMode(subType) - && addAndGetUnAckedMsgs(this, unackedMessages) >= getMaxUnackedMessages() + int updatedUnackedMessages = addAndGetUnAckedMsgs(this, unackedMessages); + if (updatedUnackedMessages >= getMaxUnackedMessages() + && Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0) { blockedConsumerOnUnackedMsgs = true; } @@ -761,8 +762,12 @@ private void checkAckValidationError(CommandAck ack, Position position) { private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer, Position position, MessageIdData msgId) { - if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) { - return removePendingAcks(ackOwnedConsumer, position); + if (msgId.getAckSetsCount() == 0) { + if (Subscription.isIndividualAckMode(subType)) { + return removePendingAcks(ackOwnedConsumer, position); + } + // Exclusive or Failover model + return true; } return false; } @@ -1185,7 +1190,7 @@ public Subscription getSubscription() { private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { int unackedMsgs = 0; - if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { + if (isPersistentTopic) { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 99a09d3a5d708..ff562cfe3ac19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -89,9 +89,9 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su : ""/* NonDurableCursor doesn't have name */); this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(), - TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(), - TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(), - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(), + TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(), + TimeUnit.MILLISECONDS); this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.initializeDispatchRateLimiterIfNeeded(); } @@ -231,18 +231,19 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List e EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks, SendMessageInfo sendMessageInfo, long epoch) { currentConsumer - .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), - sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), - redeliveryTracker, epoch) - .addListener(future -> { - if (future.isSuccess()) { - acquirePermitsForDeliveredMessages(topic, cursor, entries.size(), - sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()); - - // Schedule a new read batch operation only after the previous batch has been written to the socket. - executor.execute(() -> readMoreEntries(getActiveConsumer())); - } - }); + .sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), + sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), + redeliveryTracker, epoch) + .addListener(future -> { + if (future.isSuccess()) { + acquirePermitsForDeliveredMessages(topic, cursor, entries.size(), + sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()); + + // Schedule a new read batch operation only after the previous batch has been written to the + // socket. + executor.execute(() -> readMoreEntries(getActiveConsumer())); + } + }); } @Override @@ -585,6 +586,11 @@ public boolean checkAndUnblockIfStuck() { return false; } + public int getUnackedMessages() { + Consumer activeConsumer = getActiveConsumer(); + return activeConsumer != null ? activeConsumer.getUnackedMessages() : 0; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); public static class ReadEntriesCtx { @@ -597,13 +603,14 @@ public static class ReadEntriesCtx { private ReadEntriesCtx(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } + private static final Recycler RECYCLER = new Recycler() { - @Override - protected ReadEntriesCtx newObject(Recycler.Handle recyclerHandle) { - return new ReadEntriesCtx(recyclerHandle); - } - }; + @Override + protected ReadEntriesCtx newObject(Recycler.Handle recyclerHandle) { + return new ReadEntriesCtx(recyclerHandle); + } + }; public static ReadEntriesCtx create(Consumer consumer, long epoch) { ReadEntriesCtx readEntriesCtx = RECYCLER.get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 97b4dc06d0837..ed479bb0bfc31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1315,7 +1315,9 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge SubType subType = getType(); subStats.type = getTypeString(); if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) { - Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer(); + PersistentDispatcherSingleActiveConsumer d = ((PersistentDispatcherSingleActiveConsumer) dispatcher); + subStats.unackedMessages = d.getUnackedMessages(); + Consumer activeConsumer = d.getActiveConsumer(); if (activeConsumer != null) { subStats.activeConsumerName = activeConsumer.consumerName(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 76559297d34a7..df0c057897718 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -22,7 +22,9 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH; -import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*; +import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.all_matching; +import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.changed; +import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.none; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -113,7 +115,29 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.*; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; +import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; +import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; +import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.awaitility.Awaitility; @@ -1541,7 +1565,7 @@ public void testConsumerStatsLastTimestamp() throws PulsarClientException, Pulsa assertTrue(consumedTimestamp < lastConsumedTimestamp); assertTrue(ackedTimestamp < lastAckedTimestamp); assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp); - assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp); + assertTrue(lastConsumedFlowTimestamp > consumedFlowTimestamp); assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats); assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 55d759ba74079..c4b3dd6478625 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -1096,7 +1096,9 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); } } else { - assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); + if (!enableBatch) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + } } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java index a4c9e26ffb853..f16f7db50cfb0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -27,10 +31,14 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -124,4 +132,70 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception { // Verify: the topic can be deleted successfully. admin.topics().delete(topicName, false); } + + @Test + public void testUnackedMessages() throws Exception { + String topicNamePrefix = "testUnackedMessages-"; + String subscriptionNamePrefix = "testUnackedMessages-sub-"; + int totalProducedMessage = 100; + // we will check unacked messages every 20 messages + int checkStep = 20; + + for(SubscriptionType subscription : List.of(SubscriptionType.Exclusive, SubscriptionType.Failover)) { + String topicName = topicNamePrefix + subscription.name(); + String subscriptionName = subscriptionNamePrefix + subscription.name(); + @Cleanup + org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName).subscriptionName(subscriptionName) + .subscriptionType(subscription) + .receiverQueueSize(totalProducedMessage) + .ackTimeout(1000, TimeUnit.MILLISECONDS) + .negativeAckRedeliveryDelay(0, TimeUnit.SECONDS) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .maxAcknowledgmentGroupSize(1) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + + for (int i = 0; i < totalProducedMessage; ++i) { + producer.send(Integer.toString(i)); + } + + assertUnackedMessage(topicName, subscriptionName, totalProducedMessage); + + for (int i = 0; i < totalProducedMessage; ++i) { + Message msg = consumer.receive(1, TimeUnit.SECONDS); + if(Objects.isNull(msg)) { + break; + } + consumer.acknowledge(msg.getMessageId()); + if((i + 1) % checkStep == 0) { + assertUnackedMessage(topicName, subscriptionName, totalProducedMessage - i - 1); + } + } + + assertUnackedMessage(topicName, subscriptionName, 0); + + TopicStats topicStats = admin.topics().getStats(topicName); + assertThat(topicStats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isEqualTo(0); + + producer.send("1"); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + + consumer.negativeAcknowledge(msg); + assertUnackedMessage(topicName, subscriptionName, 1); + + consumer.acknowledge(msg); + assertUnackedMessage(topicName, subscriptionName, 0); + } + } + + private void assertUnackedMessage(String topicName, String subscriptionName, int expectedUnackedMessage) { + Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(500, TimeUnit.MILLISECONDS).untilAsserted(() -> { + TopicStats topicStats = admin.topics().getStats(topicName); + assertThat(topicStats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isEqualTo(expectedUnackedMessage); + }); + } + }