diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java index 3d24d3746d66a..203715ca7dbcf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java @@ -193,6 +193,39 @@ public void testInterleavedChunks() throws Exception { assertEquals(receivedUuidList1, Arrays.asList("A-0", "B-0", "B-1", "A-1")); } + // Issue #25220 + @Test + public void testNegativeAckChunkedMessage() throws Exception { + final String topic = "persistent://my-property/my-ns/test-negative-acknowledge-with-chunk"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub1") + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .enableChunking(true) + .chunkMaxMessageSize(1024) // 1KB max - forces chunking for larger messages + .create(); + String longMessage = "X".repeat(10 * 1024); + producer.sendAsync(longMessage); + producer.flush(); + + // negative ack the first message + consumer.negativeAcknowledge(consumer.receive()); + + // now 2s has passed, the first message should be redelivered 1s later. + Message msg1 = consumer.receive(2, TimeUnit.SECONDS); + assertNotNull(msg1); + } + private Producer createProducer(String topic) throws PulsarClientException { return pulsarClient.newProducer(Schema.STRING) .topic(topic) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index adecd97564ff7..7091b05151e4a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1508,7 +1508,6 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien // and return undecrypted payload if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { - // right now, chunked messages are only supported by non-shared subscription if (isChunkedMessage) { uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx); if (uncompressedPayload == null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index e0ec16f507e4d..d975d22be7da3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -158,7 +158,7 @@ static long trimLowerBit(long timestamp, int bits) { private synchronized void add(MessageId messageId, int redeliveryCount) { if (messageId instanceof TraceableMessageId) { Span span = ((TraceableMessageId) messageId).getTracingSpan(); - if (span != null) { + if (span != null || messageId instanceof ChunkMessageIdImpl) { MessageIdAdv msgId = (MessageIdAdv) messageId; nackedMessageIds.computeIfAbsent(msgId.getLedgerId(), k -> new Long2ObjectOpenHashMap<>()) .put(msgId.getEntryId(), messageId);