Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,39 @@ public void testInterleavedChunks() throws Exception {
assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", "A-1"));
}

// Issue #25220
@Test
public void testNegativeAckChunkedMessage() throws Exception {
final String topic = "persistent://my-property/my-ns/test-negative-acknowledge-with-chunk";

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub1")
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.enableChunking(true)
.chunkMaxMessageSize(1024) // 1KB max - forces chunking for larger messages
.create();
String longMessage = "X".repeat(10 * 1024);
producer.sendAsync(longMessage);
producer.flush();

// negative ack the first message
consumer.negativeAcknowledge(consumer.receive());

// now 2s has passed, the first message should be redelivered 1s later.
Message<String> msg1 = consumer.receive(2, TimeUnit.SECONDS);
assertNotNull(msg1);
}

private Producer<String> createProducer(String topic) throws PulsarClientException {
return pulsarClient.newProducer(Schema.STRING)
.topic(topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,6 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
// and return undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {

// right now, chunked messages are only supported by non-shared subscription
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
if (uncompressedPayload == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ static long trimLowerBit(long timestamp, int bits) {
private synchronized void add(MessageId messageId, int redeliveryCount) {
if (messageId instanceof TraceableMessageId) {
Span span = ((TraceableMessageId) messageId).getTracingSpan();
if (span != null) {
if (span != null || messageId instanceof ChunkMessageIdImpl) {
MessageIdAdv msgId = (MessageIdAdv) messageId;
nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new Long2ObjectOpenHashMap<>())
.put(msgId.getEntryId(), messageId);
Expand Down
Loading