From 6be1c0f22b203bb2af29ffecbed5316e8b52bc3c Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sat, 1 Nov 2025 20:38:10 +0800 Subject: [PATCH 1/2] [feat] PIP-448: Topic-level Delayed Message Tracker for Memory Optimization --- pip/pip-448.md | 159 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 pip/pip-448.md diff --git a/pip/pip-448.md b/pip/pip-448.md new file mode 100644 index 0000000000000..cabc1ee6fe6e1 --- /dev/null +++ b/pip/pip-448.md @@ -0,0 +1,159 @@ +# PIP-448: Topic-level Delayed Message Tracker for Memory Optimization + +# Background knowledge + +In Apache Pulsar, **Delayed Message Delivery** allows producers to specify a delay for a message, ensuring it is not delivered to any consumer until the specified time has passed. This is a useful feature for implementing tasks like scheduled reminders or retry mechanisms with backoff. + +Currently, the default mechanism for handling delayed messages is the **In-Memory Delayed Delivery Tracker** (`InMemoryDelayedDeliveryTracker`). This tracker is instantiated on a *per-subscription* basis within the broker. When a topic has multiple subscriptions (e.g., in a shared subscription model), each subscription gets its own independent `InMemoryDelayedDeliveryTracker` instance. + +The consequence of this design is that if a delayed message is published to a topic with 'N' subscriptions, that message's metadata (its position: ledgerId and entryId) is stored 'N' times in the broker's memory, once for each subscription's tracker. This leads to significant memory overhead, especially for topics with a large number of subscriptions, as the memory usage scales linearly with the number of subscriptions. + +The **`DelayedDeliveryTrackerFactory`** is responsible for creating these tracker instances whenever a new subscription dispatcher is initialized. 
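To make the linear scaling concrete, here is a back-of-the-envelope sketch (illustrative arithmetic only — not Pulsar code, and the message/subscription counts are made up) comparing the number of index entries held in memory by the per-subscription design versus a single shared topic-level index:

```java
public class DelayedIndexFootprint {
    public static void main(String[] args) {
        long delayedMessages = 1_000_000L; // delayed messages currently pending on the topic
        long subscriptions = 100L;         // subscriptions attached to that topic

        // Legacy design: every subscription's tracker indexes every delayed position.
        long perSubscriptionEntries = delayedMessages * subscriptions;

        // Shared topic-level index: each position is stored exactly once.
        long sharedEntries = delayedMessages;

        System.out.println("per-subscription entries: " + perSubscriptionEntries); // 100000000
        System.out.println("shared entries:           " + sharedEntries);          // 1000000
    }
}
```

With 100 subscriptions, the legacy design holds 100x the index entries for the same set of delayed messages, which is the `O(num_delayed_messages * num_subscriptions)` growth discussed below.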
+ +# Motivation + +The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of `O(num_delayed_messages * num_subscriptions)`. + +This excessive memory usage can cause: +* Increased memory pressure on Pulsar brokers. +* More frequent and longer Garbage Collection (GC) pauses, impacting broker performance. +* Potential OutOfMemoryErrors, leading to broker instability. +* Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions. + +By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases. + +# Goals + +## In Scope +* Introduce a topic-level delayed message index that is shared across all subscriptions of a single topic. This will store each delayed message's position only once. +* Significantly reduce the memory footprint for delayed message handling, changing the memory complexity from `O(num_delayed_messages * num_subscriptions)` to `O(num_delayed_messages)`. +* Maintain the existing `DelayedDeliveryTracker` interface to ensure seamless integration with the existing dispatcher logic, requiring no changes to the dispatcher's core message delivery flow. +* Make this new topic-level tracker the default in-memory implementation, replacing the legacy per-subscription tracker. + +## Out of Scope +* This proposal does not modify the persistent, bucket-based delayed delivery tracker (`BucketDelayedDeliveryTracker`). The scope is limited to the in-memory implementation. 
+* No changes will be made to the public-facing client APIs, REST APIs, or the wire protocol for producing or consuming delayed messages. This is a broker-internal optimization. +* Modifying the semantics of delayed message delivery. The user-facing behavior will remain identical. + +# High Level Design + +The core idea of this proposal is to shift from a per-subscription delayed message tracker to a shared, topic-level tracker. This will be achieved by introducing two new components: a `TopicDelayedDeliveryTrackerManager` and an `InMemoryTopicDelayedDeliveryTrackerView`. + +1. **Shared Topic-Level Manager**: For each topic, a single `InMemoryTopicDelayedDeliveryTrackerManager` instance will be created. This manager will own and maintain a global index of all delayed messages for that topic. The index will store each message's position just once, keyed by its delivery timestamp. + +2. **Per-Subscription View**: The dispatcher for each subscription will no longer get a full, independent tracker. Instead, it will receive an `InMemoryTopicDelayedDeliveryTrackerView` object. This view implements the `DelayedDeliveryTracker` interface but acts as a lightweight adapter or proxy to the shared `TopicDelayedDeliveryTrackerManager`. It maintains per-subscription state, such as the `markDeletePosition`, but all core operations (adding messages, retrieving scheduled messages) are delegated to the shared manager. + +3. **Factory and Lifecycle Management**: The `InMemoryDelayedDeliveryTrackerFactory` will be updated to manage the lifecycle of these new topic-level managers. It will maintain a cache of managers keyed by topic name. When a tracker is requested for a subscription: + * If a manager for that topic already exists, it is retrieved from the cache. + * If not, a new manager is created and stored in the cache. + * The manager then creates a new `View` object for the requesting subscription. + * When a subscription is closed, it unregisters itself from the manager. 
When the last subscription for a topic is closed, the manager cleans up its resources and is removed from the factory's cache. + +This architectural change can be described as follows: + +* **Before this change:** For a single topic, each subscription (e.g., Sub1, Sub2, Sub3) maintained its own complete `InMemoryDelayedDeliveryTracker` instance. If a delayed message was sent to the topic, its metadata would be stored independently in the tracker for Sub1, the tracker for Sub2, and the tracker for Sub3, causing data duplication. + +* **After this change:** For a single topic, there is only one central `InMemoryTopicDelayedDeliveryTrackerManager` instance that holds a shared index of all delayed messages. Each subscription (Sub1, Sub2, Sub3) receives a lightweight `View` object. All these views point to and interact with the single, shared manager, eliminating data duplication. + +The manager will handle pruning of acknowledged messages from the shared index by tracking the `markDeletePosition` of all active subscriptions and only removing messages that have been acknowledged by *all* of them (i.e., cleaning up to the minimum `markDeletePosition`). + +# Detailed Design + +## Design & Implementation Details + +### `TopicDelayedDeliveryTrackerManager` (New Interface) +This new interface defines the contract for a topic-level delayed delivery manager. +```java +public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable { + // Creates a subscription-specific view + DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher); + // Unregisters a subscription + void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher); + // Updates tick time + void onTickTimeUpdated(long newTickTimeMillis); + // Topic-level stats + long topicBufferMemoryBytes(); + long topicDelayedMessages(); +} +``` + +### `InMemoryTopicDelayedDeliveryTrackerManager` (New Class) +This is the main implementation of the topic-level manager. 
+* **Data Structure**: The core index is a `ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, Roaring64Bitmap>>`. This structure maps a delivery timestamp to a map of `ledgerId -> Roaring64Bitmap` of `entryId`s. This is highly efficient for storing and querying message positions.
+* **Subscription Context (`SubContext`)**: A nested static class holds per-subscription state, primarily the `dispatcher` reference and the latest `markDeletePosition`. The manager maintains a `ConcurrentHashMap` of these `SubContext`s to track all active subscriptions.
+* **Message Addition**: When a message is added via a subscription's `View`, it is inserted into the shared index. Duplicates (the same message added by different subscriptions) are inherently handled by the `Roaring64Bitmap`, which acts as a set.
+* **Message Retrieval**: When a `View` requests scheduled messages, the manager queries the shared index for messages ready for delivery. It then filters these results against that specific subscription's `markDeletePosition` to ensure it only returns messages that have not yet been acknowledged by that subscription.
+* **Pruning**: The manager periodically prunes the index. It calculates the minimum `markDeletePosition` across all registered subscriptions. Any message with a position less than or equal to this minimum position is safe to remove from the shared index.
+* **Lifecycle**: The manager is created by the factory for the first subscription on a topic. It is destroyed and its resources are released when the last subscription is closed, which is triggered by an `onEmptyCallback` from the factory.
+
+### `InMemoryTopicDelayedDeliveryTrackerView` (New Class)
+This class implements the `DelayedDeliveryTracker` interface, making it compatible with the dispatcher.
+* **Role**: It acts as a lightweight proxy, holding a reference to the shared `InMemoryTopicDelayedDeliveryTrackerManager` and its own `SubContext`.
+* **Operations**: All `DelayedDeliveryTracker` method calls (e.g., `addMessage`, `getScheduledMessages`) are forwarded to the shared manager, passing along its `SubContext` so the manager can perform operations in the correct per-subscription context.
+* **`updateMarkDeletePosition`**: A new method is added to the view, called by the dispatcher when the cursor moves. This provides an efficient, event-driven way to keep the manager's `SubContext` up-to-date with the latest `markDeletePosition`.
+
+### `InMemoryDelayedDeliveryTrackerFactory` (Modified)
+The factory's logic is changed to manage the lifecycle of topic-level managers.
+* **Cache**: It maintains a `ConcurrentMap<String, InMemoryTopicDelayedDeliveryTrackerManager> topicManagers`.
+* **`newTracker` method**:
+  1. It uses `topicManagers.computeIfAbsent()` to atomically get or create an `InMemoryTopicDelayedDeliveryTrackerManager` for the given topic.
+  2. The `onEmptyCallback` provided during creation ensures that when the manager becomes empty (last subscription closes), it is removed from the `topicManagers` cache.
+  3. It then calls `manager.createOrGetView(dispatcher)` to get a subscription-specific view.
+  4. It returns this view to the dispatcher.
+
+### `PersistentDispatcherMultipleConsumers` (Modified)
+A new method is introduced to enable event-driven updates.
+* **`markDeletePositionMoveForward()`**: This new method is called by the cursor when its `markDeletePosition` advances. Inside this method, it checks if the active tracker is an `InMemoryTopicDelayedDeliveryTrackerView` and, if so, calls `view.updateMarkDeletePosition()`. This pushes the latest `markDeletePosition` to the tracker view immediately, allowing for more timely pruning decisions in the shared manager.
+
+## Public-facing Changes
+
+This proposal involves a broker-internal optimization. There are **no changes** to any public-facing components.
+
+### Public API
+No changes.
+
+### Binary protocol
+No changes.
+
+### Configuration
+No changes.
+
+### CLI
+No changes.
+ +### Metrics +No changes. Existing metrics related to delayed messages will continue to function, but their values at the subscription level will now be derived from the shared topic-level manager. + +# Monitoring + +While no new metrics are introduced, the effects of this change can be monitored through existing broker metrics: +* **JVM Heap Memory**: Monitor the broker's JVM heap usage (`jvm_memory_bytes_used{area="heap"}`). For brokers hosting topics with many subscriptions and delayed messages, a significant reduction in heap memory usage and a more stable memory footprint should be observed. +* **Garbage Collection**: Monitor JVM GC metrics (`jvm_gc_collection_seconds_count`, `jvm_gc_collection_seconds_sum`). A reduction in memory pressure should lead to fewer and shorter GC pauses, improving overall broker stability and performance. +* Users can compare memory usage before and after upgrading to a version with this change to quantify the improvement. + +# Security Considerations + +This proposal refactors an internal component of the broker and does not introduce any new public APIs, endpoints, or protocol commands. The security model remains unchanged. + +The design ensures data isolation between subscriptions. When a subscription requests scheduled messages, the `InMemoryTopicDelayedDeliveryTrackerManager` filters the results based on that specific subscription's `markDeletePosition`. This prevents one subscription from accessing or being affected by the acknowledgment state of another. Therefore, multi-tenancy guarantees are preserved, and the change does not introduce any new security risks. + +# Backward & Forward Compatibility + +## Upgrade +The change is fully backward compatible. Upgrading a broker to a version containing this feature requires no special steps. Upon restart, the `InMemoryDelayedDeliveryTrackerFactory` will begin creating the new topic-level managers instead of the old per-subscription trackers. This is a drop-in replacement. 
+ +## Downgrade / Rollback +A downgrade is seamless. The change does not alter any on-disk data formats or persistent metadata. If a broker is rolled back to a previous version, it will simply revert to using the original `InMemoryDelayedDeliveryTracker` on a per-subscription basis. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations +Delayed message delivery is a broker-local feature that does not directly interact with the geo-replication mechanism. Therefore, this change has no impact on geo-replication, and no special considerations are needed for upgrade or downgrade in a replicated environment. + +# Alternatives + +# General Notes +This implementation effectively replaces the legacy `InMemoryDelayedDeliveryTracker` as the default in-memory strategy. The factory no longer instantiates the old class. This provides a significant performance and scalability improvement for a common Pulsar use case. + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: \ No newline at end of file From 51d363c40b76860d799a2b885b336579947ac295 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 2 Nov 2025 12:20:23 +0800 Subject: [PATCH 2/2] [feat][pip] Introduce topic-level delayed message tracker to optimize memory usage --- pip/pip-448.md | 157 ++++++++++++++++++++++++++----------------------- 1 file changed, 82 insertions(+), 75 deletions(-) diff --git a/pip/pip-448.md b/pip/pip-448.md index cabc1ee6fe6e1..c7cf81d2c5ef4 100644 --- a/pip/pip-448.md +++ b/pip/pip-448.md @@ -4,15 +4,13 @@ In Apache Pulsar, **Delayed Message Delivery** allows producers to specify a delay for a message, ensuring it is not delivered to any consumer until the specified time has passed. This is a useful feature for implementing tasks like scheduled reminders or retry mechanisms with backoff. -Currently, the default mechanism for handling delayed messages is the **In-Memory Delayed Delivery Tracker** (`InMemoryDelayedDeliveryTracker`). 
This tracker is instantiated on a *per-subscription* basis within the broker. When a topic has multiple subscriptions (e.g., in a shared subscription model), each subscription gets its own independent `InMemoryDelayedDeliveryTracker` instance. +The legacy default mechanism for handling delayed messages is the `InMemoryDelayedDeliveryTracker`. This tracker is instantiated on a *per-subscription* basis within the broker. When a topic has multiple subscriptions, each subscription gets its own independent `InMemoryDelayedDeliveryTracker` instance. -The consequence of this design is that if a delayed message is published to a topic with 'N' subscriptions, that message's metadata (its position: ledgerId and entryId) is stored 'N' times in the broker's memory, once for each subscription's tracker. This leads to significant memory overhead, especially for topics with a large number of subscriptions, as the memory usage scales linearly with the number of subscriptions. - -The **`DelayedDeliveryTrackerFactory`** is responsible for creating these tracker instances whenever a new subscription dispatcher is initialized. +The consequence of this per-subscription design is that if a delayed message is published to a topic with 'N' subscriptions, that message's metadata (its position) is stored 'N' times in the broker's memory. This leads to significant memory overhead, especially for topics with a large number of subscriptions, as the memory usage scales linearly with the number of subscriptions. # Motivation -The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of `O(num_delayed_messages * num_subscriptions)`. 
+The primary motivation for this proposal is to address the high memory consumption caused by the legacy per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages can become prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of `O(num_delayed_messages * num_subscriptions)`. This excessive memory usage can cause: * Increased memory pressure on Pulsar brokers. @@ -20,95 +18,85 @@ This excessive memory usage can cause: * Potential OutOfMemoryErrors, leading to broker instability. * Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions. -By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases. +By introducing an alternative, topic-level tracking mechanism, we can provide a memory-efficient solution to enhance broker stability and scalability for these critical use cases. # Goals ## In Scope -* Introduce a topic-level delayed message index that is shared across all subscriptions of a single topic. This will store each delayed message's position only once. -* Significantly reduce the memory footprint for delayed message handling, changing the memory complexity from `O(num_delayed_messages * num_subscriptions)` to `O(num_delayed_messages)`. -* Maintain the existing `DelayedDeliveryTracker` interface to ensure seamless integration with the existing dispatcher logic, requiring no changes to the dispatcher's core message delivery flow. -* Make this new topic-level tracker the default in-memory implementation, replacing the legacy per-subscription tracker. +* Introduce a new, optional, topic-level delayed message tracker that is shared across all subscriptions of a single topic. 
This will store each delayed message's position only once. +* Significantly reduce the memory footprint for delayed message handling when this new tracker is enabled, changing the memory complexity from `O(num_delayed_messages * num_subscriptions)` to `O(num_delayed_messages)`. +* Provide new configuration options to allow operators to tune the behavior of the new tracker, such as pruning intervals and cleanup delays. +* Maintain the existing `DelayedDeliveryTracker` interface to ensure seamless integration with the dispatcher logic. +* Preserve the existing per-subscription `InMemoryDelayedDeliveryTrackerFactory` as the default for backward compatibility, requiring operators to opt-in to use the new topic-level tracker. ## Out of Scope -* This proposal does not modify the persistent, bucket-based delayed delivery tracker (`BucketDelayedDeliveryTracker`). The scope is limited to the in-memory implementation. -* No changes will be made to the public-facing client APIs, REST APIs, or the wire protocol for producing or consuming delayed messages. This is a broker-internal optimization. -* Modifying the semantics of delayed message delivery. The user-facing behavior will remain identical. +* This proposal does not modify the persistent, bucket-based delayed delivery tracker (`BucketDelayedDeliveryTracker`). +* No changes will be made to the public-facing client APIs, REST APIs, or the wire protocol. This is a broker-internal optimization. +* The semantic behavior of delayed messages from a user's perspective will remain identical. # High Level Design -The core idea of this proposal is to shift from a per-subscription delayed message tracker to a shared, topic-level tracker. This will be achieved by introducing two new components: a `TopicDelayedDeliveryTrackerManager` and an `InMemoryTopicDelayedDeliveryTrackerView`. +The core idea is to introduce a new, opt-in `DelayedDeliveryTrackerFactory` that implements a shared, topic-level tracking strategy. 
This is achieved with two new components: a `TopicDelayedDeliveryTrackerManager` and a subscription-scoped `InMemoryTopicDelayedDeliveryTracker`. + +1. **New Factory (`InMemoryTopicDelayedDeliveryTrackerFactory`)**: A new factory class is introduced. To enable the feature, operators must set `delayedDeliveryTrackerFactoryClassName` to `org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`. This factory manages the lifecycle of topic-level managers. -1. **Shared Topic-Level Manager**: For each topic, a single `InMemoryTopicDelayedDeliveryTrackerManager` instance will be created. This manager will own and maintain a global index of all delayed messages for that topic. The index will store each message's position just once, keyed by its delivery timestamp. +2. **Shared Topic-Level Manager (`InMemoryTopicDelayedDeliveryTrackerManager`)**: For each topic, a single instance of this manager is created by the new factory. This manager owns a global index of all delayed messages for that topic, storing each message's position only once. -2. **Per-Subscription View**: The dispatcher for each subscription will no longer get a full, independent tracker. Instead, it will receive an `InMemoryTopicDelayedDeliveryTrackerView` object. This view implements the `DelayedDeliveryTracker` interface but acts as a lightweight adapter or proxy to the shared `TopicDelayedDeliveryTrackerManager`. It maintains per-subscription state, such as the `markDeletePosition`, but all core operations (adding messages, retrieving scheduled messages) are delegated to the shared manager. +3. **Per-Subscription Tracker (`InMemoryTopicDelayedDeliveryTracker`)**: The dispatcher for each subscription receives an instance of this class. It implements the standard `DelayedDeliveryTracker` interface but acts as a lightweight proxy to the shared `TopicDelayedDeliveryTrackerManager`. 
It maintains per-subscription state (like the `markDeletePosition`) while delegating core operations to the shared manager. -3. **Factory and Lifecycle Management**: The `InMemoryDelayedDeliveryTrackerFactory` will be updated to manage the lifecycle of these new topic-level managers. It will maintain a cache of managers keyed by topic name. When a tracker is requested for a subscription: - * If a manager for that topic already exists, it is retrieved from the cache. - * If not, a new manager is created and stored in the cache. - * The manager then creates a new `View` object for the requesting subscription. - * When a subscription is closed, it unregisters itself from the manager. When the last subscription for a topic is closed, the manager cleans up its resources and is removed from the factory's cache. +4. **Lifecycle and Caching**: The new factory maintains a cache of managers keyed by topic name. When a tracker is requested: + * It gets or creates a manager for the topic. + * The manager then creates a new `InMemoryTopicDelayedDeliveryTracker` for the subscription. + * When a subscription closes, it unregisters from the manager. When the last subscription unregisters, the manager's cleanup is scheduled based on the `delayedDeliveryTopicManagerIdleMillis` configuration, allowing it to be reused if a new subscription appears quickly. This architectural change can be described as follows: -* **Before this change:** For a single topic, each subscription (e.g., Sub1, Sub2, Sub3) maintained its own complete `InMemoryDelayedDeliveryTracker` instance. If a delayed message was sent to the topic, its metadata would be stored independently in the tracker for Sub1, the tracker for Sub2, and the tracker for Sub3, causing data duplication. +* **Default/Legacy Behavior:** For a single topic, each subscription (e.g., Sub1, Sub2) maintains its own complete `InMemoryDelayedDeliveryTracker`. Message metadata is duplicated in each tracker. 
-* **After this change:** For a single topic, there is only one central `InMemoryTopicDelayedDeliveryTrackerManager` instance that holds a shared index of all delayed messages. Each subscription (Sub1, Sub2, Sub3) receives a lightweight `View` object. All these views point to and interact with the single, shared manager, eliminating data duplication.
+* **New Topic-Level Behavior:** For a single topic, there is only one central `InMemoryTopicDelayedDeliveryTrackerManager` holding a shared index. Each subscription (Sub1, Sub2) receives a lightweight `InMemoryTopicDelayedDeliveryTracker` that acts as a view/proxy, pointing to the single, shared manager, thus eliminating data duplication.
 
-The manager will handle pruning of acknowledged messages from the shared index by tracking the `markDeletePosition` of all active subscriptions and only removing messages that have been acknowledged by *all* of them (i.e., cleaning up to the minimum `markDeletePosition`).
+The manager handles pruning of acknowledged messages from the shared index by tracking the `markDeletePosition` of all active subscriptions and only removing messages that have been acknowledged by *all* of them.
 
 # Detailed Design
 
 ## Design & Implementation Details
 
+### `InMemoryTopicDelayedDeliveryTrackerFactory` (New Class)
+This is the new factory that must be explicitly configured to enable the topic-level tracking feature.
+* **Role**: Manages the lifecycle of `InMemoryTopicDelayedDeliveryTrackerManager` instances.
+* **Cache**: It maintains a `ConcurrentMap<String, InMemoryTopicDelayedDeliveryTrackerManager>` to cache managers per topic.
+* **Lifecycle Management**: When the last subscription for a topic is closed, the factory uses the `delayedDeliveryTopicManagerIdleMillis` setting to determine when to clean up the manager. A value of `0` removes it immediately, while a positive value schedules a delayed removal, preventing churn if subscriptions are volatile.
+* **Configuration**: It reads and passes the new tuning parameters (`pruneMinIntervalMillis`, `pruneEligibleRatio`) to the manager instances it creates. + ### `TopicDelayedDeliveryTrackerManager` (New Interface) -This new interface defines the contract for a topic-level delayed delivery manager. +This interface defines the contract for a topic-level manager. ```java public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable { - // Creates a subscription-specific view - DelayedDeliveryTracker createOrGetView(AbstractPersistentDispatcherMultipleConsumers dispatcher); - // Unregisters a subscription + DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher); void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher); - // Updates tick time - void onTickTimeUpdated(long newTickTimeMillis); - // Topic-level stats - long topicBufferMemoryBytes(); - long topicDelayedMessages(); + // ... other methods } ``` ### `InMemoryTopicDelayedDeliveryTrackerManager` (New Class) -This is the main implementation of the topic-level manager. -* **Data Structure**: The core index is a `ConcurrentSkipListMap>`. This structure maps a delivery timestamp to a map of `ledgerId -> Roaring64Bitmap` of `entryId`s. This is highly efficient for storing and querying message positions. -* **Subscription Context (`SubContext`)**: A nested static class holds per-subscription state, primarily the `dispatcher` reference and the latest `markDeletePosition`. The manager maintains a `ConcurrentHashMap` to track all active subscriptions. -* **Message Addition**: When a message is added via a subscription's `View`, it's inserted into the shared index. Duplicates (same message added by different subscriptions) are inherently handled by the `Roaring64Bitmap` which acts as a set. -* **Message Retrieval**: When a `View` requests scheduled messages, the manager queries the shared index for messages ready for delivery. 
It then filters these results against that specific subscription's `markDeletePosition` to ensure it only returns messages that have not yet been acknowledged by that subscription. -* **Pruning**: The manager periodically prunes the index. It calculates the minimum `markDeletePosition` across all registered subscriptions. Any message with a position less than or equal to this minimum position is safe to remove from the shared index. -* **Lifecycle**: The manager is created by the factory for the first subscription on a topic. It is destroyed and its resources are released when the last subscription is closed, which is triggered by an `onEmptyCallback` from the factory. - -### `InMemoryTopicDelayedDeliveryTrackerView` (New Class) -This class implements the `DelayedDeliveryTracker` interface, making it compatible with the dispatcher. -* **Role**: It acts as a lightweight proxy, holding a reference to the shared `InMemoryTopicDelayedDeliveryTrackerManager` and its own `SubContext`. -* **Operations**: All `DelayedDeliveryTracker` method calls (e.g., `addMessage`, `getScheduledMessages`) are forwarded to the shared manager, passing along its `SubContext` so the manager can perform operations in the correct per-subscription context. -* **`updateMarkDeletePosition`**: A new method is added to the view, called by the dispatcher when the cursor moves. This provides an efficient, event-driven way to keep the manager's `SubContext` up-to-date with the latest `markDeletePosition`. - -### `InMemoryDelayedDeliveryTrackerFactory` (Modified) -The factory's logic is changed to manage the lifecycle of topic-level managers. -* **Cache**: It maintains a `ConcurrentMap topicManagers`. -* **`newTracker` method**: - 1. It uses `topicManagers.computeIfAbsent()` to atomically get or create an `InMemoryTopicDelayedDeliveryTrackerManager` for the given topic. - 2. 
The `onEmptyCallback` provided during creation ensures that when the manager becomes empty (last subscription closes), it is removed from the `topicManagers` cache.
- 3. It then calls `manager.createOrGetView(dispatcher)` to get a subscription-specific view.
- 4. It returns this view to the dispatcher.
+This is the implementation of the topic-level manager.
+* **Data Structure**: Uses a `ConcurrentSkipListMap<Long, ConcurrentHashMap<Long, Roaring64Bitmap>>` as its core index, mapping delivery timestamps to message positions efficiently.
+* **Subscription Context (`SubContext`)**: Holds per-subscription state, including the `markDeletePosition`.
+* **Pruning Logic**: Pruning is throttled and tunable:
+  * The minimum time between pruning attempts is controlled by `delayedDeliveryPruneMinIntervalMillis`. If not set, it uses an adaptive interval based on the tick time.
+  * Opportunistic pruning is triggered during the delivery check if the ratio of subscriptions ready for delivery exceeds `delayedDeliveryPruneEligibleRatio`.
+* **`createOrGetTracker`**: This method creates the per-subscription `InMemoryTopicDelayedDeliveryTracker` instance.
+
+### `InMemoryTopicDelayedDeliveryTracker` (New Class)
+This class implements the `DelayedDeliveryTracker` interface for a single subscription.
+* **Role**: Acts as a lightweight proxy, forwarding all operations (e.g., `addMessage`, `getScheduledMessages`) to the shared `InMemoryTopicDelayedDeliveryTrackerManager`.
+* **`updateMarkDeletePosition`**: A method called by the dispatcher via `markDeletePositionMoveForward` to provide the manager with the most up-to-date acknowledgment position for its subscription.
 
 ### `PersistentDispatcherMultipleConsumers` (Modified)
-A new method is introduced to enable event-driven updates.
-* **`markDeletePositionMoveForward()`**: This new method is called by the cursor when its `markDeletePosition` advances.
Inside this method, it checks if the active tracker is an `InMemoryTopicDelayedDeliveryTrackerView` and, if so, calls `view.updateMarkDeletePosition()`. This pushes the latest `markDeletePosition` to the tracker view immediately, allowing for more timely pruning decisions in the shared manager. +* **`markDeletePositionMoveForward()`**: This new method is called when the cursor's mark-delete position advances. It checks if the tracker is an `InMemoryTopicDelayedDeliveryTracker` and, if so, pushes the updated position to it. This enables more efficient, event-driven state updates for the shared manager. ## Public-facing Changes -This proposal involves a broker-internal optimization. There are **no changes** to any public-facing components. - ### Public API No changes. @@ -116,44 +104,63 @@ No changes. No changes. ### Configuration -No changes. +The following new configuration parameters are added to `broker.conf` and `standalone.conf`: + +* **`delayedDeliveryTrackerFactoryClassName`** + * The documentation is updated to specify the new class name to enable this feature: `org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`. The old `org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory` remains the default. + +* **`delayedDeliveryPruneMinIntervalMillis`** + * **Description**: Minimum interval (in milliseconds) between prune attempts within the topic-level tracker. A positive value overrides the default adaptive interval. + * **Default**: `0` (use adaptive interval based on `delayedDeliveryTickTimeMillis`). + +* **`delayedDeliveryPruneEligibleRatio`** + * **Description**: A ratio [0.0, 1.0] of subscriptions that must be eligible for delivery to trigger an opportunistic prune. For example, 0.5 means pruning is attempted when at least half of the subscriptions are ready to receive messages. + * **Default**: `0.5`. 
+
+* **`delayedDeliveryTopicManagerIdleMillis`**
+  * **Description**: Idle timeout (in milliseconds) for a topic-level manager. After the last subscription closes, the manager will be removed from the cache after this delay, unless a new subscription connects in the meantime.
+  * **Default**: `0` (remove immediately).
 
 ### CLI
 No changes.
 
 ### Metrics
-No changes. Existing metrics related to delayed messages will continue to function, but their values at the subscription level will now be derived from the shared topic-level manager.
+No changes.
 
 # Monitoring
-While no new metrics are introduced, the effects of this change can be monitored through existing broker metrics:
-* **JVM Heap Memory**: Monitor the broker's JVM heap usage (`jvm_memory_bytes_used{area="heap"}`). For brokers hosting topics with many subscriptions and delayed messages, a significant reduction in heap memory usage and a more stable memory footprint should be observed.
-* **Garbage Collection**: Monitor JVM GC metrics (`jvm_gc_collection_seconds_count`, `jvm_gc_collection_seconds_sum`). A reduction in memory pressure should lead to fewer and shorter GC pauses, improving overall broker stability and performance.
-* Users can compare memory usage before and after upgrading to a version with this change to quantify the improvement.
+The effects of this change can be monitored through existing broker metrics:
+* **JVM Heap Memory**: Monitor the broker's JVM heap usage (`jvm_memory_bytes_used{area="heap"}`). When the new factory is enabled for topics with many subscriptions and delayed messages, a significant reduction in heap memory usage should be observed.
+* **Garbage Collection**: Monitor JVM GC metrics (`jvm_gc_collection_seconds_count`, `jvm_gc_collection_seconds_sum`). A reduction in memory pressure should lead to fewer and shorter GC pauses.
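Taken together, the settings above would be combined in `broker.conf` roughly as follows. This is an illustrative sketch: the prune interval and idle timeout values are arbitrary examples, not recommended tunings, and the other values are the documented defaults.

```properties
# Opt in to the topic-level in-memory tracker (the legacy factory remains the default).
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory

# Prune at most once per second (0 = adaptive interval derived from the tick time).
delayedDeliveryPruneMinIntervalMillis=1000

# Opportunistically prune when at least half of the subscriptions are ready for delivery.
delayedDeliveryPruneEligibleRatio=0.5

# Keep an idle topic-level manager cached for 30s after its last subscription closes.
delayedDeliveryTopicManagerIdleMillis=30000
```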
 # Security Considerations
-This proposal refactors an internal component of the broker and does not introduce any new public APIs, endpoints, or protocol commands. The security model remains unchanged.
-
-The design ensures data isolation between subscriptions. When a subscription requests scheduled messages, the `InMemoryTopicDelayedDeliveryTrackerManager` filters the results based on that specific subscription's `markDeletePosition`. This prevents one subscription from accessing or being affected by the acknowledgment state of another. Therefore, multi-tenancy guarantees are preserved, and the change does not introduce any new security risks.
+This proposal adds a new internal broker component that is not exposed via any public API. The design ensures data isolation between subscriptions. The `InMemoryTopicDelayedDeliveryTrackerManager` filters scheduled messages based on each subscription's individual `markDeletePosition`, preventing the acknowledgment state of one subscription from affecting another. Multi-tenancy and security guarantees are preserved.
 
 # Backward & Forward Compatibility
 
 ## Upgrade
-The change is fully backward compatible. Upgrading a broker to a version containing this feature requires no special steps. Upon restart, the `InMemoryDelayedDeliveryTrackerFactory` will begin creating the new topic-level managers instead of the old per-subscription trackers. This is a drop-in replacement.
+To use this feature, an operator must perform two steps after upgrading the broker binaries:
+1. Enable the new tracker by setting the configuration parameter in `broker.conf`:
+   `delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`
+2. Restart the broker.
+
+If this configuration change is not made, the broker will continue to use the legacy per-subscription tracker, ensuring full backward compatibility. The new configuration options (`delayedDeliveryPrune...`, `delayedDeliveryTopicManager...`) only take effect when the new factory is active.
 
 ## Downgrade / Rollback
-A downgrade is seamless. The change does not alter any on-disk data formats or persistent metadata. If a broker is rolled back to a previous version, it will simply revert to using the original `InMemoryDelayedDeliveryTracker` on a per-subscription basis.
+To perform a downgrade, the configuration change must be reverted before starting the broker with the older version.
+1. Change `delayedDeliveryTrackerFactoryClassName` back to the old factory or remove the line to use the default.
+2. Deploy the older broker version and restart.
-## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
-Delayed message delivery is a broker-local feature that does not directly interact with the geo-replication mechanism. Therefore, this change has no impact on geo-replication, and no special considerations are needed for upgrade or downgrade in a replicated environment.
+Failure to revert the configuration will cause the older broker to fail on startup because it cannot find the `InMemoryTopicDelayedDeliveryTrackerFactory` class. No on-disk data formats are changed, so the process is safe.
-# Alternatives
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+Delayed message delivery is a broker-local feature. This change has no impact on geo-replication, and no special considerations are needed for upgrade or downgrade in a replicated environment.
 
 # General Notes
-This implementation effectively replaces the legacy `InMemoryDelayedDeliveryTracker` as the default in-memory strategy. The factory no longer instantiates the old class. This provides a significant performance and scalability improvement for a common Pulsar use case.
+This proposal introduces a new, highly memory-efficient delayed message tracking strategy. Operators must explicitly enable it via configuration to take advantage of its benefits. The legacy implementation remains the default to guarantee a seamless upgrade experience for existing users.
 
 # Links
-* Mailing List discussion thread:
+* Mailing List discussion thread: https://lists.apache.org/thread/8x1l077mlq2gfc7lq4gwrql14to8gntj
 * Mailing List voting thread:
\ No newline at end of file
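As background for reviewers, the shared-index idea at the heart of this proposal — one timestamp-ordered index per topic, filtered and pruned by per-subscription `markDeletePosition`s — can be sketched in plain Java. This is a simplified, hypothetical model: `SharedDelayedIndex` and its method names are illustrative only and do not correspond to the actual Pulsar classes, and a message position (normally a `(ledgerId, entryId)` pair) is compacted here to a single `long`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical sketch of a topic-level delayed index shared by all subscriptions.
// Each position is stored exactly once; subscriptions contribute only a markDeletePosition,
// so memory scales with the number of delayed messages, not messages x subscriptions.
class SharedDelayedIndex {
    // deliveryTime -> positions due at that time (position modeled as a single long)
    private final ConcurrentSkipListMap<Long, Set<Long>> index = new ConcurrentSkipListMap<>();
    // subscription name -> last known markDeletePosition
    private final ConcurrentMap<String, Long> markDeletePositions = new ConcurrentHashMap<>();

    void addMessage(long deliverAt, long position) {
        index.computeIfAbsent(deliverAt, k -> new ConcurrentSkipListSet<>()).add(position);
    }

    // Event-driven update, analogous to markDeletePositionMoveForward in the proposal.
    void updateMarkDeletePosition(String subscription, long position) {
        markDeletePositions.merge(subscription, position, Math::max);
    }

    // Messages due by `now`, filtered by the requesting subscription's markDeletePosition,
    // so one subscription's acknowledgments never hide messages from another.
    List<Long> getScheduledMessages(String subscription, long now) {
        long markDelete = markDeletePositions.getOrDefault(subscription, -1L);
        List<Long> due = new ArrayList<>();
        for (Set<Long> positions : index.headMap(now, true).values()) {
            for (long pos : positions) {
                if (pos > markDelete) {
                    due.add(pos);
                }
            }
        }
        return due;
    }

    // Prune positions acknowledged by *all* subscriptions, i.e. at or below the
    // minimum markDeletePosition across every registered subscription.
    void prune() {
        long minMarkDelete = markDeletePositions.values().stream()
                .mapToLong(Long::longValue).min().orElse(-1L);
        for (Set<Long> positions : index.values()) {
            positions.removeIf(pos -> pos <= minMarkDelete);
        }
        index.entrySet().removeIf(e -> e.getValue().isEmpty());
    }
}
```

The real implementation adds throttling and opportunistic triggering around `prune()` (the `delayedDeliveryPrune...` settings) and wraps per-subscription access in `DelayedDeliveryTracker` proxies; the sketch only illustrates why a single shared index with per-subscription filtering preserves isolation while removing the N-fold duplication.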