Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -40,6 +42,9 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra

private long fixedDelayDetectionLookahead;

// Cache of topic-level managers: topic name -> manager instance
private final ConcurrentMap<String, TopicDelayedDeliveryTrackerManager> topicManagers = new ConcurrentHashMap<>();

@Override
public void initialize(PulsarService pulsarService) {
ServiceConfiguration config = pulsarService.getConfig();
Expand All @@ -54,7 +59,7 @@ public void initialize(PulsarService pulsarService) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
try {
tracker = newTracker0(dispatcher);
} catch (Exception e) {
Expand All @@ -66,13 +71,30 @@ public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleCon
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
DelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();

// Get or create topic-level manager for this topic
TopicDelayedDeliveryTrackerManager manager = topicManagers.computeIfAbsent(topicName,
k -> new InMemoryTopicDelayedDeliveryTrackerManager(timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead));
Comment on lines +78 to +80
Copy link

Copilot AI Oct 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The topicManagers map could accumulate stale entries for topics that have been unloaded. When a topic is unloaded and all subscriptions are closed, the manager should be removed from the map. Consider adding cleanup logic in InMemoryTopicDelayedDeliveryTrackerManager.unregister() that notifies the factory to remove the manager from the cache, or add a callback mechanism for the manager to remove itself when it closes.

Copilot uses AI. Check for mistakes.

// Create a per-subscription view from the topic-level manager
return manager.createOrGetView(dispatcher);
}

@Override
public void close() {
// Close all topic-level managers
for (TopicDelayedDeliveryTrackerManager manager : topicManagers.values()) {
try {
manager.close();
} catch (Exception e) {
log.warn("Failed to close topic-level delayed delivery manager", e);
}
}
topicManagers.clear();

if (timer != null) {
timer.stop();
}
Expand Down
Loading