Skip to content

[improve][broker] Add lastDelayedMessageTimestamp to delayed delivery tracker#25268

Open
Dream95 wants to merge 1 commit intoapache:masterfrom
Dream95:feat_last_delayed_message_monitor
Open

[improve][broker] Add lastDelayedMessageTimestamp to delayed delivery tracker#25268
Dream95 wants to merge 1 commit intoapache:masterfrom
Dream95:feat_last_delayed_message_monitor

Conversation

@Dream95
Copy link
Contributor

@Dream95 Dream95 commented Feb 26, 2026

Main Issue: #25267

Motivation

Without knowing when the last delayed message is scheduled to be delivered, we cannot easily:

  1. Monitor how far in the future delayed messages extend (e.g. "last message is 7 days from now")
  2. Alert when delayed messages are scheduled too far ahead (e.g. misconfigured producers)
  3. Debug delayed delivery issues (e.g. why consumers are not receiving messages yet)
  4. Plan capacity (e.g. understanding the time range of pending delayed messages)

Tracker layer only. Metric will be added in a follow-up commit.

Modifications

Interface

  • Added getLastDelayedMessageTimestamp() to DelayedDeliveryTracker
  • Updated the DISABLE implementation to return 0

InMemoryDelayedDeliveryTracker

  • Implemented via Long2ObjectSortedMap.lastLongKey() (O(1))
  • Recovers on restart as the map is rebuilt from cursor

BucketDelayedDeliveryTracker

  • Added lastDelayedMessageTimestamp (AtomicLong), updated in addMessage() with CAS (updateLastTimestamp()), reset in clear()
  • Recovery: uses pre-computed ImmutableBucket.lastScheduleTimestamp in the same loop as message count (single pass, no extra I/O)

ImmutableBucket

  • Added lastScheduleTimestamp (max schedule timestamp across all segments), set during recovery from snapshot metadata

Verifying this change

  • Make sure that the change passes the CI checks.
    Unit tests

  • AbstractDeliveryTrackerTest: test() cover the new behavior

  • BucketDelayedDeliveryTrackerTest: testRecoverSnapshot() and testClear() verify recovery and reset

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: (Dream95#6)

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Feb 26, 2026
@Denovo1998
Copy link
Contributor

At first glance, this lastDelayedMessageTimestamp should be a property of the topic? A topic will have multiple dispatchers, and each dispatcher corresponds to a delayed delivery tracker.
Maybe it shouldn't be recorded repeatedly in the delayed delivery tracker.

@coderzc @lhotari WDYT

@Dream95
Copy link
Contributor Author

Dream95 commented Feb 26, 2026

Because we need a subscription-level metric: each subscription has its own lastDelayedMessageTimestamp.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants