diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index cab0279ecb41d..464a521f6445b 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -178,6 +178,10 @@ static class Arguments extends PerformanceBaseArguments { "--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)") public double batchTimeMillis = 1.0; + @Parameter(names = { "-db", + "--disable-batching" }, description = "Disable batching if true") + public boolean disableBatching; + @Parameter(names = { "-bm", "--batch-max-messages" }, description = "Maximum number of messages per batch") @@ -478,6 +482,47 @@ static IMessageFormatter getMessageFormatter(String formatterClass) { } } + static ProducerBuilder createProducerBuilder(PulsarClient client, Arguments arguments, int producerId) { + ProducerBuilder producerBuilder = client.newProducer() // + .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // + .compressionType(arguments.compression) // + .maxPendingMessages(arguments.maxOutstanding) // + .accessMode(arguments.producerAccessMode) + // enable round robin message routing if it is a partitioned topic + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + if (arguments.maxPendingMessagesAcrossPartitions > 0) { + producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); + } + + if (arguments.producerName != null) { + String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId); + producerBuilder.producerName(producerName); + } + + if (arguments.disableBatching || (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0)) { + producerBuilder.enableBatching(false); + } else { + long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000); + producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true); + } + if (arguments.batchMaxMessages > 0) { + producerBuilder.batchingMaxMessages(arguments.batchMaxMessages); + } + if (arguments.batchMaxBytes > 0) { + producerBuilder.batchingMaxBytes(arguments.batchMaxBytes); + } + + // Block if queue is full else we will start seeing errors in sendAsync + producerBuilder.blockIfQueueFull(true); + + if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) { + producerBuilder.addEncryptionKey(arguments.encKeyName); + producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); + } + + return producerBuilder; + } + private static void runProducer(int producerId, Arguments arguments, long numMessages, @@ -496,16 +541,8 @@ private static void runProducer(int producerId, .enableTransaction(arguments.isEnableTransaction); client = clientBuilder.build(); - ProducerBuilder producerBuilder = client.newProducer() // - .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // - .compressionType(arguments.compression) // - .maxPendingMessages(arguments.maxOutstanding) // - .accessMode(arguments.producerAccessMode) - // enable round robin message routing if it is a partitioned topic - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); - if (arguments.maxPendingMessagesAcrossPartitions > 0) { - producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); - } + + ProducerBuilder producerBuilder = createProducerBuilder(client, arguments, producerId); AtomicReference transactionAtomicReference; if (arguments.isEnableTransaction) { @@ -517,31 +554,6 @@ private static void runProducer(int producerId, } else { transactionAtomicReference = new AtomicReference<>(null); } - if (arguments.producerName != null) { - String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId); - producerBuilder.producerName(producerName); - } - - if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) { - producerBuilder.enableBatching(false); - } else { - long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000); - producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true); - } - if (arguments.batchMaxMessages > 0) { - producerBuilder.batchingMaxMessages(arguments.batchMaxMessages); - } - if (arguments.batchMaxBytes > 0) { - producerBuilder.batchingMaxBytes(arguments.batchMaxBytes); - } - - // Block if queue is full else we will start seeing errors in sendAsync - producerBuilder.blockIfQueueFull(true); - - if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) { - producerBuilder.addEncryptionKey(arguments.encKeyName); - producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); - } for (int i = 0; i < arguments.numTopics; i++) { diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 3adedc3ac605e..7bd6cbb25796c 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -21,9 +21,12 @@ import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; @@ -31,6 +34,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -161,6 +166,27 @@ public void testMsgKey() throws Exception { newConsumer2.close(); } + @Test(timeOut = 20000) + public void testBatchingDisabled() throws Exception { + PerformanceProducer.Arguments arguments = new PerformanceProducer.Arguments(); + + int producerId = 0; + + String topic = testTopic + UUID.randomUUID(); + arguments.topics = List.of(topic); + arguments.msgRate = 10; + arguments.serviceURL = pulsar.getBrokerServiceUrl(); + arguments.numMessages = 500; + arguments.disableBatching = true; + + ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) + .enableTransaction(arguments.isEnableTransaction); + PulsarClient client = clientBuilder.build(); + + ProducerBuilderImpl builder = (ProducerBuilderImpl) PerformanceProducer.createProducerBuilder(client, arguments, producerId); + Assert.assertFalse(builder.getConf().isBatchingEnabled()); + } + @Test(timeOut = 20000) public void testCreatePartitions() throws Exception { String argString = "%s -r 10 -u %s -au %s -m 5 -np 10";