diff --git a/conf/broker.conf b/conf/broker.conf index 8fd0e18af3696..dc3b055682038 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -654,6 +654,9 @@ delayedDeliveryEnabled=true # Class name of the factory that implements the delayed deliver tracker. # If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", # will create bucket based delayed message index tracker. +# If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory", +# will use a topic-level in-memory delayed message index tracker (a distinct implementation that shares +# the index across subscriptions to reduce memory usage in multi-subscription scenarios). delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory # Control the tick time for when retrying on delayed delivery, @@ -669,6 +672,24 @@ delayedDeliveryTickTimeMillis=1000 # delayedDeliveryTickTimeMillis. isDelayedDeliveryDeliverAtTimeStrict=false +# Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed delivery tracker. +# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory) +# Set to a positive value to override the default adaptive interval based on delayedDeliveryTickTimeMillis. +# Set to 0 or a negative value to use the default adaptive interval. +delayedDeliveryPruneMinIntervalMillis=0 + +# The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an opportunistic +# prune in the in-memory topic-level delayed delivery tracker. +# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory) +# For example, 0.5 means prune when at least half of the subscriptions are eligible. Default is 0.5. +delayedDeliveryPruneEligibleRatio=0.5 + +# Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager. +# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory) +# When the last subscription is unregistered, the manager will be removed from the factory cache after this idle +# timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove immediately (default). +delayedDeliveryTopicManagerIdleMillis=0 + # The delayed message index bucket min index count. # When the index count of the current bucket is more than this value and all message indexes of current ledger # have already been added to the tracker we will seal the bucket. diff --git a/conf/standalone.conf b/conf/standalone.conf index 708d4905b8ab3..1e1f5c64d8825 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1422,6 +1422,9 @@ delayedDeliveryEnabled=true # Class name of the factory that implements the delayed deliver tracker. # If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", # will create bucket based delayed message index tracker. +# If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory", +# will use a topic-level in-memory delayed message index tracker (a distinct implementation that shares +# the index across subscriptions to reduce memory usage in multi-subscription scenarios). delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory # Control the tick time for when retrying on delayed delivery, @@ -1437,6 +1440,24 @@ delayedDeliveryTickTimeMillis=1000 # delayedDeliveryTickTimeMillis. isDelayedDeliveryDeliverAtTimeStrict=false +# Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed delivery tracker. +# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory) +# Set to a positive value to override the default adaptive interval based on delayedDeliveryTickTimeMillis. +# Set to 0 or a negative value to use the default adaptive interval. +delayedDeliveryPruneMinIntervalMillis=0 + +# The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an opportunistic +# prune in the in-memory topic-level delayed delivery tracker. +# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory) +# For example, 0.5 means prune when at least half of the subscriptions are eligible. Default is 0.5. +delayedDeliveryPruneEligibleRatio=0.5 + +# Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager. +# (org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory) +# When the last subscription is unregistered, the manager will be removed from the factory cache after this idle +# timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove immediately (default). +delayedDeliveryTopicManagerIdleMillis=0 + # The delayed message index bucket min index count. # When the index count of the current bucket is more than this value and all message indexes of current ledger # have already been added to the tracker we will seal the bucket. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 30fef55ece3bd..81335ff4975e0 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -370,7 +370,11 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_SERVER, doc = """ Class name of the factory that implements the delayed deliver tracker. If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", \ - will create bucket based delayed message index tracker. + will create bucket based delayed message index tracker.\n + If value is "org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory", \ + will create topic-level in-memory delayed message index tracker.\n + If value is "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory", \ + will create in-memory delayed delivery tracker (per existing implementation). """) private String delayedDeliveryTrackerFactoryClassName = "org.apache.pulsar.broker.delayed" + ".InMemoryDelayedDeliveryTrackerFactory"; @@ -417,6 +421,28 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, + "logic to handle fixed delays in messages in a different way.") private long delayedDeliveryFixedDelayDetectionLookahead = 50_000; + @FieldContext(category = CATEGORY_SERVER, doc = """ + Minimum interval (in milliseconds) between prune attempts within the in-memory topic-level delayed + delivery tracker. Set to a positive value to override the default adaptive interval based on + delayedDeliveryTickTimeMillis. Set to 0 or a negative value to use the default adaptive interval. + """) + private long delayedDeliveryPruneMinIntervalMillis = 0; + + @FieldContext(category = CATEGORY_SERVER, doc = """ + The ratio [0.0, 1.0] of subscriptions that need to be eligible for delivery in order to trigger an + opportunistic prune in the in-memory topic-level delayed delivery tracker. For example, 0.5 means prune + when at least half of the subscriptions are eligible. Default is 0.5. + """) + private double delayedDeliveryPruneEligibleRatio = 0.5; + + @FieldContext(category = CATEGORY_SERVER, doc = """ + Idle timeout (in milliseconds) for the topic-level in-memory delayed delivery tracker manager. When the + last subscription is unregistered, the manager will be removed from the factory cache after this idle + timeout, provided no new subscriptions have been registered in the meantime. Set to 0 to remove + immediately (default). + """) + private long delayedDeliveryTopicManagerIdleMillis = 0; + @FieldContext(category = CATEGORY_SERVER, doc = """ The max allowed delay for delayed delivery (in milliseconds). If the broker receives a message which \ exceeds this max delay, then it will return an error to the producer. \ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java index f8b8f5a8ba459..c8137d1ed917c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java @@ -54,7 +54,7 @@ public void initialize(PulsarService pulsarService) { public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) { String topicName = dispatcher.getTopic().getName(); String subscriptionName = dispatcher.getSubscription().getName(); - DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE; + DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE; try { tracker = newTracker0(dispatcher); } catch (Exception e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTracker.java new file mode 100644 index 0000000000000..09f5bd8db8351 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTracker.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import java.util.NavigableSet; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; + +/** + * Subscription-scoped tracker implementing {@link DelayedDeliveryTracker} by delegating all + * operations to the topic-level {@link InMemoryTopicDelayedDeliveryTrackerManager}. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTracker implements DelayedDeliveryTracker { + + private final InMemoryTopicDelayedDeliveryTrackerManager manager; + private final InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext; + private volatile boolean closed = false; + + public InMemoryTopicDelayedDeliveryTracker(InMemoryTopicDelayedDeliveryTrackerManager manager, + InMemoryTopicDelayedDeliveryTrackerManager.SubContext subContext) { + this.manager = manager; + this.subContext = subContext; + } + + @Override + public boolean addMessage(long ledgerId, long entryId, long deliveryAt) { + checkClosed(); + return manager.addMessageForSub(subContext, ledgerId, entryId, deliveryAt); + } + + @Override + public boolean hasMessageAvailable() { + checkClosed(); + return manager.hasMessageAvailableForSub(subContext); + } + + @Override + public long getNumberOfDelayedMessages() { + checkClosed(); + // Return an estimate of visible delayed messages for this subscription + // For now, return the total count - could be enhanced to count only visible messages + return manager.topicDelayedMessages(); + } + + @Override + public long getBufferMemoryUsage() { + checkClosed(); + // Return the topic-level memory usage (shared by all subscriptions) + return manager.topicBufferMemoryBytes(); + } + + @Override + public NavigableSet getScheduledMessages(int maxMessages) { + checkClosed(); + return manager.getScheduledMessagesForSub(subContext, maxMessages); + } + + @Override + public boolean shouldPauseAllDeliveries() { + checkClosed(); + return manager.shouldPauseAllDeliveriesForSub(subContext); + } + + @Override + public void resetTickTime(long tickTime) { + checkClosed(); + manager.onTickTimeUpdated(tickTime); + } + + @Override + public CompletableFuture clear() { + checkClosed(); + // For topic-level manager, clear is a no-op for individual subscriptions + manager.clearForSub(); + return CompletableFuture.completedFuture(null); + } + + @Override + public void close() { + if (closed) { + return; + } + closed = true; + manager.unregister(subContext.getDispatcher()); + } + + /** + * Update the mark delete position for this subscription. + * This is called by the dispatcher when messages are acknowledged. + */ + public void updateMarkDeletePosition(Position position) { + checkClosed(); + manager.updateMarkDeletePosition(subContext, position); + } + + private void checkClosed() { + if (closed) { + throw new IllegalStateException("DelayedDeliveryTracker is already closed"); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java new file mode 100644 index 0000000000000..9b707e799fc3b --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactory.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import com.google.common.annotations.VisibleForTesting; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.time.Clock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class InMemoryTopicDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory { + + private static final Logger log = LoggerFactory.getLogger(InMemoryTopicDelayedDeliveryTrackerFactory.class); + + private Timer timer; + + private long tickTimeMillis; + + private boolean isDelayedDeliveryDeliverAtTimeStrict; + + private long fixedDelayDetectionLookahead; + + // New tuning knobs + private long pruneMinIntervalMillis; + private double pruneEligibleRatio; + private long topicManagerIdleMillis; + + // Cache of topic-level managers: topic name -> manager instance + private final ConcurrentMap topicManagers = new ConcurrentHashMap<>(); + + @VisibleForTesting + InMemoryTopicDelayedDeliveryTrackerFactory(Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.pruneMinIntervalMillis = 0; + this.pruneEligibleRatio = 0.5; + this.topicManagerIdleMillis = 0; + } + + @VisibleForTesting + int getCachedManagersSize() { + return topicManagers.size(); + } + + @VisibleForTesting + boolean hasManagerForTopic(String topicName) { + return topicManagers.containsKey(topicName); + } + + @Override + public void initialize(PulsarService pulsarService) { + ServiceConfiguration config = pulsarService.getConfig(); + this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"), + config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS); + this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis(); + this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict(); + this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead(); + this.pruneMinIntervalMillis = config.getDelayedDeliveryPruneMinIntervalMillis(); + this.pruneEligibleRatio = config.getDelayedDeliveryPruneEligibleRatio(); + this.topicManagerIdleMillis = config.getDelayedDeliveryTopicManagerIdleMillis(); + } + + @Override + public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String topicName = dispatcher.getTopic().getName(); + String subscriptionName = dispatcher.getSubscription().getName(); + DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE; + try { + tracker = newTracker0(dispatcher); + } catch (Exception e) { + // it should never go here + log.warn("Failed to create InMemoryTopicDelayedDeliveryTracker, topic {}, subscription {}", + topicName, subscriptionName, e); + } + return tracker; + } + + @VisibleForTesting + DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String topicName = dispatcher.getTopic().getName(); + + // Get or create topic-level manager for this topic with onEmpty callback to remove from cache + final TopicDelayedDeliveryTrackerManager[] holder = new TopicDelayedDeliveryTrackerManager[1]; + TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName, k -> { + InMemoryTopicDelayedDeliveryTrackerManager m = new InMemoryTopicDelayedDeliveryTrackerManager( + timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, pruneMinIntervalMillis, pruneEligibleRatio, () -> { + if (topicManagerIdleMillis <= 0) { + topicManagers.remove(topicName, holder[0]); + } else { + timer.newTimeout(__ -> { + TopicDelayedDeliveryTrackerManager tm = holder[0]; + if (tm instanceof InMemoryTopicDelayedDeliveryTrackerManager) { + if (!((InMemoryTopicDelayedDeliveryTrackerManager) tm).hasActiveSubscriptions()) { + topicManagers.remove(topicName, tm); + } + } else { + // If the manager has been replaced or removed, ensure entry is cleaned up + topicManagers.remove(topicName, tm); + } + }, topicManagerIdleMillis, TimeUnit.MILLISECONDS); + } + }); + holder[0] = m; + return m; + }); + + // Create a per-subscription tracker from the topic-level manager + return manager.createOrGetTracker(dispatcher); + } + + @Override + public void close() { + // Close all topic-level managers + for (TopicDelayedDeliveryTrackerManager manager : topicManagers.values()) { + try { + manager.close(); + } catch (Exception e) { + log.warn("Failed to close topic-level delayed delivery manager", e); + } + } + topicManagers.clear(); + + if (timer != null) { + timer.stop(); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java new file mode 100644 index 0000000000000..1b3f954298b2d --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap; +import java.time.Clock; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.roaringbitmap.longlong.LongIterator; +import org.roaringbitmap.longlong.Roaring64Bitmap; + +/** + * In-memory implementation of topic-level delayed delivery tracker manager. + * This manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, significantly reducing memory usage in multi-subscription scenarios. + */ +@Slf4j +public class InMemoryTopicDelayedDeliveryTrackerManager implements TopicDelayedDeliveryTrackerManager, TimerTask { + + // Global delayed message index: timestamp -> ledgerId -> entryId bitmap + // Outer: sorted by timestamp for efficient finding of earliest bucket + // Inner: per-ledger bitmaps of entry-ids + private final ConcurrentSkipListMap> delayedMessageMap = + new ConcurrentSkipListMap<>(); + + // Subscription registry: subscription name -> subscription context + private final ConcurrentHashMap subscriptionContexts = new ConcurrentHashMap<>(); + + // Timer for delayed delivery + private final Timer timer; + private Timeout timeout; + private long currentTimeoutTarget = -1; + // Last time the TimerTask was triggered + private long lastTickRun = 0L; + + // Configuration + private long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + + // Statistics + private final AtomicLong delayedMessagesCount = new AtomicLong(0); + private final AtomicLong bufferMemoryBytes = new AtomicLong(0); + + // Prune throttling + // Last pruning time + private final AtomicLong lastPruneNanos = new AtomicLong(0); + // Minimum interval between prunes + private final long minPruneIntervalNanos; + + // Ratio of eligible subscriptions required to opportunistically prune [0.0, 1.0] + private final double pruneEligibleRatio; + + // Fixed-delay detection (parity with legacy behavior) + private final AtomicLong highestDeliveryTimeTracked = new AtomicLong(0); + private volatile boolean messagesHaveFixedDelay = true; + + // Per-bucket locks (timestamp -> lock) for fine-grained concurrency + private final ConcurrentHashMap bucketLocks = new ConcurrentHashMap<>(); + + // Timer state guard + private final ReentrantLock timerLock = new ReentrantLock(); + + /** + * Subscription context that holds per-subscription state. + */ + @Getter + static class SubContext { + private final AbstractPersistentDispatcherMultipleConsumers dispatcher; + private final String subscriptionName; + private volatile long tickTimeMillis; + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + private final long fixedDelayDetectionLookahead; + private final Clock clock; + private volatile Position markDeletePosition; + + /** + * Constructs a new SubContext for a subscription. + * + * @param dispatcher the dispatcher associated with the subscription + * @param tickTimeMillis the tick interval in milliseconds for delayed delivery checks + * @param isDelayedDeliveryDeliverAtTimeStrict if true, delayed messages are delivered strictly at their + * scheduled time; if false, messages may be delivered in the next + * tick window + * @param fixedDelayDetectionLookahead the lookahead window (in milliseconds) used for + * detecting fixed-delay messages + * @param clock the clock instance used for time calculations + */ + SubContext(AbstractPersistentDispatcherMultipleConsumers dispatcher, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead, + Clock clock) { + this.dispatcher = dispatcher; + this.subscriptionName = dispatcher.getSubscription().getName(); + this.tickTimeMillis = tickTimeMillis; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.clock = clock; + } + + void updateMarkDeletePosition(Position position) { + this.markDeletePosition = position; + } + + long getCutoffTime() { + long now = clock.millis(); + return isDelayedDeliveryDeliverAtTimeStrict ? now : now + tickTimeMillis; + } + } + + private final Runnable onEmptyCallback; + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead) { + this(timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead, + 0, 0.5, null); + } + + public InMemoryTopicDelayedDeliveryTrackerManager(Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + long fixedDelayDetectionLookahead, + long pruneMinIntervalMillis, + double pruneEligibleRatio, + Runnable onEmptyCallback) { + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; + this.onEmptyCallback = onEmptyCallback; + // Prune throttle interval: use configured override if positive, otherwise adaptive clamp [5ms, 50ms] + long pruneMs = pruneMinIntervalMillis > 0 + ? pruneMinIntervalMillis + : Math.max(5L, Math.min(50L, tickTimeMillis)); + this.minPruneIntervalNanos = TimeUnit.MILLISECONDS.toNanos(pruneMs); + // Prune eligible ratio: clamp into [0.0, 1.0] + if (Double.isNaN(pruneEligibleRatio)) { + pruneEligibleRatio = 0.5; + } + this.pruneEligibleRatio = Math.max(0.0, Math.min(1.0, pruneEligibleRatio)); + } + + // We bucket messages by aligning the deliverAt timestamp to the start of the logical tick window: + // bucketStart = deliverAt - (deliverAt % tickTimeMillis) + // If tickTimeMillis changes over time, the same message may land in different buckets when re-added + // by another subscription. Read paths dedup via TreeSet and counts include duplicates by design. + private long bucketStart(long timestamp) { + long t = this.tickTimeMillis; + if (t <= 0) { + return timestamp; + } + long mod = timestamp % t; + if (mod == 0) { + return timestamp; + } + return timestamp - mod; + } + + @Override + public DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + SubContext subContext = subscriptionContexts.computeIfAbsent(subscriptionName, + k -> new SubContext(dispatcher, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict, + fixedDelayDetectionLookahead, clock)); + return new InMemoryTopicDelayedDeliveryTracker(this, subContext); + } + + @Override + public void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher) { + String subscriptionName = dispatcher.getSubscription().getName(); + + subscriptionContexts.remove(subscriptionName); + // If no more subscriptions, proactively free index and release memory + if (subscriptionContexts.isEmpty()) { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + if (onEmptyCallback != null) { + try { + onEmptyCallback.run(); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("onEmptyCallback failed", t); + } + } + } + } + } + + /** + * Whether there are active subscriptions registered with this manager. + */ + public boolean hasActiveSubscriptions() { + return !subscriptionContexts.isEmpty(); + } + + @Override + public void onTickTimeUpdated(long newTickTimeMillis) { + if (this.tickTimeMillis == newTickTimeMillis) { + return; + } + this.tickTimeMillis = newTickTimeMillis; + // Propagate to all subscriptions + for (SubContext sc : subscriptionContexts.values()) { + sc.tickTimeMillis = newTickTimeMillis; + } + // Re-evaluate timer scheduling with new tick time + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + if (log.isDebugEnabled()) { + log.debug("Updated tickTimeMillis for topic-level delayed delivery manager to {} ms", newTickTimeMillis); + } + } + + @Override + public long topicBufferMemoryBytes() { + return bufferMemoryBytes.get(); + } + + @Override + public long topicDelayedMessages() { + return delayedMessagesCount.get(); + } + + @Override + public void close() { + timerLock.lock(); + try { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + currentTimeoutTarget = -1; + } finally { + timerLock.unlock(); + } + delayedMessageMap.clear(); + bucketLocks.clear(); + subscriptionContexts.clear(); + delayedMessagesCount.set(0); + bufferMemoryBytes.set(0); + } + + /** + * Add a message to the global delayed message index. + */ + boolean addMessageForSub(SubContext subContext, long ledgerId, long entryId, long deliverAt) { + if (deliverAt < 0 || deliverAt <= subContext.getCutoffTime()) { + return false; + } + + long timestamp = bucketStart(deliverAt); + ReentrantLock bLock = bucketLocks.computeIfAbsent(timestamp, k -> new ReentrantLock()); + bLock.lock(); + try { + Long2ObjectRBTreeMap ledgerMap = + delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()); + Roaring64Bitmap entryIds = ledgerMap.get(ledgerId); + if (entryIds == null) { + entryIds = new Roaring64Bitmap(); + ledgerMap.put(ledgerId, entryIds); + } + long before = entryIds.getLongSizeInBytes(); + if (!entryIds.contains(entryId)) { + entryIds.add(entryId); + delayedMessagesCount.incrementAndGet(); + long after = entryIds.getLongSizeInBytes(); + bufferMemoryBytes.addAndGet(after - before); + } + } finally { + bLock.unlock(); + } + + // Timer update and fixed delay detection + timerLock.lock(); + try { + updateTimerLocked(); + } finally { + timerLock.unlock(); + } + checkAndUpdateHighest(deliverAt); + return true; + } + + private void checkAndUpdateHighest(long deliverAt) { + long current; + do { + current = highestDeliveryTimeTracked.get(); + if (deliverAt < (current - tickTimeMillis)) { + messagesHaveFixedDelay = false; + } + } while (deliverAt > current && !highestDeliveryTimeTracked.compareAndSet(current, deliverAt)); + } + + /** + * Check if there are messages available for a subscription. + */ + boolean hasMessageAvailableForSub(SubContext subContext) { + if (delayedMessageMap.isEmpty()) { + return false; + } + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry> first = delayedMessageMap.firstEntry(); + if (first == null) { + return false; + } + long cutoffTime = subContext.getCutoffTime(); + long firstTs = first.getKey(); + return firstTs <= cutoffTime; + } + + /** + * Get scheduled messages for a subscription. + */ + NavigableSet getScheduledMessagesForSub(SubContext subContext, int maxMessages) { + NavigableSet positions = new TreeSet<>(); + int remaining = maxMessages; + + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + + // Snapshot of buckets up to cutoff and iterate per-bucket with bucket locks + List tsList = new ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + if (remaining <= 0) { + break; + } + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + Long2ObjectRBTreeMap ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + if (remaining <= 0) { + break; + } + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + LongIterator it = entryIds.getLongIterator(); + while (it.hasNext() && remaining > 0) { + long entryId = it.next(); + if (markDelete != null && ledgerId == markDelete.getLedgerId() + && entryId <= markDelete.getEntryId()) { + continue; + } + positions.add(PositionFactory.create(ledgerId, entryId)); + remaining--; + } + } + } finally { + bLock.unlock(); + } + } + + // Throttled prune: attempt prune even when result is empty (mark-delete might have filtered everything) + // Throttling ensures we don't pay the cost on every call + maybePruneByTime(); + + return positions; + } + + /** + * Check if deliveries should be paused for a subscription. + */ + boolean shouldPauseAllDeliveriesForSub(SubContext subContext) { + // Parity with legacy: pause if all observed delays are fixed and backlog is large enough + return subContext.getFixedDelayDetectionLookahead() > 0 + && messagesHaveFixedDelay + && getNumberOfVisibleDelayedMessagesForSub(subContext) >= subContext.getFixedDelayDetectionLookahead() + && !hasMessageAvailableForSub(subContext); + } + + /** + * Clear delayed messages for a subscription (no-op for topic-level manager). + */ + void clearForSub() { + // No-op: we don't clear global index for individual subscriptions + } + + /** + * Update mark delete position for a subscription. + */ + void updateMarkDeletePosition(SubContext subContext, Position position) { + // Event-driven update from dispatcher; keep it lightweight (no prune here) + subContext.updateMarkDeletePosition(position); + } + + private void updateTimerLocked() { + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry> first = delayedMessageMap.firstEntry(); + if (first == null) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + long nextDeliveryTime = first.getKey(); + long now = clock.millis(); + if (timeout != null && nextDeliveryTime == currentTimeoutTarget && currentTimeoutTarget >= now) { + return; + } + if (timeout != null) { + timeout.cancel(); + } + long delayMillis = nextDeliveryTime - now; + if (delayMillis < 0) { + // Bucket already in the past: schedule immediate to unblock readers + delayMillis = 0; + } + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + currentTimeoutTarget = nextDeliveryTime; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { + // Simplified implementation - returns total count + // Could be enhanced to count only messages visible to this subscription + return delayedMessagesCount.get(); + } + + private void pruneByMinMarkDelete() { + // Find the minimum mark delete position across all subscriptions. + // If any subscription hasn't established a mark-delete yet, skip pruning to preserve global visibility. + Position minMarkDelete = null; + for (SubContext subContext : subscriptionContexts.values()) { + Position markDelete = subContext.getMarkDeletePosition(); + if (markDelete == null) { + return; // at least one subscription without mark-delete -> no pruning + } + if (minMarkDelete == null || markDelete.compareTo(minMarkDelete) < 0) { + minMarkDelete = markDelete; + } + } + + // Prune per bucket under bucket lock + for (Long ts : new ArrayList<>(delayedMessageMap.keySet())) { + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + Long2ObjectRBTreeMap ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + ArrayList ledgersToRemove = new ArrayList<>(); + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (ledgerId < minMarkDelete.getLedgerId()) { + long bytes = entryIds.getLongSizeInBytes(); + delayedMessagesCount.addAndGet(-entryIds.getLongCardinality()); + bufferMemoryBytes.addAndGet(-bytes); + ledgersToRemove.add(ledgerId); + } else if (ledgerId == minMarkDelete.getLedgerId()) { + long before = entryIds.getLongSizeInBytes(); + long removedCount = 0; + LongIterator it = entryIds.getLongIterator(); + ArrayList toRemove = new ArrayList<>(); + while (it.hasNext()) { + long e = it.next(); + if (e <= minMarkDelete.getEntryId()) { + toRemove.add(e); + } + } + for (Long e : toRemove) { + entryIds.removeLong(e); + removedCount++; + } + long after = entryIds.getLongSizeInBytes(); + delayedMessagesCount.addAndGet(-removedCount); + bufferMemoryBytes.addAndGet(after - before); + if (entryIds.isEmpty()) { + ledgersToRemove.add(ledgerId); + } + } + } + for (Long ledgerId : ledgersToRemove) { + ledgerMap.remove(ledgerId); + } + if (ledgerMap.isEmpty()) { + delayedMessageMap.remove(ts); + bucketLocks.remove(ts); + } + } finally { + bLock.unlock(); + } + } + } + + private void maybePruneByTime() { + long now = System.nanoTime(); + long last = lastPruneNanos.get(); + if (now - last >= minPruneIntervalNanos) { + if (lastPruneNanos.compareAndSet(last, now)) { + pruneByMinMarkDelete(); + } + } + } + + // Note: pruning is throttled by minPruneIntervalNanos. Tests should use Awaitility + // to wait for prune to occur instead of relying on direct triggers here. + @Override + public void run(Timeout timeout) throws Exception { + if (timeout == null || timeout.isCancelled()) { + return; + } + + // Clear timer state + timerLock.lock(); + try { + currentTimeoutTarget = -1; + this.timeout = null; + lastTickRun = clock.millis(); + } finally { + timerLock.unlock(); + } + + ArrayList toTrigger = new ArrayList<>(); + // Use firstEntry() to avoid NoSuchElementException on concurrent empty map + Map.Entry> first = delayedMessageMap.firstEntry(); + if (first != null) { + for (SubContext subContext : subscriptionContexts.values()) { + if (hasVisibleMessageForSub(subContext)) { + toTrigger.add(subContext.getDispatcher()); + } + } + + // If a significant portion of subscriptions are eligible, opportunistically prune (throttled) + int subs = subscriptionContexts.size(); + int eligible = toTrigger.size(); + int threshold = Math.max(1, (int) Math.ceil(subs * pruneEligibleRatio)); + if (eligible >= threshold) { + maybePruneByTime(); + } + } + + // Invoke callbacks outside of locks + for (AbstractPersistentDispatcherMultipleConsumers d : toTrigger) { + d.readMoreEntriesAsync(); + } + } + + private boolean hasVisibleMessageForSub(SubContext subContext) { + long cutoffTime = subContext.getCutoffTime(); + Position markDelete = subContext.getMarkDeletePosition(); + List tsList = new ArrayList<>(delayedMessageMap.headMap(cutoffTime, true).keySet()); + for (Long ts : tsList) { + ReentrantLock bLock = bucketLocks.get(ts); + if (bLock == null) { + continue; + } + bLock.lock(); + try { + Long2ObjectRBTreeMap ledgerMap = delayedMessageMap.get(ts); + if (ledgerMap == null) { + continue; + } + for (Long2ObjectMap.Entry ledgerEntry : ledgerMap.long2ObjectEntrySet()) { + long ledgerId = ledgerEntry.getLongKey(); + if (markDelete != null && ledgerId < markDelete.getLedgerId()) { + continue; + } + Roaring64Bitmap entryIds = ledgerEntry.getValue(); + if (markDelete == null || ledgerId > markDelete.getLedgerId()) { + // at least one entry exists in this ledger bucket + if (!entryIds.isEmpty()) { + return true; + } + } else { + LongIterator it = entryIds.getLongIterator(); + while (it.hasNext()) { + long e = it.next(); + if (e > markDelete.getEntryId()) { + return true; + } + } + } + } + } finally { + bLock.unlock(); + } + } + return false; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java new file mode 100644 index 0000000000000..d87fff18697f2 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/TopicDelayedDeliveryTrackerManager.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; + +/** + * Manager interface for topic-level delayed delivery tracking. + * This interface provides a unified abstraction for managing delayed delivery at the topic level, + * allowing different implementations (in-memory, bucket-based) to share the same contract. + *

+ * The manager maintains a single global delayed message index per topic that is shared by all + * subscriptions, and provides per-subscription tracker objects that implement DelayedDeliveryTracker + * interface for compatibility with existing dispatcher logic. + */ +public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable { + + /** + * Create or get a delayed delivery tracker for the specified subscription. + * + * @param dispatcher the dispatcher instance for the subscription + * @return a DelayedDeliveryTracker bound to the subscription + */ + DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher); + + /** + * Unregister a subscription from the manager. + * + * @param dispatcher the dispatcher instance to unregister + */ + void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher); + + /** + * Update the tick time configuration for the topic. + * + * @param newTickTimeMillis the new tick time in milliseconds + */ + void onTickTimeUpdated(long newTickTimeMillis); + + /** + * Get the total memory usage of the topic-level delayed message index. + * + * @return memory usage in bytes + */ + long topicBufferMemoryBytes(); + + /** + * Get the total number of delayed messages in the topic-level index. + * + * @return number of delayed messages + */ + long topicDelayedMessages(); + + /** + * Close the manager and release all resources. + */ + void close(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java index 79d365b9fee21..c3ebc159c22db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/AbstractPersistentDispatcherMultipleConsumers.java @@ -19,9 +19,13 @@ package org.apache.pulsar.broker.service.persistent; import java.util.Map; +import java.util.Optional; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; +import org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.Subscription; @@ -64,4 +68,28 @@ public AbstractPersistentDispatcherMultipleConsumers(Subscription subscription, public abstract Map getBucketDelayedIndexStats(); public abstract boolean isClassic(); + + /** + * Expose the optional DelayedDeliveryTracker from implementations. + */ + protected abstract Optional getDelayedDeliveryTrackerOptional(); + + /** + * Propagate mark-delete progress to topic-level in-memory delayed tracker (when enabled). + * Centralizes repeated logic across dispatcher implementations. + */ + protected final void propagateMarkDeleteToTopicDelayedTracker() { + Optional trackerOpt; + synchronized (this) { + trackerOpt = getDelayedDeliveryTrackerOptional(); + } + trackerOpt.ifPresent(tracker -> { + if (tracker instanceof InMemoryTopicDelayedDeliveryTracker view) { + Position md = getCursor().getMarkDeletedPosition(); + if (md != null) { + view.updateMarkDeletePosition(md); + } + } + }); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 097ab9cd0febf..b38eac99af32e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1476,6 +1476,17 @@ public ManagedCursor getCursor() { return cursor; } + @Override + protected Optional getDelayedDeliveryTrackerOptional() { + return delayedDeliveryTracker; + } + + @Override + public void markDeletePositionMoveForward() { + // Centralized propagation to topic-level tracker + propagateMarkDeleteToTopicDelayedTracker(); + } + protected int getStickyKeyHash(Entry entry) { // There's no need to calculate the hash for Shared subscription return STICKY_KEY_HASH_NOT_SET; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 0f496e461b85c..2656074cd674b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -1307,6 +1307,17 @@ public ManagedCursor getCursor() { return cursor; } + @Override + protected Optional getDelayedDeliveryTrackerOptional() { + return delayedDeliveryTracker; + } + + @Override + public void markDeletePositionMoveForward() { + // Centralized propagation to topic-level tracker + propagateMarkDeleteToTopicDelayedTracker(); + } + protected int getStickyKeyHash(Entry entry) { return StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index c2afc35c619e9..5d24d4078b6cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -625,6 +625,8 @@ public void markDeletePositionMoveForward() { // reschedule a read with a backoff after moving the mark-delete position forward since there might have // been consumers that were blocked by hash and couldn't make progress reScheduleReadWithKeySharedUnblockingInterval(); + // Propagate mark-delete progress to topic-level tracker view + propagateMarkDeleteToTopicDelayedTracker(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java index 1f29cb7362685..dfdfc937d8b5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersClassic.java @@ -492,6 +492,9 @@ && removeConsumersFromRecentJoinedConsumers()) { // can finally read more messages. It's safe to call readMoreEntries() multiple times. readMoreEntries(); } + + // Propagate mark-delete progress to topic-level tracker view + propagateMarkDeleteToTopicDelayedTracker(); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactoryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactoryTest.java new file mode 100644 index 0000000000000..018943ef53228 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerFactoryTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import io.netty.util.Timer; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class InMemoryTopicDelayedDeliveryTrackerFactoryTest { + + private static AbstractPersistentDispatcherMultipleConsumers mockDispatcher(String topicName, String subName) { + AbstractPersistentDispatcherMultipleConsumers dispatcher = + mock(AbstractPersistentDispatcherMultipleConsumers.class); + PersistentTopic topic = mock(PersistentTopic.class); + when(topic.getName()).thenReturn(topicName); + when(dispatcher.getTopic()).thenReturn(topic); + Subscription sub = mock(Subscription.class); + when(sub.getName()).thenReturn(subName); + when(dispatcher.getSubscription()).thenReturn(sub); + return dispatcher; + } + + @Test + public void testManagersSharedPerTopicAndIndependentAcrossTopics() throws Exception { + Timer timer = mock(Timer.class); + InMemoryTopicDelayedDeliveryTrackerFactory f = + new InMemoryTopicDelayedDeliveryTrackerFactory(timer, 100, true, 0); + + AbstractPersistentDispatcherMultipleConsumers dA1 = mockDispatcher("persistent://ns/topicA", "sub1"); + AbstractPersistentDispatcherMultipleConsumers dA2 = mockDispatcher("persistent://ns/topicA", "sub2"); + AbstractPersistentDispatcherMultipleConsumers dB1 = mockDispatcher("persistent://ns/topicB", "sub1"); + + DelayedDeliveryTracker vA1 = f.newTracker0(dA1); + DelayedDeliveryTracker vA2 = f.newTracker0(dA2); + DelayedDeliveryTracker vB1 = f.newTracker0(dB1); + + assertEquals(f.getCachedManagersSize(), 2); + assertTrue(f.hasManagerForTopic("persistent://ns/topicA")); + assertTrue(f.hasManagerForTopic("persistent://ns/topicB")); + + // Add an entry via A1 and verify A2 observes the same shared topic-level count, B is independent + org.testng.Assert.assertTrue(vA1.addMessage(1, 1, System.currentTimeMillis() + 60000)); + org.testng.Assert.assertEquals(vA2.getNumberOfDelayedMessages(), 1L); + org.testng.Assert.assertEquals(vB1.getNumberOfDelayedMessages(), 0L); + + vA1.close(); + vA2.close(); + vB1.close(); + } + + @Test + public void testOnEmptyCallbackRemovesManagerFromCache() throws Exception { + Timer timer = mock(Timer.class); + InMemoryTopicDelayedDeliveryTrackerFactory f = + new InMemoryTopicDelayedDeliveryTrackerFactory(timer, 100, true, 0); + + AbstractPersistentDispatcherMultipleConsumers dA1 = mockDispatcher("persistent://ns/topicA", "sub1"); + AbstractPersistentDispatcherMultipleConsumers dA2 = mockDispatcher("persistent://ns/topicA", "sub2"); + + DelayedDeliveryTracker vA1 = f.newTracker0(dA1); + DelayedDeliveryTracker vA2 = f.newTracker0(dA2); + assertEquals(f.getCachedManagersSize(), 1); + + // Close both -> manager should unregister and onEmptyCallback should remove from cache + vA1.close(); + vA2.close(); + assertEquals(f.getCachedManagersSize(), 0); + } + + @Test + public void testFactoryCloseClosesManagersAndStopsTimer() throws Exception { + Timer timer = mock(Timer.class); + InMemoryTopicDelayedDeliveryTrackerFactory f = + new InMemoryTopicDelayedDeliveryTrackerFactory(timer, 100, true, 0); + + AbstractPersistentDispatcherMultipleConsumers dA1 = mockDispatcher("persistent://ns/topicA", "sub1"); + AbstractPersistentDispatcherMultipleConsumers dB1 = mockDispatcher("persistent://ns/topicB", "sub1"); + f.newTracker0(dA1); + f.newTracker0(dB1); + assertEquals(f.getCachedManagersSize(), 2); + + f.close(); + assertEquals(f.getCachedManagersSize(), 0); + // Cannot verify timer.stop() since factory owns the timer; ensure no exception + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java new file mode 100644 index 0000000000000..2c3edbde28225 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers; +import org.awaitility.Awaitility; +import org.mockito.stubbing.Answer; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class InMemoryTopicDeliveryTrackerTest { + + private static class TestEnv { + final Timer timer; + final NavigableMap tasks; + final Clock clock; + final AtomicLong time; + + TestEnv() { + this.tasks = new TreeMap<>(); + this.time = new AtomicLong(0L); + this.clock = mock(Clock.class); + when(clock.millis()).then((Answer) invocation -> time.get()); + + this.timer = mock(Timer.class); + when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { + TimerTask task = invocation.getArgument(0, TimerTask.class); + long timeout = invocation.getArgument(1, Long.class); + TimeUnit unit = invocation.getArgument(2, TimeUnit.class); + long scheduleAt = time.get() + unit.toMillis(timeout); + tasks.put(scheduleAt, task); + Timeout t = mock(Timeout.class); + when(t.cancel()).then(i -> { + tasks.remove(scheduleAt, task); + return null; + }); + when(t.isCancelled()).thenReturn(false); + return t; + }); + } + } + + private static AbstractPersistentDispatcherMultipleConsumers newDispatcher(String subName, ManagedCursor cursor) { + AbstractPersistentDispatcherMultipleConsumers dispatcher = + mock(AbstractPersistentDispatcherMultipleConsumers.class); + Subscription subscription = mock(Subscription.class); + when(subscription.getName()).thenReturn(subName); + when(dispatcher.getSubscription()).thenReturn(subscription); + when(dispatcher.getCursor()).thenReturn(cursor); + return dispatcher; + } + + @Test + public void testSingleSubscriptionBasicFlow() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 100; + boolean strict = true; + long lookahead = 10; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, strict, lookahead); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker tracker = manager.createOrGetTracker(dispatcher); + + assertFalse(tracker.hasMessageAvailable()); + + // Add 3 messages in the future + env.time.set(1000); + assertTrue(tracker.addMessage(1, 1, 1200)); + assertTrue(tracker.addMessage(1, 2, 1300)); + assertTrue(tracker.addMessage(2, 1, 1400)); + + assertFalse(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 3); + + // Advance time so first 2 buckets are visible + env.time.set(1350); + assertTrue(tracker.hasMessageAvailable()); + NavigableSet scheduled = tracker.getScheduledMessages(10); + // Should include both positions from first 2 buckets + assertEquals(scheduled.size(), 2); + + // Global counter doesn't drop until mark-delete pruning + assertEquals(tracker.getNumberOfDelayedMessages(), 3); + + // Mark-delete beyond the scheduled positions and prune + ((InMemoryTopicDelayedDeliveryTracker) tracker) + .updateMarkDeletePosition(PositionFactory.create(1L, 2L)); + // Trigger pruning by another get + tracker.getScheduledMessages(10); + // Now only one entry should remain in global index; wait until prune catches up + Awaitility.await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(30)) + .untilAsserted(() -> { + tracker.getScheduledMessages(1); + assertEquals(tracker.getNumberOfDelayedMessages(), 1); + }); + + // Cleanup + tracker.close(); + } + + @Test + public void testSharedIndexDedupAcrossSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + + DelayedDeliveryTracker v1 = manager.createOrGetTracker(d1); + DelayedDeliveryTracker v2 = manager.createOrGetTracker(d2); + + env.time.set(1000); + assertTrue(v1.addMessage(10, 20, 2000)); + // Add the same message from another subscription; should be de-duplicated in global index + assertTrue(v2.addMessage(10, 20, 2000)); + + assertEquals(v1.getNumberOfDelayedMessages(), 1); + assertEquals(v2.getNumberOfDelayedMessages(), 1); + + v1.close(); + v2.close(); + } + + @Test + public void testTimerRunTriggersOnlyAvailableSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 100; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + DelayedDeliveryTracker v1 = manager.createOrGetTracker(d1); + DelayedDeliveryTracker v2 = manager.createOrGetTracker(d2); + + env.time.set(0); + // Add two buckets. Only sub-a will have messages available based on mark-delete + assertTrue(v1.addMessage(1, 1, 500)); + assertTrue(v2.addMessage(1, 2, 500)); + + // Before cutoff + assertFalse(v1.hasMessageAvailable()); + assertFalse(v2.hasMessageAvailable()); + + // Set time after cutoff and set sub-a mark-delete behind entries, sub-b beyond entries + env.time.set(600); + ((InMemoryTopicDelayedDeliveryTracker) v1) + .updateMarkDeletePosition(PositionFactory.create(0L, 0L)); // visible for sub-a + ((InMemoryTopicDelayedDeliveryTracker) v2) + .updateMarkDeletePosition(PositionFactory.create(1L, 5L)); // filtered for sub-b + + // Invoke manager timer task directly + manager.run(mock(Timeout.class)); + + // Only d1 should be triggered + verify(d1, times(1)).readMoreEntriesAsync(); + verify(d2, times(0)).readMoreEntriesAsync(); + + v1.close(); + v2.close(); + } + + @Test + public void testPauseWithFixedDelays() throws Exception { + TestEnv env = new TestEnv(); + long lookahead = 5; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, env.clock, true, lookahead); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + InMemoryTopicDelayedDeliveryTracker tracker = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(dispatcher); + + // Add strictly increasing deliverAt times (fixed delay scenario) + env.time.set(0); + for (int i = 1; i <= lookahead; i++) { + assertTrue(tracker.addMessage(i, i, i * 100L)); + } + assertTrue(tracker.shouldPauseAllDeliveries()); + + // Move time forward to make messages available -> pause should be lifted + env.time.set(lookahead * 100 + 1); + assertFalse(tracker.shouldPauseAllDeliveries()); + + tracker.close(); + } + + @Test + public void testDynamicTickTimeUpdateAffectsCutoff() throws Exception { + TestEnv env = new TestEnv(); + // non-strict mode: cutoff = now + tick + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, false, 0); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker tracker = manager.createOrGetTracker(dispatcher); + + env.time.set(1000); + // deliverAt within current tick window -> rejected + assertFalse(tracker.addMessage(1, 1, 1050)); // cutoff=1100 + assertEquals(tracker.getNumberOfDelayedMessages(), 0); + + // shrink tick: cutoff reduces -> same deliverAt becomes accepted + tracker.resetTickTime(10); + assertTrue(tracker.addMessage(1, 1, 1050)); // cutoff=1010 + assertEquals(tracker.getNumberOfDelayedMessages(), 1); + + tracker.close(); + } + + @Test + public void testMinMarkDeleteAcrossSubscriptions() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("sub-a", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("sub-b", c2); + InMemoryTopicDelayedDeliveryTracker v1 = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d1); + InMemoryTopicDelayedDeliveryTracker v2 = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d2); + + env.time.set(0); + assertTrue(v1.addMessage(1, 1, 100)); + assertTrue(v1.addMessage(1, 2, 100)); + assertTrue(v1.addMessage(2, 1, 100)); + assertEquals(v1.getNumberOfDelayedMessages(), 3); + + // c1 behind, c2 ahead (set via view so manager receives updates) + v1.updateMarkDeletePosition(PositionFactory.create(0L, 0L)); + v2.updateMarkDeletePosition(PositionFactory.create(10L, 10L)); + + env.time.set(200); + // Trigger v2 read + prune attempt; min mark-delete still from c1 => no prune + v2.getScheduledMessages(10); + assertEquals(v1.getNumberOfDelayedMessages(), 3); + + // Advance c1 mark-delete beyond (1,2) + v1.updateMarkDeletePosition(PositionFactory.create(1L, 2L)); + v1.getScheduledMessages(10); + // Wait until prune catches up + Awaitility.await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(30)) + .untilAsserted(() -> { + v1.getScheduledMessages(1); + assertEquals(v1.getNumberOfDelayedMessages(), 1); + }); + + v1.close(); + v2.close(); + } + + @Test + public void testTimerSchedulingWindowAlignment() throws Exception { + TestEnv env = new TestEnv(); + long tickMs = 1000; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tickMs, env.clock, true, 0); + + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub-a", cursor); + DelayedDeliveryTracker tracker = manager.createOrGetTracker(dispatcher); + + // Establish lastTickRun via a manual run at t=10000 + env.time.set(10000); + manager.run(mock(Timeout.class)); + + // Add with deliverAt=10001, but tick window alignment should schedule at >= 11000 + assertTrue(tracker.addMessage(1, 1, 10001)); + long scheduledAt = env.tasks.firstKey(); + assertTrue(scheduledAt >= 11000, "scheduledAt=" + scheduledAt); + + // If no recent tick run, deliverAt should determine + env.tasks.clear(); + env.time.set(20000); + // No run -> lastTickRun remains 10000; bucketStart(20005)=20000; schedule aligns to bucket start immediately + assertTrue(tracker.addMessage(1, 2, 20005)); + long scheduledAt2 = env.tasks.firstKey(); + assertEquals(scheduledAt2, 20000); + + tracker.close(); + } + + @Test + public void testBufferMemoryUsageAndCleanup() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("sub-a", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + assertTrue(v.getBufferMemoryUsage() > 0); + + v.close(); + // After last subscription closes, manager should clear index and memory + assertEquals(manager.topicDelayedMessages(), 0); + assertEquals(manager.topicBufferMemoryBytes(), 0); + } + + @Test + public void testGetScheduledMessagesLimit() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("sub", cursor); + DelayedDeliveryTracker tracker = manager.createOrGetTracker(dispatcher); + + env.time.set(1000); + for (int i = 0; i < 10; i++) { + assertTrue(tracker.addMessage(1, i, 1001)); + } + env.time.set(2000); + NavigableSet positions = tracker.getScheduledMessages(3); + assertEquals(positions.size(), 3); + + Position prev = null; + for (Position p : positions) { + if (prev != null) { + assertTrue(prev.compareTo(p) < 0); + } + prev = p; + } + + tracker.close(); + } + + @Test + public void testHasMessageAvailableIgnoresMarkDelete() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, true, 0); + ManagedCursor cursor = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers dispatcher = newDispatcher("s", cursor); + InMemoryTopicDelayedDeliveryTracker tracker = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(dispatcher); + + env.time.set(900); + assertTrue(tracker.addMessage(1, 1, 1000)); + env.time.set(1000); + tracker.updateMarkDeletePosition(PositionFactory.create(1, 1)); + assertTrue(tracker.hasMessageAvailable()); + assertTrue(tracker.getScheduledMessages(10).isEmpty()); + + tracker.close(); + } + + @Test + public void testCrossBucketDuplicatesDedupOnRead() throws Exception { + TestEnv env = new TestEnv(); + long tick = 256; + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, tick, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", c2); + DelayedDeliveryTracker v1 = manager.createOrGetTracker(d1); + DelayedDeliveryTracker v2 = manager.createOrGetTracker(d2); + + env.time.set(1000); + long deliverAt = 1023; + assertTrue(v1.addMessage(9, 9, deliverAt)); + long before = manager.topicBufferMemoryBytes(); + + v2.resetTickTime(32); + assertTrue(v2.addMessage(9, 9, deliverAt)); + + env.time.set(2000); + NavigableSet scheduled = v1.getScheduledMessages(10); + assertEquals(scheduled.size(), 1); + assertTrue(manager.topicDelayedMessages() >= 1); + assertTrue(manager.topicBufferMemoryBytes() > before); + + v1.close(); + v2.close(); + } + + @Test + public void testClearIsNoOp() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + long before = manager.topicDelayedMessages(); + v.clear().join(); + assertEquals(manager.topicDelayedMessages(), before); + v.close(); + } + + @Test + public void testMultiSubscriptionCloseDoesNotClear() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", c2); + DelayedDeliveryTracker v1 = manager.createOrGetTracker(d1); + DelayedDeliveryTracker v2 = manager.createOrGetTracker(d2); + + env.time.set(0); + assertTrue(v1.addMessage(1, 1, 10)); + assertTrue(manager.topicDelayedMessages() > 0); + + v1.close(); + assertTrue(manager.topicDelayedMessages() > 0); + // Move time forward so remaining view can read + env.time.set(20); + assertFalse(v2.getScheduledMessages(10).isEmpty()); + + v2.close(); + assertEquals(manager.topicDelayedMessages(), 0); + } + + @Test + public void testBoundaryInputsRejected() throws Exception { + TestEnv env = new TestEnv(); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + + InMemoryTopicDelayedDeliveryTrackerManager mStrict = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, true, 0); + DelayedDeliveryTracker vStrict = mStrict.createOrGetTracker(d); + env.time.set(1000); + assertFalse(vStrict.addMessage(1, 1, -1)); + assertFalse(vStrict.addMessage(1, 2, 1000)); + vStrict.close(); + + InMemoryTopicDelayedDeliveryTrackerManager mNonStrict = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, false, 0); + DelayedDeliveryTracker vNon = mNonStrict.createOrGetTracker(d); + env.time.set(1000); + assertFalse(vNon.addMessage(1, 3, 1100)); + vNon.close(); + } + + private static void expectIllegalState(Runnable r) { + try { + r.run(); + org.testng.Assert.fail("Expected IllegalStateException"); + } catch (IllegalStateException expected) { + // ok + } + } + + @Test + public void testClosedViewThrowsOnOperations() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + InMemoryTopicDelayedDeliveryTracker v = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d); + v.close(); + + expectIllegalState(() -> v.addMessage(1, 1, 10)); + expectIllegalState(v::hasMessageAvailable); + expectIllegalState(() -> v.getScheduledMessages(1)); + expectIllegalState(v::clear); + } + + @Test + public void testRescheduleOnEarlierDeliverAt() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 10)); + assertEquals(env.tasks.firstKey().longValue(), 10L); + + assertTrue(v.addMessage(1, 2, 5)); + assertEquals(env.tasks.size(), 1); + assertEquals(env.tasks.firstKey().longValue(), 5L); + + v.close(); + } + + @Test + public void testEmptyIndexCancelsTimerOnClose() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 100, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 1000)); + assertFalse(env.tasks.isEmpty()); + v.close(); + assertTrue(env.tasks.isEmpty()); + } + + @Test + public void testMemoryGrowthAndPruneShrink() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + InMemoryTopicDelayedDeliveryTracker v = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d); + + env.time.set(0); + for (int i = 0; i < 50; i++) { + assertTrue(v.addMessage(1, i, 100)); + } + long memBefore = manager.topicBufferMemoryBytes(); + assertTrue(memBefore > 0); + + env.time.set(200); + v.updateMarkDeletePosition(PositionFactory.create(1, 25)); + v.getScheduledMessages(100); + // Wait until memory shrinks due to prune + Awaitility.await().atMost(Duration.ofSeconds(5)).pollInterval(Duration.ofMillis(60)) + .untilAsserted(() -> { + v.getScheduledMessages(1); + long memAfter = manager.topicBufferMemoryBytes(); + assertTrue(memAfter < memBefore, "Memory should shrink after prune"); + }); + + v.close(); + } + + @Test + public void testTimerCancelAndReschedule() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 10, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + assertTrue(v.addMessage(1, 1, 100)); + long first = env.tasks.firstKey(); + assertTrue(first >= 10); + + assertTrue(v.addMessage(1, 2, 50)); + assertEquals(env.tasks.size(), 1); + + v.close(); + } + + @Test + public void testSortedAndDedupScheduled() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + assertTrue(v.addMessage(2, 3, 10)); + assertTrue(v.addMessage(1, 5, 10)); + assertTrue(v.addMessage(1, 5, 10)); + env.time.set(100); + + NavigableSet scheduled = v.getScheduledMessages(10); + assertEquals(scheduled.size(), 2); + List list = new ArrayList<>(scheduled); + assertTrue(list.get(0).compareTo(list.get(1)) < 0); + + v.close(); + } + + @Test + public void testGlobalDelayedCountSemantics() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c1 = mock(ManagedCursor.class); + ManagedCursor c2 = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d1 = newDispatcher("s1", c1); + AbstractPersistentDispatcherMultipleConsumers d2 = newDispatcher("s2", c2); + InMemoryTopicDelayedDeliveryTracker v1 = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d1); + InMemoryTopicDelayedDeliveryTracker v2 = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d2); + + env.time.set(0); + assertTrue(v1.addMessage(1, 1, 10)); + assertEquals(v1.getNumberOfDelayedMessages(), 1L); + // Hide for sub-1 only; keep sub-2 without mark-delete so global min mark-delete doesn't prune + v1.updateMarkDeletePosition(PositionFactory.create(1, 1)); + env.time.set(100); + assertTrue(v1.getScheduledMessages(10).isEmpty()); + assertEquals(v1.getNumberOfDelayedMessages(), 1L); + v1.close(); + v2.close(); + } + + @Test + public void testConcurrentAdditionsSameBucket() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 1, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + DelayedDeliveryTracker v = manager.createOrGetTracker(d); + + env.time.set(0); + int threads = 5; + int perThread = 50; + CountDownLatch start = new CountDownLatch(1); + ExecutorService es = Executors.newFixedThreadPool(threads); + for (int t = 0; t < threads; t++) { + final int base = t * perThread; + es.submit(() -> { + try { + start.await(); + for (int i = 0; i < perThread; i++) { + v.addMessage(1, base + i, 10); + } + } catch (InterruptedException ignored) { + } + }); + } + start.countDown(); + es.shutdown(); + es.awaitTermination(10, TimeUnit.SECONDS); + + env.time.set(100); + NavigableSet scheduled = v.getScheduledMessages(threads * perThread); + assertEquals(scheduled.size(), threads * perThread); + assertEquals(manager.topicDelayedMessages(), threads * perThread); + v.close(); + } + + @Test + public void testConcurrentAdditionsMultipleBucketsAndReads() throws Exception { + TestEnv env = new TestEnv(); + InMemoryTopicDelayedDeliveryTrackerManager manager = + new InMemoryTopicDelayedDeliveryTrackerManager(env.timer, 5, env.clock, true, 0); + ManagedCursor c = mock(ManagedCursor.class); + AbstractPersistentDispatcherMultipleConsumers d = newDispatcher("s", c); + InMemoryTopicDelayedDeliveryTracker v = + (InMemoryTopicDelayedDeliveryTracker) manager.createOrGetTracker(d); + + env.time.set(0); + ExecutorService es = Executors.newFixedThreadPool(2); + AtomicInteger added = new AtomicInteger(); + CountDownLatch start = new CountDownLatch(1); + es.submit(() -> { + try { + start.await(); + for (int i = 0; i < 200; i++) { + if (v.addMessage(1 + (i % 3), i, 10 + (i % 10))) { + added.incrementAndGet(); + } + } + } catch (InterruptedException ignored) { + } + }); + es.submit(() -> { + try { + start.await(); + env.time.set(1000); + for (int i = 0; i < 10; i++) { + v.getScheduledMessages(50); + } + } catch (InterruptedException ignored) { + } + }); + start.countDown(); + es.shutdown(); + es.awaitTermination(10, TimeUnit.SECONDS); + + assertTrue(manager.topicDelayedMessages() >= 0); + v.close(); + } +}