diff --git a/conf/broker.conf b/conf/broker.conf index af335c141534f..be2df60b52063 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -660,6 +660,11 @@ acknowledgmentAtBatchIndexLevelEnabled=false # Enable tracking of replicated subscriptions state across clusters. enableReplicatedSubscriptions=true +# Whether to replicate all subscription states. If true, it will overwrite the value of +# the replicateSubscriptionState configuration item configured by the client consumer; +# If false, use the value of the client-side replicateSubscriptionState. +replicateAllSubscriptionState=false + # Frequency of snapshots for replicated subscriptions tracking. replicatedSubscriptionsSnapshotFrequencyMillis=1000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 8b5a4ef270b0e..51833e04a9728 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1431,6 +1431,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Enable tracking of replicated subscriptions state across clusters.") private boolean enableReplicatedSubscriptions = true; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Whether to replicate all subscription states. If true, it will overwrite the value of" + + " the replicateSubscriptionState configuration item configured by the client consumer;" + + " If false, use the value of the client-side replicateSubscriptionState.") + private boolean replicateAllSubscriptionState = false; + @FieldContext( category = CATEGORY_SERVER, doc = "Frequency of snapshots for replicated subscriptions tracking.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 651d12373628b..fcdbe50c9f6d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -918,6 +918,13 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St replicatedSubscriptionState = false; } + boolean replicateAllSubscriptionState = brokerService.pulsar().getConfiguration() + .isReplicateAllSubscriptionState(); + if (replicateAllSubscriptionState + && brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { + replicatedSubscriptionState = true; + } + if (subType == SubType.Key_Shared && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) { return FutureUtil.failedFuture(