diff --git a/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java new file mode 100644 index 00000000000..ae4a2ab07a8 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/channel/DefaultChannelPipelineDuplexHandlerBenchmark.java @@ -0,0 +1,121 @@ +/* + * Copyright 2022 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.channel; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.microbench.util.AbstractMicrobenchmark; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 5, time = 1) +@State(Scope.Thread) +public class DefaultChannelPipelineDuplexHandlerBenchmark extends AbstractMicrobenchmark { + + private ChannelPipeline pipeline; + private EmbeddedChannel channel; + + @Param({"true", "false"}) + private boolean duplex; + + @Setup + public void setup() { + channel = new EmbeddedChannel() { + @Override + public void runPendingTasks() { + // NO-OP to reduce noise on flush + } + }; + // disabling auto-read to reduce noise on flush + channel.config().setAutoRead(false); + pipeline = channel.pipeline(); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } + }); + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + if (duplex) { + pipeline.addLast(new ChannelDuplexHandler() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } + + @Override + public void flush(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + pipeline.addLast(new ChannelDuplexHandler() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } else { + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.fireChannelReadComplete(); + } + }); + pipeline.addLast(new ChannelOutboundHandlerAdapter() { + @Override + public void flush(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelReadComplete(final ChannelHandlerContext ctx) { + ctx.flush(); + } + }); + } + } + + @TearDown + public void tearDown() { + pipeline.channel().close(); + } + + @Benchmark + public void propagateEvent(Blackhole hole) { + hole.consume(pipeline.fireChannelReadComplete()); + } + + @Benchmark + @Threads(4) + public void parallelPropagateEvent(Blackhole hole) { + hole.consume(pipeline.fireChannelReadComplete()); + } +} diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 348ba3a5582..0128e776b07 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -163,7 +163,18 @@ public void run() { private void invokeChannelRegistered() { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelRegistered(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelRegistered(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelRegistered(this); + } else { + ((ChannelInboundHandler) handler).channelRegistered(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -195,7 +206,18 @@ public void run() { private void invokeChannelUnregistered() { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelUnregistered(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelUnregistered(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelUnregistered(this); + } else { + ((ChannelInboundHandler) handler).channelUnregistered(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -227,7 +249,18 @@ public void run() { private void invokeChannelActive() { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelActive(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelActive(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelActive(this); + } else { + ((ChannelInboundHandler) handler).channelActive(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -259,7 +292,18 @@ public void run() { private void invokeChannelInactive() { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelInactive(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelInactive(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelInactive(this); + } else { + ((ChannelInboundHandler) handler).channelInactive(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -343,7 +387,18 @@ public void run() { private void invokeUserEventTriggered(Object event) { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).userEventTriggered(this, event); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.userEventTriggered(this, event); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).userEventTriggered(this, event); + } else { + ((ChannelInboundHandler) handler).userEventTriggered(this, event); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -376,7 +431,18 @@ public void run() { private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelRead(this, msg); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelRead(this, msg); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelRead(this, msg); + } else { + ((ChannelInboundHandler) handler).channelRead(this, msg); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -407,7 +473,18 @@ static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) private void invokeChannelReadComplete() { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelReadComplete(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelReadComplete(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelReadComplete(this); + } else { + ((ChannelInboundHandler) handler).channelReadComplete(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -438,7 +515,18 @@ static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext private void invokeChannelWritabilityChanged() { if (invokeHandler()) { try { - ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.channelWritabilityChanged(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).channelWritabilityChanged(this); + } else { + ((ChannelInboundHandler) handler).channelWritabilityChanged(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -503,7 +591,18 @@ public void run() { private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { - ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.bind(this, localAddress, promise); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).bind(this, localAddress, promise); + } else { + ((ChannelOutboundHandler) handler).bind(this, localAddress, promise); + } } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -545,7 +644,18 @@ public void run() { private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { - ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.connect(this, remoteAddress, localAddress, promise); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise); + } else { + ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise); + } } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -584,7 +694,18 @@ public void run() { private void invokeDisconnect(ChannelPromise promise) { if (invokeHandler()) { try { - ((ChannelOutboundHandler) handler()).disconnect(this, promise); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.disconnect(this, promise); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).disconnect(this, promise); + } else { + ((ChannelOutboundHandler) handler).disconnect(this, promise); + } } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -619,7 +740,18 @@ public void run() { private void invokeClose(ChannelPromise promise) { if (invokeHandler()) { try { - ((ChannelOutboundHandler) handler()).close(this, promise); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.close(this, promise); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).close(this, promise); + } else { + ((ChannelOutboundHandler) handler).close(this, promise); + } } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -654,7 +786,18 @@ public void run() { private void invokeDeregister(ChannelPromise promise) { if (invokeHandler()) { try { - ((ChannelOutboundHandler) handler()).deregister(this, promise); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.deregister(this, promise); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).deregister(this, promise); + } else { + ((ChannelOutboundHandler) handler).deregister(this, promise); + } } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -683,7 +826,18 @@ public ChannelHandlerContext read() { private void invokeRead() { if (invokeHandler()) { try { - ((ChannelOutboundHandler) handler()).read(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.read(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).read(this); + } else { + ((ChannelOutboundHandler) handler).read(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } @@ -714,7 +868,18 @@ void invokeWrite(Object msg, ChannelPromise promise) { private void invokeWrite0(Object msg, ChannelPromise promise) { try { - ((ChannelOutboundHandler) handler()).write(this, msg, promise); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.write(this, msg, promise); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).write(this, msg, promise); + } else { + ((ChannelOutboundHandler) handler).write(this, msg, promise); + } } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } @@ -747,7 +912,18 @@ private void invokeFlush() { private void invokeFlush0() { try { - ((ChannelOutboundHandler) handler()).flush(this); + // DON'T CHANGE + // Duplex handlers implements both out/in interfaces causing a scalability issue + // see https://bugs.openjdk.org/browse/JDK-8180450 + final ChannelHandler handler = handler(); + final DefaultChannelPipeline.HeadContext headContext = pipeline.head; + if (handler == headContext) { + headContext.flush(this); + } else if (handler instanceof ChannelDuplexHandler) { + ((ChannelDuplexHandler) handler).flush(this); + } else { + ((ChannelOutboundHandler) handler).flush(this); + } } catch (Throwable t) { invokeExceptionCaught(t); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index c6684ad8ae8..3bd860cf691 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -61,8 +61,8 @@ protected Map, String> initialValue() { private static final AtomicReferenceFieldUpdater ESTIMATOR = AtomicReferenceFieldUpdater.newUpdater( DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); - final AbstractChannelHandlerContext head; - final AbstractChannelHandlerContext tail; + final HeadContext head; + final TailContext tail; private final Channel channel; private final ChannelFuture succeededFuture;