From 9881080c216adcfef982f640622945c42abf0db3 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Tue, 30 Aug 2022 12:46:27 -0700 Subject: [PATCH 1/6] [improve][cli] Add option to disable batching in pulsar-testclient --- .../testclient/PerformanceProducer.java | 6 ++- .../testclient/PerformanceProducerTest.java | 43 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index cab0279ecb41d..c3b0ba447cc88 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -178,6 +178,10 @@ static class Arguments extends PerformanceBaseArguments { "--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)") public double batchTimeMillis = 1.0; + @Parameter(names = { "-db", + "--disable-batching" }, description = "Disable batching if true") + public boolean disableBatching = false; + @Parameter(names = { "-bm", "--batch-max-messages" }, description = "Maximum number of messages per batch") @@ -522,7 +526,7 @@ private static void runProducer(int producerId, producerBuilder.producerName(producerName); } - if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) { + if (arguments.disableBatching || (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0)) { producerBuilder.enableBatching(false); } else { long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000); diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 3adedc3ac605e..326e92f03486a 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -161,6 +161,49 @@ public void testMsgKey() throws Exception { newConsumer2.close(); } + @Test + public void testMsgKeyBatchingDisabled() throws Exception { + String argString = "%s -r 10 -u %s -m 500 -mk autoIncrement -db"; + String topic = testTopic + UUID.randomUUID(); + String args = String.format(argString, topic, pulsar.getBrokerServiceUrl()); + Thread thread = new Thread(() -> { + try { + PerformanceProducer.main(args.split(" ")); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + Consumer newConsumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-2") + .subscriptionType(SubscriptionType.Key_Shared).subscribe(); + Consumer newConsumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-2") + .subscriptionType(SubscriptionType.Key_Shared).subscribe(); + + thread.start(); + + Awaitility.await() + .untilAsserted(() -> { + Message message = newConsumer1.receive(1, TimeUnit.SECONDS); + if (message != null) { + newConsumer1.acknowledge(message); + } + assertNotNull(message); + }); + + Awaitility.await() + .untilAsserted(() -> { + Message message = newConsumer2.receive(1, TimeUnit.SECONDS); + if (message != null) { + newConsumer2.acknowledge(message); + } + assertNotNull(message); + }); + + thread.interrupt(); + newConsumer1.close(); + newConsumer2.close(); + } + @Test(timeOut = 20000) public void testCreatePartitions() throws Exception { String argString = "%s -r 10 -u %s -au %s -m 5 -np 10"; From 3f538b9bcf6cb882f0c1d54cd1da8eb7560d345f Mon Sep 17 00:00:00 2001 From: Vineeth Date: Tue, 30 Aug 2022 14:07:33 -0700 Subject: [PATCH 2/6] [improve][cli] Add option to disable batching in pulsar-testclient --- .../org/apache/pulsar/testclient/PerformanceProducerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 326e92f03486a..b9940b818de4a 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -161,7 +161,7 @@ public void testMsgKey() throws Exception { newConsumer2.close(); } - @Test + @Test(timeOut = 20000) public void testMsgKeyBatchingDisabled() throws Exception { String argString = "%s -r 10 -u %s -m 500 -mk autoIncrement -db"; String topic = testTopic + UUID.randomUUID(); From cdb844a1decbc3c8970ffac29c8c441ac52c4924 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 31 Aug 2022 13:12:52 -0700 Subject: [PATCH 3/6] [improve][cli] Add option to disable batching in pulsar-testclient --- .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index c3b0ba447cc88..b751e58c15142 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -180,7 +180,7 @@ static class Arguments extends PerformanceBaseArguments { @Parameter(names = { "-db", "--disable-batching" }, description = "Disable batching if true") - public boolean disableBatching = false; + public boolean disableBatching; @Parameter(names = { "-bm", "--batch-max-messages" From 285c7ac9c52b141b9cdfd67451ae48db50736a3d Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 31 Aug 2022 15:48:12 -0700 Subject: [PATCH 4/6] [improve][cli] Add option to disable batching in pulsar-testclient --- .../testclient/PerformanceProducer.java | 78 ++++++++++--------- .../testclient/PerformanceProducerTest.java | 62 ++++++--------- 2 files changed, 66 insertions(+), 74 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index b751e58c15142..5c870184e66af 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -481,6 +481,47 @@ static IMessageFormatter getMessageFormatter(String formatterClass) { return null; } } + + static ProducerBuilder createProducerBuilder(PulsarClient client, Arguments arguments, int producerId) { + ProducerBuilder producerBuilder = client.newProducer() // + .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // + .compressionType(arguments.compression) // + .maxPendingMessages(arguments.maxOutstanding) // + .accessMode(arguments.producerAccessMode) + // enable round robin message routing if it is a partitioned topic + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + if (arguments.maxPendingMessagesAcrossPartitions > 0) { + producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); + } + + if (arguments.producerName != null) { + String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId); + producerBuilder.producerName(producerName); + } + + if (arguments.disableBatching || (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0)) { + producerBuilder.enableBatching(false); + } else { + long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000); + producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true); + } + if (arguments.batchMaxMessages > 0) { + producerBuilder.batchingMaxMessages(arguments.batchMaxMessages); + } + if (arguments.batchMaxBytes > 0) { + producerBuilder.batchingMaxBytes(arguments.batchMaxBytes); + } + + // Block if queue is full else we will start seeing errors in sendAsync + producerBuilder.blockIfQueueFull(true); + + if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) { + producerBuilder.addEncryptionKey(arguments.encKeyName); + producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); + } + + return producerBuilder; + } private static void runProducer(int producerId, Arguments arguments, @@ -500,16 +541,8 @@ private static void runProducer(int producerId, .enableTransaction(arguments.isEnableTransaction); client = clientBuilder.build(); - ProducerBuilder producerBuilder = client.newProducer() // - .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // - .compressionType(arguments.compression) // - .maxPendingMessages(arguments.maxOutstanding) // - .accessMode(arguments.producerAccessMode) - // enable round robin message routing if it is a partitioned topic - .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); - if (arguments.maxPendingMessagesAcrossPartitions > 0) { - producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); - } + + ProducerBuilder producerBuilder = createProducerBuilder(client, arguments, producerId); AtomicReference transactionAtomicReference; if (arguments.isEnableTransaction) { @@ -521,31 +554,6 @@ private static void runProducer(int producerId, } else { transactionAtomicReference = new AtomicReference<>(null); } - if (arguments.producerName != null) { - String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId); - producerBuilder.producerName(producerName); - } - - if (arguments.disableBatching || (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0)) { - producerBuilder.enableBatching(false); - } else { - long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000); - producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true); - } - if (arguments.batchMaxMessages > 0) { - producerBuilder.batchingMaxMessages(arguments.batchMaxMessages); - } - if (arguments.batchMaxBytes > 0) { - producerBuilder.batchingMaxBytes(arguments.batchMaxBytes); - } - - // Block if queue is full else we will start seeing errors in sendAsync - producerBuilder.blockIfQueueFull(true); - - if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) { - producerBuilder.addEncryptionKey(arguments.encKeyName); - producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); - } for (int i = 0; i < arguments.numTopics; i++) { diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index b9940b818de4a..4c921ebef28b6 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -21,9 +21,13 @@ import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; @@ -31,6 +35,8 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + +import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -162,46 +168,24 @@ public void testMsgKey() throws Exception { } @Test(timeOut = 20000) - public void testMsgKeyBatchingDisabled() throws Exception { - String argString = "%s -r 10 -u %s -m 500 -mk autoIncrement -db"; + public void testBatchingDisabled() throws Exception { + PerformanceProducer.Arguments arguments = new PerformanceProducer.Arguments(); + + int producerId = 0; + String topic = testTopic + UUID.randomUUID(); - String args = String.format(argString, topic, pulsar.getBrokerServiceUrl()); - Thread thread = new Thread(() -> { - try { - PerformanceProducer.main(args.split(" ")); - } catch (Exception e) { - e.printStackTrace(); - } - }); - - Consumer newConsumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-2") - .subscriptionType(SubscriptionType.Key_Shared).subscribe(); - Consumer newConsumer2 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub-2") - .subscriptionType(SubscriptionType.Key_Shared).subscribe(); - - thread.start(); - - Awaitility.await() - .untilAsserted(() -> { - Message message = newConsumer1.receive(1, TimeUnit.SECONDS); - if (message != null) { - newConsumer1.acknowledge(message); - } - assertNotNull(message); - }); - - Awaitility.await() - .untilAsserted(() -> { - Message message = newConsumer2.receive(1, TimeUnit.SECONDS); - if (message != null) { - newConsumer2.acknowledge(message); - } - assertNotNull(message); - }); - - thread.interrupt(); - newConsumer1.close(); - newConsumer2.close(); + arguments.topics = List.of(topic); + arguments.msgRate = 10; + arguments.serviceURL = pulsar.getBrokerServiceUrl(); + arguments.numMessages = 500; + arguments.disableBatching = true; + + ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments) + .enableTransaction(arguments.isEnableTransaction); + PulsarClient client = clientBuilder.build(); + + ProducerBuilderImpl builder = (ProducerBuilderImpl) PerformanceProducer.createProducerBuilder(client, arguments, producerId); + Assert.assertFalse(builder.getConf().isBatchingEnabled()); } @Test(timeOut = 20000) From a84c3fcf702efead5e219c180adfedc9bffd7ad1 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 31 Aug 2022 15:52:17 -0700 Subject: [PATCH 5/6] [improve][cli] Add option to disable batching in pulsar-testclient --- .../org/apache/pulsar/testclient/PerformanceProducerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index 4c921ebef28b6..f7f82a5afe805 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -184,7 +184,7 @@ public void testBatchingDisabled() throws Exception { .enableTransaction(arguments.isEnableTransaction); PulsarClient client = clientBuilder.build(); - ProducerBuilderImpl builder = (ProducerBuilderImpl) PerformanceProducer.createProducerBuilder(client, arguments, producerId); + ProducerBuilderImpl builder = (ProducerBuilderImpl) PerformanceProducer.createProducerBuilder(client, arguments, producerId); Assert.assertFalse(builder.getConf().isBatchingEnabled()); } From 961445edb0fa6c41d93dd4b3759a0f90612ea8a5 Mon Sep 17 00:00:00 2001 From: Vineeth Date: Wed, 31 Aug 2022 16:01:59 -0700 Subject: [PATCH 6/6] [improve][cli] Add option to disable batching in pulsar-testclient --- .../org/apache/pulsar/testclient/PerformanceProducer.java | 8 ++++---- .../apache/pulsar/testclient/PerformanceProducerTest.java | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 5c870184e66af..464a521f6445b 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -481,8 +481,8 @@ static IMessageFormatter getMessageFormatter(String formatterClass) { return null; } } - - static ProducerBuilder createProducerBuilder(PulsarClient client, Arguments arguments, int producerId) { + + static ProducerBuilder createProducerBuilder(PulsarClient client, Arguments arguments, int producerId) { ProducerBuilder producerBuilder = client.newProducer() // .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) // .compressionType(arguments.compression) // @@ -493,7 +493,7 @@ static ProducerBuilder createProducerBuilder(PulsarClient client, Argume if (arguments.maxPendingMessagesAcrossPartitions > 0) { producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions); } - + if (arguments.producerName != null) { String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId); producerBuilder.producerName(producerName); @@ -519,7 +519,7 @@ static ProducerBuilder createProducerBuilder(PulsarClient client, Argume producerBuilder.addEncryptionKey(arguments.encKeyName); producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile); } - + return producerBuilder; } diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java index f7f82a5afe805..7bd6cbb25796c 100644 --- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java +++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java @@ -24,7 +24,6 @@ import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ProducerBuilderImpl;