diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
index 5a70548..5fa3450 100644
--- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
+++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java
@@ -33,22 +33,33 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
+import java.util.concurrent.locks.ReentrantLock;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
@@ -65,12 +76,15 @@
* necessary size. In addition, the optional {@linkplain #setGrowthResistance(float) growth resistance feature} can
* be used to further govern the thread pool size.
*
+ * Additionally, this thread pool implementation supports scheduling of tasks.
+ * The scheduled tasks will execute on the main pool.
+ *
* New instances of this thread pool are created by constructing and configuring a {@link Builder} instance, and calling
* its {@link Builder#build() build()} method.
*
* @author David M. Lloyd
*/
-public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService {
+public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService, ScheduledExecutorService {
private static final Thread[] NO_THREADS = new Thread[0];
static {
@@ -196,6 +210,14 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl
* The context handler for the user-defined context.
*/
private final ContextHandler> contextHandler;
+ /**
+ * The task for scheduled execution.
+ */
+ private final SchedulerTask schedulerTask = new SchedulerTask();
+ /**
+ * The scheduler thread.
+ */
+ private final Thread schedulerThread;
// =======================================================
// Current state fields
@@ -350,6 +372,9 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl
this.handoffExecutor = builder.getHandoffExecutor();
this.exceptionHandler = builder.getExceptionHandler();
this.threadFactory = builder.getThreadFactory();
+ this.schedulerThread = threadFactory.newThread(schedulerTask);
+ String schedulerName = this.schedulerThread.getName();
+ this.schedulerThread.setName(schedulerName + " (scheduler)");
this.terminationTask = builder.getTerminationTask();
this.growthResistance = builder.getGrowthResistance();
this.contextHandler = builder.getContextHandler();
@@ -875,7 +900,7 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws
Assert.checkNotNullParam("unit", unit);
if (timeout > 0) {
final Thread thread = Thread.currentThread();
- if (runningThreads.contains(thread)) {
+ if (runningThreads.contains(thread) || thread == schedulerThread) {
throw Messages.msg.cannotAwaitWithin();
}
Waiter waiters = this.terminationWaiters;
@@ -902,6 +927,39 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws
return isTerminated();
}
+ // =======================================================
+ // ScheduledExecutorService
+ // =======================================================
+
+ public ScheduledFuture> schedule(final Runnable command, final long delay, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new RunnableScheduledFuture(command, delay, unit));
+ }
+
+ public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new CallableScheduledFuture(callable, delay, unit));
+ }
+
+ public ScheduledFuture> scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new FixedRateRunnableScheduledFuture(command, initialDelay, period, unit));
+ }
+
+ public ScheduledFuture> scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) {
+ startScheduleThread();
+ return schedulerTask.schedule(new FixedDelayRunnableScheduledFuture(command, initialDelay, delay, unit));
+ }
+
+ private void startScheduleThread() {
+ // this should be fairly quick...
+ if (schedulerThread.getState() == Thread.State.NEW) try {
+ schedulerThread.start();
+ } catch (IllegalThreadStateException ignored) {
+ // make sure it's race-proof
+ }
+ }
+
// =======================================================
// Management
// =======================================================
@@ -950,6 +1008,8 @@ public void shutdown(boolean interrupt) {
if (isShutdownRequested(newStatus) != isShutdownRequested(oldStatus)) {
assert ! isShutdownRequested(oldStatus); // because it can only ever be set, not cleared
// we initiated shutdown
+ // terminate the scheduler
+ schedulerTask.shutdown();
// clear out all consumers and append a dummy waiter node
TaskNode tail = this.tail;
QNode tailNext;
@@ -2421,6 +2481,10 @@ public long getSpinMissCount() {
}
}
+ // =======================================================
+ // Basic task wrapper
+ // =======================================================
+
final class Task implements Runnable {
private final Runnable delegate;
@@ -2478,4 +2542,846 @@ public String toString() {
return "Task{delegate=" + delegate + ", contextClassLoader=" + contextClassLoader + '}';
}
}
+
+ // =======================================================
+ // Scheduled future tasks
+ // =======================================================
+
+ static final int ASF_ST_WAITING = 0;
+ static final int ASF_ST_CANCELLED = 1;
+ static final int ASF_ST_SUBMITTED = 2;
+ static final int ASF_ST_RUNNING = 3;
+ static final int ASF_ST_FINISHED = 4;
+ static final int ASF_ST_FAILED = 5;
+ static final int ASF_ST_REJECTED = 6;
+
+ static final AbstractScheduledFuture>[] NO_FUTURES = new AbstractScheduledFuture>[0];
+
+ static final AtomicLong SCHEDULED_TASK_SEQ = new AtomicLong();
+
+ /**
+ * An implementation of {@link ScheduledFuture} which is wrapped by {@link Task}.
+ *
+ * @param the result type
+ */
+ abstract class AbstractScheduledFuture implements ScheduledFuture, Runnable {
+ final long seq = SCHEDULED_TASK_SEQ.getAndIncrement();
+ /**
+ * The task which is wrapping this one.
+ */
+ final Task wrappingTask;
+ /**
+ * The scheduled time for this task, in nanoseconds since the scheduler thread was born.
+ * Can be mutated in subclasses if the task is recurring, but only under lock.
+ */
+ volatile long when;
+ /**
+ * The state of this task; one of {@code ASF_ST_*}.
+ */
+ volatile int state = ASF_ST_WAITING;
+ /**
+ * The actual result; only valid in {@code ASF_ST_FINISHED} (where it is of type {@code V}),
+ * or in {@code ASF_ST_FAILED} (where it is of type {@code Throwable}),
+ * or in {@code ASF_ST_REJECTED} (where it is of type {@code RejectedExecutionException}).
+ */
+ volatile Object result;
+ /**
+ * The thread which is currently live for this task.
+ */
+ Thread liveThread;
+
+ AbstractScheduledFuture(long delay, TimeUnit unit) {
+ when = Math.addExact(schedulerTask.age(), unit.toNanos(delay));
+ wrappingTask = new Task(this, contextHandler.captureContext());
+ }
+
+ public int compareTo(final Delayed o) {
+ return o instanceof AbstractScheduledFuture> ? compareTo((AbstractScheduledFuture>) o) : wrongType();
+ }
+
+ public int compareTo(final AbstractScheduledFuture> other) {
+ int cmp = Long.compare(when, other.when);
+ if (cmp == 0) cmp = Long.compare(seq, other.seq);
+ return cmp;
+ }
+
+ public long getDelay(final TimeUnit unit) {
+ return unit.convert(Math.max(0, when - schedulerTask.age()), TimeUnit.NANOSECONDS);
+ }
+
+ public boolean isCancelled() {
+ return state == ASF_ST_CANCELLED;
+ }
+
+ public boolean isDone() {
+ int state = this.state;
+ return state == ASF_ST_FINISHED || state == ASF_ST_FAILED || state == ASF_ST_CANCELLED || state == ASF_ST_REJECTED;
+ }
+
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ int state;
+ synchronized (this) {
+ state = this.state;
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED: {
+ this.state = ASF_ST_CANCELLED;
+ return true;
+ }
+ case ASF_ST_RUNNING: {
+ if (mayInterruptIfRunning) {
+ liveThread.interrupt();
+ }
+ return false;
+ }
+ case ASF_ST_CANCELLED: {
+ return true;
+ }
+ default: {
+ return false;
+ }
+ }
+ }
+ }
+
+ public V get() throws InterruptedException, ExecutionException {
+ int state;
+ synchronized (this) {
+ for (;;) {
+ state = this.state;
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED:
+ case ASF_ST_RUNNING: {
+ wait();
+ break;
+ }
+ case ASF_ST_CANCELLED: {
+ throw new CancellationException("Task was cancelled");
+ }
+ case ASF_ST_REJECTED: {
+ throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result);
+ }
+ case ASF_ST_FAILED: {
+ throw new ExecutionException((Throwable) result);
+ }
+ case ASF_ST_FINISHED: {
+ //noinspection unchecked
+ return (V) result;
+ }
+ }
+ }
+ }
+ }
+
+ public V get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long remaining = unit.toNanos(timeout);
+ long start = System.nanoTime();
+ int state;
+ synchronized (this) {
+ for (;;) {
+ state = this.state;
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED:
+ case ASF_ST_RUNNING: {
+ if (remaining <= 0) {
+ throw new TimeoutException();
+ }
+ wait(remaining / 1_000_000L, (int) (remaining % 1_000_000));
+ break;
+ }
+ case ASF_ST_CANCELLED: {
+ throw new CancellationException("Task was cancelled");
+ }
+ case ASF_ST_REJECTED: {
+ throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result);
+ }
+ case ASF_ST_FAILED: {
+ throw new ExecutionException((Throwable) result);
+ }
+ case ASF_ST_FINISHED: {
+ //noinspection unchecked
+ return (V) result;
+ }
+ }
+ long newStart = System.nanoTime();
+ long elapsed = newStart - start;
+ remaining -= elapsed;
+ start = newStart;
+ }
+ }
+ }
+
+ public void run() {
+ stateTest: synchronized (this) {
+ switch (state) {
+ case ASF_ST_SUBMITTED: {
+ this.state = ASF_ST_RUNNING;
+ liveThread = currentThread();
+ break stateTest;
+ }
+ case ASF_ST_CANCELLED: {
+ // cancelled after submit but before it was run
+ return;
+ }
+ case ASF_ST_FAILED:
+ case ASF_ST_REJECTED: {
+ // a recurring task terminated abruptly, but was still found in the schedule
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ try {
+ finish(performTask());
+ } catch (Throwable t) {
+ fail(t);
+ }
+ }
+
+ void submit() {
+ synchronized (this) {
+ stateTest: switch (state) {
+ case ASF_ST_WAITING: {
+ this.state = ASF_ST_SUBMITTED;
+ //noinspection UnnecessaryLabelOnBreakStatement
+ break stateTest;
+ }
+ case ASF_ST_CANCELLED: {
+ // do not actually submit
+ return;
+ }
+ case ASF_ST_FAILED:
+ case ASF_ST_REJECTED: {
+ // a recurring task terminated abruptly, but was still found in the schedule
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ try {
+ /* copied from {@link #execute(Runnable)} */
+ int result = tryExecute(wrappingTask);
+ boolean ok = false;
+ if (result == EXE_OK) {
+ // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition
+ if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) {
+ deallocateThread();
+ }
+ if (UPDATE_STATISTICS) submittedTaskCounter.increment();
+ return;
+ } else if (result == EXE_CREATE_THREAD) try {
+ ok = doStartThread(wrappingTask);
+ } finally {
+ if (! ok) deallocateThread();
+ } else {
+ if (UPDATE_STATISTICS) rejectedTaskCounter.increment();
+ if (result == EXE_REJECT_SHUTDOWN) {
+ rejectShutdown(wrappingTask);
+ } else {
+ assert result == EXE_REJECT_QUEUE_FULL;
+ rejectQueueFull(wrappingTask);
+ }
+ }
+ } catch (RejectedExecutionException e) {
+ reject(e);
+ } catch (Throwable t) {
+ reject(new RejectedExecutionException("Task submission failed", t));
+ }
+ }
+
+ IllegalStateException badState() {
+ return new IllegalStateException("Task was not in expected state");
+ }
+
+ void reject(RejectedExecutionException e) {
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_SUBMITTED: {
+ result = e;
+ this.state = ASF_ST_REJECTED;
+ liveThread = null;
+ notifyAll();
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ }
+
+ void fail(Throwable t) {
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_WAITING:
+ case ASF_ST_SUBMITTED:
+ case ASF_ST_RUNNING: {
+ result = t;
+ this.state = ASF_ST_FAILED;
+ liveThread = null;
+ notifyAll();
+ return;
+ }
+ case ASF_ST_CANCELLED:
+ case ASF_ST_FINISHED:
+ case ASF_ST_FAILED:
+ case ASF_ST_REJECTED: {
+ // ignore the failure, though we're likely in an invalid state
+ return;
+ }
+ }
+ }
+ }
+
+ void finish(V result) {
+ // overridden in subclasses where the task repeats
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_RUNNING: {
+ this.result = result;
+ this.state = ASF_ST_FINISHED;
+ liveThread = null;
+ notifyAll();
+ return;
+ }
+ default: {
+ // invalid state
+ fail(badState());
+ return;
+ }
+ }
+ }
+ }
+
+ abstract V performTask() throws Exception;
+
+ public String toString() {
+ return toString(new StringBuilder()).toString();
+ }
+
+ StringBuilder toString(StringBuilder b) {
+ return b.append("future result of ");
+ }
+ }
+
+ static int wrongType() throws ClassCastException {
+ throw new ClassCastException("Wrong task type for comparison");
+ }
+
+ final class RunnableScheduledFuture extends AbstractScheduledFuture {
+ final Runnable runnable;
+
+ RunnableScheduledFuture(final Runnable runnable, final long delay, final TimeUnit unit) {
+ super(delay, unit);
+ this.runnable = runnable;
+ }
+
+ Void performTask() {
+ runnable.run();
+ return null;
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(runnable);
+ }
+ }
+
+ final class CallableScheduledFuture extends AbstractScheduledFuture {
+ final Callable callable;
+
+ CallableScheduledFuture(final Callable callable, final long delay, final TimeUnit unit) {
+ super(delay, unit);
+ this.callable = callable;
+ }
+
+ V performTask() throws Exception {
+ return callable.call();
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(callable);
+ }
+ }
+
+ abstract class RepeatingScheduledFuture extends AbstractScheduledFuture {
+ final long period;
+
+ RepeatingScheduledFuture(final long delay, final long period, final TimeUnit unit) {
+ super(delay, unit);
+ this.period = unit.toNanos(period);
+ }
+
+ /**
+ * Adjust the time of this future for resubmission, after the task has run successfully.
+ */
+ abstract void adjustTime();
+
+ public void run() {
+ super.run();
+ // if an exception is thrown, we will have failed already anyway
+ adjustTime();
+ synchronized (this) {
+ switch (state) {
+ case ASF_ST_RUNNING: {
+ state = ASF_ST_WAITING;
+ schedulerTask.schedule(this);
+ return;
+ }
+ default: {
+ // in all other cases, we failed so the task should not be rescheduled
+ return;
+ }
+ }
+ }
+ }
+
+ void finish(final V result) {
+ // repeating tasks never actually finish
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b.append("repeating "));
+ }
+ }
+
+ final class FixedRateRunnableScheduledFuture extends RepeatingScheduledFuture {
+ final Runnable runnable;
+
+ FixedRateRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) {
+ super(delay, period, unit);
+ this.runnable = runnable;
+ }
+
+ void adjustTime() {
+ // if this results in a time in the past, the next run will happen immediately
+ this.when += period;
+ }
+
+ Void performTask() {
+ runnable.run();
+ return null;
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(runnable);
+ }
+ }
+
+ final class FixedDelayRunnableScheduledFuture extends RepeatingScheduledFuture {
+ final Runnable runnable;
+
+ FixedDelayRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) {
+ super(delay, period, unit);
+ this.runnable = runnable;
+ }
+
+ void adjustTime() {
+ this.when = schedulerTask.age() + period;
+ }
+
+ Void performTask() {
+ runnable.run();
+ return null;
+ }
+
+ StringBuilder toString(final StringBuilder b) {
+ return super.toString(b).append(runnable);
+ }
+ }
+
+ // =======================================================
+ // Scheduler task thread worker
+ // =======================================================
+
+ final class SchedulerTask implements Runnable {
+ final long startMark = System.nanoTime();
+ final ReentrantLock ql = new ReentrantLock();
+ final Condition qc = ql.newCondition();
+ // todo: switch to array queue on a more optimistic day
+ // protected by {@link #ql}
+ ScheduledFutureQueue q = new TreeSetQueue();
+ boolean shutdownDetected;
+
+ void shutdown() {
+ ql.lock();
+ try {
+ shutdownDetected = true;
+ qc.signal();
+ } finally {
+ ql.unlock();
+ }
+ }
+
+ public void run() {
+ ScheduledFutureQueue q = this.q;
+ AbstractScheduledFuture>[] remainingFutures;
+ AbstractScheduledFuture> first;
+ long startMark = this.startMark;
+ outerLoop: for (;;) {
+ ql.lock();
+ try {
+ innerLoop: for (;;) {
+ long now = System.nanoTime();
+ if (shutdownDetected) {
+ // drop all tasks and return
+ remainingFutures = q.toArray();
+ q.clear();
+ break outerLoop;
+ } else if (q.isEmpty()) try {
+ qc.await();
+ } catch (InterruptedException ignored) {
+ // clear interrupt status
+ continue innerLoop;
+ } else {
+ first = q.first();
+ long firstWhen = first.when;
+ long currentWhen = max(0, now - startMark);
+ if (firstWhen <= currentWhen) {
+ // it's ready; run it outside of the lock
+ q.pollFirst();
+ //noinspection UnnecessaryLabelOnBreakStatement
+ break innerLoop;
+ } else {
+ long waitTime = firstWhen - currentWhen;
+ try {
+ qc.awaitNanos(waitTime);
+ } catch (InterruptedException e) {
+ // clear interrupt status
+ continue innerLoop;
+ }
+ }
+ }
+ }
+ } finally {
+ ql.unlock();
+ }
+ // outside of lock; `break innerLoop` goes ↓ here
+ first.submit();
+ // continue loop to find the next task
+ }
+ // ↓ `break outerLoop` goes here ↓
+ if (remainingFutures.length > 0) {
+ for (AbstractScheduledFuture> future : remainingFutures) {
+ future.cancel(true);
+ }
+ }
+ return;
+ }
+
+ > F schedule(final F item) {
+ Task wrappingTask = item.wrappingTask;
+ if (item.when <= age()) {
+ // just submit it now
+ item.submit();
+ return item;
+ }
+ ql.lock();
+ try {
+ if (shutdownDetected) {
+ rejectShutdown(wrappingTask);
+ return item;
+ }
+ // check to see if we need to wake up the scheduler
+ boolean first;
+ for (;;) try {
+ first = q.insertAndCheckForFirst(item);
+ break;
+ } catch (QueueFullException ignored) {
+ q = q.grow();
+ }
+ if (first) {
+ // the delay time has changed, so wake up the waiter
+ qc.signal();
+ }
+ return item;
+ } finally {
+ ql.unlock();
+ }
+ }
+
+ long age() {
+ return System.nanoTime() - startMark;
+ }
+ }
+
+ // =======================================================
+ // Schedule queue API & implementations
+ // =======================================================
+
+ interface ScheduledFutureQueue {
+ AbstractScheduledFuture>[] toArray();
+
+ void clear();
+
+ boolean isEmpty();
+
+ int size();
+
+ AbstractScheduledFuture> first();
+
+ @SuppressWarnings("UnusedReturnValue") // must match signature for TreeSet
+ AbstractScheduledFuture> pollFirst();
+
+ /**
+ * Insert the item in order, checking to see if it was added as the first item.
+ *
+ * @param item the item to insert (must not be {@code null})
+ * @return {@code true} if the item is first, {@code false} otherwise
+ * @throws QueueFullException if the queue is full; it must be recreated in this case
+ */
+ boolean insertAndCheckForFirst(AbstractScheduledFuture> item) throws QueueFullException;
+
+ /**
+ * Get a new queue with the same contents as this one, but with a larger capacity.
+ *
+ * @return the grown queue
+ */
+ ScheduledFutureQueue grow();
+ }
+
+ static final class ArrayQueue implements ScheduledFutureQueue {
+ final AbstractScheduledFuture>[] array;
+ // the removal point (lowest+least index)
+ int head;
+ // the number of elements
+ int size;
+
+ ArrayQueue(int capacity) {
+ // next power of two
+ capacity = Integer.highestOneBit(Math.max(capacity, 2) - 1) << 1;
+ array = new AbstractScheduledFuture>[capacity];
+ }
+
+ private ArrayQueue(final ArrayQueue original, final int newCapacity) {
+ assert Integer.bitCount(newCapacity) == 1;
+ array = original.toArray(newCapacity);
+ head = 0;
+ size = original.size;
+ }
+
+ public AbstractScheduledFuture>[] toArray() {
+ return toArray(size());
+ }
+
+ public AbstractScheduledFuture>[] toArray(int size) {
+ int head = this.head;
+ int end = head + size;
+ AbstractScheduledFuture>[] copy = Arrays.copyOfRange(array, head, end);
+ if (end > array.length) {
+ // copy the wrapped elements
+ System.arraycopy(array, 0, copy, size - (array.length - head), size - array.length);
+ }
+ return copy;
+ }
+
+ public void clear() {
+ Arrays.fill(array, null);
+ head = size = 0;
+ }
+
+ public boolean isEmpty() {
+ return size == 0;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public AbstractScheduledFuture> first() {
+ if (size == 0) {
+ throw new NoSuchElementException();
+ }
+ return array[head];
+ }
+
+ public AbstractScheduledFuture> pollFirst() {
+ if (size == 0) {
+ throw new NoSuchElementException();
+ }
+ int head = this.head;
+ AbstractScheduledFuture> item = array[head];
+ array[head] = null;
+ this.size --;
+ int mask = array.length - 1;
+ this.head = head + 1 & mask;
+ return item;
+ }
+
+ public boolean insertAndCheckForFirst(final AbstractScheduledFuture> item) {
+ // find the insertion point
+ int size = this.size;
+ AbstractScheduledFuture>[] array = this.array;
+ int arrayLen = array.length;
+ if (size == arrayLen) {
+ throw new QueueFullException();
+ }
+ int mask = arrayLen - 1;
+ int idx = 0;
+ int high = size - 1;
+ // at this point and onwards, there is definitely space in the array
+ int head = this.head;
+
+ while (idx <= high) {
+ int mid = (idx + high) >>> 1;
+ AbstractScheduledFuture> testVal = array[head + mid & mask];
+ int cmp = testVal.compareTo(item);
+ if (cmp < 0) {
+ idx = mid + 1;
+ } else if (cmp > 0) {
+ high = mid - 1;
+ } else {
+ // we found this task already present in the queue (should never happen)
+ return false;
+ }
+ }
+
+ return insertAt(idx, item);
+ }
+
+ /**
+ * Move all elements starting at the given index forward to make space at that position, wrapping if needed.
+ *
+ * @param idx the element index relative to {@code head} to open up
+ */
+ void moveForward(final int idx, final AbstractScheduledFuture> storeVal) {
+ AbstractScheduledFuture>[] array = this.array;
+ int size = this.size;
+ int moveCnt = size - idx;
+ int arrayLength = array.length;
+ int mask = arrayLength - 1;
+ int head = this.head;
+ int start = head + idx;
+ // TODO:
+ // - change this to three calls to System.arraycopy
+ // - one for the already-wrapped portion
+ // - one for the portion that is being newly wrapped
+ // - one for the leading (pre-wrap) portion
+ for (int i = moveCnt - 1; i >= 0; i --) {
+ int pos = start + i;
+ array[pos + 1 & mask] = array[pos & mask];
+ }
+ array[start & mask] = storeVal;
+ }
+
+ /**
+ * Move all elements starting before the given index backward to make space at that position, wrapping if needed.
+ *
+ * @param idx the element index relative to {@code head} to open up
+ */
+ void moveBackward(final int idx, final AbstractScheduledFuture> storeVal) {
+ AbstractScheduledFuture>[] array = this.array;
+ int size = this.size;
+ int moveCnt = size - idx + 1;
+ int arrayLength = array.length;
+ int mask = arrayLength - 1;
+ int head = this.head;
+ int start = head + idx - 1;
+ // TODO:
+ // - change this to three calls to System.arraycopy
+ // - one for the leading (pre-wrap) portion
+ // - one for the portion that is being newly de-wrapped
+ // - one for the already-wrapped portion
+ for (int i = moveCnt - 1; i >= 0; i --) {
+ int pos = start - i;
+ array[pos - 1 & mask] = array[pos & mask];
+ }
+ array[start & mask] = storeVal;
+ this.head = head - 1 & mask;
+ }
+
+ boolean insertAt(final int idx, final AbstractScheduledFuture> item) {
+ // this is a separate method for easier testing of the arraycopy algebraic mayhem
+ int size = this.size;
+ // no matter what, we're growing by one
+ this.size = size + 1;
+ int halfSize = size + 1 >> 1;
+ if (idx >= halfSize) {
+ moveForward(idx, item);
+ } else {
+ moveBackward(idx, item);
+ }
+ return idx == 0;
+ }
+
+ public ScheduledFutureQueue grow() {
+ // todo: calibrate this threshold
+ if (array.length >= 256) {
+ return new TreeSetQueue(this);
+ } else {
+ return new ArrayQueue(this, array.length << 1);
+ }
+ }
+
+ // test points for white-box unit tests
+
+ int testPoint_arrayLength() {
+ return array.length;
+ }
+
+ int testPoint_head() {
+ return head;
+ }
+
+ void testPoint_setHead(int newHead) {
+ head = newHead;
+ }
+
+ void testPoint_setSize(int newSize) {
+ size = newSize;
+ }
+
+ AbstractScheduledFuture> testPoint_getArrayItem(int index) {
+ return array[index & array.length - 1];
+ }
+
+ AbstractScheduledFuture> testPoint_setArrayItem(int index, AbstractScheduledFuture> item) {
+ try {
+ return array[index & array.length - 1];
+ } finally {
+ array[index & array.length - 1] = item;
+ }
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static class TreeSetQueue extends TreeSet> implements ScheduledFutureQueue {
+ TreeSetQueue(final ScheduledFutureQueue original) {
+ Collections.addAll(this, original.toArray());
+ }
+
+ TreeSetQueue() {
+ }
+
+ public AbstractScheduledFuture>[] toArray() {
+ return super.toArray(NO_FUTURES);
+ }
+
+ public boolean insertAndCheckForFirst(final AbstractScheduledFuture> item) {
+ add(item);
+ return item == first();
+ }
+
+ public ScheduledFutureQueue grow() {
+ return this;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ static final class QueueFullException extends RuntimeException {
+ QueueFullException() {
+ super(null, null, false, false);
+ }
+ }
}
diff --git a/src/test/java/org/jboss/threads/ArrayQueueTests.java b/src/test/java/org/jboss/threads/ArrayQueueTests.java
new file mode 100644
index 0000000..4ffcd4f
--- /dev/null
+++ b/src/test/java/org/jboss/threads/ArrayQueueTests.java
@@ -0,0 +1,247 @@
+package org.jboss.threads;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ArrayQueueTests {
+
+ static EnhancedQueueExecutor eqe;
+
+ static EnhancedQueueExecutor.AbstractScheduledFuture>[] ITEMS;
+
+ @BeforeClass
+ public static void beforeAll() {
+ eqe = new EnhancedQueueExecutor.Builder().build();
+ ITEMS = new EnhancedQueueExecutor.AbstractScheduledFuture[32];
+ for (int i = 0; i < 32; i ++) {
+ final String toString = "[" + i + "]";
+ ITEMS[i] = eqe.new RunnableScheduledFuture(new Runnable() {
+ public void run() {
+ // nothing
+ }
+
+ public String toString() {
+ return toString;
+ }
+ }, 0, TimeUnit.DAYS);
+ }
+ }
+
+ @Test
+ public void testMoveForward() {
+ EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16);
+
+ int head = 5;
+ aq.testPoint_setHead(head);
+ aq.testPoint_setArrayItem(head + 0, ITEMS[0]);
+ aq.testPoint_setArrayItem(head + 1, ITEMS[1]);
+ aq.testPoint_setArrayItem(head + 2, ITEMS[2]);
+ aq.testPoint_setArrayItem(head + 3, ITEMS[3]);
+ aq.testPoint_setSize(4);
+
+ aq.moveForward(2, ITEMS[4]);
+
+ assertEquals(head, aq.testPoint_head());
+
+ assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4));
+ }
+
+ @Test
+ public void testMoveForwardWrap() {
+ EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16);
+
+ int head = 14;
+ aq.testPoint_setHead(head);
+ aq.testPoint_setArrayItem(head + 0, ITEMS[0]);
+ aq.testPoint_setArrayItem(head + 1, ITEMS[1]);
+ aq.testPoint_setArrayItem(head + 2, ITEMS[2]);
+ aq.testPoint_setArrayItem(head + 3, ITEMS[3]);
+ aq.testPoint_setSize(4);
+
+ aq.moveForward(2, ITEMS[4]);
+
+ assertEquals(head, aq.testPoint_head());
+
+ assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4));
+ }
+
+ @Test
+ public void testMoveBackward() {
+ EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16);
+
+ int head = 5;
+ aq.testPoint_setHead(head);
+ aq.testPoint_setArrayItem(head + 0, ITEMS[0]);
+ aq.testPoint_setArrayItem(head + 1, ITEMS[1]);
+ aq.testPoint_setArrayItem(head + 2, ITEMS[2]);
+ aq.testPoint_setArrayItem(head + 3, ITEMS[3]);
+ aq.testPoint_setSize(4);
+
+ aq.moveBackward(2, ITEMS[4]);
+
+ assertEquals(head - 1, aq.testPoint_head());
+ head--;
+
+ assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4));
+ }
+
+ @Test
+ public void testMoveBackwardWrap() {
+ EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16);
+
+ int head = 14;
+ aq.testPoint_setHead(head);
+ aq.testPoint_setArrayItem(head + 0, ITEMS[0]);
+ aq.testPoint_setArrayItem(head + 1, ITEMS[1]);
+ aq.testPoint_setArrayItem(head + 2, ITEMS[2]);
+ aq.testPoint_setArrayItem(head + 3, ITEMS[3]);
+ aq.testPoint_setSize(4);
+
+ aq.moveBackward(2, ITEMS[4]);
+
+ assertEquals(head - 1, aq.testPoint_head());
+ head--;
+
+ assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4));
+ }
+
+ @Test
+ public void testQueueBehavior() {
+ EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16);
+
+ // tc 0 (n/a)
+ assertTrue(aq.isEmpty());
+ assertEquals(0, aq.size());
+ assertEquals(0, aq.testPoint_head());
+ assertEquals(16, aq.testPoint_arrayLength());
+
+ // tc 1
+ aq.insertAt(0, ITEMS[0]);
+ assertFalse(aq.isEmpty());
+ assertEquals(1, aq.size());
+ assertEquals(0, aq.testPoint_head());
+ assertSame(ITEMS[0], aq.testPoint_getArrayItem(0));
+ assertNull(aq.testPoint_getArrayItem(1));
+ assertNull(aq.testPoint_getArrayItem(15));
+
+ // removal
+ assertSame(ITEMS[0], aq.pollFirst());
+ assertTrue(aq.isEmpty());
+ assertEquals(0, aq.size());
+ assertEquals(1, aq.testPoint_head());
+ assertNull(aq.testPoint_getArrayItem(0));
+ assertNull(aq.testPoint_getArrayItem(1));
+
+ // tc 1 (but this time with head == 1)
+ aq.insertAt(0, ITEMS[1]);
+ assertFalse(aq.isEmpty());
+ assertEquals(1, aq.size());
+ assertEquals(1, aq.testPoint_head());
+ assertNull(aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertNull(aq.testPoint_getArrayItem(2));
+
+ // tc 1 (but with head == 1 and size == 1)
+ aq.insertAt(1, ITEMS[2]);
+ assertFalse(aq.isEmpty());
+ assertEquals(2, aq.size());
+ assertEquals(1, aq.testPoint_head());
+ assertNull(aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(2));
+ assertNull(aq.testPoint_getArrayItem(3));
+
+ // tc 2 (but with head == 1 and size == 2)
+ aq.insertAt(0, ITEMS[3]);
+ assertFalse(aq.isEmpty());
+ assertEquals(3, aq.size()); // halfSize == 2
+ // head moves back to 0
+ assertEquals(0, aq.testPoint_head());
+ assertNull(aq.testPoint_getArrayItem(15));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(2));
+ assertNull(aq.testPoint_getArrayItem(3));
+
+ // tc 2 (but with head == 0 and size == 3)
+ aq.insertAt(0, ITEMS[4]);
+ assertFalse(aq.isEmpty());
+ assertEquals(4, aq.size());
+ // head wraps around to 15
+ assertEquals(15, aq.testPoint_head());
+ assertNull(aq.testPoint_getArrayItem(14));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(15));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(2));
+ assertNull(aq.testPoint_getArrayItem(3));
+
+ // tc 2 (but with head == 15 and size == 4)
+ aq.insertAt(0, ITEMS[5]);
+ assertFalse(aq.isEmpty());
+ assertEquals(5, aq.size());
+ assertEquals(14, aq.testPoint_head());
+ assertNull(aq.testPoint_getArrayItem(13));
+ assertSame(ITEMS[5], aq.testPoint_getArrayItem(14));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(15));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(2));
+ assertNull(aq.testPoint_getArrayItem(3));
+
+ // tc
+ aq.insertAt(1, ITEMS[6]);
+
+ assertNull(aq.testPoint_getArrayItem(12));
+ assertSame(ITEMS[5], aq.testPoint_getArrayItem(13));
+ assertSame(ITEMS[6], aq.testPoint_getArrayItem(14));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(15));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(2));
+ assertNull(aq.testPoint_getArrayItem(3));
+
+ aq.insertAt(0, ITEMS[7]);
+
+ assertNull(aq.testPoint_getArrayItem(11));
+ assertSame(ITEMS[7], aq.testPoint_getArrayItem(12));
+ assertSame(ITEMS[5], aq.testPoint_getArrayItem(13));
+ assertSame(ITEMS[6], aq.testPoint_getArrayItem(14));
+ assertSame(ITEMS[4], aq.testPoint_getArrayItem(15));
+ assertSame(ITEMS[3], aq.testPoint_getArrayItem(0));
+ assertSame(ITEMS[1], aq.testPoint_getArrayItem(1));
+ assertSame(ITEMS[2], aq.testPoint_getArrayItem(2));
+ assertNull(aq.testPoint_getArrayItem(3));
+ }
+
+ @AfterClass
+ public static void afterAll() throws InterruptedException {
+ try {
+ eqe.shutdown();
+ eqe.awaitTermination(30, TimeUnit.SECONDS);
+ } finally {
+ eqe = null;
+ }
+ }
+}
diff --git a/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java b/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java
new file mode 100644
index 0000000..1742ba1
--- /dev/null
+++ b/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java
@@ -0,0 +1,167 @@
+package org.jboss.threads;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+public class ScheduledEnhancedQueueExecutorTest {
+
+ @Test
+ public void testCancel() throws Exception {
+ EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
+ try {
+ ScheduledFuture> future = eqe.schedule(() -> fail("Should never run"), 1000, TimeUnit.DAYS);
+ Thread.sleep(400); // a few ms to let things percolate
+ assertFalse(future.isCancelled());
+ // this should succeed since the task isn't submitted yet
+ assertTrue(future.cancel(false));
+ assertTrue(future.isCancelled());
+ eqe.shutdown();
+ assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS));
+ } finally {
+ eqe.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testCancelWhileRunning() throws Exception {
+ EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
+ try {
+ CountDownLatch latch = new CountDownLatch(1);
+ ScheduledFuture future = eqe.schedule(() -> { latch.countDown(); Thread.sleep(1_000_000_000L); return Boolean.TRUE; }, 1, TimeUnit.NANOSECONDS);
+ assertTrue("Timely task execution", latch.await(5, TimeUnit.SECONDS));
+ assertFalse(future.isCancelled());
+ // task is running; cancel will fail
+ assertFalse(future.cancel(false));
+ assertFalse(future.isCancelled());
+ assertFalse(future.isDone());
+ // now try to interrupt it (cancel still fails but the interrupt should be delivered)
+ assertFalse(future.cancel(true));
+ assertFalse(future.isCancelled());
+ // now get it
+ try {
+ future.get(100L, TimeUnit.MILLISECONDS);
+ fail("Expected exception");
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ assertTrue("Expected " + cause + " to be an InterruptedException", cause instanceof InterruptedException);
+ }
+ assertTrue(future.isDone());
+ eqe.shutdown();
+ assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS));
+ } finally {
+ eqe.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testReasonableExecutionDelay() throws Exception {
+ EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
+ try {
+ Callable task = () -> Boolean.TRUE;
+ long start = System.nanoTime();
+ ScheduledFuture future = eqe.schedule(task, 1, TimeUnit.MILLISECONDS);
+ Boolean result = future.get();
+ long execTime = System.nanoTime() - start;
+ long expected = 1_000_000L;
+ assertTrue("Execution too short (expected at least " + expected + ", got " + execTime + ")", execTime >= expected);
+ assertNotNull(result);
+ assertTrue(result.booleanValue());
+ start = System.nanoTime();
+ future = eqe.schedule(task, 500, TimeUnit.MILLISECONDS);
+ result = future.get();
+ execTime = System.nanoTime() - start;
+ expected = 500_000_000L;
+ assertTrue("Execution too short (expected at least " + expected + ", got " + execTime + ")", execTime >= expected);
+ assertNotNull(result);
+ assertTrue(result.booleanValue());
+ eqe.shutdown();
+ assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS));
+ } finally {
+ eqe.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testFixedRateExecution() throws Exception {
+ EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
+ try {
+ AtomicInteger ai = new AtomicInteger();
+ CountDownLatch completeLatch = new CountDownLatch(1);
+ ScheduledFuture> future = eqe.scheduleAtFixedRate(() -> {
+ if (ai.incrementAndGet() == 5) {
+ completeLatch.countDown();
+ }
+ }, 20, 50, TimeUnit.MILLISECONDS);
+ assertTrue("Completion of enough iterations", completeLatch.await(1, TimeUnit.SECONDS));
+ assertFalse(future.isDone()); // they're never done
+ // don't assert, because there's a small chance it would happen to be running
+ future.cancel(false);
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ fail("Expected cancellation exception");
+ } catch (CancellationException e) {
+ // expected
+ }
+ eqe.shutdown();
+ assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS));
+ } finally {
+ eqe.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testFixedDelayExecution() throws Exception {
+ EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
+ try {
+ AtomicInteger ai = new AtomicInteger();
+ CountDownLatch completeLatch = new CountDownLatch(1);
+ ScheduledFuture> future = eqe.scheduleWithFixedDelay(() -> {
+ if (ai.incrementAndGet() == 5) {
+ completeLatch.countDown();
+ }
+ }, 20, 50, TimeUnit.MILLISECONDS);
+ assertTrue("Completion of enough iterations", completeLatch.await(1, TimeUnit.SECONDS));
+ assertFalse(future.isDone()); // they're never done
+ // don't assert, because there's a small chance it would happen to be running
+ future.cancel(false);
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ fail("Expected cancellation exception");
+ } catch (CancellationException e) {
+ // expected
+ }
+ eqe.shutdown();
+ assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS));
+ } finally {
+ eqe.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testCancelOnShutdown() throws Exception {
+ EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build();
+ try {
+ ScheduledFuture> future = eqe.schedule(() -> fail("Should never run"), 1, TimeUnit.DAYS);
+ eqe.shutdown();
+ assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS));
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ fail("Expected cancellation exception");
+ } catch (CancellationException e) {
+ // expected
+ }
+ assertTrue("Was cancelled on shutdown", future.isCancelled());
+ } finally {
+ eqe.shutdownNow();
+ }
+ }
+}