Skip to content

[feat][pip] PIP-426: Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions#24396

Open
berg223 wants to merge 20 commits intoapache:masterfrom
berg223:fix_consumer_throtting_and_cumulative_unacked_stats
Open

[feat][pip] PIP-426: Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions#24396
berg223 wants to merge 20 commits intoapache:masterfrom
berg223:fix_consumer_throtting_and_cumulative_unacked_stats

Conversation

@berg223
Copy link
Contributor

@berg223 berg223 commented Jun 8, 2025

Fixes #24159

Main Issue: #24159

Motivation

  1. I have tried to fix the issue by [fix][broker] fix unacked message count is zero when using exclusive subscription #24376. However, that PR doesn't support features about cumulative ack, batching mode, and transaction. So I want to improve it further.
  2. I have found another issue that flowcontrol of exclusive or failover consumer is not work. Since the issues are highly correlated, I want to fix them at the same time.
We can reproduce flow control issue by this unit test
    @Test(timeOut = 30000)
    public void testMaxUnackedMessagesOnExclusiveConsumer() throws Exception {
        final String topicName = testTopic + System.currentTimeMillis();
        final String subscriberName = "test-sub-exclusive" + System.currentTimeMillis();
        final int unackMsgAllowed = 100;
        final int receiverQueueSize = 10;
        final int totalProducedMsgs = 300;

        ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
                .ackTimeout(1, TimeUnit.MINUTES)
                .subscriptionType(SubscriptionType.Exclusive);
        @Cleanup
        Consumer<String> consumer = consumerBuilder.subscribe();
        // 1) Produced Messages
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message);
        }
        // 2) Unlimited, so all messages can be consumed
        int count = 0;
        List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
        for (int i = 0; i < totalProducedMsgs; i++) {
            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
            if (message == null) {
                break;
            }
            count++;
            list.add(message);
        }
        assertEquals(count, totalProducedMsgs);
        list.forEach(message -> {
            try {
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
            }
        });
        // 3) Set restrictions, so only part of the data can be consumed
        waitCacheInit(topicName);
        admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
        Awaitility.await().untilAsserted(()
                -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
        assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
        // 4) consumer can only consume 100 messages
        for (int i = 0; i < totalProducedMsgs; i++) {
            String message = "my-message-" + i;
            producer.send(message);
        }
        int consumerCounter = 0;
        Message<String> message = null;
        for (int i = 0; i < totalProducedMsgs; i++) {
            try {
                Message<String> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break;
                }
                message = msg;
                ++consumerCounter;
            } catch (PulsarClientException e) {
                break;
            }
        }
        assertEquals(consumerCounter, unackMsgAllowed);
        consumer.acknowledgeCumulative(message.getMessageId());
        consumerCounter = 0;
        for (int i = 0; i < totalProducedMsgs - unackMsgAllowed; i++) {
            try {
                message = consumer.receive(500, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break;
                }
                ++consumerCounter;
            } catch (PulsarClientException e) {
                break;
            }
        }
        assertEquals(consumerCounter, unackMsgAllowed);
    }

Modifications

  • Reused and extended the pendingAcks mechanism in the Consumer class to support exclusive and failover subscriptions.
  • Removed the restriction that pendingAcks only works in individual ack mode.
  • Reused PendingAckHandleImpl for transaction support after removing the dependency on Subscription.isIndividualAckMode.

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as (please describe tests).

  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnConsumer

This change added tests and can be verified as follows:

  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnExclusiveConsumer
  • MaxUnackedMessagesTest#testMaxUnackedMessagesOnFailOverConsumer
  • PersistentDispatcherSingleActiveConsumerTest#testUnackedMessages
  • PersistentDispatcherSingleActiveConsumerTest#testUnackedMessagesWithTransaction

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jun 8, 2025
@berg223 berg223 changed the title [improve][broker] enable consumer throtting and track unacked message for exlusive or failover subscription [improve][broker] enable consumer throtting and get unacked message count for exlusive or failover subscription Jun 8, 2025
@berg223 berg223 changed the title [improve][broker] enable consumer throtting and get unacked message count for exlusive or failover subscription [improve][broker] Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions Jun 9, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes make sense to me, but since this changes the behavior of the existing way exclusive and failover subscriptions currently behave, it will be necessary to write a PIP for this change. One common request has been that when behavior is changed that there's a feature flag which controls whether the old or new behavior is used.
Keeping pending acks state comes with a cost, so that's one reason why it might be useful to turn this feature off to avoid instability when upgrading an existing system.

You could use LLM to draft a PIP by passing

Even before writing a PIP, it could be useful to ask whether this type of change would be acceptable in the community. However, having the PIP document to collect the background could be useful already at this stage. The initial discussion happens on the Pulsar dev mailing list (joining instructions).

@berg223 berg223 changed the title [improve][broker] Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions [feat][pip] PIP-426: Enable Consumer Throttling and Accurate Unacknowledged Message Tracking for Exclusive and Failover Subscriptions Jun 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] unacked message count is zero when using exclusive subscription.

2 participants