From 735e97d5c7e41a39e759a5d76e45f0b5c1226209 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 29 Jul 2021 09:46:48 +1000 Subject: [PATCH] Non blocking ReservedThreadExecutor (#6535) A call to offer must never block, nor even yield, since to do so give an opportunity for the allocated CPU core to change, defeating the whole purpose of the class. There is also some reasonable level of diagnostic warnings if a reserved thread misses too many offers consecutively, based on tracking the state of the reserved thread. Remove the stack data structure entirely. ReservedThreads all poll the same SynchronousQueue and tryExecute does a non blocking offer. Added test for busy shrinking Remember last time we hit zero reserved threads Co-authored-by: Simone Bordet --- .../util/thread/ReservedThreadExecutor.java | 337 +++++++++--------- .../thread/ReservedThreadExecutorTest.java | 43 ++- 2 files changed, 216 insertions(+), 164 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 372d563625d8..924f1d0564ec 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -18,34 +18,44 @@ package org.eclipse.jetty.util.thread; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.ProcessorUtils; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.eclipse.jetty.util.AtomicBiInteger.getHi; +import static org.eclipse.jetty.util.AtomicBiInteger.getLo; + /** - * An Executor using preallocated/reserved Threads from a wrapped Executor. + * An Executor using pre-allocated/reserved Threads from a wrapped Executor. *

Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed * with a Thread immediately being assigned the Runnable task, or fail if no Thread is * available. - *

Threads are reserved lazily, with a new reserved thread being allocated from a - * wrapped {@link Executor} when an execution fails. If the {@link #setIdleTimeout(long, TimeUnit)} - * is set to non zero (default 1 minute), then the reserved thread pool will shrink by 1 thread - * whenever it has been idle for that period. + *

Threads are reserved lazily, with a new reserved threads being allocated from the + * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been + * idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to + * the executor. */ @ManagedObject("A pool for reserved threads") -public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor +public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable { private static final Logger LOG = Log.getLogger(ReservedThreadExecutor.class); + private static final long DEFAULT_IDLE_TIMEOUT = TimeUnit.MINUTES.toNanos(1); private static final Runnable STOP = new Runnable() { @Override @@ -62,13 +72,13 @@ public String toString() private final Executor _executor; private final int _capacity; - private final ConcurrentLinkedDeque _stack; - private final AtomicInteger _size = new AtomicInteger(); - private final AtomicInteger _pending = new AtomicInteger(); + private final Set _threads = ConcurrentHashMap.newKeySet(); + private final SynchronousQueue _queue = new SynchronousQueue<>(false); + private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size; + private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime()); private ThreadPoolBudget.Lease _lease; - private long _idleTime = 1L; - private TimeUnit _idleTimeUnit = TimeUnit.MINUTES; + private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT; /** * @param executor The executor to use to obtain threads @@ -80,7 +90,6 @@ public ReservedThreadExecutor(Executor executor, int capacity) { _executor = executor; _capacity = reservedThreads(executor, capacity); - _stack = new ConcurrentLinkedDeque<>(); if (LOG.isDebugEnabled()) LOG.debug("{}", this); } @@ -126,42 +135,39 @@ public int getCapacity() @ManagedAttribute(value = "available reserved threads", readonly = true) public int getAvailable() { - return _stack.size(); + return _count.getLo(); } @ManagedAttribute(value = "pending reserved threads", readonly = true) public int getPending() { - return _pending.get(); + return _count.getHi(); } - @ManagedAttribute(value = "idletimeout in MS", readonly = true) + @ManagedAttribute(value = "idle timeout in ms", readonly = true) public long getIdleTimeoutMs() { - if (_idleTimeUnit == null) - return 0; - return _idleTimeUnit.toMillis(_idleTime); + return NANOSECONDS.toMillis(_idleTimeNanos); } /** * Set the idle timeout for shrinking the reserved thread pool * - * @param idleTime Time to wait before shrinking, or 0 for no timeout. + * @param idleTime Time to wait before shrinking, or 0 for default timeout. * @param idleTimeUnit Time units for idle timeout */ public void setIdleTimeout(long idleTime, TimeUnit idleTimeUnit) { if (isRunning()) throw new IllegalStateException(); - _idleTime = idleTime; - _idleTimeUnit = idleTimeUnit; + _idleTimeNanos = (idleTime <= 0 || idleTimeUnit == null) ? DEFAULT_IDLE_TIMEOUT : idleTimeUnit.toNanos(idleTime); } @Override public void doStart() throws Exception { _lease = ThreadPoolBudget.leaseFrom(getExecutor(), this, _capacity); - _size.set(0); + _count.set(0, 0); super.doStart(); } @@ -173,26 +179,22 @@ public void doStop() throws Exception super.doStop(); - while (true) + // Offer STOP task to all waiting reserved threads. + for (int i = _count.getAndSetLo(-1); i-- > 0;) { - int size = _size.get(); - // If no reserved threads left try setting size to -1 to - // atomically prevent other threads adding themselves to stack. - if (size == 0 && _size.compareAndSet(size, -1)) - break; - - ReservedThread thread = _stack.pollFirst(); - if (thread == null) - { - // Reserved thread must have incremented size but not yet added itself to queue. - // We will spin until it is added. - Thread.yield(); - continue; - } - - _size.decrementAndGet(); - thread.stop(); + // yield to wait for any reserved threads that have incremented the size but not yet polled + Thread.yield(); + _queue.offer(STOP); + } + // Interrupt any reserved thread missed the offer so it doesn't wait too long. + for (ReservedThread reserved : _threads) + { + Thread thread = reserved._thread; + if (thread != null) + thread.interrupt(); } + _threads.clear(); + _count.getAndSetHi(0); } @Override @@ -212,52 +214,60 @@ public boolean tryExecute(Runnable task) { if (LOG.isDebugEnabled()) LOG.debug("{} tryExecute {}", this, task); - if (task == null) return false; - ReservedThread thread = _stack.pollFirst(); - if (thread == null) - { - if (task != STOP) - startReservedThread(); - return false; - } + // Offer will only succeed if there is a reserved thread waiting + boolean offered = _queue.offer(task); - int size = _size.decrementAndGet(); - if (!thread.offer(task)) - return false; + // If the offer succeeded we need to reduce the size, unless it is set to -1 in the meantime + int size = _count.getLo(); + while (offered && size > 0 && !_count.compareAndSetLo(size, --size)) + size = _count.getLo(); + // If size is 0 and we are not stopping, start a new reserved thread if (size == 0 && task != STOP) startReservedThread(); - return true; + return offered; } private void startReservedThread() { - try + while (true) { - while (true) + long count = _count.get(); + int pending = getHi(count); + int size = getLo(count); + if (size < 0 || pending + size >= _capacity) + return; + if (size == 0) + _lastEmptyTime.set(System.nanoTime()); + if (!_count.compareAndSet(count, pending + 1, size)) + continue; + + if (LOG.isDebugEnabled()) + LOG.debug("{} startReservedThread p={}", this, pending + 1); + try { - // Not atomic, but there is a re-check in ReservedThread.run(). - int pending = _pending.get(); - int size = _size.get(); - if (pending + size >= _capacity) - return; - if (_pending.compareAndSet(pending, pending + 1)) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} startReservedThread p={}", this, pending + 1); - _executor.execute(new ReservedThread()); - return; - } + ReservedThread thread = new ReservedThread(); + _threads.add(thread); + _executor.execute(thread); } + catch (Throwable e) + { + _count.add(-1, 0); + LOG.ignore(e); + } + return; } - catch (RejectedExecutionException e) - { - LOG.ignore(e); - } + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + Dumpable.dumpObjects(out, indent, this, + new DumpableCollection("reserved", _threads)); } @Override @@ -266,139 +276,148 @@ public String toString() return String.format("%s@%x{s=%d/%d,p=%d}", getClass().getSimpleName(), hashCode(), - _size.get(), + _count.getLo(), _capacity, - _pending.get()); + _count.getHi()); } - private class ReservedThread implements Runnable + private enum State { - private final SynchronousQueue _task = new SynchronousQueue<>(); - private boolean _starting = true; - - public boolean offer(Runnable task) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} offer {}", this, task); - - try - { - _task.put(task); - return true; - } - catch (Throwable e) - { - LOG.ignore(e); - _size.getAndIncrement(); - _stack.offerFirst(this); - return false; - } - } + PENDING, + RESERVED, + RUNNING, + IDLE, + STOPPED + } - public void stop() - { - offer(STOP); - } + private class ReservedThread implements Runnable + { + // The state and thread are kept only for dumping + private volatile State _state = State.PENDING; + private volatile Thread _thread; private Runnable reservedWait() { if (LOG.isDebugEnabled()) - LOG.debug("{} waiting", this); + LOG.debug("{} waiting {}", this, ReservedThreadExecutor.this); - while (true) + // Keep waiting until stopped, tasked or idle + while (_count.getLo() >= 0) { try { - Runnable task = _idleTime <= 0 ? _task.take() : _task.poll(_idleTime, _idleTimeUnit); + // Always poll at some period as safety to ensure we don't poll forever. + Runnable task = _queue.poll(_idleTimeNanos, NANOSECONDS); if (LOG.isDebugEnabled()) - LOG.debug("{} task={}", this, task); + LOG.debug("{} task={} {}", this, task, ReservedThreadExecutor.this); if (task != null) return task; - if (_stack.remove(this)) + // we have idled out + int size = _count.getLo(); + // decrement size if we have not also been stopped. + while (size > 0) { - if (LOG.isDebugEnabled()) - LOG.debug("{} IDLE", this); - _size.decrementAndGet(); - return STOP; + if (_count.compareAndSetLo(size, --size)) + break; + size = _count.getLo(); } + _state = size >= 0 ? State.IDLE : State.STOPPED; + return STOP; + } catch (InterruptedException e) { LOG.ignore(e); - // If the wait was interrupted, then STOP if we are not running - if (!isRunning()) - return STOP; } } + _state = State.STOPPED; + return STOP; } @Override public void run() { - while (isRunning()) + _thread = Thread.currentThread(); + try { - // test and increment size BEFORE decrementing pending, - // so that we don't have a race starting new pending. - int size = _size.get(); + while (true) + { + long count = _count.get(); - // Are we stopped? - if (size < 0) - return; + // reduce pending if this thread was pending + int pending = getHi(count) - (_state == State.PENDING ? 1 : 0); + int size = getLo(count); - // Are we surplus to capacity? - if (size >= _capacity) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} size {} > capacity", this, size, _capacity); - if (_starting) - _pending.decrementAndGet(); - return; - } + State next; + if (size < 0 || size >= _capacity) + { + // The executor has stopped or this thread is excess to capacity + next = State.STOPPED; + } + else + { + long now = System.nanoTime(); + long lastEmpty = _lastEmptyTime.get(); + if (size > 0 && _idleTimeNanos < (now - lastEmpty) && _lastEmptyTime.compareAndSet(lastEmpty, now)) + { + // it has been too long since we hit zero reserved threads, so are "busy" idle + next = State.IDLE; + } + else + { + // We will become a reserved thread if we can update the count below. + next = State.RESERVED; + size++; + } + } - // If we cannot update size then recalculate - if (!_size.compareAndSet(size, size + 1)) - continue; + // Update count for pending and size + if (!_count.compareAndSet(count, pending, size)) + continue; - if (_starting) - { if (LOG.isDebugEnabled()) - LOG.debug("{} started", this); - _pending.decrementAndGet(); - _starting = false; - } + LOG.debug("{} was={} next={} size={}+{} capacity={}", this, _state, next, pending, size, _capacity); + _state = next; + if (next != State.RESERVED) + break; - // Insert ourselves in the stack. Size is already incremented, but - // that only effects the decision to keep other threads reserved. - _stack.offerFirst(this); + // We are reserved whilst we are waiting for an offered _task. + Runnable task = reservedWait(); - // Once added to the stack, we must always wait for a job on the _task Queue - // and never return early, else we may leave a thread blocked offering a _task. - Runnable task = reservedWait(); + // Is the task the STOP poison pill? + if (task == STOP) + break; - if (task == STOP) - // return on STOP poison pill - break; - - // Run the task - try - { - task.run(); - } - catch (Throwable e) - { - LOG.warn(e); + // Run the task + try + { + _state = State.RUNNING; + task.run(); + } + catch (Throwable e) + { + LOG.warn("Unable to run task", e); + } } } - - if (LOG.isDebugEnabled()) - LOG.debug("{} Exited", this); + finally + { + if (LOG.isDebugEnabled()) + LOG.debug("{} exited {}", this, ReservedThreadExecutor.this); + _threads.remove(this); + _thread = null; + } } @Override public String toString() { - return String.format("%s@%x", ReservedThreadExecutor.this, hashCode()); + return String.format("%s@%x{%s,thread=%s}", + getClass().getSimpleName(), + hashCode(), + _state, + _thread); } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index 45928cc34437..654b248928d1 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -27,10 +27,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -185,6 +185,35 @@ public void testShrink() throws Exception assertThat(_reservedExecutor.getAvailable(), is(0)); } + @Test + public void testBusyShrink() throws Exception + { + final long IDLE = 1000; + + _reservedExecutor.stop(); + _reservedExecutor.setIdleTimeout(IDLE, TimeUnit.MILLISECONDS); + _reservedExecutor.start(); + assertThat(_reservedExecutor.getAvailable(), is(0)); + + assertThat(_reservedExecutor.tryExecute(NOOP), is(false)); + assertThat(_reservedExecutor.tryExecute(NOOP), is(false)); + + _executor.startThread(); + _executor.startThread(); + + waitForAvailable(2); + + int available = _reservedExecutor.getAvailable(); + assertThat(available, is(2)); + + for (int i = 10; i-- > 0;) + { + assertThat(_reservedExecutor.tryExecute(NOOP), is(true)); + Thread.sleep(200); + } + assertThat(_reservedExecutor.getAvailable(), is(1)); + } + @Test public void testReservedIdleTimeoutWithOneReservedThread() throws Exception { @@ -266,7 +295,6 @@ public void run() } } - @Disabled @Test public void stressTest() throws Exception { @@ -277,9 +305,9 @@ public void stressTest() throws Exception reserved.setIdleTimeout(0, null); reserved.start(); - final int LOOPS = 1000000; + final int LOOPS = 200000; final AtomicInteger executions = new AtomicInteger(LOOPS); - final CountDownLatch executed = new CountDownLatch(executions.get()); + final CountDownLatch executed = new CountDownLatch(LOOPS); final AtomicInteger usedReserved = new AtomicInteger(0); final AtomicInteger usedPool = new AtomicInteger(0); @@ -328,10 +356,15 @@ public void run() assertTrue(executed.await(60, TimeUnit.SECONDS)); + // ensure tryExecute is still working + while (!reserved.tryExecute(() -> {})) + Thread.yield(); + reserved.stop(); pool.stop(); + assertThat(usedReserved.get(), greaterThan(0)); assertThat(usedReserved.get() + usedPool.get(), is(LOOPS)); - System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS); + // System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS); } }