Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
long getBufferMemoryUsage();

/**
* Get the delivery timestamp of the last delayed message.
*/
long getLastDelayedMessageTimestamp();

/**
* Get a set of position of messages that have already reached the delivery time.
*/
Expand Down Expand Up @@ -107,6 +112,11 @@ public long getBufferMemoryUsage() {
return 0;
}

@Override
public long getLastDelayedMessageTimestamp() {
return 0;
}

@Override
public NavigableSet<Position> getScheduledMessages(int maxMessages) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ public long getBufferMemoryUsage() {
Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
}

@Override
public long getLastDelayedMessageTimestamp() {
if (delayedMessageMap.isEmpty()) {
return 0;
}
return delayedMessageMap.lastLongKey();
}

@Override
public void close() {
super.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public static record SnapshotKey(long ledgerId, long entryId) {}

private final AtomicLong numberDelayedMessages = new AtomicLong(0);

private final AtomicLong lastDelayedMessageTimestamp = new AtomicLong(0);

// Thread safety locks
private final StampedLock stampedLock = new StampedLock();

Expand Down Expand Up @@ -235,10 +237,13 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT
MutableLong numberDelayedMessages = new MutableLong(0);
immutableBucketMap.values().forEach(bucket -> {
numberDelayedMessages.add(bucket.numberBucketDelayedMessages);
updateLastTimestamp(bucket.getLastScheduleTimestamp());
});

log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}",
dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue());
log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, "
+ "numberDelayedMessages: {}, lastDelayedMessageTimestamp: {}",
dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue(),
lastDelayedMessageTimestamp.get());

return numberDelayedMessages.getValue();
}
Expand Down Expand Up @@ -406,6 +411,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
}

numberDelayedMessages.incrementAndGet();
updateLastTimestamp(deliverAt);

if (log.isDebugEnabled()) {
log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
Expand Down Expand Up @@ -615,6 +621,23 @@ public long getBufferMemoryUsage() {
return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity();
}

@Override
public long getLastDelayedMessageTimestamp() {
if (numberDelayedMessages.get() == 0) {
return 0;
}
return lastDelayedMessageTimestamp.get();
}

private void updateLastTimestamp(long timestamp) {
long current;
while ((current = lastDelayedMessageTimestamp.get()) < timestamp) {
if (lastDelayedMessageTimestamp.compareAndSet(current, timestamp)) {
break;
}
}
}

@Override
public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages) {
if (!checkPendingLoadDone()) {
Expand Down Expand Up @@ -750,6 +773,7 @@ public synchronized CompletableFuture<Void> clear() {
lastMutableBucket.clear();
snapshotSegmentLastIndexMap.clear();
numberDelayedMessages.set(0);
lastDelayedMessageTimestamp.set(0);
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -50,6 +51,10 @@ class ImmutableBucket extends Bucket {
@Setter
List<Long> firstScheduleTimestamps = new ArrayList<>();

@Getter
@Setter
private long lastScheduleTimestamp = 0;

ImmutableBucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer<Void> sequencer,
BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
Expand Down Expand Up @@ -100,6 +105,12 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
this.setFirstScheduleTimestamps(firstScheduleTimestamps);

long lastTimestamp = metadataList.stream()
.mapToLong(SnapshotSegmentMetadata::getMaxScheduleTimestamp)
.max()
.orElse(0L);
this.setLastScheduleTimestamp(lastTimestamp);

return nextSnapshotEntryIndex + 1;
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void test(DelayedDeliveryTracker tracker) throws Exception {

assertFalse(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertEquals(tracker.getLastDelayedMessageTimestamp(), 50);

assertEquals(tracker.getScheduledMessages(10), Collections.emptySet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exc
}

assertEquals(tracker.getNumberOfDelayedMessages(), 100);
assertEquals(tracker.getLastDelayedMessageTimestamp(), 100 * 10);

clockTime.set(1 * 10);

Expand All @@ -206,6 +207,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exc
});

tracker.addMessage(101, 101, 101 * 10);
assertEquals(tracker.getLastDelayedMessageTimestamp(), 101 * 10);

tracker.close();

Expand All @@ -216,6 +218,9 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exc

assertFalse(tracker2.containsMessage(101, 101));
assertEquals(tracker2.getNumberOfDelayedMessages(), 70);
// Verify lastDelayedMessageTimestamp is correctly recovered from snapshot
// Messages 31-100 remain, so the max timestamp should be 100 * 10 = 1000
assertEquals(tracker2.getLastDelayedMessageTimestamp(), 100 * 10);

clockTime.set(100 * 10);

Expand Down Expand Up @@ -453,14 +458,15 @@ public void testClear(BucketDelayedDeliveryTracker tracker)
assertEquals(tracker.getNumberOfDelayedMessages(), 1001);
assertTrue(tracker.getImmutableBuckets().asMapOfRanges().size() > 0);
assertEquals(tracker.getLastMutableBucket().size(), 1);
assertTrue(tracker.getLastDelayedMessageTimestamp() > 0);

tracker.clear().get(1, TimeUnit.MINUTES);

assertEquals(tracker.getNumberOfDelayedMessages(), 0);
assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0);
assertEquals(tracker.getLastMutableBucket().size(), 0);
assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0);

assertEquals(tracker.getLastDelayedMessageTimestamp(), 0);
tracker.close();
}
}