Skip to content

[improve][broker] Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions#1

Open
berg223 wants to merge 20 commits intomasterfrom
fix_consumer_throtting_and_cumulative_unacked_stats
Open

[improve][broker] Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions#1
berg223 wants to merge 20 commits intomasterfrom
fix_consumer_throtting_and_cumulative_unacked_stats

Conversation

@berg223
Copy link
Owner

@berg223 berg223 commented Jun 5, 2025

Fixes apache#24159

Main Issue: apache#24159

Motivation

  1. I have tried to fix the issue by [fix][broker] fix unacked message count is zero when using exclusive subscription apache/pulsar#24376. However, it's not work when consumer use cumulative ack mode or batching mode. So I want to fix it further.
  2. I have found another issue that flowcontrol of exclusive or failover consumer is not work. Since the issues are highly correlated, I want to fix them at the same time.
We can reproduce flow control issue by this unit test
@Test(timeOut = 30000)
    public void testMaxUnackedMessagesOnExclusiveConsumer() throws Exception {
        final String topicName = testTopic + System.currentTimeMillis();
        final String subscriberName = "test-sub-exclusive" + System.currentTimeMillis();
        final int unackMsgAllowed = 100;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 300;

        ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
                .ackTimeout(1, TimeUnit.MINUTES)
                .subscriptionType(SubscriptionType.Exclusive);
        @Cleanup
        Consumer<String> consumer = consumerBuilder.subscribe();
        // 1) Produced Messages
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message);
        }
        // 2) Unlimited, so all messages can be consumed
        int count = 0;
        List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
        for (int i = 0; i < totalProducedMsgs; i++) {
            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            count++;
            list.add(message);
        }
        assertEquals(count, totalProducedMsgs);
        list.forEach(message -> {
            try {
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
            }
        });
        // 3) Set restrictions, so only part of the data can be consumed
        waitCacheInit(topicName);
        admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
        Awaitility.await().untilAsserted(()
                -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
        assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
        // 4) consumer can only consume 100 messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message);
        }
        int consumerCounter = 0;
        Message<String> message = null;
        for (int i = 0; i < totalProducedMsgs; i++) {
            try {
                Message<String> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                message = msg;
                ++consumerCounter;
            } catch (PulsarClientException e) {
                break;
            }
        }
        assertEquals(consumerCounter, unackMsgAllowed);
        consumer.acknowledgeCumulative(message.getMessageId());
        consumerCounter = 0;
        for (int i = 0; i < totalProducedMsgs - unackMsgAllowed; i++) {
            try {
                message = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break;
                }
                ++consumerCounter;
            } catch (PulsarClientException e) {
                break;
            }
        }
        assertEquals(consumerCounter, unackMsgAllowed);
    }

Modifications

We use the pendingAcks in Consumer to track unacked message for exlusive or failover subscription. We will also fix the flow control issue by the pendingAcks. However, pendingAcks is limited to individualAckMode before. So we will remove the limitation in all place.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as (please describe tests).

  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumer

This change added tests and can be verified as follows:

  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnExclusiveConsumer
  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnFailOverConsumer
  • PersistentDispatcherSingleActiveConsumerTest#testUnackedMessages

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@berg223 berg223 closed this Jun 8, 2025
@berg223 berg223 reopened this Jun 8, 2025
@berg223 berg223 changed the title [WIP][broker] fix unacked message count is zero when using exclusive subscription [improve][broker] fix unacked message count is zero when using exclusive subscription Jun 8, 2025
@berg223 berg223 changed the title [improve][broker] fix unacked message count is zero when using exclusive subscription [improve][broker] Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions Jun 9, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] unacked message count is zero when using exclusive subscription.

1 participant