diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 4d77da11a73..d2b9707f393 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -587,7 +587,7 @@ protected void cleanup() { } protected void wakeup(boolean inEventLoop) { - if (!inEventLoop || state == ST_SHUTTING_DOWN) { + if (!inEventLoop) { // Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there // is already something in the queue. taskQueue.offer(WAKEUP_TASK); @@ -726,7 +726,10 @@ public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit uni } if (wakeup) { - wakeup(inEventLoop); + taskQueue.offer(WAKEUP_TASK); + if (!addTaskWakesUp) { + wakeup(inEventLoop); + } } return terminationFuture(); @@ -778,7 +781,10 @@ public void shutdown() { } if (wakeup) { - wakeup(inEventLoop); + taskQueue.offer(WAKEUP_TASK); + if (!addTaskWakesUp) { + wakeup(inEventLoop); + } } } @@ -827,7 +833,7 @@ protected boolean confirmShutdown() { if (gracefulShutdownQuietPeriod == 0) { return true; } - wakeup(true); + taskQueue.offer(WAKEUP_TASK); return false; } @@ -840,7 +846,7 @@ protected boolean confirmShutdown() { if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. - wakeup(true); + taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100); } catch (InterruptedException e) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java index e4306b3dc8e..fd58a5255bc 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -15,14 +15,29 @@ */ package io.netty.testsuite.transport; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + import org.junit.Test; +import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.SingleThreadEventLoop; -import io.netty.channel.socket.ServerSocketChannel; - -import static org.junit.Assert.*; +import io.netty.channel.local.LocalAddress; +import io.netty.channel.local.LocalServerChannel; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Future; public abstract class AbstractSingleThreadEventLoopTest { @@ -35,19 +50,108 @@ public void testChannelsRegistered() { final Channel ch1 = newChannel(); final Channel ch2 = newChannel(); - assertEquals(0, loop.registeredChannels()); + int rc = loop.registeredChannels(); + boolean channelCountSupported = rc != -1; + + if (channelCountSupported) { + assertEquals(0, loop.registeredChannels()); + } assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess()); assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess()); - assertEquals(2, loop.registeredChannels()); + if (channelCountSupported) { + assertEquals(2, loop.registeredChannels()); + } assertTrue(ch1.deregister().syncUninterruptibly().isSuccess()); - assertEquals(1, loop.registeredChannels()); + if (channelCountSupported) { + assertEquals(1, loop.registeredChannels()); + } } finally { group.shutdownGracefully(); } } + @Test + @SuppressWarnings("deprecation") + public void shutdownBeforeStart() throws Exception { + EventLoopGroup group = newEventLoopGroup(); + assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS)); + group.shutdown(); + assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS)); + } + + @Test + public void shutdownGracefullyZeroQuietBeforeStart() throws Exception { + EventLoopGroup group = newEventLoopGroup(); + assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L)); + } + + // Copied from AbstractEventLoopTest + @Test(timeout = 5000) + public void testShutdownGracefullyNoQuietPeriod() throws Exception { + EventLoopGroup loop = newEventLoopGroup(); + ServerBootstrap b = new ServerBootstrap(); + b.group(loop) + .channel(serverChannelClass()) + .childHandler(new ChannelInboundHandlerAdapter()); + + // Not close the Channel to ensure the EventLoop is still shutdown in time. + ChannelFuture cf = serverChannelClass() == LocalServerChannel.class + ? b.bind(new LocalAddress("local")) : b.bind(0); + cf.sync().channel(); + + Future f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES); + assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS)); + assertTrue(f.syncUninterruptibly().isSuccess()); + assertTrue(loop.isShutdown()); + assertTrue(loop.isTerminated()); + } + + @Test + public void shutdownGracefullyBeforeStart() throws Exception { + EventLoopGroup group = newEventLoopGroup(); + assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L)); + } + + @Test + public void gracefulShutdownAfterStart() throws Exception { + EventLoop loop = newEventLoopGroup().next(); + final CountDownLatch latch = new CountDownLatch(1); + loop.execute(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + + // Wait for the event loop thread to start. + latch.await(); + + // Request the event loop thread to stop. + loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS); + + // Wait until the event loop is terminated. + assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS)); + + assertRejection(loop); + } + + private static final Runnable NOOP = new Runnable() { + @Override + public void run() { } + }; + + private static void assertRejection(EventExecutor loop) { + try { + loop.execute(NOOP); + fail("A task must be rejected after shutdown() is called."); + } catch (RejectedExecutionException e) { + // Expected + } + } + protected abstract EventLoopGroup newEventLoopGroup(); - protected abstract ServerSocketChannel newChannel(); + protected abstract Channel newChannel(); + protected abstract Class serverChannelClass(); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java new file mode 100644 index 00000000000..cb13b806241 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/DefaultEventLoopTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import io.netty.channel.Channel; +import io.netty.channel.DefaultEventLoopGroup; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.local.LocalChannel; +import io.netty.channel.local.LocalServerChannel; + +public class DefaultEventLoopTest extends AbstractSingleThreadEventLoopTest { + + @Override + protected EventLoopGroup newEventLoopGroup() { + return new DefaultEventLoopGroup(); + } + + @Override + protected Channel newChannel() { + return new LocalChannel(); + } + + @Override + protected Class serverChannelClass() { + return LocalServerChannel.class; + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java new file mode 100644 index 00000000000..e4bc928907d --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/NioEventLoopTest.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +public class NioEventLoopTest extends AbstractSingleThreadEventLoopTest { + + @Override + protected EventLoopGroup newEventLoopGroup() { + return new NioEventLoopGroup(); + } + + @Override + protected Channel newChannel() { + return new NioSocketChannel(); + } + + @Override + protected Class serverChannelClass() { + return NioServerSocketChannel.class; + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 0e057eb21a1..c6bc431d702 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -18,6 +18,7 @@ import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; @@ -49,6 +50,11 @@ protected ServerSocketChannel newChannel() { return new EpollServerSocketChannel(); } + @Override + protected Class serverChannelClass() { + return EpollServerSocketChannel.class; + } + @Test public void testScheduleBigDelayNotOverflow() { final AtomicReference capture = new AtomicReference(); diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index ceda867deaf..55d2e162a87 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -17,6 +17,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.Future; @@ -39,6 +40,11 @@ protected ServerSocketChannel newChannel() { return new KQueueServerSocketChannel(); } + @Override + protected Class serverChannelClass() { + return KQueueServerSocketChannel.class; + } + @Test public void testScheduleBigDelayNotOverflow() { EventLoopGroup group = new KQueueEventLoopGroup(1); diff --git a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java index b4d29a67b66..df7fe13f869 100644 --- a/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractEventLoopTest.java @@ -75,7 +75,7 @@ public void testShutdownGracefullyNoQuietPeriod() throws Exception { b.bind(0).sync().channel(); Future f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES); - assertTrue(loop.awaitTermination(2, TimeUnit.SECONDS)); + assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS)); assertTrue(f.syncUninterruptibly().isSuccess()); assertTrue(loop.isShutdown()); assertTrue(loop.isTerminated());