diff --git a/pip/pip-448.md b/pip/pip-448.md
new file mode 100644
index 0000000000000..c7cf81d2c5ef4
--- /dev/null
+++ b/pip/pip-448.md
@@ -0,0 +1,166 @@
+# PIP-448: Topic-level Delayed Message Tracker for Memory Optimization
+
+# Background knowledge
+
+In Apache Pulsar, **Delayed Message Delivery** allows producers to specify a delay for a message, ensuring it is not delivered to any consumer until the specified time has passed. This is a useful feature for implementing tasks like scheduled reminders or retry mechanisms with backoff.
+
+The legacy default mechanism for handling delayed messages is the `InMemoryDelayedDeliveryTracker`. This tracker is instantiated on a *per-subscription* basis within the broker. When a topic has multiple subscriptions, each subscription gets its own independent `InMemoryDelayedDeliveryTracker` instance.
+
+The consequence of this per-subscription design is that if a delayed message is published to a topic with 'N' subscriptions, that message's metadata (its position) is stored 'N' times in the broker's memory. This leads to significant memory overhead, especially for topics with a large number of subscriptions, as the memory usage scales linearly with the number of subscriptions.
+
+# Motivation
+
+The primary motivation for this proposal is to address the high memory consumption caused by the legacy per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages can become prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of `O(num_delayed_messages * num_subscriptions)`.
+
+This excessive memory usage can cause:
+* Increased memory pressure on Pulsar brokers.
+* More frequent and longer Garbage Collection (GC) pauses, impacting broker performance.
+* Potential OutOfMemoryErrors, leading to broker instability.
+* Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions.
+
+By introducing an alternative, topic-level tracking mechanism, we can provide a memory-efficient solution to enhance broker stability and scalability for these critical use cases.
+
+# Goals
+
+## In Scope
+* Introduce a new, optional, topic-level delayed message tracker that is shared across all subscriptions of a single topic. This will store each delayed message's position only once.
+* Significantly reduce the memory footprint for delayed message handling when this new tracker is enabled, changing the memory complexity from `O(num_delayed_messages * num_subscriptions)` to `O(num_delayed_messages)`.
+* Provide new configuration options to allow operators to tune the behavior of the new tracker, such as pruning intervals and cleanup delays.
+* Maintain the existing `DelayedDeliveryTracker` interface to ensure seamless integration with the dispatcher logic.
+* Preserve the existing per-subscription `InMemoryDelayedDeliveryTrackerFactory` as the default for backward compatibility, requiring operators to opt-in to use the new topic-level tracker.
+
+## Out of Scope
+* This proposal does not modify the persistent, bucket-based delayed delivery tracker (`BucketDelayedDeliveryTracker`).
+* No changes will be made to the public-facing client APIs, REST APIs, or the wire protocol. This is a broker-internal optimization.
+* The semantic behavior of delayed messages from a user's perspective will remain identical.
+
+# High Level Design
+
+The core idea is to introduce a new, opt-in `DelayedDeliveryTrackerFactory` that implements a shared, topic-level tracking strategy. This is achieved with two new components: a `TopicDelayedDeliveryTrackerManager` and a subscription-scoped `InMemoryTopicDelayedDeliveryTracker`.
+
+1. **New Factory (`InMemoryTopicDelayedDeliveryTrackerFactory`)**: A new factory class is introduced. To enable the feature, operators must set `delayedDeliveryTrackerFactoryClassName` to `org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`. This factory manages the lifecycle of topic-level managers.
+
+2. **Shared Topic-Level Manager (`InMemoryTopicDelayedDeliveryTrackerManager`)**: For each topic, a single instance of this manager is created by the new factory. This manager owns a global index of all delayed messages for that topic, storing each message's position only once.
+
+3. **Per-Subscription Tracker (`InMemoryTopicDelayedDeliveryTracker`)**: The dispatcher for each subscription receives an instance of this class. It implements the standard `DelayedDeliveryTracker` interface but acts as a lightweight proxy to the shared `TopicDelayedDeliveryTrackerManager`. It maintains per-subscription state (like the `markDeletePosition`) while delegating core operations to the shared manager.
+
+4. **Lifecycle and Caching**: The new factory maintains a cache of managers keyed by topic name. When a tracker is requested:
+    * It gets or creates a manager for the topic.
+    * The manager then creates a new `InMemoryTopicDelayedDeliveryTracker` for the subscription.
+    * When a subscription closes, it unregisters from the manager. When the last subscription unregisters, the manager's cleanup is scheduled based on the `delayedDeliveryTopicManagerIdleMillis` configuration, allowing it to be reused if a new subscription appears quickly.
+
+This architectural change can be described as follows:
+
+* **Default/Legacy Behavior:** For a single topic, each subscription (e.g., Sub1, Sub2) maintains its own complete `InMemoryDelayedDeliveryTracker`. Message metadata is duplicated in each tracker.
+
+* **New Topic-Level Behavior:** For a single topic, there is only one central `InMemoryTopicDelayedDeliveryTrackerManager` holding a shared index. Each subscription (Sub1, Sub2) receives a lightweight `InMemoryTopicDelayedDeliveryTracker` that acts as a view/proxy, pointing to the single, shared manager, thus eliminating data duplication.
+
+The manager handles pruning of acknowledged messages from the shared index by tracking the `markDeletePosition` of all active subscriptions and only removing messages that have been acknowledged by *all* of them.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### `InMemoryTopicDelayedDeliveryTrackerFactory` (New Class)
+This is the new factory that must be explicitly configured to enable the topic-level tracking feature.
+* **Role**: Manages the lifecycle of `InMemoryTopicDelayedDeliveryTrackerManager` instances.
+* **Cache**: It maintains a `ConcurrentMap` to cache managers per topic.
+* **Lifecycle Management**: When the last subscription for a topic is closed, the factory uses the `delayedDeliveryTopicManagerIdleMillis` setting to determine when to clean up the manager. A value of `0` removes it immediately, while a positive value schedules a delayed removal, preventing churn if subscriptions are volatile.
+* **Configuration**: It reads and passes the new tuning parameters (`pruneMinIntervalMillis`, `pruneEligibleRatio`) to the manager instances it creates.
+
+### `TopicDelayedDeliveryTrackerManager` (New Interface)
+This interface defines the contract for a topic-level manager.
+```java
+public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
+    DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);
+    void unregister(AbstractPersistentDispatcherMultipleConsumers dispatcher);
+    // ... other methods
+}
+```
+
+### `InMemoryTopicDelayedDeliveryTrackerManager` (New Class)
+This is the implementation of the topic-level manager.
+* **Data Structure**: Uses a `ConcurrentSkipListMap` as its core index, mapping delivery timestamps to message positions efficiently.
+* **Subscription Context (`SubContext`)**: Holds per-subscription state, including the `markDeletePosition`.
+* **Pruning Logic**: Pruning is throttled and tunable:
+    * The minimum time between pruning attempts is controlled by `delayedDeliveryPruneMinIntervalMillis`. If not set, it uses an adaptive interval based on the tick time.
+    * Opportunistic pruning is triggered during the delivery check if the ratio of subscriptions ready for delivery exceeds `delayedDeliveryPruneEligibleRatio`.
+* **`createOrGetTracker`**: This method creates the per-subscription `InMemoryTopicDelayedDeliveryTracker` instance.
+
+### `InMemoryTopicDelayedDeliveryTracker` (New Class)
+This class implements the `DelayedDeliveryTracker` interface for a single subscription.
+* **Role**: Acts as a lightweight proxy, forwarding all operations (e.g., `addMessage`, `getScheduledMessages`) to the shared `InMemoryTopicDelayedDeliveryTrackerManager`.
+* **`updateMarkDeletePosition`**: A method called by the dispatcher via `markDeletePositionMoveForward` to provide the manager with the most up-to-date acknowledgment position for its subscription.
+
+### `PersistentDispatcherMultipleConsumers` (Modified)
+* **`markDeletePositionMoveForward()`**: This new method is called when the cursor's mark-delete position advances. It checks if the tracker is an `InMemoryTopicDelayedDeliveryTracker` and, if so, pushes the updated position to it. This enables more efficient, event-driven state updates for the shared manager.
+
+## Public-facing Changes
+
+### Public API
+No changes.
+
+### Binary protocol
+No changes.
+
+### Configuration
+The following configuration parameters in `broker.conf` and `standalone.conf` are added or updated:
+
+* **`delayedDeliveryTrackerFactoryClassName`**
+    * The documentation is updated to specify the new class name to enable this feature: `org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`. The old `org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory` remains the default.
+
+* **`delayedDeliveryPruneMinIntervalMillis`**
+    * **Description**: Minimum interval (in milliseconds) between prune attempts within the topic-level tracker. A positive value overrides the default adaptive interval.
+    * **Default**: `0` (use adaptive interval based on `delayedDeliveryTickTimeMillis`).
+
+* **`delayedDeliveryPruneEligibleRatio`**
+    * **Description**: A ratio in the range [0.0, 1.0] of subscriptions that must be eligible for delivery to trigger an opportunistic prune. For example, 0.5 means pruning is attempted when at least half of the subscriptions are ready to receive messages.
+    * **Default**: `0.5`.
+
+* **`delayedDeliveryTopicManagerIdleMillis`**
+    * **Description**: Idle timeout (in milliseconds) for a topic-level manager. After the last subscription closes, the manager will be removed from the cache after this delay, unless a new subscription connects in the meantime.
+    * **Default**: `0` (remove immediately).
+
+### CLI
+No changes.
+
+### Metrics
+No changes.
+
+# Monitoring
+
+The effects of this change can be monitored through existing broker metrics:
+* **JVM Heap Memory**: Monitor the broker's JVM heap usage (`jvm_memory_bytes_used{area="heap"}`). When the new factory is enabled for topics with many subscriptions and delayed messages, a significant reduction in heap memory usage should be observed.
+* **Garbage Collection**: Monitor JVM GC metrics (`jvm_gc_collection_seconds_count`, `jvm_gc_collection_seconds_sum`). A reduction in memory pressure should lead to fewer and shorter GC pauses.
+
+# Security Considerations
+
+This proposal adds a new internal broker component that is not exposed via any public API. The design ensures data isolation between subscriptions. The `InMemoryTopicDelayedDeliveryTrackerManager` filters scheduled messages based on each subscription's individual `markDeletePosition`, preventing the acknowledgment state of one subscription from affecting another. Multi-tenancy and security guarantees are preserved.
+
+# Backward & Forward Compatibility
+
+## Upgrade
+To use this feature, an operator must perform two steps after upgrading the broker binaries:
+1. Enable the new tracker by setting the configuration parameter in `broker.conf`:
+    `delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryTopicDelayedDeliveryTrackerFactory`
+2. Restart the broker.
+
+If this configuration change is not made, the broker will continue to use the legacy per-subscription tracker, ensuring full backward compatibility. The new configuration options (`delayedDeliveryPrune...`, `delayedDeliveryTopicManager...`) only take effect when the new factory is active.
+
+## Downgrade / Rollback
+To perform a downgrade, the configuration change must be reverted before starting the broker with the older version.
+1. Change `delayedDeliveryTrackerFactoryClassName` back to the old factory or remove the line to use the default.
+2. Deploy the older broker version and restart.
+
+Failure to revert the configuration will cause the older broker to fail on startup because it cannot find the `InMemoryTopicDelayedDeliveryTrackerFactory` class. No on-disk data formats are changed, so the process is safe.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+Delayed message delivery is a broker-local feature. This change has no impact on geo-replication, and no special considerations are needed for upgrade or downgrade in a replicated environment.
+
+# General Notes
+This proposal introduces a new, highly memory-efficient delayed message tracking strategy. Operators must explicitly enable it via configuration to take advantage of its benefits. The legacy implementation remains the default to guarantee a seamless upgrade experience for existing users.
+
+# Links
+
+* Mailing List discussion thread: https://lists.apache.org/thread/8x1l077mlq2gfc7lq4gwrql14to8gntj
+* Mailing List voting thread:
\ No newline at end of file