From fc876b729e3689dea4c293cdb3127f39d6ebdc00 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Thu, 30 Oct 2025 20:20:22 +0800 Subject: [PATCH 1/2] [feat][broker] Implement topic-level delayed delivery tracking with in-memory manager --- ...InMemoryDelayedDeliveryTrackerFactory.java | 30 +- ...oryTopicDelayedDeliveryTrackerManager.java | 473 ++++++++++++++++++ ...MemoryTopicDelayedDeliveryTrackerView.java | 120 +++++ .../TopicDelayedDeliveryTrackerManager.java | 74 +++ .../DelayedDeliveryTrackerFactoryTest.java | 6 +- 5 files changed, 696 insertions(+), 7 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java index f8b8f5a8ba459..555c730465c69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -22,6 +22,8 @@ import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -40,6 +42,9 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra private long fixedDelayDetectionLookahead; + // Cache of topic-level managers: topic name -> manager instance + private final ConcurrentMap topicManagers = new ConcurrentHashMap<>(); + @Override public void initialize(PulsarService pulsarService) { ServiceConfiguration config = pulsarService.getConfig(); @@ -54,7 +59,7 @@ public void initialize(PulsarService pulsarService) { public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) { String topicName = dispatcher.getTopic().getName(); String subscriptionName = dispatcher.getSubscription().getName(); - DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE; + DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE; try { tracker = newTracker0(dispatcher); } catch (Exception e) { @@ -66,13 +71,30 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon } @VisibleForTesting - InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { - return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, - isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead); + DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String topicName = dispatcher.getTopic().getName(); + + // Get or create topic-level manager for this topic + TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, + k -> new InMemoryTopicDelayedDeliveryTrackerManager(timer, tickTimeMillis, + isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead)); + + // Create a per-subscription view from the topic-level manager + return manager.createOrGetView(dispatcher); } @Override public void close() { + // Close all topic-level managers + for (TopicDelayedDeliveryTrackerManager manager : topicManagers.values()) { + try { + manager.close(); + } catch (Exception e) { + log.warn("Failed to close topic-level delayed delivery manager", e); + } + } + topicManagers.clear(); + if (timer != null) { timer.stop(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java new file mode 100644 index 0000000000000..fa0575f393502 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + private final Long2ObjectSortedMap> delayedMessageMap = + new Long2ObjectRBTreeMap<>(); + + // Subscription registry: subscription name -> subscription context + private final Map subscriptionContexts = new HashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + + // Configuration + private final long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Timestamp precision for memory optimization + private final int timestampPrecisionBitCnt; + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private final long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private Position markDeletePosition; + + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + return isDelayedDeliveryDeliverAtTimeStrict ? System.currentTimeMillis() : + System.currentTimeMillis() + tickTimeMillis; + } + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(tickTimeMillis); + } + + private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) { + int bitCnt = 0; + while (tickTimeMillis > 0) { + tickTimeMillis >>= 1; + bitCnt++; + } + return bitCnt > 0 ? bitCnt - 1 : 0; + } + + private static long trimLowerBit(long timestamp, int bits) { + return timestamp & (-1L << bits); + } + + @Override + public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + synchronized (this) { + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead)); + + return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); + } + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + synchronized (this) { + subscriptionContexts.remove(subscriptionName); + + // If no more subscriptions, close the manager + if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) { + close(); + } + } + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + // For now, tick time updates are not supported after initialization + // This could be enhanced to update all subscription contexts + log.warn("Tick time updates are not currently supported for topic-level delayed delivery managers"); + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + synchronized (this) { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + delayedMessageMap.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + } + + // Internal methods for subscription views + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + synchronized (this) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); + Long2ObjectSortedMap ledgerMap = delayedMessageMap.computeIfAbsent( + timestamp, k -> new Long2ObjectRBTreeMap<>()); + Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + + // Check if this entry already exists (deduplication) + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + updateBufferMemoryEstimate(); + } + + updateTimer(); + return true; + } + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + synchronized (this) { + if (delayedMessageMap.isEmpty()) { + return false; + } + + long cutoffTime = subContext.getCutoffTime(); + long firstTimestamp = delayedMessageMap.firstLongKey(); + + if (firstTimestamp > cutoffTime) { + return false; + } + + // Quick check: if there's any message in the earliest time bucket that's after mark delete + Long2ObjectSortedMap ledgerMap = delayedMessageMap.get(firstTimestamp); + if (ledgerMap != null) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete == null) { + return true; // No mark delete means all messages are available + } + + for (var entry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = entry.getLongKey(); + Roaring64Bitmap entryIds = entry.getValue(); + + if (ledgerId > markDelete.getLedgerId()) { + return true; + } else if (ledgerId == markDelete.getLedgerId()) { + // Check if there are any entry IDs after mark delete + if (entryIds.stream().anyMatch(entryId -> entryId > markDelete.getEntryId())) { + return true; + } + } + } + } + + return false; + } + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + synchronized (this) { + int remaining = maxMessages; + NavigableSet positions = new TreeSet<>(); + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Iterate through time buckets + var iterator = delayedMessageMap.long2ObjectEntrySet().iterator(); + while (iterator.hasNext() && remaining > 0) { + var timeEntry = iterator.next(); + long timestamp = timeEntry.getLongKey(); + + if (timestamp > cutoffTime) { + break; + } + + Long2ObjectSortedMap ledgerMap = timeEntry.getValue(); + + // Iterate through ledgers in this time bucket + var ledgerIterator = ledgerMap.long2ObjectEntrySet().iterator(); + while (ledgerIterator.hasNext() && remaining > 0) { + var ledgerEntry = ledgerIterator.next(); + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + + // Filter entries based on mark delete position + long[] entryIdArray = entryIds.toArray(); + for (long entryId : entryIdArray) { + if (remaining <= 0) { + break; + } + + // Skip entries that are before or at mark delete + if (markDelete != null) { + if (ledgerId < markDelete.getLedgerId()) { + continue; + } + if (ledgerId == markDelete.getLedgerId() && entryId <= markDelete.getEntryId()) { + continue; + } + } + + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } + + // Note: We don't remove messages from the global index here + // Pruning will be handled separately based on min mark delete across all subscriptions + + return positions; + } + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Simplified implementation - could be enhanced with fixed delay detection + return fixedDelayDetectionLookahead > 0 + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= fixedDelayDetectionLookahead + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. + */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + synchronized (this) { + subContext.updateMarkDeletePosition(position); + // Trigger pruning if needed + pruneByMinMarkDelete(); + } + } + + // Private helper methods + + private void updateTimer() { + if (delayedMessageMap.isEmpty()) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + + long nextDeliveryTime = delayedMessageMap.firstLongKey(); + if (nextDeliveryTime == currentTimeoutTarget) { + return; + } + + if (timeout != null) { + timeout.cancel(); + } + + long now = clock.millis(); + long delayMillis = nextDeliveryTime - now; + + if (delayMillis < 0) { + // Messages are ready, but we don't need to keep retriggering + return; + } + + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + } + + private void updateBufferMemoryEstimate() { + // Simplified memory estimation + long estimatedBytes = delayedMessageMap.values().stream() + .mapToLong(ledgerMap -> ledgerMap.values().stream() + .mapToLong(Roaring64Bitmap::getLongSizeInBytes).sum()) + .sum(); + bufferMemoryBytes.set(estimatedBytes); + } + + private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { + // Simplified implementation - returns total count + // Could be enhanced to count only messages visible to this subscription + return delayedMessagesCount.get(); + } + + private void pruneByMinMarkDelete() { + // Find the minimum mark delete position across all subscriptions + Position minMarkDelete = null; + for (SubContext subContext : subscriptionContexts.values()) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete != null) { + if (minMarkDelete == null || markDelete.compareTo(minMarkDelete) < 0) { + minMarkDelete = markDelete; + } + } + } + + if (minMarkDelete == null) { + return; + } + + // Prune entries that are before min mark delete + var iterator = delayedMessageMap.long2ObjectEntrySet().iterator(); + while (iterator.hasNext()) { + var timeEntry = iterator.next(); + Long2ObjectSortedMap ledgerMap = timeEntry.getValue(); + + var ledgerIterator = ledgerMap.long2ObjectEntrySet().iterator(); + while (ledgerIterator.hasNext()) { + var ledgerEntry = ledgerIterator.next(); + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + + if (ledgerId < minMarkDelete.getLedgerId()) { + // Entire ledger can be removed + delayedMessagesCount.addAndGet(-entryIds.getLongCardinality()); + ledgerIterator.remove(); + } else if (ledgerId == minMarkDelete.getLedgerId()) { + // Remove entries <= mark delete entry ID + long removedCount = 0; + var entryIterator = entryIds.iterator(); + while (entryIterator.hasNext()) { + long entryId = entryIterator.next(); + if (entryId <= minMarkDelete.getEntryId()) { + entryIterator.remove(); + removedCount++; + } + } + delayedMessagesCount.addAndGet(-removedCount); + + if (entryIds.isEmpty()) { + ledgerIterator.remove(); + } + } + } + + if (ledgerMap.isEmpty()) { + iterator.remove(); + } + } + + updateBufferMemoryEstimate(); + } + + @Override + public void run(Timeout timeout) throws Exception { + if (timeout == null || timeout.isCancelled()) { + return; + } + + synchronized (this) { + currentTimeoutTarget = -1; + this.timeout = null; + + // Trigger read more entries for all subscriptions + for (SubContext subContext : subscriptionContexts.values()) { + subContext.getDispatcher().readMoreEntriesAsync(); + } + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java new file mode 100644 index 0000000000000..c7af60d50924b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerView.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import java.util.NavigableSet; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; + +/** + * View object for a subscription that implements DelayedDeliveryTracker interface. + * This view forwards all operations to the topic-level manager while maintaining + * compatibility with existing dispatcher logic. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerView implements DelayedDeliveryTracker { + + private final InMemoryTopicDelayedDeliveryTrackerManager manager; + private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext; + private boolean closed = false; + + public InMemoryTopicDelayedDeliveryTrackerView(InMemoryTopicDelayedDeliveryTrackerManager manager, + InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) { + this.manager = manager; + this.subContext = subContext; + } + + @Override + public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { + checkClosed(); + return manager.addMessageForSub(subContext, ledgerId, entryId, deliveryAt); + } + + @Override + public boolean hasMessageAvailable() { + checkClosed(); + return manager.hasMessageAvailableForSub(subContext); + } + + @Override + public long getNumberOfDelayedMessages() { + checkClosed(); + // Return an estimate of visible delayed messages for this subscription + // For now, return the total count - could be enhanced to count only visible messages + return manager.topicDelayedMessages(); + } + + @Override + public long getBufferMemoryUsage() { + checkClosed(); + // Return the topic-level memory usage (shared by all subscriptions) + return manager.topicBufferMemoryBytes(); + } + + @Override + public NavigableSet getScheduledMessages(int maxMessages) { + checkClosed(); + return manager.getScheduledMessagesForSub(subContext, maxMessages); + } + + @Override + public boolean shouldPauseAllDeliveries() { + checkClosed(); + return manager.shouldPauseAllDeliveriesForSub(subContext); + } + + @Override + public void resetTickTime(long tickTime) { + checkClosed(); + manager.onTickTimeUpdated(tickTime); + } + + @Override + public CompletableFuture clear() { + checkClosed(); + // For topic-level manager, clear is a no-op for individual subscriptions + manager.clearForSub(); + return CompletableFuture.completedFuture(null); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + manager.unregister(subContext.getDispatcher()); + } + + /** + * Update the mark delete position for this subscription. + * This is called by the dispatcher when messages are acknowledged. + */ + public void updateMarkDeletePosition(Position position) { + checkClosed(); + manager.updateMarkDeletePosition(subContext, position); + } + + private void checkClosed() { + if (closed) { + throw new IllegalStateException("DelayedDeliveryTracker is already closed"); + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java new file mode 100644 index 0000000000000..707bcefc76a53 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; + +/** + * Manager interface for topic-level delayed delivery tracking. + * This interface provides a unified abstraction for managing delayed delivery at the topic level, + * allowing different implementations (in-memory, bucket-based) to share the same contract. + *

+ * The manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, and provides per-subscription "view" objects that implement DelayedDeliveryTracker + * interface for compatibility with existing dispatcher logic. + */ +public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable { + + /** + * Create or get a delayed delivery tracker view for the specified subscription. + * + * @param dispatcher the dispatcher instance for the subscription + * @return a DelayedDeliveryTracker view for the subscription + */ + DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher); + + /** + * Unregister a subscription from the manager. + * + * @param dispatcher the dispatcher instance to unregister + */ + void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher); + + /** + * Update the tick time configuration for the topic. + * + * @param newTickTimeMillis the new tick time in milliseconds + */ + void onTickTimeUpdated(long newTickTimeMillis); + + /** + * Get the total memory usage of the topic-level delayed message index. + * + * @return memory usage in bytes + */ + long topicBufferMemoryBytes(); + + /** + * Get the total number of delayed messages in the topic-level index. + * + * @return number of delayed messages + */ + long topicDelayedMessages(); + + /** + * Close the manager and release all resources. + */ + void close(); +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java index f43b5495fac25..f01391c770d47 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTrackerFactoryTest.java @@ -81,7 +81,7 @@ public void testFallbackToInMemoryTracker() throws Exception { // the factory should be fallback to InMemoryDelayedDeliveryTrackerFactory @Cleanup DelayedDeliveryTracker tracker = brokerService.getDelayedDeliveryTrackerFactory().newTracker(dispatcher); - Assert.assertTrue(tracker instanceof InMemoryDelayedDeliveryTracker); + Assert.assertTrue(tracker instanceof InMemoryTopicDelayedDeliveryTrackerView); DelayedDeliveryTrackerFactory fallbackFactory = brokerService.getFallbackDelayedDeliveryTrackerFactory(); Assert.assertTrue(fallbackFactory instanceof InMemoryDelayedDeliveryTrackerFactory); @@ -223,11 +223,11 @@ public void testPublishDelayMessagesAndCreateBucketDelayDeliveryTrackerFailed() }); Optional optional = reference.get(); - Assert.assertTrue(optional.get() instanceof InMemoryDelayedDeliveryTracker); + Assert.assertTrue(optional.get() instanceof InMemoryTopicDelayedDeliveryTrackerView); // Mock DelayedDeliveryTracker and Count the number of addMessage() calls AtomicInteger counter = new AtomicInteger(0); - InMemoryDelayedDeliveryTracker tracker = (InMemoryDelayedDeliveryTracker) optional.get(); + InMemoryTopicDelayedDeliveryTrackerView tracker = (InMemoryTopicDelayedDeliveryTrackerView) optional.get(); tracker = Mockito.spy(tracker); Mockito.doAnswer(inv -> { counter.incrementAndGet(); From dee4363495a5a84edb0919cdb378b1d5b63eefa7 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sat, 1 Nov 2025 14:38:11 +0800 Subject: [PATCH 2/2] feat[broker] Enhance InMemoryTopicDelayedDeliveryTrackerManager with fixed-delay detection and memory optimization --- ...oryTopicDelayedDeliveryTrackerManager.java | 152 +++++--- .../InMemoryTopicDeliveryTrackerTest.java | 341 ++++++++++++++++++ 2 files changed, 452 insertions(+), 41 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java index fa0575f393502..5e603b1c9b4b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java @@ -55,9 +55,11 @@ public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedD private final Timer timer; private Timeout timeout; private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; // Configuration - private final long tickTimeMillis; + private long tickTimeMillis; private final boolean isDelayedDeliveryDeliverAtTimeStrict; private final long fixedDelayDetectionLookahead; private final Clock clock; @@ -66,8 +68,12 @@ public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedD private final AtomicLong delayedMessagesCount = new AtomicLong(0); private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + // Fixed-delay detection (parity with legacy behavior) + private long highestDeliveryTimeTracked = 0; + private boolean messagesHaveFixedDelay = true; + // Timestamp precision for memory optimization - private final int timestampPrecisionBitCnt; + private int timestampPrecisionBitCnt; /** * Subscription context that holds per-subscription state. @@ -76,18 +82,21 @@ public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedD static class SubContext { private final AbstractPersistentDispatcherMultipleConsumers dispatcher; private final String subscriptionName; - private final long tickTimeMillis; + private long tickTimeMillis; private final boolean isDelayedDeliveryDeliverAtTimeStrict; private final long fixedDelayDetectionLookahead; + private final Clock clock; private Position markDeletePosition; SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, - boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) { + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { this.dispatcher = dispatcher; this.subscriptionName = dispatcher.getSubscription().getName(); this.tickTimeMillis = tickTimeMillis; this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; } void updateMarkDeletePosition(Position position) { @@ -95,8 +104,8 @@ void updateMarkDeletePosition(Position position) { } long getCutoffTime() { - return isDelayedDeliveryDeliverAtTimeStrict ? System.currentTimeMillis() : - System.currentTimeMillis() + tickTimeMillis; + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis; } } @@ -138,7 +147,7 @@ public DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultip synchronized (this) { SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, - fixedDelayDetectionLookahead)); + fixedDelayDetectionLookahead, clock)); return new InMemoryTopicDelayedDeliveryTrackerView(this, subContext); } @@ -150,9 +159,11 @@ public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) synchronized (this) { subscriptionContexts.remove(subscriptionName); - - // If no more subscriptions, close the manager - if (subscriptionContexts.isEmpty() && delayedMessageMap.isEmpty()) { + // If no more subscriptions, proactively free index and close the manager to release memory + if (subscriptionContexts.isEmpty()) { + delayedMessageMap.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); close(); } } @@ -160,9 +171,24 @@ public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) @Override public void onTickTimeUpdated(long newTickTimeMillis) { - // For now, tick time updates are not supported after initialization - // This could be enhanced to update all subscription contexts - log.warn("Tick time updates are not currently supported for topic-level delayed delivery managers"); + synchronized (this) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Update precision bits for new tick time (accept old/new buckets co-exist) + this.timestampPrecisionBitCnt = calculateTimestampPrecisionBitCnt(newTickTimeMillis); + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + updateTimer(); + if (log.isDebugEnabled()) { + log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", + newTickTimeMillis); + } + } } @Override @@ -205,23 +231,36 @@ boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, lon timestamp, k -> new Long2ObjectRBTreeMap<>()); Roaring64Bitmap entryIds = ledgerMap.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); - // Check if this entry already exists (deduplication) - if (!entryIds.contains(entryId)) { + // Incremental memory accounting: measure size delta on change + long before = entryIds.getLongSizeInBytes(); + boolean existed = entryIds.contains(entryId); + if (!existed) { entryIds.add(entryId); delayedMessagesCount.incrementAndGet(); - updateBufferMemoryEstimate(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); } updateTimer(); + // Update global fixed-delay detection + checkAndUpdateHighest(deliverAt); return true; } } + private void checkAndUpdateHighest(long deliverAt) { + if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) { + messagesHaveFixedDelay = false; + } + highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt); + } + /** * Check if there are messages available for a subscription. */ boolean hasMessageAvailableForSub(SubContext subContext) { synchronized (this) { + refreshMarkDeletePosition(subContext); if (delayedMessageMap.isEmpty()) { return false; } @@ -265,6 +304,7 @@ boolean hasMessageAvailableForSub(SubContext subContext) { */ NavigableSet getScheduledMessagesForSub(SubContext subContext, int maxMessages) { synchronized (this) { + refreshMarkDeletePosition(subContext); int remaining = maxMessages; NavigableSet positions = new TreeSet<>(); long cutoffTime = subContext.getCutoffTime(); @@ -289,18 +329,18 @@ NavigableSet getScheduledMessagesForSub(SubContext subContext, int max long ledgerId = ledgerEntry.getLongKey(); Roaring64Bitmap entryIds = ledgerEntry.getValue(); - // Filter entries based on mark delete position - long[] entryIdArray = entryIds.toArray(); - for (long entryId : entryIdArray) { - if (remaining <= 0) { - break; - } + // Fast skip if entire ledger is before mark-delete + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + + // Iterate over entry ids without materializing array + var it = entryIds.iterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); // Skip entries that are before or at mark delete if (markDelete != null) { - if (ledgerId < markDelete.getLedgerId()) { - continue; - } if (ledgerId == markDelete.getLedgerId() && entryId <= markDelete.getEntryId()) { continue; } @@ -312,8 +352,8 @@ NavigableSet getScheduledMessagesForSub(SubContext subContext, int max } } - // Note: We don't remove messages from the global index here - // Pruning will be handled separately based on min mark delete across all subscriptions + // Prune global index based on min mark-delete across all subscriptions + pruneByMinMarkDelete(); return positions; } @@ -323,9 +363,10 @@ NavigableSet getScheduledMessagesForSub(SubContext subContext, int max * Check if deliveries should be paused for a subscription. */ boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { - // Simplified implementation - could be enhanced with fixed delay detection - return fixedDelayDetectionLookahead > 0 - && getNumberOfVisibleDelayedMessagesForSub(subContext) >= fixedDelayDetectionLookahead + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() && !hasMessageAvailableForSub(subContext); } @@ -372,21 +413,20 @@ private void updateTimer() { long delayMillis = nextDeliveryTime - now; if (delayMillis < 0) { - // Messages are ready, but we don't need to keep retriggering + // Messages are ready; avoid retriggering timer, dispatcher will pick them on next read return; } + // Align with tick window like AbstractDelayedDeliveryTracker + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; - timeout = timer.newTimeout(this, delayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); + timeout = timer.newTimeout(this, calculatedDelayMillis, java.util.concurrent.TimeUnit.MILLISECONDS); } private void updateBufferMemoryEstimate() { - // Simplified memory estimation - long estimatedBytes = delayedMessageMap.values().stream() - .mapToLong(ledgerMap -> ledgerMap.values().stream() - .mapToLong(Roaring64Bitmap::getLongSizeInBytes).sum()) - .sum(); - bufferMemoryBytes.set(estimatedBytes); + // No-op in incremental mode (kept for compatibility) } private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { @@ -425,11 +465,14 @@ private void pruneByMinMarkDelete() { if (ledgerId < minMarkDelete.getLedgerId()) { // Entire ledger can be removed + long bytes = entryIds.getLongSizeInBytes(); delayedMessagesCount.addAndGet(-entryIds.getLongCardinality()); + bufferMemoryBytes.addAndGet(-bytes); ledgerIterator.remove(); } else if (ledgerId == minMarkDelete.getLedgerId()) { // Remove entries <= mark delete entry ID long removedCount = 0; + long before = entryIds.getLongSizeInBytes(); var entryIterator = entryIds.iterator(); while (entryIterator.hasNext()) { long entryId = entryIterator.next(); @@ -438,7 +481,9 @@ private void pruneByMinMarkDelete() { removedCount++; } } + long after = entryIds.getLongSizeInBytes(); delayedMessagesCount.addAndGet(-removedCount); + bufferMemoryBytes.addAndGet(after - before); if (entryIds.isEmpty()) { ledgerIterator.remove(); @@ -460,14 +505,39 @@ public void run(Timeout timeout) throws Exception { return; } + java.util.ArrayList toTrigger = new java.util.ArrayList<>(); synchronized (this) { currentTimeoutTarget = -1; this.timeout = null; + lastTickRun = clock.millis(); - // Trigger read more entries for all subscriptions + // Decide which dispatchers to trigger while holding the lock for (SubContext subContext : subscriptionContexts.values()) { - subContext.getDispatcher().readMoreEntriesAsync(); + if (hasMessageAvailableForSub(subContext)) { + toTrigger.add(subContext.getDispatcher()); + } + } + } + // Invoke callbacks outside the manager lock to reduce contention + for (AbstractPersistentDispatcherMultipleConsumers d : toTrigger) { + d.readMoreEntriesAsync(); + } + } + + private void refreshMarkDeletePosition(SubContext subContext) { + try { + Position pos = subContext.getDispatcher().getCursor().getMarkDeletedPosition(); + if (pos != null) { + Position current = subContext.getMarkDeletePosition(); + if (current == null || pos.compareTo(current) > 0) { + subContext.updateMarkDeletePosition(pos); + } + } + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("Failed to refresh mark-delete position for subscription {}", + subContext.getSubscriptionName(), t); } } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java new file mode 100644 index 0000000000000..9b6b1d3481412 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.mockito.stubbing.Answer; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class InMemoryTopicDeliveryTrackerTest { + + private static class TestEnv { + final Timer timer; + final NavigableMap tasks; + final Clock clock; + final AtomicLong time; + + TestEnv() { + this.tasks = new TreeMap<>(); + this.time = new AtomicLong(0L); + this.clock = mock(Clock.class); + when(clock.millis()).then((Answer) invocation -> time.get()); + + this.timer = mock(Timer.class); + when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { + TimerTask task = invocation.getArgument(0, TimerTask.class); + long timeout = invocation.getArgument(1, Long.class); + TimeUnit unit = invocation.getArgument(2, TimeUnit.class); + long scheduleAt = time.get() + unit.toMillis(timeout); + tasks.put(scheduleAt, task); + Timeout t = mock(Timeout.class); + when(t.cancel()).then(i -> { + tasks.remove(scheduleAt, task); + return null; + }); + when(t.isCancelled()).thenReturn(false); + return t; + }); + } + } + + private static AbstractPersistentDispatcherMultipleConsumers newDispatcher(String subName, ManagedCursor cursor) + throws Exception { + AbstractPersistentDispatcherMultipleConsumers dispatcher = + mock(AbstractPersistentDispatcherMultipleConsumers.class); + Subscription subscription = mock(Subscription.class); + when(subscription.getName()).thenReturn(subName); + when(dispatcher.getSubscription()).thenReturn(subscription); + when(dispatcher.getCursor()).thenReturn(cursor); + return dispatcher; + } + + @Test + public void testSingleSubscriptionBasicFlow() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 100; + boolean strict = true; + long lookahead = 10; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, strict, lookahead); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + assertFalse(view.hasMessageAvailable()); + + // Add 3 messages in the future + env.time.set(1000); + assertTrue(view.addMessage(1, 1, 1200)); + assertTrue(view.addMessage(1, 2, 1300)); + assertTrue(view.addMessage(2, 1, 1400)); + + assertFalse(view.hasMessageAvailable()); + assertEquals(view.getNumberOfDelayedMessages(), 3); + + // Advance time so first 2 buckets are visible + env.time.set(1350); + assertTrue(view.hasMessageAvailable()); + NavigableSet scheduled = view.getScheduledMessages(10); + // Should include both positions from first 2 buckets + assertEquals(scheduled.size(), 2); + + // Global counter doesn't drop until mark-delete pruning + assertEquals(view.getNumberOfDelayedMessages(), 3); + + // Mark-delete beyond the scheduled positions and prune + when(cursor.getMarkDeletedPosition()).thenReturn(PositionFactory.create(1L, 2L)); + // Trigger pruning by another get + view.getScheduledMessages(10); + // Now only one entry remains in global index + assertEquals(view.getNumberOfDelayedMessages(), 1); + + // Cleanup + view.close(); + } + + @Test + public void testSharedIndexDedupAcrossSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + + DelayedDeliveryTracker v1 = manager.createOrGetView(d1); + DelayedDeliveryTracker v2 = manager.createOrGetView(d2); + + env.time.set(1000); + assertTrue(v1.addMessage(10, 20, 2000)); + // Add the same message from another subscription; should be de-duplicated in global index + assertTrue(v2.addMessage(10, 20, 2000)); + + assertEquals(v1.getNumberOfDelayedMessages(), 1); + assertEquals(v2.getNumberOfDelayedMessages(), 1); + + v1.close(); + v2.close(); + } + + @Test + public void testTimerRunTriggersOnlyAvailableSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 100; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + DelayedDeliveryTracker v1 = manager.createOrGetView(d1); + DelayedDeliveryTracker v2 = manager.createOrGetView(d2); + + env.time.set(0); + // Add two buckets. Only sub-a will have messages available based on mark-delete + assertTrue(v1.addMessage(1, 1, 500)); + assertTrue(v2.addMessage(1, 2, 500)); + + // Before cutoff + assertFalse(v1.hasMessageAvailable()); + assertFalse(v2.hasMessageAvailable()); + + // Set time after cutoff and set sub-a mark-delete behind entries, sub-b beyond entries + env.time.set(600); + when(c1.getMarkDeletedPosition()).thenReturn(PositionFactory.create(0L, 0L)); // visible + when(c2.getMarkDeletedPosition()).thenReturn(PositionFactory.create(1L, 5L)); // not visible + + // Invoke manager timer task directly + manager.run(mock(Timeout.class)); + + // Only d1 should be triggered + verify(d1, times(1)).readMoreEntriesAsync(); + verify(d2, times(0)).readMoreEntriesAsync(); + + v1.close(); + v2.close(); + } + + @Test + public void testPauseWithFixedDelays() throws Exception { + TestEnv env = new TestEnv(); + long lookahead = 5; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, env.clock, true, lookahead); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + InMemoryTopicDelayedDeliveryTrackerView view = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(dispatcher); + + // Add strictly increasing deliverAt times (fixed delay scenario) + env.time.set(0); + for (int i = 1; i <= lookahead; i++) { + assertTrue(view.addMessage(i, i, i * 100)); + } + assertTrue(view.shouldPauseAllDeliveries()); + + // Move time forward to make messages available -> pause should be lifted + env.time.set(lookahead * 100 + 1); + assertFalse(view.shouldPauseAllDeliveries()); + + view.close(); + } + + @Test + public void testDynamicTickTimeUpdateAffectsCutoff() throws Exception { + TestEnv env = new TestEnv(); + // non-strict mode: cutoff = now + tick + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, false, 0); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + env.time.set(1000); + // deliverAt within current tick window -> rejected + assertFalse(view.addMessage(1, 1, 1050)); // cutoff=1100 + assertEquals(view.getNumberOfDelayedMessages(), 0); + + // shrink tick: cutoff reduces -> same deliverAt becomes accepted + view.resetTickTime(10); + assertTrue(view.addMessage(1, 1, 1050)); // cutoff=1010 + assertEquals(view.getNumberOfDelayedMessages(), 1); + + view.close(); + } + + @Test + public void testMinMarkDeleteAcrossSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + InMemoryTopicDelayedDeliveryTrackerView v1 = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(d1); + InMemoryTopicDelayedDeliveryTrackerView v2 = + (InMemoryTopicDelayedDeliveryTrackerView) manager.createOrGetView(d2); + + env.time.set(0); + assertTrue(v1.addMessage(1, 1, 100)); + assertTrue(v1.addMessage(1, 2, 100)); + assertTrue(v1.addMessage(2, 1, 100)); + assertEquals(v1.getNumberOfDelayedMessages(), 3); + + // c1 behind, c2 ahead + when(c1.getMarkDeletedPosition()).thenReturn(PositionFactory.create(0L, 0L)); + when(c2.getMarkDeletedPosition()).thenReturn(PositionFactory.create(10L, 10L)); + + env.time.set(200); + // Trigger v2 read + prune attempt; min mark-delete still from c1 => no prune + v2.getScheduledMessages(10); + assertEquals(v1.getNumberOfDelayedMessages(), 3); + + // Advance c1 mark-delete beyond (1,2) + v1.updateMarkDeletePosition(PositionFactory.create(1L, 2L)); + v1.getScheduledMessages(10); + // Now only (2,1) should remain + assertEquals(v1.getNumberOfDelayedMessages(), 1); + + v1.close(); + v2.close(); + } + + @Test + public void testTimerSchedulingWindowAlignment() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 1000; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, true, 0); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker view = manager.createOrGetView(dispatcher); + + // Establish lastTickRun via a manual run at t=10000 + env.time.set(10000); + manager.run(mock(Timeout.class)); + + // Add with deliverAt=10001, but tick window alignment should schedule at >= 11000 + assertTrue(view.addMessage(1, 1, 10001)); + long scheduledAt = env.tasks.firstKey(); + assertTrue(scheduledAt >= 11000, "scheduledAt=" + scheduledAt); + + // If no recent tick run, deliverAt should determine + env.tasks.clear(); + env.time.set(20000); + // No run -> lastTickRun remains 10000; deliverAt=20005 < lastTickRun+tick(11000)? no, so schedule at deliverAt + assertTrue(view.addMessage(1, 2, 20005)); + long scheduledAt2 = env.tasks.firstKey(); + assertEquals(scheduledAt2, 20005); + + view.close(); + } + + @Test + public void testBufferMemoryUsageAndCleanup() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("sub-a", c); + DelayedDeliveryTracker v = manager.createOrGetView(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + assertTrue(v.getBufferMemoryUsage() > 0); + + v.close(); + // After last subscription closes, manager should clear index and memory + assertEquals(manager.topicDelayedMessages(), 0); + assertEquals(manager.topicBufferMemoryBytes(), 0); + } +}