[feat][broker] Implement topic-level delayed delivery tracking with in-memory manager#16
[feat][broker] Implement topic-level delayed delivery tracking with in-memory manager#16Denovo1998 wants to merge 16 commits intomasterfrom
Conversation
…fixed-delay detection and memory optimization
…h enhanced memory management and concurrency controls
…InMemoryTopicDelayedDeliveryTrackerManager
… for improved performance and memory efficiency
…eliveryTrackerManager for event-driven updates and improved performance
…emoving deprecated constructor and methods for improved clarity
…ryDelayedDeliveryTrackerFactory for improved testability
…ryTopicDelayedDeliveryTrackerManager for improved accuracy and performance
…s in InMemoryTopicDelayedDeliveryTrackerManager for improved clarity and maintainability
… thread-safe updates and improve concurrency handling in InMemoryTopicDelayedDeliveryTrackerManager
There was a problem hiding this comment.
Pull Request Overview
This PR introduces a topic-level delayed delivery tracker manager for in-memory delayed message tracking, significantly optimizing memory usage in multi-subscription scenarios. Instead of maintaining separate delayed message indexes for each subscription, a single global index is shared across all subscriptions of a topic, with per-subscription views filtering based on mark-delete positions.
Key changes:
- Introduces
TopicDelayedDeliveryTrackerManagerinterface andInMemoryTopicDelayedDeliveryTrackerManagerimplementation with a shared global delayed message index per topic - Adds
InMemoryTopicDelayedDeliveryTrackerViewas subscription-specific views that implement theDelayedDeliveryTrackerinterface - Updates
InMemoryDelayedDeliveryTrackerFactoryto cache and reuse topic-level managers across subscriptions - Implements event-driven mark-delete position updates via
markDeletePositionMoveForward()callback in dispatcher implementations - Adds comprehensive unit tests for the new topic-level tracking mechanism
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
TopicDelayedDeliveryTrackerManager.java |
New interface defining the contract for topic-level delayed delivery tracking |
InMemoryTopicDelayedDeliveryTrackerManager.java |
Core implementation managing shared delayed message index with subscription contexts and per-bucket locking |
InMemoryTopicDelayedDeliveryTrackerView.java |
Per-subscription view wrapper delegating to the topic-level manager |
InMemoryDelayedDeliveryTrackerFactory.java |
Updated to instantiate and cache topic-level managers instead of per-subscription trackers |
PersistentDispatcherMultipleConsumers.java |
Implements mark-delete position callback for in-memory topic tracker views |
PersistentDispatcherMultipleConsumersClassic.java |
Implements mark-delete position callback for in-memory topic tracker views |
DelayedDeliveryTrackerFactoryTest.java |
Updates test assertions to reference the new view class name |
InMemoryTopicDeliveryTrackerTest.java |
Comprehensive test suite covering concurrent operations, pruning, timer scheduling, and multi-subscription scenarios |
InMemoryDelayedDeliveryTrackerFactoryUnitTest.java |
Tests factory-level behavior including manager caching and lifecycle |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/delayed/InMemoryTopicDelayedDeliveryTrackerManager.java
Outdated
Show resolved
Hide resolved
...-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java
Outdated
Show resolved
Hide resolved
...-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryTopicDeliveryTrackerTest.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
Outdated
Show resolved
Hide resolved
| // Return an estimate of visible delayed messages for this subscription | ||
| // For now, return the total count - could be enhanced to count only visible messages | ||
| return manager.topicDelayedMessages(); |
There was a problem hiding this comment.
The comment indicates a known limitation where the returned count includes messages that may not be visible to this specific subscription (filtered by mark-delete position). This discrepancy between the method's semantic meaning and its actual behavior should be clearly documented in the method's JavaDoc to avoid confusion for API consumers.
| timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { |
There was a problem hiding this comment.
Similar to the view class, this method's name suggests it returns subscription-specific visible messages, but it returns the global count. The comment acknowledges this limitation, but the method name getNumberOfVisibleDelayedMessagesForSub is misleading. Consider either renaming to clarify it returns an approximation or implementing accurate per-subscription counting.
| private long getNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { | |
| private long getApproximateNumberOfVisibleDelayedMessagesForSub(SubContext subContext) { |
…or topic-level delayed message tracking and update related configurations
…layed delivery tracker and rename view class for clarity
…tests in InMemoryTopicDelayedDeliveryTrackerTest
…rackerFactory and related configurations to clarify memory usage and implementation details
…d tracker across dispatcher implementations
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| // Clear timer state | ||
| timerLock.lock(); | ||
| try { | ||
| currentTimeoutTarget = -1; | ||
| this.timeout = null; |
There was a problem hiding this comment.
Missing reschedule leaves later delayed messages undelivered
After the timer fires, run() clears currentTimeoutTarget and timeout but never schedules a new timeout for the next delayed bucket. If a topic has multiple delayed timestamps queued (e.g. messages at T1 and T2>T1) and no new messages are added after T1, the first tick triggers delivery but the remaining messages never become visible because the manager will not call readMoreEntriesAsync again when time reaches T2. The previous per-subscription tracker re‑armed its timer in getScheduledMessages; this implementation does not, so delayed messages can be stuck indefinitely until another message is added or tick time changes.
Useful? React with 👍 / 👎.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #16 +/- ##
============================================
+ Coverage 74.28% 74.31% +0.03%
- Complexity 2492 33588 +31096
============================================
Files 1913 1916 +3
Lines 149280 149738 +458
Branches 17324 17401 +77
============================================
+ Hits 110887 111277 +390
- Misses 29565 29599 +34
- Partials 8828 8862 +34
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Fixes #xyz
Main Issue: apache#24600
PIP: apache#24928
Motivation
The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of O(num_delayed_messages * num_subscriptions).
This excessive memory usage can cause:
Increased memory pressure on Pulsar brokers.
More frequent and longer Garbage Collection (GC) pauses, impacting broker performance.
Potential OutOfMemoryErrors, leading to broker instability.
Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions.
By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: apache#24927