diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java index 18c10647dde..9aabd260e36 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java @@ -504,7 +504,7 @@ private Future close0(final ChannelOutboundInvoker invoker, final Channel }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS); channel.closeFuture().addListener(ignore -> { - forceCloseFuture.cancel(false); + forceCloseFuture.cancel(); }); } }); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java index 055cde8b46a..7d6cf101be5 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientProtocolHandshakeHandler.java @@ -117,7 +117,7 @@ private void applyHandshakeTimeout() { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java index dab9bdc726c..3436a04d505 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketProtocolHandler.java @@ -125,7 +125,7 @@ private void applyCloseSentTimeout(ChannelHandlerContext ctx) { } }, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS); - closeSent.asFuture().addListener(future -> timeoutTask.cancel(false)); + closeSent.asFuture().addListener(future -> timeoutTask.cancel()); } /** diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java index 5c35b397845..1968267c3d3 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerProtocolHandshakeHandler.java @@ -157,6 +157,6 @@ private void applyHandshakeTimeout(ChannelHandlerContext ctx) { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index cde8ff15b0b..12bb1339e82 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -27,7 +27,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -900,7 +899,7 @@ private static final class ClosingChannelFutureListener implements FutureListene @Override public void operationComplete(Future sentGoAwayFuture) { if (timeoutTask != null) { - timeoutTask.cancel(false); + timeoutTask.cancel(); } doClose(); } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 1e8fc6fbee1..1158c248d9a 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -18,14 +18,15 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import static java.util.Objects.requireNonNull; + /** * Abstract base class for {@link EventExecutor} implementations. */ -public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { +public abstract class AbstractEventExecutor implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class); static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; @@ -43,26 +44,60 @@ public Future newSucceededFuture(V result) { } @Override - public final Future submit(Runnable task) { - return (Future) super.submit(task); + public final Future submit(Runnable task) { + var futureTask = newTaskFor(task, (Void) null); + execute(futureTask); + return futureTask; } @Override public final Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); + var futureTask = newTaskFor(task, result); + execute(futureTask); + return futureTask; } @Override public final Future submit(Callable task) { - return (Future) super.submit(task); + var futureTask = newTaskFor(task); + execute(futureTask); + return futureTask; } - @Override + /** + * Decorate the given {@link Runnable} and its return value, as a {@link RunnableFuture}, such that the + * returned {@link RunnableFuture} completes with the given result at the end of executing its + * {@link RunnableFuture#run()} method. + *

+ * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this + * executor. + *

+ * This method can be overridden by sub-classes to hook into the life cycle of the given task. + * + * @param runnable The task to be decorated. + * @param value The value that the returned future will complete with, assuming the given {@link Runnable} doesn't + * throw an exception. + * @param The type of the result value. + * @return The decorated {@link Runnable} that is now also a {@link Future}. + */ protected RunnableFuture newTaskFor(Runnable runnable, T value) { return newRunnableFuture(newPromise(), runnable, value); } - @Override + /** + * Decorate the given {@link Callable} and its return value, as a {@link RunnableFuture}, such that the + * returned {@link RunnableFuture} completes with the returned result from the {@link Callable} at the end of + * executing its {@link RunnableFuture#run()} method. + *

+ * The returned {@link RunnableFuture} is the task that will actually be run by a thread in this + * executor. + *

+ * This method can be overridden by sub-classes to hook into the life cycle of the given task. + * + * @param callable The task to be decorated. + * @param The type of the result value. + * @return The decorated {@link Runnable} that is now also a {@link Future}. + */ protected RunnableFuture newTaskFor(Callable callable) { return newRunnableFuture(newPromise(), callable); } @@ -85,17 +120,14 @@ static void safeExecute(Runnable task) { * {@link RunnableFuture}. */ private static RunnableFuture newRunnableFuture(Promise promise, Callable task) { - return new RunnableFutureAdapter<>(promise, task); + return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task")); } /** * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and * {@code value}. - * - * This can be used if you want to override {@link #newTaskFor(Runnable, V)} and return a different - * {@link RunnableFuture}. */ private static RunnableFuture newRunnableFuture(Promise promise, Runnable task, V value) { - return new RunnableFutureAdapter<>(promise, Executors.callable(task, value)); + return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value)); } } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 478d479cb12..e61fe326b88 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.PriorityQueue; import io.netty.util.internal.PriorityQueueNode; @@ -24,12 +22,13 @@ import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.callable; + /** * Abstract base class for {@link EventExecutor}s that want to support scheduling. */ @@ -60,7 +59,7 @@ public static long nanoTime() { static long deadlineNanos(long delay) { long deadlineNanos = nanoTime() + delay; // Guard against overflow - return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; + return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos; } PriorityQueue> scheduledTaskQueue() { @@ -79,7 +78,7 @@ private static boolean isNullOrEmpty(Queue> queue /** * Cancel all scheduled tasks. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final void cancelScheduledTasks() { @@ -92,8 +91,8 @@ protected final void cancelScheduledTasks() { final RunnableScheduledFutureNode[] scheduledTasks = scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES); - for (RunnableScheduledFutureNode task: scheduledTasks) { - task.cancel(false); + for (RunnableScheduledFutureNode task : scheduledTasks) { + task.cancel(); } scheduledTaskQueue.clearIgnoringIndexes(); @@ -107,16 +106,16 @@ protected final RunnableScheduledFuture pollScheduledTask() { } /** - * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. - * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. - * + * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link + * #nanoTime()} to retrieve the correct {@code nanoTime}. + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final RunnableScheduledFuture pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } @@ -130,12 +129,12 @@ protected final RunnableScheduledFuture pollScheduledTask(long nanoTime) { /** * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final long nextScheduledTaskNano() { Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return -1; } @@ -152,30 +151,30 @@ final RunnableScheduledFuture peekScheduledTask() { /** * Returns {@code true} if a scheduled task is ready for processing. - * + *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ protected final boolean hasScheduledTasks() { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; - RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + public Future schedule(Runnable command, long delay, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (delay < 0) { delay = 0; } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command), - deadlineNanos(unit.toNanos(delay)), 0); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(delay)), 0); return schedule(task); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable callable, long delay, TimeUnit unit) { requireNonNull(callable, "callable"); requireNonNull(unit, "unit"); if (delay < 0) { @@ -186,7 +185,7 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUni } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (initialDelay < 0) { @@ -198,13 +197,13 @@ public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDela String.format("period: %d (expected: > 0)", period)); } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); return schedule(task); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { requireNonNull(command, "command"); requireNonNull(unit, "unit"); if (initialDelay < 0) { @@ -216,15 +215,15 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD String.format("delay: %d (expected: > 0)", delay)); } - RunnableScheduledFuture task = newScheduledTaskFor(Executors.callable(command, null), - deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); + RunnableScheduledFuture task = newScheduledTaskFor( + callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); return schedule(task); } /** * Add the {@link RunnableScheduledFuture} for execution. */ - protected final ScheduledFuture schedule(final RunnableScheduledFuture task) { + protected final Future schedule(final RunnableScheduledFuture task) { if (inEventLoop()) { add0(task); } else { @@ -253,9 +252,9 @@ final void removeScheduled(final RunnableScheduledFutureNode task) { /** * Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}. - * - * This can be used if you want to override {@link #newTaskFor(Callable)} and return a different - * {@link RunnableFuture}. + *

+ * This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a + * different {@link RunnableFuture}. */ protected static RunnableScheduledFuture newRunnableScheduledFuture( AbstractScheduledEventExecutor executor, Promise promise, Callable task, @@ -271,7 +270,8 @@ protected RunnableScheduledFuture newScheduledTaskFor( return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period); } - interface RunnableScheduledFutureNode extends PriorityQueueNode, RunnableScheduledFuture { } + interface RunnableScheduledFutureNode extends PriorityQueueNode, RunnableScheduledFuture { + } private static final class DefaultRunnableScheduledFutureNode implements RunnableScheduledFutureNode { private final RunnableScheduledFuture future; @@ -308,8 +308,8 @@ public RunnableScheduledFuture addListener(FutureListener listener } @Override - public RunnableScheduledFuture addListener(C context, - FutureContextListener listener) { + public RunnableScheduledFuture addListener( + C context, FutureContextListener listener) { future.addListener(context, listener); return this; } @@ -335,8 +335,8 @@ public void run() { } @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); + public boolean cancel() { + return future.cancel(); } @Override @@ -360,12 +360,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution } @Override - public long getDelay(TimeUnit unit) { - return future.getDelay(unit); - } - - @Override - public int compareTo(Delayed o) { + public int compareTo(RunnableScheduledFuture o) { return future.compareTo(o); } @@ -393,11 +388,6 @@ public RunnableFuture awaitUninterruptibly() { return this; } - @Override - public boolean cancel() { - return cancel(false); - } - @Override public boolean isSuccess() { return future.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java b/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java index 968585aa915..3dd8690f32e 100644 --- a/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java +++ b/common/src/main/java/io/netty/util/concurrent/AsynchronousResult.java @@ -25,9 +25,10 @@ */ interface AsynchronousResult { /** - * Cancel this asynchronous operation, unless it has already been completed. + * Cancel this asynchronous operation, unless it has already been completed + * or is not {@linkplain #isCancellable() cancellable}. *

- * A cancelled operation is considered to be {@linkplain #isFailed() failed}. + * A cancelled operation is considered to be {@linkplain #isDone() done} and {@linkplain #isFailed() failed}. *

* If the cancellation was successful, the result of this operation will be that it has failed with a * {@link CancellationException}. @@ -66,7 +67,11 @@ interface AsynchronousResult { boolean isDone(); /** - * returns {@code true} if and only if the operation can be cancelled via {@link #cancel()}. + * Returns {@code true} if and only if the operation can be cancelled via {@link #cancel()}. + * Note that this is inherently racy, as the operation could be made + * {@linkplain Promise#setUncancellable() uncancellable} at any time. + * + * @return {@code true} if this operation can be cancelled. */ boolean isCancellable(); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java index 62b2b013ce7..df37edb1e79 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultFutureCompletionStage.java @@ -17,8 +17,11 @@ import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -33,7 +36,7 @@ * * @param the value type. */ -final class DefaultFutureCompletionStage implements FutureCompletionStage { +final class DefaultFutureCompletionStage implements FutureCompletionStage, java.util.concurrent.Future { private enum Marker { EMPTY, ERROR @@ -50,6 +53,31 @@ private enum Marker { this.future = future; } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + @Override public Future future() { return future; diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 9136b679544..a88a4f50509 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -384,14 +384,6 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution @Override public boolean cancel() { - return cancel(false); - } - - /** - * @param mayInterruptIfRunning this value has no effect in this implementation. - */ - @Override - public boolean cancel(boolean mayInterruptIfRunning) { if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { if (checkNotifyWaiters()) { notifyListeners(); @@ -668,6 +660,15 @@ static void safeExecute(EventExecutor executor, Runnable task) { @Override public FutureCompletionStage asStage() { + return getFutureStageAdaptor(); + } + + @Override + public java.util.concurrent.Future asJdkFuture() { + return getFutureStageAdaptor(); + } + + private DefaultFutureCompletionStage getFutureStageAdaptor() { DefaultFutureCompletionStage stageAdapter = stage; if (stageAdapter == null) { stage = stageAdapter = new DefaultFutureCompletionStage<>(this); diff --git a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java index b409bbc0264..e2cd2f47ac1 100644 --- a/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/EventExecutorGroup.java @@ -15,15 +15,10 @@ */ package io.netty.util.concurrent; -import java.util.Collection; -import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_QUIET_PERIOD; import static io.netty.util.concurrent.AbstractEventExecutor.DEFAULT_SHUTDOWN_TIMEOUT; @@ -34,61 +29,88 @@ * life-cycle and allows shutting them down in a global fashion. * */ -public interface EventExecutorGroup extends ScheduledExecutorService, Iterable { - +public interface EventExecutorGroup extends Iterable, Executor { /** * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} * are being {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}. + *

+ * An executor group that "is shutting down" can still accept new tasks for a little while (the grace period), + * but will eventually start rejecting new tasks. + * At that point, the executor group will be {@linkplain #isShutdown() shut down}. + * + * @return {@code true} if all executors in this group have at least started shutting down, otherwise {@code false}. */ boolean isShuttingDown(); + /** + * Returns {@code true} if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} have been + * {@linkplain #shutdownGracefully() shut down gracefully} and moved past the grace period so that they are no + * longer accepting any new tasks. + *

+ * An executor group that "is shut down" might still be executing tasks that it has queued up, but it will no + * longer be accepting any new tasks. + * Once all running and queued tasks have completed, the executor group will be + * {@linkplain #isTerminated() terminated}. + * + * @return {@code true} if all executors in this group have shut down and are no longer accepting any new tasks. + */ + boolean isShutdown(); + + /** + * Returns {@code true} if all {@link EventExecutor}s managed by this {@link EventExecutorGroup} are + * {@linkplain #isShutdown() shut down}, and all of their tasks have completed. + * + * @return {@code true} if all executors in this group have terminated. + */ + default boolean isTerminated() { + return terminationFuture().isDone(); + } + + /** + * Wait for this {@link EventExecutorGroup} to {@linkplain #isTerminated() terminate}, up to the given timeout. + * + * @param timeout The non-negative maximum amount of time to wait for the executor group to terminate. + * @param unit The non-null time unit of the timeout. + * @return {@code true} if the executor group terminated within the specific timeout. + * @throws InterruptedException If this thread was {@linkplain Thread#interrupt() interrupted} while waiting for + * executor group to terminate. + */ + default boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return terminationFuture().await(timeout, unit); + } + /** * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values. * * @return the {@link #terminationFuture()} */ - default Future shutdownGracefully() { + default Future shutdownGracefully() { return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); } /** * Signals this executor that the caller wants the executor to be shut down. Once this method is called, * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down. - * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for 'the quiet period' - * (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period, - * it is guaranteed to be accepted and the quiet period will start over. + * This method ensures that no tasks are submitted for 'the quiet period' (usually a couple seconds) before + * it shuts itself down. If a task is submitted during the quiet period, it is guaranteed to be accepted and the + * quiet period will start over. * * @param quietPeriod the quiet period as described in the documentation - * @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()} - * regardless if a task was submitted during the quiet period + * @param timeout the maximum amount of time to wait until the executor is + * {@linkplain #isShuttingDown() shutting down} regardless if a task was submitted during the quiet period. * @param unit the unit of {@code quietPeriod} and {@code timeout} * * @return the {@link #terminationFuture()} */ - Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); + Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); /** * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this * {@link EventExecutorGroup} have been terminated. + * + * @return The {@link Future} representing the termination of this {@link EventExecutorGroup}. */ - Future terminationFuture(); - - /** - * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. - */ - @Override - @Deprecated - void shutdown(); - - /** - * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead. - */ - @Override - @Deprecated - default List shutdownNow() { - shutdown(); - return Collections.emptyList(); - } + Future terminationFuture(); /** * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}. @@ -98,66 +120,116 @@ default List shutdownNow() { @Override Iterator iterator(); - @Override - default Future submit(Runnable task) { + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that produces a {@code null} result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @return A future that represents the completion of the submitted task. + */ + default Future submit(Runnable task) { return next().submit(task); } - @Override + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that produces the given result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @param result The value that the returned future will complete with, if the task completes successfully. + * @param The type of the future result. + * @return A future that represents the completion of the submitted task. + */ default Future submit(Runnable task, T result) { return next().submit(task, result); } - @Override + /** + * Submit the given task for execution in the next available {@link EventExecutor} in this group, + * and return a future that will return the result of the callable when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup}. + * @param The type of the future result. + * @return A future that represents the completion of the submitted task. + */ default Future submit(Callable task) { return next().submit(task); } - @Override - default ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return next().schedule(command, delay, unit); - } - - @Override - default ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return next().schedule(callable, delay, unit); - } - - @Override - default ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return next().scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - default ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return next().scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - default List> invokeAll(Collection> tasks) - throws InterruptedException { - return next().invokeAll(tasks); + /** + * Schedule the given task for execution after the given delay, in the next available {@link EventExecutor} + * in this group, and return a future that produces a {@code null} result when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup} after the given delay. + * @param delay A positive time delay, in the given time unit. + * @param unit The non-null time unit for the delay. + * @return A future that represents the completion of the scheduled task. + */ + default Future schedule(Runnable task, long delay, TimeUnit unit) { + return next().schedule(task, delay, unit); } - @Override - default List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return next().invokeAll(tasks, timeout, unit); + /** + * Schedule the given task for execution after the given delay, in the next available {@link EventExecutor} + * in this group, and return a future that will return the result of the callable when the task completes. + * + * @param task The task that should be executed in this {@link EventExecutorGroup} after the given delay. + * @param delay A positive time delay, in the given time unit. + * @param unit The non-null time unit for the delay. + * @param The type of the future result. + * @return A future that represents the completion of the scheduled task. + */ + default Future schedule(Callable task, long delay, TimeUnit unit) { + return next().schedule(task, delay, unit); } - @Override - default T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return next().invokeAny(tasks); + /** + * Schedule the given task for periodic execution in the next available {@link EventExecutor}. + * The first execution will occur after the given initial delay, and the following repeated executions will occur + * with the given period of time between each execution is started. + * If the task takes longer to complete than the requested period, then the following executions will be delayed, + * rather than allowing multiple instances of the task to run concurrently. + *

+ * The task will be executed repeatedly until it either fails with an exception, or its future is + * {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. + * + * @param task The task that should be scheduled to execute at a fixed rate in this {@link EventExecutorGroup}. + * @param initialDelay The positive initial delay for the first task execution, in terms of the given time unit. + * @param period The positive period for the execution frequency to use after the first execution has started, + * in terms of the given time unit. + * @param unit The non-null time unit for the delay and period. + * @return A future that represents the recurring task, and which can be cancelled to stop future executions. + */ + default Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return next().scheduleAtFixedRate(task, initialDelay, period, unit); } - @Override - default T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return next().invokeAny(tasks, timeout, unit); + /** + * Schedule the given task for periodic execution in the next available {@link EventExecutor}. + * The first execution will occur after the given initial delay, and the following repeated executions will occur + * with the given subsequent delay between one task completing and the next task starting. + * The delay from the completion of one task, to the start of the next, stays unchanged regardless of how long a + * task takes to complete. + *

+ * This is in contrast to {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)} which varies the delays + * between the tasks in order to hit a given frequency. + *

+ * The task will be executed repeatedly until it either fails with an exception, or its future is + * {@linkplain Future#cancel() cancelled}. The future thus will never complete successfully. + * + * @param task The task that should be scheduled to execute with fixed delays in this {@link EventExecutorGroup}. + * @param initialDelay The positive initial delay for the first task execution, in terms of the given time unit. + * @param delay The positive subsequent delay between task, to use after the first execution has completed, + * in terms of the given time unit. + * @param unit The non-null time unit for the delays. + * @return A future that represents the recurring task, and which can be cancelled to stop future executions. + */ + default Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return next().scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override - default void execute(Runnable command) { - next().execute(command); + default void execute(Runnable task) { + next().execute(task); } } diff --git a/common/src/main/java/io/netty/util/concurrent/Future.java b/common/src/main/java/io/netty/util/concurrent/Future.java index 7f946fbc7ec..f2d80c85da1 100644 --- a/common/src/main/java/io/netty/util/concurrent/Future.java +++ b/common/src/main/java/io/netty/util/concurrent/Future.java @@ -150,8 +150,7 @@ * } * */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface Future extends java.util.concurrent.Future, AsynchronousResult { +public interface Future extends AsynchronousResult { /** * Adds the specified listener to this future. The specified listener is notified when this future is {@linkplain * #isDone() done}. If this future is already completed, the specified listener is notified immediately. @@ -236,14 +235,15 @@ public interface Future extends java.util.concurrent.Future, AsynchronousR boolean awaitUninterruptibly(long timeoutMillis); /** - * {@inheritDoc} - *

- * If the cancellation was successful it will fail the future with a {@link CancellationException}. + * Get the result of this future, if it has completed. + * If the future has failed, then an {@link ExecutionException} will be thrown instead. + * If the future has not yet completed, then this method will block until it completes. + * + * @return The result of the task execution, if it completed successfully. + * @throws InterruptedException If the call was blocked, waiting for the future to complete, and the thread was + * {@linkplain Thread#interrupt() interrupted}. + * @throws ExecutionException If the task failed, either by throwing an exception or through cancellation. */ - @Override - boolean cancel(boolean mayInterruptIfRunning); - - @Override default V get() throws InterruptedException, ExecutionException { await(); @@ -257,7 +257,24 @@ default V get() throws InterruptedException, ExecutionException { throw new ExecutionException(cause); } - @Override + /** + * Get the result of this future, if it has completed. + * If the future has failed, then an {@link ExecutionException} will be thrown instead. + * If the future has not yet completed, then this method will block, waiting up to the given timeout for the future + * to complete. + * If the future does not complete within the specified timeout, then a {@link TimeoutException} will be thrown. + * If the timeout is zero, then this method will not block, and instead either get the result or failure of the + * future if completed, or immediately throw a {@link TimeoutException} if not yet completed. + * + * @param timeout The non-negative maximum amount of time, in terms of the given time unit, to wait for the + * completion of the future. + * @param unit The time unit for the timeout. + * @return The value of the successfully completed future. + * @throws InterruptedException If this call was blocking and this thread got + * {@linkplain Thread#interrupt() interrupted}. + * @throws ExecutionException If the task failed, either by throwing an exception, or through cancellation. + * @throws TimeoutException If the future did not complete within the specified timeout. + */ default V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) { Throwable cause = cause(); @@ -280,6 +297,13 @@ default FutureCompletionStage asStage() { return new DefaultFutureCompletionStage<>(this); } + /** + * Returns a {@link java.util.concurrent.Future JDK Future that reflects the state of this {@link Future}. + */ + default java.util.concurrent.Future asJdkFuture() { + return new DefaultFutureCompletionStage<>(this); + } + /** * Creates a new {@link Future} that will complete with the result of this {@link Future} mapped * through the given mapper function. diff --git a/common/src/main/java/io/netty/util/concurrent/Futures.java b/common/src/main/java/io/netty/util/concurrent/Futures.java index f9eeb001c2c..09d00c921cc 100644 --- a/common/src/main/java/io/netty/util/concurrent/Futures.java +++ b/common/src/main/java/io/netty/util/concurrent/Futures.java @@ -136,7 +136,7 @@ private static final class PropagateCancel implements FutureContextListener context, Future future) throws Exception { if (future.isCancelled()) { - context.cancel(false); + context.cancel(); } } } diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index bc1bd54e78c..14b00ad5efc 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -67,7 +67,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; - private final Future terminationFuture = DefaultPromise.newFailedPromise( + private final Future terminationFuture = DefaultPromise.newFailedPromise( this, new UnsupportedOperationException()).asFuture(); private GlobalEventExecutor() { @@ -150,21 +150,15 @@ public boolean inEventLoop(Thread thread) { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture; } - @Override - @Deprecated - public void shutdown() { - throw new UnsupportedOperationException(); - } - @Override public boolean isShuttingDown() { return false; diff --git a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java index d2c945320fb..32602f4867f 100644 --- a/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/ImmediateEventExecutor.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -25,6 +23,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import static java.util.Objects.requireNonNull; + /** * Executes {@link Runnable} objects in the caller's thread. If the {@link #execute(Runnable)} is reentrant it will be * queued until the original {@link Runnable} finishes execution. @@ -54,7 +54,7 @@ protected Boolean initialValue() throws Exception { } }; - private final Future terminationFuture = DefaultPromise.newFailedPromise( + private final Future terminationFuture = DefaultPromise.newFailedPromise( GlobalEventExecutor.INSTANCE, new UnsupportedOperationException()).asFuture(); private ImmediateEventExecutor() { } @@ -65,19 +65,15 @@ public boolean inEventLoop(Thread thread) { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture; } - @Override - @Deprecated - public void shutdown() { } - @Override public boolean isShuttingDown() { return false; @@ -99,14 +95,14 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } @Override - public void execute(Runnable command) { - requireNonNull(command, "command"); + public void execute(Runnable task) { + requireNonNull(task, "command"); if (!RUNNING.get()) { RUNNING.set(true); try { - command.run(); + task.run(); } catch (Throwable cause) { - logger.info("Throwable caught while executing Runnable {}", command, cause); + logger.info("Throwable caught while executing Runnable {}", task, cause); } finally { Queue delayedRunnables = DELAYED_RUNNABLES.get(); Runnable runnable; @@ -120,7 +116,7 @@ public void execute(Runnable command) { RUNNING.set(false); } } else { - DELAYED_RUNNABLES.get().add(command); + DELAYED_RUNNABLES.get().add(task); } } @@ -130,23 +126,23 @@ public Promise newPromise() { } @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { + public Future schedule(Runnable task, long delay, + TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } diff --git a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java index ae05112c7c5..b22b2691b16 100644 --- a/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/MultithreadEventExecutorGroup.java @@ -38,7 +38,7 @@ public class MultithreadEventExecutorGroup implements EventExecutorGroup { private final EventExecutor[] children; private final List readonlyChildren; private final AtomicInteger terminatedChildren = new AtomicInteger(); - private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); private final boolean powerOfTwo; /** @@ -223,7 +223,7 @@ protected EventExecutor newChild(Executor executor, int maxPendingTasks, } @Override - public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } @@ -231,18 +231,10 @@ public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUn } @Override - public final Future terminationFuture() { + public final Future terminationFuture() { return terminationFuture.asFuture(); } - @Override - @Deprecated - public final void shutdown() { - for (EventExecutor l: children) { - l.shutdown(); - } - } - @Override public final boolean isShuttingDown() { for (EventExecutor l: children) { diff --git a/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java b/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java index 529127a1e0a..318dd47c862 100644 --- a/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java +++ b/common/src/main/java/io/netty/util/concurrent/NonStickyEventExecutorGroup.java @@ -15,23 +15,19 @@ */ package io.netty.util.concurrent; -import static io.netty.util.internal.ObjectUtil.checkPositive; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.UnstableApi; -import java.util.Collection; import java.util.Iterator; -import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.util.internal.ObjectUtil.checkPositive; +import static java.util.Objects.requireNonNull; + /** * {@link EventExecutorGroup} which will preserve {@link Runnable} execution order but makes no guarantees about what * {@link EventExecutor} (and therefore {@link Thread}) will be used to execute the {@link Runnable}s. @@ -83,32 +79,20 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully() { + public Future shutdownGracefully() { return group.shutdownGracefully(); } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return group.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return group.terminationFuture(); } - @SuppressWarnings("deprecation") - @Override - public void shutdown() { - group.shutdown(); - } - - @SuppressWarnings("deprecation") - @Override - public List shutdownNow() { - return group.shutdownNow(); - } - @Override public EventExecutor next() { return newExecutor(group.next()); @@ -136,7 +120,7 @@ public void remove() { } @Override - public Future submit(Runnable task) { + public Future submit(Runnable task) { return group.submit(task); } @@ -151,23 +135,23 @@ public Future submit(Callable task) { } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return group.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return group.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return group.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return group.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return group.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return group.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return group.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return group.scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override @@ -186,31 +170,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public List> invokeAll( - Collection> tasks) throws InterruptedException { - return group.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return group.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return group.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return group.invokeAny(tasks, timeout, unit); - } - - @Override - public void execute(Runnable command) { - group.execute(command); + public void execute(Runnable task) { + group.execute(task); } private static final class NonStickyOrderedEventExecutor extends AbstractEventExecutor @@ -294,20 +255,15 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return executor.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return executor.terminationFuture(); } - @Override - public void shutdown() { - executor.shutdown(); - } - @Override public boolean isShutdown() { return executor.isShutdown(); @@ -324,8 +280,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public void execute(Runnable command) { - if (!tasks.offer(command)) { + public void execute(Runnable task) { + if (!tasks.offer(task)) { throw new RejectedExecutionException(); } if (state.compareAndSet(NONE, SUBMITTED)) { @@ -336,25 +292,25 @@ public void execute(Runnable command) { } @Override - public ScheduledFuture schedule(Runnable command, long delay, - TimeUnit unit) { + public Future schedule(Runnable task, long delay, + TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate( + Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } } diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java index 32698bf755c..1c88cd7e3d0 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java @@ -121,6 +121,11 @@ protected final boolean setUncancellableInternal() { return super.setUncancellable(); } + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return cancel(); + } + @Override protected StringBuilder toStringBuilder() { StringBuilder buf = super.toStringBuilder(); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java b/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java index 93613b551fb..ab829256ba6 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFuture.java @@ -18,8 +18,7 @@ /** * A combination of {@link java.util.concurrent.RunnableFuture} and {@link Future}. */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface RunnableFuture extends java.util.concurrent.RunnableFuture, Future { +public interface RunnableFuture extends Runnable, Future { @Override RunnableFuture addListener(FutureListener listener); diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java index 9ba77d8c076..4069d8e4cce 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableFutureAdapter.java @@ -15,8 +15,6 @@ */ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.StringUtil; import java.util.concurrent.Callable; @@ -24,6 +22,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Objects.requireNonNull; + final class RunnableFutureAdapter implements RunnableFuture { private final Promise promise; @@ -134,14 +134,9 @@ public void run() { } } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return future.cancel(mayInterruptIfRunning); - } - @Override public boolean cancel() { - return cancel(false); + return future.cancel(); } @Override diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java index b286d02a929..5376760992b 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFuture.java @@ -16,12 +16,17 @@ package io.netty.util.concurrent; /** - * A combination of {@link java.util.concurrent.RunnableScheduledFuture}, {@link RunnableFuture} and - * {@link ScheduledFuture}. + * A combination of {@link RunnableFuture} and {@link Comparable} (sorting by their next deadline), + * with additional methods for scheduling, periodicity, and delay. */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface RunnableScheduledFuture extends - java.util.concurrent.RunnableScheduledFuture, RunnableFuture, ScheduledFuture { +public interface RunnableScheduledFuture extends RunnableFuture, Comparable> { + /** + * Return {@code true} if the task is periodic, which means it may be executed multiple times, as opposed to a + * delayed task or a normal task, that only execute once. + * + * @return {@code true} if this task is periodic, otherwise {@code false}. + */ + boolean isPeriodic(); /** * Returns the deadline in nanos when the {@link #run()} method should be called again. diff --git a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java index 5129ef5a382..2d1cd510f1b 100644 --- a/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java +++ b/common/src/main/java/io/netty/util/concurrent/RunnableScheduledFutureAdapter.java @@ -16,19 +16,17 @@ package io.netty.util.concurrent; -import static java.util.Objects.requireNonNull; - import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.StringUtil; import java.util.concurrent.Callable; -import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -@SuppressWarnings("ComparableImplementedButEqualsNotOverridden") +import static java.util.Objects.requireNonNull; + final class RunnableScheduledFutureAdapter implements AbstractScheduledEventExecutor.RunnableScheduledFutureNode { private static final AtomicLong NEXT_TASK_ID = new AtomicLong(); @@ -75,12 +73,7 @@ public long delayNanos(long currentTimeNanos) { } @Override - public long getDelay(TimeUnit unit) { - return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); - } - - @Override - public int compareTo(Delayed o) { + public int compareTo(RunnableScheduledFuture o) { if (this == o) { return 0; } @@ -100,6 +93,23 @@ public int compareTo(Delayed o) { } } + @Override + public int hashCode() { + return Long.hashCode(id); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof RunnableScheduledFutureAdapter) { + RunnableScheduledFutureAdapter adaptor = (RunnableScheduledFutureAdapter) obj; + return id == adaptor.id; + } + return false; + } + @Override public void run() { try { @@ -130,23 +140,15 @@ public void run() { } } - /** - * @param mayInterruptIfRunning this value has no effect in this implementation. - */ @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean canceled = future.cancel(mayInterruptIfRunning); + public boolean cancel() { + boolean canceled = future.cancel(); if (canceled) { executor.removeScheduled(this); } return canceled; } - @Override - public boolean cancel() { - return cancel(false); - } - @Override public boolean isSuccess() { return promise.isSuccess(); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java deleted file mode 100644 index a6a91f63142..00000000000 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFuture.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -/** - * The result of an scheduled asynchronous operation. - */ -@SuppressWarnings("ClassNameSameAsAncestorName") -public interface ScheduledFuture extends Future, java.util.concurrent.ScheduledFuture { -} diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 3bd35150885..7ccd11b1186 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -22,21 +22,17 @@ import java.lang.Thread.State; import java.util.ArrayList; -import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -95,7 +91,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im private volatile long gracefulShutdownTimeout; private long gracefulShutdownStartTime; - private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); + private final Promise terminationFuture = new DefaultPromise<>(GlobalEventExecutor.INSTANCE); /** * Create a new instance @@ -500,7 +496,7 @@ private boolean runShutdownHooks() { } @Override - public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } @@ -538,7 +534,6 @@ public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUn } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { - //System.err.println(oldState + " " + newState + " " + this); break; } } @@ -560,58 +555,10 @@ public final Future shutdownGracefully(long quietPeriod, long timeout, TimeUn } @Override - public final Future terminationFuture() { + public final Future terminationFuture() { return terminationFuture.asFuture(); } - @Override - @Deprecated - public final void shutdown() { - if (isShutdown()) { - return; - } - - boolean inEventLoop = inEventLoop(); - boolean wakeup; - int oldState; - for (;;) { - if (isShuttingDown()) { - return; - } - int newState; - wakeup = true; - oldState = state; - if (inEventLoop) { - newState = ST_SHUTDOWN; - } else { - switch (oldState) { - case ST_NOT_STARTED: - case ST_STARTED: - case ST_SHUTTING_DOWN: - newState = ST_SHUTDOWN; - break; - default: - newState = oldState; - wakeup = false; - } - } - if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { - break; - } - } - - if (ensureThreadStarted(oldState)) { - return; - } - - if (wakeup) { - taskQueue.offer(WAKEUP_TASK); - if (!addTaskWakesUp) { - wakeup(inEventLoop); - } - } - } - @Override public final boolean isShuttingDown() { return state >= ST_SHUTTING_DOWN; @@ -732,39 +679,6 @@ public void execute(Runnable task) { } } - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - throwIfInEventLoop("invokeAny"); - return super.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throwIfInEventLoop("invokeAny"); - return super.invokeAny(tasks, timeout, unit); - } - - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - throwIfInEventLoop("invokeAll"); - return super.invokeAll(tasks); - } - - @Override - public List> invokeAll( - Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - throwIfInEventLoop("invokeAll"); - return super.invokeAll(tasks, timeout, unit); - } - - private void throwIfInEventLoop(String method) { - if (inEventLoop()) { - throw new RejectedExecutionException("Calling " + method + " from within the EventLoop is not allowed"); - } - } - /** * Returns the {@link ThreadProperties} of the {@link Thread} that powers the {@link SingleThreadEventExecutor}. * If the {@link SingleThreadEventExecutor} is not started yet, this operation will start it and block until diff --git a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java index bd5eac08f43..4273e06c0d0 100644 --- a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java @@ -18,7 +18,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.RejectedExecutionHandler; @@ -36,25 +36,28 @@ * * Because it provides no ordering care should be taken when using it! */ -public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolExecutor implements EventExecutor { +@SuppressWarnings("unchecked") +public final class UnorderedThreadPoolEventExecutor implements EventExecutor { private static final InternalLogger logger = InternalLoggerFactory.getInstance( UnorderedThreadPoolEventExecutor.class); - private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final Promise terminationFuture = GlobalEventExecutor.INSTANCE.newPromise(); + private final InnerScheduledThreadPoolExecutor executor; /** * Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)} * using {@link DefaultThreadFactory}. */ public UnorderedThreadPoolEventExecutor(int corePoolSize) { - this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class)); + DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory); } /** * See {@link ScheduledThreadPoolExecutor#ScheduledThreadPoolExecutor(int, ThreadFactory)} */ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory); } /** @@ -62,7 +65,8 @@ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFa * ThreadFactory, java.util.concurrent.RejectedExecutionHandler)} using {@link DefaultThreadFactory}. */ public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandler handler) { - this(corePoolSize, new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class), handler); + DefaultThreadFactory threadFactory = new DefaultThreadFactory(UnorderedThreadPoolEventExecutor.class); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler); } /** @@ -70,7 +74,7 @@ public UnorderedThreadPoolEventExecutor(int corePoolSize, RejectedExecutionHandl */ public UnorderedThreadPoolEventExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, threadFactory, handler); + executor = new InnerScheduledThreadPoolExecutor(this, corePoolSize, threadFactory, handler); } @Override @@ -84,94 +88,97 @@ public boolean isShuttingDown() { } @Override - public List shutdownNow() { - List tasks = super.shutdownNow(); - terminationFuture.trySuccess(null); - return tasks; + public boolean isShutdown() { + return executor.isShutdown(); } @Override - public void shutdown() { - super.shutdown(); - terminationFuture.trySuccess(null); + public boolean isTerminated() { + return executor.isTerminated(); } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executor.awaitTermination(timeout, unit); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { // TODO: At the moment this just calls shutdown but we may be able to do something more smart here which // respects the quietPeriod and timeout. - shutdown(); + executor.shutdown(); return terminationFuture(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return terminationFuture.asFuture(); } @Override - protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { - return runnable instanceof NonNotifyRunnable ? - task : new RunnableScheduledFutureTask<>(this, runnable, task); - } - - @Override - protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { - return new RunnableScheduledFutureTask<>(this, callable, task); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return (ScheduledFuture) super.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return (Future) executor.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return (ScheduledFuture) super.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return (Future) executor.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return (ScheduledFuture) super.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return (Future) executor.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return (ScheduledFuture) super.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { + return (Future) executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); } @Override - public Future submit(Runnable task) { - return (Future) super.submit(task); + public Future submit(Runnable task) { + return (Future) executor.submit(task); } @Override public Future submit(Runnable task, T result) { - return (Future) super.submit(task, result); + return (Future) executor.submit(task, result); } @Override public Future submit(Callable task) { - return (Future) super.submit(task); + return (Future) executor.submit(task); } @Override - public void execute(Runnable command) { - super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS); + public void execute(Runnable task) { + executor.schedule(new NonNotifyRunnable(task), 0, NANOSECONDS); } + /** + * Return the task queue of the underlying {@link java.util.concurrent.Executor} instance. + *

+ * Visible for testing. + * + * @return The task queue of this executor. + */ + BlockingQueue getQueue() { + return executor.getQueue(); + } + + /** + * Note: this class has a natural ordering that is inconsistent with equals. + */ private static final class RunnableScheduledFutureTask extends PromiseTask - implements RunnableScheduledFuture, ScheduledFuture { + implements RunnableScheduledFuture { private final RunnableScheduledFuture future; - RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, - RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFuture future) { super(executor, runnable, null); this.future = future; } - RunnableScheduledFutureTask(EventExecutor executor, Callable callable, - RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, Callable callable, RunnableScheduledFuture future) { super(executor, callable); this.future = future; } @@ -228,4 +235,30 @@ public void run() { task.run(); } } + + private static final class InnerScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private final EventExecutor eventExecutor; + + InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + this.eventExecutor = eventExecutor; + } + + InnerScheduledThreadPoolExecutor(EventExecutor eventExecutor, int corePoolSize, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + this.eventExecutor = eventExecutor; + } + + @Override + protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { + return runnable instanceof NonNotifyRunnable ? + task : new RunnableScheduledFutureTask<>(eventExecutor, runnable, task); + } + + @Override + protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { + return new RunnableScheduledFutureTask<>(eventExecutor, callable, task); + } + } } diff --git a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java index f61ff387ff4..a0ae541103b 100644 --- a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -35,8 +35,8 @@ public class AbstractScheduledEventExecutorTest { @Test public void testScheduleRunnableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_RUNNABLE, 0, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -44,8 +44,8 @@ public void testScheduleRunnableZero() { @Test public void testScheduleRunnableNegative() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_RUNNABLE, -1, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_RUNNABLE, -1, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -53,8 +53,8 @@ public void testScheduleRunnableNegative() { @Test public void testScheduleCallableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_CALLABLE, 0, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_CALLABLE, 0, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } @@ -62,12 +62,16 @@ public void testScheduleCallableZero() { @Test public void testScheduleCallableNegative() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); - ScheduledFuture future = executor.schedule(TEST_CALLABLE, -1, TimeUnit.NANOSECONDS); - assertEquals(0, future.getDelay(TimeUnit.NANOSECONDS)); + Future future = executor.schedule(TEST_CALLABLE, -1, TimeUnit.NANOSECONDS); + assertEquals(0, getDelay(future)); assertNotNull(executor.pollScheduledTask()); assertNull(executor.pollScheduledTask()); } + private static long getDelay(Future future) { + return ((RunnableScheduledFuture) future).delayNanos(); + } + @Test public void testScheduleAtFixedRateRunnableZero() { TestScheduledEventExecutor executor = new TestScheduledEventExecutor(); @@ -113,17 +117,12 @@ public boolean inEventLoop(Thread thread) { } @Override - public void shutdown() { - // NOOP - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future terminationFuture() { + public Future terminationFuture() { throw new UnsupportedOperationException(); } @@ -143,7 +142,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } @Override - public void execute(Runnable command) { + public void execute(Runnable task) { throw new UnsupportedOperationException(); } } diff --git a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java index bc276dc988b..74aa677e046 100644 --- a/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java +++ b/common/src/test/java/io/netty/util/concurrent/DefaultPromiseTest.java @@ -80,19 +80,15 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return null; } @Override - public Future terminationFuture() { + public Future terminationFuture() { return null; } - @Override - public void shutdown() { - } - @Override public boolean isShutdown() { return false; @@ -109,23 +105,23 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + public Future schedule(Runnable task, long delay, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + public Future schedule(Callable task, long delay, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { return fail("Cannot schedule commands"); } @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, - TimeUnit unit) { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, + TimeUnit unit) { return fail("Cannot schedule commands"); } @@ -135,7 +131,7 @@ public boolean inEventLoop(Thread thread) { } @Override - public void execute(Runnable command) { + public void execute(Runnable task) { fail("Cannot schedule commands"); } } @@ -145,7 +141,7 @@ public void testCancelDoesNotScheduleWhenNoListeners() { EventExecutor executor = new RejectingEventExecutor(); DefaultPromise promise = new DefaultPromise(executor); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertTrue(promise.isCancelled()); } @@ -174,23 +170,33 @@ public void testFailureDoesNotScheduleWhenNoListeners() { @Test public void testCancellationExceptionIsThrownWhenBlockingGet() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThrows(CancellationException.class, promise::get); } @Test public void testCancellationExceptionIsThrownWhenBlockingGetWithTimeout() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThrows(CancellationException.class, () -> promise.get(1, TimeUnit.SECONDS)); } @Test public void testCancellationExceptionIsReturnedAsCause() throws Exception { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertThat(promise.cause()).isInstanceOf(CancellationException.class); assertTrue(promise.isFailed()); + assertTrue(promise.isDone()); + } + + @Test + public void uncancellablePromiseIsNotDone() { + DefaultPromise promise = new DefaultPromise<>(INSTANCE); + promise.setUncancellable(); + assertFalse(promise.isDone()); + assertFalse(promise.isCancellable()); + assertFalse(promise.isCancelled()); } @Test @@ -390,6 +396,22 @@ public void setUncancellableGetNow() { assertEquals("success", promise.getNow()); } + @Test + public void cancellingUncancellablePromiseDoesNotCompleteIt() { + DefaultPromise promise = new DefaultPromise<>(INSTANCE); + promise.setUncancellable(); + promise.cancel(); + assertFalse(promise.isCancelled()); + assertFalse(promise.isDone()); + assertFalse(promise.isFailed()); + assertFalse(promise.isSuccess()); + promise.setSuccess(null); + assertFalse(promise.isCancelled()); + assertTrue(promise.isDone()); + assertFalse(promise.isFailed()); + assertTrue(promise.isSuccess()); + } + @Test public void throwUncheckedSync() throws InterruptedException { Exception exception = new Exception(); @@ -421,7 +443,7 @@ public void throwUncheckedSyncUninterruptibly() { @Test public void throwCancelled() throws InterruptedException { DefaultPromise promise = new DefaultPromise<>(INSTANCE); - promise.cancel(true); + promise.cancel(); assertThrows(CancellationException.class, promise::sync); } diff --git a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java index 862e52f213a..75109a9115b 100644 --- a/common/src/test/java/io/netty/util/concurrent/FuturesTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FuturesTest.java @@ -90,7 +90,7 @@ public void mapMustNotFailOriginalFutureWhenMapperFunctionThrows() { public void cancelOnFutureFromMapMustCancelOriginalFuture() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.map(i -> i.toString()); - strFut.cancel(false); + strFut.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -99,7 +99,7 @@ public void cancelOnFutureFromMapMustCancelOriginalFuture() { public void cancelOnOriginalFutureMustCancelFutureFromMap() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.map(i -> i.toString()); - promise.cancel(false); + promise.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -165,7 +165,7 @@ public void flatMapMustNotFailOriginalFutureWhenMapperFunctionThrows() { public void cancelOnFutureFromFlatMapMustCancelOriginalFuture() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> INSTANCE.newSucceededFuture(i.toString())); - strFut.cancel(false); + strFut.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -174,7 +174,7 @@ public void cancelOnFutureFromFlatMapMustCancelOriginalFuture() { public void cancelOnOriginalFutureMustCancelFutureFromFlatMap() { DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> INSTANCE.newSucceededFuture(i.toString())); - promise.cancel(false); + promise.cancel(); assertTrue(promise.isCancelled()); assertTrue(strFut.isCancelled()); } @@ -184,7 +184,7 @@ public void cancelOnFutureFromFlatMapMapperMustCancelReturnedFuture() throws Exc DefaultPromise promise = new DefaultPromise<>(INSTANCE); Future strFut = promise.flatMap(i -> { Future future = new DefaultPromise<>(INSTANCE); - future.cancel(false); + future.cancel(); return future; }); @@ -254,7 +254,7 @@ public void cascadeToCancel() throws Exception { DefaultPromise promise2 = new DefaultPromise<>(executor); promise.cascadeTo(promise2); - assertTrue(promise.cancel(false)); + assertTrue(promise.cancel()); assertTrue(promise.isCancelled()); assertTrue(promise2.await(1, SECONDS)); assertTrue(promise2.isCancelled()); @@ -267,7 +267,7 @@ public void cascadeToCancelSecond() throws Exception { DefaultPromise promise2 = new DefaultPromise<>(executor); promise.cascadeTo(promise2); - assertTrue(promise2.cancel(false)); + assertTrue(promise2.cancel()); assertTrue(promise2.isCancelled()); // diff --git a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java index 1837bb1c96b..78550e05049 100644 --- a/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/GlobalEventExecutorTest.java @@ -38,11 +38,7 @@ public class GlobalEventExecutorTest { @BeforeEach public void setUp() throws Exception { // Wait until the global executor is stopped (just in case there is a task running due to previous test cases) - for (;;) { - if (e.thread == null || !e.thread.isAlive()) { - break; - } - + while (e.thread != null && e.thread.isAlive()) { Thread.sleep(50); } } @@ -76,7 +72,7 @@ public void testAutomaticStartStop() throws Exception { @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS) public void testScheduledTasks() throws Exception { TestRunnable task = new TestRunnable(0); - ScheduledFuture f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(task, 1500, TimeUnit.MILLISECONDS); f.sync(); assertThat(task.ran.get(), is(true)); @@ -115,7 +111,7 @@ public void testTakeTask() throws Exception { //add scheduled task TestRunnable scheduledTask = new TestRunnable(0); - ScheduledFuture f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); //add task TestRunnable afterTask = new TestRunnable(0); @@ -134,7 +130,7 @@ public void testTakeTaskAlwaysHasTask() throws Exception { //for https://github.com/netty/netty/issues/1614 //add scheduled task TestRunnable t = new TestRunnable(0); - final ScheduledFuture f = e.schedule(t, 1500, TimeUnit.MILLISECONDS); + Future f = e.schedule(t, 1500, TimeUnit.MILLISECONDS); //ensure always has at least one task in taskQueue //check if scheduled tasks are triggered diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index ddfe32587bf..379990626d1 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -17,10 +17,7 @@ import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.Queue; -import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -99,82 +96,6 @@ protected void run() { executor.shutdownGracefully(); } - @Test - public void testInvokeAnyInEventLoop() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(true, false)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAnyInEventLoopWithTimeout() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(true, true)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAllInEventLoop() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(false, false)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - @Test - public void testInvokeAllInEventLoopWithTimeout() throws Throwable { - assertTimeoutPreemptively(ofSeconds(3), () -> { - var exception = assertThrows(CompletionException.class, () -> testInvokeInEventLoop(false, true)); - assertThat(exception).hasCauseInstanceOf(RejectedExecutionException.class); - }); - } - - private static void testInvokeInEventLoop(final boolean any, final boolean timeout) { - final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(Executors.defaultThreadFactory()) { - @Override - protected void run() { - while (!confirmShutdown()) { - Runnable task = takeTask(); - if (task != null) { - task.run(); - } - } - } - }; - try { - final Promise promise = executor.newPromise(); - executor.execute(() -> { - try { - Set> set = Collections.singleton(() -> { - promise.setFailure(new AssertionError("Should never execute the Callable")); - return Boolean.TRUE; - }); - if (any) { - if (timeout) { - executor.invokeAny(set, 10, TimeUnit.SECONDS); - } else { - executor.invokeAny(set); - } - } else { - if (timeout) { - executor.invokeAll(set, 10, TimeUnit.SECONDS); - } else { - executor.invokeAll(set); - } - } - promise.setFailure(new AssertionError("Should never reach here")); - } catch (Throwable cause) { - promise.setFailure(cause); - } - }); - promise.asFuture().syncUninterruptibly(); - } finally { - executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); - } - } - @Test public void testTaskAddedAfterShutdownNotAbandoned() throws Exception { @@ -266,7 +187,7 @@ protected void run() { //add scheduled task TestRunnable scheduledTask = new TestRunnable(); - ScheduledFuture f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); + Future f = executor.schedule(scheduledTask , 1500, TimeUnit.MILLISECONDS); //add task TestRunnable afterTask = new TestRunnable(); @@ -300,7 +221,7 @@ protected void run() { assertTimeoutPreemptively(ofSeconds(5), () -> { //add scheduled task TestRunnable t = new TestRunnable(); - final ScheduledFuture f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS); + Future f = executor.schedule(t, 1500, TimeUnit.MILLISECONDS); //ensure always has at least one task in taskQueue //check if scheduled tasks are triggered diff --git a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java index aff141610ee..caeabdfbe58 100644 --- a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java @@ -65,7 +65,7 @@ public void run() { try { latch.await(); } finally { - future.cancel(true); + future.cancel(); executor.shutdownGracefully(); } } @@ -105,4 +105,41 @@ public String call() { executor.shutdownGracefully(); } } + + @Test + public void futuresMustHaveCorrectExecutor() { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + Runnable runnable = () -> { + }; + Callable callable = () -> null; + Future future = null; + + try { + future = executor.schedule(runnable, 0, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.schedule(callable, 0, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.scheduleWithFixedDelay(runnable, 0, 1, TimeUnit.MILLISECONDS); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.submit(runnable); + assertSame(executor, future.executor()); + + future.cancel(); + future = executor.submit(callable); + assertSame(executor, future.executor()); + } finally { + future.cancel(); + executor.shutdownGracefully(); + } + } } diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java index e915577f980..82f2fc76764 100644 --- a/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/ProxyHandler.java @@ -348,7 +348,7 @@ private void failPendingWritesAndClose(Throwable cause) { private void cancelConnectTimeoutFuture() { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); connectTimeoutFuture = null; } } diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java index 62bb9e7e69b..aaea42c3a83 100644 --- a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -205,7 +205,7 @@ private void scheduleFlush(final ChannelHandlerContext ctx) { private void cancelScheduledFlush() { if (nextScheduledFlush != null) { - nextScheduledFlush.cancel(false); + nextScheduledFlush.cancel(); nextScheduledFlush = null; } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index c26c596f72b..35551c3b732 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -2038,7 +2038,7 @@ private void applyHandshakeTimeout() { }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); // Cancel the handshake timeout when handshake is finished. - localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false)); + localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel()); } private void forceFlush(ChannelHandlerContext ctx) { @@ -2088,7 +2088,7 @@ private void safeClose( // Close the connection if close_notify is sent in time. flushFuture.addListener(f -> { if (timeoutFuture != null) { - timeoutFuture.cancel(false); + timeoutFuture.cancel(); } final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis; if (closeNotifyReadTimeout <= 0) { @@ -2122,7 +2122,7 @@ private void safeClose( // Do the close once we received the close_notify. closeFuture.addListener(future -> { if (closeNotifyReadTimeoutFuture != null) { - closeNotifyReadTimeoutFuture.cancel(false); + closeNotifyReadTimeoutFuture.cancel(); } if (ctx.channel().isActive()) { addCloseListener(ctx.close(), promise); diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index 258c5525bf1..7ce25394ca3 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -348,15 +348,15 @@ private void destroy() { state = 2; if (readerIdleTimeout != null) { - readerIdleTimeout.cancel(false); + readerIdleTimeout.cancel(); readerIdleTimeout = null; } if (writerIdleTimeout != null) { - writerIdleTimeout.cancel(false); + writerIdleTimeout.cancel(); writerIdleTimeout = null; } if (allIdleTimeout != null) { - allIdleTimeout.cancel(false); + allIdleTimeout.cancel(); allIdleTimeout = null; } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index e5131bbc92e..d8ceaaa5990 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -117,7 +117,7 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { lastTask = null; while (task != null) { assert task.ctx.executor().inEventLoop(); - task.scheduledFuture.cancel(false); + task.scheduledFuture.cancel(); WriteTimeoutTask prev = task.prev; task.prev = null; task.next = null; @@ -214,7 +214,7 @@ public void run() { @Override public void operationComplete(Future future) throws Exception { // scheduledFuture has already be set when reaching here - scheduledFuture.cancel(false); + scheduledFuture.cancel(); // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the // modification so its picked up by the executor.. diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java index aa0ec578087..0915b6f373d 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficCounter.java @@ -15,13 +15,13 @@ */ package io.netty.handler.traffic; -import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; - import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler.PerChannel; +import io.netty.util.concurrent.EventExecutorGroup; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE; + /** * Version for {@link GlobalChannelTrafficShapingHandler}. * This TrafficCounter is the Global one, and its special property is to directly handle @@ -36,7 +36,7 @@ public class GlobalChannelTrafficCounter extends TrafficCounter { * @param checkInterval the checkInterval in millisecond between two computations. */ public GlobalChannelTrafficCounter(GlobalChannelTrafficShapingHandler trafficShapingHandler, - ScheduledExecutorService executor, String name, long checkInterval) { + EventExecutorGroup executor, String name, long checkInterval) { super(trafficShapingHandler, executor, name, checkInterval); checkNotNullWithIAE(executor, "executor"); } @@ -111,7 +111,7 @@ public synchronized void stop() { resetAccounting(milliSecondFromNano()); trafficShapingHandler.doAccounting(this); if (scheduledFuture != null) { - scheduledFuture.cancel(true); + scheduledFuture.cancel(); } } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java index 8a3de8743c3..faa780845cb 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalChannelTrafficShapingHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.Attribute; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.logging.InternalLogger; @@ -33,7 +34,6 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -149,7 +149,7 @@ static final class PerChannel { /** * Create the global TrafficCounter */ - void createGlobalTrafficCounter(ScheduledExecutorService executor) { + void createGlobalTrafficCounter(EventExecutorGroup executor) { // Default setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION); checkNotNullWithIAE(executor, "executor"); @@ -167,7 +167,7 @@ protected int userDefinedWritabilityIndex() { * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -182,7 +182,7 @@ protected int userDefinedWritabilityIndex() { * @param maxTime * The maximum delay to wait in case of traffic excess. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit, long checkInterval, long maxTime) { @@ -196,7 +196,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -209,7 +209,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit, long checkInterval) { @@ -223,7 +223,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeGlobalLimit * 0 or a limit in bytes/s * @param readGlobalLimit @@ -233,7 +233,7 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * @param readChannelLimit * 0 or a limit in bytes/s */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long writeGlobalLimit, long readGlobalLimit, long writeChannelLimit, long readChannelLimit) { super(writeGlobalLimit, readGlobalLimit); @@ -246,12 +246,12 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) { super(checkInterval); createGlobalTrafficCounter(executor); } @@ -260,9 +260,9 @@ public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, lon * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. */ - public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) { + public GlobalChannelTrafficShapingHandler(EventExecutorGroup executor) { createGlobalTrafficCounter(executor); } diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index 89a58ac0117..fbd16c8c48b 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Promise; import java.util.ArrayDeque; @@ -104,7 +105,7 @@ private static final class PerChannel { /** * Create the global TrafficCounter. */ - void createGlobalTrafficCounter(ScheduledExecutorService executor) { + void createGlobalTrafficCounter(EventExecutorGroup executor) { requireNonNull(executor, "executor"); TrafficCounter tc = new TrafficCounter(this, executor, "GlobalTC", checkInterval); setTrafficCounter(tc); @@ -120,7 +121,7 @@ protected int userDefinedWritabilityIndex() { * Create a new instance. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -131,7 +132,7 @@ protected int userDefinedWritabilityIndex() { * @param maxTime * The maximum delay to wait in case of traffic excess. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, long readLimit, + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit, long checkInterval, long maxTime) { super(writeLimit, readLimit, checkInterval, maxTime); createGlobalTrafficCounter(executor); @@ -142,7 +143,7 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit @@ -151,8 +152,8 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, - long readLimit, long checkInterval) { + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, + long readLimit, long checkInterval) { super(writeLimit, readLimit, checkInterval); createGlobalTrafficCounter(executor); } @@ -162,13 +163,13 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * default max time as delay allowed value of 15000 ms. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param writeLimit * 0 or a limit in bytes/s * @param readLimit * 0 or a limit in bytes/s */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long writeLimit, + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long writeLimit, long readLimit) { super(writeLimit, readLimit); createGlobalTrafficCounter(executor); @@ -179,12 +180,12 @@ public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long write * default max time as delay allowed value of 15000 ms and no limit. * * @param executor - * the {@link ScheduledExecutorService} to use for the {@link TrafficCounter}. + * the {@link EventExecutorGroup} to use for the {@link TrafficCounter}. * @param checkInterval * The delay between two computations of performances for * channels or 0 if no stats are to be computed. */ - public GlobalTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) { + public GlobalTrafficShapingHandler(EventExecutorGroup executor, long checkInterval) { super(checkInterval); createGlobalTrafficCounter(executor); } diff --git a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java index 283e01166eb..fd384e2de29 100644 --- a/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java +++ b/handler/src/main/java/io/netty/handler/traffic/TrafficCounter.java @@ -15,11 +15,13 @@ */ package io.netty.handler.traffic; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -148,7 +150,7 @@ public static long milliSecondFromNano() { /** * Executor that will run the monitor */ - final ScheduledExecutorService executor; + final EventExecutorGroup executor; /** * Monitor created once in start() */ @@ -156,7 +158,7 @@ public static long milliSecondFromNano() { /** * used in stop() to cancel the timer */ - volatile ScheduledFuture scheduledFuture; + volatile Future scheduledFuture; /** * Is Monitor active @@ -211,7 +213,7 @@ public synchronized void stop() { trafficShapingHandler.doAccounting(this); } if (scheduledFuture != null) { - scheduledFuture.cancel(true); + scheduledFuture.cancel(); } } @@ -252,7 +254,7 @@ synchronized void resetAccounting(long newLastTime) { * @param checkInterval * the checkInterval in millisecond between two computations. */ - public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) { + public TrafficCounter(EventExecutor executor, String name, long checkInterval) { requireNonNull(name, "name"); trafficShapingHandler = null; @@ -277,7 +279,7 @@ public TrafficCounter(ScheduledExecutorService executor, String name, long check * the checkInterval in millisecond between two computations. */ public TrafficCounter( - AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor, + AbstractTrafficShapingHandler trafficShapingHandler, EventExecutorGroup executor, String name, long checkInterval) { this.name = requireNonNull(name, "name"); this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler"); @@ -304,13 +306,12 @@ private void init(long checkInterval) { public void configure(long newCheckInterval) { long newInterval = newCheckInterval / 10 * 10; if (checkInterval.getAndSet(newInterval) != newInterval) { + stop(); if (newInterval <= 0) { - stop(); // No more active monitoring lastTime.set(milliSecondFromNano()); } else { // Restart - stop(); start(); } } diff --git a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java index 22a57ddef13..80dff79e5c7 100644 --- a/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/traffic/TrafficShapingHandlerTest.java @@ -30,11 +30,12 @@ import io.netty.channel.local.LocalServerChannel; import io.netty.util.Attribute; import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.SingleThreadEventExecutor; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -42,14 +43,14 @@ public class TrafficShapingHandlerTest { private static final long READ_LIMIT_BYTES_PER_SECOND = 1; - private static final ScheduledExecutorService SES = Executors.newSingleThreadScheduledExecutor(); + private static final EventExecutorGroup SES = new SingleThreadEventExecutor(); private static final MultithreadEventLoopGroup GROUP = new MultithreadEventLoopGroup(1, LocalHandler.newFactory()); @AfterAll public static void destroy() { GROUP.shutdownGracefully(); - SES.shutdown(); + SES.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } @Test diff --git a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java index 36f3ee9e0da..6d5b9223d93 100644 --- a/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/channel/epoll/EpollSocketChannelBenchmark.java @@ -126,7 +126,7 @@ public Future write(ChannelHandlerContext ctx, Object msg) { public void tearDown() throws Exception { chan.close().sync(); serverChan.close().sync(); - future.cancel(true); + future.cancel(); group.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync(); abyte.release(); } diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java index c5ac0dbf812..0bbfdf5b734 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/BurstCostExecutorsBenchmark.java @@ -23,8 +23,14 @@ import io.netty.channel.nio.NioHandler; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.SingleThreadEventExecutor; +import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor; import io.netty.util.internal.PlatformDependent; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -38,17 +44,12 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.infra.Blackhole; -import java.util.Collection; -import java.util.List; +import java.util.Collections; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -60,18 +61,17 @@ public class BurstCostExecutorsBenchmark extends AbstractMicrobenchmark { * This executor is useful as the best burst latency performer because it won't go to sleep and won't be hit by the * cost of being awaken on both offer/consumer side. */ - private static final class SpinExecutorService implements ExecutorService { - + private static final class SpinExecutorService implements EventExecutorGroup { private static final Runnable POISON_PILL = () -> { }; private final Queue tasks; private final AtomicBoolean poisoned = new AtomicBoolean(); private final Thread executorThread; + private final Promise terminationFuture = ImmediateEventExecutor.INSTANCE.newPromise(); SpinExecutorService(int maxTasks) { tasks = PlatformDependent.newFixedMpscQueue(maxTasks); executorThread = new Thread(() -> { - final Queue tasks = SpinExecutorService.this.tasks; Runnable task; while ((task = tasks.poll()) != POISON_PILL) { if (task != null) { @@ -83,22 +83,8 @@ private static final class SpinExecutorService implements ExecutorService { } @Override - public void shutdown() { - if (poisoned.compareAndSet(false, true)) { - while (!tasks.offer(POISON_PILL)) { - // Just try again - } - try { - executorThread.join(); - } catch (InterruptedException e) { - //We're quite trusty :) - } - } - } - - @Override - public List shutdownNow() { - throw new UnsupportedOperationException(); + public boolean isShuttingDown() { + return poisoned.get(); } @Override @@ -122,45 +108,78 @@ public Future submit(Callable task) { } @Override - public Future submit(Runnable task, T result) { + public Future schedule(Runnable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future submit(Runnable task) { + public Future schedule(Callable task, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException { + public Future scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { + public Future submit(Runnable task, T result) { throw new UnsupportedOperationException(); } @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public Future submit(Runnable task) { throw new UnsupportedOperationException(); } @Override - public void execute(Runnable command) { - if (!tasks.offer(command)) { + public void execute(Runnable task) { + if (!tasks.offer(task)) { throw new RejectedExecutionException( "If that happens, there is something wrong with the available capacity/burst size"); } } + + @Override + public Future shutdownGracefully() { + if (poisoned.compareAndSet(false, true)) { + while (!tasks.offer(POISON_PILL)) { + // Just try again + } + try { + executorThread.join(); + } catch (InterruptedException e) { + //We're quite trusty :) + } + } + terminationFuture.trySuccess(null); + return terminationFuture.asFuture(); + } + + @Override + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + return shutdownGracefully(); + } + + @Override + public Future terminationFuture() { + return terminationFuture.asFuture(); + } + + @Override + public EventExecutor next() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } } private enum ExecutorType { @@ -179,8 +198,8 @@ private enum ExecutorType { @Param({ "0", "10" }) private int work; - private ExecutorService executor; - private ExecutorService executorToShutdown; + private EventExecutorGroup executor; + private EventExecutorGroup executorToShutdown; @Setup public void setup() { @@ -199,7 +218,7 @@ public void setup() { executorToShutdown = executor; break; case juc: - executor = Executors.newSingleThreadScheduledExecutor(); + executor = new UnorderedThreadPoolEventExecutor(1); executorToShutdown = executor; break; case nioEventLoop: @@ -230,7 +249,7 @@ public void setup() { @TearDown public void tearDown() { - executorToShutdown.shutdown(); + executorToShutdown.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } @State(Scope.Thread) @@ -255,7 +274,7 @@ public void setup(BurstCostExecutorsBenchmark bench) { //benchmark is focusing on executors with a single threaded consumer: //it would reduce the cost on consumer side while allowing to focus just //to the threads hand-off/wake-up cost - DONE_UPDATER.lazySet(PerThreadState.this, completed + 1); + DONE_UPDATER.lazySet(this, completed + 1); }; } else { completeTask = () -> { @@ -263,7 +282,7 @@ public void setup(BurstCostExecutorsBenchmark bench) { //benchmark is focusing on executors with a single threaded consumer: //it would reduce the cost on consumer side while allowing to focus just //to the threads hand-off/wake-up cost - DONE_UPDATER.lazySet(PerThreadState.this, completed + 1); + DONE_UPDATER.lazySet(this, completed + 1); }; } } @@ -283,7 +302,7 @@ public void resetCompleted() { */ public int spinWaitCompletionOf(int value) { while (true) { - final int lastRead = this.completed; + final int lastRead = completed; if (lastRead >= value) { return lastRead; } @@ -313,7 +332,7 @@ public int test3Producers(final PerThreadState state) { } private int executeBurst(final PerThreadState state) { - final ExecutorService executor = this.executor; + final EventExecutorGroup executor = this.executor; final int burstLength = this.burstLength; final Runnable completeTask = state.completeTask; for (int i = 0; i < burstLength; i++) { diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java index 8035bfc663e..7e0ff6cd326 100644 --- a/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/concurrent/RunnableScheduledFutureAdapterBenchmark.java @@ -68,7 +68,7 @@ public void stop() throws Exception { public Future cancelInOrder(final FuturesHolder futuresHolder) { return executor.submit(() -> { for (int i = 0; i < futuresHolder.num; i++) { - futuresHolder.futures.get(i).cancel(false); + futuresHolder.futures.get(i).cancel(); } }).syncUninterruptibly(); } @@ -77,7 +77,7 @@ public Future cancelInOrder(final FuturesHolder futuresHolder) { public Future cancelInReverseOrder(final FuturesHolder futuresHolder) { return executor.submit(() -> { for (int i = futuresHolder.num - 1; i >= 0; i--) { - futuresHolder.futures.get(i).cancel(false); + futuresHolder.futures.get(i).cancel(); } }).syncUninterruptibly(); } diff --git a/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java b/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java index b6ea697f576..d88680d90a8 100644 --- a/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java +++ b/microbench/src/main/java/io/netty/microbench/util/AbstractSharedExecutorMicrobenchmark.java @@ -19,15 +19,13 @@ import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import org.openjdk.jmh.annotations.Fork; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.Fork; - /** * This harness facilitates the sharing of an executor between JMH and Netty and * thus avoid measuring context switching in microbenchmarks. @@ -87,21 +85,15 @@ public boolean inEventLoop(Thread thread) { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return executor.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return executor.terminationFuture(); } - @Override - @Deprecated - public void shutdown() { - executor.shutdown(); - } - @Override public boolean isShuttingDown() { return executor.isShuttingDown(); @@ -128,8 +120,8 @@ public boolean awaitTermination(long timeout, TimeUnit unit) { } @Override - public void execute(Runnable command) { - executor.execute(command); + public void execute(Runnable task) { + executor.execute(task); } @Override @@ -138,24 +130,24 @@ public Promise newPromise() { } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return executor.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return executor.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return executor.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return executor.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) { + return executor.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { + return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); } } diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java index 515a0e73690..25ab8dcf6b4 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/Cache.java @@ -53,7 +53,7 @@ abstract class Cache { private static final Future CANCELLED_FUTURE = new Future() { @Override public boolean cancel() { - return cancel(false); + return false; } @Override @@ -131,11 +131,6 @@ public Object getNow() { return null; } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - @Override public EventExecutor executor() { throw new UnsupportedOperationException(); @@ -352,7 +347,7 @@ private void scheduleCacheExpirationIfNeeded(int ttl, EventLoop loop) { break; } else { // There was something else scheduled in the meantime... Cancel and try again. - newFuture.cancel(true); + newFuture.cancel(); } } else { break; @@ -401,7 +396,7 @@ private FutureAndDelay(Future future, int ttl) { } void cancel() { - future.cancel(false); + future.cancel(); } @Override diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java index 68c48ead875..1fc45d30599 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsQueryContext.java @@ -217,7 +217,7 @@ public void operationComplete(Future timeoutFuture = this.timeoutFuture; if (timeoutFuture != null) { this.timeoutFuture = null; - timeoutFuture.cancel(false); + timeoutFuture.cancel(); } // Remove the id from the manager as soon as the query completes. This may be because of success, failure or diff --git a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java index 2bbb85bada2..26166f784c4 100644 --- a/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java +++ b/resolver-dns/src/main/java/io/netty/resolver/dns/DnsResolveContext.java @@ -987,7 +987,7 @@ private void finishResolve(Promise> promise, Throwable cause) { Future> f = i.next(); i.remove(); - f.cancel(false); + f.cancel(); } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java index 08d46affdd3..debe7b94913 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/AbstractSingleThreadEventLoopTest.java @@ -41,11 +41,10 @@ public abstract class AbstractSingleThreadEventLoopTest { @Test - @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { EventLoopGroup group = new MultithreadEventLoopGroup(newIoHandlerFactory()); assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS)); - group.shutdown(); + group.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS)); } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java index 83867b31857..ff711061f7a 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketCancelWriteTest.java @@ -58,11 +58,11 @@ public void testCancelWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable { Channel cc = cb.connect(sc.localAddress()).get(); Future f = cc.write(a); - assertTrue(f.cancel(false)); + assertTrue(f.cancel()); cc.writeAndFlush(b); cc.write(c); Future f2 = cc.write(d); - assertTrue(f2.cancel(false)); + assertTrue(f2.cancel()); cc.writeAndFlush(e); while (sh.counter < 3) { diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java index de24fc41e34..68972660608 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectionAttemptTest.java @@ -137,7 +137,7 @@ public void testConnectCancellation(Bootstrap cb) throws Throwable { } } - if (future.cancel(true)) { + if (future.cancel()) { assertThat(future.isCancelled()).isTrue(); } else { // Cancellation not supported by the transport. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index da160a626f1..722e7aec11d 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -166,7 +166,7 @@ protected void doClose() throws Exception { Future future = connectTimeoutFuture; if (future != null) { - future.cancel(false); + future.cancel(); connectTimeoutFuture = null; } @@ -612,7 +612,7 @@ public void connect( promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -684,7 +684,7 @@ private void finishConnect() { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 5a919fa2643..b4b385a28ab 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -61,7 +61,7 @@ void handleLoopException(Throwable t) { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); assertNull(capture.get()); } finally { group.shutdownGracefully(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 7388c41bb6d..7a1d9e4692b 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -563,7 +563,7 @@ public void connect( promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -635,7 +635,7 @@ private void finishConnect() { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java index e578b7cca95..f24d2c95b3b 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueEventLoopTest.java @@ -41,7 +41,7 @@ public void testScheduleBigDelayNotOverflow() { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); group.shutdownGracefully(); } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 90d14d57938..6ac78449033 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -87,6 +87,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparabl /** * Return the {@link EventLoop} this {@link Channel} was registered to. */ + @Override EventLoop executor(); /** diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 90d516ecce6..77b27d68c25 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -45,7 +45,7 @@ public class SingleThreadEventLoop extends SingleThreadEventExecutor implements @Override public boolean canBlock() { assert inEventLoop(); - return !SingleThreadEventLoop.this.hasTasks() && !SingleThreadEventLoop.this.hasScheduledTasks(); + return !hasTasks() && !hasScheduledTasks(); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index e44155972a3..f7f9e92b6a3 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -63,9 +63,9 @@ public EventLoop next() { } @Override - public void execute(Runnable command) { - requireNonNull(command, "command"); - tasks.add(command); + public void execute(Runnable task) { + requireNonNull(task, "command"); + tasks.add(task); if (!running) { runTasks(); } @@ -124,18 +124,12 @@ void cancelScheduled() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException(); } @Override - public Future terminationFuture() { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public void shutdown() { + public Future terminationFuture() { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 422816cbd08..59ee95f7ada 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -250,7 +250,7 @@ public final void connect( promise.asFuture().addListener(future -> { if (future.isCancelled()) { if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; close(newPromise()); @@ -317,7 +317,7 @@ public final void finishConnect() { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); + connectTimeoutFuture.cancel(); } connectPromise = null; } @@ -452,7 +452,7 @@ protected void doClose() throws Exception { Future future = connectTimeoutFuture; if (future != null) { - future.cancel(false); + future.cancel(); connectTimeoutFuture = null; } } diff --git a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java index 3d243e31009..08ccbddffc6 100644 --- a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java @@ -17,7 +17,6 @@ import io.netty.util.NetUtil; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; import org.junit.jupiter.api.Test; @@ -26,6 +25,7 @@ import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import static io.netty.util.concurrent.ImmediateEventExecutor.INSTANCE; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; @@ -44,6 +44,7 @@ public void ensureInitialRegistrationFiresActive() throws Throwable { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); + when(eventLoop.newPromise()).thenReturn(INSTANCE.newPromise()); TestChannel channel = new TestChannel(eventLoop); // Using spy as otherwise intelliJ will not be able to understand that we dont want to skip the handler @@ -78,7 +79,7 @@ public void ensureSubsequentRegistrationDoesNotFireActive() throws Throwable { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - when(eventLoop.newPromise()).thenReturn(ImmediateEventExecutor.INSTANCE.newPromise()); + when(eventLoop.newPromise()).thenAnswer(inv -> INSTANCE.newPromise()); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArgument(0)).run(); @@ -134,7 +135,7 @@ public void testClosedChannelExceptionCarryIOException() throws Exception { // This allows us to have a single-threaded test when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - when(eventLoop.newPromise()).thenReturn(ImmediateEventExecutor.INSTANCE.newPromise()); + when(eventLoop.newPromise()).thenAnswer(inv -> INSTANCE.newPromise()); doAnswer(invocationOnMock -> { ((Runnable) invocationOnMock.getArgument(0)).run(); return null; diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 36ab4faaabb..ff84abdf812 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -95,14 +94,12 @@ public void stopEventLoop() { } @Test - @SuppressWarnings("deprecation") public void shutdownBeforeStart() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); assertRejection(loopA); } @Test - @SuppressWarnings("deprecation") public void shutdownAfterStart() throws Exception { final CountDownLatch latch = new CountDownLatch(1); loopA.execute(latch::countDown); @@ -111,7 +108,7 @@ public void shutdownAfterStart() throws Exception { latch.await(); // Request the event loop thread to stop. - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); assertRejection(loopA); assertTrue(loopA.isShutdown()); @@ -165,7 +162,7 @@ private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws Interrup final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + Future f = loopA.scheduleAtFixedRate(() -> { timestamps.add(System.nanoTime()); try { Thread.sleep(50); @@ -175,13 +172,13 @@ private static void testScheduleTaskAtFixedRate(EventLoop loopA) throws Interrup allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); // Check if the task was run without a lag. Long firstTimestamp = null; - int cnt = 0; + long cnt = 0; for (Long t: timestamps) { if (firstTimestamp == null) { firstTimestamp = t; @@ -212,7 +209,7 @@ private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws Int final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + Future f = loopA.scheduleAtFixedRate(() -> { boolean empty = timestamps.isEmpty(); timestamps.add(System.nanoTime()); if (empty) { @@ -225,7 +222,7 @@ private static void testScheduleLaggyTaskAtFixedRate(EventLoop loopA) throws Int allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); @@ -265,7 +262,7 @@ private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws Inter final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 3; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleWithFixedDelay(() -> { + Future f = loopA.scheduleWithFixedDelay(() -> { timestamps.add(System.nanoTime()); try { Thread.sleep(51); @@ -275,7 +272,7 @@ private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws Inter allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); - assertTrue(f.cancel(true)); + assertTrue(f.cancel()); Thread.sleep(300); assertEquals(expectedTimeStamps, timestamps.size()); @@ -294,7 +291,6 @@ private static void testScheduleTaskWithFixedDelay(EventLoop loopA) throws Inter } @Test - @SuppressWarnings("deprecation") public void shutdownWithPendingTasks() throws Exception { final int NUM_TASKS = 3; final AtomicInteger ranTasks = new AtomicInteger(); @@ -321,7 +317,7 @@ public void shutdownWithPendingTasks() throws Exception { assertEquals(1, ranTasks.get()); // Shut down the event loop to test if the other tasks are run before termination. - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); // Let the other tasks run. latch.countDown(); @@ -337,9 +333,8 @@ public void shutdownWithPendingTasks() throws Exception { @Test @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) - @SuppressWarnings("deprecation") public void testRegistrationAfterShutdown() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); // Disable logging temporarily. Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); @@ -367,9 +362,8 @@ public void testRegistrationAfterShutdown() throws Exception { @Test @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS) - @SuppressWarnings("deprecation") public void testRegistrationAfterShutdown2() throws Exception { - loopA.shutdown(); + loopA.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS).await(); final CountDownLatch latch = new CountDownLatch(1); Channel ch = new LocalChannel(loopA); @@ -462,17 +456,14 @@ public EventLoop next() { @Override protected void run() { - for (;;) { + do { Runnable task = takeTask(); if (task != null) { task.run(); updateLastExecutionTime(); } - if (confirmShutdown()) { - break; - } - } + } while (!confirmShutdown()); } @Override @@ -504,7 +495,7 @@ protected Queue newTaskQueue(int maxPendingTasks) { @Override protected void run() { - for (;;) { + do { try { Thread.sleep(TimeUnit.NANOSECONDS.toMillis(delayNanos(System.nanoTime()))); } catch (InterruptedException e) { @@ -513,10 +504,7 @@ protected void run() { runAllTasks(Integer.MAX_VALUE); - if (confirmShutdown()) { - break; - } - } + } while (!confirmShutdown()); } @Override diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 8e3c64e9274..50b15af00d2 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -98,7 +98,7 @@ public void testScheduleBigDelayNotOverflow() { }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); - assertTrue(future.cancel(true)); + assertTrue(future.cancel()); group.shutdownGracefully(); } @@ -169,7 +169,6 @@ public void channelUnregistered(SocketChannel ch, Throwable cause) { } } - @SuppressWarnings("deprecation") @Test public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() throws Exception { final AtomicReference error = new AtomicReference<>(); @@ -191,9 +190,9 @@ public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() thr } }); t.start(); - group.shutdownNow(); + Future termination = group.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); t.join(); - group.terminationFuture().syncUninterruptibly(); + termination.syncUninterruptibly(); assertThat(error.get(), instanceOf(RejectedExecutionException.class)); error.set(null); } diff --git a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java index 29345c23ef2..2f529462bf7 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/AbstractNioChannelTest.java @@ -23,7 +23,6 @@ import io.netty.channel.nio.NioHandler; import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -108,6 +107,7 @@ class WrappedEventLoop extends AbstractEventExecutor implements EventLoop { this.eventLoop = eventLoop; } + @Override @Test public EventLoop next() { return this; @@ -118,11 +118,6 @@ public Unsafe unsafe() { return eventLoop.unsafe(); } - @Override - public void shutdown() { - eventLoop.shutdown(); - } - @Override public boolean inEventLoop(Thread thread) { return eventLoop.inEventLoop(thread); @@ -134,12 +129,12 @@ public boolean isShuttingDown() { } @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { + public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { return eventLoop.shutdownGracefully(quietPeriod, timeout, unit); } @Override - public Future terminationFuture() { + public Future terminationFuture() { return eventLoop.terminationFuture(); } @@ -159,30 +154,30 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } @Override - public void execute(Runnable command) { - eventLoop.execute(command); + public void execute(Runnable task) { + eventLoop.execute(task); } @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return eventLoop.schedule(command, delay, unit); + public Future schedule(Runnable task, long delay, TimeUnit unit) { + return eventLoop.schedule(task, delay, unit); } @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return eventLoop.schedule(callable, delay, unit); + public Future schedule(Callable task, long delay, TimeUnit unit) { + return eventLoop.schedule(task, delay, unit); } @Override - public ScheduledFuture scheduleAtFixedRate( - Runnable command, long initialDelay, long period, TimeUnit unit) { - return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit); + public Future scheduleAtFixedRate( + Runnable task, long initialDelay, long period, TimeUnit unit) { + return eventLoop.scheduleAtFixedRate(task, initialDelay, period, unit); } @Override - public ScheduledFuture scheduleWithFixedDelay( - Runnable command, long initialDelay, long delay, TimeUnit unit) { - return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit); + public Future scheduleWithFixedDelay( + Runnable task, long initialDelay, long delay, TimeUnit unit) { + return eventLoop.scheduleWithFixedDelay(task, initialDelay, delay, unit); } }