Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,9 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
}

private void incrementUnackedMessages(int unackedMessages) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, unackedMessages) >= getMaxUnackedMessages()
int updatedUnackedMessages = addAndGetUnAckedMsgs(this, unackedMessages);
if (updatedUnackedMessages >= getMaxUnackedMessages()
&& Subscription.isIndividualAckMode(subType)
&& getMaxUnackedMessages() > 0) {
blockedConsumerOnUnackedMsgs = true;
}
Expand Down Expand Up @@ -761,8 +762,12 @@ private void checkAckValidationError(CommandAck ack, Position position) {

private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer,
Position position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
return removePendingAcks(ackOwnedConsumer, position);
if (msgId.getAckSetsCount() == 0) {
if (Subscription.isIndividualAckMode(subType)) {
return removePendingAcks(ackOwnedConsumer, position);
}
// Exclusive or Failover model
return true;
}
return false;
}
Expand Down Expand Up @@ -1185,7 +1190,7 @@ public Subscription getSubscription() {

private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
int unackedMsgs = 0;
if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) {
if (isPersistentTopic) {
subscription.addUnAckedMessages(ackedMessages);
unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
: ""/* NonDurableCursor doesn't have name */);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
TimeUnit.MILLISECONDS);
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded();
}
Expand Down Expand Up @@ -231,18 +231,19 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
SendMessageInfo sendMessageInfo, long epoch) {
currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
executor.execute(() -> readMoreEntries(getActiveConsumer()));
}
});
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the
// socket.
executor.execute(() -> readMoreEntries(getActiveConsumer()));
}
});
}

@Override
Expand Down Expand Up @@ -585,6 +586,11 @@ public boolean checkAndUnblockIfStuck() {
return false;
}

public int getUnackedMessages() {
Consumer activeConsumer = getActiveConsumer();
return activeConsumer != null ? activeConsumer.getUnackedMessages() : 0;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);

public static class ReadEntriesCtx {
Expand All @@ -597,13 +603,14 @@ public static class ReadEntriesCtx {
private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<ReadEntriesCtx> RECYCLER =
new Recycler<ReadEntriesCtx>() {
@Override
protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
return new ReadEntriesCtx(recyclerHandle);
}
};
@Override
protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
return new ReadEntriesCtx(recyclerHandle);
}
};

public static ReadEntriesCtx create(Consumer consumer, long epoch) {
ReadEntriesCtx readEntriesCtx = RECYCLER.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,9 @@ public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(GetStatsOptions ge
SubType subType = getType();
subStats.type = getTypeString();
if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
PersistentDispatcherSingleActiveConsumer d = ((PersistentDispatcherSingleActiveConsumer) dispatcher);
subStats.unackedMessages = d.getUnackedMessages();
Consumer activeConsumer = d.getActiveConsumer();
if (activeConsumer != null) {
subStats.activeConsumerName = activeConsumer.consumerName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.*;
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.all_matching;
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.changed;
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.none;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -113,7 +115,29 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.*;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -1541,7 +1565,7 @@ public void testConsumerStatsLastTimestamp() throws PulsarClientException, Pulsa
assertTrue(consumedTimestamp < lastConsumedTimestamp);
assertTrue(ackedTimestamp < lastAckedTimestamp);
assertTrue(startConsumedTimestampInConsumerStats < lastConsumedTimestamp);
assertEquals(lastConsumedFlowTimestamp, consumedFlowTimestamp);
assertTrue(lastConsumedFlowTimestamp > consumedFlowTimestamp);
assertTrue(ackedTimestampInSubStats < lastAckedTimestampInSubStats);
assertEquals(lastConsumedTimestamp, lastConsumedTimestampInSubStats);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,9 @@ private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subTyp
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
}
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
if (!enableBatch) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -27,10 +31,14 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -124,4 +132,70 @@ public void testSkipReadEntriesFromCloseCursor() throws Exception {
// Verify: the topic can be deleted successfully.
admin.topics().delete(topicName, false);
}

@Test
public void testUnackedMessages() throws Exception {
String topicNamePrefix = "testUnackedMessages-";
String subscriptionNamePrefix = "testUnackedMessages-sub-";
int totalProducedMessage = 100;
// we will check unacked messages every 20 messages
int checkStep = 20;

for(SubscriptionType subscription : List.of(SubscriptionType.Exclusive, SubscriptionType.Failover)) {
String topicName = topicNamePrefix + subscription.name();
String subscriptionName = subscriptionNamePrefix + subscription.name();
@Cleanup
org.apache.pulsar.client.api.Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName).subscriptionName(subscriptionName)
.subscriptionType(subscription)
.receiverQueueSize(totalProducedMessage)
.ackTimeout(1000, TimeUnit.MILLISECONDS)
.negativeAckRedeliveryDelay(0, TimeUnit.SECONDS)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.maxAcknowledgmentGroupSize(1)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();

for (int i = 0; i < totalProducedMessage; ++i) {
producer.send(Integer.toString(i));
}

assertUnackedMessage(topicName, subscriptionName, totalProducedMessage);

for (int i = 0; i < totalProducedMessage; ++i) {
Message<String> msg = consumer.receive(1, TimeUnit.SECONDS);
if(Objects.isNull(msg)) {
break;
}
consumer.acknowledge(msg.getMessageId());
if((i + 1) % checkStep == 0) {
assertUnackedMessage(topicName, subscriptionName, totalProducedMessage - i - 1);
}
}

assertUnackedMessage(topicName, subscriptionName, 0);

TopicStats topicStats = admin.topics().getStats(topicName);
assertThat(topicStats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isEqualTo(0);

producer.send("1");
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);

consumer.negativeAcknowledge(msg);
assertUnackedMessage(topicName, subscriptionName, 1);

consumer.acknowledge(msg);
assertUnackedMessage(topicName, subscriptionName, 0);
}
}

private void assertUnackedMessage(String topicName, String subscriptionName, int expectedUnackedMessage) {
Awaitility.await().atMost(5, TimeUnit.SECONDS).pollInterval(500, TimeUnit.MILLISECONDS).untilAsserted(() -> {
TopicStats topicStats = admin.topics().getStats(topicName);
assertThat(topicStats.getSubscriptions().get(subscriptionName).getUnackedMessages()).isEqualTo(expectedUnackedMessage);
});
}

}