Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,16 @@ protected LastCumulativeAck initialValue() {
private boolean flushRequired = false;

public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
if (messageId.equals(this.messageId)) {
if (this.bitSetRecyclable != null && bitSetRecyclable != null
&& bitSetRecyclable.nextSetBit(0) > this.bitSetRecyclable.nextSetBit(0)) {
this.bitSetRecyclable.recycle();
set(messageId, bitSetRecyclable);
flushRequired = true;
}
return;
}

if (messageId.compareTo(this.messageId) > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have the compare sult, I think we can improve it like

        int compareResult = messageId.compareTo(this.messageId);
        if (messageId.compareTo(this.messageId) > 0) {
            /* ... */
        } else if (compareResult == 0) {
            /* compare bitSetRecyclable... */
        }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem compareTo has already compare the batchIndex.

return ComparisonChain.start()
.compare(ledgerId1, ledgerId2)
.compare(entryId1, entryId2)
.compare(partitionIndex1, partitionIndex2)
.compare(batchIndex1, batchIndex2)
.result();
so we don't need to do single check, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right

if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,28 @@

public class LastCumulativeAckTest {

@Test
public void testUpdateBitSetRecyclable() {
final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10);
final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create();
bitSetRecyclable1.set(0, 10);
bitSetRecyclable1.clear(0, 3);
lastCumulativeAck.update(messageId1, bitSetRecyclable1);
assertTrue(lastCumulativeAck.isFlushRequired());
assertSame(lastCumulativeAck.getMessageId(), messageId1);
assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);

// In the same message, the batch index is incremented.
final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create();
bitSetRecyclable2.set(0, 10);
bitSetRecyclable2.clear(0, 6);
lastCumulativeAck.update(messageId1, bitSetRecyclable2);
assertTrue(lastCumulativeAck.isFlushRequired());
assertSame(lastCumulativeAck.getMessageId(), messageId1);
assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2);
}

@Test
public void testUpdate() {
final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
Expand Down