From fcc91e586922e4e1d9e5f23492838096cc3205b1 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 21 Dec 2024 18:59:47 +0800 Subject: [PATCH 1/2] [improvement][broker]Add replicate all subscription state switch on the broker side. --- conf/broker.conf | 5 +++++ .../apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++ .../broker/service/persistent/PersistentTopic.java | 11 +++++++++-- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index af335c141534f..be2df60b52063 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -660,6 +660,11 @@ acknowledgmentAtBatchIndexLevelEnabled=false # Enable tracking of replicated subscriptions state across clusters. enableReplicatedSubscriptions=true +# Whether to replicate all subscription states. If true, it will overwrite the value of +# the replicateSubscriptionState configuration item configured by the client consumer; +# If false, use the value of the client-side replicateSubscriptionState. +replicateAllSubscriptionState=false + # Frequency of snapshots for replicated subscriptions tracking. replicatedSubscriptionsSnapshotFrequencyMillis=1000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 8b5a4ef270b0e..51833e04a9728 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1431,6 +1431,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "Enable tracking of replicated subscriptions state across clusters.") private boolean enableReplicatedSubscriptions = true; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Whether to replicate all subscription states. If true, it will overwrite the value of" + + " the replicateSubscriptionState configuration item configured by the client consumer;" + + " If false, use the value of the client-side replicateSubscriptionState.") + private boolean replicateAllSubscriptionState = false; + @FieldContext( category = CATEGORY_SERVER, doc = "Frequency of snapshots for replicated subscriptions tracking.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 651d12373628b..0c4484f86311a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -911,13 +911,20 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; + boolean enableReplicatedSubscriptions = brokerService.pulsar().getConfiguration() + .isEnableReplicatedSubscriptions(); - if (replicatedSubscriptionState - && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { + if (replicatedSubscriptionState && !enableReplicatedSubscriptions) { log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); replicatedSubscriptionState = false; } + boolean replicateAllSubscriptionState = brokerService.pulsar().getConfiguration() + .isReplicateAllSubscriptionState(); + if (replicateAllSubscriptionState && enableReplicatedSubscriptions) { + replicatedSubscriptionState = true; + } + if (subType == SubType.Key_Shared && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) { return FutureUtil.failedFuture( From 69947b5ec7691726dfb1c390dea4c46d800bcf68 Mon Sep 17 00:00:00 2001 From: yyj8 <1012293987@qq.com> Date: Sat, 21 Dec 2024 19:55:08 +0800 Subject: [PATCH 2/2] [improvement][broker]Add replicate all subscription state switch on the broker side, Resolve code conflicts. --- .../pulsar/broker/service/persistent/PersistentTopic.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0c4484f86311a..fcdbe50c9f6d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -911,17 +911,17 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - boolean enableReplicatedSubscriptions = brokerService.pulsar().getConfiguration() - .isEnableReplicatedSubscriptions(); - if (replicatedSubscriptionState && !enableReplicatedSubscriptions) { + if (replicatedSubscriptionState + && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); replicatedSubscriptionState = false; } boolean replicateAllSubscriptionState = brokerService.pulsar().getConfiguration() .isReplicateAllSubscriptionState(); - if (replicateAllSubscriptionState && enableReplicatedSubscriptions) { + if (replicateAllSubscriptionState + && brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { replicatedSubscriptionState = true; }