From 170e4deee6f78a564727dee18af7dd7ce322a35f Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Mon, 7 Oct 2019 15:06:01 +0800 Subject: [PATCH] Fix event loop shutdown timing fragility (#9616) Motivation The current event loop shutdown logic is quite fragile and in the epoll/NIO cases relies on the default 1 second wait/select timeout that applies when there are no scheduled tasks. Without this default timeout the shutdown would hang indefinitely. The timeout only takes effect in this case because queued scheduled tasks are first cancelled in SingleThreadEventExecutor#confirmShutdown(), but I _think_ even this isn't robust, since the main task queue is subsequently serviced which could result in some new scheduled task being queued with much later deadline. It also means shutdowns are unnecessarily delayed by up to 1 second. Modifications - Add/extend unit tests to expose the issue - Adjust SingleThreadEventExecutor shutdown and confirmShutdown methods to explicitly add no-op tasks to the taskQueue so that the subsequent event loop iteration doesn't enter blocking wait (as looks like was originally intended) Results Faster and more robust shutdown of event loops, allows removal of the default wait timeout --- .../concurrent/SingleThreadEventExecutor.java | 16 ++- .../AbstractSingleThreadEventLoopTest.java | 118 ++++++++++++++++-- .../transport/DefaultEventLoopTest.java | 41 ++++++ .../testsuite/transport/NioEventLoopTest.java | 41 ++++++ .../channel/epoll/EpollEventLoopTest.java | 6 + .../channel/kqueue/KQueueEventLoopTest.java | 6 + .../netty/channel/AbstractEventLoopTest.java | 2 +- 7 files changed, 217 insertions(+), 13 deletions(-) create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 4d77da11a73..d2b9707f393 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -587,7 +587,7 @@ protected void cleanup() { } protected void wakeup(boolean inEventLoop) { - if (!inEventLoop || state == ST_SHUTTING_DOWN) { + if (!inEventLoop) { // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there // is already something in the queue. taskQueue.offer(WAKEUP_TASK); @@ -726,7 +726,10 @@ public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit uni } if (wakeup) { - wakeup(inEventLoop); + taskQueue.offer(WAKEUP_TASK); + if (!addTaskWakesUp) { + wakeup(inEventLoop); + } } return terminationFuture(); @@ -778,7 +781,10 @@ public void shutdown() { } if (wakeup) { - wakeup(inEventLoop); + taskQueue.offer(WAKEUP_TASK); + if (!addTaskWakesUp) { + wakeup(inEventLoop); + } } } @@ -827,7 +833,7 @@ protected boolean confirmShutdown() { if (gracefulShutdownQuietPeriod == 0) { return true; } - wakeup(true); + taskQueue.offer(WAKEUP_TASK); return false; } @@ -840,7 +846,7 @@ protected boolean confirmShutdown() { if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. - wakeup(true); + taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100); } catch (InterruptedException e) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java index e4306b3dc8e..fd58a5255bc 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -15,14 +15,29 @@ */ package io.netty.testsuite.transport; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + import org.junit.Test; +import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.SingleThreadEventLoop; -import io.netty.channel.socket.ServerSocketChannel; - -import static org.junit.Assert.*; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; public abstract class AbstractSingleThreadEventLoopTest { @@ -35,19 +50,108 @@ public void testChannelsRegistered() { final Channel ch1 = newChannel(); final Channel ch2 = newChannel(); - assertEquals(0, loop.registeredChannels()); + int rc = loop.registeredChannels(); + boolean channelCountSupported = rc != -1; + + if (channelCountSupported) { + assertEquals(0, loop.registeredChannels()); + } assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess()); assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess()); - assertEquals(2, loop.registeredChannels()); + if (channelCountSupported) { + assertEquals(2, loop.registeredChannels()); + } assertTrue(ch1.deregister().syncUninterruptibly().isSuccess()); - assertEquals(1, loop.registeredChannels()); + if (channelCountSupported) { + assertEquals(1, loop.registeredChannels()); + } } finally { group.shutdownGracefully(); } } + @Test + @SuppressWarnings("deprecation") + public void shutdownBeforeStart() throws Exception { + EventLoopGroup group = newEventLoopGroup(); + assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS)); + group.shutdown(); + assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS)); + } + + @Test + public void shutdownGracefullyZeroQuietBeforeStart() throws Exception { + EventLoopGroup group = newEventLoopGroup(); + assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L)); + } + + // Copied from AbstractEventLoopTest + @Test(timeout = 5000) + public void testShutdownGracefullyNoQuietPeriod() throws Exception { + EventLoopGroup loop = newEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + b.group(loop) + .channel(serverChannelClass()) + .childHandler(new ChannelInboundHandlerAdapter()); + + // Not close the Channel to ensure the EventLoop is still shutdown in time. + ChannelFuture cf = serverChannelClass() == LocalServerChannel.class + ? b.bind(new LocalAddress("local")) : b.bind(0); + cf.sync().channel(); + + Future f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES); + assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS)); + assertTrue(f.syncUninterruptibly().isSuccess()); + assertTrue(loop.isShutdown()); + assertTrue(loop.isTerminated()); + } + + @Test + public void shutdownGracefullyBeforeStart() throws Exception { + EventLoopGroup group = newEventLoopGroup(); + assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L)); + } + + @Test + public void gracefulShutdownAfterStart() throws Exception { + EventLoop loop = newEventLoopGroup().next(); + final CountDownLatch latch = new CountDownLatch(1); + loop.execute(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + + // Wait for the event loop thread to start. + latch.await(); + + // Request the event loop thread to stop. + loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS); + + // Wait until the event loop is terminated. + assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS)); + + assertRejection(loop); + } + + private static final Runnable NOOP = new Runnable() { + @Override + public void run() { } + }; + + private static void assertRejection(EventExecutor loop) { + try { + loop.execute(NOOP); + fail("A task must be rejected after shutdown() is called."); + } catch (RejectedExecutionException e) { + // Expected + } + } + protected abstract EventLoopGroup newEventLoopGroup(); - protected abstract ServerSocketChannel newChannel(); + protected abstract Channel newChannel(); + protected abstract Class serverChannelClass(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java new file mode 100644 index 00000000000..cb13b806241 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import io.netty.channel.Channel; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; + +public class DefaultEventLoopTest extends AbstractSingleThreadEventLoopTest { + + @Override + protected EventLoopGroup newEventLoopGroup() { + return new DefaultEventLoopGroup(); + } + + @Override + protected Channel newChannel() { + return new LocalChannel(); + } + + @Override + protected Class serverChannelClass() { + return LocalServerChannel.class; + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java new file mode 100644 index 00000000000..e4bc928907d --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +public class NioEventLoopTest extends AbstractSingleThreadEventLoopTest { + + @Override + protected EventLoopGroup newEventLoopGroup() { + return new NioEventLoopGroup(); + } + + @Override + protected Channel newChannel() { + return new NioSocketChannel(); + } + + @Override + protected Class serverChannelClass() { + return NioServerSocketChannel.class; + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 0e057eb21a1..c6bc431d702 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -18,6 +18,7 @@ import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; @@ -49,6 +50,11 @@ protected ServerSocketChannel newChannel() { return new EpollServerSocketChannel(); } + @Override + protected Class serverChannelClass() { + return EpollServerSocketChannel.class; + } + @Test public void testScheduleBigDelayNotOverflow() { final AtomicReference capture = new AtomicReference(); diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index ceda867deaf..55d2e162a87 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -17,6 +17,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.Future; @@ -39,6 +40,11 @@ protected ServerSocketChannel newChannel() { return new KQueueServerSocketChannel(); } + @Override + protected Class serverChannelClass() { + return KQueueServerSocketChannel.class; + } + @Test public void testScheduleBigDelayNotOverflow() { EventLoopGroup group = new KQueueEventLoopGroup(1); diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index b4d29a67b66..df7fe13f869 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -75,7 +75,7 @@ public void testShutdownGracefullyNoQuietPeriod() throws Exception { b.bind(0).sync().channel(); Future f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES); - assertTrue(loop.awaitTermination(2, TimeUnit.SECONDS)); + assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS)); assertTrue(f.syncUninterruptibly().isSuccess()); assertTrue(loop.isShutdown()); assertTrue(loop.isTerminated());