Skip to content

Commit

Permalink
Netty Future no longer extends JDK Future (#11647)
Browse files Browse the repository at this point in the history
Motivation:
It is important to avoid blocking method calls in an event loop thread, since that can stall the system.
Netty's Future interface was extending the JDK Future interface, which included a number of blocking methods of questionable use in Netty.
We wish to reduce the number of blocking methods on the Future API in order to discourage their use a little.
Further more, the Netty Future specification of the behaviour of the cancel() and isDone() methods are inconsistent with those of the JDK Future.
If Netty's Future stop extending the JDK Future interface, it will also no longer be bound by its specification.

Modification:
Make Netty's Future no longer extend the JDK Future interface.
Change the EvenExecutorGroup interface to no longer extend ScheduledExecutorService.
The EventExecutorGroup still extends Executor, because Executor does not dictate any return type of the `execute()` method — this is also useful in the DefaultFutureCompletionStage implementation.
The Netty ScheduledFuture interface has been removed since it provided no additional features that were actually used.
Numerous changes to use sites that previously relied on the JDK types.
Remove the `Future.cancel()` method that took a boolean argument — this argument was always ignored in our implementations, which was another spec deviation.
Various `invoke*` and `shutdown*` methods have been removed from the EvenExecutorGroup API since it no longer extends ScheduledExecutorService — these were either not used anywhere, or deprecated with better alternatives available.
Updates to cancellation javadocs.

Result:
Cleaner code, leaner API.
  • Loading branch information
chrisvest committed Sep 8, 2021
1 parent 3cbb41a commit 59275fb
Show file tree
Hide file tree
Showing 63 changed files with 788 additions and 808 deletions.
Expand Up @@ -504,7 +504,7 @@ private Future<Void> close0(final ChannelOutboundInvoker invoker, final Channel
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);

channel.closeFuture().addListener(ignore -> {
forceCloseFuture.cancel(false);
forceCloseFuture.cancel();
});
}
});
Expand Down
Expand Up @@ -117,7 +117,7 @@ private void applyHandshakeTimeout() {
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false));
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
}

/**
Expand Down
Expand Up @@ -125,7 +125,7 @@ private void applyCloseSentTimeout(ChannelHandlerContext ctx) {
}
}, forceCloseTimeoutMillis, TimeUnit.MILLISECONDS);

closeSent.asFuture().addListener(future -> timeoutTask.cancel(false));
closeSent.asFuture().addListener(future -> timeoutTask.cancel());
}

/**
Expand Down
Expand Up @@ -157,6 +157,6 @@ private void applyHandshakeTimeout(ChannelHandlerContext ctx) {
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);

// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel(false));
localHandshakePromise.asFuture().addListener(f -> timeoutFuture.cancel());
}
}
Expand Up @@ -27,7 +27,6 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -900,7 +899,7 @@ private static final class ClosingChannelFutureListener implements FutureListene
@Override
public void operationComplete(Future<?> sentGoAwayFuture) {
if (timeoutTask != null) {
timeoutTask.cancel(false);
timeoutTask.cancel();
}
doClose();
}
Expand Down
Expand Up @@ -18,14 +18,15 @@
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import static java.util.Objects.requireNonNull;

/**
* Abstract base class for {@link EventExecutor} implementations.
*/
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
public abstract class AbstractEventExecutor implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
Expand All @@ -43,26 +44,60 @@ public <V> Future<V> newSucceededFuture(V result) {
}

@Override
public final Future<?> submit(Runnable task) {
return (Future<?>) super.submit(task);
public final Future<Void> submit(Runnable task) {
var futureTask = newTaskFor(task, (Void) null);
execute(futureTask);
return futureTask;
}

@Override
public final <T> Future<T> submit(Runnable task, T result) {
return (Future<T>) super.submit(task, result);
var futureTask = newTaskFor(task, result);
execute(futureTask);
return futureTask;
}

@Override
public final <T> Future<T> submit(Callable<T> task) {
return (Future<T>) super.submit(task);
var futureTask = newTaskFor(task);
execute(futureTask);
return futureTask;
}

@Override
/**
* Decorate the given {@link Runnable} and its return value, as a {@link RunnableFuture}, such that the
* returned {@link RunnableFuture} completes with the given result at the end of executing its
* {@link RunnableFuture#run()} method.
* <p>
* The returned {@link RunnableFuture} is the task that will actually be run by a thread in this
* executor.
* <p>
* This method can be overridden by sub-classes to hook into the life cycle of the given task.
*
* @param runnable The task to be decorated.
* @param value The value that the returned future will complete with, assuming the given {@link Runnable} doesn't
* throw an exception.
* @param <T> The type of the result value.
* @return The decorated {@link Runnable} that is now also a {@link Future}.
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return newRunnableFuture(newPromise(), runnable, value);
}

@Override
/**
* Decorate the given {@link Callable} and its return value, as a {@link RunnableFuture}, such that the
* returned {@link RunnableFuture} completes with the returned result from the {@link Callable} at the end of
* executing its {@link RunnableFuture#run()} method.
* <p>
* The returned {@link RunnableFuture} is the task that will actually be run by a thread in this
* executor.
* <p>
* This method can be overridden by sub-classes to hook into the life cycle of the given task.
*
* @param callable The task to be decorated.
* @param <T> The type of the result value.
* @return The decorated {@link Runnable} that is now also a {@link Future}.
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return newRunnableFuture(newPromise(), callable);
}
Expand All @@ -85,17 +120,14 @@ static void safeExecute(Runnable task) {
* {@link RunnableFuture}.
*/
private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Callable<V> task) {
return new RunnableFutureAdapter<>(promise, task);
return new RunnableFutureAdapter<>(promise, requireNonNull(task, "task"));
}

/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Runnable} and
* {@code value}.
*
* This can be used if you want to override {@link #newTaskFor(Runnable, V)} and return a different
* {@link RunnableFuture}.
*/
private static <V> RunnableFuture<V> newRunnableFuture(Promise<V> promise, Runnable task, V value) {
return new RunnableFutureAdapter<>(promise, Executors.callable(task, value));
return new RunnableFutureAdapter<>(promise, Executors.callable(requireNonNull(task, "task"), value));
}
}
Expand Up @@ -15,21 +15,20 @@
*/
package io.netty.util.concurrent;

import static java.util.Objects.requireNonNull;

import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.callable;

/**
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
Expand Down Expand Up @@ -60,7 +59,7 @@ public static long nanoTime() {
static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos;
}

PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() {
Expand All @@ -79,7 +78,7 @@ private static boolean isNullOrEmpty(Queue<RunnableScheduledFutureNode<?>> queue

/**
* Cancel all scheduled tasks.
*
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final void cancelScheduledTasks() {
Expand All @@ -92,8 +91,8 @@ protected final void cancelScheduledTasks() {
final RunnableScheduledFutureNode<?>[] scheduledTasks =
scheduledTaskQueue.toArray(EMPTY_RUNNABLE_SCHEDULED_FUTURE_NODES);

for (RunnableScheduledFutureNode<?> task: scheduledTasks) {
task.cancel(false);
for (RunnableScheduledFutureNode<?> task : scheduledTasks) {
task.cancel();
}

scheduledTaskQueue.clearIgnoringIndexes();
Expand All @@ -107,16 +106,16 @@ protected final RunnableScheduledFuture<?> pollScheduledTask() {
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
*
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link
* #nanoTime()} to retrieve the correct {@code nanoTime}.
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {
assert inEventLoop();

Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
Expand All @@ -130,12 +129,12 @@ protected final RunnableScheduledFuture<?> pollScheduledTask(long nanoTime) {

/**
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
*
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final long nextScheduledTaskNano() {
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return -1;
}
Expand All @@ -152,30 +151,30 @@ final RunnableScheduledFuture<?> peekScheduledTask() {

/**
* Returns {@code true} if a scheduled task is ready for processing.
*
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
protected final boolean hasScheduledTasks() {
assert inEventLoop();
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
RunnableScheduledFuture<?> task = newScheduledTaskFor(Executors.callable(command),
deadlineNanos(unit.toNanos(delay)), 0);
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(delay)), 0);
return schedule(task);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
requireNonNull(callable, "callable");
requireNonNull(unit, "unit");
if (delay < 0) {
Expand All @@ -186,7 +185,7 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
Expand All @@ -198,13 +197,13 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
String.format("period: %d (expected: > 0)", period));
}

RunnableScheduledFuture<?> task = newScheduledTaskFor(Executors.<Void>callable(command, null),
deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period));
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period));
return schedule(task);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
requireNonNull(command, "command");
requireNonNull(unit, "unit");
if (initialDelay < 0) {
Expand All @@ -216,15 +215,15 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
String.format("delay: %d (expected: > 0)", delay));
}

RunnableScheduledFuture<?> task = newScheduledTaskFor(Executors.<Void>callable(command, null),
deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay));
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay));
return schedule(task);
}

/**
* Add the {@link RunnableScheduledFuture} for execution.
*/
protected final <V> ScheduledFuture<V> schedule(final RunnableScheduledFuture<V> task) {
protected final <V> Future<V> schedule(final RunnableScheduledFuture<V> task) {
if (inEventLoop()) {
add0(task);
} else {
Expand Down Expand Up @@ -253,9 +252,9 @@ final void removeScheduled(final RunnableScheduledFutureNode<?> task) {

/**
* Returns a new {@link RunnableFuture} build on top of the given {@link Promise} and {@link Callable}.
*
* This can be used if you want to override {@link #newTaskFor(Callable)} and return a different
* {@link RunnableFuture}.
* <p>
* This can be used if you want to override {@link #newScheduledTaskFor(Callable, long, long)} and return a
* different {@link RunnableFuture}.
*/
protected static <V> RunnableScheduledFuture<V> newRunnableScheduledFuture(
AbstractScheduledEventExecutor executor, Promise<V> promise, Callable<V> task,
Expand All @@ -271,7 +270,8 @@ protected <V> RunnableScheduledFuture<V> newScheduledTaskFor(
return newRunnableScheduledFuture(this, newPromise(), callable, deadlineNanos, period);
}

interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> { }
interface RunnableScheduledFutureNode<V> extends PriorityQueueNode, RunnableScheduledFuture<V> {
}

private static final class DefaultRunnableScheduledFutureNode<V> implements RunnableScheduledFutureNode<V> {
private final RunnableScheduledFuture<V> future;
Expand Down Expand Up @@ -308,8 +308,8 @@ public RunnableScheduledFuture<V> addListener(FutureListener<? super V> listener
}

@Override
public <C> RunnableScheduledFuture<V> addListener(C context,
FutureContextListener<? super C, ? super V> listener) {
public <C> RunnableScheduledFuture<V> addListener(
C context, FutureContextListener<? super C, ? super V> listener) {
future.addListener(context, listener);
return this;
}
Expand All @@ -335,8 +335,8 @@ public void run() {
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
public boolean cancel() {
return future.cancel();
}

@Override
Expand All @@ -360,12 +360,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution
}

@Override
public long getDelay(TimeUnit unit) {
return future.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
public int compareTo(RunnableScheduledFuture<?> o) {
return future.compareTo(o);
}

Expand Down Expand Up @@ -393,11 +388,6 @@ public RunnableFuture<V> awaitUninterruptibly() {
return this;
}

@Override
public boolean cancel() {
return cancel(false);
}

@Override
public boolean isSuccess() {
return future.isSuccess();
Expand Down

0 comments on commit 59275fb

Please sign in to comment.