From 1a50689d10e34279031603dba3e7fe27b2f4b083 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 26 Feb 2025 14:33:42 +0800 Subject: [PATCH 1/2] [improve][broker] Clean up config changes without a PIP since the 4.0 LTS - Remove `managedLedgerOffloadReadThreads` from https://github.com/apache/pulsar/pull/24025 - Remove `managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis` and `managedLedgerMaxReadsInFlightPermitsAcquireQueueSize` from https://github.com/apache/pulsar/pull/23901 - Remove `managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis` from https://github.com/apache/pulsar/pull/22792 The configs above only increase the complexity and are hard to configure. Add more comments to `managedLedgerPersistIndividualAckAsLongArray` from https://github.com/apache/pulsar/pull/23759. This config was added to keep the compatibility from 3.x or earlier so it has value to retain. 3.x users must configure it with false when upgrading to 4.0. --- conf/broker.conf | 9 ++++-- .../pulsar/broker/ServiceConfiguration.java | 32 +++---------------- .../broker/ManagedLedgerClientFactory.java | 4 --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../persistent/PersistentSubscription.java | 3 +- .../common/policies/data/OffloadPolicies.java | 2 -- .../policies/data/OffloadPoliciesImpl.java | 8 ----- 7 files changed, 14 insertions(+), 46 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index a543fa2db71c6..41db815724fad 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1755,9 +1755,6 @@ managedLedgerOffloadDriver= # Maximum number of thread pool threads for ledger offloading managedLedgerOffloadMaxThreads=2 -# Maximum number of read thread pool threads for ledger offloading -managedLedgerOffloadReadThreads=2 - # The extraction directory of the nar package. # Available for Protocol Handler, Additional Servlets, Entry Filter, Offloaders, Broker Interceptor. # Default is System.getProperty("java.io.tmpdir"). @@ -1769,6 +1766,12 @@ managedLedgerOffloadPrefetchRounds=1 # Use Open Range-Set to cache unacked messages managedLedgerUnackedRangesOpenCacheSetEnabled=true +# Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate +# When it's false, the behavior will be the same with 3.x or earlier +# Modifying this config could lose existing individual acknowledgments, so you should configure it with false when +# upgrading from 3.x to 4.0 or later if you don't want to lose these acknowledgments. +managedLedgerPersistIndividualAckAsLongArray=true + # For Amazon S3 ledger offload, AWS region s3ManagedLedgerOffloadRegion= diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ca4408aa4e7c0..2752256f00186 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2125,15 +2125,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " Consumer Netty channel. Use O to disable") private long managedLedgerMaxReadsInFlightSizeInMB = 0; - @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait for acquiring permits for max reads in " - + "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is reached.") - private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 60000; - - @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum number of reads that can be queued for acquiring " - + "permits for max reads in flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit " - + "is reached.") - private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 50000; - @FieldContext( category = CATEGORY_STORAGE_ML, dynamic = true, @@ -2259,23 +2250,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int managedLedgerCursorRolloverTimeInSeconds = 14400; - @FieldContext( - category = CATEGORY_STORAGE_ML, - dynamic = true, - doc = "When resetting a subscription by timestamp, the broker will use the" - + " ledger closing timestamp metadata to determine the range of ledgers" - + " to search for the message where the subscription position is reset to. " - + " Since by default, the search condition is based on the message publish time provided by the " - + " client at the publish time, there will be some clock skew between the ledger closing timestamp " - + " metadata and the publish time." - + " This configuration is used to set the max clock skew between the ledger closing" - + " timestamp and the message publish time for finding the range of ledgers to open for searching." - + " The default value is 60000 milliseconds (60 seconds). When set to -1, the broker will not" - + " use the ledger closing timestamp metadata to determine the range of ledgers to search for the" - + " message." - ) - private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = 60000; - @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n" @@ -2288,7 +2262,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece private int managedLedgerMaxUnackedRangesToPersist = 10000; @FieldContext( category = CATEGORY_STORAGE_ML, - doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate") + doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate\n" + + "When it's false, the behavior will be the same with 3.x or earlier\n" + + "Modifying this config could lose existing individual acknowledgments, so you should configure it" + + "with false when upgrading from 3.x to 4.0 or later if you don't want to lose these " + + "acknowledgments.") private boolean managedLedgerPersistIndividualAckAsLongArray = true; @FieldContext( category = CATEGORY_STORAGE_ML, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b060475a43f31..d79f57433c8be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -83,10 +83,6 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata (conf.getDispatcherMaxReadSizeBytes() / (1024L * 1024L)) + 1); } managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes); - managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis( - conf.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis()); - managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize( - conf.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize()); managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds( conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c63f17c031e49..c0a03eb6cfc75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1693,7 +1693,7 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp protected synchronized OrderedScheduler getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) { if (this.offloaderReadExecutor == null) { this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder() - .numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads()) + .numThreads(OffloadPoliciesImpl.DEFAULT_OFFLOAD_READ_THREADS) .name("offloader-read").build(); } return this.offloaderReadExecutor; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 275d1ae5818b0..4b2b6b384f1a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -97,6 +97,7 @@ import org.slf4j.LoggerFactory; public class PersistentSubscription extends AbstractSubscription { + private static final int CURSOR_RESET_LEDGER_CLOSE_TIMESTAMP_MAX_CLOCK_SKEW_MILLIS = 60000; protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Dispatcher dispatcher; @@ -792,7 +793,7 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { public CompletableFuture resetCursor(long timestamp) { CompletableFuture future = new CompletableFuture<>(); PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor, - config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); + CURSOR_RESET_LEDGER_CLOSE_TIMESTAMP_MAX_CLOCK_SKEW_MILLIS); if (log.isDebugEnabled()) { log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 924153e71ffe2..9073de658d405 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -87,8 +87,6 @@ interface Builder { Builder managedLedgerOffloadMaxThreads(Integer managedLedgerOffloadMaxThreads); - Builder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads); - Builder managedLedgerOffloadPrefetchRounds(Integer managedLedgerOffloadPrefetchRounds); Builder managedLedgerOffloadThresholdInBytes(Long managedLedgerOffloadThresholdInBytes); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 910075e387059..885b3e176c1b3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -110,9 +110,6 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) - private Integer managedLedgerOffloadReadThreads = DEFAULT_OFFLOAD_READ_THREADS; - @Configuration - @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) @@ -504,11 +501,6 @@ public OffloadPoliciesImplBuilder managedLedgerOffloadMaxThreads(Integer managed return this; } - public OffloadPoliciesImplBuilder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) { - impl.managedLedgerOffloadReadThreads = managedLedgerOffloadReadThreads; - return this; - } - public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds( Integer managedLedgerOffloadPrefetchRounds) { impl.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds; From 046dbc37b8b8f19375eb44fb300c21b4fc6093d2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 26 Feb 2025 15:38:14 +0800 Subject: [PATCH 2/2] Fix docs --- conf/broker.conf | 2 +- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 41db815724fad..9eddcf22620ad 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1769,7 +1769,7 @@ managedLedgerUnackedRangesOpenCacheSetEnabled=true # Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate # When it's false, the behavior will be the same with 3.x or earlier # Modifying this config could lose existing individual acknowledgments, so you should configure it with false when -# upgrading from 3.x to 4.0 or later if you don't want to lose these acknowledgments. +# upgrading from version <=4.0 to 4.1 or later if you don't want to lose these acknowledgments. managedLedgerPersistIndividualAckAsLongArray=true # For Amazon S3 ledger offload, AWS region diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2752256f00186..6518a149fdbff 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2263,9 +2263,9 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate\n" - + "When it's false, the behavior will be the same with 3.x or earlier\n" + + "When it's false, the behavior will be the same with 4.0 or earlier\n" + "Modifying this config could lose existing individual acknowledgments, so you should configure it" - + "with false when upgrading from 3.x to 4.0 or later if you don't want to lose these " + + "with false when upgrading from version <=4.0 to 4.1 or later if you don't want to lose these " + "acknowledgments.") private boolean managedLedgerPersistIndividualAckAsLongArray = true; @FieldContext(