diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index 374f1e30c0a89..9b9f79a8ec5ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -167,7 +167,7 @@ public ByteBuf toByteBuf() { } } - ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(false)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, encryptedPayload); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 258c0183114fd..27f9d72413c52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1257,11 +1257,12 @@ public void testExamineMessageMetadata() throws Exception { admin.topics().createPartitionedTopic(topicName, 2); @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING) + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.STRING) .producerName("testExamineMessageMetadataProducer") .compressionType(CompressionType.LZ4) .topic(topicName + "-partition-0") .create(); + producer.getConfiguration().setCompressMinMsgBodySize(1); producer.newMessage() .keyBytes("partition123".getBytes()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java index a06085d3d4626..4617f631207f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java @@ -18,23 +18,26 @@ */ package org.apache.pulsar.client.impl; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.client.api.BatcherBuilder; +import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -230,4 +233,47 @@ public void testRetentionPolicyByProducingMessages() throws Exception { assertEquals(internalStats.ledgers.size(), 1); }); } + + + @Test + public void testProducerCompressionMinMsgBodySize() throws PulsarClientException { + byte[] msg1022 = new byte[1022]; + byte[] msg1025 = new byte[1025]; + final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic(topicName) + .producerName("producer") + .compressionType(CompressionType.LZ4) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub") + .subscribe(); + + producer.conf.setCompressMinMsgBodySize(1024); + producer.conf.setCompressionType(CompressionType.LZ4); + // disable batch + producer.conf.setBatchingEnabled(false); + producer.newMessage().value(msg1022).send(); + MessageImpl message = (MessageImpl) consumer.receive(); + CompressionType compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.NONE); + producer.newMessage().value(msg1025).send(); + message = (MessageImpl) consumer.receive(); + compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.LZ4); + + // enable batch + producer.conf.setBatchingEnabled(true); + producer.newMessage().value(msg1022).send(); + message = (MessageImpl) consumer.receive(); + compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.NONE); + producer.newMessage().value(msg1025).send(); + message = (MessageImpl) consumer.receive(); + compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.LZ4); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java index dcdfd136476c3..e366b232a639d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java @@ -124,6 +124,7 @@ public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType comp .compressionType(compressionType) .enableBatching(false) .create(); + producer.getConfiguration().setCompressMinMsgBodySize(1); producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); MsgPayloadTouchableMessageBuilder msgBuilder = newMessage(producer); /** diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 7262cfd11e069..ed4dc3875ad14 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -142,6 +142,10 @@ public boolean add(MessageImpl msg, SendCallback callback) { } protected ByteBuf getCompressedBatchMetadataAndPayload() { + return getCompressedBatchMetadataAndPayload(true); + } + + protected ByteBuf getCompressedBatchMetadataAndPayload(boolean clientOperation) { int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex(); int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex(); @@ -169,11 +173,23 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() { } int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); - batchedMessageMetadataAndPayload.release(); - if (compressionType != CompressionType.NONE) { - messageMetadata.setCompression(compressionType); - messageMetadata.setUncompressedSize(uncompressedSize); + ByteBuf compressedPayload; + if (clientOperation && producer != null){ + if (compressionType != CompressionType.NONE + && uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { + compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload); + messageMetadata.setCompression(compressionType); + messageMetadata.setUncompressedSize(uncompressedSize); + } else { + compressedPayload = batchedMessageMetadataAndPayload; + } + } else { + compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); + batchedMessageMetadataAndPayload.release(); + if (compressionType != CompressionType.NONE) { + messageMetadata.setCompression(compressionType); + messageMetadata.setUncompressedSize(uncompressedSize); + } } // Update the current max batch size using the uncompressed size, which is what we need in any case to @@ -252,7 +268,8 @@ public OpSendMsg createOpSendMsg() throws IOException { if (messages.size() == 1) { messageMetadata.clear(); messageMetadata.copyFrom(messages.get(0).getMessageBuilder()); - ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, + getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), 1, null, messageMetadata, encryptedPayload); @@ -283,7 +300,8 @@ public OpSendMsg createOpSendMsg() throws IOException { lowestSequenceId = -1L; return op; } - ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, + getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); if (encryptedPayload.readableBytes() > getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 72a5fd54e852b..aa98df6cda944 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.Getter; +import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageIdAdv; @@ -780,6 +781,10 @@ int getUncompressedSize() { return uncompressedSize; } + CompressionType getCompressionType() { + return CompressionType.valueOf(msgMetadata.getCompression().name()); + } + SchemaState getSchemaState() { return schemaState; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index fb2246f3a66a1..935a6251ddaf8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -486,7 +486,8 @@ CompletableFuture internalSendWithTxnAsync(Message message, Transa * @param payload * @return a new payload */ - private ByteBuf applyCompression(ByteBuf payload) { + @VisibleForTesting + public ByteBuf applyCompression(ByteBuf payload) { ByteBuf compressedPayload = compressor.encode(payload); payload.release(); return compressedPayload; @@ -540,22 +541,29 @@ public void sendAsync(Message message, SendCallback callback) { boolean compressed = false; // Batch will be compressed when closed // If a message has a delayed delivery time, we'll always send it individually - if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) { - compressedPayload = applyCompression(payload); - compressed = true; + if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) { + if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) { - // validate msg-size (For batching this will be check at the batch completion size) - int compressedSize = compressedPayload.readableBytes(); - if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { - compressedPayload.release(); - String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; - PulsarClientException.InvalidMessageException invalidMessageException = - new PulsarClientException.InvalidMessageException( - format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" - + " %d bytes", - producerName, topic, compressedStr, compressedSize, getMaxMessageSize())); - completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); - return; + } else { + compressedPayload = applyCompression(payload); + compressed = true; + + // validate msg-size (For batching this will be check at the batch completion size) + int compressedSize = compressedPayload.readableBytes(); + if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { + compressedPayload.release(); + String compressedStr = conf.getCompressionType() != CompressionType.NONE + ? ("compressed (" + conf.getCompressionType() + ")") + : "uncompressed"; + PulsarClientException.InvalidMessageException invalidMessageException = + new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s sends a %s message with %d bytes that " + + "exceeds %d bytes", + producerName, topic, compressedStr, compressedSize, + getMaxMessageSize())); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); + return; + } } } @@ -577,7 +585,7 @@ public void sendAsync(Message message, SendCallback callback) { // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split // into chunks. - updateMessageMetadata(msgMetadata, uncompressedSize); + updateMessageMetadata(msgMetadata, uncompressedSize, compressed); // send in chunks int totalChunks; @@ -673,7 +681,9 @@ public void sendAsync(Message message, SendCallback callback) { * @param uncompressedSize * @return the sequence id */ - private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) { + @SuppressWarnings("checkstyle:Indentation") + private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize, + boolean isCompressed) { if (!msgMetadata.hasPublishTime()) { msgMetadata.setPublishTime(client.getClientClock().millis()); @@ -683,9 +693,9 @@ private void updateMessageMetadata(final MessageMetadata msgMetadata, final int // The field "uncompressedSize" is zero means the compression info were not set yet. if (msgMetadata.getUncompressedSize() <= 0) { - if (conf.getCompressionType() != CompressionType.NONE) { - msgMetadata - .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); + if (conf.getCompressionType() != CompressionType.NONE && isCompressed) { + msgMetadata.setCompression( + CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); } msgMetadata.setUncompressedSize(uncompressedSize); } @@ -777,7 +787,7 @@ private void serializeAndSendMessage(MessageImpl msg, } else { // in this case compression has not been applied by the caller // but we have to compress the payload if compression is configured - if (!compressed) { + if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) { chunkPayload = applyCompression(chunkPayload); } ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 6ec738bbf4c8d..0c770c7c9bd05 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { ) private CompressionType compressionType = CompressionType.NONE; + private int compressMinMsgBodySize = 4 * 1024; // 4kb + // Cannot use Optional since it's not serializable private Long initialSequenceId = null;