From b0ddc0a62ea247b63d86b7e250ee23991ed68a07 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 15 Sep 2022 10:19:22 +0200 Subject: [PATCH 1/5] Fix scalability issue due to checkcast on context's invoke operations Motivation: ChannelDuplexHandler can implements both ChannelOutboundHandler and ChannelInboundHandler causing a scalability issue due to checkcast due to https://bugs.openjdk.org/browse/JDK-8180450 Modifications: Peeling-off invoke methods turning the checkcast vs interfaces into an instanceof vs ChannelDuplexHandler, saving the scalability issue to happen. Sadly, if users manually implements both ChannelOutboundHandler and ChannelInboundHandler without extending ChannelDuplexHandler the fix won't be enough. Result: Scalable duplex channel handler operations. --- .../AbstractChannelHandlerContext.java | 273 ++++++++++++++---- 1 file changed, 209 insertions(+), 64 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 348ba3a5582..c0f58bc62d0 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -16,6 +16,7 @@ package io.netty.channel; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.socket.DuplexChannel; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; @@ -162,10 +163,19 @@ public void run() { private void invokeChannelRegistered() { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelRegistered(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelRegistered(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelRegistered(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelRegistered(); @@ -194,10 +204,19 @@ public void run() { private void invokeChannelUnregistered() { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelUnregistered(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelUnregistered(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelUnregistered(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelUnregistered(); @@ -226,10 +245,19 @@ public void run() { private void invokeChannelActive() { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelActive(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelActive(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelActive(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelActive(); @@ -258,10 +286,19 @@ public void run() { private void invokeChannelInactive() { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelInactive(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelInactive(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelInactive(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelInactive(); @@ -342,10 +379,19 @@ public void run() { private void invokeUserEventTriggered(Object event) { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).userEventTriggered(this, event); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).userEventTriggered(this, event); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).userEventTriggered(this, event); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireUserEventTriggered(event); @@ -375,10 +421,19 @@ public void run() { private void invokeChannelRead(Object msg) { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelRead(this, msg); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelRead(this, msg); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelRead(this, msg); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelRead(msg); @@ -406,10 +461,19 @@ static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) private void invokeChannelReadComplete() { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelReadComplete(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelReadComplete(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelReadComplete(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelReadComplete(); @@ -437,10 +501,19 @@ static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext private void invokeChannelWritabilityChanged() { if (invokeHandler()) { - try { - ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).channelWritabilityChanged(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelInboundHandler) handler).channelWritabilityChanged(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { fireChannelWritabilityChanged(); @@ -502,10 +575,19 @@ public void run() { private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).bind(this, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + try { + ((ChannelOutboundHandler) handler).bind(this, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } } } else { bind(localAddress, promise); @@ -544,10 +626,19 @@ public void run() { private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + try { + ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } } } else { connect(remoteAddress, localAddress, promise); @@ -583,10 +674,19 @@ public void run() { private void invokeDisconnect(ChannelPromise promise) { if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).disconnect(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + try { + ((ChannelOutboundHandler) handler).disconnect(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } } } else { disconnect(promise); @@ -618,10 +718,19 @@ public void run() { private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).close(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + try { + ((ChannelOutboundHandler) handler).close(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } } } else { close(promise); @@ -653,10 +762,19 @@ public void run() { private void invokeDeregister(ChannelPromise promise) { if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).deregister(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + try { + ((ChannelOutboundHandler) handler).deregister(this, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } } } else { deregister(promise); @@ -682,10 +800,19 @@ public ChannelHandlerContext read() { private void invokeRead() { if (invokeHandler()) { - try { - ((ChannelOutboundHandler) handler()).read(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).read(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelOutboundHandler) handler).read(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } else { read(); @@ -713,10 +840,19 @@ void invokeWrite(Object msg, ChannelPromise promise) { } private void invokeWrite0(Object msg, ChannelPromise promise) { - try { - ((ChannelOutboundHandler) handler()).write(this, msg, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).write(this, msg, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } + } else { + try { + ((ChannelOutboundHandler) handler).write(this, msg, promise); + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); + } } } @@ -746,10 +882,19 @@ private void invokeFlush() { } private void invokeFlush0() { - try { - ((ChannelOutboundHandler) handler()).flush(this); - } catch (Throwable t) { - invokeExceptionCaught(t); + final ChannelHandler handler = handler(); + if (handler instanceof ChannelDuplexHandler) { + try { + ((ChannelDuplexHandler) handler).flush(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } + } else { + try { + ((ChannelOutboundHandler) handler).flush(this); + } catch (Throwable t) { + invokeExceptionCaught(t); + } } } From 747e4708ba5d7739cc9a15a1330a1045953da2bf Mon Sep 17 00:00:00 2001 From: franz1981 Date: Fri, 16 Sep 2022 15:13:26 +0200 Subject: [PATCH 2/5] Improved style and comments --- .../AbstractChannelHandlerContext.java | 305 +++++++++--------- 1 file changed, 144 insertions(+), 161 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index c0f58bc62d0..e27d487853b 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -16,7 +16,6 @@ package io.netty.channel; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.socket.DuplexChannel; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; @@ -163,19 +162,18 @@ public void run() { private void invokeChannelRegistered() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelRegistered(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelRegistered(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelRegistered(); @@ -204,19 +202,18 @@ public void run() { private void invokeChannelUnregistered() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelUnregistered(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelUnregistered(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelUnregistered(); @@ -245,19 +242,18 @@ public void run() { private void invokeChannelActive() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelActive(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelActive(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelActive(); @@ -286,19 +282,18 @@ public void run() { private void invokeChannelInactive() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelInactive(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelInactive(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelInactive(); @@ -379,19 +374,18 @@ public void run() { private void invokeUserEventTriggered(Object event) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).userEventTriggered(this, event); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).userEventTriggered(this, event); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireUserEventTriggered(event); @@ -421,19 +415,18 @@ public void run() { private void invokeChannelRead(Object msg) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelRead(this, msg); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelRead(this, msg); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelRead(msg); @@ -461,19 +454,18 @@ static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) private void invokeChannelReadComplete() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelReadComplete(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelReadComplete(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelReadComplete(); @@ -501,19 +493,18 @@ static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext private void invokeChannelWritabilityChanged() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelWritabilityChanged(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelInboundHandler) handler).channelWritabilityChanged(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { fireChannelWritabilityChanged(); @@ -575,19 +566,18 @@ public void run() { private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).bind(this, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); } + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); @@ -626,19 +616,18 @@ public void run() { private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); } + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); @@ -674,19 +663,18 @@ public void run() { private void invokeDisconnect(ChannelPromise promise) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).disconnect(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); } + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } else { disconnect(promise); @@ -718,19 +706,18 @@ public void run() { private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).close(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); } + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } else { close(promise); @@ -762,19 +749,18 @@ public void run() { private void invokeDeregister(ChannelPromise promise) { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).deregister(this, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); } + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } else { deregister(promise); @@ -800,19 +786,18 @@ public ChannelHandlerContext read() { private void invokeRead() { if (invokeHandler()) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).read(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).read(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } else { read(); @@ -840,19 +825,18 @@ void invokeWrite(Object msg, ChannelPromise promise) { } private void invokeWrite0(Object msg, ChannelPromise promise) { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).write(this, msg, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).write(this, msg, promise); - } catch (Throwable t) { - notifyOutboundHandlerException(t, promise); } + } catch (Throwable t) { + notifyOutboundHandlerException(t, promise); } } @@ -882,19 +866,18 @@ private void invokeFlush() { } private void invokeFlush0() { - final ChannelHandler handler = handler(); - if (handler instanceof ChannelDuplexHandler) { - try { + try { + final ChannelHandler handler = handler(); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).flush(this); - } catch (Throwable t) { - invokeExceptionCaught(t); - } - } else { - try { + } else { ((ChannelOutboundHandler) handler).flush(this); - } catch (Throwable t) { - invokeExceptionCaught(t); } + } catch (Throwable t) { + invokeExceptionCaught(t); } } From 8e0185c7bd27571b3a0b00b424e519845ebf8281 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Wed, 21 Sep 2022 00:57:43 +0200 Subject: [PATCH 3/5] Micro-benchmark --- ...ChannelPipelineDuplexHandlerBenchmark.java | 109 ++++++++++++++++++ 1 file changed, 109 insertions(+) create mode 100644 microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java diff --git a/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java new file mode 100644 index 00000000000..e425db7ae5e --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java @@ -0,0 +1,109 @@ +/* + * Copyright 2022 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.channel; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.microbench.util.AbstractMicrobenchmark; +import io.netty.util.internal.PlatformDependent; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.profile.LinuxPerfAsmProfiler; +import org.openjdk.jmh.profile.LinuxPerfC2CProfiler; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; + +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class DefaultChannelPipelineDuplexHandlerBenchmark extends AbstractMicrobenchmark { + + private ChannelPipeline pipeline; + private EmbeddedChannel channel; + + @Setup + public void setup() { + channel = new EmbeddedChannel() { + @Override + public void runPendingTasks() { + // NO-OP to reduce noise on flush + } + }; + // disabling auto-read to reduce noise on flush + channel.config().setAutoRead(false); + pipeline = channel.pipeline(); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } + }); + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + pipeline.addLast(new ChannelDuplexHandler() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } + + @Override + public void flush(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + pipeline.addLast(new ChannelDuplexHandler() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } + + @TearDown + public void tearDown() { + pipeline.channel().close(); + } + + @Benchmark + public void propagateEvent(Blackhole hole) { + hole.consume(pipeline.fireChannelReadComplete()); + } + + @Benchmark + @Threads(4) + public void parallelPropagateEvent(Blackhole hole) { + hole.consume(pipeline.fireChannelReadComplete()); + } + + @Override + protected ChainedOptionsBuilder newOptionsBuilder() throws Exception { + if (PlatformDependent.isOsx() || PlatformDependent.isWindows()) { + return super.newOptionsBuilder(); + } + return super.newOptionsBuilder() + .addProfiler(LinuxPerfAsmProfiler.class) + .addProfiler(LinuxPerfC2CProfiler.class); + } +} From e67fb4b83eeff82710f657c0f48ae35e1b5cd4e3 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Mon, 26 Sep 2022 21:47:53 +0200 Subject: [PATCH 4/5] Adding duplex/non-duplex --- ...ChannelPipelineDuplexHandlerBenchmark.java | 72 +++++++++++-------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java index e425db7ae5e..ae4a2ab07a8 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java @@ -18,12 +18,13 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.microbench.util.AbstractMicrobenchmark; -import io.netty.util.internal.PlatformDependent; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; @@ -31,9 +32,6 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import org.openjdk.jmh.profile.LinuxPerfAsmProfiler; -import org.openjdk.jmh.profile.LinuxPerfC2CProfiler; -import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; @Warmup(iterations = 5, time = 1) @Measurement(iterations = 5, time = 1) @@ -43,6 +41,9 @@ public class DefaultChannelPipelineDuplexHandlerBenchmark extends AbstractMicrob private ChannelPipeline pipeline; private EmbeddedChannel channel; + @Param({"true", "false"}) + private boolean duplex; + @Setup public void setup() { channel = new EmbeddedChannel() { @@ -62,23 +63,44 @@ public void channelReadComplete(final ChannelHandlerContext ctx) { }); // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - pipeline.addLast(new ChannelDuplexHandler() { - @Override - public void channelReadComplete(final ChannelHandlerContext ctx) { - ctx.fireChannelReadComplete(); - } + if (duplex) { + pipeline.addLast(new ChannelDuplexHandler() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } - @Override - public void flush(final ChannelHandlerContext ctx) { - ctx.flush(); - } - }); - pipeline.addLast(new ChannelDuplexHandler() { - @Override - public void channelReadComplete(final ChannelHandlerContext ctx) { - ctx.flush(); - } - }); + @Override + public void flush(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + pipeline.addLast(new ChannelDuplexHandler() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } else { + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } + }); + pipeline.addLast(new ChannelOutboundHandlerAdapter() { + @Override + public void flush(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } } @TearDown @@ -96,14 +118,4 @@ public void propagateEvent(Blackhole hole) { public void parallelPropagateEvent(Blackhole hole) { hole.consume(pipeline.fireChannelReadComplete()); } - - @Override - protected ChainedOptionsBuilder newOptionsBuilder() throws Exception { - if (PlatformDependent.isOsx() || PlatformDependent.isWindows()) { - return super.newOptionsBuilder(); - } - return super.newOptionsBuilder() - .addProfiler(LinuxPerfAsmProfiler.class) - .addProfiler(LinuxPerfC2CProfiler.class); - } } From 3edcb9dfeaa4fa2c630586cf49130e9c94200440 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Mon, 26 Sep 2022 23:47:15 +0200 Subject: [PATCH 5/5] Addressing scalability of HeadContext --- .../AbstractChannelHandlerContext.java | 112 +++++++++++++----- .../netty/channel/DefaultChannelPipeline.java | 4 +- 2 files changed, 82 insertions(+), 34 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index e27d487853b..0128e776b07 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -163,11 +163,14 @@ public void run() { private void invokeChannelRegistered() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelRegistered(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelRegistered(this); } else { ((ChannelInboundHandler) handler).channelRegistered(this); @@ -203,11 +206,14 @@ public void run() { private void invokeChannelUnregistered() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelUnregistered(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelUnregistered(this); } else { ((ChannelInboundHandler) handler).channelUnregistered(this); @@ -243,11 +249,14 @@ public void run() { private void invokeChannelActive() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelActive(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelActive(this); } else { ((ChannelInboundHandler) handler).channelActive(this); @@ -283,11 +292,14 @@ public void run() { private void invokeChannelInactive() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelInactive(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelInactive(this); } else { ((ChannelInboundHandler) handler).channelInactive(this); @@ -375,11 +387,14 @@ public void run() { private void invokeUserEventTriggered(Object event) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.userEventTriggered(this, event); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).userEventTriggered(this, event); } else { ((ChannelInboundHandler) handler).userEventTriggered(this, event); @@ -416,11 +431,14 @@ public void run() { private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelRead(this, msg); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelRead(this, msg); } else { ((ChannelInboundHandler) handler).channelRead(this, msg); @@ -455,11 +473,14 @@ static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) private void invokeChannelReadComplete() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelReadComplete(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelReadComplete(this); } else { ((ChannelInboundHandler) handler).channelReadComplete(this); @@ -494,11 +515,14 @@ static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext private void invokeChannelWritabilityChanged() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelWritabilityChanged(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).channelWritabilityChanged(this); } else { ((ChannelInboundHandler) handler).channelWritabilityChanged(this); @@ -567,11 +591,14 @@ public void run() { private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.bind(this, localAddress, promise); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).bind(this, localAddress, promise); } else { ((ChannelOutboundHandler) handler).bind(this, localAddress, promise); @@ -617,11 +644,14 @@ public void run() { private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.connect(this, remoteAddress, localAddress, promise); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise); } else { ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise); @@ -664,11 +694,14 @@ public void run() { private void invokeDisconnect(ChannelPromise promise) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.disconnect(this, promise); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).disconnect(this, promise); } else { ((ChannelOutboundHandler) handler).disconnect(this, promise); @@ -707,11 +740,14 @@ public void run() { private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.close(this, promise); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).close(this, promise); } else { ((ChannelOutboundHandler) handler).close(this, promise); @@ -750,11 +786,14 @@ public void run() { private void invokeDeregister(ChannelPromise promise) { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.deregister(this, promise); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).deregister(this, promise); } else { ((ChannelOutboundHandler) handler).deregister(this, promise); @@ -787,11 +826,14 @@ public ChannelHandlerContext read() { private void invokeRead() { if (invokeHandler()) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.read(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).read(this); } else { ((ChannelOutboundHandler) handler).read(this); @@ -826,11 +868,14 @@ void invokeWrite(Object msg, ChannelPromise promise) { private void invokeWrite0(Object msg, ChannelPromise promise) { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.write(this, msg, promise); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).write(this, msg, promise); } else { ((ChannelOutboundHandler) handler).write(this, msg, promise); @@ -867,11 +912,14 @@ private void invokeFlush() { private void invokeFlush0() { try { - final ChannelHandler handler = handler(); // DON'T CHANGE // Duplex handlers implements both out/in interfaces causing a scalability issue // see https://bugs.openjdk.org/browse/JDK-8180450 - if (handler instanceof ChannelDuplexHandler) { + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.flush(this); + } else if (handler instanceof ChannelDuplexHandler) { ((ChannelDuplexHandler) handler).flush(this); } else { ((ChannelOutboundHandler) handler).flush(this); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index c6684ad8ae8..3bd860cf691 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -61,8 +61,8 @@ protected Map, String> initialValue() { private static final AtomicReferenceFieldUpdater ESTIMATOR = AtomicReferenceFieldUpdater.newUpdater( DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); - final AbstractChannelHandlerContext head; - final AbstractChannelHandlerContext tail; + final HeadContext head; + final TailContext tail; private final Channel channel; private final ChannelFuture succeededFuture;