Skip to content

Commit

Permalink
Rename EWYK The AdaptiveExecutionStrategy
Browse files Browse the repository at this point in the history
updates from review

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Jun 10, 2021
1 parent c2ecdc4 commit 8dd8f1b
Showing 1 changed file with 41 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,28 @@ public class AdaptiveExecutionStrategy extends ContainerLifeCycle implements Exe
private static final Logger LOG = LoggerFactory.getLogger(AdaptiveExecutionStrategy.class);

/**
* The state of this strategy
* The production state of the strategy.
*/
private enum State
{
IDLE, // No tasks or producers
PRODUCING, // There is an active producing thread
REPRODUCING // There is an active producing thread and demand for more production
IDLE, // No tasks or producers.
PRODUCING, // There is an active producing thread.
REPRODUCING // There is an active producing thread and demand for more production.
}

/* The sub-strategies used by this strategy to consume tasks that are produced */
/* The sub-strategies used by this strategy to consume tasks that are produced. */
private enum SubStrategy
{
/**
* Consume produced tasks and resume producing
* Consume produced tasks and resume producing.
*/
PRODUCE_CONSUME,
/**
* Invoke produced tasks as non blocking and resume producing
* Invoke produced tasks as non blocking and resume producing.
*/
PRODUCE_INVOKE_CONSUME,
/**
* Execute produced tasks and continue producing
* Execute produced tasks and continue producing.
*/
PRODUCE_EXECUTE_CONSUME,
/**
Expand Down Expand Up @@ -190,25 +190,25 @@ public void produce()
}

/**
* Try to become the producing thread and then produce and consume tasks
* @param wasPending True if this thread was started as a pending producer
* Try to become the producing thread and then produce and consume tasks.
* @param wasPending True if this thread was started as a pending producer.
*/
private void tryProduce(boolean wasPending)
{
if (LOG.isDebugEnabled())
LOG.debug("{} tryProduce {}", this, wasPending);

// Take the lock to atomically check if this thread can produce
// Take the lock to atomically check if this thread can produce.
try (AutoLock l = _lock.lock())
{
// If this thread was the pending producer, there is no longer one pending
// If this thread was the pending producer, there is no longer one pending.
if (wasPending)
_pending = false;

switch (_state)
{
case IDLE:
// The strategy was IDLE, so this thread can become the producer
// The strategy was IDLE, so this thread can become the producer.
_state = State.PRODUCING;
break;

Expand All @@ -220,15 +220,15 @@ private void tryProduce(boolean wasPending)
return;

case REPRODUCING:
// Another thread is already producing and will already try another #doProduce
// Another thread is already producing and will already try another #doProduce.
return;

default:
throw new IllegalStateException();
throw new IllegalStateException(toStringLocked());
}
}

// Determine this threads invocation type once outside of the production loop
// Determine this threads invocation type once outside of the production loop.
boolean nonBlocking = Invocable.isNonBlockingInvocation();
while (isRunning())
{
Expand All @@ -239,13 +239,13 @@ private void tryProduce(boolean wasPending)
// If we did not produce a task
if (task == null)
{
// the we need take the lock to atomically determine if we should keep producing
// take the lock to atomically determine if we should keep producing.
try (AutoLock l = _lock.lock())
{
switch (_state)
{
case PRODUCING:
// This thread was the only producer, so it is now Idle and we return from production
// This thread was the only producer, so it is now Idle and we return from production.
_state = State.IDLE;
return;

Expand All @@ -262,7 +262,7 @@ private void tryProduce(boolean wasPending)
}

// Consume the task according the selected sub-strategy, then
// continue producing only if the sub-strategy used returns true
// continue producing only if the sub-strategy used returns true.
if (consumeTask(task, selectSubStrategy(task, nonBlocking)))
continue;
return;
Expand All @@ -275,10 +275,10 @@ private void tryProduce(boolean wasPending)
}

/**
* Select the execution strategy
* @param task The task to select the strategy for
* @param nonBlocking True if the producing thread cannot block
* @return The sub-strategy mode to use for the task
* Select the execution strategy.
* @param task The task to select the strategy for.
* @param nonBlocking True if the producing thread cannot block.
* @return The sub-strategy mode to use for the task.
*/
private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
{
Expand All @@ -295,7 +295,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
// the task in blocking mode if a pending producer is available.
if (!nonBlocking)
{
// We need to take the lock so we can atomically check if a pending producer is available.
// Take the lock so we can atomically check if a pending producer is available.
try (AutoLock l = _lock.lock())
{
// If a pending producer is available or one can be started
Expand All @@ -318,7 +318,7 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
// the blocking task if a pending producer is available.
if (!nonBlocking)
{
// We need to take the lock so we can atomically check if a pending producer is available.
// Take the lock so we can atomically check if a pending producer is available.
try (AutoLock l = _lock.lock())
{
// If a pending producer is available or one can be started
Expand All @@ -333,18 +333,18 @@ private SubStrategy selectSubStrategy(Runnable task, boolean nonBlocking)
}
}

// Otherwise use PEC: the task is consumed by the executor and this thread continues to produce
// Otherwise use PEC: the task is consumed by the executor and this thread continues to produce.
return SubStrategy.PRODUCE_EXECUTE_CONSUME;

default:
throw new IllegalStateException();
throw new IllegalStateException(String.format("taskType=%s %s", taskType, this));
}
}

/** Consume a task
* @param task The task to consume
* @param subStrategy The execution sub-strategy mode to use to consume it
* @return True if the sub-strategy requires the caller to continue to produce tasks
/** Consume a task with a sub-strategy.
* @param task The task to consume.
* @param subStrategy The execution sub-strategy mode to use to consume it.
* @return True if the sub-strategy requires the caller to continue to produce tasks.
*/
private boolean consumeTask(Runnable task, SubStrategy subStrategy)
{
Expand Down Expand Up @@ -372,28 +372,28 @@ private boolean consumeTask(Runnable task, SubStrategy subStrategy)
_epcMode.increment();
runTask(task);

// Race the pending producer to produce again
// Race the pending producer to produce again.
try (AutoLock l = _lock.lock())
{
if (_state == State.IDLE)
{
// We beat the pending producer, so we will become the producer instead
// We beat the pending producer, so we will become the producer instead.
// The pending produce will become a noop if it arrives whilst we are producing,
// or it may take over if we subsequently do another EPC consumption
// or it may take over if we subsequently do another EPC consumption.
_state = State.PRODUCING;
return true;
}
}
// The pending producer is now producing, so this thread no longer produces
// The pending producer is now producing, so this thread no longer produces.
return false;

default:
throw new IllegalStateException(toString());
throw new IllegalStateException(String.format("ss=%s %s", subStrategy, this));
}
}

/**
* Run a Runnable task, logging any thrown exception
* Run a Runnable task, logging any thrown exception.
* @param task The task to run.
*/
private void runTask(Runnable task)
Expand All @@ -409,8 +409,8 @@ private void runTask(Runnable task)
}

/**
* Invoke a task in non-blocking mode
* @param task The task to invoke
* Invoke a task in non-blocking mode.
* @param task The task to invoke.
*/
private void invokeAsNonBlocking(Runnable task)
{
Expand All @@ -425,7 +425,7 @@ private void invokeAsNonBlocking(Runnable task)
}

/**
* Produce a task, logging any Throwable that results
* Produce a task, logging any Throwable that results.
* @return A produced task or null if there were no tasks or a Throwable was thrown.
*/
private Runnable produceTask()
Expand All @@ -444,7 +444,7 @@ private Runnable produceTask()
/**
* Execute a task via the {@link Executor} used to construct this strategy.
* If the execution is rejected and the task is a Closeable, then it is closed.
* @param task The task to execute
* @param task The task to execute.
*/
private void execute(Runnable task)
{
Expand Down

0 comments on commit 8dd8f1b

Please sign in to comment.