[improve][pip] PIP-398: Subscription replication on the broker, namespace and topic levels#23770

Open
nodece wants to merge 7 commits into apache:master from nodece:subscription-replication-on-namespace-topic

Conversation

@nodece
Member

@nodece nodece commented Dec 23, 2024

Motivation

Enhance the subscription replication.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the PIP and doc-not-needed (Your PR changes do not impact docs) labels Dec 23, 2024
@lhotari
Member

lhotari commented Dec 23, 2024

@nodece would it be possible to cover #23769 feature too? That enables subscription replication for all subscriptions at the broker level if I understand it correctly. Could you work together with @yyj8 to cover that?

@nodece
Member Author

nodece commented Dec 23, 2024

> @nodece would it be possible to cover #23769 feature too? That enables subscription replication for all subscriptions at the broker level if I understand it correctly. Could you work together with @yyj8 to cover that?

#23769 aims to overwrite the subscription replication state set by the consumer, which is different from this PIP. Actually, I also need an overwrite feature, so that users manage subscription replication through the admin API.

Downstream users sometimes enable or disable subscription replication on the consumer, which causes the namespace-level or topic-level setting to be ignored.

We can also merge #23769 into this PIP. What's your opinion? @lhotari @yyj8

@lhotari
Member

lhotari commented Dec 23, 2024

@nodece @yyj8 Please work together to combine both cases since they are both valuable and useful. I think it would make sense to combine the work in a single PIP to ensure that we achieve consistency across the changes.

@yyj8
Contributor

yyj8 commented Dec 23, 2024

This is a great suggestion. Could we consider a strategy with three dimensions: cluster, namespace, and topic?
(1) Merge PR #23769 into this PIP (#23770) as the cluster-dimension strategy;
(2) In PR #23769, add support for dynamically configuring the replicateAllSubscriptionState parameter through the command bin/pulsar-admin brokers update-dynamic-config --config replicateAllSubscriptionState --value true/false

@nodece @lhotari Can you see if this method is feasible?

@nodece
Member Author

nodece commented Dec 23, 2024

@yyj8

> cluster dimension, namespace dimension, and topic dimension.

Once the consumer level is configured, it has the highest priority and the cluster, namespace, and topic levels are ignored. Please see #23769 (comment) for details.

> (2)PR #23769 add the function of dynamically configuring parameter replicateAllSubscriptionState values through the command bin/pulsar-admin brokers update-dynamic-config --config replicateAllSubscriptionState --value true/false

This is feasible.

@yyj8
Contributor

yyj8 commented Dec 24, 2024

The client defaults to replicateSubscriptionState=false. In a cluster migration scenario, we want to avoid any business code changes and have the server uniformly control state replication for all subscriptions in the cluster. If the client configuration has the highest priority and the client does not explicitly set replicateSubscriptionState=true, this defeats our original goal of not requiring business code changes.

My suggestion is to configure this at the cluster dimension. If replicateAllSubscriptionState=true, it is mandatory and overwrites the client configuration. If replicateAllSubscriptionState=false, the client's configuration is used.

@nodece
Member Author

nodece commented Dec 24, 2024

@yyj8

> The default configuration for the client is replicateSubscriptionState=false.

The latest client defaults to null; please see #23757.

> In the scenario of cluster migration, we hope that there is no need for business code modification, and the server will control the replication of the state of all subscriptions in the cluster uniformly. If the configuration priority of the client is the highest, and the client does not display the configuration replicateSubscriptionState=true, this goes against our original intention of not requiring business code modification.
>
> My suggestion is to configure the cluster dimension. If replicateAllSubscriptionState=true is enabled, it will be a mandatory configuration to overwrite the client. If replicateAllSubscriptionState=false, then use the client's configuration.

I understand your idea: when replicateAllSubscriptionState=true, all subscriptions are replicated. This is equivalent to every consumer setting replicateSubscriptionState=true, and it has the highest priority.

However, this idea conflicts with PIP-398, which assumes that the consumer-level replicateSubscriptionState is not set and that replication behavior is instead driven by namespace and topic policies. If replicateSubscriptionState is set on the consumer, the specified subscription is replicated, and the namespace and topic levels cannot change this behavior. Changing that would be a breaking change. This is worth considering.

In our case, I don't want replicateSubscriptionState to be configured on the consumer, but users have already configured it and I cannot change their code. We have the same case, so I also want to ignore the consumer-level replicateSubscriptionState.

My idea

To combine your case and mine, I suggest introducing a broker configuration that overwrites the consumer configuration. Assuming the namespace and topic levels are set, the result would look like this:

  • overwriteConsumerReplicateSubscriptionState=true: The subscription will be replicated.
  • overwriteConsumerReplicateSubscriptionState=false: The subscription will be replicated.
  • overwriteConsumerReplicateSubscriptionState=null: This means the consumer level is null, and the broker then relies on the namespace and topic levels to decide whether the subscription is replicated.

We can also introduce a replicateSubscriptionsState=true/false/empty configuration at the broker level. When the namespace, topic, and consumer levels are not set, the broker-level configuration decides whether the subscription is replicated.

@yyj8
Contributor

yyj8 commented Dec 27, 2024

@nodece I agree with your suggestion.
Before using the cluster-level configuration, we first check whether topic-level and namespace-level settings exist. If there are none, we apply the cluster-level configuration. The priority of the three dimensions, from high to low, is: topic -> namespace -> cluster.

Then we can make unified code adjustments on your PIP, or you can merge my code directly into yours, or after you submit your code I can sync it and add the cluster-level configuration before submitting mine.

@nodece
Member Author

nodece commented Dec 30, 2024

> Before using cluster level configuration, we first check if there are topic and namespace dimensions. If there are none, we apply cluster level configuration; The priority of the three dimensions from high to low is: topic -> namespace -> cluster.

@yyj8 Correct. replicateSubscriptionState is used when auto-creating a subscription. In older versions, when it is set to false the broker does not persist it to the cursor properties. Therefore, we only need to add replicateSubscriptionState=true/false/null at the cluster level.

Breaking Change: For an existing subscription where replicateSubscriptionState=false is set at the consumer level, the new broker will use the subscription replication policy from the topic/namespace/cluster level.

New subscriptions will follow the above priority.
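
The topic -> namespace -> cluster priority discussed here can be sketched as a first-non-null lookup. This is only an illustration under stated assumptions, not code from the PIP: the class `ReplicationPolicyResolver` and its `resolve` method are hypothetical names, and `null` stands for "not set at this level".

```java
/** Hypothetical helper for illustration; not part of the Pulsar code base. */
final class ReplicationPolicyResolver {
    private ReplicationPolicyResolver() {
    }

    /**
     * Returns the first value that is set, checking topic, then namespace,
     * then cluster. A null argument means "not set at this level".
     * Returns null when no level is set.
     */
    static Boolean resolve(Boolean topicLevel, Boolean namespaceLevel, Boolean clusterLevel) {
        if (topicLevel != null) {
            return topicLevel;      // topic has the highest priority
        }
        if (namespaceLevel != null) {
            return namespaceLevel;  // namespace is used only when the topic is unset
        }
        return clusterLevel;        // cluster is the last fallback (may be null)
    }
}
```

With this shape, a topic that explicitly sets `false` is not overridden by a namespace or cluster that sets `true`.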

/cc @lhotari

@nodece nodece force-pushed the subscription-replication-on-namespace-topic branch from cd4d790 to c421642 Compare January 15, 2025 04:27
@nodece nodece changed the title from "[improve][pip] PIP-398: Subscription replication on the namespace and topic levels" to "[improve][pip] PIP-398: Subscription replication on the broker, namespace and topic levels" Jan 15, 2025
@nodece
Member Author

nodece commented Jan 15, 2025

@yyj8 This PIP has been updated, could you have a chance to review this PIP?

@yyj8
Contributor

yyj8 commented Jan 16, 2025

> @yyj8 This PIP has been updated, could you have a chance to review this PIP?

Are you referring to updating the code and modifying the functional design specifications in this pip pip/pip-398.md design documentation?

@nodece
Member Author

nodece commented Jan 16, 2025

> > @yyj8 This PIP has been updated, could you have a chance to review this PIP?
>
> Are you referring to updating the code and modifying the functional design specifications in this pip pip/pip-398.md design documentation?

Just for this PIP.

… topic levels

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece force-pushed the subscription-replication-on-namespace-topic branch from c421642 to 9a493a9 Compare February 18, 2025 10:45
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece nodece force-pushed the subscription-replication-on-namespace-topic branch from 9a493a9 to b335e5b Compare February 18, 2025 10:46
Member

@lhotari lhotari left a comment

Added some thoughts.

@lhotari
Member

lhotari commented Feb 18, 2025

The term replicateSubscriptionState is ambiguous (it's not introduced in this PIP). It's unclear whether it refers to "replicating the subscription state" or represents "the state of replicating subscriptions". This demonstrates why putting more thought into naming concepts could be helpful.

Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@nodece
Member Author

nodece commented Feb 18, 2025

> The term replicateSubscriptionState is ambiguous (it's not introduced in this PIP). It's unclear whether it refers to "replicating the subscription state" or represents "the state of replicating subscriptions". This demonstrates why putting more thought into naming concepts could be helpful.

"replicating the subscription state" is correct. This term comes from the existing consumer design.

@nodece nodece requested a review from lhotari February 20, 2025 01:46
@nodece
Member Author

nodece commented Feb 21, 2025

@lhotari Do you have more suggestions? If not, I will send a vote to the mailing list.

@lhotari
Member

lhotari commented Feb 21, 2025

> @lhotari Do you have more suggestions? If not, I will send a vote to the mailing list.

@nodece I think that at the topic and namespace level, it could be better to combine overwriteConsumerReplicateSubscriptionState (you mentioned it in #23770 (comment)) and replicateSubscriptionState into a single enum instead of 2 separate boolean values to be used in policies. Let's say ReplicateSubscriptionStatePolicy enum with values:

  • OVERRIDE_ENABLE - overrides any value set by the consumer (or lower level policy) and enables subscription replication
  • OVERRIDE_DISABLE - overrides any value set by the consumer (or lower level policy) and disables subscription replication
  • ENABLE - enables subscription replication if not explicitly disabled by the consumer (or lower level policy)
  • DISABLE - disables subscription replication if not explicitly enabled by the consumer (or lower level policy)

I don't know if this is accurate, but putting more thought in how to address the topic and namespace level would be useful.

I haven't seen the APIs being updated to have this information in the policies. I don't think that introducing a completely separate concept for topic and namespace level makes sense.
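
The enum proposed in this comment could look roughly like the sketch below. It is an assumption-laden illustration, not an API from the PIP: only the enum name and its four values come from the comment, the `isReplicated` method is hypothetical, and the sketch considers only the consumer setting (`null` meaning the consumer did not set it), leaving out the "lower level policy" part.

```java
/** Sketch of the proposed policy values; the isReplicated method is hypothetical. */
enum ReplicateSubscriptionStatePolicy {
    OVERRIDE_ENABLE,
    OVERRIDE_DISABLE,
    ENABLE,
    DISABLE;

    /**
     * Decides whether a subscription is replicated under this policy.
     *
     * @param consumerSetting value requested by the consumer, or null if unset
     */
    boolean isReplicated(Boolean consumerSetting) {
        switch (this) {
            case OVERRIDE_ENABLE:
                return true;   // the consumer value is ignored
            case OVERRIDE_DISABLE:
                return false;  // the consumer value is ignored
            case ENABLE:
                // enabled unless the consumer explicitly disabled it
                return !Boolean.FALSE.equals(consumerSetting);
            case DISABLE:
                // disabled unless the consumer explicitly enabled it
                return Boolean.TRUE.equals(consumerSetting);
            default:
                throw new IllegalStateException("unknown policy: " + this);
        }
    }
}
```

The point of the design is that the two "enabled" cases (forced versus default) are distinct, documented values instead of being folded into a single `true`.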

@nodece
Member Author

nodece commented Feb 21, 2025

@lhotari

This is an important section: https://github.com/apache/pulsar/pull/23770/files#diff-67fc7a48cc071911c2239d1c628335d4147f54345051f36efd2f18dbcc8339c6R110

Currently the consumer enables subscription replication when replicateSubscriptionState is true at the consumer level: https://github.com/apache/pulsar/blob/v4.0.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1115-L1118

Prioritizing true over false ensures consistency, so this approach should be applied to the topic, namespace, and broker levels as well. Doing so avoids unnecessary back-and-forth state changes and keeps transitions smooth.

Another point is that we do not persist the case where replicateSubscriptionState is false in the cursor: https://github.com/apache/pulsar/blob/v4.0.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java#L218-L221

  1. If the consumer level is present and its value is true, enable subscription replication.
  2. If the consumer level is false, and the topic level is present with its value set to true, enable subscription replication.
  3. If the consumer level is false, the topic level is not set, but the namespace level is present and its value is true, enable subscription replication.
  4. If the consumer level is false, and both the topic and namespace levels are not set, but the broker level is present with its value set to true, enable subscription replication.

> I haven't seen the APIs being updated to have this information in the policies. I don't think that introducing a completely separate concept for topic and namespace level makes sense.

replicateSubscriptionState follows the hierarchical topic policies, with the difference that the topic/namespace/broker policy is applied to a subscription only when the consumer level is false.
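
The four numbered rules in this comment can be sketched as a single decision function. This is a minimal illustration, not the PIP's implementation: `SubscriptionReplicationRules` and `isReplicated` are hypothetical names, a consumer value of `null` is treated the same as `false` (an assumption, since the rules only mention `false`), and a level that is explicitly `false` is assumed to stop the lookup, since the rules only fall through to the next level when a level is not set.

```java
/** Hypothetical helper for illustration; not part of the Pulsar code base. */
final class SubscriptionReplicationRules {
    private SubscriptionReplicationRules() {
    }

    /**
     * @param consumer  consumer-level value, null if the client did not set it
     * @param topic     topic-level policy, null if not set
     * @param namespace namespace-level policy, null if not set
     * @param broker    broker-level configuration, null if not set
     */
    static boolean isReplicated(Boolean consumer, Boolean topic, Boolean namespace, Boolean broker) {
        // Rule 1: a consumer that asks for replication always gets it.
        if (Boolean.TRUE.equals(consumer)) {
            return true;
        }
        // Rules 2-4: otherwise take the first level that is set,
        // in the order topic, namespace, broker.
        Boolean policy = topic != null ? topic : (namespace != null ? namespace : broker);
        return Boolean.TRUE.equals(policy);
    }
}
```

Because only `true` at the consumer level short-circuits, a consumer-level `false` never blocks a policy that enables replication.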

@lhotari
Member

lhotari commented Feb 21, 2025

> The replicateSubscriptionState follows hierarchy topic policies, with the difference being that we only apply topic/namespace/broker policy to subscription when the consumer level is false.

@nodece Yes, I understand that. Isn't there a need to override any client application consumer level setting with topic or namespace level policies? What if a Pulsar administrator explicitly wants to disallow or force replicated subscriptions for a particular topic or namespace while geo-replication is enabled?

@nodece
Member Author

nodece commented Feb 21, 2025

> Isn't there a need to override any client application consumer level setting with topic or namespace level policies?

When the topic/namespace level is true, the consumer level is ignored.

Usually, we want to enable subscription replication, not disable it.

> What if Pulsar administrator would explicitly want to disallow or force using replicated subscriptions for a particular topic or namespace while geo-replication is enabled?

The administrator is permitted to enable subscription replication and subsequently disable it.

@nodece nodece requested a review from lhotari February 21, 2025 10:11
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
@lhotari
Member

lhotari commented Aug 25, 2025

@nodece Do you have plans to take this PIP forward?

@nodece
Member Author

nodece commented Aug 25, 2025

@lhotari Vote has been started.

Comment on lines +66 to +101
1. Add the field `Boolean replicate_subscription_state` to the `org.apache.pulsar.common.policies.data.Policies` class
to control subscription replication at the namespace level:
```java
public class Policies {
    @SuppressWarnings("checkstyle:MemberName")
    public Boolean replicate_subscription_state;
}
```
2. Add the management methods to the `org.apache.pulsar.client.admin.Namespaces` interface:
```java
public interface Namespaces {
    void setReplicateSubscriptionState(String namespace, Boolean enabled) throws PulsarAdminException;
    CompletableFuture<Void> setReplicateSubscriptionStateAsync(String namespace, Boolean enabled);
    Boolean getReplicateSubscriptionState(String namespace) throws PulsarAdminException;
    CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String namespace);
}
```
3. Implement the management methods in the `org.apache.pulsar.client.admin.internal.NamespacesImpl` class.

### Topic level

1. Add the field `Boolean replicateSubscriptionState` to the `org.apache.pulsar.common.policies.data.TopicPolicies`
class to enable subscription replication at the topic level:
```java
public class TopicPolicies {
    public Boolean replicateSubscriptionState;
}
```
2. Add the management methods to the `org.apache.pulsar.client.admin.TopicPolicies` interface:
```java
public interface TopicPolicies {
    void setReplicateSubscriptionState(String topic, Boolean enabled) throws PulsarAdminException;
    CompletableFuture<Void> setReplicateSubscriptionStateAsync(String topic, Boolean enabled);
    Boolean getReplicateSubscriptionState(String topic, boolean applied) throws PulsarAdminException;
    CompletableFuture<Boolean> getReplicateSubscriptionStateAsync(String topic, boolean applied);
}
```
Member

I would strongly suggest replacing the Boolean with an enum. Using a boolean value for replicateSubscriptionState is very confusing.

Here's an example what ReplicateSubscriptionStatePolicy enum could be:

  • OVERRIDE_ENABLE - overrides any value set by the consumer (or lower level policy) and enables subscription replication
  • OVERRIDE_DISABLE - overrides any value set by the consumer (or lower level policy) and disables subscription replication
  • ENABLE - enables subscription replication if not explicitly disabled by the consumer (or lower level policy)
  • DISABLE - disables subscription replication if not explicitly enabled by the consumer (or lower level policy)

Member Author

@nodece nodece Aug 25, 2025

We have a pulsar-admin topics set-replicated-subscription-status command, and its current implementation uses a Boolean for a similar purpose, so I followed the same design for consistency and to avoid introducing a new pattern in the CLI/API.

The meaning is:

  • null → remove the configuration (I can add a remove API instead, and then use boolean instead of Boolean in the set API)
  • true → enable
  • false → disable (but true has priority if both are set at different levels)

Using an enum would make the meaning explicit, but it introduces additional complexity.

Member Author

@lhotari Any updates?

Member

@lhotari lhotari Aug 27, 2025

> We have a pulsar-admin topics set-replicated-subscription-status command, and its current implementation uses a Boolean for a similar purpose, so I followed the same design for consistency and to avoid introducing a new pattern in the CLI/API.

This is a valid argument if we are able to achieve all required use cases with a Boolean.

After thinking about it again, it seems that these choices need to be covered:

  1. override any setting by the consumer (or lower level policy) and enable subscription replication for all subscriptions
  2. override any setting by the consumer (or lower level policy) and disable subscription replication for all subscriptions
  3. enable subscription replication if not explicitly disabled by the consumer (or lower level policy) for all subscriptions
  4. no setting at all (higher level setting would take priority). If all policies (broker, ns, topic) are "empty", the consumer setting would get used directly.

@nodece With a single Boolean value, how would you distinguish between the 2 different cases (1. and 3. above) where subscription replication is enabled?

(with "lower level policy", it means the hierarchy of broker -> namespace -> topic)

Member Author

> @nodece With a single Boolean value, how would you distinguish between the 2 different cases (1. and 3. above) where subscription replication is enabled?
>
> override any setting by the consumer (or lower level policy) and enable subscription replication for all subscriptions

Subscription replication is enabled if:

  • The consumer sets true
  • The first non-null policy found in the following order is true: topic → namespace → broker. Once a policy is found (even if false), lower levels are ignored.

> no setting at all (higher level setting would take priority). If all policies (broker, ns, topic) are "empty", the consumer setting would get used directly.

Correct.

I understand the concern about using null for removing the setting. To make the semantics clearer, I'll add an explicit removeReplicateSubscriptionState() API instead of relying on null.
So no need to worry about Boolean vs boolean, the new API will handle the unset case explicitly.
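
The set/remove split described in this reply can be sketched with a small in-memory holder. This is purely illustrative: `ReplicateSubscriptionStateSetting` and its methods are hypothetical and do not correspond to the Pulsar admin API. The sketch only shows how an explicit remove operation lets the setter take a primitive `boolean` while "unset" is still representable.

```java
/** Hypothetical in-memory holder for one policy level; for illustration only. */
final class ReplicateSubscriptionStateSetting {
    // null means "not configured at this level"
    private Boolean value;

    /** Explicitly enables or disables replication at this level. */
    void set(boolean enabled) {
        this.value = enabled;
    }

    /** Clears the setting so that other levels decide. */
    void remove() {
        this.value = null;
    }

    /** Returns the configured value, or null when nothing is configured. */
    Boolean get() {
        return value;
    }
}
```

Callers never pass `null` to `set`, so the three states (enabled, disabled, unset) are each reached through a distinct call.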

@lhotari
Member

lhotari commented Aug 25, 2025

> @lhotari Vote has been started.

@nodece I added a request for changes for the replicateSubscriptionState which is very confusing when a Boolean is used. It's very hard to understand what meaning null, TRUE and FALSE would have. Using an enum would resolve this confusion since it's possible to document the enum values and make the meaning explicit.

@nodece nodece requested a review from lhotari August 28, 2025 15:46