From 1ffda4ca957bd2fcc87fa793e645ab820d9e4629 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 May 2024 10:43:57 +0300 Subject: [PATCH 01/10] [fix][broker] Fix multiple transfer corruption issues when TLS is enabled --- .../pulsar/common/protocol/ByteBufPair.java | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index cfd89d3bb28ab..b199df8c92606 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -141,19 +142,47 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof ByteBufPair) { ByteBufPair b = (ByteBufPair) msg; + ChannelPromise compositePromise = ctx.newPromise(); + compositePromise.addListener(future -> { + // release the ByteBufPair after the write operation is completed + ReferenceCountUtil.safeRelease(b); + // complete the promise passed as an argument unless it's a void promise + if (!promise.isVoid()) { + if (future.isSuccess()) { + promise.setSuccess(); + } else { + promise.setFailure(future.cause()); + } + } + }); + // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached // for multiple requests. - try { - ctx.write(b.getFirst().copy(), ctx.voidPromise()); - ctx.write(b.getSecond().copy(), promise); - } finally { - ReferenceCountUtil.safeRelease(b); - } + ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise()); + ctx.write(nioBufferCopy(b.getSecond()), compositePromise); } else { ctx.write(msg, promise); } } + + // Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method. + // This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is + // not thread safe when the ByteBuf instance is shared across multiple threads. + // This method works around the issue. + // Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation. + // This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after + // the write method completes. + private ByteBuf nioBufferCopy(ByteBuf buf) { + // avoid calling nioBufferCount() for performance reasons on CompositeByteBuf + // there's a similar optimization in Netty's SslHandler.wrap method where this is explained + if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) { + return Unpooled.wrappedBuffer(buf.nioBuffers()); + } else { + // Single buffer, no need to wrap it in an array as the nioBuffers() method would do + return Unpooled.wrappedBuffer(buf.nioBuffer()); + } + } } } From aa1543f9f1656558d5d76265722ae91535d62603 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 May 2024 20:16:21 +0300 Subject: [PATCH 02/10] Use alternative approach: Remove CopyingEncoder and make the encoder suitable for both use cases --- .../service/PulsarChannelInitializer.java | 7 +-- .../client/impl/PulsarChannelInitializer.java | 6 +- .../pulsar/common/protocol/ByteBufPair.java | 61 ++----------------- 3 files changed, 11 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 5308b3c981eb4..d642b46f3e8e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -112,10 +113,8 @@ protected void initChannel(SocketChannel ch) throws Exception { } else { ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc())); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); - } else { - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); @@ -128,7 +127,7 @@ protected void initChannel(SocketChannel ch) throws Exception { // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ServerCnx cnx = newServerCnx(pulsar, listenerName); + ChannelHandler cnx = newServerCnx(pulsar, listenerName); ch.pipeline().addLast("handler", cnx); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index ed34f7d41c130..72a5183bf69cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -146,12 +147,11 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier { - // release the ByteBufPair after the write operation is completed - ReferenceCountUtil.safeRelease(b); - // complete the promise passed as an argument unless it's a void promise - if (!promise.isVoid()) { - if (future.isSuccess()) { - promise.setSuccess(); - } else { - promise.setFailure(future.cause()); - } - } - }); - - // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). - // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached - // for multiple requests. - ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise()); - ctx.write(nioBufferCopy(b.getSecond()), compositePromise); - } else { - ctx.write(msg, promise); - } - } - - // Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method. - // This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is - // not thread safe when the ByteBuf instance is shared across multiple threads. - // This method works around the issue. - // Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation. - // This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after - // the write method completes. - private ByteBuf nioBufferCopy(ByteBuf buf) { - // avoid calling nioBufferCount() for performance reasons on CompositeByteBuf - // there's a similar optimization in Netty's SslHandler.wrap method where this is explained - if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) { - return Unpooled.wrappedBuffer(buf.nioBuffers()); - } else { - // Single buffer, no need to wrap it in an array as the nioBuffers() method would do - return Unpooled.wrappedBuffer(buf.nioBuffer()); - } - } - } - } From 3767f6fd3430a631a2740aba17dfd9e38a2c54f7 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 May 2024 20:53:20 +0300 Subject: [PATCH 03/10] Revisit the approach once again: retainedDuplicate() is needed --- .../apache/pulsar/common/protocol/ByteBufPair.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index 68e4cc07ac4c0..c283f9ed5eb22 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -122,10 +122,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // Write each buffer individually on the socket. The retain() here is needed to preserve the fact that // ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased and it // gets written multiple times, the individual buffers refcount should be reflected as well. - // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers. try { - ctx.write(b.getFirst().asReadOnly().retain(), ctx.voidPromise()); - ctx.write(b.getSecond().asReadOnly().retain(), promise); + ctx.write(readOnlyRetainedDuplicate(b.getFirst()), ctx.voidPromise()); + ctx.write(readOnlyRetainedDuplicate(b.getSecond()), promise); } finally { ReferenceCountUtil.safeRelease(b); } @@ -133,5 +132,13 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.write(msg, promise); } } + + // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers. + private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) { + // If the buffer is already read-only, .asReadOnly() will return the same buffer. + // That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer + // has independent readIndex and writeIndex. + return buf.asReadOnly().retainedDuplicate(); + } } } From b9ad8db896f194ad4a7f82d34d777b8858ef9a08 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 May 2024 23:40:56 +0300 Subject: [PATCH 04/10] Add workaround for Netty issue which will be fixed by Netty PR 14071 --- .../pulsar/common/protocol/ByteBufPair.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index c283f9ed5eb22..905375f92a9d7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -133,12 +133,23 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers. + // readonly buffer is needed to prevent SslHandler from modifying the input buffers. private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) { - // If the buffer is already read-only, .asReadOnly() will return the same buffer. - // That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer - // has independent readIndex and writeIndex. - return buf.asReadOnly().retainedDuplicate(); + if (buf.readableBytes() == 0) { + return Unpooled.EMPTY_BUFFER; + } + + // using "buf.asReadOnly().retainedDuplicate()" would be preferred here, however it's not supported + // by SslHandler until the fix https://github.com/netty/netty/pull/14071 is released. + // There's a need to use ".retainedDuplicate()" together with ".asReadOnly()". + // If the buffer is already read-only, .asReadOnly() will return the current buffer. This would be a problem + // since a duplicate is needed so that the buffer has independent readIndex and writeIndex state. + // + // The alternative workaround is to use "Unpooled.unmodifiableBuffer(buf).retain()". This is a deprecated + // method, but it will achieve the same result with the additional detail that it won't require the PR 14071 + // in SslHandler to be released. The reason for this is that isWritable methods return false for an + // unwrapped ReadOnlyByteBuf instance. + return Unpooled.unmodifiableBuffer(buf).retain(); } } } From 56cfd35f9f71256dd07420906b3d6d011c2e9488 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 00:01:14 +0300 Subject: [PATCH 05/10] Improve comment --- .../pulsar/common/protocol/ByteBufPair.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index 905375f92a9d7..75c2bcf7b4fcf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -139,16 +139,18 @@ private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) { return Unpooled.EMPTY_BUFFER; } - // using "buf.asReadOnly().retainedDuplicate()" would be preferred here, however it's not supported - // by SslHandler until the fix https://github.com/netty/netty/pull/14071 is released. - // There's a need to use ".retainedDuplicate()" together with ".asReadOnly()". - // If the buffer is already read-only, .asReadOnly() will return the current buffer. This would be a problem - // since a duplicate is needed so that the buffer has independent readIndex and writeIndex state. + // The preferred approach here would be to use "buf.asReadOnly().retainedDuplicate()", however, it's not + // supported by SslHandler until the fix https://github.com/netty/netty/pull/14071 is released. + // There's a need to use ".retainedDuplicate()" in conjunction with ".asReadOnly()" because when the buffer + // is already read-only, ".asReadOnly()" will return the current buffer without adding a wrapper. + // This could be problematic since a duplicate buffer is needed so that the buffer maintains independent + // readerIndex and writerIndex states. // - // The alternative workaround is to use "Unpooled.unmodifiableBuffer(buf).retain()". This is a deprecated - // method, but it will achieve the same result with the additional detail that it won't require the PR 14071 - // in SslHandler to be released. The reason for this is that isWritable methods return false for an - // unwrapped ReadOnlyByteBuf instance. + // An alternative workaround is to use "Unpooled.unmodifiableBuffer(buf).retain()". Although this method + // is deprecated, it achieves the same result with the additional detail that it doesn't require the + // PR 14071 in SslHandler to be released. The reason for this is that the isWritable methods return false + // for an unwrapped ReadOnlyByteBuf instance. There's no need to add a separate duplicate wrapper buffer + // when using Unpooled.unmodifiableBuffer. return Unpooled.unmodifiableBuffer(buf).retain(); } } From 2b3021e641f92ef495a1b3ab789d4377bd35d049 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 01:03:10 +0300 Subject: [PATCH 06/10] Revert "Improve comment" This reverts commit 56cfd35f9f71256dd07420906b3d6d011c2e9488. --- .../pulsar/common/protocol/ByteBufPair.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index 75c2bcf7b4fcf..905375f92a9d7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -139,18 +139,16 @@ private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) { return Unpooled.EMPTY_BUFFER; } - // The preferred approach here would be to use "buf.asReadOnly().retainedDuplicate()", however, it's not - // supported by SslHandler until the fix https://github.com/netty/netty/pull/14071 is released. - // There's a need to use ".retainedDuplicate()" in conjunction with ".asReadOnly()" because when the buffer - // is already read-only, ".asReadOnly()" will return the current buffer without adding a wrapper. - // This could be problematic since a duplicate buffer is needed so that the buffer maintains independent - // readerIndex and writerIndex states. + // using "buf.asReadOnly().retainedDuplicate()" would be preferred here, however it's not supported + // by SslHandler until the fix https://github.com/netty/netty/pull/14071 is released. + // There's a need to use ".retainedDuplicate()" together with ".asReadOnly()". + // If the buffer is already read-only, .asReadOnly() will return the current buffer. This would be a problem + // since a duplicate is needed so that the buffer has independent readIndex and writeIndex state. // - // An alternative workaround is to use "Unpooled.unmodifiableBuffer(buf).retain()". Although this method - // is deprecated, it achieves the same result with the additional detail that it doesn't require the - // PR 14071 in SslHandler to be released. The reason for this is that the isWritable methods return false - // for an unwrapped ReadOnlyByteBuf instance. There's no need to add a separate duplicate wrapper buffer - // when using Unpooled.unmodifiableBuffer. + // The alternative workaround is to use "Unpooled.unmodifiableBuffer(buf).retain()". This is a deprecated + // method, but it will achieve the same result with the additional detail that it won't require the PR 14071 + // in SslHandler to be released. The reason for this is that isWritable methods return false for an + // unwrapped ReadOnlyByteBuf instance. return Unpooled.unmodifiableBuffer(buf).retain(); } } From cd6022175cb5a440d02fcf6fd086d9a10abc430b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 01:03:15 +0300 Subject: [PATCH 07/10] Revert "Add workaround for Netty issue which will be fixed by Netty PR 14071" This reverts commit b9ad8db896f194ad4a7f82d34d777b8858ef9a08. --- .../pulsar/common/protocol/ByteBufPair.java | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index 905375f92a9d7..c283f9ed5eb22 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -133,23 +133,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - // readonly buffer is needed to prevent SslHandler from modifying the input buffers. + // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers. private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) { - if (buf.readableBytes() == 0) { - return Unpooled.EMPTY_BUFFER; - } - - // using "buf.asReadOnly().retainedDuplicate()" would be preferred here, however it's not supported - // by SslHandler until the fix https://github.com/netty/netty/pull/14071 is released. - // There's a need to use ".retainedDuplicate()" together with ".asReadOnly()". - // If the buffer is already read-only, .asReadOnly() will return the current buffer. This would be a problem - // since a duplicate is needed so that the buffer has independent readIndex and writeIndex state. - // - // The alternative workaround is to use "Unpooled.unmodifiableBuffer(buf).retain()". This is a deprecated - // method, but it will achieve the same result with the additional detail that it won't require the PR 14071 - // in SslHandler to be released. The reason for this is that isWritable methods return false for an - // unwrapped ReadOnlyByteBuf instance. - return Unpooled.unmodifiableBuffer(buf).retain(); + // If the buffer is already read-only, .asReadOnly() will return the same buffer. + // That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer + // has independent readIndex and writeIndex. + return buf.asReadOnly().retainedDuplicate(); } } } From 06814b5e8d4e7e5883c0eb1ebb6cf670c8bc40ec Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 01:03:23 +0300 Subject: [PATCH 08/10] Revert "Revisit the approach once again: retainedDuplicate() is needed" This reverts commit 3767f6fd3430a631a2740aba17dfd9e38a2c54f7. --- .../apache/pulsar/common/protocol/ByteBufPair.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index c283f9ed5eb22..68e4cc07ac4c0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -122,9 +122,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // Write each buffer individually on the socket. The retain() here is needed to preserve the fact that // ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased and it // gets written multiple times, the individual buffers refcount should be reflected as well. + // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers. try { - ctx.write(readOnlyRetainedDuplicate(b.getFirst()), ctx.voidPromise()); - ctx.write(readOnlyRetainedDuplicate(b.getSecond()), promise); + ctx.write(b.getFirst().asReadOnly().retain(), ctx.voidPromise()); + ctx.write(b.getSecond().asReadOnly().retain(), promise); } finally { ReferenceCountUtil.safeRelease(b); } @@ -132,13 +133,5 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.write(msg, promise); } } - - // .asReadOnly() is needed to prevent SslHandler from modifying the input buffers. - private static ByteBuf readOnlyRetainedDuplicate(ByteBuf buf) { - // If the buffer is already read-only, .asReadOnly() will return the same buffer. - // That's why the additional .retainedDuplicate() is needed to ensure that the returned buffer - // has independent readIndex and writeIndex. - return buf.asReadOnly().retainedDuplicate(); - } } } From 305f15ceb177ba38a3a5eb966667cac3dfa67ffc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 01:03:29 +0300 Subject: [PATCH 09/10] Revert "Use alternative approach: Remove CopyingEncoder and make the encoder suitable for both use cases" This reverts commit aa1543f9f1656558d5d76265722ae91535d62603. --- .../service/PulsarChannelInitializer.java | 7 ++- .../client/impl/PulsarChannelInitializer.java | 6 +- .../pulsar/common/protocol/ByteBufPair.java | 61 +++++++++++++++++-- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index d642b46f3e8e2..5308b3c981eb4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -113,8 +112,10 @@ protected void initChannel(SocketChannel ch) throws Exception { } else { ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc())); } + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); + } else { + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); @@ -127,7 +128,7 @@ protected void initChannel(SocketChannel ch) throws Exception { // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ChannelHandler cnx = newServerCnx(pulsar, listenerName); + ServerCnx cnx = newServerCnx(pulsar, listenerName); ch.pipeline().addLast("handler", cnx); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index 72a5183bf69cf..ed34f7d41c130 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -147,11 +146,12 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier { + // release the ByteBufPair after the write operation is completed + ReferenceCountUtil.safeRelease(b); + // complete the promise passed as an argument unless it's a void promise + if (!promise.isVoid()) { + if (future.isSuccess()) { + promise.setSuccess(); + } else { + promise.setFailure(future.cause()); + } + } + }); + + // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). + // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached + // for multiple requests. + ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise()); + ctx.write(nioBufferCopy(b.getSecond()), compositePromise); + } else { + ctx.write(msg, promise); + } + } + + // Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method. + // This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is + // not thread safe when the ByteBuf instance is shared across multiple threads. + // This method works around the issue. + // Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation. + // This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after + // the write method completes. + private ByteBuf nioBufferCopy(ByteBuf buf) { + // avoid calling nioBufferCount() for performance reasons on CompositeByteBuf + // there's a similar optimization in Netty's SslHandler.wrap method where this is explained + if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) { + return Unpooled.wrappedBuffer(buf.nioBuffers()); + } else { + // Single buffer, no need to wrap it in an array as the nioBuffers() method would do + return Unpooled.wrappedBuffer(buf.nioBuffer()); + } + } + } + } From 24b577989fe059590612f73fa93c19039de8a948 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 01:11:17 +0300 Subject: [PATCH 10/10] Return to original approach, but add read-only wrapper --- .../apache/pulsar/common/protocol/ByteBufPair.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java index b199df8c92606..f7e48aca20af9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java @@ -159,8 +159,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached // for multiple requests. - ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise()); - ctx.write(nioBufferCopy(b.getSecond()), compositePromise); + ctx.write(nioBufferReadOnlyWrapper(b.getFirst()), ctx.voidPromise()); + ctx.write(nioBufferReadOnlyWrapper(b.getSecond()), compositePromise); } else { ctx.write(msg, promise); } @@ -173,14 +173,16 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation. // This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after // the write method completes. - private ByteBuf nioBufferCopy(ByteBuf buf) { + // Use Unpooled.unmodifiableBuffer instead of asReadOnly() to make the ByteBuf read-only + // since Unpooled.unmodifiableBuffer works around an issue in Netty's SslHandler before the fix is available. + private ByteBuf nioBufferReadOnlyWrapper(ByteBuf buf) { // avoid calling nioBufferCount() for performance reasons on CompositeByteBuf // there's a similar optimization in Netty's SslHandler.wrap method where this is explained if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) { - return Unpooled.wrappedBuffer(buf.nioBuffers()); + return Unpooled.unmodifiableBuffer(Unpooled.wrappedBuffer(buf.nioBuffers())); } else { // Single buffer, no need to wrap it in an array as the nioBuffers() method would do - return Unpooled.wrappedBuffer(buf.nioBuffer()); + return Unpooled.unmodifiableBuffer(Unpooled.wrappedBuffer(buf.nioBuffer())); } } }