diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 9bec9d39bf858..90859461838b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -658,6 +658,16 @@ protected LastCumulativeAck initialValue() { private boolean flushRequired = false; public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) { + if (messageId.equals(this.messageId)) { + if (this.bitSetRecyclable != null && bitSetRecyclable != null + && bitSetRecyclable.nextSetBit(0) > this.bitSetRecyclable.nextSetBit(0)) { + this.bitSetRecyclable.recycle(); + set(messageId, bitSetRecyclable); + flushRequired = true; + } + return; + } + if (messageId.compareTo(this.messageId) > 0) { if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) { this.bitSetRecyclable.recycle(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java index 102ccfc0e07a5..e496ff04d4816 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/LastCumulativeAckTest.java @@ -29,6 +29,28 @@ public class LastCumulativeAckTest { + @Test + public void testUpdateBitSetRecyclable() { + final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); + final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10); + final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create(); + bitSetRecyclable1.set(0, 10); + bitSetRecyclable1.clear(0, 3); + lastCumulativeAck.update(messageId1, bitSetRecyclable1); + assertTrue(lastCumulativeAck.isFlushRequired()); + assertSame(lastCumulativeAck.getMessageId(), messageId1); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1); + + // In the same message, the batch index is incremented. + final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create(); + bitSetRecyclable2.set(0, 10); + bitSetRecyclable2.clear(0, 6); + lastCumulativeAck.update(messageId1, bitSetRecyclable2); + assertTrue(lastCumulativeAck.isFlushRequired()); + assertSame(lastCumulativeAck.getMessageId(), messageId1); + assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2); + } + @Test public void testUpdate() { final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();