Skip to content

Commit

Permalink
Fix event loop shutdown timing fragility (#9616)
Browse files Browse the repository at this point in the history
Motivation

The current event loop shutdown logic is quite fragile and in the
epoll/NIO cases relies on the default 1 second wait/select timeout that
applies when there are no scheduled tasks. Without this default timeout
the shutdown would hang indefinitely.

The timeout only takes effect in this case because queued scheduled
tasks are first cancelled in
SingleThreadEventExecutor#confirmShutdown(), but I _think_ even this
isn't robust, since the main task queue is subsequently serviced, which
could result in some new scheduled task being queued with a much later
deadline.

It also means shutdowns are unnecessarily delayed by up to 1 second.

Modifications

- Add/extend unit tests to expose the issue
- Adjust SingleThreadEventExecutor shutdown and confirmShutdown methods
to explicitly add no-op tasks to the taskQueue so that the subsequent
event loop iteration doesn't enter a blocking wait (as appears to have
been originally intended)

Results

Faster and more robust shutdown of event loops, which also allows removal
of the default wait timeout
  • Loading branch information
njhill authored and normanmaurer committed Oct 7, 2019
1 parent 45be693 commit 170e4de
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 13 deletions.
Expand Up @@ -587,7 +587,7 @@ protected void cleanup() {
}

protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTTING_DOWN) {
if (!inEventLoop) {
// Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
// is already something in the queue.
taskQueue.offer(WAKEUP_TASK);
Expand Down Expand Up @@ -726,7 +726,10 @@ public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit uni
}

if (wakeup) {
wakeup(inEventLoop);
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}

return terminationFuture();
Expand Down Expand Up @@ -778,7 +781,10 @@ public void shutdown() {
}

if (wakeup) {
wakeup(inEventLoop);
taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
}

Expand Down Expand Up @@ -827,7 +833,7 @@ protected boolean confirmShutdown() {
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
wakeup(true);
taskQueue.offer(WAKEUP_TASK);
return false;
}

Expand All @@ -840,7 +846,7 @@ protected boolean confirmShutdown() {
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
taskQueue.offer(WAKEUP_TASK);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Expand Down
Expand Up @@ -15,14 +15,29 @@
*/
package io.netty.testsuite.transport;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.socket.ServerSocketChannel;

import static org.junit.Assert.*;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;

public abstract class AbstractSingleThreadEventLoopTest {

Expand All @@ -35,19 +50,108 @@ public void testChannelsRegistered() {
final Channel ch1 = newChannel();
final Channel ch2 = newChannel();

assertEquals(0, loop.registeredChannels());
int rc = loop.registeredChannels();
boolean channelCountSupported = rc != -1;

if (channelCountSupported) {
assertEquals(0, loop.registeredChannels());
}

assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
assertEquals(2, loop.registeredChannels());
if (channelCountSupported) {
assertEquals(2, loop.registeredChannels());
}

assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
assertEquals(1, loop.registeredChannels());
if (channelCountSupported) {
assertEquals(1, loop.registeredChannels());
}
} finally {
group.shutdownGracefully();
}
}

/**
 * Calls {@code shutdown()} on a freshly created group, to which this test has submitted no task,
 * and expects the group to report termination within 200 ms.
 */
@Test
@SuppressWarnings("deprecation")
public void shutdownBeforeStart() throws Exception {
    final EventLoopGroup idleGroup = newEventLoopGroup();

    // No shutdown has been requested yet, so the short wait must time out.
    final boolean terminatedEarly = idleGroup.awaitTermination(2, TimeUnit.MILLISECONDS);
    assertFalse(terminatedEarly);

    idleGroup.shutdown();

    final boolean terminated = idleGroup.awaitTermination(200, TimeUnit.MILLISECONDS);
    assertTrue(terminated);
}

/**
 * Requests a graceful shutdown with a zero quiet period (and a 2 second timeout) on a freshly
 * created group and expects the returned future to complete within 200 ms.
 */
@Test
public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
    final EventLoopGroup idleGroup = newEventLoopGroup();
    final Future<?> termination = idleGroup.shutdownGracefully(0L, 2L, TimeUnit.SECONDS);
    assertTrue(termination.await(200L));
}

// Copied from AbstractEventLoopTest
/**
 * Binds a server channel, then shuts the group down gracefully with no quiet period and checks
 * that the group terminates within 600 ms even though the channel was never closed.
 */
@Test(timeout = 5000)
public void testShutdownGracefullyNoQuietPeriod() throws Exception {
    final EventLoopGroup group = newEventLoopGroup();

    final ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group);
    bootstrap.channel(serverChannelClass());
    bootstrap.childHandler(new ChannelInboundHandlerAdapter());

    // Deliberately leave the bound Channel open: the EventLoop must still shut down in time.
    final ChannelFuture bindFuture;
    if (serverChannelClass() == LocalServerChannel.class) {
        // The local transport binds to a LocalAddress rather than a port number.
        bindFuture = bootstrap.bind(new LocalAddress("local"));
    } else {
        bindFuture = bootstrap.bind(0);
    }
    bindFuture.sync().channel();

    final Future<?> shutdownFuture = group.shutdownGracefully(0, 1, TimeUnit.MINUTES);

    final boolean terminated = group.awaitTermination(600, TimeUnit.MILLISECONDS);
    assertTrue(terminated);
    assertTrue(shutdownFuture.syncUninterruptibly().isSuccess());
    assertTrue(group.isShutdown());
    assertTrue(group.isTerminated());
}

/**
 * Requests a graceful shutdown with a 200 ms quiet period (and a 1000 ms timeout) on a freshly
 * created group and expects the returned future to complete within 500 ms.
 */
@Test
public void shutdownGracefullyBeforeStart() throws Exception {
    final EventLoopGroup idleGroup = newEventLoopGroup();
    final Future<?> termination = idleGroup.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS);
    assertTrue(termination.await(500L));
}

/**
 * Runs one task on the event loop, then shuts it down gracefully (200 ms quiet period, 3000 ms
 * timeout) and expects termination within 500 ms, after which new tasks must be rejected.
 */
@Test
public void gracefulShutdownAfterStart() throws Exception {
    final EventLoop eventLoop = newEventLoopGroup().next();
    final CountDownLatch started = new CountDownLatch(1);
    final Runnable signalStarted = new Runnable() {
        @Override
        public void run() {
            started.countDown();
        }
    };
    eventLoop.execute(signalStarted);

    // Block until the submitted task has run, i.e. the event loop thread is up.
    started.await();

    // Ask the event loop to stop.
    eventLoop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);

    // The loop has to be terminated before the wait below times out.
    final boolean terminated = eventLoop.awaitTermination(500L, TimeUnit.MILLISECONDS);
    assertTrue(terminated);

    assertRejection(eventLoop);
}

/** Task that does nothing when run; {@code assertRejection} submits it to probe whether an executor still accepts work. */
private static final Runnable NOOP = new Runnable() {
@Override
public void run() { }
};

/**
 * Verifies that the given executor refuses new work: submitting {@code NOOP} must throw
 * {@link RejectedExecutionException}, otherwise the calling test is failed.
 *
 * @param loop the executor that is expected to reject the task
 */
private static void assertRejection(EventExecutor loop) {
    boolean rejected = false;
    try {
        loop.execute(NOOP);
    } catch (RejectedExecutionException expected) {
        // Expected
        rejected = true;
    }
    if (!rejected) {
        fail("A task must be rejected after shutdown() is called.");
    }
}

protected abstract EventLoopGroup newEventLoopGroup();
protected abstract ServerSocketChannel newChannel();
protected abstract Channel newChannel();
protected abstract Class<? extends ServerChannel> serverChannelClass();
}
@@ -0,0 +1,41 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport;

import io.netty.channel.Channel;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;

/**
 * Runs the test cases inherited from {@link AbstractSingleThreadEventLoopTest} against
 * {@link DefaultEventLoopGroup}, using the local transport's channel types.
 */
public class DefaultEventLoopTest extends AbstractSingleThreadEventLoopTest {

    /** Server channel type for the local transport. */
    @Override
    protected Class<? extends ServerChannel> serverChannelClass() {
        return LocalServerChannel.class;
    }

    /** Creates a new, unregistered {@link LocalChannel}. */
    @Override
    protected Channel newChannel() {
        return new LocalChannel();
    }

    /** Creates a {@link DefaultEventLoopGroup} using its no-argument constructor. */
    @Override
    protected EventLoopGroup newEventLoopGroup() {
        return new DefaultEventLoopGroup();
    }
}
@@ -0,0 +1,41 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Runs the test cases inherited from {@link AbstractSingleThreadEventLoopTest} against
 * {@link NioEventLoopGroup}, using the NIO socket channel types.
 */
public class NioEventLoopTest extends AbstractSingleThreadEventLoopTest {

    /** Server channel type for the NIO transport. */
    @Override
    protected Class<? extends ServerChannel> serverChannelClass() {
        return NioServerSocketChannel.class;
    }

    /** Creates a new, unregistered {@link NioSocketChannel}. */
    @Override
    protected Channel newChannel() {
        return new NioSocketChannel();
    }

    /** Creates a {@link NioEventLoopGroup} using its no-argument constructor. */
    @Override
    protected EventLoopGroup newEventLoopGroup() {
        return new NioEventLoopGroup();
    }
}
Expand Up @@ -18,6 +18,7 @@
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.unix.FileDescriptor;
import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest;
Expand Down Expand Up @@ -49,6 +50,11 @@ protected ServerSocketChannel newChannel() {
return new EpollServerSocketChannel();
}

/** Supplies the epoll server channel type to the inherited tests that bind a server channel. */
@Override
protected Class<? extends ServerChannel> serverChannelClass() {
return EpollServerSocketChannel.class;
}

@Test
public void testScheduleBigDelayNotOverflow() {
final AtomicReference<Throwable> capture = new AtomicReference<Throwable>();
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest;
import io.netty.util.concurrent.Future;
Expand All @@ -39,6 +40,11 @@ protected ServerSocketChannel newChannel() {
return new KQueueServerSocketChannel();
}

/** Supplies the kqueue server channel type to the inherited tests that bind a server channel. */
@Override
protected Class<? extends ServerChannel> serverChannelClass() {
return KQueueServerSocketChannel.class;
}

@Test
public void testScheduleBigDelayNotOverflow() {
EventLoopGroup group = new KQueueEventLoopGroup(1);
Expand Down
Expand Up @@ -75,7 +75,7 @@ public void testShutdownGracefullyNoQuietPeriod() throws Exception {
b.bind(0).sync().channel();

Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
assertTrue(loop.awaitTermination(2, TimeUnit.SECONDS));
assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
assertTrue(f.syncUninterruptibly().isSuccess());
assertTrue(loop.isShutdown());
assertTrue(loop.isTerminated());
Expand Down

0 comments on commit 170e4de

Please sign in to comment.