Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/function-localrunner
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ if [[ -z "$PULSAR_GC_LOG" ]]; then
fi

# Extra options to be passed to the jvm
PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096"}
PULSAR_EXTRA_OPTS=${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096 -Dio.netty.allocator.maxOrder=10"}

if [ -z "$PULSAR_LOG_CONF" ]; then
PULSAR_LOG_CONF=$DEFAULT_LOG_CONF
Expand Down
2 changes: 1 addition & 1 deletion conf/pulsar_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ if [[ -z "$PULSAR_GC_LOG" ]]; then
fi

# Extra options to be passed to the jvm
PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096"}"
PULSAR_EXTRA_OPTS="${PULSAR_EXTRA_OPTS:-" -Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096 -Dio.netty.allocator.maxOrder=10"}"

# Add extra paths to the bookkeeper classpath
# PULSAR_EXTRA_CLASSPATH=
Expand Down
2 changes: 1 addition & 1 deletion deployment/terraform-ansible/templates/pulsar_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ PULSAR_MEM=" -Xms{{ max_heap_memory }} -Xmx{{ max_heap_memory }} -XX:MaxDirectMe
PULSAR_GC=" -XX:+UseZGC -XX:+PerfDisableSharedMem -XX:+AlwaysPreTouch"

# Extra options to be passed to the jvm
PULSAR_EXTRA_OPTS="-Dio.netty.leakDetection.level=disabled -Dio.netty.recycler.maxCapacityPerThread=4096 ${PULSAR_EXTRA_OPTS}"
PULSAR_EXTRA_OPTS="-Dio.netty.leakDetection.level=disabled -Dio.netty.recycler.maxCapacityPerThread=4096 -Dio.netty.allocator.maxOrder=10 ${PULSAR_EXTRA_OPTS}"

# Add extra paths to the bookkeeper classpath
# PULSAR_EXTRA_CLASSPATH=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
Expand Down Expand Up @@ -54,4 +56,38 @@ public void testDefaultConfig() {
}
}

/**
* Verify that a {@link PooledByteBufAllocator} created with {@code maxOrder=10} produces the expected chunk size,
* which is consistent with the {@code -Dio.netty.allocator.maxOrder=10} setting in {@code conf/pulsar_env.sh}.
*
* <p>Netty computes chunk size as: {@code pageSize << maxOrder = 8192 << 10 = 8,388,608 bytes (8 MiB)}.
* This test constructs the allocator directly with {@code maxOrder=10} so no JVM argument is required.
*/
@Test
public void testDefaultChunkSizeMatchesMaxOrder10() {
// Expected chunk size: pageSize (8192 bytes) << maxOrder (10) = 8 MiB
final int maxOrder = 10;
final int expectedChunkSize = 8192 << maxOrder;

// Create a PooledByteBufAllocator with maxOrder=10, same as -Dio.netty.allocator.maxOrder=10
PooledByteBufAllocator allocator = new PooledByteBufAllocator(
true, // preferDirect
0, // nHeapArena
1, // nDirectArena
8192, // pageSize (default)
maxOrder, // maxOrder=10
64, // smallPageSize (default)
256, // normalPageSize (default)
false,
0
);

PooledByteBufAllocatorMetric metric = allocator.metric();
// Verify that the chunk size derived from maxOrder=10 equals 8 MiB
assertEquals(metric.chunkSize(), expectedChunkSize,
"Chunk size should be 8 MiB (pageSize << maxOrder = 8192 << 10) "
+ "as configured by -Dio.netty.allocator.maxOrder=10 in pulsar_env.sh");

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ protected void passNettyLeakDetectionSystemProperties() {

protected void initializePulsarExtraOpts() {
appendToEnv("PULSAR_EXTRA_OPTS",
"-Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096");
"-Dpulsar.allocator.exit_on_oom=true -Dio.netty.recycler.maxCapacityPerThread=4096 -Dio.netty.allocator.maxOrder=10");
}

protected boolean isCodeCoverageEnabled() {
Expand Down