diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a07aad973142c..337df3d5b62c4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -767,34 +767,23 @@ public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, i @Override public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { - if (log.isDebugEnabled()) { - log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); - } - - // retain buffer in this thread - buffer.retain(); - - // Jump to specific thread to avoid contention from writers writing from different threads - executor.execute(() -> { - OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx, - currentLedgerTimeoutTriggered); - internalAsyncAddEntry(addOperation); - }); + asyncAddEntry(buffer, 0, callback, ctx); } @Override public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) { + // retain buffer in this thread + // use `.retainedSlice()` instead of `.retain()` to ensure that the input buffer is not mutated by + // Netty's SslHandler + ByteBuf retainedBuffer = buffer.retainedSlice(); if (log.isDebugEnabled()) { - log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); + log.debug("[{}] asyncAddEntry size={} state={}", name, retainedBuffer.readableBytes(), state); } - - // retain buffer in this thread - buffer.retain(); - // Jump to specific thread to avoid contention from writers writing from different threads executor.execute(() -> { - OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx, - currentLedgerTimeoutTriggered); + OpAddEntry addOperation = + OpAddEntry.createNoRetainBuffer(this, retainedBuffer, numberOfMessages, callback, ctx, + currentLedgerTimeoutTriggered); internalAsyncAddEntry(addOperation); }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index acbb0da5a4e74..e6d61c322f8c1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -80,29 +80,9 @@ enum State { CLOSED } - public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, - Object ctx, AtomicBoolean timeoutTriggered) { - OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered); - if (log.isDebugEnabled()) { - log.debug("Created new OpAddEntry {}", op); - } - return op; - } - public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx, AtomicBoolean timeoutTriggered) { - OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx, timeoutTriggered); - op.numberOfMessages = numberOfMessages; - if (log.isDebugEnabled()) { - log.debug("Created new OpAddEntry {}", op); - } - return op; - } - - private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, - AddEntryCallback callback, Object ctx, - AtomicBoolean timeoutTriggered) { OpAddEntry op = RECYCLER.get(); op.ml = ml; op.ledger = null; @@ -118,6 +98,10 @@ private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, B op.payloadProcessorHandle = null; op.timeoutTriggered = timeoutTriggered; ml.mbean.addAddEntrySample(op.dataLength); + op.numberOfMessages = numberOfMessages; + if (log.isDebugEnabled()) { + log.debug("Created new OpAddEntry {}", op); + } return op; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 4f521f1e99e91..527b95aeec744 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -3311,7 +3311,7 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception { List oldOps = new ArrayList<>(); for (int i = 0; i < 10; i++) { OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, - ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null, new AtomicBoolean()); + ByteBufAllocator.DEFAULT.buffer(128).retain(), 0, null, null, new AtomicBoolean()); if (i > 4) { op.setLedger(mock(LedgerHandle.class)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 5308b3c981eb4..d642b46f3e8e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -112,10 +113,8 @@ protected void initChannel(SocketChannel ch) throws Exception { } else { ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc())); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); - } else { - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) { ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder()); @@ -128,7 +127,7 @@ protected void initChannel(SocketChannel ch) throws Exception { // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ServerCnx cnx = newServerCnx(pulsar, listenerName); + ChannelHandler cnx = newServerCnx(pulsar, listenerName); ch.pipeline().addLast("handler", cnx); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java index ed34f7d41c130..72a5183bf69cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; @@ -146,12 +147,11 @@ public PulsarChannelInitializer(ClientConfigurationData conf, Supplier