# PIP-423: Add a new admin API to acknowledge/skip messages by message IDs

# Background knowledge

* **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends.
* **Batch Messages:** To improve throughput, Pulsar producers can batch multiple individual messages into a single entry that is written to BookKeeper. In this case, the `MessageId` also contains a `batchIndex` to identify a specific message within the batch. The entry's metadata stores the total number of messages in the batch.
* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). The cursor is responsible for ensuring that acknowledged messages are not redelivered.
* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message, or even a single message within a batch. When a message is acknowledged individually, the broker's `ManagedCursor` persistently tracks this "acknowledgement hole" to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism.
* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. Since the message is marked as consumed by the cursor, the `DelayedDeliveryTracker` will not dispatch it.
* **Existing `skip` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal provides a more precise way to skip messages by their specific `MessageId`.

# Motivation

Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance:

* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively.
* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. This also applies to removing a single bad message from within a larger batch.
* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue.

The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by a list of specific `MessageId`s (including `ledgerId`, `entryId`, and optional `batchIndex`), providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog.

# Goals

## In Scope

* Introduce a new Admin REST API endpoint and a corresponding `pulsar-admin` CLI command to support skipping a specific list of messages for a subscription.
* The target message(s) will be identified by their `ledgerId`, `entryId`, and an optional `batchIndex` for messages within a batch, or by their Base64-encoded `MessageId` byte representation.
* The implementation will leverage Pulsar's existing, robust cursor management (`deletePositionsAsync` / `AckType.Individual`) mechanism for persistence and reliability.
* This feature will only be supported for subscription types that allow individual acknowledgements (i.e., not cumulative ack modes).
* Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription.
* Support for targeting specific partitions directly.

## Out of Scope

* Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated endpoint will be created for clarity.
* Automatic skipping of messages across all partitions of a partitioned topic with a single request. (The command targets specific partitions directly, e.g., `topic-partition-0`).
* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation that must be run on each cluster where the skip is needed.

# High Level Design

The proposed solution introduces a new admin API that triggers Pulsar's cursor deletion/acknowledgement capability on demand for specific messages.

1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request specifies the topic (or partition), target subscription, and a list of message identifiers. These identifiers can be provided as a triplet of `ledgerId:entryId[:batchIndex]` or as Base64-encoded `MessageId` byte arrays.

2. **Broker Receives Request:** The Pulsar broker receives the admin request on the new endpoint `.../subscription/{subName}/skipByMessageIds`. It parses the flexible JSON payload (polymorphic deserialization supports multiple input formats) and validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule. It also verifies that the operation is not invoked on the root of a partitioned topic (as it requires targeting the exact partition where the message resides).

3. **Delegate to Subscription:** The broker invokes a new method `skipMessages(List<SkipEntry> entries)` on the target `PersistentSubscription` object.

4. **Perform Deletion/Acknowledgement:** Inside the `PersistentSubscription`, the following occurs:
* It verifies that the subscription's type supports individual acknowledgements (i.e., not cumulative).
* For messages specified without a `batchIndex` (or full entry), it constructs a `Position` object for the entire entry.
* For messages specified with a `batchIndex`, it asynchronously replays the entry from BookKeeper to get the batch metadata (e.g., batch size). It then validates the index and constructs a `Position` object that includes an "ack set" (a bitset) indicating which messages within the batch are being acknowledged.
* It calls its internal `deletePositionsAsync()` method for all the constructed `Position` objects.

5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these deletions, which are persisted to metadata storage.
* For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered.
* For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**.

# Detailed Design

## Design & Implementation Details

The implementation introduces a new flexible request DTO, an internal model for skip requests, extends the `Subscription` interface, and implements the core logic in `PersistentSubscription`.

1. **New Request DTO:** A new class `SkipMessageIdsRequest` is created in the admin layer. It features custom Jackson deserialization to handle polymorphic JSON input smoothly on the server.

2. **Subscription Interface Extension:** The `Subscription` interface is extended with a new method that accepts a list of `SkipEntry` objects.
```java
public interface Subscription extends MessageExpirer {
    // ... existing methods
    CompletableFuture<Void> skipMessages(int numMessagesToSkip);

    CompletableFuture<Void> skipMessages(List<SkipEntry> entries);
}
```

3. **Internal Skip Model (`SkipEntry`):** The `SkipEntry` class serves as the internal data transfer object between the admin layer and the subscription logic.
```java
public final class SkipEntry {
    private final long ledgerId;
    private final long entryId;
    // null or empty => full entry
    private final List<Integer> batchIndexes;

    public SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {
        // ...
    }
}
```
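To make the mapping from the CLI's `ledgerId:entryId[:batchIndex]` triplet format onto this model concrete, here is a minimal, self-contained sketch. `parseTriplet` is a hypothetical helper (not part of the proposal), and the `SkipEntry` record is a simplified stand-in for the class above:

```java
import java.util.List;

public class TripletParser {
    // Simplified stand-in for the proposal's SkipEntry (null batchIndexes => full entry).
    record SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {}

    /** Parses "ledgerId:entryId[:batchIndex]" into a SkipEntry. */
    static SkipEntry parseTriplet(String triplet) {
        String[] parts = triplet.split(":");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException(
                    "Expected ledgerId:entryId[:batchIndex], got: " + triplet);
        }
        long ledgerId = Long.parseLong(parts[0]);
        long entryId = Long.parseLong(parts[1]);
        // A third component selects a single message inside a batch entry.
        List<Integer> batchIndexes = parts.length == 3
                ? List.of(Integer.parseInt(parts[2]))
                : null; // no batchIndex => skip the full entry
        return new SkipEntry(ledgerId, entryId, batchIndexes);
    }
}
```

For example, `parseTriplet("124:46:2")` targets batch index 2 of entry 46 in ledger 124, while `parseTriplet("123:45")` targets the whole entry.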

4. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. It differentiates between full-entry acknowledgements and partial (batch) acknowledgements.

```java
@Override
public CompletableFuture<Void> skipMessages(List<SkipEntry> entries) {
    if (Subscription.isCumulativeAckMode(getType())) {
        return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type."));
    }

    // Separate full-entry acks from partial (batchIndex) acks
    List<Position> fullEntryPositions = new ArrayList<>();
    Map<Position, List<Integer>> partialAckIndexByPos = new HashMap<>();

    // ... logic to populate these collections from 'entries'

    // If there are partial acks, read the corresponding entries to get batch metadata
    if (!partialAckIndexByPos.isEmpty()) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        Set<Position> positionsToLoad = partialAckIndexByPos.keySet();
        cursor.asyncReplayEntries(positionsToLoad, new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> readEntries, Object ctx) {
                // 1. Parse MessageMetadata to get the batch size.
                // 2. Validate the requested batch indexes.
                // 3. Create a BitSet representing the ack state.
                // 4. Create a Position with the ack set using AckSetStateUtil.createPositionWithAckSet().

                // Finally, delete all positions (full-entry plus ack-set positions)
                deletePositionsAsync(positionsForAck).whenComplete((v, ex) -> {
                    if (ex != null) {
                        result.completeExceptionally(ex);
                    } else {
                        result.complete(null);
                    }
                });
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                result.completeExceptionally(exception);
            }
        }, null, true);
        return result;
    } else {
        // Only full-entry acks are present
        return deletePositionsAsync(fullEntryPositions);
    }
}
```
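The BitSet construction mentioned in the replay callback can be illustrated with a self-contained sketch. It assumes the convention used by Pulsar's batch acknowledgement tracking, where set bits mark messages that are still unacknowledged and cleared bits mark acknowledged ones; `buildAckSet` is a hypothetical helper, and the real implementation would hand this bitset to `AckSetStateUtil` to build the position's ack set:

```java
import java.util.BitSet;
import java.util.List;

public class AckSetSketch {
    /**
     * Builds an ack-set bitset for a batch entry of the given size.
     * All bits start set (unacknowledged); clearing a bit marks that
     * batch index as acknowledged, i.e. skipped.
     */
    static BitSet buildAckSet(int batchSize, List<Integer> indexesToSkip) {
        BitSet ackSet = new BitSet(batchSize);
        ackSet.set(0, batchSize); // every message initially unacknowledged
        for (int idx : indexesToSkip) {
            if (idx < 0 || idx >= batchSize) {
                throw new IllegalArgumentException("batchIndex out of range: " + idx);
            }
            ackSet.clear(idx); // mark this batch index as acknowledged
        }
        return ackSet;
    }
}
```

For a batch of 5 messages with indexes 1 and 3 skipped, the resulting bitset keeps bits 0, 2, and 4 set, so only those three messages remain deliverable.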

5. **Admin API Logic:** The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method that validates the request, locally aggregates duplicate `(ledgerId, entryId)` pairs into `AggregatedSkip` objects to avoid redundant BookKeeper reads, translates them into `SkipEntry` objects, and delegates to the subscription. Requests against the parent of a partitioned topic are rejected; the operation must target the exact partition.
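The aggregation step can be sketched as follows. This is an illustrative, self-contained version under simplifying assumptions: the record types are stand-ins for the proposal's internal classes, and a full-entry request for a position absorbs any batch-index requests for the same position:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SkipAggregator {
    // Simplified stand-ins for the proposal's internal types.
    record SkipEntry(long ledgerId, long entryId, List<Integer> batchIndexes) {}
    record AggregatedSkip(long ledgerId, long entryId, Set<Integer> batchIndexes, boolean fullEntry) {}

    /** Merges duplicate (ledgerId, entryId) requests so each entry is read at most once. */
    static Collection<AggregatedSkip> aggregate(List<SkipEntry> entries) {
        Map<String, AggregatedSkip> byPosition = new LinkedHashMap<>();
        for (SkipEntry e : entries) {
            String key = e.ledgerId() + ":" + e.entryId();
            AggregatedSkip prev = byPosition.get(key);
            // A request without batch indexes (or one already marked full) covers the whole entry.
            boolean full = e.batchIndexes() == null || e.batchIndexes().isEmpty()
                    || (prev != null && prev.fullEntry());
            Set<Integer> merged = new TreeSet<>();
            if (!full) {
                if (prev != null) {
                    merged.addAll(prev.batchIndexes());
                }
                merged.addAll(e.batchIndexes());
            }
            byPosition.put(key, new AggregatedSkip(e.ledgerId(), e.entryId(), merged, full));
        }
        return byPosition.values();
    }
}
```

With this shape, two requests for batch indexes 0 and 3 of the same entry collapse into one `AggregatedSkip` carrying `{0, 3}`, so the entry is replayed from BookKeeper only once.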

## Public-facing Changes

### Public API

A new REST endpoint is added for skipping specific messages.

* **Path:** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds`
* **Query Parameters:**
* `authoritative` (boolean, default: false)
* **HTTP Body Parameters (JSON):**
The body is a JSON object defining the type of payload and the list of IDs. The API accepts two primary formats via custom deserialization.

**Format 1: Structured Message IDs (type: `messageId`)**
Allows skipping by `ledgerId`, `entryId`, and optionally `batchIndex`.
```json
{
  "type": "messageId",
  "messageIds": [
    { "ledgerId": 12345, "entryId": 100 },
    { "ledgerId": 12346, "entryId": 200, "batchIndex": 5 }
  ]
}
```

**Format 2: Base64 Encoded Message IDs (type: `byteArray`)**
Allows skipping by standard Base64 string representations of `MessageId.toByteArray()`. If the JSON payload is just a raw array, it defaults to this behavior.
```json
{
  "type": "byteArray",
  "messageIds": [
    "CLlgEAQwAA==",
    "CLlgEAYwAA=="
  ]
}
```
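A client would POST one of these bodies to the endpoint above. As an illustration of assembling the structured (Format 1) payload, here is a small self-contained sketch; `Mid` and `buildStructuredBody` are hypothetical helpers, not part of the proposal:

```java
import java.util.List;

public class SkipRequestBody {
    // Hypothetical holder for one structured message id; batchIndex may be null.
    record Mid(long ledgerId, long entryId, Integer batchIndex) {}

    /** Builds the "messageId"-typed JSON body accepted by the new endpoint. */
    static String buildStructuredBody(List<Mid> ids) {
        StringBuilder sb = new StringBuilder("{\"type\":\"messageId\",\"messageIds\":[");
        for (int i = 0; i < ids.size(); i++) {
            Mid m = ids.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"ledgerId\":").append(m.ledgerId())
              .append(",\"entryId\":").append(m.entryId());
            if (m.batchIndex() != null) {
                sb.append(",\"batchIndex\":").append(m.batchIndex());
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }
}
```

The resulting string can then be sent with any HTTP client as the body of the `POST .../subscription/{subName}/skipByMessageIds` request.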

* **Response Codes:**
* `204 No Content`: Operation successful.
* `307 Temporary Redirect`: Current broker doesn't serve the namespace of this topic.
* `400 Bad Request`: Invalid messageIds format.
* `401 Unauthorized`: Don't have permission to administrate resources on this tenant.
* `403 Forbidden`: Don't have admin permission (missing `TopicOperation.SKIP`).
* `404 Not Found`: Namespace, topic, or subscription does not exist.
* `405 Method Not Allowed`: Skipping messages on a partitioned topic root is not allowed (must target partition), or unsupported subscription type (Cumulative).
* `500 Internal Server Error`: Internal server error.
* `503 Service Unavailable`: Failed to validate global cluster configuration.

### Binary protocol

No changes.

### Configuration

**No new configuration parameters are introduced. However, this feature's performance and behavior under heavy load are directly influenced by existing `ManagedLedger` configurations that govern the persistence of acknowledgement holes.**

**Administrators should be aware of these settings if they expect a high volume of message cancellations:**

```
# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

# Maximum number of partially acknowledged batch messages per subscription that will have their batch
# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true.
# When this limit is exceeded, remaining batch message containing the batch deleted indexes will
# only be tracked in memory. In case of broker restarts or load balancing events, the batch
# deleted indexes will be cleared while redelivering the messages to consumers.
managedLedgerMaxBatchDeletedIndexToPersist=10000

# When storing acknowledgement state, choose a more compact serialization format that stores
# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires
# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective.
managedLedgerPersistIndividualAckAsLongArray=true

# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position"
# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged
# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages.
managedLedgerUnackedRangesOpenCacheSetEnabled=true

# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# MetadataStore.
managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000

# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY).
# If value is NONE, then save the ManagedCursorInfo bytes data directly without compression.
# Using compression reduces the size of persistent cursor (subscription) metadata. This enables using a higher
# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the overall metadata stored in
# the metadata store such as ZooKeeper.
managedCursorInfoCompressionType=NONE

# ManagedCursorInfo compression size threshold (bytes); only compress metadata when the original size is more than this value.
# 0 means compression will always apply.
managedCursorInfoCompressionThresholdInBytes=16384

# ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY).
# If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly without compression.
# Using compression reduces the size of the persistent topic metadata. When a topic contains a large number of
# individual ledgers in BookKeeper or tiered storage, compression helps prevent the metadata size from exceeding
# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also reduces the overall metadata stored
# in the metadata store such as ZooKeeper.
managedLedgerInfoCompressionType=NONE

# ManagedLedgerInfo compression size threshold (bytes); only compress metadata when the original size is more than this value.
# 0 means compression will always apply.
managedLedgerInfoCompressionThresholdInBytes=16384
```

### CLI

A new command `skip-messages` is added to `pulsar-admin topics`.

* **Command:** `skip-messages`
* **Description:** Skip specific messages, identified by message ID, for a subscription.
* **Options:**
* `-s, --subscription` (required): Subscription to skip messages on.
* `--messageId-base64` (optional, repeatable): Base64-encoded `MessageId.toByteArray()`.
* `--messageId-triplet` (optional, repeatable): MessageId as `ledgerId:entryId[:batchIndex]`.

**Note:** The CLI strictly requires the user to pick *exactly one* of the message ID formats (`--messageId-base64` OR `--messageId-triplet`). Mixing formats in a single CLI command will result in a parameter error.

**Examples:**
```bash
# Skip using triplet format (ledgerId:entryId[:batchIndex])
pulsar-admin topics skip-messages persistent://public/default/my-topic-partition-0 \
    -s my-sub \
    --messageId-triplet 123:45 \
    --messageId-triplet 124:46:2

# Skip using base64 format
pulsar-admin topics skip-messages persistent://public/default/my-topic-partition-0 \
    -s my-sub \
    --messageId-base64 "CLlgEAQwAA==" \
    --messageId-base64 "CLlgEAYwAA=="
```

# Backward & Forward Compatibility

## Upgrade

This feature is strictly additive. It introduces a new admin endpoint and CLI command. Upgrading brokers will enable this new functionality natively. Existing clients and operations remain completely unaffected. To use the new CLI command, the `pulsar-client-tools` must be upgraded.

## Downgrade / Rollback

If brokers are downgraded to a version without this feature, the new admin endpoint will no longer be available, and calls to it will yield `404 Not Found`.
There are no persistent state format changes. "Acknowledgement holes" created by this API use the identical underlying format (`BitSet` / `AckType.Individual`) as those created by regular consumer acknowledgements and will be understood perfectly by older broker versions upon rollback.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

This operation is local to a subscription's cursor within a single cluster. It has no direct impact on geo-replication. To skip a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster individually.

# Alternatives

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w
* Mailing List voting thread: https://lists.apache.org/thread/7jbc3h42no9whjrpd6q0kmsyw985d7zo