From 1299a1812b8366eea13994e36c40066df0f06931 Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Mon, 27 Jun 2022 15:46:13 -0500 Subject: [PATCH] Add scheduling capabilities to `EnhancedQueueExecutor` --- .../jboss/threads/EnhancedQueueExecutor.java | 914 +++++++++++++++++- .../org/jboss/threads/ArrayQueueTests.java | 247 +++++ .../ScheduledEnhancedQueueExecutorTest.java | 167 ++++ 3 files changed, 1324 insertions(+), 4 deletions(-) create mode 100644 src/test/java/org/jboss/threads/ArrayQueueTests.java create mode 100644 src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index 5a70548..5fa3450 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -33,22 +33,33 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Hashtable; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.LockSupport; -import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.concurrent.locks.ReentrantLock; import javax.management.ObjectInstance; import javax.management.ObjectName; @@ -65,12 +76,15 @@ * necessary size. In addition, the optional {@linkplain #setGrowthResistance(float) growth resistance feature} can * be used to further govern the thread pool size. *

+ * Additionally, this thread pool implementation supports scheduling of tasks. + * The scheduled tasks will execute on the main pool. + *

* New instances of this thread pool are created by constructing and configuring a {@link Builder} instance, and calling * its {@link Builder#build() build()} method. * * @author David M. Lloyd */ -public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService { +public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService, ScheduledExecutorService { private static final Thread[] NO_THREADS = new Thread[0]; static { @@ -196,6 +210,14 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl * The context handler for the user-defined context. */ private final ContextHandler contextHandler; + /** + * The task for scheduled execution. + */ + private final SchedulerTask schedulerTask = new SchedulerTask(); + /** + * The scheduler thread. + */ + private final Thread schedulerThread; // ======================================================= // Current state fields @@ -350,6 +372,9 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl this.handoffExecutor = builder.getHandoffExecutor(); this.exceptionHandler = builder.getExceptionHandler(); this.threadFactory = builder.getThreadFactory(); + this.schedulerThread = threadFactory.newThread(schedulerTask); + String schedulerName = this.schedulerThread.getName(); + this.schedulerThread.setName(schedulerName + " (scheduler)"); this.terminationTask = builder.getTerminationTask(); this.growthResistance = builder.getGrowthResistance(); this.contextHandler = builder.getContextHandler(); @@ -875,7 +900,7 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws Assert.checkNotNullParam("unit", unit); if (timeout > 0) { final Thread thread = Thread.currentThread(); - if (runningThreads.contains(thread)) { + if (runningThreads.contains(thread) || thread == schedulerThread) { throw Messages.msg.cannotAwaitWithin(); } Waiter waiters = this.terminationWaiters; @@ -902,6 +927,39 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws return isTerminated(); } + // ======================================================= + // ScheduledExecutorService + // ======================================================= + + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new RunnableScheduledFuture(command, delay, unit)); + } + + public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new CallableScheduledFuture(callable, delay, unit)); + } + + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new FixedRateRunnableScheduledFuture(command, initialDelay, period, unit)); + } + + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new FixedDelayRunnableScheduledFuture(command, initialDelay, delay, unit)); + } + + private void startScheduleThread() { + // this should be fairly quick... + if (schedulerThread.getState() == Thread.State.NEW) try { + schedulerThread.start(); + } catch (IllegalThreadStateException ignored) { + // make sure it's race-proof + } + } + // ======================================================= // Management // ======================================================= @@ -950,6 +1008,8 @@ public void shutdown(boolean interrupt) { if (isShutdownRequested(newStatus) != isShutdownRequested(oldStatus)) { assert ! isShutdownRequested(oldStatus); // because it can only ever be set, not cleared // we initiated shutdown + // terminate the scheduler + schedulerTask.shutdown(); // clear out all consumers and append a dummy waiter node TaskNode tail = this.tail; QNode tailNext; @@ -2421,6 +2481,10 @@ public long getSpinMissCount() { } } + // ======================================================= + // Basic task wrapper + // ======================================================= + final class Task implements Runnable { private final Runnable delegate; @@ -2478,4 +2542,846 @@ public String toString() { return "Task{delegate=" + delegate + ", contextClassLoader=" + contextClassLoader + '}'; } } + + // ======================================================= + // Scheduled future tasks + // ======================================================= + + static final int ASF_ST_WAITING = 0; + static final int ASF_ST_CANCELLED = 1; + static final int ASF_ST_SUBMITTED = 2; + static final int ASF_ST_RUNNING = 3; + static final int ASF_ST_FINISHED = 4; + static final int ASF_ST_FAILED = 5; + static final int ASF_ST_REJECTED = 6; + + static final AbstractScheduledFuture[] NO_FUTURES = new AbstractScheduledFuture[0]; + + static final AtomicLong SCHEDULED_TASK_SEQ = new AtomicLong(); + + /** + * An implementation of {@link ScheduledFuture} which is wrapped by {@link Task}. + * + * @param the result type + */ + abstract class AbstractScheduledFuture implements ScheduledFuture, Runnable { + final long seq = SCHEDULED_TASK_SEQ.getAndIncrement(); + /** + * The task which is wrapping this one. + */ + final Task wrappingTask; + /** + * The scheduled time for this task, in nanoseconds since the scheduler thread was born. + * Can be mutated in subclasses if the task is recurring, but only under lock. + */ + volatile long when; + /** + * The state of this task; one of {@code ASF_ST_*}. + */ + volatile int state = ASF_ST_WAITING; + /** + * The actual result; only valid in {@code ASF_ST_FINISHED} (where it is of type {@code V}), + * or in {@code ASF_ST_FAILED} (where it is of type {@code Throwable}), + * or in {@code ASF_ST_REJECTED} (where it is of type {@code RejectedExecutionException}). + */ + volatile Object result; + /** + * The thread which is currently live for this task. + */ + Thread liveThread; + + AbstractScheduledFuture(long delay, TimeUnit unit) { + when = Math.addExact(schedulerTask.age(), unit.toNanos(delay)); + wrappingTask = new Task(this, contextHandler.captureContext()); + } + + public int compareTo(final Delayed o) { + return o instanceof AbstractScheduledFuture ? compareTo((AbstractScheduledFuture) o) : wrongType(); + } + + public int compareTo(final AbstractScheduledFuture other) { + int cmp = Long.compare(when, other.when); + if (cmp == 0) cmp = Long.compare(seq, other.seq); + return cmp; + } + + public long getDelay(final TimeUnit unit) { + return unit.convert(Math.max(0, when - schedulerTask.age()), TimeUnit.NANOSECONDS); + } + + public boolean isCancelled() { + return state == ASF_ST_CANCELLED; + } + + public boolean isDone() { + int state = this.state; + return state == ASF_ST_FINISHED || state == ASF_ST_FAILED || state == ASF_ST_CANCELLED || state == ASF_ST_REJECTED; + } + + public boolean cancel(final boolean mayInterruptIfRunning) { + int state; + synchronized (this) { + state = this.state; + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: { + this.state = ASF_ST_CANCELLED; + return true; + } + case ASF_ST_RUNNING: { + if (mayInterruptIfRunning) { + liveThread.interrupt(); + } + return false; + } + case ASF_ST_CANCELLED: { + return true; + } + default: { + return false; + } + } + } + } + + public V get() throws InterruptedException, ExecutionException { + int state; + synchronized (this) { + for (;;) { + state = this.state; + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: + case ASF_ST_RUNNING: { + wait(); + break; + } + case ASF_ST_CANCELLED: { + throw new CancellationException("Task was cancelled"); + } + case ASF_ST_REJECTED: { + throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result); + } + case ASF_ST_FAILED: { + throw new ExecutionException((Throwable) result); + } + case ASF_ST_FINISHED: { + //noinspection unchecked + return (V) result; + } + } + } + } + } + + public V get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long remaining = unit.toNanos(timeout); + long start = System.nanoTime(); + int state; + synchronized (this) { + for (;;) { + state = this.state; + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: + case ASF_ST_RUNNING: { + if (remaining <= 0) { + throw new TimeoutException(); + } + wait(remaining / 1_000_000L, (int) (remaining % 1_000_000)); + break; + } + case ASF_ST_CANCELLED: { + throw new CancellationException("Task was cancelled"); + } + case ASF_ST_REJECTED: { + throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result); + } + case ASF_ST_FAILED: { + throw new ExecutionException((Throwable) result); + } + case ASF_ST_FINISHED: { + //noinspection unchecked + return (V) result; + } + } + long newStart = System.nanoTime(); + long elapsed = newStart - start; + remaining -= elapsed; + start = newStart; + } + } + } + + public void run() { + stateTest: synchronized (this) { + switch (state) { + case ASF_ST_SUBMITTED: { + this.state = ASF_ST_RUNNING; + liveThread = currentThread(); + break stateTest; + } + case ASF_ST_CANCELLED: { + // cancelled after submit but before it was run + return; + } + case ASF_ST_FAILED: + case ASF_ST_REJECTED: { + // a recurring task terminated abruptly, but was still found in the schedule + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + try { + finish(performTask()); + } catch (Throwable t) { + fail(t); + } + } + + void submit() { + synchronized (this) { + stateTest: switch (state) { + case ASF_ST_WAITING: { + this.state = ASF_ST_SUBMITTED; + //noinspection UnnecessaryLabelOnBreakStatement + break stateTest; + } + case ASF_ST_CANCELLED: { + // do not actually submit + return; + } + case ASF_ST_FAILED: + case ASF_ST_REJECTED: { + // a recurring task terminated abruptly, but was still found in the schedule + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + try { + /* copied from {@link #execute(Runnable)} */ + int result = tryExecute(wrappingTask); + boolean ok = false; + if (result == EXE_OK) { + // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition + if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) { + deallocateThread(); + } + if (UPDATE_STATISTICS) submittedTaskCounter.increment(); + return; + } else if (result == EXE_CREATE_THREAD) try { + ok = doStartThread(wrappingTask); + } finally { + if (! ok) deallocateThread(); + } else { + if (UPDATE_STATISTICS) rejectedTaskCounter.increment(); + if (result == EXE_REJECT_SHUTDOWN) { + rejectShutdown(wrappingTask); + } else { + assert result == EXE_REJECT_QUEUE_FULL; + rejectQueueFull(wrappingTask); + } + } + } catch (RejectedExecutionException e) { + reject(e); + } catch (Throwable t) { + reject(new RejectedExecutionException("Task submission failed", t)); + } + } + + IllegalStateException badState() { + return new IllegalStateException("Task was not in expected state"); + } + + void reject(RejectedExecutionException e) { + synchronized (this) { + switch (state) { + case ASF_ST_SUBMITTED: { + result = e; + this.state = ASF_ST_REJECTED; + liveThread = null; + notifyAll(); + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + } + + void fail(Throwable t) { + synchronized (this) { + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: + case ASF_ST_RUNNING: { + result = t; + this.state = ASF_ST_FAILED; + liveThread = null; + notifyAll(); + return; + } + case ASF_ST_CANCELLED: + case ASF_ST_FINISHED: + case ASF_ST_FAILED: + case ASF_ST_REJECTED: { + // ignore the failure, though we're likely in an invalid state + return; + } + } + } + } + + void finish(V result) { + // overridden in subclasses where the task repeats + synchronized (this) { + switch (state) { + case ASF_ST_RUNNING: { + this.result = result; + this.state = ASF_ST_FINISHED; + liveThread = null; + notifyAll(); + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + } + + abstract V performTask() throws Exception; + + public String toString() { + return toString(new StringBuilder()).toString(); + } + + StringBuilder toString(StringBuilder b) { + return b.append("future result of "); + } + } + + static int wrongType() throws ClassCastException { + throw new ClassCastException("Wrong task type for comparison"); + } + + final class RunnableScheduledFuture extends AbstractScheduledFuture { + final Runnable runnable; + + RunnableScheduledFuture(final Runnable runnable, final long delay, final TimeUnit unit) { + super(delay, unit); + this.runnable = runnable; + } + + Void performTask() { + runnable.run(); + return null; + } + + StringBuilder toString(final StringBuilder b) { + return super.toString(b).append(runnable); + } + } + + final class CallableScheduledFuture extends AbstractScheduledFuture { + final Callable callable; + + CallableScheduledFuture(final Callable callable, final long delay, final TimeUnit unit) { + super(delay, unit); + this.callable = callable; + } + + V performTask() throws Exception { + return callable.call(); + } + + StringBuilder toString(final StringBuilder b) { + return super.toString(b).append(callable); + } + } + + abstract class RepeatingScheduledFuture extends AbstractScheduledFuture { + final long period; + + RepeatingScheduledFuture(final long delay, final long period, final TimeUnit unit) { + super(delay, unit); + this.period = unit.toNanos(period); + } + + /** + * Adjust the time of this future for resubmission, after the task has run successfully. + */ + abstract void adjustTime(); + + public void run() { + super.run(); + // if an exception is thrown, we will have failed already anyway + adjustTime(); + synchronized (this) { + switch (state) { + case ASF_ST_RUNNING: { + state = ASF_ST_WAITING; + schedulerTask.schedule(this); + return; + } + default: { + // in all other cases, we failed so the task should not be rescheduled + return; + } + } + } + } + + void finish(final V result) { + // repeating tasks never actually finish + } + + StringBuilder toString(final StringBuilder b) { + return super.toString(b.append("repeating ")); + } + } + + final class FixedRateRunnableScheduledFuture extends RepeatingScheduledFuture { + final Runnable runnable; + + FixedRateRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) { + super(delay, period, unit); + this.runnable = runnable; + } + + void adjustTime() { + // if this results in a time in the past, the next run will happen immediately + this.when += period; + } + + Void performTask() { + runnable.run(); + return null; + } + + StringBuilder toString(final StringBuilder b) { + return super.toString(b).append(runnable); + } + } + + final class FixedDelayRunnableScheduledFuture extends RepeatingScheduledFuture { + final Runnable runnable; + + FixedDelayRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) { + super(delay, period, unit); + this.runnable = runnable; + } + + void adjustTime() { + this.when = schedulerTask.age() + period; + } + + Void performTask() { + runnable.run(); + return null; + } + + StringBuilder toString(final StringBuilder b) { + return super.toString(b).append(runnable); + } + } + + // ======================================================= + // Scheduler task thread worker + // ======================================================= + + final class SchedulerTask implements Runnable { + final long startMark = System.nanoTime(); + final ReentrantLock ql = new ReentrantLock(); + final Condition qc = ql.newCondition(); + // todo: switch to array queue on a more optimistic day + // protected by {@link #ql} + ScheduledFutureQueue q = new TreeSetQueue(); + boolean shutdownDetected; + + void shutdown() { + ql.lock(); + try { + shutdownDetected = true; + qc.signal(); + } finally { + ql.unlock(); + } + } + + public void run() { + ScheduledFutureQueue q = this.q; + AbstractScheduledFuture[] remainingFutures; + AbstractScheduledFuture first; + long startMark = this.startMark; + outerLoop: for (;;) { + ql.lock(); + try { + innerLoop: for (;;) { + long now = System.nanoTime(); + if (shutdownDetected) { + // drop all tasks and return + remainingFutures = q.toArray(); + q.clear(); + break outerLoop; + } else if (q.isEmpty()) try { + qc.await(); + } catch (InterruptedException ignored) { + // clear interrupt status + continue innerLoop; + } else { + first = q.first(); + long firstWhen = first.when; + long currentWhen = max(0, now - startMark); + if (firstWhen <= currentWhen) { + // it's ready; run it outside of the lock + q.pollFirst(); + //noinspection UnnecessaryLabelOnBreakStatement + break innerLoop; + } else { + long waitTime = firstWhen - currentWhen; + try { + qc.awaitNanos(waitTime); + } catch (InterruptedException e) { + // clear interrupt status + continue innerLoop; + } + } + } + } + } finally { + ql.unlock(); + } + // outside of lock; `break innerLoop` goes ↓ here + first.submit(); + // continue loop to find the next task + } + // ↓ `break outerLoop` goes here ↓ + if (remainingFutures.length > 0) { + for (AbstractScheduledFuture future : remainingFutures) { + future.cancel(true); + } + } + return; + } + + > F schedule(final F item) { + Task wrappingTask = item.wrappingTask; + if (item.when <= age()) { + // just submit it now + item.submit(); + return item; + } + ql.lock(); + try { + if (shutdownDetected) { + rejectShutdown(wrappingTask); + return item; + } + // check to see if we need to wake up the scheduler + boolean first; + for (;;) try { + first = q.insertAndCheckForFirst(item); + break; + } catch (QueueFullException ignored) { + q = q.grow(); + } + if (first) { + // the delay time has changed, so wake up the waiter + qc.signal(); + } + return item; + } finally { + ql.unlock(); + } + } + + long age() { + return System.nanoTime() - startMark; + } + } + + // ======================================================= + // Schedule queue API & implementations + // ======================================================= + + interface ScheduledFutureQueue { + AbstractScheduledFuture[] toArray(); + + void clear(); + + boolean isEmpty(); + + int size(); + + AbstractScheduledFuture first(); + + @SuppressWarnings("UnusedReturnValue") // must match signature for TreeSet + AbstractScheduledFuture pollFirst(); + + /** + * Insert the item in order, checking to see if it was added as the first item. + * + * @param item the item to insert (must not be {@code null}) + * @return {@code true} if the item is first, {@code false} otherwise + * @throws QueueFullException if the queue is full; it must be recreated in this case + */ + boolean insertAndCheckForFirst(AbstractScheduledFuture item) throws QueueFullException; + + /** + * Get a new queue with the same contents as this one, but with a larger capacity. + * + * @return the grown queue + */ + ScheduledFutureQueue grow(); + } + + static final class ArrayQueue implements ScheduledFutureQueue { + final AbstractScheduledFuture[] array; + // the removal point (lowest+least index) + int head; + // the number of elements + int size; + + ArrayQueue(int capacity) { + // next power of two + capacity = Integer.highestOneBit(Math.max(capacity, 2) - 1) << 1; + array = new AbstractScheduledFuture[capacity]; + } + + private ArrayQueue(final ArrayQueue original, final int newCapacity) { + assert Integer.bitCount(newCapacity) == 1; + array = original.toArray(newCapacity); + head = 0; + size = original.size; + } + + public AbstractScheduledFuture[] toArray() { + return toArray(size()); + } + + public AbstractScheduledFuture[] toArray(int size) { + int head = this.head; + int end = head + size; + AbstractScheduledFuture[] copy = Arrays.copyOfRange(array, head, end); + if (end > array.length) { + // copy the wrapped elements + System.arraycopy(array, 0, copy, size - (array.length - head), size - array.length); + } + return copy; + } + + public void clear() { + Arrays.fill(array, null); + head = size = 0; + } + + public boolean isEmpty() { + return size == 0; + } + + public int size() { + return size; + } + + public AbstractScheduledFuture first() { + if (size == 0) { + throw new NoSuchElementException(); + } + return array[head]; + } + + public AbstractScheduledFuture pollFirst() { + if (size == 0) { + throw new NoSuchElementException(); + } + int head = this.head; + AbstractScheduledFuture item = array[head]; + array[head] = null; + this.size --; + int mask = array.length - 1; + this.head = head + 1 & mask; + return item; + } + + public boolean insertAndCheckForFirst(final AbstractScheduledFuture item) { + // find the insertion point + int size = this.size; + AbstractScheduledFuture[] array = this.array; + int arrayLen = array.length; + if (size == arrayLen) { + throw new QueueFullException(); + } + int mask = arrayLen - 1; + int idx = 0; + int high = size - 1; + // at this point and onwards, there is definitely space in the array + int head = this.head; + + while (idx <= high) { + int mid = (idx + high) >>> 1; + AbstractScheduledFuture testVal = array[head + mid & mask]; + int cmp = testVal.compareTo(item); + if (cmp < 0) { + idx = mid + 1; + } else if (cmp > 0) { + high = mid - 1; + } else { + // we found this task already present in the queue (should never happen) + return false; + } + } + + return insertAt(idx, item); + } + + /** + * Move all elements starting at the given index forward to make space at that position, wrapping if needed. + * + * @param idx the element index relative to {@code head} to open up + */ + void moveForward(final int idx, final AbstractScheduledFuture storeVal) { + AbstractScheduledFuture[] array = this.array; + int size = this.size; + int moveCnt = size - idx; + int arrayLength = array.length; + int mask = arrayLength - 1; + int head = this.head; + int start = head + idx; + // TODO: + // - change this to three calls to System.arraycopy + // - one for the already-wrapped portion + // - one for the portion that is being newly wrapped + // - one for the leading (pre-wrap) portion + for (int i = moveCnt - 1; i >= 0; i --) { + int pos = start + i; + array[pos + 1 & mask] = array[pos & mask]; + } + array[start & mask] = storeVal; + } + + /** + * Move all elements starting before the given index backward to make space at that position, wrapping if needed. + * + * @param idx the element index relative to {@code head} to open up + */ + void moveBackward(final int idx, final AbstractScheduledFuture storeVal) { + AbstractScheduledFuture[] array = this.array; + int size = this.size; + int moveCnt = size - idx + 1; + int arrayLength = array.length; + int mask = arrayLength - 1; + int head = this.head; + int start = head + idx - 1; + // TODO: + // - change this to three calls to System.arraycopy + // - one for the leading (pre-wrap) portion + // - one for the portion that is being newly de-wrapped + // - one for the already-wrapped portion + for (int i = moveCnt - 1; i >= 0; i --) { + int pos = start - i; + array[pos - 1 & mask] = array[pos & mask]; + } + array[start & mask] = storeVal; + this.head = head - 1 & mask; + } + + boolean insertAt(final int idx, final AbstractScheduledFuture item) { + // this is a separate method for easier testing of the arraycopy algebraic mayhem + int size = this.size; + // no matter what, we're growing by one + this.size = size + 1; + int halfSize = size + 1 >> 1; + if (idx >= halfSize) { + moveForward(idx, item); + } else { + moveBackward(idx, item); + } + return idx == 0; + } + + public ScheduledFutureQueue grow() { + // todo: calibrate this threshold + if (array.length >= 256) { + return new TreeSetQueue(this); + } else { + return new ArrayQueue(this, array.length << 1); + } + } + + // test points for white-box unit tests + + int testPoint_arrayLength() { + return array.length; + } + + int testPoint_head() { + return head; + } + + void testPoint_setHead(int newHead) { + head = newHead; + } + + void testPoint_setSize(int newSize) { + size = newSize; + } + + AbstractScheduledFuture testPoint_getArrayItem(int index) { + return array[index & array.length - 1]; + } + + AbstractScheduledFuture testPoint_setArrayItem(int index, AbstractScheduledFuture item) { + try { + return array[index & array.length - 1]; + } finally { + array[index & array.length - 1] = item; + } + } + } + + @SuppressWarnings("serial") + static class TreeSetQueue extends TreeSet> implements ScheduledFutureQueue { + TreeSetQueue(final ScheduledFutureQueue original) { + Collections.addAll(this, original.toArray()); + } + + TreeSetQueue() { + } + + public AbstractScheduledFuture[] toArray() { + return super.toArray(NO_FUTURES); + } + + public boolean insertAndCheckForFirst(final AbstractScheduledFuture item) { + add(item); + return item == first(); + } + + public ScheduledFutureQueue grow() { + return this; + } + } + + @SuppressWarnings("serial") + static final class QueueFullException extends RuntimeException { + QueueFullException() { + super(null, null, false, false); + } + } } diff --git a/src/test/java/org/jboss/threads/ArrayQueueTests.java b/src/test/java/org/jboss/threads/ArrayQueueTests.java new file mode 100644 index 0000000..4ffcd4f --- /dev/null +++ b/src/test/java/org/jboss/threads/ArrayQueueTests.java @@ -0,0 +1,247 @@ +package org.jboss.threads; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ArrayQueueTests { + + static EnhancedQueueExecutor eqe; + + static EnhancedQueueExecutor.AbstractScheduledFuture[] ITEMS; + + @BeforeClass + public static void beforeAll() { + eqe = new EnhancedQueueExecutor.Builder().build(); + ITEMS = new EnhancedQueueExecutor.AbstractScheduledFuture[32]; + for (int i = 0; i < 32; i ++) { + final String toString = "[" + i + "]"; + ITEMS[i] = eqe.new RunnableScheduledFuture(new Runnable() { + public void run() { + // nothing + } + + public String toString() { + return toString; + } + }, 0, TimeUnit.DAYS); + } + } + + @Test + public void testMoveForward() { + EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16); + + int head = 5; + aq.testPoint_setHead(head); + aq.testPoint_setArrayItem(head + 0, ITEMS[0]); + aq.testPoint_setArrayItem(head + 1, ITEMS[1]); + aq.testPoint_setArrayItem(head + 2, ITEMS[2]); + aq.testPoint_setArrayItem(head + 3, ITEMS[3]); + aq.testPoint_setSize(4); + + aq.moveForward(2, ITEMS[4]); + + assertEquals(head, aq.testPoint_head()); + + assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4)); + } + + @Test + public void testMoveForwardWrap() { + EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16); + + int head = 14; + aq.testPoint_setHead(head); + aq.testPoint_setArrayItem(head + 0, ITEMS[0]); + aq.testPoint_setArrayItem(head + 1, ITEMS[1]); + aq.testPoint_setArrayItem(head + 2, ITEMS[2]); + aq.testPoint_setArrayItem(head + 3, ITEMS[3]); + aq.testPoint_setSize(4); + + aq.moveForward(2, ITEMS[4]); + + assertEquals(head, aq.testPoint_head()); + + assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4)); + } + + @Test + public void testMoveBackward() { + EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16); + + int head = 5; + aq.testPoint_setHead(head); + aq.testPoint_setArrayItem(head + 0, ITEMS[0]); + aq.testPoint_setArrayItem(head + 1, ITEMS[1]); + aq.testPoint_setArrayItem(head + 2, ITEMS[2]); + aq.testPoint_setArrayItem(head + 3, ITEMS[3]); + aq.testPoint_setSize(4); + + aq.moveBackward(2, ITEMS[4]); + + assertEquals(head - 1, aq.testPoint_head()); + head--; + + assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4)); + } + + @Test + public void testMoveBackwardWrap() { + EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16); + + int head = 14; + aq.testPoint_setHead(head); + aq.testPoint_setArrayItem(head + 0, ITEMS[0]); + aq.testPoint_setArrayItem(head + 1, ITEMS[1]); + aq.testPoint_setArrayItem(head + 2, ITEMS[2]); + aq.testPoint_setArrayItem(head + 3, ITEMS[3]); + aq.testPoint_setSize(4); + + aq.moveBackward(2, ITEMS[4]); + + assertEquals(head - 1, aq.testPoint_head()); + head--; + + assertSame(ITEMS[0], aq.testPoint_getArrayItem(head + 0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(head + 1)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(head + 2)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(head + 3)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(head + 4)); + } + + @Test + public void testQueueBehavior() { + EnhancedQueueExecutor.ArrayQueue aq = new EnhancedQueueExecutor.ArrayQueue(16); + + // tc 0 (n/a) + assertTrue(aq.isEmpty()); + assertEquals(0, aq.size()); + assertEquals(0, aq.testPoint_head()); + assertEquals(16, aq.testPoint_arrayLength()); + + // tc 1 + aq.insertAt(0, ITEMS[0]); + assertFalse(aq.isEmpty()); + assertEquals(1, aq.size()); + assertEquals(0, aq.testPoint_head()); + assertSame(ITEMS[0], aq.testPoint_getArrayItem(0)); + assertNull(aq.testPoint_getArrayItem(1)); + assertNull(aq.testPoint_getArrayItem(15)); + + // removal + assertSame(ITEMS[0], aq.pollFirst()); + assertTrue(aq.isEmpty()); + assertEquals(0, aq.size()); + assertEquals(1, aq.testPoint_head()); + assertNull(aq.testPoint_getArrayItem(0)); + assertNull(aq.testPoint_getArrayItem(1)); + + // tc 1 (but this time with head == 1) + aq.insertAt(0, ITEMS[1]); + assertFalse(aq.isEmpty()); + assertEquals(1, aq.size()); + assertEquals(1, aq.testPoint_head()); + assertNull(aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertNull(aq.testPoint_getArrayItem(2)); + + // tc 1 (but with head == 1 and size == 1) + aq.insertAt(1, ITEMS[2]); + assertFalse(aq.isEmpty()); + assertEquals(2, aq.size()); + assertEquals(1, aq.testPoint_head()); + assertNull(aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(2)); + assertNull(aq.testPoint_getArrayItem(3)); + + // tc 2 (but with head == 1 and size == 2) + aq.insertAt(0, ITEMS[3]); + assertFalse(aq.isEmpty()); + assertEquals(3, aq.size()); // halfSize == 2 + // head moves back to 0 + assertEquals(0, aq.testPoint_head()); + assertNull(aq.testPoint_getArrayItem(15)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(2)); + assertNull(aq.testPoint_getArrayItem(3)); + + // tc 2 (but with head == 0 and size == 3) + aq.insertAt(0, ITEMS[4]); + assertFalse(aq.isEmpty()); + assertEquals(4, aq.size()); + // head wraps around to 15 + assertEquals(15, aq.testPoint_head()); + assertNull(aq.testPoint_getArrayItem(14)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(15)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(2)); + assertNull(aq.testPoint_getArrayItem(3)); + + // tc 2 (but with head == 15 and size == 4) + aq.insertAt(0, ITEMS[5]); + assertFalse(aq.isEmpty()); + assertEquals(5, aq.size()); + assertEquals(14, aq.testPoint_head()); + assertNull(aq.testPoint_getArrayItem(13)); + assertSame(ITEMS[5], aq.testPoint_getArrayItem(14)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(15)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(2)); + assertNull(aq.testPoint_getArrayItem(3)); + + // tc + aq.insertAt(1, ITEMS[6]); + + assertNull(aq.testPoint_getArrayItem(12)); + assertSame(ITEMS[5], aq.testPoint_getArrayItem(13)); + assertSame(ITEMS[6], aq.testPoint_getArrayItem(14)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(15)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(2)); + assertNull(aq.testPoint_getArrayItem(3)); + + aq.insertAt(0, ITEMS[7]); + + assertNull(aq.testPoint_getArrayItem(11)); + assertSame(ITEMS[7], aq.testPoint_getArrayItem(12)); + assertSame(ITEMS[5], aq.testPoint_getArrayItem(13)); + assertSame(ITEMS[6], aq.testPoint_getArrayItem(14)); + assertSame(ITEMS[4], aq.testPoint_getArrayItem(15)); + assertSame(ITEMS[3], aq.testPoint_getArrayItem(0)); + assertSame(ITEMS[1], aq.testPoint_getArrayItem(1)); + assertSame(ITEMS[2], aq.testPoint_getArrayItem(2)); + assertNull(aq.testPoint_getArrayItem(3)); + } + + @AfterClass + public static void afterAll() throws InterruptedException { + try { + eqe.shutdown(); + eqe.awaitTermination(30, TimeUnit.SECONDS); + } finally { + eqe = null; + } + } +} diff --git a/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java b/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java new file mode 100644 index 0000000..1742ba1 --- /dev/null +++ b/src/test/java/org/jboss/threads/ScheduledEnhancedQueueExecutorTest.java @@ -0,0 +1,167 @@ +package org.jboss.threads; + +import static org.junit.Assert.*; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +public class ScheduledEnhancedQueueExecutorTest { + + @Test + public void testCancel() throws Exception { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + try { + ScheduledFuture future = eqe.schedule(() -> fail("Should never run"), 1000, TimeUnit.DAYS); + Thread.sleep(400); // a few ms to let things percolate + assertFalse(future.isCancelled()); + // this should succeed since the task isn't submitted yet + assertTrue(future.cancel(false)); + assertTrue(future.isCancelled()); + eqe.shutdown(); + assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS)); + } finally { + eqe.shutdownNow(); + } + } + + @Test + public void testCancelWhileRunning() throws Exception { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + try { + CountDownLatch latch = new CountDownLatch(1); + ScheduledFuture future = eqe.schedule(() -> { latch.countDown(); Thread.sleep(1_000_000_000L); return Boolean.TRUE; }, 1, TimeUnit.NANOSECONDS); + assertTrue("Timely task execution", latch.await(5, TimeUnit.SECONDS)); + assertFalse(future.isCancelled()); + // task is running; cancel will fail + assertFalse(future.cancel(false)); + assertFalse(future.isCancelled()); + assertFalse(future.isDone()); + // now try to interrupt it (cancel still fails but the interrupt should be delivered) + assertFalse(future.cancel(true)); + assertFalse(future.isCancelled()); + // now get it + try { + future.get(100L, TimeUnit.MILLISECONDS); + fail("Expected exception"); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + assertTrue("Expected " + cause + " to be an InterruptedException", cause instanceof InterruptedException); + } + assertTrue(future.isDone()); + eqe.shutdown(); + assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS)); + } finally { + eqe.shutdownNow(); + } + } + + @Test + public void testReasonableExecutionDelay() throws Exception { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + try { + Callable task = () -> Boolean.TRUE; + long start = System.nanoTime(); + ScheduledFuture future = eqe.schedule(task, 1, TimeUnit.MILLISECONDS); + Boolean result = future.get(); + long execTime = System.nanoTime() - start; + long expected = 1_000_000L; + assertTrue("Execution too short (expected at least " + expected + ", got " + execTime + ")", execTime >= expected); + assertNotNull(result); + assertTrue(result.booleanValue()); + start = System.nanoTime(); + future = eqe.schedule(task, 500, TimeUnit.MILLISECONDS); + result = future.get(); + execTime = System.nanoTime() - start; + expected = 500_000_000L; + assertTrue("Execution too short (expected at least " + expected + ", got " + execTime + ")", execTime >= expected); + assertNotNull(result); + assertTrue(result.booleanValue()); + eqe.shutdown(); + assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS)); + } finally { + eqe.shutdownNow(); + } + } + + @Test + public void testFixedRateExecution() throws Exception { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + try { + AtomicInteger ai = new AtomicInteger(); + CountDownLatch completeLatch = new CountDownLatch(1); + ScheduledFuture future = eqe.scheduleAtFixedRate(() -> { + if (ai.incrementAndGet() == 5) { + completeLatch.countDown(); + } + }, 20, 50, TimeUnit.MILLISECONDS); + assertTrue("Completion of enough iterations", completeLatch.await(1, TimeUnit.SECONDS)); + assertFalse(future.isDone()); // they're never done + // don't assert, because there's a small chance it would happen to be running + future.cancel(false); + try { + future.get(1, TimeUnit.SECONDS); + fail("Expected cancellation exception"); + } catch (CancellationException e) { + // expected + } + eqe.shutdown(); + assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS)); + } finally { + eqe.shutdownNow(); + } + } + + @Test + public void testFixedDelayExecution() throws Exception { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + try { + AtomicInteger ai = new AtomicInteger(); + CountDownLatch completeLatch = new CountDownLatch(1); + ScheduledFuture future = eqe.scheduleWithFixedDelay(() -> { + if (ai.incrementAndGet() == 5) { + completeLatch.countDown(); + } + }, 20, 50, TimeUnit.MILLISECONDS); + assertTrue("Completion of enough iterations", completeLatch.await(1, TimeUnit.SECONDS)); + assertFalse(future.isDone()); // they're never done + // don't assert, because there's a small chance it would happen to be running + future.cancel(false); + try { + future.get(1, TimeUnit.SECONDS); + fail("Expected cancellation exception"); + } catch (CancellationException e) { + // expected + } + eqe.shutdown(); + assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS)); + } finally { + eqe.shutdownNow(); + } + } + + @Test + public void testCancelOnShutdown() throws Exception { + EnhancedQueueExecutor eqe = new EnhancedQueueExecutor.Builder().build(); + try { + ScheduledFuture future = eqe.schedule(() -> fail("Should never run"), 1, TimeUnit.DAYS); + eqe.shutdown(); + assertTrue("Timely shutdown", eqe.awaitTermination(5, TimeUnit.SECONDS)); + try { + future.get(1, TimeUnit.SECONDS); + fail("Expected cancellation exception"); + } catch (CancellationException e) { + // expected + } + assertTrue("Was cancelled on shutdown", future.isCancelled()); + } finally { + eqe.shutdownNow(); + } + } +}