Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1755,9 +1755,6 @@ managedLedgerOffloadDriver=
# Maximum number of thread pool threads for ledger offloading
managedLedgerOffloadMaxThreads=2

# Maximum number of read thread pool threads for ledger offloading
managedLedgerOffloadReadThreads=2

# The extraction directory of the nar package.
# Available for Protocol Handler, Additional Servlets, Entry Filter, Offloaders, Broker Interceptor.
# Default is System.getProperty("java.io.tmpdir").
Expand All @@ -1769,6 +1766,12 @@ managedLedgerOffloadPrefetchRounds=1
# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true

# Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate
# When it's false, the behavior will be the same with 3.x or earlier
# Modifying this config could lose existing individual acknowledgments, so you should configure it with false when
# upgrading from version <=4.0 to 4.1 or later if you don't want to lose these acknowledgments.
managedLedgerPersistIndividualAckAsLongArray=true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a conclusion that we will never set the default value of managedLedgerPersistIndividualAckAsLongArray to true for branch-4.0.x

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated


# For Amazon S3 ledger offload, AWS region
s3ManagedLedgerOffloadRegion=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2125,15 +2125,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " Consumer Netty channel. Use O to disable")
private long managedLedgerMaxReadsInFlightSizeInMB = 0;

@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum time to wait for acquiring permits for max reads in "
+ "flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit is reached.")
private long managedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis = 60000;

@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum number of reads that can be queued for acquiring "
+ "permits for max reads in flight when managedLedgerMaxReadsInFlightSizeInMB is set (>0) and the limit "
+ "is reached.")
private int managedLedgerMaxReadsInFlightPermitsAcquireQueueSize = 50000;

@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
Expand Down Expand Up @@ -2259,23 +2250,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int managedLedgerCursorRolloverTimeInSeconds = 14400;

@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "When resetting a subscription by timestamp, the broker will use the"
+ " ledger closing timestamp metadata to determine the range of ledgers"
+ " to search for the message where the subscription position is reset to. "
+ " Since by default, the search condition is based on the message publish time provided by the "
+ " client at the publish time, there will be some clock skew between the ledger closing timestamp "
+ " metadata and the publish time."
+ " This configuration is used to set the max clock skew between the ledger closing"
+ " timestamp and the message publish time for finding the range of ledgers to open for searching."
+ " The default value is 60000 milliseconds (60 seconds). When set to -1, the broker will not"
+ " use the ledger closing timestamp metadata to determine the range of ledgers to search for the"
+ " message."
)
private int managedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis = 60000;

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of `acknowledgment holes` that are going to be persistently stored.\n\n"
Expand All @@ -2288,7 +2262,11 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
private int managedLedgerMaxUnackedRangesToPersist = 10000;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate")
doc = "Whether persist cursor ack stats as long arrays, which will compress the data and reduce GC rate\n"
+ "When it's false, the behavior will be the same with 4.0 or earlier\n"
+ "Modifying this config could lose existing individual acknowledgments, so you should configure it"
+ "with false when upgrading from version <=4.0 to 4.1 or later if you don't want to lose these "
+ "acknowledgments.")
private boolean managedLedgerPersistIndividualAckAsLongArray = true;
@FieldContext(
category = CATEGORY_STORAGE_ML,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
(conf.getDispatcherMaxReadSizeBytes() / (1024L * 1024L)) + 1);
}
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeBytes);
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(
conf.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis());
managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize(
conf.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize());
managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(
conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImp
protected synchronized OrderedScheduler getOffloaderReadScheduler(OffloadPoliciesImpl offloadPolicies) {
if (this.offloaderReadExecutor == null) {
this.offloaderReadExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(offloadPolicies.getManagedLedgerOffloadReadThreads())
.numThreads(OffloadPoliciesImpl.DEFAULT_OFFLOAD_READ_THREADS)
.name("offloader-read").build();
}
return this.offloaderReadExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.slf4j.LoggerFactory;

public class PersistentSubscription extends AbstractSubscription {
private static final int CURSOR_RESET_LEDGER_CLOSE_TIMESTAMP_MAX_CLOCK_SKEW_MILLIS = 60000;
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Dispatcher dispatcher;
Expand Down Expand Up @@ -792,7 +793,7 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
public CompletableFuture<Void> resetCursor(long timestamp) {
CompletableFuture<Void> future = new CompletableFuture<>();
PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor,
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
CURSOR_RESET_LEDGER_CLOSE_TIMESTAMP_MAX_CLOCK_SKEW_MILLIS);

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Resetting subscription to timestamp {}", topicName, subName, timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ interface Builder {

Builder managedLedgerOffloadMaxThreads(Integer managedLedgerOffloadMaxThreads);

Builder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads);

Builder managedLedgerOffloadPrefetchRounds(Integer managedLedgerOffloadPrefetchRounds);

Builder managedLedgerOffloadThresholdInBytes(Long managedLedgerOffloadThresholdInBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Integer managedLedgerOffloadReadThreads = DEFAULT_OFFLOAD_READ_THREADS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS;
@Configuration
@JsonProperty(access = JsonProperty.Access.READ_WRITE)
Expand Down Expand Up @@ -504,11 +501,6 @@ public OffloadPoliciesImplBuilder managedLedgerOffloadMaxThreads(Integer managed
return this;
}

public OffloadPoliciesImplBuilder managedLedgerOffloadReadThreads(Integer managedLedgerOffloadReadThreads) {
impl.managedLedgerOffloadReadThreads = managedLedgerOffloadReadThreads;
return this;
}

public OffloadPoliciesImplBuilder managedLedgerOffloadPrefetchRounds(
Integer managedLedgerOffloadPrefetchRounds) {
impl.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
Expand Down
Loading