diff --git a/pip/pip-423.md b/pip/pip-423.md new file mode 100644 index 0000000000000..461acf0e7433b --- /dev/null +++ b/pip/pip-423.md @@ -0,0 +1,291 @@ +# PIP-423: Add a new admin API to acknowledge/skip messages by message IDs + +# Background knowledge + +* **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends. +* **Batch Messages:** To improve throughput, Pulsar producers can batch multiple individual messages into a single entry that is written to BookKeeper. In this case, the `MessageId` also contains a `batchIndex` to identify a specific message within the batch. The entry's metadata stores the total number of messages in the batch. +* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). The cursor is responsible for ensuring that acknowledged messages are not redelivered. +* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message, or even a single message within a batch. When a message is acknowledged individually, the broker's `ManagedCursor` persistently tracks this "acknowledgement hole" to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism. +* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. Since the message is marked as consumed by the cursor, the `DelayedDeliveryTracker` will not dispatch it. +* **Existing `skip` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal provides a more precise way to skip messages by their specific `MessageId`. + +# Motivation + +Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance: + +* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively. +* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. This also applies to removing a single bad message from within a larger batch. +* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue. + +The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by a list of specific `MessageId`s (including `ledgerId`, `entryId`, and optional `batchIndex`), providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. + +# Goals + +## In Scope + +* Introduce a new Admin REST API endpoint and a corresponding `pulsar-admin` CLI command to support skipping a specific list of messages for a subscription. +* The target message(s) will be identified by their `ledgerId`, `entryId`, and an optional `batchIndex` for messages within a batch, or by their Base64-encoded `MessageId` byte representation. +* The implementation will leverage Pulsar's existing, robust cursor management (`deletePositionsAsync` / `AckType.Individual`) mechanism for persistence and reliability. +* This feature will only be supported for subscription types that allow individual acknowledgements (i.e., not cumulative ack modes). +* Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription. +* Support for targeting specific partitions directly. + +## Out of Scope + +* Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated endpoint will be created for clarity. +* Automatic skipping of messages across all partitions of a partitioned topic with a single request. (The command targets specific partitions directly, e.g., `topic-partition-0`). +* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation that must be run on each cluster where the skip is needed. + +# High Level Design + +The proposed solution introduces a new admin API that triggers Pulsar's cursor deletion/acknowledgement capability on demand for specific messages. + +1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request specifies the topic (or partition), target subscription, and a list of message identifiers. These identifiers can be provided as a triplet of `ledgerId:entryId[:batchIndex]` or as Base64-encoded `MessageId` byte arrays. + +2. **Broker Receives Request:** The Pulsar broker receives the admin request on the new endpoint `.../subscription/{subName}/skipByMessageIds`. It parses the flexible JSON payload (polymorphic deserialization supports multiple input formats) and validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule. It also verifies that the operation is not invoked on the root of a partitioned topic (as it requires targeting the exact partition where the message resides). + +3. **Delegate to Subscription:** The broker invokes a new method `skipMessages(List entries)` on the target `PersistentSubscription` object. + +4. **Perform Deletion/Acknowledgement:** Inside the `PersistentSubscription`, the following occurs: + * It verifies that the subscription's type supports individual acknowledgements (i.e., not cumulative). + * For messages specified without a `batchIndex` (or full entry), it constructs a `Position` object for the entire entry. + * For messages specified with a `batchIndex`, it asynchronously replays the entry from BookKeeper to get the batch metadata (e.g., batch size). It then validates the index and constructs a `Position` object that includes an "ack set" (a bitset) indicating which messages within the batch are being acknowledged. + * It calls its internal `deletePositionsAsync()` method for all the constructed `Position` objects. + +5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these deletions, which are persisted to metadata storage. + * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered. + * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**. + +# Detailed Design + +## Design & Implementation Details + +The implementation introduces a new flexible request DTO, an internal model for skip requests, extends the `Subscription` interface, and implements the core logic in `PersistentSubscription`. + +1. **New Request DTO:** A new class `SkipMessageIdsRequest` is created in the admin layer. It features custom Jackson deserialization to handle polymorphic JSON input smoothly on the server. + +2. **Subscription Interface Extension:** The `Subscription` interface is extended with a new method that accepts a list of `SkipEntry` objects. +```java +public interface Subscription extends MessageExpirer { + // ... existing methods + CompletableFuture skipMessages(int numMessagesToSkip); + + CompletableFuture skipMessages(List entries); +} +``` + +3. **Internal Skip Model (`SkipEntry`):** The `SkipEntry` class serves as the internal data transfer object between the admin layer and the subscription logic. +```java +public final class SkipEntry { + private final long ledgerId; + private final long entryId; + // null or empty => full entry + private final List batchIndexes; + + public SkipEntry(long ledgerId, long entryId, List batchIndexes) { + // ... + } +} +``` + +4. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. It differentiates between full-entry acknowledgements and partial (batch) acknowledgements. + +```java +@Override +public CompletableFuture skipMessages(List entries) { + if (Subscription.isCumulativeAckMode(getType())) { + return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type.")); + } + + // Separate full-entry acks from partial (batchIndex) acks + List fullEntryPositions = new ArrayList<>(); + Map> partialAckIndexByPos = new HashMap<>(); + + // ... logic to populate these collections from 'entries' + + // If there are partial acks, read the corresponding entries to get batch metadata + if (!partialAckIndexByPos.isEmpty()) { + Set positionsToLoad = ...; // positions for entries with batch acks + cursor.asyncReplayEntries(positionsToLoad, new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List readEntries, Object ctx) { + // 1. Parse MessageMetadata to get batch size. + // 2. Validate batch indexes. + // 3. Create a BitSet representing the ack state. + // 4. Create a Position with the ack set using AckSetStateUtil.createPositionWithAckSet(). + + // Finally, delete all positions (full and partial) + deletePositionsAsync(positionsForAck); + } + }, null, true); + } else { + // Only full-entry acks are present + return deletePositionsAsync(fullEntryPositions); + } +} +``` + +5. **Admin API Logic:** The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method that validates the request, aggregates duplicate entry IDs locally into `AggregatedSkip` to prevent redundant Bookie reads, translates them into `SkipEntry`s, and delegates to the subscription. It actively blocks requests hitting a partitioned topic parent, demanding precision at the partition level. + +## Public-facing Changes + +### Public API + +A new REST endpoint is added for skipping specific messages. + +* **Path:** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds` +* **Query Parameters:** + * `authoritative` (boolean, default: false) +* **HTTP Body Parameters (JSON):** + The body is a JSON object defining the type of payload and the list of IDs. The API accepts two primary formats via custom deserialization. + + **Format 1: Structured Message IDs (type: `messageId`)** + Allows skipping by `ledgerId`, `entryId`, and optionally `batchIndex`. + ```json + { + "type": "messageId", + "messageIds": [ + { "ledgerId": 12345, "entryId": 100 }, + { "ledgerId": 12346, "entryId": 200, "batchIndex": 5 } + ] + } + ``` + + **Format 2: Base64 Encoded Message IDs (type: `byteArray`)** + Allows skipping by standard Base64 string representations of `MessageId.toByteArray()`. If the JSON payload is just a raw array, it defaults to this behavior. + ```json + { + "type": "byteArray", + "messageIds": [ + "CLlgEAQwAA==", + "CLlgEAYwAA==" + ] + } + ``` + +* **Response Codes:** + * `204 No Content`: Operation successful. + * `307 Temporary Redirect`: Current broker doesn't serve the namespace of this topic. + * `400 Bad Request`: Bad Request: invalid messageIds format. + * `401 Unauthorized`: Don't have permission to administrate resources on this tenant. + * `403 Forbidden`: Don't have admin permission (missing `TopicOperation.SKIP`). + * `404 Not Found`: Namespace, topic, or subscription does not exist. + * `405 Method Not Allowed`: Skipping messages on a partitioned topic root is not allowed (must target partition), or unsupported subscription type (Cumulative). + * `500 Internal Server Error`: Internal server error. + * `503 Service Unavailable`: Failed to validate global cluster configuration. + +### Binary protocol + +No changes. + +### Configuration + +**No new configuration parameters are introduced. However, this feature's performance and behavior under heavy load are directly influenced by existing `ManagedLedger` configurations that govern the persistence of acknowledgement holes.** + +**Administrators should be aware of these settings if they expect a high volume of message cancellations:** + +``` +# Max number of "acknowledgment holes" that are going to be persistently stored. +# When acknowledging out of order, a consumer will leave holes that are supposed +# to be quickly filled by acking all the messages. The information of which +# messages are acknowledged is persisted by compressing in "ranges" of messages +# that were acknowledged. After the max number of ranges is reached, the information +# will only be tracked in memory and messages will be redelivered in case of +# crashes. +managedLedgerMaxUnackedRangesToPersist=10000 + +# Maximum number of partially acknowledged batch messages per subscription that will have their batch +# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true. +# When this limit is exceeded, remaining batch message containing the batch deleted indexes will +# only be tracked in memory. In case of broker restarts or load balancing events, the batch +# deleted indexes will be cleared while redelivering the messages to consumers. +managedLedgerMaxBatchDeletedIndexToPersist=10000 + +# When storing acknowledgement state, choose a more compact serialization format that stores +# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires +# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective. +managedLedgerPersistIndividualAckAsLongArray=true + +# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position" +# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged +# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages. +managedLedgerUnackedRangesOpenCacheSetEnabled=true + +# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher +# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into +# MetadataStore. +managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000 + +# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). +# If value is NONE, then save the ManagedCursorInfo bytes data directly without compression. +# Using compression reduces the size of persistent cursor (subscription) metadata. This enables using a higher +# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the overall metadata stored in +# the metadata store such as ZooKeeper. +managedCursorInfoCompressionType=NONE + +# ManagedCursorInfo compression size threshold (bytes), only compress metadata when origin size more then this value. +# 0 means compression will always apply. +managedCursorInfoCompressionThresholdInBytes=16384 + +# ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). +# If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly without compression. +# Using compression reduces the size of the persistent topic metadata. When a topic contains a large number of +# individual ledgers in BookKeeper or tiered storage, compression helps prevent the metadata size from exceeding +# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also reduces the overall metadata stored +# in the metadata store such as ZooKeeper. +managedLedgerInfoCompressionType=NONE + +# ManagedLedgerInfo compression size threshold (bytes), only compress metadata when origin size more then this value. +# 0 means compression will always apply. +managedLedgerInfoCompressionThresholdInBytes=16384 +``` + +### CLI + +A new command `skip-messages` is added to `pulsar-admin topics`. + +* **Command:** `skip-messages` +* **Description:** Skip some messages for the subscription. +* **Options:** + * `-s, --subscription` (required): Subscription to skip messages on. + * `--messageId-base64` (optional, repeatable): Base64-encoded `MessageId.toByteArray()`. + * `--messageId-triplet` (optional, repeatable): MessageId as `ledgerId:entryId[:batchIndex]`. + +**Note:** The CLI strictly requires the user to pick *exactly one* of the message ID formats (`--messageId-base64` OR `--messageId-triplet`). Mixing formats in a single CLI command will result in a parameter error. + +**Examples:** +```bash +# Skip using triplet format (ledger:entry:batch) +pulsar-admin topics skip-messages persistent://public/default/my-topic-partition-0 \ + -s my-sub \ + --messageId-triplet 123:45 \ + --messageId-triplet 124:46:2 + +# Skip using base64 format +pulsar-admin topics skip-messages persistent://public/default/my-topic-partition-0 \ + -s my-sub \ + --messageId-base64 "CLlgEAQwAA==" \ + --messageId-base64 "CLlgEAYwAA==" +``` + +# Backward & Forward Compatibility + +## Upgrade + +This feature is strictly additive. It introduces a new admin endpoint and CLI command. Upgrading brokers will enable this new functionality natively. Existing clients and operations remain completely unaffected. To use the new CLI command, the `pulsar-client-tools` must be upgraded. + +## Downgrade / Rollback + +If brokers are downgraded to a version without this feature, the new admin endpoint will no longer be available, and calls to it will yield `404 Not Found`. +There are no persistent state format changes. "Acknowledgement holes" created by this API use the identical underlying format (`BitSet` / `AckType.Individual`) as those created by regular consumer acknowledgements and will be understood perfectly by older broker versions upon rollback. + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +This operation is local to a subscription's cursor within a single cluster. It has no direct impact on geo-replication. To skip a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster individually. + +# Alternatives + +# Links + +* Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w +* Mailing List voting thread: https://lists.apache.org/thread/7jbc3h42no9whjrpd6q0kmsyw985d7zo \ No newline at end of file