From 59275fba525ceaf7a493c8115e20d69ee97272b1 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 8 Sep 2021 09:06:28 +0200 Subject: [PATCH] Netty Future no longer extends JDK Future (#11647) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: It is important to avoid blocking method calls in an event loop thread, since that can stall the system. Netty's Future interface was extending the JDK Future interface, which included a number of blocking methods of questionable use in Netty. We wish to reduce the number of blocking methods on the Future API in order to discourage their use a little. Further more, the Netty Future specification of the behaviour of the cancel() and isDone() methods are inconsistent with those of the JDK Future. If Netty's Future stop extending the JDK Future interface, it will also no longer be bound by its specification. Modification: Make Netty's Future no longer extend the JDK Future interface. Change the EvenExecutorGroup interface to no longer extend ScheduledExecutorService. The EventExecutorGroup still extends Executor, because Executor does not dictate any return type of the `execute()` method — this is also useful in the DefaultFutureCompletionStage implementation. The Netty ScheduledFuture interface has been removed since it provided no additional features that were actually used. Numerous changes to use sites that previously relied on the JDK types. Remove the `Future.cancel()` method that took a boolean argument — this argument was always ignored in our implementations, which was another spec deviation. Various `invoke*` and `shutdown*` methods have been removed from the EvenExecutorGroup API since it no longer extends ScheduledExecutorService — these were either not used anywhere, or deprecated with better alternatives available. Updates to cancellation javadocs. Result: Cleaner code, leaner API. --- .../websocketx/WebSocketClientHandshaker.java | 2 +- ...bSocketClientProtocolHandshakeHandler.java | 2 +- .../websocketx/WebSocketProtocolHandler.java | 2 +- ...bSocketServerProtocolHandshakeHandler.java | 2 +- .../codec/http2/Http2ConnectionHandler.java | 3 +- .../concurrent/AbstractEventExecutor.java | 58 +++-- .../AbstractScheduledEventExecutor.java | 82 +++---- .../util/concurrent/AsynchronousResult.java | 11 +- .../DefaultFutureCompletionStage.java | 30 ++- .../netty/util/concurrent/DefaultPromise.java | 17 +- .../util/concurrent/EventExecutorGroup.java | 220 ++++++++++++------ .../java/io/netty/util/concurrent/Future.java | 44 +++- .../io/netty/util/concurrent/Futures.java | 2 +- .../util/concurrent/GlobalEventExecutor.java | 12 +- .../concurrent/ImmediateEventExecutor.java | 34 ++- .../MultithreadEventExecutorGroup.java | 14 +- .../NonStickyEventExecutorGroup.java | 100 +++----- .../io/netty/util/concurrent/PromiseTask.java | 5 + .../netty/util/concurrent/RunnableFuture.java | 3 +- .../concurrent/RunnableFutureAdapter.java | 11 +- .../concurrent/RunnableScheduledFuture.java | 15 +- .../RunnableScheduledFutureAdapter.java | 42 ++-- .../util/concurrent/ScheduledFuture.java | 23 -- .../concurrent/SingleThreadEventExecutor.java | 92 +------- .../UnorderedThreadPoolEventExecutor.java | 127 ++++++---- .../AbstractScheduledEventExecutorTest.java | 31 ++- .../util/concurrent/DefaultPromiseTest.java | 56 +++-- .../io/netty/util/concurrent/FuturesTest.java | 14 +- .../concurrent/GlobalEventExecutorTest.java | 12 +- .../SingleThreadEventExecutorTest.java | 83 +------ .../UnorderedThreadPoolEventExecutorTest.java | 39 +++- .../io/netty/handler/proxy/ProxyHandler.java | 2 +- .../flush/FlushConsolidationHandler.java | 2 +- .../java/io/netty/handler/ssl/SslHandler.java | 6 +- .../handler/timeout/IdleStateHandler.java | 6 +- .../handler/timeout/WriteTimeoutHandler.java | 4 +- .../traffic/GlobalChannelTrafficCounter.java | 10 +- .../GlobalChannelTrafficShapingHandler.java | 24 +- .../traffic/GlobalTrafficShapingHandler.java | 21 +- .../netty/handler/traffic/TrafficCounter.java | 17 +- .../traffic/TrafficShapingHandlerTest.java | 9 +- .../epoll/EpollSocketChannelBenchmark.java | 2 +- .../BurstCostExecutorsBenchmark.java | 109 +++++---- ...nnableScheduledFutureAdapterBenchmark.java | 4 +- .../AbstractSharedExecutorMicrobenchmark.java | 36 ++- .../java/io/netty/resolver/dns/Cache.java | 11 +- .../netty/resolver/dns/DnsQueryContext.java | 2 +- .../netty/resolver/dns/DnsResolveContext.java | 2 +- .../AbstractSingleThreadEventLoopTest.java | 3 +- .../socket/SocketCancelWriteTest.java | 4 +- .../socket/SocketConnectionAttemptTest.java | 2 +- .../channel/epoll/AbstractEpollChannel.java | 6 +- .../channel/epoll/EpollEventLoopTest.java | 2 +- .../channel/kqueue/AbstractKQueueChannel.java | 4 +- .../channel/kqueue/KQueueEventLoopTest.java | 2 +- .../main/java/io/netty/channel/Channel.java | 1 + .../netty/channel/SingleThreadEventLoop.java | 2 +- .../channel/embedded/EmbeddedEventLoop.java | 16 +- .../netty/channel/nio/AbstractNioChannel.java | 6 +- .../io/netty/channel/AbstractChannelTest.java | 7 +- .../channel/SingleThreadEventLoopTest.java | 44 ++-- .../netty/channel/nio/NioEventLoopTest.java | 7 +- .../socket/nio/AbstractNioChannelTest.java | 35 ++- 63 files changed, 788 insertions(+), 808 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java index 18c10647dde..9aabd260e36 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java @@ -504,7 +504,7 @@ private Future close0(final ChannelOutboundInvoker invoker, final Channel }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS); channel.closeFuture().addListener(ignore -> { - forceCloseFuture.cancel(false); + forceCloseFuture.cancel(); }); } }); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java index 055cde8b46a..7d6cf101be5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java @@ -117,7 +117,7 @@ private void applyHandshakeTimeout() { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index dab9bdc726c..3436a04d505 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -125,7 +125,7 @@ private void applyCloseSentTimeout(ChannelHandlerContext ctx) { } }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS); - closeSent.asFuture().addListener(future -> timeoutTask.cancel(false)); + closeSent.asFuture().addListener(future -> timeoutTask.cancel()); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java index 5c35b397845..1968267c3d3 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java @@ -157,6 +157,6 @@ private void applyHandshakeTimeout(ChannelHandlerContext ctx) { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index cde8ff15b0b..12bb1339e82 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -27,7 +27,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -900,7 +899,7 @@ private static final class ClosingChannelFutureListener implements FutureListene @Override public void operationComplete(Future sentGoAwayFuture) { if (timeoutTask != null) { - timeoutTask.cancel(false); + timeoutTask.cancel(); } doClose(); } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 1e8fc6fbee1..1158c248d9a 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -18,14 +18,15 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import static java.util.Objects.requireNonNull; + /** * Abstract base class for {@link EventExecutor} implementations. */ -public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { +public abstract class AbstractEventExecutor implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class); static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; @@ -43,26 +44,60 @@ public Future newSucceededFuture(V result) { } @Override - public final Future submit(Runnable task) { - return (Future) super.submit(task); + public final Future submit(Runnable task) { + var futureTask = newTaskFor(task, (Void) null); + execute(futureTask); + return futureTask; } @Override public final Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); + var futureTask = newTaskFor(task, result); + execute(futureTask); + return futureTask; } @Override public final Future submit(Callable task) { - return (Future) super.submit(task); + var futureTask = newTaskFor(task); + execute(futureTask); + return futureTask; } - @Override + /** + * Decorate the given {@link Runnable} and its return value, as a {@link RunnableFuture}, such that the + * returned {@link RunnableFuture} completes with the given result at the end of executing its + * {@link RunnableFuture#run()} method. + *

+ * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this + * executor. + *

+ * This method can be overridden by sub-classes to hook into the life cycle of the given task. + * + * @param runnable The task to be decorated. + * @param value The value that the returned future will complete with, assuming the given {@link Runnable} doesn't + * throw an exception. + * @param The type of the result value. + * @return The decorated {@link Runnable} that is now also a {@link Future}. + */ protected RunnableFuture newTaskFor(Runnable runnable, T value) { return newRunnableFuture(newPromise(), runnable, value); } - @Override + /** + * Decorate the given {@link Callable} and its return value, as a {@link RunnableFuture}, such that the + * returned {@link RunnableFuture} completes with the returned result from the {@link Callable} at the end of + * executing its {@link RunnableFuture#run()} method. + *

+ * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this + * executor. + *

+ * This method can be overridden by sub-classes to hook into the life cycle of the given task. + * + * @param callable The task to be decorated. + * @param The type of the result value. + * @return The decorated {@link Runnable} that is now also a {@link Future}. + */ protected RunnableFuture newTaskFor(Callable callable) { return newRunnableFuture(newPromise(), callable); } @@ -85,17 +120,14 @@ static void safeExecute(Runnable task) { * {@link RunnableFuture}. */ private static RunnableFuture newRunnableFuture(Promise promise, Callable task) { - return new RunnableFutureAdapter<>(promise, task); + return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task")); } /** * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and * {@code value}. - * - * This can be used if you want to override {@link #newTaskFor(Runnable, V)} and return a different - * {@link RunnableFuture}. */ private static RunnableFuture newRunnableFuture(Promise promise, Runnable task, V value) { - return new RunnableFutureAdapter<>(promise, Executors.callable(task, value)); + return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value)); } } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 478d479cb12..e61fe326b88 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.PriorityQueue; import io.netty.util.internal.PriorityQueueNode; @@ -24,12 +22,13 @@ import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.callable; + /** * Abstract base class for {@link EventExecutor}s that want to support scheduling. */ @@ -60,7 +59,7 @@ public static long nanoTime() { static long deadlineNanos(long delay) { long deadlineNanos = nanoTime() + delay; // Guard against overflow - return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; + return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos; } PriorityQueue> scheduledTaskQueue() { @@ -79,7 +78,7 @@ private static boolean isNullOrEmpty(Queue> queue /** * Cancel all scheduled tasks. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final void cancelScheduledTasks() { @@ -92,8 +91,8 @@ protected final void cancelScheduledTasks() { final RunnableScheduledFutureNode[] scheduledTasks = scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES); - for (RunnableScheduledFutureNode task: scheduledTasks) { - task.cancel(false); + for (RunnableScheduledFutureNode task : scheduledTasks) { + task.cancel(); } scheduledTaskQueue.clearIgnoringIndexes(); @@ -107,16 +106,16 @@ protected final RunnableScheduledFuture pollScheduledTask() { } /** - * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. - * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. - * + * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link + * #nanoTime()} to retrieve the correct {@code nanoTime}. + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final RunnableScheduledFuture pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } @@ -130,12 +129,12 @@ protected final RunnableScheduledFuture pollScheduledTask(long nanoTime) { /** * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final long nextScheduledTaskNano() { Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return -1; } @@ -152,30 +151,30 @@ final RunnableScheduledFuture peekScheduledTask() { /** * Returns {@code true} if a scheduled task is ready for processing. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final boolean hasScheduledTasks() { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + public Future schedule(Runnable command, long delay, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (delay < 0) { delay = 0; } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command), - deadlineNanos(unit.toNanos(delay)), 0); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(delay)), 0); return schedule(task); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable callable, long delay, TimeUnit unit) { requireNonNull(callable, "callable"); requireNonNull(unit, "unit"); if (delay < 0) { @@ -186,7 +185,7 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUni } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (initialDelay < 0) { @@ -198,13 +197,13 @@ public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDela String.format("period: %d (expected: > 0)", period)); } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); return schedule(task); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (initialDelay < 0) { @@ -216,15 +215,15 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD String.format("delay: %d (expected: > 0)", delay)); } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); return schedule(task); } /** * Add the {@link RunnableScheduledFuture} for execution. */ - protected final ScheduledFuture schedule(final RunnableScheduledFuture task) { + protected final Future schedule(final RunnableScheduledFuture task) { if (inEventLoop()) { add0(task); } else { @@ -253,9 +252,9 @@ final void removeScheduled(final RunnableScheduledFutureNode task) { /** * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}. - * - * This can be used if you want to override {@link #newTaskFor(Callable)} and return a different - * {@link RunnableFuture}. + *

+ * This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a + * different {@link RunnableFuture}. */ protected static RunnableScheduledFuture newRunnableScheduledFuture( AbstractScheduledEventExecutor executor, Promise promise, Callable task, @@ -271,7 +270,8 @@ protected RunnableScheduledFuture newScheduledTaskFor( return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period); } - interface RunnableScheduledFutureNode extends PriorityQueueNode, RunnableScheduledFuture { } + interface RunnableScheduledFutureNode extends PriorityQueueNode, RunnableScheduledFuture { + } private static final class DefaultRunnableScheduledFutureNode implements RunnableScheduledFutureNode { private final RunnableScheduledFuture future; @@ -308,8 +308,8 @@ public RunnableScheduledFuture addListener(FutureListener listener } @Override - public RunnableScheduledFuture addListener(C context, - FutureContextListener listener) { + public RunnableScheduledFuture addListener( + C context, FutureContextListener listener) { future.addListener(context, listener); return this; } @@ -335,8 +335,8 @@ public void run() { } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); + public boolean cancel() { + return future.cancel(); } @Override @@ -360,12 +360,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution } @Override - public long getDelay(TimeUnit unit) { - return future.getDelay(unit); - } - - @Override - public int compareTo(Delayed o) { + public int compareTo(RunnableScheduledFuture o) { return future.compareTo(o); } @@ -393,11 +388,6 @@ public RunnableFuture awaitUninterruptibly() { return this; } - @Override - public boolean cancel() { - return cancel(false); - } - @Override public boolean isSuccess() { return future.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java b/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java index 968585aa915..3dd8690f32e 100644 --- a/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java +++ b/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java @@ -25,9 +25,10 @@ */ interface AsynchronousResult { /** - * Cancel this asynchronous operation, unless it has already been completed. + * Cancel this asynchronous operation, unless it has already been completed + * or is not {@linkplain #isCancellable() cancellable}. *

- * A cancelled operation is considered to be {@linkplain #isFailed() failed}. + * A cancelled operation is considered to be {@linkplain #isDone() done} and {@linkplain #isFailed() failed}. *

* If the cancellation was successful, the result of this operation will be that it has failed with a * {@link CancellationException}. @@ -66,7 +67,11 @@ interface AsynchronousResult { boolean isDone(); /** - * returns {@code true} if and only if the operation can be cancelled via {@link #cancel()}. + * Returns {@code true} if and only if the operation can be cancelled via {@link #cancel()}. + * Note that this is inherently racy, as the operation could be made + * {@linkplain Promise#setUncancellable() uncancellable} at any time. + * + * @return {@code true} if this operation can be cancelled. */ boolean isCancellable(); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java index 62b2b013ce7..df37edb1e79 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java @@ -17,8 +17,11 @@ import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -33,7 +36,7 @@ * * @param the value type. */ -final class DefaultFutureCompletionStage implements FutureCompletionStage { +final class DefaultFutureCompletionStage implements FutureCompletionStage, java.util.concurrent.Future { private enum Marker { EMPTY, ERROR @@ -50,6 +53,31 @@ private enum Marker { this.future = future; } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + @Override public Future future() { return future; diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 9136b679544..a88a4f50509 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -384,14 +384,6 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution @Override public boolean cancel() { - return cancel(false); - } - - /** - * @param mayInterruptIfRunning this value has no effect in this implementation. - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { if (checkNotifyWaiters()) { notifyListeners(); @@ -668,6 +660,15 @@ static void safeExecute(EventExecutor executor, Runnable task) { @Override public FutureCompletionStage asStage() { + return getFutureStageAdaptor(); + } + + @Override + public java.util.concurrent.Future asJdkFuture() { + return getFutureStageAdaptor(); + } + + private DefaultFutureCompletionStage getFutureStageAdaptor() { DefaultFutureCompletionStage stageAdapter = stage; if (stageAdapter == null) { stage = stageAdapter = new DefaultFutureCompletionStage<>(this); diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index b409bbc0264..e2cd2f47ac1 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -15,15 +15,10 @@ */ package io.netty.util.concurrent; -import java.util.Collection; -import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_QUIET_PERIOD; import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_TIMEOUT; @@ -34,61 +29,88 @@ * life-cycle and allows shutting them down in a global fashion. * */ -public interface EventExecutorGroup extends ScheduledExecutorService, Iterable { - +public interface EventExecutorGroup extends Iterable, Executor { /** * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} * are being {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}. + *

+ * An executor group that "is shutting down" can still accept new tasks for a little while (the grace period), + * but will eventually start rejecting new tasks. + * At that point, the executor group will be {@linkplain #isShutdown() shut down}. + * + * @return {@code true} if all executors in this group have at least started shutting down, otherwise {@code false}. */ boolean isShuttingDown(); + /** + * Returns {@code true} if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} have been + * {@linkplain #shutdownGracefully() shut down gracefully} and moved past the grace period so that they are no + * longer accepting any new tasks. + *

+ * An executor group that "is shut down" might still be executing tasks that it has queued up, but it will no + * longer be accepting any new tasks. + * Once all running and queued tasks have completed, the executor group will be + * {@linkplain #isTerminated() terminated}. + * + * @return {@code true} if all executors in this group have shut down and are no longer accepting any new tasks. + */ + boolean isShutdown(); + + /** + * Returns {@code true} if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} are + * {@linkplain #isShutdown() shut down}, and all of their tasks have completed. + * + * @return {@code true} if all executors in this group have terminated. + */ + default boolean isTerminated() { + return terminationFuture().isDone(); + } + + /** + * Wait for this {@link EventExecutorGroup} to {@linkplain #isTerminated() terminate}, up to the given timeout. + * + * @param timeout The non-negative maximum amount of time to wait for the executor group to terminate. + * @param unit The non-null time unit of the timeout. + * @return {@code true} if the executor group terminated within the specific timeout. + * @throws InterruptedException If this thread was {@linkplain Thread#interrupt() interrupted} while waiting for + * executor group to terminate. + */ + default boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return terminationFuture().await(timeout, unit); + } + /** * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values. * * @return the {@link #terminationFuture()} */ - default Future shutdownGracefully() { + default Future shutdownGracefully() { return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); } /** * Signals this executor that the caller wants the executor to be shut down. Once this method is called, * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down. - * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for 'the quiet period' - * (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period, - * it is guaranteed to be accepted and the quiet period will start over. + * This method ensures that no tasks are submitted for 'the quiet period' (usually a couple seconds) before + * it shuts itself down. If a task is submitted during the quiet period, it is guaranteed to be accepted and the + * quiet period will start over. * * @param quietPeriod the quiet period as described in the documentation - * @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()} - * regardless if a task was submitted during the quiet period + * @param timeout the maximum amount of time to wait until the executor is + * {@linkplain #isShuttingDown() shutting down} regardless if a task was submitted during the quiet period. * @param unit the unit of {@code quietPeriod} and {@code timeout} * * @return the {@link #terminationFuture()} */ - Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); + Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); /** * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this * {@link EventExecutorGroup} have been terminated. + * + * @return The {@link Future} representing the termination of this {@link EventExecutorGroup}. */ - Future terminationFuture(); - - /** - * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. - */ - @Override - @Deprecated - void shutdown(); - - /** - * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. - */ - @Override - @Deprecated - default List shutdownNow() { - shutdown(); - return Collections.emptyList(); - } + Future terminationFuture(); /** * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}. @@ -98,66 +120,116 @@ default List shutdownNow() { @Override Iterator iterator(); - @Override - default Future submit(Runnable task) { + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that produces a {@code null} result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @return A future that represents the completion of the submitted task. + */ + default Future submit(Runnable task) { return next().submit(task); } - @Override + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that produces the given result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @param result The value that the returned future will complete with, if the task completes successfully. + * @param The type of the future result. + * @return A future that represents the completion of the submitted task. + */ default Future submit(Runnable task, T result) { return next().submit(task, result); } - @Override + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that will return the result of the callable when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @param The type of the future result. + * @return A future that represents the completion of the submitted task. + */ default Future submit(Callable task) { return next().submit(task); } - @Override - default ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return next().schedule(command, delay, unit); - } - - @Override - default ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return next().schedule(callable, delay, unit); - } - - @Override - default ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return next().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - default ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return next().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - default List> invokeAll(Collection> tasks) - throws InterruptedException { - return next().invokeAll(tasks); + /** + * Schedule the given task for execution after the given delay, in the next available {@link EventExecutor} + * in this group, and return a future that produces a {@code null} result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup} after the given delay. + * @param delay A positive time delay, in the given time unit. + * @param unit The non-null time unit for the delay. + * @return A future that represents the completion of the scheduled task. + */ + default Future schedule(Runnable task, long delay, TimeUnit unit) { + return next().schedule(task, delay, unit); } - @Override - default List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return next().invokeAll(tasks, timeout, unit); + /** + * Schedule the given task for execution after the given delay, in the next available {@link EventExecutor} + * in this group, and return a future that will return the result of the callable when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup} after the given delay. + * @param delay A positive time delay, in the given time unit. + * @param unit The non-null time unit for the delay. + * @param The type of the future result. + * @return A future that represents the completion of the scheduled task. + */ + default Future schedule(Callable task, long delay, TimeUnit unit) { + return next().schedule(task, delay, unit); } - @Override - default T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return next().invokeAny(tasks); + /** + * Schedule the given task for periodic execution in the next available {@link EventExecutor}. + * The first execution will occur after the given initial delay, and the following repeated executions will occur + * with the given period of time between each execution is started. + * If the task takes longer to complete than the requested period, then the following executions will be delayed, + * rather than allowing multiple instances of the task to run concurrently. + *

+ * The task will be executed repeatedly until it either fails with an exception, or its future is + * {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. + * + * @param task The task that should be scheduled to execute at a fixed rate in this {@link EventExecutorGroup}. + * @param initialDelay The positive initial delay for the first task execution, in terms of the given time unit. + * @param period The positive period for the execution frequency to use after the first execution has started, + * in terms of the given time unit. + * @param unit The non-null time unit for the delay and period. + * @return A future that represents the recurring task, and which can be cancelled to stop future executions. + */ + default Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return next().scheduleAtFixedRate(task, initialDelay, period, unit); } - @Override - default T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return next().invokeAny(tasks, timeout, unit); + /** + * Schedule the given task for periodic execution in the next available {@link EventExecutor}. + * The first execution will occur after the given initial delay, and the following repeated executions will occur + * with the given subsequent delay between one task completing and the next task starting. + * The delay from the completion of one task, to the start of the next, stays unchanged regardless of how long a + * task takes to complete. + *

+ * This is in contrast to {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)} which varies the delays + * between the tasks in order to hit a given frequency. + *

+ * The task will be executed repeatedly until it either fails with an exception, or its future is + * {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. + * + * @param task The task that should be scheduled to execute with fixed delays in this {@link EventExecutorGroup}. + * @param initialDelay The positive initial delay for the first task execution, in terms of the given time unit. + * @param delay The positive subsequent delay between task, to use after the first execution has completed, + * in terms of the given time unit. + * @param unit The non-null time unit for the delays. + * @return A future that represents the recurring task, and which can be cancelled to stop future executions. + */ + default Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return next().scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override - default void execute(Runnable command) { - next().execute(command); + default void execute(Runnable task) { + next().execute(task); } } diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index 7f946fbc7ec..f2d80c85da1 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -150,8 +150,7 @@ * } * */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface Future extends java.util.concurrent.Future, AsynchronousResult { +public interface Future extends AsynchronousResult { /** * Adds the specified listener to this future. The specified listener is notified when this future is {@linkplain * #isDone() done}. If this future is already completed, the specified listener is notified immediately. @@ -236,14 +235,15 @@ public interface Future extends java.util.concurrent.Future, AsynchronousR boolean awaitUninterruptibly(long timeoutMillis); /** - * {@inheritDoc} - *

- * If the cancellation was successful it will fail the future with a {@link CancellationException}. + * Get the result of this future, if it has completed. + * If the future has failed, then an {@link ExecutionException} will be thrown instead. + * If the future has not yet completed, then this method will block until it completes. + * + * @return The result of the task execution, if it completed successfully. + * @throws InterruptedException If the call was blocked, waiting for the future to complete, and the thread was + * {@linkplain Thread#interrupt() interrupted}. + * @throws ExecutionException If the task failed, either by throwing an exception or through cancellation. */ - @Override - boolean cancel(boolean mayInterruptIfRunning); - - @Override default V get() throws InterruptedException, ExecutionException { await(); @@ -257,7 +257,24 @@ default V get() throws InterruptedException, ExecutionException { throw new ExecutionException(cause); } - @Override + /** + * Get the result of this future, if it has completed. + * If the future has failed, then an {@link ExecutionException} will be thrown instead. + * If the future has not yet completed, then this method will block, waiting up to the given timeout for the future + * to complete. + * If the future does not complete within the specified timeout, then a {@link TimeoutException} will be thrown. + * If the timeout is zero, then this method will not block, and instead either get the result or failure of the + * future if completed, or immediately throw a {@link TimeoutException} if not yet completed. + * + * @param timeout The non-negative maximum amount of time, in terms of the given time unit, to wait for the + * completion of the future. + * @param unit The time unit for the timeout. + * @return The value of the successfully completed future. + * @throws InterruptedException If this call was blocking and this thread got + * {@linkplain Thread#interrupt() interrupted}. + * @throws ExecutionException If the task failed, either by throwing an exception, or through cancellation. + * @throws TimeoutException If the future did not complete within the specified timeout. + */ default V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) { Throwable cause = cause(); @@ -280,6 +297,13 @@ default FutureCompletionStage asStage() { return new DefaultFutureCompletionStage<>(this); } + /** + * Returns a {@link java.util.concurrent.Future JDK Future that reflects the state of this {@link Future}. + */ + default java.util.concurrent.Future asJdkFuture() { + return new DefaultFutureCompletionStage<>(this); + } + /** * Creates a new {@link Future} that will complete with the result of this {@link Future} mapped * through the given mapper function. diff --git a/common/src/main/java/io/netty/util/concurrent/Futures.java b/common/src/main/java/io/netty/util/concurrent/Futures.java index f9eeb001c2c..09d00c921cc 100644 --- a/common/src/main/java/io/netty/util/concurrent/Futures.java +++ b/common/src/main/java/io/netty/util/concurrent/Futures.java @@ -136,7 +136,7 @@ private static final class PropagateCancel implements FutureContextListener context, Future future) throws Exception { if (future.isCancelled()) { - context.cancel(false); + context.cancel(); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index bc1bd54e78c..14b00ad5efc 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -67,7 +67,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; - private final Future terminationFuture = DefaultPromise.newFailedPromise( + private final Future terminationFuture = DefaultPromise.newFailedPromise( this, new UnsupportedOperationException()).asFuture(); private GlobalEventExecutor() { @@ -150,21 +150,15 @@ public boolean inEventLoop(Thread thread) { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture; } - @Override - @Deprecated - public void shutdown() { - throw new UnsupportedOperationException(); - } - @Override public boolean isShuttingDown() { return false; diff --git a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java index d2c945320fb..32602f4867f 100644 --- a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -25,6 +23,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import static java.util.Objects.requireNonNull; + /** * Executes {@link Runnable} objects in the caller's thread. If the {@link #execute(Runnable)} is reentrant it will be * queued until the original {@link Runnable} finishes execution. @@ -54,7 +54,7 @@ protected Boolean initialValue() throws Exception { } }; - private final Future terminationFuture = DefaultPromise.newFailedPromise( + private final Future terminationFuture = DefaultPromise.newFailedPromise( GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()).asFuture(); private ImmediateEventExecutor() { } @@ -65,19 +65,15 @@ public boolean inEventLoop(Thread thread) { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture; } - @Override - @Deprecated - public void shutdown() { } - @Override public boolean isShuttingDown() { return false; @@ -99,14 +95,14 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } @Override - public void execute(Runnable command) { - requireNonNull(command, "command"); + public void execute(Runnable task) { + requireNonNull(task, "command"); if (!RUNNING.get()) { RUNNING.set(true); try { - command.run(); + task.run(); } catch (Throwable cause) { - logger.info("Throwable caught while executing Runnable {}", command, cause); + logger.info("Throwable caught while executing Runnable {}", task, cause); } finally { Queue delayedRunnables = DELAYED_RUNNABLES.get(); Runnable runnable; @@ -120,7 +116,7 @@ public void execute(Runnable command) { RUNNING.set(false); } } else { - DELAYED_RUNNABLES.get().add(command); + DELAYED_RUNNABLES.get().add(task); } } @@ -130,23 +126,23 @@ public Promise newPromise() { } @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { + public Future schedule(Runnable task, long delay, + TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index ae05112c7c5..b22b2691b16 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -38,7 +38,7 @@ public class MultithreadEventExecutorGroup implements EventExecutorGroup { private final EventExecutor[] children; private final List readonlyChildren; private final AtomicInteger terminatedChildren = new AtomicInteger(); - private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); private final boolean powerOfTwo; /** @@ -223,7 +223,7 @@ protected EventExecutor newChild(Executor executor, int maxPendingTasks, } @Override - public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } @@ -231,18 +231,10 @@ public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUn } @Override - public final Future terminationFuture() { + public final Future terminationFuture() { return terminationFuture.asFuture(); } - @Override - @Deprecated - public final void shutdown() { - for (EventExecutor l: children) { - l.shutdown(); - } - } - @Override public final boolean isShuttingDown() { for (EventExecutor l: children) { diff --git a/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java index 529127a1e0a..318dd47c862 100644 --- a/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java @@ -15,23 +15,19 @@ */ package io.netty.util.concurrent; -import static io.netty.util.internal.ObjectUtil.checkPositive; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; -import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static java.util.Objects.requireNonNull; + /** * {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what * {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s. @@ -83,32 +79,20 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully() { + public Future shutdownGracefully() { return group.shutdownGracefully(); } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return group.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return group.terminationFuture(); } - @SuppressWarnings("deprecation") - @Override - public void shutdown() { - group.shutdown(); - } - - @SuppressWarnings("deprecation") - @Override - public List shutdownNow() { - return group.shutdownNow(); - } - @Override public EventExecutor next() { return newExecutor(group.next()); @@ -136,7 +120,7 @@ public void remove() { } @Override - public Future submit(Runnable task) { + public Future submit(Runnable task) { return group.submit(task); } @@ -151,23 +135,23 @@ public Future submit(Callable task) { } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return group.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return group.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return group.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return group.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return group.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return group.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return group.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return group.scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override @@ -186,31 +170,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public List> invokeAll( - Collection> tasks) throws InterruptedException { - return group.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return group.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return group.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return group.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - group.execute(command); + public void execute(Runnable task) { + group.execute(task); } private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor @@ -294,20 +255,15 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return executor.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return executor.terminationFuture(); } - @Override - public void shutdown() { - executor.shutdown(); - } - @Override public boolean isShutdown() { return executor.isShutdown(); @@ -324,8 +280,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public void execute(Runnable command) { - if (!tasks.offer(command)) { + public void execute(Runnable task) { + if (!tasks.offer(task)) { throw new RejectedExecutionException(); } if (state.compareAndSet(NONE, SUBMITTED)) { @@ -336,25 +292,25 @@ public void execute(Runnable command) { } @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { + public Future schedule(Runnable task, long delay, + TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate( + Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } } diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java index 32698bf755c..1c88cd7e3d0 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java @@ -121,6 +121,11 @@ protected final boolean setUncancellableInternal() { return super.setUncancellable(); } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return cancel(); + } + @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java b/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java index 93613b551fb..ab829256ba6 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java @@ -18,8 +18,7 @@ /** * A combination of {@link java.util.concurrent.RunnableFuture} and {@link Future}. */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface RunnableFuture extends java.util.concurrent.RunnableFuture, Future { +public interface RunnableFuture extends Runnable, Future { @Override RunnableFuture addListener(FutureListener listener); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java index 9ba77d8c076..4069d8e4cce 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.StringUtil; import java.util.concurrent.Callable; @@ -24,6 +22,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; + final class RunnableFutureAdapter implements RunnableFuture { private final Promise promise; @@ -134,14 +134,9 @@ public void run() { } } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); - } - @Override public boolean cancel() { - return cancel(false); + return future.cancel(); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java index b286d02a929..5376760992b 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java @@ -16,12 +16,17 @@ package io.netty.util.concurrent; /** - * A combination of {@link java.util.concurrent.RunnableScheduledFuture}, {@link RunnableFuture} and - * {@link ScheduledFuture}. + * A combination of {@link RunnableFuture} and {@link Comparable} (sorting by their next deadline), + * with additional methods for scheduling, periodicity, and delay. */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface RunnableScheduledFuture extends - java.util.concurrent.RunnableScheduledFuture, RunnableFuture, ScheduledFuture { +public interface RunnableScheduledFuture extends RunnableFuture, Comparable> { + /** + * Return {@code true} if the task is periodic, which means it may be executed multiple times, as opposed to a + * delayed task or a normal task, that only execute once. + * + * @return {@code true} if this task is periodic, otherwise {@code false}. + */ + boolean isPeriodic(); /** * Returns the deadline in nanos when the {@link #run()} method should be called again. diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java index 5129ef5a382..2d1cd510f1b 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java @@ -16,19 +16,17 @@ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.StringUtil; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -@SuppressWarnings("ComparableImplementedButEqualsNotOverridden") +import static java.util.Objects.requireNonNull; + final class RunnableScheduledFutureAdapter implements AbstractScheduledEventExecutor.RunnableScheduledFutureNode { private static final AtomicLong NEXT_TASK_ID = new AtomicLong(); @@ -75,12 +73,7 @@ public long delayNanos(long currentTimeNanos) { } @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { + public int compareTo(RunnableScheduledFuture o) { if (this == o) { return 0; } @@ -100,6 +93,23 @@ public int compareTo(Delayed o) { } } + @Override + public int hashCode() { + return Long.hashCode(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof RunnableScheduledFutureAdapter) { + RunnableScheduledFutureAdapter adaptor = (RunnableScheduledFutureAdapter) obj; + return id == adaptor.id; + } + return false; + } + @Override public void run() { try { @@ -130,23 +140,15 @@ public void run() { } } - /** - * @param mayInterruptIfRunning this value has no effect in this implementation. - */ @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean canceled = future.cancel(mayInterruptIfRunning); + public boolean cancel() { + boolean canceled = future.cancel(); if (canceled) { executor.removeScheduled(this); } return canceled; } - @Override - public boolean cancel() { - return cancel(false); - } - @Override public boolean isSuccess() { return promise.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java deleted file mode 100644 index a6a91f63142..00000000000 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -/** - * The result of an scheduled asynchronous operation. - */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface ScheduledFuture extends Future, java.util.concurrent.ScheduledFuture { -} diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 3bd35150885..7ccd11b1186 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -22,21 +22,17 @@ import java.lang.Thread.State; import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -95,7 +91,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im private volatile long gracefulShutdownTimeout; private long gracefulShutdownStartTime; - private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); + private final Promise terminationFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); /** * Create a new instance @@ -500,7 +496,7 @@ private boolean runShutdownHooks() { } @Override - public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } @@ -538,7 +534,6 @@ public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUn } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { - //System.err.println(oldState + " " + newState + " " + this); break; } } @@ -560,58 +555,10 @@ public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUn } @Override - public final Future terminationFuture() { + public final Future terminationFuture() { return terminationFuture.asFuture(); } - @Override - @Deprecated - public final void shutdown() { - if (isShutdown()) { - return; - } - - boolean inEventLoop = inEventLoop(); - boolean wakeup; - int oldState; - for (;;) { - if (isShuttingDown()) { - return; - } - int newState; - wakeup = true; - oldState = state; - if (inEventLoop) { - newState = ST_SHUTDOWN; - } else { - switch (oldState) { - case ST_NOT_STARTED: - case ST_STARTED: - case ST_SHUTTING_DOWN: - newState = ST_SHUTDOWN; - break; - default: - newState = oldState; - wakeup = false; - } - } - if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { - break; - } - } - - if (ensureThreadStarted(oldState)) { - return; - } - - if (wakeup) { - taskQueue.offer(WAKEUP_TASK); - if (!addTaskWakesUp) { - wakeup(inEventLoop); - } - } - } - @Override public final boolean isShuttingDown() { return state >= ST_SHUTTING_DOWN; @@ -732,39 +679,6 @@ public void execute(Runnable task) { } } - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - throwIfInEventLoop("invokeAny"); - return super.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throwIfInEventLoop("invokeAny"); - return super.invokeAny(tasks, timeout, unit); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - throwIfInEventLoop("invokeAll"); - return super.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - throwIfInEventLoop("invokeAll"); - return super.invokeAll(tasks, timeout, unit); - } - - private void throwIfInEventLoop(String method) { - if (inEventLoop()) { - throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed"); - } - } - /** * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}. * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until diff --git a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java index bd5eac08f43..4273e06c0d0 100644 --- a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java @@ -18,7 +18,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.RejectedExecutionHandler; @@ -36,25 +36,28 @@ * * Because it provides no ordering care should be taken when using it! */ -public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor { +@SuppressWarnings("unchecked") +public final class UnorderedThreadPoolEventExecutor implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance( UnorderedThreadPoolEventExecutor.class); - private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final InnerScheduledThreadPoolExecutor executor; /** * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)} * using {@link DefaultThreadFactory}. */ public UnorderedThreadPoolEventExecutor(int corePoolSize) { - this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class)); + DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory); } /** * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)} */ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory); } /** @@ -62,7 +65,8 @@ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFa * ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}. */ public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) { - this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler); + DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler); } /** @@ -70,7 +74,7 @@ public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandl */ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, threadFactory, handler); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler); } @Override @@ -84,94 +88,97 @@ public boolean isShuttingDown() { } @Override - public List shutdownNow() { - List tasks = super.shutdownNow(); - terminationFuture.trySuccess(null); - return tasks; + public boolean isShutdown() { + return executor.isShutdown(); } @Override - public void shutdown() { - super.shutdown(); - terminationFuture.trySuccess(null); + public boolean isTerminated() { + return executor.isTerminated(); } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { // TODO: At the moment this just calls shutdown but we may be able to do something more smart here which // respects the quietPeriod and timeout. - shutdown(); + executor.shutdown(); return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture.asFuture(); } @Override - protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { - return runnable instanceof NonNotifyRunnable ? - task : new RunnableScheduledFutureTask<>(this, runnable, task); - } - - @Override - protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { - return new RunnableScheduledFutureTask<>(this, callable, task); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return (ScheduledFuture) super.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return (Future) executor.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return (ScheduledFuture) super.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return (Future) executor.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return (ScheduledFuture) super.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return (Future) executor.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return (ScheduledFuture) super.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return (Future) executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override - public Future submit(Runnable task) { - return (Future) super.submit(task); + public Future submit(Runnable task) { + return (Future) executor.submit(task); } @Override public Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); + return (Future) executor.submit(task, result); } @Override public Future submit(Callable task) { - return (Future) super.submit(task); + return (Future) executor.submit(task); } @Override - public void execute(Runnable command) { - super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS); + public void execute(Runnable task) { + executor.schedule(new NonNotifyRunnable(task), 0, NANOSECONDS); } + /** + * Return the task queue of the underlying {@link java.util.concurrent.Executor} instance. + *

+ * Visible for testing. + * + * @return The task queue of this executor. + */ + BlockingQueue getQueue() { + return executor.getQueue(); + } + + /** + * Note: this class has a natural ordering that is inconsistent with equals. + */ private static final class RunnableScheduledFutureTask extends PromiseTask - implements RunnableScheduledFuture, ScheduledFuture { + implements RunnableScheduledFuture { private final RunnableScheduledFuture future; - RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, - RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFuture future) { super(executor, runnable, null); this.future = future; } - RunnableScheduledFutureTask(EventExecutor executor, Callable callable, - RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, Callable callable, RunnableScheduledFuture future) { super(executor, callable); this.future = future; } @@ -228,4 +235,30 @@ public void run() { task.run(); } } + + private static final class InnerScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private final EventExecutor eventExecutor; + + InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + this.eventExecutor = eventExecutor; + } + + InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + this.eventExecutor = eventExecutor; + } + + @Override + protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { + return runnable instanceof NonNotifyRunnable ? + task : new RunnableScheduledFutureTask<>(eventExecutor, runnable, task); + } + + @Override + protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { + return new RunnableScheduledFutureTask<>(eventExecutor, callable, task); + } + } } diff --git a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java index f61ff387ff4..a0ae541103b 100644 --- a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -35,8 +35,8 @@ public class AbstractScheduledEventExecutorTest { @Test public void testScheduleRunnableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -44,8 +44,8 @@ public void testScheduleRunnableZero() { @Test public void testScheduleRunnableNegative() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_RUNNABLE, -1, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_RUNNABLE, -1, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -53,8 +53,8 @@ public void testScheduleRunnableNegative() { @Test public void testScheduleCallableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_CALLABLE, 0, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_CALLABLE, 0, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -62,12 +62,16 @@ public void testScheduleCallableZero() { @Test public void testScheduleCallableNegative() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_CALLABLE, -1, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_CALLABLE, -1, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } + private static long getDelay(Future future) { + return ((RunnableScheduledFuture) future).delayNanos(); + } + @Test public void testScheduleAtFixedRateRunnableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); @@ -113,17 +117,12 @@ public boolean inEventLoop(Thread thread) { } @Override - public void shutdown() { - // NOOP - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { throw new UnsupportedOperationException(); } @@ -143,7 +142,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } @Override - public void execute(Runnable command) { + public void execute(Runnable task) { throw new UnsupportedOperationException(); } } diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index bc276dc988b..74aa677e046 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -80,19 +80,15 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return null; } @Override - public Future terminationFuture() { + public Future terminationFuture() { return null; } - @Override - public void shutdown() { - } - @Override public boolean isShutdown() { return false; @@ -109,23 +105,23 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + public Future schedule(Runnable task, long delay, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, - TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, + TimeUnit unit) { return fail("Cannot schedule commands"); } @@ -135,7 +131,7 @@ public boolean inEventLoop(Thread thread) { } @Override - public void execute(Runnable command) { + public void execute(Runnable task) { fail("Cannot schedule commands"); } } @@ -145,7 +141,7 @@ public void testCancelDoesNotScheduleWhenNoListeners() { EventExecutor executor = new RejectingEventExecutor(); DefaultPromise promise = new DefaultPromise(executor); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertTrue(promise.isCancelled()); } @@ -174,23 +170,33 @@ public void testFailureDoesNotScheduleWhenNoListeners() { @Test public void testCancellationExceptionIsThrownWhenBlockingGet() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThrows(CancellationException.class, promise::get); } @Test public void testCancellationExceptionIsThrownWhenBlockingGetWithTimeout() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThrows(CancellationException.class, () -> promise.get(1, TimeUnit.SECONDS)); } @Test public void testCancellationExceptionIsReturnedAsCause() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThat(promise.cause()).isInstanceOf(CancellationException.class); assertTrue(promise.isFailed()); + assertTrue(promise.isDone()); + } + + @Test + public void uncancellablePromiseIsNotDone() { + DefaultPromise promise = new DefaultPromise<>(INSTANCE); + promise.setUncancellable(); + assertFalse(promise.isDone()); + assertFalse(promise.isCancellable()); + assertFalse(promise.isCancelled()); } @Test @@ -390,6 +396,22 @@ public void setUncancellableGetNow() { assertEquals("success", promise.getNow()); } + @Test + public void cancellingUncancellablePromiseDoesNotCompleteIt() { + DefaultPromise promise = new DefaultPromise<>(INSTANCE); + promise.setUncancellable(); + promise.cancel(); + assertFalse(promise.isCancelled()); + assertFalse(promise.isDone()); + assertFalse(promise.isFailed()); + assertFalse(promise.isSuccess()); + promise.setSuccess(null); + assertFalse(promise.isCancelled()); + assertTrue(promise.isDone()); + assertFalse(promise.isFailed()); + assertTrue(promise.isSuccess()); + } + @Test public void throwUncheckedSync() throws InterruptedException { Exception exception = new Exception(); @@ -421,7 +443,7 @@ public void throwUncheckedSyncUninterruptibly() { @Test public void throwCancelled() throws InterruptedException { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - promise.cancel(true); + promise.cancel(); assertThrows(CancellationException.class, promise::sync); } diff --git a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java index 862e52f213a..75109a9115b 100644 --- a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java @@ -90,7 +90,7 @@ public void mapMustNotFailOriginalFutureWhenMapperFunctionThrows() { public void cancelOnFutureFromMapMustCancelOriginalFuture() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.map(i -> i.toString()); - strFut.cancel(false); + strFut.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -99,7 +99,7 @@ public void cancelOnFutureFromMapMustCancelOriginalFuture() { public void cancelOnOriginalFutureMustCancelFutureFromMap() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.map(i -> i.toString()); - promise.cancel(false); + promise.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -165,7 +165,7 @@ public void flatMapMustNotFailOriginalFutureWhenMapperFunctionThrows() { public void cancelOnFutureFromFlatMapMustCancelOriginalFuture() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> INSTANCE.newSucceededFuture(i.toString())); - strFut.cancel(false); + strFut.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -174,7 +174,7 @@ public void cancelOnFutureFromFlatMapMustCancelOriginalFuture() { public void cancelOnOriginalFutureMustCancelFutureFromFlatMap() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> INSTANCE.newSucceededFuture(i.toString())); - promise.cancel(false); + promise.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -184,7 +184,7 @@ public void cancelOnFutureFromFlatMapMapperMustCancelReturnedFuture() throws Exc DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> { Future future = new DefaultPromise<>(INSTANCE); - future.cancel(false); + future.cancel(); return future; }); @@ -254,7 +254,7 @@ public void cascadeToCancel() throws Exception { DefaultPromise promise2 = new DefaultPromise<>(executor); promise.cascadeTo(promise2); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertTrue(promise.isCancelled()); assertTrue(promise2.await(1, SECONDS)); assertTrue(promise2.isCancelled()); @@ -267,7 +267,7 @@ public void cascadeToCancelSecond() throws Exception { DefaultPromise promise2 = new DefaultPromise<>(executor); promise.cascadeTo(promise2); - assertTrue(promise2.cancel(false)); + assertTrue(promise2.cancel()); assertTrue(promise2.isCancelled()); // diff --git a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java index 1837bb1c96b..78550e05049 100644 --- a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java @@ -38,11 +38,7 @@ public class GlobalEventExecutorTest { @BeforeEach public void setUp() throws Exception { // Wait until the global executor is stopped (just in case there is a task running due to previous test cases) - for (;;) { - if (e.thread == null || !e.thread.isAlive()) { - break; - } - + while (e.thread != null && e.thread.isAlive()) { Thread.sleep(50); } } @@ -76,7 +72,7 @@ public void testAutomaticStartStop() throws Exception { @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) public void testScheduledTasks() throws Exception { TestRunnable task = new TestRunnable(0); - ScheduledFuture f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); f.sync(); assertThat(task.ran.get(), is(true)); @@ -115,7 +111,7 @@ public void testTakeTask() throws Exception { //add scheduled task TestRunnable scheduledTask = new TestRunnable(0); - ScheduledFuture f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); //add task TestRunnable afterTask = new TestRunnable(0); @@ -134,7 +130,7 @@ public void testTakeTaskAlwaysHasTask() throws Exception { //for https://github.com/netty/netty/issues/1614 //add scheduled task TestRunnable t = new TestRunnable(0); - final ScheduledFuture f = e.schedule(t, 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(t, 1500, TimeUnit.MILLISECONDS); //ensure always has at least one task in taskQueue //check if scheduled tasks are triggered diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index ddfe32587bf..379990626d1 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -17,10 +17,7 @@ import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -99,82 +96,6 @@ protected void run() { executor.shutdownGracefully(); } - @Test - public void testInvokeAnyInEventLoop() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(true, false)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAnyInEventLoopWithTimeout() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(true, true)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAllInEventLoop() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(false, false)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAllInEventLoopWithTimeout() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(false, true)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - private static void testInvokeInEventLoop(final boolean any, final boolean timeout) { - final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(Executors.defaultThreadFactory()) { - @Override - protected void run() { - while (!confirmShutdown()) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - } - } - } - }; - try { - final Promise promise = executor.newPromise(); - executor.execute(() -> { - try { - Set> set = Collections.singleton(() -> { - promise.setFailure(new AssertionError("Should never execute the Callable")); - return Boolean.TRUE; - }); - if (any) { - if (timeout) { - executor.invokeAny(set, 10, TimeUnit.SECONDS); - } else { - executor.invokeAny(set); - } - } else { - if (timeout) { - executor.invokeAll(set, 10, TimeUnit.SECONDS); - } else { - executor.invokeAll(set); - } - } - promise.setFailure(new AssertionError("Should never reach here")); - } catch (Throwable cause) { - promise.setFailure(cause); - } - }); - promise.asFuture().syncUninterruptibly(); - } finally { - executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); - } - } - @Test public void testTaskAddedAfterShutdownNotAbandoned() throws Exception { @@ -266,7 +187,7 @@ protected void run() { //add scheduled task TestRunnable scheduledTask = new TestRunnable(); - ScheduledFuture f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); + Future f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); //add task TestRunnable afterTask = new TestRunnable(); @@ -300,7 +221,7 @@ protected void run() { assertTimeoutPreemptively(ofSeconds(5), () -> { //add scheduled task TestRunnable t = new TestRunnable(); - final ScheduledFuture f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS); + Future f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS); //ensure always has at least one task in taskQueue //check if scheduled tasks are triggered diff --git a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java index aff141610ee..caeabdfbe58 100644 --- a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java @@ -65,7 +65,7 @@ public void run() { try { latch.await(); } finally { - future.cancel(true); + future.cancel(); executor.shutdownGracefully(); } } @@ -105,4 +105,41 @@ public String call() { executor.shutdownGracefully(); } } + + @Test + public void futuresMustHaveCorrectExecutor() { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + Runnable runnable = () -> { + }; + Callable callable = () -> null; + Future future = null; + + try { + future = executor.schedule(runnable, 0, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.schedule(callable, 0, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.scheduleWithFixedDelay(runnable, 0, 1, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.submit(runnable); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.submit(callable); + assertSame(executor, future.executor()); + } finally { + future.cancel(); + executor.shutdownGracefully(); + } + } } diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java index e915577f980..82f2fc76764 100644 --- a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java @@ -348,7 +348,7 @@ private void failPendingWritesAndClose(Throwable cause) { private void cancelConnectTimeoutFuture() { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); connectTimeoutFuture = null; } } diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java index 62bb9e7e69b..aaea42c3a83 100644 --- a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -205,7 +205,7 @@ private void scheduleFlush(final ChannelHandlerContext ctx) { private void cancelScheduledFlush() { if (nextScheduledFlush != null) { - nextScheduledFlush.cancel(false); + nextScheduledFlush.cancel(); nextScheduledFlush = null; } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index c26c596f72b..35551c3b732 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -2038,7 +2038,7 @@ private void applyHandshakeTimeout() { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } private void forceFlush(ChannelHandlerContext ctx) { @@ -2088,7 +2088,7 @@ private void safeClose( // Close the connection if close_notify is sent in time. flushFuture.addListener(f -> { if (timeoutFuture != null) { - timeoutFuture.cancel(false); + timeoutFuture.cancel(); } final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis; if (closeNotifyReadTimeout <= 0) { @@ -2122,7 +2122,7 @@ private void safeClose( // Do the close once we received the close_notify. closeFuture.addListener(future -> { if (closeNotifyReadTimeoutFuture != null) { - closeNotifyReadTimeoutFuture.cancel(false); + closeNotifyReadTimeoutFuture.cancel(); } if (ctx.channel().isActive()) { addCloseListener(ctx.close(), promise); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 258c5525bf1..7ce25394ca3 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -348,15 +348,15 @@ private void destroy() { state = 2; if (readerIdleTimeout != null) { - readerIdleTimeout.cancel(false); + readerIdleTimeout.cancel(); readerIdleTimeout = null; } if (writerIdleTimeout != null) { - writerIdleTimeout.cancel(false); + writerIdleTimeout.cancel(); writerIdleTimeout = null; } if (allIdleTimeout != null) { - allIdleTimeout.cancel(false); + allIdleTimeout.cancel(); allIdleTimeout = null; } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index e5131bbc92e..d8ceaaa5990 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -117,7 +117,7 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { lastTask = null; while (task != null) { assert task.ctx.executor().inEventLoop(); - task.scheduledFuture.cancel(false); + task.scheduledFuture.cancel(); WriteTimeoutTask prev = task.prev; task.prev = null; task.next = null; @@ -214,7 +214,7 @@ public void run() { @Override public void operationComplete(Future future) throws Exception { // scheduledFuture has already be set when reaching here - scheduledFuture.cancel(false); + scheduledFuture.cancel(); // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the // modification so its picked up by the executor.. diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java index aa0ec578087..0915b6f373d 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java @@ -15,13 +15,13 @@ */ package io.netty.handler.traffic; -import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; - import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel; +import io.netty.util.concurrent.EventExecutorGroup; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; + /** * Version for {@link GlobalChannelTrafficShapingHandler}. * This TrafficCounter is the Global one, and its special property is to directly handle @@ -36,7 +36,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter { * @param checkInterval the checkInterval in millisecond between two computations. */ public GlobalChannelTrafficCounter(GlobalChannelTrafficShapingHandler trafficShapingHandler, - ScheduledExecutorService executor, String name, long checkInterval) { + EventExecutorGroup executor, String name, long checkInterval) { super(trafficShapingHandler, executor, name, checkInterval); checkNotNullWithIAE(executor, "executor"); } @@ -111,7 +111,7 @@ public synchronized void stop() { resetAccounting(milliSecondFromNano()); trafficShapingHandler.doAccounting(this); if (scheduledFuture != null) { - scheduledFuture.cancel(true); + scheduledFuture.cancel(); } } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java index 8a3de8743c3..faa780845cb 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; @@ -33,7 +34,6 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -149,7 +149,7 @@ static final class PerChannel { /** * Create the global TrafficCounter */ - void createGlobalTrafficCounter(ScheduledExecutorService executor) { + void createGlobalTrafficCounter(EventExecutorGroup executor) { // Default setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION); checkNotNullWithIAE(executor, "executor"); @@ -167,7 +167,7 @@ protected int userDefinedWritabilityIndex() { * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -182,7 +182,7 @@ protected int userDefinedWritabilityIndex() { * @param maxTime * The maximum delay to wait in case of traffic excess. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit, long checkInterval, long maxTime) { @@ -196,7 +196,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -209,7 +209,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit, long checkInterval) { @@ -223,7 +223,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -233,7 +233,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * @param readChannelLimit * 0 or a limit in bytes/s */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit) { super(writeGlobalLimit, readGlobalLimit); @@ -246,12 +246,12 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) { super(checkInterval); createGlobalTrafficCounter(executor); } @@ -260,9 +260,9 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, lon * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) { + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor) { createGlobalTrafficCounter(executor); } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index 89a58ac0117..fbd16c8c48b 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Promise; import java.util.ArrayDeque; @@ -104,7 +105,7 @@ private static final class PerChannel { /** * Create the global TrafficCounter. */ - void createGlobalTrafficCounter(ScheduledExecutorService executor) { + void createGlobalTrafficCounter(EventExecutorGroup executor) { requireNonNull(executor, "executor"); TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval); setTrafficCounter(tc); @@ -120,7 +121,7 @@ protected int userDefinedWritabilityIndex() { * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -131,7 +132,7 @@ protected int userDefinedWritabilityIndex() { * @param maxTime * The maximum delay to wait in case of traffic excess. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit, long checkInterval, long maxTime) { super(writeLimit, readLimit, checkInterval, maxTime); createGlobalTrafficCounter(executor); @@ -142,7 +143,7 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -151,8 +152,8 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, - long readLimit, long checkInterval) { + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, + long readLimit, long checkInterval) { super(writeLimit, readLimit, checkInterval); createGlobalTrafficCounter(executor); } @@ -162,13 +163,13 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit * 0 or a limit in bytes/s */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit) { super(writeLimit, readLimit); createGlobalTrafficCounter(executor); @@ -179,12 +180,12 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * default max time as delay allowed value of 15000 ms and no limit. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) { super(checkInterval); createGlobalTrafficCounter(executor); } diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 283e01166eb..fd384e2de29 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -15,11 +15,13 @@ */ package io.netty.handler.traffic; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -148,7 +150,7 @@ public static long milliSecondFromNano() { /** * Executor that will run the monitor */ - final ScheduledExecutorService executor; + final EventExecutorGroup executor; /** * Monitor created once in start() */ @@ -156,7 +158,7 @@ public static long milliSecondFromNano() { /** * used in stop() to cancel the timer */ - volatile ScheduledFuture scheduledFuture; + volatile Future scheduledFuture; /** * Is Monitor active @@ -211,7 +213,7 @@ public synchronized void stop() { trafficShapingHandler.doAccounting(this); } if (scheduledFuture != null) { - scheduledFuture.cancel(true); + scheduledFuture.cancel(); } } @@ -252,7 +254,7 @@ synchronized void resetAccounting(long newLastTime) { * @param checkInterval * the checkInterval in millisecond between two computations. */ - public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) { + public TrafficCounter(EventExecutor executor, String name, long checkInterval) { requireNonNull(name, "name"); trafficShapingHandler = null; @@ -277,7 +279,7 @@ public TrafficCounter(ScheduledExecutorService executor, String name, long check * the checkInterval in millisecond between two computations. */ public TrafficCounter( - AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, + AbstractTrafficShapingHandler trafficShapingHandler, EventExecutorGroup executor, String name, long checkInterval) { this.name = requireNonNull(name, "name"); this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler"); @@ -304,13 +306,12 @@ private void init(long checkInterval) { public void configure(long newCheckInterval) { long newInterval = newCheckInterval / 10 * 10; if (checkInterval.getAndSet(newInterval) != newInterval) { + stop(); if (newInterval <= 0) { - stop(); // No more active monitoring lastTime.set(milliSecondFromNano()); } else { // Restart - stop(); start(); } } diff --git a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java index 22a57ddef13..80dff79e5c7 100644 --- a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java @@ -30,11 +30,12 @@ import io.netty.channel.local.LocalServerChannel; import io.netty.util.Attribute; import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.SingleThreadEventExecutor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -42,14 +43,14 @@ public class TrafficShapingHandlerTest { private static final long READ_LIMIT_BYTES_PER_SECOND = 1; - private static final ScheduledExecutorService SES = Executors.newSingleThreadScheduledExecutor(); + private static final EventExecutorGroup SES = new SingleThreadEventExecutor(); private static final MultithreadEventLoopGroup GROUP = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); @AfterAll public static void destroy() { GROUP.shutdownGracefully(); - SES.shutdown(); + SES.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } @Test diff --git a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java index 36f3ee9e0da..6d5b9223d93 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java @@ -126,7 +126,7 @@ public Future write(ChannelHandlerContext ctx, Object msg) { public void tearDown() throws Exception { chan.close().sync(); serverChan.close().sync(); - future.cancel(true); + future.cancel(); group.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); abyte.release(); } diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java index c5ac0dbf812..0bbfdf5b734 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java @@ -23,8 +23,14 @@ import io.netty.channel.nio.NioHandler; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.SingleThreadEventExecutor; +import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor; import io.netty.util.internal.PlatformDependent; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -38,17 +44,12 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.infra.Blackhole; -import java.util.Collection; -import java.util.List; +import java.util.Collections; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -60,18 +61,17 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { * This executor is useful as the best burst latency performer because it won't go to sleep and won't be hit by the * cost of being awaken on both offer/consumer side. */ - private static final class SpinExecutorService implements ExecutorService { - + private static final class SpinExecutorService implements EventExecutorGroup { private static final Runnable POISON_PILL = () -> { }; private final Queue tasks; private final AtomicBoolean poisoned = new AtomicBoolean(); private final Thread executorThread; + private final Promise terminationFuture = ImmediateEventExecutor.INSTANCE.newPromise(); SpinExecutorService(int maxTasks) { tasks = PlatformDependent.newFixedMpscQueue(maxTasks); executorThread = new Thread(() -> { - final Queue tasks = SpinExecutorService.this.tasks; Runnable task; while ((task = tasks.poll()) != POISON_PILL) { if (task != null) { @@ -83,22 +83,8 @@ private static final class SpinExecutorService implements ExecutorService { } @Override - public void shutdown() { - if (poisoned.compareAndSet(false, true)) { - while (!tasks.offer(POISON_PILL)) { - // Just try again - } - try { - executorThread.join(); - } catch (InterruptedException e) { - //We're quite trusty :) - } - } - } - - @Override - public List shutdownNow() { - throw new UnsupportedOperationException(); + public boolean isShuttingDown() { + return poisoned.get(); } @Override @@ -122,45 +108,78 @@ public Future submit(Callable task) { } @Override - public Future submit(Runnable task, T result) { + public Future schedule(Runnable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future submit(Runnable task) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { + public Future submit(Runnable task, T result) { throw new UnsupportedOperationException(); } @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public Future submit(Runnable task) { throw new UnsupportedOperationException(); } @Override - public void execute(Runnable command) { - if (!tasks.offer(command)) { + public void execute(Runnable task) { + if (!tasks.offer(task)) { throw new RejectedExecutionException( "If that happens, there is something wrong with the available capacity/burst size"); } } + + @Override + public Future shutdownGracefully() { + if (poisoned.compareAndSet(false, true)) { + while (!tasks.offer(POISON_PILL)) { + // Just try again + } + try { + executorThread.join(); + } catch (InterruptedException e) { + //We're quite trusty :) + } + } + terminationFuture.trySuccess(null); + return terminationFuture.asFuture(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return shutdownGracefully(); + } + + @Override + public Future terminationFuture() { + return terminationFuture.asFuture(); + } + + @Override + public EventExecutor next() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } } private enum ExecutorType { @@ -179,8 +198,8 @@ private enum ExecutorType { @Param({ "0", "10" }) private int work; - private ExecutorService executor; - private ExecutorService executorToShutdown; + private EventExecutorGroup executor; + private EventExecutorGroup executorToShutdown; @Setup public void setup() { @@ -199,7 +218,7 @@ public void setup() { executorToShutdown = executor; break; case juc: - executor = Executors.newSingleThreadScheduledExecutor(); + executor = new UnorderedThreadPoolEventExecutor(1); executorToShutdown = executor; break; case nioEventLoop: @@ -230,7 +249,7 @@ public void setup() { @TearDown public void tearDown() { - executorToShutdown.shutdown(); + executorToShutdown.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } @State(Scope.Thread) @@ -255,7 +274,7 @@ public void setup(BurstCostExecutorsBenchmark bench) { //benchmark is focusing on executors with a single threaded consumer: //it would reduce the cost on consumer side while allowing to focus just //to the threads hand-off/wake-up cost - DONE_UPDATER.lazySet(PerThreadState.this, completed + 1); + DONE_UPDATER.lazySet(this, completed + 1); }; } else { completeTask = () -> { @@ -263,7 +282,7 @@ public void setup(BurstCostExecutorsBenchmark bench) { //benchmark is focusing on executors with a single threaded consumer: //it would reduce the cost on consumer side while allowing to focus just //to the threads hand-off/wake-up cost - DONE_UPDATER.lazySet(PerThreadState.this, completed + 1); + DONE_UPDATER.lazySet(this, completed + 1); }; } } @@ -283,7 +302,7 @@ public void resetCompleted() { */ public int spinWaitCompletionOf(int value) { while (true) { - final int lastRead = this.completed; + final int lastRead = completed; if (lastRead >= value) { return lastRead; } @@ -313,7 +332,7 @@ public int test3Producers(final PerThreadState state) { } private int executeBurst(final PerThreadState state) { - final ExecutorService executor = this.executor; + final EventExecutorGroup executor = this.executor; final int burstLength = this.burstLength; final Runnable completeTask = state.completeTask; for (int i = 0; i < burstLength; i++) { diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java index 8035bfc663e..7e0ff6cd326 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java @@ -68,7 +68,7 @@ public void stop() throws Exception { public Future cancelInOrder(final FuturesHolder futuresHolder) { return executor.submit(() -> { for (int i = 0; i < futuresHolder.num; i++) { - futuresHolder.futures.get(i).cancel(false); + futuresHolder.futures.get(i).cancel(); } }).syncUninterruptibly(); } @@ -77,7 +77,7 @@ public Future cancelInOrder(final FuturesHolder futuresHolder) { public Future cancelInReverseOrder(final FuturesHolder futuresHolder) { return executor.submit(() -> { for (int i = futuresHolder.num - 1; i >= 0; i--) { - futuresHolder.futures.get(i).cancel(false); + futuresHolder.futures.get(i).cancel(); } }).syncUninterruptibly(); } diff --git a/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java b/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java index b6ea697f576..d88680d90a8 100644 --- a/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java @@ -19,15 +19,13 @@ import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import org.openjdk.jmh.annotations.Fork; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.Fork; - /** * This harness facilitates the sharing of an executor between JMH and Netty and * thus avoid measuring context switching in microbenchmarks. @@ -87,21 +85,15 @@ public boolean inEventLoop(Thread thread) { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return executor.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return executor.terminationFuture(); } - @Override - @Deprecated - public void shutdown() { - executor.shutdown(); - } - @Override public boolean isShuttingDown() { return executor.isShuttingDown(); @@ -128,8 +120,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } @Override - public void execute(Runnable command) { - executor.execute(command); + public void execute(Runnable task) { + executor.execute(task); } @Override @@ -138,24 +130,24 @@ public Promise newPromise() { } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return executor.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return executor.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return executor.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return executor.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { + return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java index 515a0e73690..25ab8dcf6b4 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java @@ -53,7 +53,7 @@ abstract class Cache { private static final Future CANCELLED_FUTURE = new Future() { @Override public boolean cancel() { - return cancel(false); + return false; } @Override @@ -131,11 +131,6 @@ public Object getNow() { return null; } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - @Override public EventExecutor executor() { throw new UnsupportedOperationException(); @@ -352,7 +347,7 @@ private void scheduleCacheExpirationIfNeeded(int ttl, EventLoop loop) { break; } else { // There was something else scheduled in the meantime... Cancel and try again. - newFuture.cancel(true); + newFuture.cancel(); } } else { break; @@ -401,7 +396,7 @@ private FutureAndDelay(Future future, int ttl) { } void cancel() { - future.cancel(false); + future.cancel(); } @Override diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index 68c48ead875..1fc45d30599 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -217,7 +217,7 @@ public void operationComplete(Future timeoutFuture = this.timeoutFuture; if (timeoutFuture != null) { this.timeoutFuture = null; - timeoutFuture.cancel(false); + timeoutFuture.cancel(); } // Remove the id from the manager as soon as the query completes. This may be because of success, failure or diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java index 2bbb85bada2..26166f784c4 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java @@ -987,7 +987,7 @@ private void finishResolve(Promise> promise, Throwable cause) { Future> f = i.next(); i.remove(); - f.cancel(false); + f.cancel(); } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java index 08d46affdd3..debe7b94913 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -41,11 +41,10 @@ public abstract class AbstractSingleThreadEventLoopTest { @Test - @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { EventLoopGroup group = new MultithreadEventLoopGroup(newIoHandlerFactory()); assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS)); - group.shutdown(); + group.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS)); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java index 83867b31857..ff711061f7a 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java @@ -58,11 +58,11 @@ public void testCancelWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable { Channel cc = cb.connect(sc.localAddress()).get(); Future f = cc.write(a); - assertTrue(f.cancel(false)); + assertTrue(f.cancel()); cc.writeAndFlush(b); cc.write(c); Future f2 = cc.write(d); - assertTrue(f2.cancel(false)); + assertTrue(f2.cancel()); cc.writeAndFlush(e); while (sh.counter < 3) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java index de24fc41e34..68972660608 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java @@ -137,7 +137,7 @@ public void testConnectCancellation(Bootstrap cb) throws Throwable { } } - if (future.cancel(true)) { + if (future.cancel()) { assertThat(future.isCancelled()).isTrue(); } else { // Cancellation not supported by the transport. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index da160a626f1..722e7aec11d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -166,7 +166,7 @@ protected void doClose() throws Exception { Future future = connectTimeoutFuture; if (future != null) { - future.cancel(false); + future.cancel(); connectTimeoutFuture = null; } @@ -612,7 +612,7 @@ public void connect( promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -684,7 +684,7 @@ private void finishConnect() { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 5a919fa2643..b4b385a28ab 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -61,7 +61,7 @@ void handleLoopException(Throwable t) { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); assertNull(capture.get()); } finally { group.shutdownGracefully(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 7388c41bb6d..7a1d9e4692b 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -563,7 +563,7 @@ public void connect( promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -635,7 +635,7 @@ private void finishConnect() { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index e578b7cca95..f24d2c95b3b 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -41,7 +41,7 @@ public void testScheduleBigDelayNotOverflow() { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); group.shutdownGracefully(); } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 90d14d57938..6ac78449033 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -87,6 +87,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl /** * Return the {@link EventLoop} this {@link Channel} was registered to. */ + @Override EventLoop executor(); /** diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 90d516ecce6..77b27d68c25 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -45,7 +45,7 @@ public class SingleThreadEventLoop extends SingleThreadEventExecutor implements @Override public boolean canBlock() { assert inEventLoop(); - return !SingleThreadEventLoop.this.hasTasks() && !SingleThreadEventLoop.this.hasScheduledTasks(); + return !hasTasks() && !hasScheduledTasks(); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index e44155972a3..f7f9e92b6a3 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -63,9 +63,9 @@ public EventLoop next() { } @Override - public void execute(Runnable command) { - requireNonNull(command, "command"); - tasks.add(command); + public void execute(Runnable task) { + requireNonNull(task, "command"); + tasks.add(task); if (!running) { runTasks(); } @@ -124,18 +124,12 @@ void cancelScheduled() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future terminationFuture() { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public void shutdown() { + public Future terminationFuture() { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 422816cbd08..59ee95f7ada 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -250,7 +250,7 @@ public final void connect( promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -317,7 +317,7 @@ public final void finishConnect() { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } @@ -452,7 +452,7 @@ protected void doClose() throws Exception { Future future = connectTimeoutFuture; if (future != null) { - future.cancel(false); + future.cancel(); connectTimeoutFuture = null; } } diff --git a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java index 3d243e31009..08ccbddffc6 100644 --- a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java @@ -17,7 +17,6 @@ import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.Test; @@ -26,6 +25,7 @@ import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; @@ -44,6 +44,7 @@ public void ensureInitialRegistrationFiresActive() throws Throwable { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); + when(eventLoop.newPromise()).thenReturn(INSTANCE.newPromise()); TestChannel channel = new TestChannel(eventLoop); // Using spy as otherwise intelliJ will not be able to understand that we dont want to skip the handler @@ -78,7 +79,7 @@ public void ensureSubsequentRegistrationDoesNotFireActive() throws Throwable { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - when(eventLoop.newPromise()).thenReturn(ImmediateEventExecutor.INSTANCE.newPromise()); + when(eventLoop.newPromise()).thenAnswer(inv -> INSTANCE.newPromise()); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArgument(0)).run(); @@ -134,7 +135,7 @@ public void testClosedChannelExceptionCarryIOException() throws Exception { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - when(eventLoop.newPromise()).thenReturn(ImmediateEventExecutor.INSTANCE.newPromise()); + when(eventLoop.newPromise()).thenAnswer(inv -> INSTANCE.newPromise()); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArgument(0)).run(); return null; diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 36ab4faaabb..ff84abdf812 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -95,14 +94,12 @@ public void stopEventLoop() { } @Test - @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); assertRejection(loopA); } @Test - @SuppressWarnings("deprecation") public void shutdownAfterStart() throws Exception { final CountDownLatch latch = new CountDownLatch(1); loopA.execute(latch::countDown); @@ -111,7 +108,7 @@ public void shutdownAfterStart() throws Exception { latch.await(); // Request the event loop thread to stop. - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); assertRejection(loopA); assertTrue(loopA.isShutdown()); @@ -165,7 +162,7 @@ private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws Interrup final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + Future f = loopA.scheduleAtFixedRate(() -> { timestamps.add(System.nanoTime()); try { Thread.sleep(50); @@ -175,13 +172,13 @@ private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws Interrup allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); // Check if the task was run without a lag. Long firstTimestamp = null; - int cnt = 0; + long cnt = 0; for (Long t: timestamps) { if (firstTimestamp == null) { firstTimestamp = t; @@ -212,7 +209,7 @@ private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws Int final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + Future f = loopA.scheduleAtFixedRate(() -> { boolean empty = timestamps.isEmpty(); timestamps.add(System.nanoTime()); if (empty) { @@ -225,7 +222,7 @@ private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws Int allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); @@ -265,7 +262,7 @@ private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws Inter final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 3; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleWithFixedDelay(() -> { + Future f = loopA.scheduleWithFixedDelay(() -> { timestamps.add(System.nanoTime()); try { Thread.sleep(51); @@ -275,7 +272,7 @@ private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws Inter allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); @@ -294,7 +291,6 @@ private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws Inter } @Test - @SuppressWarnings("deprecation") public void shutdownWithPendingTasks() throws Exception { final int NUM_TASKS = 3; final AtomicInteger ranTasks = new AtomicInteger(); @@ -321,7 +317,7 @@ public void shutdownWithPendingTasks() throws Exception { assertEquals(1, ranTasks.get()); // Shut down the event loop to test if the other tasks are run before termination. - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); // Let the other tasks run. latch.countDown(); @@ -337,9 +333,8 @@ public void shutdownWithPendingTasks() throws Exception { @Test @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) - @SuppressWarnings("deprecation") public void testRegistrationAfterShutdown() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); // Disable logging temporarily. Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); @@ -367,9 +362,8 @@ public void testRegistrationAfterShutdown() throws Exception { @Test @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) - @SuppressWarnings("deprecation") public void testRegistrationAfterShutdown2() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); final CountDownLatch latch = new CountDownLatch(1); Channel ch = new LocalChannel(loopA); @@ -462,17 +456,14 @@ public EventLoop next() { @Override protected void run() { - for (;;) { + do { Runnable task = takeTask(); if (task != null) { task.run(); updateLastExecutionTime(); } - if (confirmShutdown()) { - break; - } - } + } while (!confirmShutdown()); } @Override @@ -504,7 +495,7 @@ protected Queue newTaskQueue(int maxPendingTasks) { @Override protected void run() { - for (;;) { + do { try { Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime()))); } catch (InterruptedException e) { @@ -513,10 +504,7 @@ protected void run() { runAllTasks(Integer.MAX_VALUE); - if (confirmShutdown()) { - break; - } - } + } while (!confirmShutdown()); } @Override diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 8e3c64e9274..50b15af00d2 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -98,7 +98,7 @@ public void testScheduleBigDelayNotOverflow() { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); group.shutdownGracefully(); } @@ -169,7 +169,6 @@ public void channelUnregistered(SocketChannel ch, Throwable cause) { } } - @SuppressWarnings("deprecation") @Test public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() throws Exception { final AtomicReference error = new AtomicReference<>(); @@ -191,9 +190,9 @@ public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() thr } }); t.start(); - group.shutdownNow(); + Future termination = group.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); t.join(); - group.terminationFuture().syncUninterruptibly(); + termination.syncUninterruptibly(); assertThat(error.get(), instanceOf(RejectedExecutionException.class)); error.set(null); } diff --git a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java index 29345c23ef2..2f529462bf7 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java @@ -23,7 +23,6 @@ import io.netty.channel.nio.NioHandler; import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -108,6 +107,7 @@ class WrappedEventLoop extends AbstractEventExecutor implements EventLoop { this.eventLoop = eventLoop; } + @Override @Test public EventLoop next() { return this; @@ -118,11 +118,6 @@ public Unsafe unsafe() { return eventLoop.unsafe(); } - @Override - public void shutdown() { - eventLoop.shutdown(); - } - @Override public boolean inEventLoop(Thread thread) { return eventLoop.inEventLoop(thread); @@ -134,12 +129,12 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return eventLoop.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return eventLoop.terminationFuture(); } @@ -159,30 +154,30 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public void execute(Runnable command) { - eventLoop.execute(command); + public void execute(Runnable task) { + eventLoop.execute(task); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return eventLoop.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return eventLoop.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return eventLoop.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return eventLoop.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate( + Runnable task, long initialDelay, long period, TimeUnit unit) { + return eventLoop.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { + return eventLoop.scheduleWithFixedDelay(task, initialDelay, delay, unit); } }