Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename EWYK The AdaptiveExecutionStrategy #6353

Merged
merged 21 commits into from Jun 16, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;

import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
Expand All @@ -40,31 +41,30 @@
* <p>This strategy selects between the following sub-strategies:</p>
* <dl>
* <dt>ProduceConsume(PC)</dt>
* <dd>The producing thread consumes the task by executing it directly and then
* continues to produce.</dd>
* <dd>The producing thread consumes the task by running it directly
* and then continues to produce.</dd>
* <dt>ProduceInvokeConsume(PIC)</dt>
* <dd>The producing thread consumes the task by invoking it with {@link Invocable#invokeNonBlocking(Runnable)}
* <dd>The producing thread consumes the task by running it with {@link Invocable#invokeNonBlocking(Runnable)}
* and then continues to produce.</dd>
* <dt>ProduceExecuteConsume(PEC)</dt>
* <dd>The producing thread dispatches the task to a thread pool to be executed and then immediately resumes
* producing.</dd>
* <dd>The producing thread dispatches the task to a thread pool to be executed
* and then continues to produce.</dd>
* <dt>ExecuteProduceConsume(EPC)</dt>
* <dd>The producing thread consumes the task by executing it directly (as in PC mode) but then races with
* a pending producer thread to take over production.
* <dd>The producing thread consumes dispatches a pending producer to a thread pool,
* then consumes the task by running it directly (as in PC mode), then races with
* the pending producer thread to take over production.
* </dd>
* </dl>
* <p>The sub-strategy is selected as follows:</p>
* <dl>
gregw marked this conversation as resolved.
Show resolved Hide resolved
* <dt>PC</dt>
* <dd>If the produced task has been invoked with {@link Invocable#invokeNonBlocking(Runnable)}
* to indicate that it is {@link Invocable.InvocationType#NON_BLOCKING}.</dd>
* <dd>If the produced task is {@link Invocable.InvocationType#NON_BLOCKING}.</dd>
* <dt>EPC</dt>
* <dd>If the producing thread is not {@link Invocable.InvocationType#NON_BLOCKING}
* and a pending producer thread is available, either because there is already a pending producer
* or one is successfully started with {@link TryExecutor#tryExecute(Runnable)}.</dd>
* <dt>PIC</dt>
* <dd>If the produced task has used the {@link Invocable#getInvocationType()} API to
* indicate that it is {@link Invocable.InvocationType#EITHER}.</dd>
* <dd>If the produced task is {@link Invocable.InvocationType#EITHER} and EPC was not selected.</dd>
* <dt>PEC</dt>
* <dd>Otherwise.</dd>
* </dl>
Expand All @@ -88,7 +88,6 @@
* <p>This strategy was previously named EatWhatYouKill (EWYK) because its preference for a
* producer to directly consume the tasks that it produces is similar to a hunting proverb
* that says that a hunter should eat (i.e. consume) what they kill (i.e. produced).</p>
*
*/
@ManagedObject("Adaptive execution strategy")
public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements ExecutionStrategy
Expand All @@ -105,23 +104,25 @@ private enum State
REPRODUCING // There is an active producing thread and demand for more production.
}

/* The sub-strategies used by the strategy to consume tasks that are produced. */
/**
* The sub-strategies used by the strategy to consume tasks that are produced.
*/
private enum SubStrategy
{
/**
* Consumes produced tasks and resume producing.
* Consumes produced tasks and continues producing.
*/
PRODUCE_CONSUME,
/**
* Invokes produced tasks as non blocking and resume producing.
* Consumes produced tasks as non blocking and continues producing.
*/
PRODUCE_INVOKE_CONSUME,
/**
* Executes produced tasks and continue producing.
* Executes produced tasks and continues producing.
*/
PRODUCE_EXECUTE_CONSUME,
/**
* Executes a pending producer, consumes produced tasks and races pending producer to resume producing.
* Executes a pending producer, consumes produced tasks and races the pending producer to continue producing.
*/
EXECUTE_PRODUCE_CONSUME
}
Expand Down Expand Up @@ -191,6 +192,7 @@ public void produce()

/**
* Tries to become the producing thread and then produces and consumes tasks.
*
* @param wasPending True if the calling thread was started as a pending producer.
*/
private void tryProduce(boolean wasPending)
Expand All @@ -215,20 +217,20 @@ private void tryProduce(boolean wasPending)
case PRODUCING:
// The strategy is already producing, so another thread must be the producer.
// However, it may be just about to stop being the producer so we set the
// REPRODUCING state to force it to call #doProduce at least once more.
// REPRODUCING state to force it to produce at least once more.
_state = State.REPRODUCING;
return;

case REPRODUCING:
// Another thread is already producing and will already try another #doProduce.
// Another thread is already producing and will already try again to produce.
return;

default:
throw new IllegalStateException(toStringLocked());
}
}

// Determine the threads invocation type once outside of the production loop.
// Determine the thread's invocation type once, outside of the production loop.
boolean nonBlocking = Invocable.isNonBlockingInvocation();
while (isRunning())
{
Expand All @@ -245,7 +247,7 @@ private void tryProduce(boolean wasPending)
switch (_state)
{
case PRODUCING:
// The calling thread was the only producer, so it is now Idle and we return from production.
// The calling thread was the only producer, so it is now IDLE and we stop producing.
_state = State.IDLE;
return;

Expand All @@ -262,7 +264,7 @@ private void tryProduce(boolean wasPending)
}

// Consume the task according the selected sub-strategy, then
// continue producing only if the sub-strategy used returns true.
// continue producing only if the sub-strategy returns true.
if (consumeTask(task, selectSubStrategy(task, nonBlocking)))
continue;
return;
Expand All @@ -275,7 +277,8 @@ private void tryProduce(boolean wasPending)
}

/**
* Select the execution strategy.
* Selects the execution strategy.
*
* @param task The task to select the strategy for.
* @param nonBlocking True if the producing thread cannot block.
* @return The sub-strategy to use for the task.
Expand Down Expand Up @@ -345,16 +348,17 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
}

/**
* Consume a task with a sub-strategy.
* Consumes a task with a sub-strategy.
*
* @param task The task to consume.
* @param subStrategy The execution sub-strategy mode to use to consume it.
* @param subStrategy The execution sub-strategy to use to consume the task.
* @return True if the sub-strategy requires the caller to continue to produce tasks.
*/
private boolean consumeTask(Runnable task, SubStrategy subStrategy)
{
// Consume and/or execute task according to the selected mode.
if (LOG.isDebugEnabled())
LOG.debug("{} ss={} t={}/{} {}", this, subStrategy, task, Invocable.getInvocationType(task));
LOG.debug("ss={} t={}/{} {}", subStrategy, task, Invocable.getInvocationType(task), this);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbordet this is now inconsistent with the other debugs in the class.... but I guess that is not so important.

switch (subStrategy)
{
case PRODUCE_CONSUME:
Expand Down Expand Up @@ -398,6 +402,7 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy)

/**
* Runs a Runnable task, logging any thrown exception.
*
* @param task The task to run.
*/
private void runTask(Runnable task)
Expand All @@ -413,8 +418,9 @@ private void runTask(Runnable task)
}

/**
* Invoke a task in non-blocking mode.
* @param task The task to invoke.
* Runs a task in non-blocking mode.
*
* @param task The task to run in non-blocking mode.
*/
private void invokeAsNonBlocking(Runnable task)
{
Expand All @@ -429,7 +435,8 @@ private void invokeAsNonBlocking(Runnable task)
}

/**
* Produce a task, logging any Throwable that results.
* Produces a task, logging any Throwable that may result.
*
* @return A produced task or null if there were no tasks or a Throwable was thrown.
*/
private Runnable produceTask()
Expand All @@ -446,8 +453,9 @@ private Runnable produceTask()
}

/**
* Execute a task via the {@link Executor} used to construct this strategy.
* Executes a task via the {@link Executor} used to construct this strategy.
* If the execution is rejected and the task is a Closeable, then it is closed.
*
* @param task The task to execute.
*/
private void execute(Runnable task)
Expand All @@ -464,16 +472,7 @@ private void execute(Runnable task)
LOG.trace("IGNORED", e);

if (task instanceof Closeable)
{
try
{
((Closeable)task).close();
}
catch (Throwable e2)
{
LOG.trace("IGNORED", e2);
}
}
IO.close((Closeable)task);
}
}

Expand Down