From 1bdded21d9811cfc914e9ed886a6ab9ec0b580ae Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Tue, 28 Aug 2018 12:21:53 +0000 Subject: [PATCH] Copy ByteBufPair buffers when using with SSL (#2401) The netty SSL handler uses a coalescing buffer queue, which modifies the buffers used to queue the writes so that SSL_write can be given larger chunks, thereby increasing the 'goodput'. If we pass in a retained duplicate as we have been doing until now, then later clients will be passed junk, as SSL will have modified cached entry buffers. This patch introduces a copying ByteBufPair encoder, which is only used with SSL connections. --- .../service/PulsarChannelInitializer.java | 4 ++- .../pulsar/client/impl/ConnectionPool.java | 5 ++-- .../apache/pulsar/common/api/ByteBufPair.java | 25 ++++++++++++++++++- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 7858a110488f5..a3fb772f5612c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -57,9 +57,11 @@ protected void initChannel(SocketChannel ch) throws Exception { serviceConfig.getTlsCiphers(), serviceConfig.getTlsProtocols(), serviceConfig.getTlsRequireTrustedClientCertOnConnect()); ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); + } else { + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); ch.pipeline().addLast("handler", new ServerCnx(pulsar)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index afba7d2723829..3188b3562b93e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -100,9 +100,10 @@ public void initChannel(SocketChannel ch) throws Exception { conf.getTlsTrustCertsFilePath()); } ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER); + } else { + ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); } - - ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4)); ch.pipeline().addLast("handler", clientCnxSupplier.get()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java index 94e1fb7b1320b..b99270b1c1bb6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java @@ -109,6 +109,7 @@ public ReferenceCounted touch(Object hint) { } public static final Encoder ENCODER = new Encoder(); + public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder(); @Sharable public static class Encoder extends ChannelOutboundHandlerAdapter { @@ -132,4 +133,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } -} \ No newline at end of file + @Sharable + public static class CopyingEncoder extends ChannelOutboundHandlerAdapter { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof ByteBufPair) { + ByteBufPair b = (ByteBufPair) msg; + + // Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler). + // For these handlers, we need to pass a copy of the buffers as the source buffers may be cached + // for multiple requests. + try { + ctx.write(b.getFirst().copy(), ctx.voidPromise()); + ctx.write(b.getSecond().copy(), promise); + } finally { + ReferenceCountUtil.safeRelease(b); + } + } else { + ctx.write(msg, promise); + } + } + } + +}