diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 5b9a7b691fb5..02dabdb702a9 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -33,8 +33,7 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.TryExecutor; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,9 +41,6 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. { protected static final Logger LOG = LoggerFactory.getLogger(HTTP2Connection.class); - // TODO remove this once we are sure EWYK is OK for http2 - private static final boolean PEC_MODE = Boolean.getBoolean("org.eclipse.jetty.http2.PEC_MODE"); - private final AutoLock lock = new AutoLock(); private final Queue tasks = new ArrayDeque<>(); private final HTTP2Producer producer = new HTTP2Producer(); @@ -64,9 +60,7 @@ public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoin this.parser = parser; this.session = session; this.bufferSize = bufferSize; - if (PEC_MODE) - executor = new TryExecutor.NoTryExecutor(executor); - this.strategy = new EatWhatYouKill(producer, executor); + this.strategy = new AdaptiveExecutionStrategy(producer, executor); LifeCycle.start(strategy); parser.init(ParserListener::new); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java index 631335a353c3..9ec7fad40c84 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ManagedSelector.java @@ -49,7 +49,7 @@ import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +95,7 @@ public ManagedSelector(SelectorManager selectorManager, int id) _id = id; SelectorProducer producer = new SelectorProducer(); Executor executor = selectorManager.getExecutor(); - _strategy = new EatWhatYouKill(producer, executor); + _strategy = new AdaptiveExecutionStrategy(producer, executor); addBean(_strategy, true); } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java new file mode 100644 index 000000000000..72ee3a283423 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategy.java @@ -0,0 +1,567 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread.strategy; + +import java.io.Closeable; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.LongAdder; + +import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.thread.AutoLock; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.TryExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

An adaptive execution strategy that uses the {@link Invocable} status + * of both the task and the current thread to select an optimal strategy + * that prioritizes executing the task immediately in the current + * producing thread if it can be done so without thread starvation issues.

+ * + *

This strategy selects between the following sub-strategies:

+ *
+ *
ProduceConsume(PC)
+ *
The producing thread consumes the task by running it directly + * and then continues to produce.
+ *
ProduceInvokeConsume(PIC)
+ *
The producing thread consumes the task by running it with {@link Invocable#invokeNonBlocking(Runnable)} + * and then continues to produce.
+ *
ProduceExecuteConsume(PEC)
+ *
The producing thread dispatches the task to a thread pool to be executed + * and then continues to produce.
+ *
ExecuteProduceConsume(EPC)
+ *
The producing thread consumes dispatches a pending producer to a thread pool, + * then consumes the task by running it directly (as in PC mode), then races with + * the pending producer thread to take over production. + *
+ *
+ *

The sub-strategy is selected as follows:

+ *
+ *
PC
+ *
If the produced task is {@link Invocable.InvocationType#NON_BLOCKING}.
+ *
EPC
+ *
If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING} + * and a pending producer thread is available, either because there is already a pending producer + * or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.
+ *
PIC
+ *
If the produced task is {@link Invocable.InvocationType#EITHER} and EPC was not selected.
+ *
PEC
+ *
Otherwise.
+ *
+ * + *

Because of the preference for {@code PC} mode, on a multicore machine with many + * many {@link Invocable.InvocationType#NON_BLOCKING} tasks, multiple instances of the strategy may be + * required to keep all CPUs on the system busy.

+ * + *

Since the producing thread may be invoked with {@link Invocable#invokeNonBlocking(Runnable)} + * this allows {@link AdaptiveExecutionStrategy}s to be efficiently and safely chained: a task + * produced by one execution strategy may become itself be a producer in a second execution strategy + * (e.g. an IO selector may use an execution strategy to handle multiple connections and each + * connection may use a execution strategy to handle multiplexed channels/streams within the connection).

+ * + *

A task containing another {@link AdaptiveExecutionStrategy} should identify as + * {@link Invocable.InvocationType#EITHER} so when there are no pending producers threads available to + * the first strategy, then it may invoke the second as {@link Invocable.InvocationType#NON_BLOCKING}. + * This avoids starvation as the production on the second strategy can always be executed, + * but without the risk that it may block the last available producer for the first strategy.

+ * + *

This strategy was previously named EatWhatYouKill (EWYK) because its preference for a + * producer to directly consume the tasks that it produces is similar to a hunting proverb + * that says that a hunter should eat (i.e. consume) what they kill (i.e. produced).

+ */ +@ManagedObject("Adaptive execution strategy") +public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy +{ + private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class); + + /** + * The production state of the strategy. + */ + private enum State + { + IDLE, // No tasks or producers. + PRODUCING, // There is an active producing thread. + REPRODUCING // There is an active producing thread and demand for more production. + } + + /** + * The sub-strategies used by the strategy to consume tasks that are produced. + */ + private enum SubStrategy + { + /** + * Consumes produced tasks and continues producing. + */ + PRODUCE_CONSUME, + /** + * Consumes produced tasks as non blocking and continues producing. + */ + PRODUCE_INVOKE_CONSUME, + /** + * Executes produced tasks and continues producing. + */ + PRODUCE_EXECUTE_CONSUME, + /** + * Executes a pending producer, consumes produced tasks and races the pending producer to continue producing. + */ + EXECUTE_PRODUCE_CONSUME + } + + private final AutoLock _lock = new AutoLock(); + private final LongAdder _pcMode = new LongAdder(); + private final LongAdder _picMode = new LongAdder(); + private final LongAdder _pecMode = new LongAdder(); + private final LongAdder _epcMode = new LongAdder(); + private final Producer _producer; + private final Executor _executor; + private final TryExecutor _tryExecutor; + private final Runnable _runPendingProducer = () -> tryProduce(true); + private State _state = State.IDLE; + private boolean _pending; + + /** + * @param producer The produce of tasks to be consumed. + * @param executor The executor to be used for executing producers or consumers, depending on the sub-strategy. + */ + public AdaptiveExecutionStrategy(Producer producer, Executor executor) + { + _producer = producer; + _executor = executor; + _tryExecutor = TryExecutor.asTryExecutor(executor); + addBean(_producer); + addBean(_tryExecutor); + if (LOG.isDebugEnabled()) + LOG.debug("{} created", this); + } + + @Override + public void dispatch() + { + boolean execute = false; + try (AutoLock l = _lock.lock()) + { + switch (_state) + { + case IDLE: + if (!_pending) + { + _pending = true; + execute = true; + } + break; + + case PRODUCING: + _state = State.REPRODUCING; + break; + + default: + break; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("{} dispatch {}", this, execute); + if (execute) + _executor.execute(_runPendingProducer); + } + + @Override + public void produce() + { + tryProduce(false); + } + + /** + * Tries to become the producing thread and then produces and consumes tasks. + * + * @param wasPending True if the calling thread was started as a pending producer. + */ + private void tryProduce(boolean wasPending) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} tryProduce {}", this, wasPending); + + // Takes the lock to atomically check if the thread can produce. + try (AutoLock l = _lock.lock()) + { + // If the calling thread was the pending producer, there is no longer one pending. + if (wasPending) + _pending = false; + + switch (_state) + { + case IDLE: + // The strategy was IDLE, so this thread can become the producer. + _state = State.PRODUCING; + break; + + case PRODUCING: + // The strategy is already producing, so another thread must be the producer. + // However, it may be just about to stop being the producer so we set the + // REPRODUCING state to force it to produce at least once more. + _state = State.REPRODUCING; + return; + + case REPRODUCING: + // Another thread is already producing and will already try again to produce. + return; + + default: + throw new IllegalStateException(toStringLocked()); + } + } + + // Determine the thread's invocation type once, outside of the production loop. + boolean nonBlocking = Invocable.isNonBlockingInvocation(); + while (isRunning()) + { + try + { + Runnable task = produceTask(); + + // If we did not produce a task + if (task == null) + { + // take the lock to atomically determine if we should keep producing. + try (AutoLock l = _lock.lock()) + { + switch (_state) + { + case PRODUCING: + // The calling thread was the only producer, so it is now IDLE and we stop producing. + _state = State.IDLE; + return; + + case REPRODUCING: + // Another thread may have queued a task and tried to produce + // so the calling thread should continue to produce. + _state = State.PRODUCING; + continue; + + default: + throw new IllegalStateException(toStringLocked()); + } + } + } + + // Consume the task according the selected sub-strategy, then + // continue producing only if the sub-strategy returns true. + if (consumeTask(task, selectSubStrategy(task, nonBlocking))) + continue; + return; + } + catch (Throwable th) + { + LOG.warn("Unable to produce", th); + } + } + } + + /** + * Selects the execution strategy. + * + * @param task The task to select the strategy for. + * @param nonBlocking True if the producing thread cannot block. + * @return The sub-strategy to use for the task. + */ + private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking) + { + Invocable.InvocationType taskType = Invocable.getInvocationType(task); + switch (taskType) + { + case NON_BLOCKING: + // The produced task will not block, so use PC: consume task directly + // and then resume production. + return SubStrategy.PRODUCE_CONSUME; + + case EITHER: + // The produced task may be run either as blocking or non blocking. + + // If the calling producing thread may also block + if (!nonBlocking) + { + // Take the lock to atomically check if a pending producer is available. + try (AutoLock l = _lock.lock()) + { + // If a pending producer is available or one can be started + if (_pending || _tryExecutor.tryExecute(_runPendingProducer)) + { + // use EPC: The producer directly consumes the task, which may block + // and then races with the pending producer to resume production. + _pending = true; + _state = State.IDLE; + return SubStrategy.EXECUTE_PRODUCE_CONSUME; + } + } + } + + // otherwise use PIC: The producer consumers the task in non-blocking mode + // and then resumes production. + return SubStrategy.PRODUCE_INVOKE_CONSUME; + + case BLOCKING: + // The produced task may block. + + // If the calling producing thread may also block + if (!nonBlocking) + { + // Take the lock to atomically check if a pending producer is available. + try (AutoLock l = _lock.lock()) + { + // If a pending producer is available or one can be started + if (_pending || _tryExecutor.tryExecute(_runPendingProducer)) + { + // use EPC: The producer directly consumes the task, which may block + // and then races with the pending producer to resume production. + _pending = true; + _state = State.IDLE; + return SubStrategy.EXECUTE_PRODUCE_CONSUME; + } + } + } + + // Otherwise use PEC: the task is consumed by the executor and the producer continues to produce. + return SubStrategy.PRODUCE_EXECUTE_CONSUME; + + default: + throw new IllegalStateException(String.format("taskType=%s %s", taskType, this)); + } + } + + /** + * Consumes a task with a sub-strategy. + * + * @param task The task to consume. + * @param subStrategy The execution sub-strategy to use to consume the task. + * @return True if the sub-strategy requires the caller to continue to produce tasks. + */ + private boolean consumeTask(Runnable task, SubStrategy subStrategy) + { + // Consume and/or execute task according to the selected mode. + if (LOG.isDebugEnabled()) + LOG.debug("ss={} t={}/{} {}", subStrategy, task, Invocable.getInvocationType(task), this); + switch (subStrategy) + { + case PRODUCE_CONSUME: + _pcMode.increment(); + runTask(task); + return true; + + case PRODUCE_INVOKE_CONSUME: + _picMode.increment(); + invokeAsNonBlocking(task); + return true; + + case PRODUCE_EXECUTE_CONSUME: + _pecMode.increment(); + execute(task); + return true; + + case EXECUTE_PRODUCE_CONSUME: + _epcMode.increment(); + runTask(task); + + // Race the pending producer to produce again. + try (AutoLock l = _lock.lock()) + { + if (_state == State.IDLE) + { + // We beat the pending producer, so we will become the producer instead. + // The pending produce will become a noop if it arrives whilst we are producing, + // or it may take over if we subsequently do another EPC consumption. + _state = State.PRODUCING; + return true; + } + } + // The pending producer is now producing, so this thread no longer produces. + return false; + + default: + throw new IllegalStateException(String.format("ss=%s %s", subStrategy, this)); + } + } + + /** + * Runs a Runnable task, logging any thrown exception. + * + * @param task The task to run. + */ + private void runTask(Runnable task) + { + try + { + task.run(); + } + catch (Throwable x) + { + LOG.warn("Task run failed", x); + } + } + + /** + * Runs a task in non-blocking mode. + * + * @param task The task to run in non-blocking mode. + */ + private void invokeAsNonBlocking(Runnable task) + { + try + { + Invocable.invokeNonBlocking(task); + } + catch (Throwable x) + { + LOG.warn("Task invoke failed", x); + } + } + + /** + * Produces a task, logging any Throwable that may result. + * + * @return A produced task or null if there were no tasks or a Throwable was thrown. + */ + private Runnable produceTask() + { + try + { + return _producer.produce(); + } + catch (Throwable e) + { + LOG.warn("Task produce failed", e); + return null; + } + } + + /** + * Executes a task via the {@link Executor} used to construct this strategy. + * If the execution is rejected and the task is a Closeable, then it is closed. + * + * @param task The task to execute. + */ + private void execute(Runnable task) + { + try + { + _executor.execute(task); + } + catch (RejectedExecutionException e) + { + if (isRunning()) + LOG.warn("Execute failed", e); + else + LOG.trace("IGNORED", e); + + if (task instanceof Closeable) + IO.close((Closeable)task); + } + } + + @ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true) + public long getPCTasksConsumed() + { + return _pcMode.longValue(); + } + + @ManagedAttribute(value = "number of tasks executed with PIC mode", readonly = true) + public long getPICTasksExecuted() + { + return _picMode.longValue(); + } + + @ManagedAttribute(value = "number of tasks executed with PEC mode", readonly = true) + public long getPECTasksExecuted() + { + return _pecMode.longValue(); + } + + @ManagedAttribute(value = "number of tasks consumed with EPC mode", readonly = true) + public long getEPCTasksConsumed() + { + return _epcMode.longValue(); + } + + @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) + public boolean isIdle() + { + try (AutoLock l = _lock.lock()) + { + return _state == State.IDLE; + } + } + + @ManagedOperation(value = "resets the task counts", impact = "ACTION") + public void reset() + { + _pcMode.reset(); + _epcMode.reset(); + _pecMode.reset(); + _picMode.reset(); + } + + @Override + public String toString() + { + try (AutoLock l = _lock.lock()) + { + return toStringLocked(); + } + } + + public String toStringLocked() + { + StringBuilder builder = new StringBuilder(); + getString(builder); + getState(builder); + return builder.toString(); + } + + private void getString(StringBuilder builder) + { + builder.append(getClass().getSimpleName()); + builder.append('@'); + builder.append(Integer.toHexString(hashCode())); + builder.append('/'); + builder.append(_producer); + builder.append('/'); + } + + private void getState(StringBuilder builder) + { + builder.append(_state); + builder.append("/p="); + builder.append(_pending); + builder.append('/'); + builder.append(_tryExecutor); + builder.append("[pc="); + builder.append(getPCTasksConsumed()); + builder.append(",pic="); + builder.append(getPICTasksExecuted()); + builder.append(",pec="); + builder.append(getPECTasksExecuted()); + builder.append(",epc="); + builder.append(getEPCTasksConsumed()); + builder.append("]"); + builder.append("@"); + builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java index 7fd11bd9f5e3..350df4101e64 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/strategy/EatWhatYouKill.java @@ -13,469 +13,16 @@ package org.eclipse.jetty.util.thread.strategy; -import java.io.Closeable; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.LongAdder; - -import org.eclipse.jetty.util.annotation.ManagedAttribute; -import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.annotation.ManagedOperation; -import org.eclipse.jetty.util.component.ContainerLifeCycle; -import org.eclipse.jetty.util.thread.AutoLock; -import org.eclipse.jetty.util.thread.ExecutionStrategy; -import org.eclipse.jetty.util.thread.Invocable; -import org.eclipse.jetty.util.thread.TryExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - *

A strategy where the thread that produces will run the resulting task if it - * is possible to do so without thread starvation.

- * - *

This strategy preemptively dispatches a thread as a pending producer, so that - * when a thread produces a task it can immediately run the task and let the pending - * producer thread take over production. When operating in this way, the sub-strategy - * is called Execute Produce Consume (EPC).

- *

However, if the task produced uses the {@link Invocable} API to indicate that - * it will not block, then the strategy will run it directly, regardless of the - * presence of a pending producer thread and then resume production after the - * task has completed. When operating in this pattern, the sub-strategy is called - * ProduceConsume (PC).

- *

If there is no pending producer thread available and if the task has not - * indicated it is non-blocking, then this strategy will dispatch the execution of - * the task and immediately continue production. When operating in this pattern, the - * sub-strategy is called ProduceExecuteConsume (PEC).

+ * @deprecated This class has been renamed to {@link AdaptiveExecutionStrategy} */ -@ManagedObject("eat what you kill execution strategy") -public class EatWhatYouKill extends ContainerLifeCycle implements ExecutionStrategy, Runnable +@Deprecated(forRemoval = true) +public class EatWhatYouKill extends AdaptiveExecutionStrategy { - private static final Logger LOG = LoggerFactory.getLogger(EatWhatYouKill.class); - - private enum State - { - IDLE, PRODUCING, REPRODUCING - } - - /* The modes this strategy can work in */ - private enum Mode - { - PRODUCE_CONSUME, - PRODUCE_INVOKE_CONSUME, // This is PRODUCE_CONSUME an EITHER task with NON_BLOCKING invocation - PRODUCE_EXECUTE_CONSUME, - EXECUTE_PRODUCE_CONSUME // Eat What You Kill! - } - - private final AutoLock _lock = new AutoLock(); - private final LongAdder _pcMode = new LongAdder(); - private final LongAdder _picMode = new LongAdder(); - private final LongAdder _pecMode = new LongAdder(); - private final LongAdder _epcMode = new LongAdder(); - private final Producer _producer; - private final Executor _executor; - private final TryExecutor _tryExecutor; - private State _state = State.IDLE; - private boolean _pending; - public EatWhatYouKill(Producer producer, Executor executor) { - _producer = producer; - _executor = executor; - _tryExecutor = TryExecutor.asTryExecutor(executor); - addBean(_producer); - addBean(_tryExecutor); - if (LOG.isDebugEnabled()) - LOG.debug("{} created", this); - } - - @Override - public void dispatch() - { - boolean execute = false; - try (AutoLock l = _lock.lock()) - { - switch (_state) - { - case IDLE: - if (!_pending) - { - _pending = true; - execute = true; - } - break; - - case PRODUCING: - _state = State.REPRODUCING; - break; - - default: - break; - } - } - if (LOG.isDebugEnabled()) - LOG.debug("{} dispatch {}", this, execute); - if (execute) - _executor.execute(this); - } - - @Override - public void run() - { - tryProduce(true); - } - - @Override - public void produce() - { - tryProduce(false); - } - - private void tryProduce(boolean wasPending) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} tryProduce {}", this, wasPending); - - try (AutoLock l = _lock.lock()) - { - if (wasPending) - _pending = false; - - switch (_state) - { - case IDLE: - // Enter PRODUCING - _state = State.PRODUCING; - break; - - case PRODUCING: - // Keep other Thread producing - _state = State.REPRODUCING; - return; - - default: - return; - } - } - - boolean nonBlocking = Invocable.isNonBlockingInvocation(); - - while (isRunning()) - { - try - { - if (doProduce(nonBlocking)) - continue; - return; - } - catch (Throwable th) - { - LOG.warn("Unable to produce", th); - } - } - } - - private boolean doProduce(boolean nonBlocking) - { - Runnable task = produceTask(); - - if (task == null) - { - try (AutoLock l = _lock.lock()) - { - // Could another task just have been queued with a produce call? - switch (_state) - { - case PRODUCING: - _state = State.IDLE; - return false; - - case REPRODUCING: - _state = State.PRODUCING; - return true; - - default: - throw new IllegalStateException(toStringLocked()); - } - } - } - - Mode mode; - if (nonBlocking) - { - // The calling thread cannot block, so we only have a choice between PC and PEC modes, - // based on the invocation type of the task - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - mode = Mode.PRODUCE_CONSUME; - break; - - case EITHER: - mode = Mode.PRODUCE_INVOKE_CONSUME; - break; - - default: - mode = Mode.PRODUCE_EXECUTE_CONSUME; - break; - } - } - else - { - // The calling thread can block, so we can choose between PC, PEC and EPC modes, - // based on the invocation type of the task and if a reserved thread is available - switch (Invocable.getInvocationType(task)) - { - case NON_BLOCKING: - mode = Mode.PRODUCE_CONSUME; - break; - - case BLOCKING: - // The task is blocking, so PC is not an option. Thus we choose - // between EPC and PEC based on the availability of a reserved thread. - try (AutoLock l = _lock.lock()) - { - if (_pending) - { - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (_tryExecutor.tryExecute(this)) - { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else - { - mode = Mode.PRODUCE_EXECUTE_CONSUME; - } - } - break; - - case EITHER: - // The task may be non blocking, so PC is an option. Thus we choose - // between EPC and PC based on the availability of a reserved thread. - try (AutoLock l = _lock.lock()) - { - if (_pending) - { - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else if (_tryExecutor.tryExecute(this)) - { - _pending = true; - _state = State.IDLE; - mode = Mode.EXECUTE_PRODUCE_CONSUME; - } - else - { - // PC mode, but we must consume with non-blocking invocation - // as we may be the last thread and we cannot block - mode = Mode.PRODUCE_INVOKE_CONSUME; - } - } - break; - - default: - throw new IllegalStateException(toString()); - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("{} m={} t={}/{}", this, mode, task, Invocable.getInvocationType(task)); - - // Consume or execute task - switch (mode) - { - case PRODUCE_CONSUME: - _pcMode.increment(); - runTask(task); - return true; - - case PRODUCE_INVOKE_CONSUME: - _picMode.increment(); - invokeTask(task); - return true; - - case PRODUCE_EXECUTE_CONSUME: - _pecMode.increment(); - execute(task); - return true; - - case EXECUTE_PRODUCE_CONSUME: - _epcMode.increment(); - runTask(task); - - // Try to produce again? - try (AutoLock l = _lock.lock()) - { - if (_state == State.IDLE) - { - // We beat the pending producer, so we will become the producer instead - _state = State.PRODUCING; - return true; - } - } - return false; - - default: - throw new IllegalStateException(toString()); - } - } - - private void runTask(Runnable task) - { - try - { - task.run(); - } - catch (Throwable x) - { - LOG.warn("Task run failed", x); - } - } - - private void invokeTask(Runnable task) - { - try - { - Invocable.invokeNonBlocking(task); - } - catch (Throwable x) - { - LOG.warn("Task invoke failed", x); - } - } - - private Runnable produceTask() - { - try - { - return _producer.produce(); - } - catch (Throwable e) - { - LOG.warn("Task produce failed", e); - return null; - } - } - - private void execute(Runnable task) - { - try - { - _executor.execute(task); - } - catch (RejectedExecutionException e) - { - if (isRunning()) - LOG.warn("Execute failed", e); - else - LOG.trace("IGNORED", e); - - if (task instanceof Closeable) - { - try - { - ((Closeable)task).close(); - } - catch (Throwable e2) - { - LOG.trace("IGNORED", e2); - } - } - } - } - - @ManagedAttribute(value = "number of tasks consumed with PC mode", readonly = true) - public long getPCTasksConsumed() - { - return _pcMode.longValue(); - } - - @ManagedAttribute(value = "number of tasks executed with PIC mode", readonly = true) - public long getPICTasksExecuted() - { - return _picMode.longValue(); - } - - @ManagedAttribute(value = "number of tasks executed with PEC mode", readonly = true) - public long getPECTasksExecuted() - { - return _pecMode.longValue(); - } - - @ManagedAttribute(value = "number of tasks consumed with EPC mode", readonly = true) - public long getEPCTasksConsumed() - { - return _epcMode.longValue(); - } - - @ManagedAttribute(value = "whether this execution strategy is idle", readonly = true) - public boolean isIdle() - { - try (AutoLock l = _lock.lock()) - { - return _state == State.IDLE; - } - } - - @ManagedOperation(value = "resets the task counts", impact = "ACTION") - public void reset() - { - _pcMode.reset(); - _epcMode.reset(); - _pecMode.reset(); - _picMode.reset(); - } - - @Override - public String toString() - { - try (AutoLock l = _lock.lock()) - { - return toStringLocked(); - } - } - - public String toStringLocked() - { - StringBuilder builder = new StringBuilder(); - getString(builder); - getState(builder); - return builder.toString(); - } - - private void getString(StringBuilder builder) - { - builder.append(getClass().getSimpleName()); - builder.append('@'); - builder.append(Integer.toHexString(hashCode())); - builder.append('/'); - builder.append(_producer); - builder.append('/'); - } - - private void getState(StringBuilder builder) - { - builder.append(_state); - builder.append("/p="); - builder.append(_pending); - builder.append('/'); - builder.append(_tryExecutor); - builder.append("[pc="); - builder.append(getPCTasksConsumed()); - builder.append(",pic="); - builder.append(getPICTasksExecuted()); - builder.append(",pec="); - builder.append(getPECTasksExecuted()); - builder.append(",epc="); - builder.append(getEPCTasksConsumed()); - builder.append("]"); - builder.append("@"); - builder.append(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now())); + super(producer, executor); } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java similarity index 85% rename from jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java rename to jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java index 9d4f12094b5a..6e8055f67e88 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/EatWhatYouKillTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java @@ -20,21 +20,21 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.logging.StacklessLogging; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertNull; -public class EatWhatYouKillTest +public class AdaptiveExecutionStrategyTest { - private EatWhatYouKill ewyk; + private AdaptiveExecutionStrategy aes; - private void startEWYK(ExecutionStrategy.Producer producer) throws Exception + private void startAES(ExecutionStrategy.Producer producer) throws Exception { QueuedThreadPool executor = new QueuedThreadPool(); - ewyk = new EatWhatYouKill(producer, executor); - ewyk.start(); + aes = new AdaptiveExecutionStrategy(producer, executor); + aes.start(); ReservedThreadExecutor tryExecutor = executor.getBean(ReservedThreadExecutor.class); // Prime the executor so that there is a reserved thread. executor.tryExecute(() -> @@ -49,19 +49,19 @@ private void startEWYK(ExecutionStrategy.Producer producer) throws Exception @AfterEach public void dispose() throws Exception { - if (ewyk != null) - ewyk.stop(); + if (aes != null) + aes.stop(); } @Test public void testExceptionThrownByTask() throws Exception { - try (StacklessLogging ignored = new StacklessLogging(EatWhatYouKill.class)) + try (StacklessLogging ignored = new StacklessLogging(AdaptiveExecutionStrategy.class)) { AtomicReference detector = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(2); BlockingQueue tasks = new LinkedBlockingQueue<>(); - startEWYK(() -> + startAES(() -> { boolean proceed = detector.compareAndSet(null, new Throwable()); if (proceed) @@ -88,7 +88,7 @@ public void testExceptionThrownByTask() throws Exception }); // Start production in another thread. - ewyk.dispatch(); + aes.dispatch(); tasks.offer(new Task(() -> { @@ -96,7 +96,7 @@ public void testExceptionThrownByTask() throws Exception { // While thread1 runs this task, simulate // that thread2 starts producing. - ewyk.dispatch(); + aes.dispatch(); // Wait for thread2 to block in produce(). latch.await(); // Throw to verify that exceptions are handled correctly. @@ -108,8 +108,8 @@ public void testExceptionThrownByTask() throws Exception } }, Invocable.InvocationType.BLOCKING)); - // Wait until EWYK is idle. - while (!ewyk.isIdle()) + // Wait until AES is idle. + while (!aes.isIdle()) { Thread.sleep(10); } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java index 304e1eb59c43..d832e66445ca 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecuteProduceConsumeTest.java @@ -36,7 +36,7 @@ public class ExecuteProduceConsumeTest private final BlockingQueue _produce = new LinkedBlockingQueue<>(); private final Queue _executions = new LinkedBlockingQueue<>(); - private ExecuteProduceConsume _ewyk; + private ExecuteProduceConsume _exStrategy; private volatile Thread _producer; @BeforeEach @@ -67,7 +67,7 @@ public void before() Executor executor = _executions::add; - _ewyk = new ExecuteProduceConsume(producer, executor); + _exStrategy = new ExecuteProduceConsume(producer, executor); } @AfterEach @@ -82,7 +82,7 @@ public void after() public void testIdle() { _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); } @Test @@ -91,9 +91,9 @@ public void testProduceOneNonBlockingTask() Task t0 = new Task(); _produce.add(t0); _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); assertThat(t0.hasRun(), Matchers.equalTo(true)); - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); } @Test @@ -106,13 +106,13 @@ public void testProduceManyNonBlockingTask() _produce.add(tasks[i]); } _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); for (Task task : tasks) { assertThat(task.hasRun(), Matchers.equalTo(true)); } - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); } @Test @@ -126,7 +126,7 @@ public void run() { _produce.add(t0); _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); } }; thread.start(); @@ -136,10 +136,10 @@ public void run() assertEquals(thread, t0.getThread()); // Should have dispatched only one helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // which is make us idle - _ewyk.run(); - assertThat(_ewyk.isIdle(), Matchers.equalTo(true)); + _exStrategy.run(); + assertThat(_exStrategy.isIdle(), Matchers.equalTo(true)); // unblock task t0.unblock(); @@ -158,7 +158,7 @@ public void run() { _produce.add(t0); _produce.add(NULLTASK); - _ewyk.produce(); + _exStrategy.produce(); } }; thread.start(); @@ -167,16 +167,16 @@ public void run() t0.awaitRun(); // Should have dispatched only one helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // unblock task t0.unblock(); // will run to completion because are become idle thread.join(); - assertThat(_ewyk.isIdle(), Matchers.equalTo(true)); + assertThat(_exStrategy.isIdle(), Matchers.equalTo(true)); // because we are idle, dispatched thread is noop - _ewyk.run(); + _exStrategy.run(); } @Test @@ -189,7 +189,7 @@ public void testBlockedInProduce() throws Exception public void run() { _produce.add(t0); - _ewyk.produce(); + _exStrategy.produce(); } }; thread0.start(); @@ -199,10 +199,10 @@ public void run() assertEquals(thread0, t0.getThread()); // Should have dispatched another helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // dispatched thread will block in produce - Thread thread1 = new Thread(_ewyk); + Thread thread1 = new Thread(_exStrategy); thread1.start(); // Spin @@ -215,10 +215,10 @@ public void run() assertEquals(thread1, _producer); // because we are producing, any other dispatched threads are noops - _ewyk.run(); + _exStrategy.run(); // ditto with execute - _ewyk.produce(); + _exStrategy.produce(); // Now if unblock the production by the dispatched thread final Task t1 = new Task(true); @@ -229,7 +229,7 @@ public void run() assertEquals(thread1, t1.getThread()); // and another thread will have been requested - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // If we unblock t1, it will overtake t0 and try to produce again! t1.unblock(); @@ -246,7 +246,7 @@ public void run() thread0.join(); // If the requested extra thread turns up, it is also noop because we are producing - _ewyk.run(); + _exStrategy.run(); // Give the idle job _produce.add(NULLTASK); @@ -266,7 +266,7 @@ public void testExecuteWhileIdling() throws Exception public void run() { _produce.add(t0); - _ewyk.produce(); + _exStrategy.produce(); } }; thread0.start(); @@ -276,13 +276,13 @@ public void run() assertEquals(thread0, t0.getThread()); // Should have dispatched another helper - assertEquals(_ewyk, _executions.poll()); + assertEquals(_exStrategy, _executions.poll()); // We will go idle when we next produce _produce.add(NULLTASK); // execute will return immediately because it did not yet see the idle. - _ewyk.produce(); + _exStrategy.produce(); // When we unblock t0, thread1 will see the idle, t0.unblock(); @@ -298,8 +298,8 @@ public void run() // When the dispatched thread turns up, it will see the second idle _produce.add(NULLTASK); - _ewyk.run(); - assertThat(_ewyk.isIdle(), Matchers.equalTo(true)); + _exStrategy.run(); + assertThat(_exStrategy.isIdle(), Matchers.equalTo(true)); // So that when t1 completes it does not produce again. t1.unblock(); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java index 2c2800840c9f..52d7e14d64be 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java @@ -44,7 +44,7 @@ public static Stream strategies() return Stream.of( ProduceExecuteConsume.class, ExecuteProduceConsume.class, - EatWhatYouKill.class + AdaptiveExecutionStrategy.class ).map(Arguments::of); } diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java similarity index 91% rename from tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java rename to tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java index 8c9bc153d5f8..724f7ac5f957 100644 --- a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/EWYKBenchmark.java +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/util/thread/strategy/jmh/AdaptiveExecutionStrategyBenchmark.java @@ -26,7 +26,7 @@ import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.ReservedThreadExecutor; -import org.eclipse.jetty.util.thread.strategy.EatWhatYouKill; +import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.eclipse.jetty.util.thread.strategy.ProduceConsume; import org.eclipse.jetty.util.thread.strategy.ProduceExecuteConsume; import org.openjdk.jmh.annotations.Benchmark; @@ -45,13 +45,13 @@ import org.openjdk.jmh.runner.options.TimeValue; @State(Scope.Benchmark) -public class EWYKBenchmark +public class AdaptiveExecutionStrategyBenchmark { static TestServer server; static ReservedThreadExecutor reserved; static Path directory; - @Param({"PC", "PEC", "EWYK"}) + @Param({"PC", "PEC", "AES"}) public static String strategyName; @Param({"true", "false"}) @@ -64,12 +64,12 @@ public class EWYKBenchmark public static void setupServer() throws Exception { // Make a test directory - directory = Files.createTempDirectory("ewyk"); + directory = Files.createTempDirectory("AES"); // Make some test files for (int i = 0; i < 75; i++) { - File.createTempFile("ewyk_benchmark", i + ".txt", directory.toFile()); + File.createTempFile("AES_benchmark", i + ".txt", directory.toFile()); } server = new TestServer(directory.toFile()); @@ -110,8 +110,8 @@ public static class ThreadState implements Runnable strategy = new ProduceExecuteConsume(connection, server); break; - case "EWYK": - strategy = new EatWhatYouKill(connection, server); + case "AES": + strategy = new AdaptiveExecutionStrategy(connection, server); break; default: @@ -172,7 +172,7 @@ public long testStrategy(ThreadState state) throws Exception public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() - .include(EWYKBenchmark.class.getSimpleName()) + .include(AdaptiveExecutionStrategyBenchmark.class.getSimpleName()) .warmupIterations(2) .measurementIterations(3) .forks(1)