From 3cf02f5179cd5495290a8beb39827099a30f19c0 Mon Sep 17 00:00:00 2001
From: zymap
Date: Fri, 27 Feb 2026 10:59:35 +0800
Subject: [PATCH] [feat][broker] PIP-456: Add subscriptionPrefixToSkipServerMarkerCheck configuration to allow RawReader to read marker messages

---
 pip/pip-456.md | 170 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 170 insertions(+)
 create mode 100644 pip/pip-456.md

diff --git a/pip/pip-456.md b/pip/pip-456.md
new file mode 100644
index 0000000000000..2217238c3c0af
--- /dev/null
+++ b/pip/pip-456.md
@@ -0,0 +1,170 @@
+# PIP-456: Add subscriptionPrefixToSkipServerMarkerCheck configuration to allow RawReader to read marker messages
+
+# Background knowledge
+
+In Apache Pulsar, **marker messages** are internal messages used by the broker for various operations such as compaction, replication, and transactions. These markers are not meant to be visible to end users in normal consumer reads.
+
+In most cases, the broker filters out marker messages before delivering entries to consumers. The filtering logic is implemented in `AbstractBaseDispatcher.java`, which checks whether a message is a marker and, if so, skips delivering it to regular consumers.
+
+**RawReader** is a low-level reader API that allows reading raw messages directly from a topic without any client-side filtering or processing. It is useful for administrative tasks, debugging, and implementing custom message processing pipelines.
+
+# Motivation
+
+RawReader is used to read messages for specific use cases such as:
+- Compaction verification and monitoring
+- Custom data analysis pipelines
+- Debugging and troubleshooting
+
+In these cases, users want to read **all** messages from the topic, including marker messages, so they can handle marker types on the reader side. Currently, the broker filters out marker messages before delivering them to any consumer, including RawReader.
+
+This feature is particularly useful for:
+1. **Compaction monitoring**: Users want to verify compaction results by reading all marker messages
+2. **Custom processing**: Users need access to all message types for their specific use cases
+3. **Debugging**: Developers need to inspect marker messages for troubleshooting
+
+# Goals
+
+## In Scope
+
+- Add a new broker configuration `subscriptionPrefixToSkipServerMarkerCheck`
+- Allow configuring a subscription name prefix that will skip the server-side marker message filtering
+- Enable RawReader (or any reader with matching subscription prefix) to receive marker messages
+
+## Out of Scope
+
+- Changing the default behavior for existing subscriptions
+- Adding marker message handling on the client side
+- Supporting marker message reading for regular consumers
+
+# High Level Design
+
+The solution adds a new broker configuration that allows operators to specify a subscription name prefix. When a reader creates a subscription with a name that starts with this configured prefix, the broker will skip filtering out marker messages, allowing them to be delivered to the reader.
+
+```
+┌─────────────┐     ┌─────────────┐     ┌─────────────┐
+│   Client    │────▶│   Broker    │────▶│  Dispatcher │
+│  (RawReader)│     │             │     │             │
+└─────────────┘     └─────────────┘     └─────────────┘
+                                               │
+                    ┌──────────────────────────┼──────────────────────────┐
+                    │                          │                          │
+                    ▼                          ▼                          ▼
+          ┌───────────────────┐      ┌───────────────────┐      ┌───────────────────┐
+          │ Subscription with │      │ Subscription      │      │ Subscription      │
+          │ matching prefix   │      │ without prefix    │      │ without prefix    │
+          │ receives markers  │      │ filters markers   │      │ filters markers   │
+          └───────────────────┘      └───────────────────┘      └───────────────────┘
+```
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Configuration
+
+Add a new configuration option in `ServiceConfiguration.java`:
+
+```java
+@Field(
+    doc = "Configure a subscription prefix. The reader with the matching prefix will receive all the data, "
+            + "including marker messages. This is useful for use cases like compaction where the raw reader "
+            + "needs to read all data and handle markers on the reader side."
+)
+private Set<String> subscriptionPrefixToSkipServerMarkerCheck = new HashSet<>();
+```
+
+### Dispatcher Modification
+
+Modify `AbstractBaseDispatcher.java` to check the subscription prefix before filtering marker messages:
+
+```java
+// In the method that checks for marker messages
+for (String prefix : serviceConfig.getSubscriptionPrefixToSkipServerMarkerCheck()) {
+    if (name.startsWith(prefix)) {
+        return true; // Skip marker check, allow marker messages
+    }
+}
+```
+
+### Usage Example
+
+1. Configure in `broker.conf`:
+```properties
+subscriptionPrefixToSkipServerMarkerCheck=__raw_reader_
+```
+
+2. Create a RawReader with a subscription name starting with the prefix:
+```java
+var reader = RawReader.create(client, topicName, "__raw_reader_compaction_check").get();
+```
+
+Now the reader will receive all messages including marker types, allowing the client to handle markers as needed.
+
+## Public-facing Changes
+
+### Configuration
+
+| Configuration | Type | Default | Description |
+|---------------|------|---------|-------------|
+| subscriptionPrefixToSkipServerMarkerCheck | `Set<String>` | empty | A set of subscription prefixes. Subscriptions with names starting with these prefixes will receive marker messages. |
+
+### CLI
+
+No CLI changes required.
+
+### Metrics
+
+No metric changes required.
+
+# Monitoring
+
+No specific monitoring is required for this feature. Users who need to monitor marker messages can do so by:
+- Tracking the number of marker messages received by their RawReader
+- Adding application-level metrics to count marker message types
+
+# Security Considerations
+
+- This feature only affects subscriptions with specific prefixes configured by the broker operator
+- Regular consumers are not affected by this change
+- The prefix should be chosen carefully to avoid conflicts with internal Pulsar subscriptions (e.g., avoid overly broad prefixes like a bare `__`, which would also match internal subscription names such as `__compaction`)
+- Consider documenting recommended prefix patterns in the configuration
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+This is a new configuration with an empty default value, so:
+- Existing deployments will not be affected
+- After upgrading, operators can configure the prefix as needed
+- No migration steps required
+
+## Downgrade / Rollback
+
+- Remove the `subscriptionPrefixToSkipServerMarkerCheck` configuration from broker.conf
+- The feature will be disabled
+- No data migration needed
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+- This configuration is local to each broker
+- In a geo-replicated setup, each cluster can have different configurations
+- No cross-cluster coordination needed
+
+# Alternatives
+
+1. **Client-side marker handling**: Add client-side logic to filter/handle markers. This was rejected because it requires changes to the client API and doesn't solve the use case of reading markers from the broker.
+
+2. **Per-subscription configuration**: Allow configuring at the subscription level via admin API. This was considered but adds complexity to the admin API and requires subscription state management.
+
+3. **Topic-level configuration**: Configure at the topic level which readers can receive markers. This was considered but adds complexity and doesn't fit the current use case of using prefixes.
+
+# General Notes
+
+- The implementation is already available in PR #25171
+- This feature is particularly useful for compaction verification and monitoring use cases
+
+# Links
+
+
+* Mailing List discussion thread:
+* Mailing List voting thread: