From 48b7f46fa9b150b835368f2921c6365365cf2b60 Mon Sep 17 00:00:00 2001 From: Dream95 Date: Wed, 11 Feb 2026 20:48:52 +0800 Subject: [PATCH] [improve][broker] Add lastDelayedMessageTimestamp to delayed delivery tracker --- .../delayed/DelayedDeliveryTracker.java | 10 +++++++ .../InMemoryDelayedDeliveryTracker.java | 8 ++++++ .../bucket/BucketDelayedDeliveryTracker.java | 28 +++++++++++++++++-- .../delayed/bucket/ImmutableBucket.java | 11 ++++++++ .../delayed/AbstractDeliveryTrackerTest.java | 1 + .../BucketDelayedDeliveryTrackerTest.java | 8 +++++- 6 files changed, 63 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index 7c954879fe845..9ac0c1f844216 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -56,6 +56,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable { */ long getBufferMemoryUsage(); + /** + * Get the delivery timestamp of the last delayed message. + */ + long getLastDelayedMessageTimestamp(); + /** * Get a set of position of messages that have already reached the delivery time. */ @@ -107,6 +112,11 @@ public long getBufferMemoryUsage() { return 0; } + @Override + public long getLastDelayedMessageTimestamp() { + return 0; + } + @Override public NavigableSet getScheduledMessages(int maxMessages) { return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index ad5ab25fbbf6b..ecd1fe7600b28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -255,6 +255,14 @@ public long getBufferMemoryUsage() { Roaring64Bitmap::getLongSizeInBytes).sum()).sum(); } + @Override + public long getLastDelayedMessageTimestamp() { + if (delayedMessageMap.isEmpty()) { + return 0; + } + return delayedMessageMap.lastLongKey(); + } + @Override public void close() { super.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 3f0fcc516571f..fa82779b47d32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -94,6 +94,8 @@ public static record SnapshotKey(long ledgerId, long entryId) {} private final AtomicLong numberDelayedMessages = new AtomicLong(0); + private final AtomicLong lastDelayedMessageTimestamp = new AtomicLong(0); + // Thread safety locks private final StampedLock stampedLock = new StampedLock(); @@ -235,10 +237,13 @@ private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryT MutableLong numberDelayedMessages = new MutableLong(0); immutableBucketMap.values().forEach(bucket -> { numberDelayedMessages.add(bucket.numberBucketDelayedMessages); + updateLastTimestamp(bucket.getLastScheduleTimestamp()); }); - log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}", - dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue()); + log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, " + + "numberDelayedMessages: {}, lastDelayedMessageTimestamp: {}", + dispatcher.getName(), immutableBucketMap.size(), numberDelayedMessages.getValue(), + lastDelayedMessageTimestamp.get()); return numberDelayedMessages.getValue(); } @@ -406,6 +411,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver } numberDelayedMessages.incrementAndGet(); + updateLastTimestamp(deliverAt); if (log.isDebugEnabled()) { log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, @@ -615,6 +621,23 @@ public long getBufferMemoryUsage() { return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); } + @Override + public long getLastDelayedMessageTimestamp() { + if (numberDelayedMessages.get() == 0) { + return 0; + } + return lastDelayedMessageTimestamp.get(); + } + + private void updateLastTimestamp(long timestamp) { + long current; + while ((current = lastDelayedMessageTimestamp.get()) < timestamp) { + if (lastDelayedMessageTimestamp.compareAndSet(current, timestamp)) { + break; + } + } + } + @Override public synchronized NavigableSet getScheduledMessages(int maxMessages) { if (!checkPendingLoadDone()) { @@ -750,6 +773,7 @@ public synchronized CompletableFuture clear() { lastMutableBucket.clear(); snapshotSegmentLastIndexMap.clear(); numberDelayedMessages.set(0); + lastDelayedMessageTimestamp.set(0); return future; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index a1944a21ea794..463516046d7e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -27,6 +27,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -50,6 +51,10 @@ class ImmutableBucket extends Bucket { @Setter List firstScheduleTimestamps = new ArrayList<>(); + @Getter + @Setter + private long lastScheduleTimestamp = 0; + ImmutableBucket(String dispatcherName, ManagedCursor cursor, FutureUtil.Sequencer sequencer, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId); @@ -100,6 +105,12 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b SnapshotSegmentMetadata::getMinScheduleTimestamp).toList(); this.setFirstScheduleTimestamps(firstScheduleTimestamps); + long lastTimestamp = metadataList.stream() + .mapToLong(SnapshotSegmentMetadata::getMaxScheduleTimestamp) + .max() + .orElse(0L); + this.setLastScheduleTimestamp(lastTimestamp); + return nextSnapshotEntryIndex + 1; }); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java index e92e37a282e85..78a4ef4b8172c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java @@ -78,6 +78,7 @@ public void test(DelayedDeliveryTracker tracker) throws Exception { assertFalse(tracker.hasMessageAvailable()); assertEquals(tracker.getNumberOfDelayedMessages(), 5); + assertEquals(tracker.getLastDelayedMessageTimestamp(), 50); assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 6ff98fa7f7004..7518ea3e58237 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -189,6 +189,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exc } assertEquals(tracker.getNumberOfDelayedMessages(), 100); + assertEquals(tracker.getLastDelayedMessageTimestamp(), 100 * 10); clockTime.set(1 * 10); @@ -206,6 +207,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exc }); tracker.addMessage(101, 101, 101 * 10); + assertEquals(tracker.getLastDelayedMessageTimestamp(), 101 * 10); tracker.close(); @@ -216,6 +218,9 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) throws Exc assertFalse(tracker2.containsMessage(101, 101)); assertEquals(tracker2.getNumberOfDelayedMessages(), 70); + // Verify lastDelayedMessageTimestamp is correctly recovered from snapshot + // Messages 31-100 remain, so the max timestamp should be 100 * 10 = 1000 + assertEquals(tracker2.getLastDelayedMessageTimestamp(), 100 * 10); clockTime.set(100 * 10); @@ -453,6 +458,7 @@ public void testClear(BucketDelayedDeliveryTracker tracker) assertEquals(tracker.getNumberOfDelayedMessages(), 1001); assertTrue(tracker.getImmutableBuckets().asMapOfRanges().size() > 0); assertEquals(tracker.getLastMutableBucket().size(), 1); + assertTrue(tracker.getLastDelayedMessageTimestamp() > 0); tracker.clear().get(1, TimeUnit.MINUTES); @@ -460,7 +466,7 @@ public void testClear(BucketDelayedDeliveryTracker tracker) assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 0); assertEquals(tracker.getLastMutableBucket().size(), 0); assertEquals(tracker.getSharedBucketPriorityQueue().size(), 0); - + assertEquals(tracker.getLastDelayedMessageTimestamp(), 0); tracker.close(); } }