Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
Broker version - latest
Broker Java version - 21
Client library type - Java
Client library version - 4.0.8 (reproduced starting at 4.0.1)
Client Java version - 21
Issue Description
We observed in 4.0.5 that after negatively acknowledging a chunked message, the consumer stopped receiving messages. We expected it to receive the redelivered chunked message as well as any subsequent messages.
This appears to have been introduced when private HashMap<MessageId, Long> nackedMessages = null; was changed to private ConcurrentLongLongPairHashMap nackedMessages = null; in this PR. nackedMessages can no longer hold a ChunkedMessageId as a key; each key is now just a LongLongPair identifying a single message.
When the consumer attempts to find unacked messages to redeliver here, it searches the map using a ChunkedMessageId, which is not how the chunked message was stored in the map.
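The key mismatch can be illustrated with a small, self-contained sketch. It does not use the Pulsar classes: PlainId and ChunkedId are hypothetical stand-ins for a plain message id and a chunked message id, and ordinary HashMaps stand in for the consumer's chunk map and the tracker's nackedMessages. The only assumption is the one described above: once the tracker keeps just two longs, the id it hands back is no longer equal to the chunked id the consumer used as a key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class NackKeyMismatchSketch {

    /** Hypothetical stand-in for a plain message id (ledger id + entry id). */
    static class PlainId {
        final long ledgerId;
        final long entryId;

        PlainId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public boolean equals(Object o) {
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            PlainId other = (PlainId) o;
            return ledgerId == other.ledgerId && entryId == other.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    /** Hypothetical stand-in for a chunked id: the last chunk's position plus the first chunk's id. */
    static final class ChunkedId extends PlainId {
        final PlainId firstChunk;

        ChunkedId(PlainId firstChunk, PlainId lastChunk) {
            super(lastChunk.ledgerId, lastChunk.entryId);
            this.firstChunk = firstChunk;
        }

        @Override
        public boolean equals(Object o) {
            // super.equals already guarantees that o is a ChunkedId
            return super.equals(o) && firstChunk.equals(((ChunkedId) o).firstChunk);
        }

        @Override
        public int hashCode() {
            return Objects.hash(super.hashCode(), firstChunk);
        }
    }

    /** Returns true if the consumer-side chunk map is hit by the id the tracker hands back. */
    public static boolean lookupAfterNack(boolean preserveChunkedId) {
        PlainId first = new PlainId(5, 0);
        PlainId last = new PlainId(5, 9);
        ChunkedId chunked = new ChunkedId(first, last);

        // Consumer side: the chunk sequence is keyed by the chunked id.
        Map<PlainId, PlainId[]> chunkSequences = new HashMap<>();
        chunkSequences.put(chunked, new PlainId[] {first, last});

        PlainId redeliveryKey;
        if (preserveChunkedId) {
            // Object-keyed tracker: the original id object survives the round trip.
            Map<PlainId, Long> nacked = new HashMap<>();
            nacked.put(chunked, 0L);
            redeliveryKey = nacked.keySet().iterator().next();
        } else {
            // Long-pair tracker: only two longs survive, so a plain id is rebuilt.
            long[] stored = {chunked.ledgerId, chunked.entryId};
            redeliveryKey = new PlainId(stored[0], stored[1]);
        }
        return chunkSequences.containsKey(redeliveryKey);
    }

    public static void main(String[] args) {
        System.out.println("object-keyed tracker finds chunks: " + lookupAfterNack(true));
        System.out.println("long-pair tracker finds chunks: " + lookupAfterNack(false));
    }
}
```

In this model the object-keyed tracker finds the chunk sequence and the long-pair tracker does not, which matches the behaviour we see: the nack timer fires, but the chunks are never requested again.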
Error messages
Reproducing the issue
The test below reproduces the issue. It is also available on a branch: branch with test
@Test
public void testNegativeAckChunkedMessage() throws Exception {
    cleanup();
    setup();
    String topic = BrokerTestUtil.newUniqueName("testNegativeAcksWithChunks");

    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("sub1")
            .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Shared)
            .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
            .subscribe();

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .enableChunking(true)
            .chunkMaxMessageSize(1024) // 1KB max - forces chunking for larger messages
            .create();

    // 10KB payload, so the message is split into multiple chunks
    String longMessage = "X".repeat(10 * 1024);
    producer.sendAsync(longMessage);
    producer.flush();

    // negative ack the chunked message
    consumer.negativeAcknowledge(consumer.receive());

    // the redelivery delay is 1s, so the message should be redelivered within the 2s timeout
    Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
    assertNotNull(msg1);
}
Additional information
Two possible solutions could be:
- modify the ConsumerImpl unAckedChunkedMessageIdSequenceMap to use the same format of key that NegativeAcksTracker passes
- modify the NegativeAcksTracker nackedMessages to preserve the chunked message id sequence format
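As a rough illustration of the first option, here is a minimal sketch of a chunk map keyed by the same two longs a long-pair tracker would hand back. Everything in it is hypothetical (the class, the method names, and the simplification that all chunks live in one ledger); it is not the ConsumerImpl code, only the idea of normalising the key in one place.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NormalizedChunkKeySketch {

    // Hypothetical consumer-side map: (ledgerId, entryId) of the last chunk -> entry ids of all chunks.
    private final Map<List<Long>, long[]> chunkSequences = new HashMap<>();

    // Writers and readers both build keys through this helper, so the key format cannot drift.
    static List<Long> key(long ledgerId, long entryId) {
        return Arrays.asList(ledgerId, entryId);
    }

    /** Records the chunk sequence once a chunked message has been fully assembled. */
    public void onChunkedMessageAssembled(long ledgerId, long lastEntryId, long[] chunkEntryIds) {
        chunkSequences.put(key(ledgerId, lastEntryId), chunkEntryIds);
    }

    /** Returns all chunk entries for a nacked id, or just the entry itself if it was not chunked. */
    public long[] entriesToRedeliver(long ledgerId, long entryId) {
        long[] chunks = chunkSequences.get(key(ledgerId, entryId));
        return chunks != null ? chunks : new long[] {entryId};
    }

    public static void main(String[] args) {
        NormalizedChunkKeySketch consumer = new NormalizedChunkKeySketch();
        consumer.onChunkedMessageAssembled(5L, 9L, new long[] {7L, 8L, 9L});
        System.out.println(Arrays.toString(consumer.entriesToRedeliver(5L, 9L)));
        System.out.println(Arrays.toString(consumer.entriesToRedeliver(5L, 4L)));
    }
}
```

The second option would instead keep the tracker's key rich enough to rebuild the chunked id, which leaves the consumer map untouched but gives up some of the compactness of the long-pair map.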
Please advise on the recommended fix.
Are you willing to submit a PR?
- I'm willing to submit a PR!