Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -141,19 +142,49 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;

ChannelPromise compositePromise = ctx.newPromise();
compositePromise.addListener(future -> {
// release the ByteBufPair after the write operation is completed
ReferenceCountUtil.safeRelease(b);
// complete the promise passed as an argument unless it's a void promise
if (!promise.isVoid()) {
if (future.isSuccess()) {
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});

// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
try {
ctx.write(b.getFirst().copy(), ctx.voidPromise());
ctx.write(b.getSecond().copy(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
ctx.write(nioBufferReadOnlyWrapper(b.getFirst()), ctx.voidPromise());
ctx.write(nioBufferReadOnlyWrapper(b.getSecond()), compositePromise);
} else {
ctx.write(msg, promise);
}
}

// Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method.
// This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is
// not thread safe when the ByteBuf instance is shared across multiple threads.
// This method works around the issue.
// Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation.
// This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after
// the write method completes.
// Use Unpooled.unmodifiableBuffer instead of asReadOnly() to make the ByteBuf read-only
// since Unpooled.unmodifiableBuffer works around an issue in Netty's SslHandler before the fix is available.
private ByteBuf nioBufferReadOnlyWrapper(ByteBuf buf) {
// avoid calling nioBufferCount() for performance reasons on CompositeByteBuf
// there's a similar optimization in Netty's SslHandler.wrap method where this is explained
if (buf instanceof CompositeByteBuf || buf.nioBufferCount() > 1) {
return Unpooled.unmodifiableBuffer(Unpooled.wrappedBuffer(buf.nioBuffers()));
} else {
// Single buffer, no need to wrap it in an array as the nioBuffers() method would do
return Unpooled.unmodifiableBuffer(Unpooled.wrappedBuffer(buf.nioBuffer()));
}
}
}

}