Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ static class Arguments extends PerformanceBaseArguments {
"--batch-time-window" }, description = "Batch messages in 'x' ms window (Default: 1ms)")
public double batchTimeMillis = 1.0;

@Parameter(names = { "-db",
"--disable-batching" }, description = "Disable batching if true")
public boolean disableBatching;

@Parameter(names = {
"-bm", "--batch-max-messages"
}, description = "Maximum number of messages per batch")
Expand Down Expand Up @@ -478,6 +482,47 @@ static IMessageFormatter getMessageFormatter(String formatterClass) {
}
}

static ProducerBuilder<byte[]> createProducerBuilder(PulsarClient client, Arguments arguments, int producerId) {
ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
.sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
.compressionType(arguments.compression) //
.maxPendingMessages(arguments.maxOutstanding) //
.accessMode(arguments.producerAccessMode)
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
if (arguments.maxPendingMessagesAcrossPartitions > 0) {
producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
}

if (arguments.producerName != null) {
String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId);
producerBuilder.producerName(producerName);
}

if (arguments.disableBatching || (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0)) {
producerBuilder.enableBatching(false);
} else {
long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true);
}
if (arguments.batchMaxMessages > 0) {
producerBuilder.batchingMaxMessages(arguments.batchMaxMessages);
}
if (arguments.batchMaxBytes > 0) {
producerBuilder.batchingMaxBytes(arguments.batchMaxBytes);
}

// Block if queue is full else we will start seeing errors in sendAsync
producerBuilder.blockIfQueueFull(true);

if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) {
producerBuilder.addEncryptionKey(arguments.encKeyName);
producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile);
}

return producerBuilder;
}

private static void runProducer(int producerId,
Arguments arguments,
long numMessages,
Expand All @@ -496,16 +541,8 @@ private static void runProducer(int producerId,
.enableTransaction(arguments.isEnableTransaction);

client = clientBuilder.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
.sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
.compressionType(arguments.compression) //
.maxPendingMessages(arguments.maxOutstanding) //
.accessMode(arguments.producerAccessMode)
// enable round robin message routing if it is a partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
if (arguments.maxPendingMessagesAcrossPartitions > 0) {
producerBuilder.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions);
}

ProducerBuilder<byte[]> producerBuilder = createProducerBuilder(client, arguments, producerId);

AtomicReference<Transaction> transactionAtomicReference;
if (arguments.isEnableTransaction) {
Expand All @@ -517,31 +554,6 @@ private static void runProducer(int producerId,
} else {
transactionAtomicReference = new AtomicReference<>(null);
}
if (arguments.producerName != null) {
String producerName = String.format("%s%s%d", arguments.producerName, arguments.separator, producerId);
producerBuilder.producerName(producerName);
}

if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) {
producerBuilder.enableBatching(false);
} else {
long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
producerBuilder.batchingMaxPublishDelay(batchTimeUsec, TimeUnit.MICROSECONDS).enableBatching(true);
}
if (arguments.batchMaxMessages > 0) {
producerBuilder.batchingMaxMessages(arguments.batchMaxMessages);
}
if (arguments.batchMaxBytes > 0) {
producerBuilder.batchingMaxBytes(arguments.batchMaxBytes);
}

// Block if queue is full else we will start seeing errors in sendAsync
producerBuilder.blockIfQueueFull(true);

if (isNotBlank(arguments.encKeyName) && isNotBlank(arguments.encKeyFile)) {
producerBuilder.addEncryptionKey(arguments.encKeyName);
producerBuilder.defaultCryptoKeyReader(arguments.encKeyFile);
}

for (int i = 0; i < arguments.numTopics; i++) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -161,6 +166,27 @@ public void testMsgKey() throws Exception {
newConsumer2.close();
}

@Test(timeOut = 20000)
public void testBatchingDisabled() throws Exception {
PerformanceProducer.Arguments arguments = new PerformanceProducer.Arguments();

int producerId = 0;

String topic = testTopic + UUID.randomUUID();
arguments.topics = List.of(topic);
arguments.msgRate = 10;
arguments.serviceURL = pulsar.getBrokerServiceUrl();
arguments.numMessages = 500;
arguments.disableBatching = true;

ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
.enableTransaction(arguments.isEnableTransaction);
PulsarClient client = clientBuilder.build();

ProducerBuilderImpl<byte[]> builder = (ProducerBuilderImpl<byte[]>) PerformanceProducer.createProducerBuilder(client, arguments, producerId);
Assert.assertFalse(builder.getConf().isBatchingEnabled());
}

@Test(timeOut = 20000)
public void testCreatePartitions() throws Exception {
String argString = "%s -r 10 -u %s -au %s -m 5 -np 10";
Expand Down