From a2014b6e7f55f6251653d269738b93f29092e4a5 Mon Sep 17 00:00:00 2001 From: xiangying Date: Tue, 29 Oct 2024 16:33:31 +0800 Subject: [PATCH 1/7] [improve][client] Add a producer config to improve compaction performance --- .../impl/RawBatchMessageContainerImpl.java | 2 +- .../impl/BatchMessageContainerImpl.java | 25 +++++++--- .../pulsar/client/impl/MessageImpl.java | 10 ++-- .../pulsar/client/impl/ProducerImpl.java | 46 +++++++++++-------- .../impl/conf/ProducerConfigurationData.java | 2 + 5 files changed, 53 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index 374f1e30c0a89..8cb8d74b3f93e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -167,7 +167,7 @@ public ByteBuf toByteBuf() { } } - ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(true)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, encryptedPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index a3c9d1bc9ab48..759bce1a6bf07 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -141,7 +141,7 @@ public boolean add(MessageImpl msg, SendCallback callback) { return isBatchFull(); } - protected ByteBuf getCompressedBatchMetadataAndPayload() { + protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) { int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex(); int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex(); @@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() { } int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); - ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); - batchedMessageMetadataAndPayload.release(); - if (compressionType != CompressionType.NONE) { + ByteBuf compressedPayload; + boolean isCompressed = false; + if (!isBrokerTwoPhaseCompactor && producer != null){ + if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { + compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload); + isCompressed = true; + } else { + compressedPayload = batchedMessageMetadataAndPayload; + } + } else { + compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); + batchedMessageMetadataAndPayload.release(); + } + if (compressionType != CompressionType.NONE && isCompressed) { messageMetadata.setCompression(compressionType); messageMetadata.setUncompressedSize(uncompressedSize); } @@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException { if (messages.size() == 1) { messageMetadata.clear(); messageMetadata.copyFrom(messages.get(0).getMessageBuilder()); - ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, + getCompressedBatchMetadataAndPayload(false)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), 1, null, messageMetadata, encryptedPayload); @@ -283,7 +295,8 @@ public OpSendMsg createOpSendMsg() throws IOException { lowestSequenceId = -1L; return op; } - ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload()); + ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, + getCompressedBatchMetadataAndPayload(false)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); if (encryptedPayload.readableBytes() > getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index d369d639a73a0..5599e8bc946ae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -38,11 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.Getter; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.MessageIdAdv; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.schema.AbstractSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; @@ -773,6 +769,10 @@ int getUncompressedSize() { return uncompressedSize; } + CompressionType getCompressionType() { + return CompressionType.valueOf(msgMetadata.getCompression().name()); + } + SchemaState getSchemaState() { return schemaState; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 6d5a81454631f..6dc50971770c5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -478,7 +478,8 @@ CompletableFuture internalSendWithTxnAsync(Message message, Transa * @param payload * @return a new payload */ - private ByteBuf applyCompression(ByteBuf payload) { + @VisibleForTesting + public ByteBuf applyCompression(ByteBuf payload) { ByteBuf compressedPayload = compressor.encode(payload); payload.release(); return compressedPayload; @@ -505,22 +506,27 @@ public void sendAsync(Message message, SendCallback callback) { boolean compressed = false; // Batch will be compressed when closed // If a message has a delayed delivery time, we'll always send it individually - if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) { - compressedPayload = applyCompression(payload); - compressed = true; + if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) { + if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) { - // validate msg-size (For batching this will be check at the batch completion size) - int compressedSize = compressedPayload.readableBytes(); - if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { - compressedPayload.release(); - String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; - PulsarClientException.InvalidMessageException invalidMessageException = - new PulsarClientException.InvalidMessageException( - format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" - + " %d bytes", - producerName, topic, compressedStr, compressedSize, getMaxMessageSize())); - completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); - return; + } else { + compressedPayload = applyCompression(payload); + compressed = true; + + // validate msg-size (For batching this will be check at the batch completion size) + int compressedSize = compressedPayload.readableBytes(); + if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { + compressedPayload.release(); + String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; + PulsarClientException.InvalidMessageException invalidMessageException = + new PulsarClientException.InvalidMessageException( + format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" + + " %d bytes", + producerName, topic, compressedStr, compressedSize, + getMaxMessageSize())); + completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); + return; + } } } @@ -542,7 +548,7 @@ public void sendAsync(Message message, SendCallback callback) { // Update the message metadata before computing the payload chunk size to avoid a large message cannot be split // into chunks. - updateMessageMetadata(msgMetadata, uncompressedSize); + updateMessageMetadata(msgMetadata, uncompressedSize, compressed); // send in chunks int totalChunks; @@ -636,7 +642,7 @@ public void sendAsync(Message message, SendCallback callback) { * @param uncompressedSize * @return the sequence id */ - private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) { + private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize, boolean isCompressed) { if (!msgMetadata.hasPublishTime()) { msgMetadata.setPublishTime(client.getClientClock().millis()); @@ -646,7 +652,7 @@ private void updateMessageMetadata(final MessageMetadata msgMetadata, final int // The field "uncompressedSize" is zero means the compression info were not set yet. if (msgMetadata.getUncompressedSize() <= 0) { - if (conf.getCompressionType() != CompressionType.NONE) { + if (conf.getCompressionType() != CompressionType.NONE && isCompressed) { msgMetadata .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); } @@ -737,7 +743,7 @@ private void serializeAndSendMessage(MessageImpl msg, } else { // in this case compression has not been applied by the caller // but we have to compress the payload if compression is configured - if (!compressed) { + if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) { chunkPayload = applyCompression(chunkPayload); } ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 6ec738bbf4c8d..0c770c7c9bd05 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { ) private CompressionType compressionType = CompressionType.NONE; + private int compressMinMsgBodySize = 4 * 1024; // 4kb + // Cannot use Optional since it's not serializable private Long initialSequenceId = null; From 08c28534669ec3fe52cfa39af1827ac58e1e4fb0 Mon Sep 17 00:00:00 2001 From: xiangying Date: Tue, 29 Oct 2024 20:41:40 +0800 Subject: [PATCH 2/7] ADD TEST --- .../impl/ProducerConsumerInternalTest.java | 60 +++++++++++++++---- 1 file changed, 50 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java index a06085d3d4626..9f2eb83afa070 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java @@ -18,24 +18,21 @@ */ package org.apache.pulsar.client.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.ServerCnx; -import org.apache.pulsar.client.api.BatcherBuilder; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.awaitility.Awaitility; @@ -230,4 +227,47 @@ public void testRetentionPolicyByProducingMessages() throws Exception { assertEquals(internalStats.ledgers.size(), 1); }); } + + + @Test + public void testProducerCompressionMinMsgBodySize() throws PulsarClientException { + byte[] msg1022 = new byte[1022]; + byte[] msg1025 = new byte[1025]; + final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + @Cleanup + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer() + .topic(topicName) + .producerName("producer") + .compressionType(CompressionType.LZ4) + .create(); + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("sub") + .subscribe(); + + producer.conf.setCompressMinMsgBodySize(1024); + producer.conf.setCompressionType(CompressionType.LZ4); + // disable batch + producer.conf.setBatchingEnabled(false); + producer.newMessage().value(msg1022).send(); + MessageImpl message = (MessageImpl) consumer.receive(); + CompressionType compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.NONE); + producer.newMessage().value(msg1025).send(); + message = (MessageImpl) consumer.receive(); + compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.LZ4); + + // enable batch + producer.conf.setBatchingEnabled(true); + producer.newMessage().value(msg1022).send(); + message = (MessageImpl) consumer.receive(); + compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.NONE); + producer.newMessage().value(msg1025).send(); + message = (MessageImpl) consumer.receive(); + compressionType = message.getCompressionType(); + assertEquals(compressionType, CompressionType.LZ4); + } } From 5f1e2c235783e9443fe0569c93ccb320cbd6796d Mon Sep 17 00:00:00 2001 From: xiangying Date: Thu, 28 Nov 2024 20:54:41 +0800 Subject: [PATCH 3/7] address comments --- .../impl/RawBatchMessageContainerImpl.java | 2 +- .../impl/ProducerConsumerInternalTest.java | 18 +++++++++++------ .../impl/BatchMessageContainerImpl.java | 20 +++++++++---------- .../pulsar/client/impl/MessageImpl.java | 7 ++++++- .../pulsar/client/impl/ProducerImpl.java | 12 ++++++----- 5 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java index 8cb8d74b3f93e..9b9f79a8ec5ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchMessageContainerImpl.java @@ -167,7 +167,7 @@ public ByteBuf toByteBuf() { } } - ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(true)); + ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(false)); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, encryptedPayload); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java index 9f2eb83afa070..4617f631207f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerConsumerInternalTest.java @@ -18,21 +18,27 @@ */ package org.apache.pulsar.client.impl; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; -import static org.testng.Assert.*; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertEquals; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.ServerCnx; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.BatcherBuilder; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.awaitility.Awaitility; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 759bce1a6bf07..a3b1babaf6f75 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -141,7 +141,11 @@ public boolean add(MessageImpl msg, SendCallback callback) { return isBatchFull(); } - protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) { + protected ByteBuf getCompressedBatchMetadataAndPayload() { + return getCompressedBatchMetadataAndPayload(true); + } + + protected ByteBuf getCompressedBatchMetadataAndPayload(boolean allowCompression) { int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex(); int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex(); @@ -170,11 +174,11 @@ protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseC int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); ByteBuf compressedPayload; - boolean isCompressed = false; - if (!isBrokerTwoPhaseCompactor && producer != null){ + if (!allowCompression && producer != null){ if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload); - isCompressed = true; + messageMetadata.setCompression(compressionType); + messageMetadata.setUncompressedSize(uncompressedSize); } else { compressedPayload = batchedMessageMetadataAndPayload; } @@ -182,10 +186,6 @@ protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseC compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); batchedMessageMetadataAndPayload.release(); } - if (compressionType != CompressionType.NONE && isCompressed) { - messageMetadata.setCompression(compressionType); - messageMetadata.setUncompressedSize(uncompressedSize); - } // Update the current max batch size using the uncompressed size, which is what we need in any case to // accumulate the batch content @@ -264,7 +264,7 @@ public OpSendMsg createOpSendMsg() throws IOException { messageMetadata.clear(); messageMetadata.copyFrom(messages.get(0).getMessageBuilder()); ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, - getCompressedBatchMetadataAndPayload(false)); + getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(), 1, null, messageMetadata, encryptedPayload); @@ -296,7 +296,7 @@ public OpSendMsg createOpSendMsg() throws IOException { return op; } ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, - getCompressedBatchMetadataAndPayload(false)); + getCompressedBatchMetadataAndPayload()); updateAndReserveBatchAllocatedSize(encryptedPayload.capacity()); if (encryptedPayload.readableBytes() > getMaxMessageSize()) { producer.semaphoreRelease(messages.size()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index 5599e8bc946ae..89b269f297f2e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -38,7 +38,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.Getter; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.schema.AbstractSchema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 6dc50971770c5..5fc733e486e6b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -520,8 +520,8 @@ public void sendAsync(Message message, SendCallback callback) { String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( - format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds" - + " %d bytes", + format("The producer %s of the topic %s sends a %s message with %d bytes that " + + "exceeds %d bytes", producerName, topic, compressedStr, compressedSize, getMaxMessageSize())); completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException); @@ -642,7 +642,9 @@ public void sendAsync(Message message, SendCallback callback) { * @param uncompressedSize * @return the sequence id */ - private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize, boolean isCompressed) { + @SuppressWarnings("checkstyle:Indentation") + private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize, + boolean isCompressed) { if (!msgMetadata.hasPublishTime()) { msgMetadata.setPublishTime(client.getClientClock().millis()); @@ -653,8 +655,8 @@ private void updateMessageMetadata(final MessageMetadata msgMetadata, final int // The field "uncompressedSize" is zero means the compression info were not set yet. if (msgMetadata.getUncompressedSize() <= 0) { if (conf.getCompressionType() != CompressionType.NONE && isCompressed) { - msgMetadata - .setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); + msgMetadata.setCompression( + CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType())); } msgMetadata.setUncompressedSize(uncompressedSize); } From 460e00e156d9e985df9c18ed4b63f0af27069243 Mon Sep 17 00:00:00 2001 From: xiangying Date: Thu, 28 Nov 2024 20:58:23 +0800 Subject: [PATCH 4/7] address comments --- .../apache/pulsar/client/impl/BatchMessageContainerImpl.java | 3 ++- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index a3b1babaf6f75..f35e0f80498b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -175,7 +175,8 @@ protected ByteBuf getCompressedBatchMetadataAndPayload(boolean allowCompression) int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); ByteBuf compressedPayload; if (!allowCompression && producer != null){ - if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { + if (compressionType != CompressionType.NONE + && uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload); messageMetadata.setCompression(compressionType); messageMetadata.setUncompressedSize(uncompressedSize); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 5fc733e486e6b..953331bf62046 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -517,7 +517,9 @@ public void sendAsync(Message message, SendCallback callback) { int compressedSize = compressedPayload.readableBytes(); if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) { compressedPayload.release(); - String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : ""; + String compressedStr = conf.getCompressionType() != CompressionType.NONE + ? ("compressed (" + conf.getCompressionType() + ")") + : "uncompressed"; PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException( format("The producer %s of the topic %s sends a %s message with %d bytes that " From 7068823405bac7760a1556955b93f6e50b815d7a Mon Sep 17 00:00:00 2001 From: xiangying Date: Thu, 13 Feb 2025 14:27:08 +0800 Subject: [PATCH 5/7] fix --- .../pulsar/client/impl/BatchMessageContainerImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 809d8fc78967f..150761fbde1d5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -174,7 +174,7 @@ protected ByteBuf getCompressedBatchMetadataAndPayload(boolean allowCompression) int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes(); ByteBuf compressedPayload; - if (!allowCompression && producer != null){ + if (clientOperation && producer != null){ if (compressionType != CompressionType.NONE && uncompressedSize > producer.conf.getCompressMinMsgBodySize()) { compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload); @@ -186,6 +186,10 @@ protected ByteBuf getCompressedBatchMetadataAndPayload(boolean allowCompression) } else { compressedPayload = compressor.encode(batchedMessageMetadataAndPayload); batchedMessageMetadataAndPayload.release(); + if (compressionType != CompressionType.NONE) { + messageMetadata.setCompression(compressionType); + messageMetadata.setUncompressedSize(uncompressedSize); + } } // Update the current max batch size using the uncompressed size, which is what we need in any case to From c24e769f9944dd7ccfa2373002c93d0c25f71c79 Mon Sep 17 00:00:00 2001 From: xiangying Date: Thu, 13 Feb 2025 19:57:30 +0800 Subject: [PATCH 6/7] fix --- .../apache/pulsar/client/impl/BatchMessageContainerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 150761fbde1d5..ed4dc3875ad14 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -145,7 +145,7 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() { return getCompressedBatchMetadataAndPayload(true); } - protected ByteBuf getCompressedBatchMetadataAndPayload(boolean allowCompression) { + protected ByteBuf getCompressedBatchMetadataAndPayload(boolean clientOperation) { int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex(); int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex(); From 5aaa6466a824476829abcdbb68c8be23ed8773ed Mon Sep 17 00:00:00 2001 From: xiangying Date: Fri, 14 Feb 2025 12:06:49 +0800 Subject: [PATCH 7/7] fix test --- .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 3 ++- .../org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 258c0183114fd..27f9d72413c52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1257,11 +1257,12 @@ public void testExamineMessageMetadata() throws Exception { admin.topics().createPartitionedTopic(topicName, 2); @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING) + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.STRING) .producerName("testExamineMessageMetadataProducer") .compressionType(CompressionType.LZ4) .topic(topicName + "-partition-0") .create(); + producer.getConfiguration().setCompressMinMsgBodySize(1); producer.newMessage() .keyBytes("partition123".getBytes()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java index dcdfd136476c3..e366b232a639d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java @@ -124,6 +124,7 @@ public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType comp .compressionType(compressionType) .enableBatching(false) .create(); + producer.getConfiguration().setCompressMinMsgBodySize(1); producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); MsgPayloadTouchableMessageBuilder msgBuilder = newMessage(producer); /**