diff --git a/conf/broker.conf b/conf/broker.conf index a543fa2db71c6..9eddcf22620ad 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1755,9 +1755,6 @@ managedLedgerOffloadDriver= # Maximum number of thread pool threads for ledger offloading managedLedgerOffloadMaxThreads=2 -# Maximum number of read thread pool threads for ledger offloading -managedLedgerOffloadReadThreads=2 - # The extraction directory of the nar package. # Available for Protocol Handler, Additional Servlets, Entry Filter, Offloaders, Broker Interceptor. # Default is System.getProperty("java.io.tmpdir"). @@ -1769,6 +1766,12 @@ managedLedgerOffloadPrefetchRounds=1 # Use Open Range-Set to cache unacked messages managedLedgerUnackedRangesOpenCacheSetEnabled=true +# Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate +# When it's false, the behavior will be the same with 3.x or earlier +# Modifying this config could lose existing individual acknowledgments, so you should configure it with false when +# upgrading from version <=4.0 to 4.1 or later if you don't want to lose these acknowledgments. +managedLedgerPersistIndividualAckAsLongArray=true + # For Amazon S3 ledger offload, AWS region s3ManagedLedgerOffloadRegion= diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ca4408aa4e7c0..6518a149fdbff 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2125,15 +2125,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " Consumer Netty channel. Use O to disable") private long managedLedgerMaxReadsInFlightSizeInMB = 0; - @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait for acquiring permits for max reads in " - + "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is reached.") - private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 60000; - - @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum number of reads that can be queued for acquiring " - + "permits for max reads in flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit " - + "is reached.") - private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 50000; - @FieldContext( category = CATEGORY_STORAGE_ML, dynamic = true, @@ -2259,23 +2250,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private int managedLedgerCursorRolloverTimeInSeconds = 14400; - @FieldContext( - category = CATEGORY_STORAGE_ML, - dynamic = true, - doc = "When resetting a subscription by timestamp, the broker will use the" - + " ledger closing timestamp metadata to determine the range of ledgers" - + " to search for the message where the subscription position is reset to. " - + " Since by default, the search condition is based on the message publish time provided by the " - + " client at the publish time, there will be some clock skew between the ledger closing timestamp " - + " metadata and the publish time." - + " This configuration is used to set the max clock skew between the ledger closing" - + " timestamp and the message publish time for finding the range of ledgers to open for searching." - + " The default value is 60000 milliseconds (60 seconds). When set to -1, the broker will not" - + " use the ledger closing timestamp metadata to determine the range of ledgers to search for the" - + " message." - ) - private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = 60000; - @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n" @@ -2288,7 +2262,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece private int managedLedgerMaxUnackedRangesToPersist = 10000; @FieldContext( category = CATEGORY_STORAGE_ML, - doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate") + doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate\n" + + "When it's false, the behavior will be the same with 4.0 or earlier\n" + + "Modifying this config could lose existing individual acknowledgments, so you should configure it" + + "with false when upgrading from version <=4.0 to 4.1 or later if you don't want to lose these " + + "acknowledgments.") private boolean managedLedgerPersistIndividualAckAsLongArray = true; @FieldContext( category = CATEGORY_STORAGE_ML, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b060475a43f31..d79f57433c8be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -83,10 +83,6 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata (conf.getDispatcherMaxReadSizeBytes() / (1024L * 1024L)) + 1); } managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes); - managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis( - conf.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis()); - managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize( - conf.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize()); managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds( conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c63f17c031e49..c0a03eb6cfc75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1693,7 +1693,7 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp protected synchronized OrderedScheduler getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) { if (this.offloaderReadExecutor == null) { this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder() - .numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads()) + .numThreads(OffloadPoliciesImpl.DEFAULT_OFFLOAD_READ_THREADS) .name("offloader-read").build(); } return this.offloaderReadExecutor; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 275d1ae5818b0..4b2b6b384f1a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -97,6 +97,7 @@ import org.slf4j.LoggerFactory; public class PersistentSubscription extends AbstractSubscription { + private static final int CURSOR_RESET_LEDGER_CLOSE_TIMESTAMP_MAX_CLOCK_SKEW_MILLIS = 60000; protected final PersistentTopic topic; protected final ManagedCursor cursor; protected volatile Dispatcher dispatcher; @@ -792,7 +793,7 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { public CompletableFuture resetCursor(long timestamp) { CompletableFuture future = new CompletableFuture<>(); PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor, - config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); + CURSOR_RESET_LEDGER_CLOSE_TIMESTAMP_MAX_CLOCK_SKEW_MILLIS); if (log.isDebugEnabled()) { log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 924153e71ffe2..9073de658d405 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -87,8 +87,6 @@ interface Builder { Builder managedLedgerOffloadMaxThreads(Integer managedLedgerOffloadMaxThreads); - Builder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads); - Builder managedLedgerOffloadPrefetchRounds(Integer managedLedgerOffloadPrefetchRounds); Builder managedLedgerOffloadThresholdInBytes(Long managedLedgerOffloadThresholdInBytes); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 910075e387059..885b3e176c1b3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -110,9 +110,6 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) - private Integer managedLedgerOffloadReadThreads = DEFAULT_OFFLOAD_READ_THREADS; - @Configuration - @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS; @Configuration @JsonProperty(access = JsonProperty.Access.READ_WRITE) @@ -504,11 +501,6 @@ public OffloadPoliciesImplBuilder managedLedgerOffloadMaxThreads(Integer managed return this; } - public OffloadPoliciesImplBuilder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) { - impl.managedLedgerOffloadReadThreads = managedLedgerOffloadReadThreads; - return this; - } - public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds( Integer managedLedgerOffloadPrefetchRounds) { impl.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;