diff --git a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java index 5a70548..5911d7e 100644 --- a/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java +++ b/src/main/java/org/jboss/threads/EnhancedQueueExecutor.java @@ -37,18 +37,26 @@ import java.util.Hashtable; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.LockSupport; -import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.concurrent.locks.ReentrantLock; import javax.management.ObjectInstance; import javax.management.ObjectName; @@ -65,12 +73,15 @@ * necessary size. In addition, the optional {@linkplain #setGrowthResistance(float) growth resistance feature} can * be used to further govern the thread pool size. *

+ * Additionally, this thread pool implementation supports scheduling of tasks. + * The scheduled tasks will execute on the main pool. + *

* New instances of this thread pool are created by constructing and configuring a {@link Builder} instance, and calling * its {@link Builder#build() build()} method. * * @author David M. Lloyd */ -public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService { +public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 implements ManageableThreadPoolExecutorService, ScheduledExecutorService { private static final Thread[] NO_THREADS = new Thread[0]; static { @@ -196,6 +207,14 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl * The context handler for the user-defined context. */ private final ContextHandler contextHandler; + /** + * The task for scheduled execution. + */ + private final SchedulerTask schedulerTask = new SchedulerTask(); + /** + * The scheduler thread. + */ + private final Thread schedulerThread; // ======================================================= // Current state fields @@ -350,6 +369,9 @@ public final class EnhancedQueueExecutor extends EnhancedQueueExecutorBase6 impl this.handoffExecutor = builder.getHandoffExecutor(); this.exceptionHandler = builder.getExceptionHandler(); this.threadFactory = builder.getThreadFactory(); + this.schedulerThread = threadFactory.newThread(schedulerTask); + String schedulerName = this.schedulerThread.getName(); + this.schedulerThread.setName(schedulerName + " (scheduler)"); this.terminationTask = builder.getTerminationTask(); this.growthResistance = builder.getGrowthResistance(); this.contextHandler = builder.getContextHandler(); @@ -902,6 +924,39 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws return isTerminated(); } + // ======================================================= + // ScheduledExecutorService + // ======================================================= + + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new RunnableScheduledFuture(command, delay, unit)); + } + + public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new CallableScheduledFuture(callable, delay, unit)); + } + + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new FixedRateRunnableScheduledFuture(command, initialDelay, period, unit)); + } + + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + startScheduleThread(); + return schedulerTask.schedule(new FixedDelayRunnableScheduledFuture(command, initialDelay, delay, unit)); + } + + private void startScheduleThread() { + // this should be fairly quick... + if (schedulerThread.getState() == Thread.State.NEW) try { + schedulerThread.start(); + } catch (IllegalThreadStateException ignored) { + // make sure it's race-proof + } + } + // ======================================================= // Management // ======================================================= @@ -950,6 +1005,8 @@ public void shutdown(boolean interrupt) { if (isShutdownRequested(newStatus) != isShutdownRequested(oldStatus)) { assert ! isShutdownRequested(oldStatus); // because it can only ever be set, not cleared // we initiated shutdown + // terminate the scheduler + schedulerTask.shutdown(); // clear out all consumers and append a dummy waiter node TaskNode tail = this.tail; QNode tailNext; @@ -2421,6 +2478,10 @@ public long getSpinMissCount() { } } + // ======================================================= + // Basic task wrapper + // ======================================================= + final class Task implements Runnable { private final Runnable delegate; @@ -2478,4 +2539,549 @@ public String toString() { return "Task{delegate=" + delegate + ", contextClassLoader=" + contextClassLoader + '}'; } } + + // ======================================================= + // Scheduled future tasks + // ======================================================= + + static final int ASF_ST_WAITING = 0; + static final int ASF_ST_CANCELLED = 1; + static final int ASF_ST_SUBMITTED = 2; + static final int ASF_ST_RUNNING = 3; + static final int ASF_ST_FINISHED = 4; + static final int ASF_ST_FAILED = 5; + static final int ASF_ST_REJECTED = 6; + + static final AbstractScheduledFuture[] NO_FUTURES = new AbstractScheduledFuture[0]; + + static final AtomicLong SCHEDULED_TASK_SEQ = new AtomicLong(); + + /** + * An implementation of {@link ScheduledFuture} which is wrapped by {@link Task}. + * + * @param the result type + */ + abstract class AbstractScheduledFuture implements ScheduledFuture, Runnable { + final long seq = SCHEDULED_TASK_SEQ.getAndIncrement(); + /** + * The task which is wrapping this one. + */ + final Task wrappingTask; + /** + * The scheduled time for this task, in nanoseconds since the scheduler thread was born. + * Can be mutated in subclasses if the task is recurring, but only under lock. + */ + volatile long when; + /** + * The state of this task; one of {@code ASF_ST_*}. + */ + volatile int state = ASF_ST_WAITING; + /** + * The actual result; only valid in {@code ASF_ST_FINISHED} (where it is of type {@code V}), + * or in {@code ASF_ST_FAILED} (where it is of type {@code Throwable}), + * or in {@code ASF_ST_REJECTED} (where it is of type {@code RejectedExecutionException}). + */ + volatile Object result; + /** + * The thread which is currently live for this task. + */ + Thread liveThread; + + AbstractScheduledFuture(long delay, TimeUnit unit) { + when = Math.addExact(schedulerTask.age(), unit.toNanos(delay)); + wrappingTask = new Task(this, contextHandler.captureContext()); + } + + public int compareTo(final Delayed o) { + return o instanceof AbstractScheduledFuture ? compareTo((AbstractScheduledFuture) o) : wrongType(); + } + + public int compareTo(final AbstractScheduledFuture other) { + int cmp = Long.compare(when, other.when); + if (cmp == 0) cmp = Long.compare(seq, other.seq); + return cmp; + } + + public long getDelay(final TimeUnit unit) { + return unit.convert(Math.max(0, when - schedulerTask.age()), TimeUnit.NANOSECONDS); + } + + public boolean isCancelled() { + return state == ASF_ST_CANCELLED; + } + + public boolean isDone() { + int state = this.state; + return state == ASF_ST_FINISHED || state == ASF_ST_FAILED || state == ASF_ST_CANCELLED || state == ASF_ST_REJECTED; + } + + public boolean cancel(final boolean mayInterruptIfRunning) { + int state; + synchronized (this) { + state = this.state; + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: { + this.state = ASF_ST_CANCELLED; + return true; + } + case ASF_ST_RUNNING: { + if (mayInterruptIfRunning) { + liveThread.interrupt(); + } + return false; + } + case ASF_ST_CANCELLED: { + return true; + } + default: { + return false; + } + } + } + } + + public V get() throws InterruptedException, ExecutionException { + int state; + synchronized (this) { + for (;;) { + state = this.state; + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: + case ASF_ST_RUNNING: { + wait(); + break; + } + case ASF_ST_CANCELLED: { + throw new CancellationException("Task was cancelled"); + } + case ASF_ST_REJECTED: { + throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result); + } + case ASF_ST_FAILED: { + throw new ExecutionException((Throwable) result); + } + case ASF_ST_FINISHED: { + //noinspection unchecked + return (V) result; + } + } + } + } + } + + public V get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long remaining = unit.toNanos(timeout); + long start = System.nanoTime(); + int state; + synchronized (this) { + for (;;) { + state = this.state; + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: + case ASF_ST_RUNNING: { + if (remaining <= 0) { + throw new TimeoutException(); + } + wait(remaining / 1_000_000L, (int) (remaining % 1_000_000)); + break; + } + case ASF_ST_CANCELLED: { + throw new CancellationException("Task was cancelled"); + } + case ASF_ST_REJECTED: { + throw new ExecutionException("Task failed due to rejection", (RejectedExecutionException) result); + } + case ASF_ST_FAILED: { + throw new ExecutionException((Throwable) result); + } + case ASF_ST_FINISHED: { + //noinspection unchecked + return (V) result; + } + } + long newStart = System.nanoTime(); + long elapsed = newStart - start; + remaining -= elapsed; + start = newStart; + } + } + } + + public void run() { + stateTest: synchronized (this) { + switch (state) { + case ASF_ST_SUBMITTED: { + this.state = ASF_ST_RUNNING; + liveThread = currentThread(); + break stateTest; + } + case ASF_ST_CANCELLED: { + // cancelled after submit but before it was run + return; + } + case ASF_ST_FAILED: + case ASF_ST_REJECTED: { + // a recurring task terminated abruptly, but was still found in the schedule + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + try { + finish(performTask()); + } catch (Throwable t) { + fail(t); + } + } + + void submit() { + synchronized (this) { + stateTest: switch (state) { + case ASF_ST_WAITING: { + this.state = ASF_ST_SUBMITTED; + //noinspection UnnecessaryLabelOnBreakStatement + break stateTest; + } + case ASF_ST_CANCELLED: { + // do not actually submit + return; + } + case ASF_ST_FAILED: + case ASF_ST_REJECTED: { + // a recurring task terminated abruptly, but was still found in the schedule + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + try { + /* copied from {@link #execute(Runnable)} */ + int result = tryExecute(wrappingTask); + boolean ok = false; + if (result == EXE_OK) { + // last check to ensure that there is at least one existent thread to avoid rare thread timeout race condition + if (currentSizeOf(threadStatus) == 0 && tryAllocateThread(0.0f) == AT_YES && ! doStartThread(null)) { + deallocateThread(); + } + if (UPDATE_STATISTICS) submittedTaskCounter.increment(); + return; + } else if (result == EXE_CREATE_THREAD) try { + ok = doStartThread(wrappingTask); + } finally { + if (! ok) deallocateThread(); + } else { + if (UPDATE_STATISTICS) rejectedTaskCounter.increment(); + if (result == EXE_REJECT_SHUTDOWN) { + rejectShutdown(wrappingTask); + } else { + assert result == EXE_REJECT_QUEUE_FULL; + rejectQueueFull(wrappingTask); + } + } + } catch (RejectedExecutionException e) { + reject(e); + } catch (Throwable t) { + reject(new RejectedExecutionException("Task submission failed", t)); + } + } + + IllegalStateException badState() { + return new IllegalStateException("Task was not in expected state"); + } + + void reject(RejectedExecutionException e) { + synchronized (this) { + switch (state) { + case ASF_ST_SUBMITTED: { + result = e; + this.state = ASF_ST_REJECTED; + liveThread = null; + notifyAll(); + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + } + + void fail(Throwable t) { + synchronized (this) { + switch (state) { + case ASF_ST_WAITING: + case ASF_ST_SUBMITTED: + case ASF_ST_RUNNING: { + result = t; + this.state = ASF_ST_FAILED; + liveThread = null; + notifyAll(); + return; + } + case ASF_ST_CANCELLED: + case ASF_ST_FINISHED: + case ASF_ST_FAILED: + case ASF_ST_REJECTED: { + // ignore the failure, though we're likely in an invalid state + return; + } + } + } + } + + void finish(V result) { + // overridden in subclasses where the task repeats + synchronized (this) { + switch (state) { + case ASF_ST_RUNNING: { + this.result = result; + this.state = ASF_ST_FINISHED; + liveThread = null; + notifyAll(); + return; + } + default: { + // invalid state + fail(badState()); + return; + } + } + } + } + + abstract V performTask() throws Exception; + } + + static int wrongType() throws ClassCastException { + throw new ClassCastException("Wrong task type for comparison"); + } + + final class RunnableScheduledFuture extends AbstractScheduledFuture { + final Runnable runnable; + + RunnableScheduledFuture(final Runnable runnable, final long delay, final TimeUnit unit) { + super(delay, unit); + this.runnable = runnable; + } + + Void performTask() { + runnable.run(); + return null; + } + } + + final class CallableScheduledFuture extends AbstractScheduledFuture { + final Callable callable; + + CallableScheduledFuture(final Callable callable, final long delay, final TimeUnit unit) { + super(delay, unit); + this.callable = callable; + } + + V performTask() throws Exception { + return callable.call(); + } + } + + abstract class RepeatingScheduledFuture extends AbstractScheduledFuture { + final long period; + + RepeatingScheduledFuture(final long delay, final long period, final TimeUnit unit) { + super(delay, unit); + this.period = unit.toNanos(period); + } + + /** + * Adjust the time of this future for resubmission, after the task has run successfully. + */ + abstract void adjustTime(); + + public void run() { + super.run(); + // if an exception is thrown, we will have failed already anyway + adjustTime(); + synchronized (this) { + switch (state) { + case ASF_ST_RUNNING: { + state = ASF_ST_WAITING; + schedulerTask.schedule(this); + return; + } + default: { + // in all other cases, we failed so the task should not be rescheduled + return; + } + } + } + } + + void finish(final V result) { + // repeating tasks never actually finish + } + } + + final class FixedRateRunnableScheduledFuture extends RepeatingScheduledFuture { + final Runnable runnable; + + FixedRateRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) { + super(delay, period, unit); + this.runnable = runnable; + } + + void adjustTime() { + // if this results in a time in the past, the next run will happen immediately + this.when += period; + } + + Void performTask() { + runnable.run(); + return null; + } + } + + final class FixedDelayRunnableScheduledFuture extends RepeatingScheduledFuture { + final Runnable runnable; + + FixedDelayRunnableScheduledFuture(final Runnable runnable, final long delay, final long period, final TimeUnit unit) { + super(delay, period, unit); + this.runnable = runnable; + } + + void adjustTime() { + this.when = schedulerTask.age() + period; + } + + Void performTask() { + runnable.run(); + return null; + } + } + + // ======================================================= + // Scheduler task thread worker + // ======================================================= + + final class SchedulerTask implements Runnable { + final long startMark = System.nanoTime(); + final ReentrantLock ql = new ReentrantLock(); + final Condition qc = ql.newCondition(); + final ArrayList> q = new ArrayList<>(); + boolean shutdownDetected; + + void shutdown() { + ql.lock(); + try { + shutdownDetected = true; + qc.signal(); + } finally { + ql.unlock(); + } + } + + public void run() { + ArrayList> q = this.q; + AbstractScheduledFuture[] remainingFutures; + AbstractScheduledFuture first; + long startMark = this.startMark; + outerLoop: for (;;) { + ql.lock(); + try { + innerLoop: for (;;) { + long now = System.nanoTime(); + if (shutdownDetected) { + // drop all tasks and return + remainingFutures = q.toArray(NO_FUTURES); + q.clear(); + break outerLoop; + } else if (q.isEmpty()) try { + qc.await(); + } catch (InterruptedException ignored) { + // clear interrupt status + continue innerLoop; + } else { + first = q.get(0); + long firstWhen = first.when; + long currentWhen = max(0, now - startMark); + if (firstWhen <= currentWhen) { + // it's ready; run it outside of the lock + q.remove(0); + //noinspection UnnecessaryLabelOnBreakStatement + break innerLoop; + } else { + long waitTime = firstWhen - currentWhen; + try { + qc.awaitNanos(waitTime); + } catch (InterruptedException e) { + // clear interrupt status + continue innerLoop; + } + } + } + } + } finally { + ql.unlock(); + } + // outside of lock; `break innerLoop` goes ↓ here + first.submit(); + // continue loop to find the next task + } + // ↓ `break outerLoop` goes here ↓ + if (remainingFutures.length > 0) { + RejectedExecutionException ree = new RejectedExecutionException("Shut down"); + for (AbstractScheduledFuture future : remainingFutures) { + future.reject(ree); + } + } + return; + } + + > F schedule(final F item) { + Task wrappingTask = item.wrappingTask; + if (item.when <= age()) { + // just submit it now + item.submit(); + return item; + } + ql.lock(); + try { + if (shutdownDetected) { + rejectShutdown(wrappingTask); + return item; + } + int ip = Collections.binarySearch(q, item); + if (ip >= 0) { + // we found other tasks scheduled for the same time; do not modify ip (unlikely) + } else { + // no other tasks scheduled for this time + ip = -ip - 1; + } + q.add(ip, item); + // now check to see if we need to wake up the scheduler + if (ip == 0) { + // the delay time has (probably) changed, so wake up the waiter + qc.signal(); + } + return item; + } finally { + ql.unlock(); + } + } + + long age() { + return System.nanoTime() - startMark; + } + } }