From e8beb58baf5221430c8b2b993613181fda2a6519 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 24 Aug 2021 23:12:08 +0200 Subject: [PATCH] Fixes #6652 - Improve ReservedThreadExecutor dump. (#6653) Fixes #6652 - Improve ReservedThreadExecutor dump. Filtering out non-reserved threads in dump() and doStop(). Signed-off-by: Simone Bordet Co-authored-by: Greg Wilkins (cherry picked from commit b2a023675c4cfd0c1bc54ab8df22630953465eea) --- .../util/thread/ReservedThreadExecutor.java | 57 ++++++++++++------- .../util/thread/QueuedThreadPoolTest.java | 4 +- .../thread/ReservedThreadExecutorTest.java | 1 - 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index 81d41d0c7a2a..42446e9b37aa 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.util.thread; import java.io.IOException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -21,6 +22,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.eclipse.jetty.util.AtomicBiInteger; import org.eclipse.jetty.util.ProcessorUtils; @@ -38,14 +40,14 @@ /** - * An Executor using pre-allocated/reserved Threads from a wrapped Executor. - *

Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed - * with a Thread immediately being assigned the Runnable task, or fail if no Thread is - * available. - *

Threads are reserved lazily, with a new reserved threads being allocated from the - * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been + *

A TryExecutor using pre-allocated/reserved threads from an external Executor.

+ *

Calls to {@link #tryExecute(Runnable)} on ReservedThreadExecutor will either + * succeed with a reserved thread immediately being assigned the task, or fail if + * no reserved thread is available.

+ *

Threads are reserved lazily, with new reserved threads being allocated from the external + * {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been * idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to - * the executor. + * the external Executor.

*/ @ManagedObject("A pool for reserved threads") public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable @@ -62,7 +64,7 @@ public void run() @Override public String toString() { - return "STOP!"; + return "STOP"; } }; @@ -72,7 +74,6 @@ public String toString() private final SynchronousQueue _queue = new SynchronousQueue<>(false); private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size; private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime()); - private ThreadPoolBudget.Lease _lease; private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT; @@ -175,20 +176,25 @@ public void doStop() throws Exception super.doStop(); - // Offer STOP task to all waiting reserved threads. - for (int i = _count.getAndSetLo(-1); i-- > 0;) + // Mark this instance as stopped. + int size = _count.getAndSetLo(-1); + + // Offer the STOP task to all waiting reserved threads. + for (int i = 0; i < size; ++i) { - // yield to wait for any reserved threads that have incremented the size but not yet polled + // Yield to wait for any reserved threads that + // have incremented the size but not yet polled. Thread.yield(); _queue.offer(STOP); } - // Interrupt any reserved thread missed the offer so it doesn't wait too long. - for (ReservedThread reserved : _threads) - { - Thread thread = reserved._thread; - if (thread != null) - thread.interrupt(); - } + + // Interrupt any reserved thread missed the offer, + // so they do not wait for the whole idle timeout. + _threads.stream() + .filter(ReservedThread::isReserved) + .map(t -> t._thread) + .filter(Objects::nonNull) + .forEach(Thread::interrupt); _threads.clear(); _count.getAndSetHi(0); } @@ -264,13 +270,16 @@ private void startReservedThread() public void dump(Appendable out, String indent) throws IOException { Dumpable.dumpObjects(out, indent, this, - new DumpableCollection("reserved", _threads)); + new DumpableCollection("threads", + _threads.stream() + .filter(ReservedThread::isReserved) + .collect(Collectors.toList()))); } @Override public String toString() { - return String.format("%s@%x{s=%d/%d,p=%d}", + return String.format("%s@%x{reserved=%d/%d,pending=%d}", getClass().getSimpleName(), hashCode(), _count.getLo(), @@ -293,6 +302,11 @@ private class ReservedThread implements Runnable private volatile State _state = State.PENDING; private volatile Thread _thread; + private boolean isReserved() + { + return _state == State.RESERVED; + } + private Runnable reservedWait() { if (LOG.isDebugEnabled()) @@ -321,7 +335,6 @@ private Runnable reservedWait() } _state = size >= 0 ? State.IDLE : State.STOPPED; return STOP; - } catch (InterruptedException e) { diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index bb6d3a5af43e..c065eb38578a 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -831,7 +831,7 @@ public void testDump() throws Exception dump = pool.dump(); assertThat(count(dump, " - STARTED"), is(2)); assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0")); - assertThat(dump, containsString("s=0/2")); + assertThat(dump, containsString("reserved=0/2")); assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(count(dump, " IDLE"), is(2)); assertThat(count(dump, " WAITING"), is(1)); @@ -846,7 +846,7 @@ public void testDump() throws Exception dump = pool.dump(); assertThat(count(dump, " - STARTED"), is(2)); assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0")); - assertThat(dump, containsString("s=1/2")); + assertThat(dump, containsString("reserved=1/2")); assertThat(dump, containsString("[ReservedThreadExecutor@")); assertThat(count(dump, " IDLE"), is(1)); assertThat(count(dump, " WAITING"), is(1)); diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java index e3a08b876bf3..58835091022b 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/ReservedThreadExecutorTest.java @@ -359,6 +359,5 @@ public void run() assertThat(usedReserved.get(), greaterThan(0)); assertThat(usedReserved.get() + usedPool.get(), is(LOOPS)); - // System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS); } }