From 64e01beeeb9510e3b3bd028ecd0b45dd173a3b06 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 22 Feb 2021 23:02:12 +0100 Subject: [PATCH 1/2] Fixes #5994 - QueuedThreadPool "free" threads Introduced to QueuedThreadPool: * getMaxReservedThreads() * getAvailableReservedThreads() * getAvailableThreads() * getReadyThreads() * getLeasedThreads() Also few small code cleanups. Signed-off-by: Simone Bordet --- .../jetty/util/thread/QueuedThreadPool.java | 57 +++++++++++++++---- .../jetty/util/thread/ThreadPoolBudget.java | 17 ++++-- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 2408ee877dab..6ae2e1e4de1e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -20,8 +20,6 @@ import java.io.Closeable; import java.io.IOException; -import java.security.AccessController; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -51,7 +49,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor { private static final Logger LOG = Log.getLogger(QueuedThreadPool.class); - private static Runnable NOOP = () -> + private static final Runnable NOOP = () -> { }; @@ -158,6 +156,7 @@ public void setThreadPoolBudget(ThreadPoolBudget budget) { if (budget != null && budget.getSizedThreadPool() != this) throw new IllegalArgumentException(); + updateBean(_budget, budget); _budget = budget; } @@ -425,16 +424,34 @@ public int getMinThreads() * @return number of reserved threads or or -1 for heuristically determined * @see #setReservedThreads */ - @ManagedAttribute("the number of reserved threads in the pool") + @ManagedAttribute("number of configured reserved threads") public int getReservedThreads() { - if (isStarted()) + return _reservedThreads; + } + + @ManagedAttribute("maximum number of reserved threads") + public int getMaxReservedThreads() + { + TryExecutor tryExecutor = _tryExecutor; + if (tryExecutor instanceof ReservedThreadExecutor) { - ReservedThreadExecutor reservedThreadExecutor = getBean(ReservedThreadExecutor.class); - if (reservedThreadExecutor != null) - return reservedThreadExecutor.getCapacity(); + ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor; + return reservedThreadExecutor.getCapacity(); } - return _reservedThreads; + return 0; + } + + @ManagedAttribute("number of available reserved threads") + public int getAvailableReservedThreads() + { + TryExecutor tryExecutor = _tryExecutor; + if (tryExecutor instanceof ReservedThreadExecutor) + { + ReservedThreadExecutor reservedThreadExecutor = (ReservedThreadExecutor)tryExecutor; + return reservedThreadExecutor.getAvailable(); + } + return 0; } /** @@ -604,8 +621,26 @@ public int getIdleThreads() @ManagedAttribute("number of busy threads in the pool") public int getBusyThreads() { - int reserved = _tryExecutor instanceof ReservedThreadExecutor ? ((ReservedThreadExecutor)_tryExecutor).getAvailable() : 0; - return getThreads() - getIdleThreads() - reserved; + return getThreads() - getIdleThreads() - getAvailableReservedThreads(); + } + + @ManagedAttribute("number of potentially available threads in the pool") + public int getAvailableThreads() + { + return getMaxThreads() - getBusyThreads(); + } + + @ManagedAttribute("number of currently available threads in the pool") + public int getReadyThreads() + { + return getThreads() - getBusyThreads(); + } + + @ManagedAttribute("number of threads leased to components") + public int getLeasedThreads() + { + ThreadPoolBudget budget = _budget; + return budget == null ? 0 : budget.getLeasedThreads(); } /** diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java index 22358196a674..39a3064213d0 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ThreadPoolBudget.java @@ -24,6 +24,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -32,9 +34,10 @@ * * @see ThreadPool.SizedThreadPool#getThreadPoolBudget() */ +@ManagedObject public class ThreadPoolBudget { - static final Logger LOG = Log.getLogger(ThreadPoolBudget.class); + private static final Logger LOG = Log.getLogger(ThreadPoolBudget.class); public interface Lease extends Closeable { @@ -115,6 +118,14 @@ public ThreadPool.SizedThreadPool getSizedThreadPool() return pool; } + @ManagedAttribute("the number of threads leased to components") + public int getLeasedThreads() + { + return leases.stream() + .mapToInt(Lease::getThreads) + .sum(); + } + public void reset() { leases.clear(); @@ -146,9 +157,7 @@ public Lease leaseTo(Object leasee, int threads) */ public boolean check(int maxThreads) throws IllegalStateException { - int required = leases.stream() - .mapToInt(Lease::getThreads) - .sum(); + int required = getLeasedThreads(); int left = maxThreads - required; if (left <= 0) { From 530c14e7b3e228ca474f35cc117160a8fbc2fc33 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 23 Feb 2021 18:12:55 +0100 Subject: [PATCH 2/2] Issue #5994 - QueuedThreadPool "free" threads Updates after review. Signed-off-by: Simone Bordet --- .../jmx/MBeanContainerLifeCycleTest.java | 8 +- .../jetty/util/thread/QueuedThreadPool.java | 446 +++++++++++------- .../util/thread/ReservedThreadExecutor.java | 12 +- .../util/thread/QueuedThreadPoolTest.java | 77 ++- 4 files changed, 358 insertions(+), 185 deletions(-) diff --git a/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/MBeanContainerLifeCycleTest.java b/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/MBeanContainerLifeCycleTest.java index d787acd25530..d610fc21caf2 100644 --- a/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/MBeanContainerLifeCycleTest.java +++ b/jetty-jmx/src/test/java/org/eclipse/jetty/jmx/MBeanContainerLifeCycleTest.java @@ -77,12 +77,13 @@ public void testStoppingContainerDoesNotUnregistersMBeans() throws Exception String pkg = bean.getClass().getPackage().getName(); Set objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null); - assertEquals(1, objectNames.size()); + // QueuedThreadPool and ThreadPoolBudget. + assertEquals(2, objectNames.size()); container.stop(); objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null); - assertEquals(1, objectNames.size()); + assertEquals(2, objectNames.size()); // Remove the MBeans to start clean on the next test. objectNames.forEach(objectName -> @@ -105,7 +106,8 @@ public void testDestroyingContainerUnregistersMBeans() throws Exception String pkg = bean.getClass().getPackage().getName(); Set objectNames = mbeanServer.queryNames(ObjectName.getInstance(pkg + ":*"), null); - assertEquals(1, objectNames.size()); + // QueuedThreadPool and ThreadPoolBudget. + assertEquals(2, objectNames.size()); container.stop(); container.destroy(); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 6ae2e1e4de1e..dc0e66539469 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -45,6 +45,40 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool; +/** + *

A thread pool with a queue of jobs to execute.

+ *

Jetty components that need threads (such as network acceptors and selector) may lease threads + * from this thread pool using a {@link ThreadPoolBudget}; these threads are "active" from the point + * of view of the thread pool, but not available to run transient jobs such as processing + * an HTTP request or a WebSocket frame.

+ *

QueuedThreadPool has a {@link ReservedThreadExecutor} which leases threads from this pool, + * but makes them available as if they are "idle" threads.

+ *

QueuedThreadPool therefore has the following fundamental values:

+ *
    + *
  • {@link #getThreads() threads}: the current number of threads. These threads may execute + * a job (either internal or transient), or may be ready to run (either idle or reserved). + * This number may grow or shrink as the thread pool grows or shrinks.
  • + *
  • {@link #getReadyThreads() readyThreads}: the current number of threads that are ready to + * run transient jobs. + * This number may grow or shrink as the thread pool grows or shrinks.
  • + *
  • {@link #getLeasedThreads() leasedThreads}: the number of threads that run internal jobs. + * This number is typically constant after this thread pool is {@link #start() started}.
  • + *
+ *

Given the definitions above, the most interesting definitions are:

+ *
    + *
  • {@link #getThreads() threads} = {@link #getReadyThreads() readyThreads} + {@link #getLeasedThreads() leasedThreads} + {@link #getUtilizedThreads() utilizedThreads}
  • + *
  • readyThreads = {@link #getIdleThreads() idleThreads} + {@link #getAvailableReservedThreads() availableReservedThreads}
  • + *
  • {@link #getMaxAvailableThreads() maxAvailableThreads} = {@link #getMaxThreads() maxThreads} - leasedThreads
  • + *
  • {@link #getUtilizationRate() utilizationRate} = utilizedThreads / maxAvailableThreads
  • + *
+ *

Other definitions, typically less interesting because they take into account threads that + * execute internal jobs, or because they don't take into account available reserved threads + * (that are essentially ready to execute transient jobs), are:

+ *
    + *
  • {@link #getBusyThreads() busyThreads} = utilizedThreads + leasedThreads
  • + *
  • {@link #getIdleThreads()} idleThreads} = readyThreads - availableReservedThreads
  • + *
+ */ @ManagedObject("A thread pool") public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor { @@ -287,23 +321,19 @@ private void joinThreads(long stopByNanos) throws InterruptedException } /** - * Thread Pool should use Daemon Threading. - * - * @param daemon true to enable delegation - * @see Thread#setDaemon(boolean) + * @return the maximum thread idle time in ms */ - public void setDaemon(boolean daemon) + @ManagedAttribute("maximum time a thread may be idle in ms") + public int getIdleTimeout() { - _daemon = daemon; + return _idleTimeout; } /** - * Set the maximum thread idle time. - * Threads that are idle for longer than this period may be - * stopped. + *

Set the maximum thread idle time in ms.

+ *

Threads that are idle for longer than this period may be stopped.

* - * @param idleTimeout Max idle time in ms. - * @see #getIdleTimeout + * @param idleTimeout the maximum thread idle time in ms */ public void setIdleTimeout(int idleTimeout) { @@ -314,10 +344,17 @@ public void setIdleTimeout(int idleTimeout) } /** - * Set the maximum number of threads. - * - * @param maxThreads maximum number of threads. - * @see #getMaxThreads + * @return the maximum number of threads + */ + @Override + @ManagedAttribute("maximum number of threads in the pool") + public int getMaxThreads() + { + return _maxThreads; + } + + /** + * @param maxThreads the maximum number of threads */ @Override public void setMaxThreads(int maxThreads) @@ -330,10 +367,17 @@ public void setMaxThreads(int maxThreads) } /** - * Set the minimum number of threads. - * + * @return the minimum number of threads + */ + @Override + @ManagedAttribute("minimum number of threads in the pool") + public int getMinThreads() + { + return _minThreads; + } + + /** * @param minThreads minimum number of threads - * @see #getMinThreads */ @Override public void setMinThreads(int minThreads) @@ -348,10 +392,16 @@ public void setMinThreads(int minThreads) } /** - * Set the number of reserved threads. - * + * @return number of reserved threads or -1 for heuristically determined + */ + @ManagedAttribute("number of configured reserved threads or -1 for heuristic") + public int getReservedThreads() + { + return _reservedThreads; + } + + /** * @param reservedThreads number of reserved threads or -1 for heuristically determined - * @see #getReservedThreads */ public void setReservedThreads(int reservedThreads) { @@ -361,76 +411,100 @@ public void setReservedThreads(int reservedThreads) } /** - * @param name Name of this thread pool to use when naming threads. + * @return the name of the this thread pool + */ + @ManagedAttribute("name of the thread pool") + public String getName() + { + return _name; + } + + /** + *

Sets the name of this thread pool, used as a prefix for the thread names.

+ * + * @param name the name of the this thread pool */ public void setName(String name) { if (isRunning()) - throw new IllegalStateException("started"); + throw new IllegalStateException(getState()); _name = name; } /** - * Set the priority of the pool threads. - * - * @param priority the new thread priority. + * @return the priority of the pool threads */ - public void setThreadsPriority(int priority) + @ManagedAttribute("priority of threads in the pool") + public int getThreadsPriority() { - _priority = priority; + return _priority; } /** - * Get the maximum thread idle time. - * - * @return Max idle time in ms. - * @see #setIdleTimeout + * @param priority the priority of the pool threads */ - @ManagedAttribute("maximum time a thread may be idle in ms") - public int getIdleTimeout() + public void setThreadsPriority(int priority) { - return _idleTimeout; + _priority = priority; } /** - * Get the maximum number of threads. - * - * @return maximum number of threads. - * @see #setMaxThreads + * @return whether to use daemon threads + * @see Thread#isDaemon() */ - @Override - @ManagedAttribute("maximum number of threads in the pool") - public int getMaxThreads() + @ManagedAttribute("thread pool uses daemon threads") + public boolean isDaemon() { - return _maxThreads; + return _daemon; } /** - * Get the minimum number of threads. - * - * @return minimum number of threads. - * @see #setMinThreads + * @param daemon whether to use daemon threads + * @see Thread#setDaemon(boolean) */ - @Override - @ManagedAttribute("minimum number of threads in the pool") - public int getMinThreads() + public void setDaemon(boolean daemon) { - return _minThreads; + _daemon = daemon; + } + + @ManagedAttribute("reports additional details in the dump") + public boolean isDetailedDump() + { + return _detailedDump; + } + + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + } + + @ManagedAttribute("threshold at which the pool is low on threads") + public int getLowThreadsThreshold() + { + return _lowThreadsThreshold; + } + + public void setLowThreadsThreshold(int lowThreadsThreshold) + { + _lowThreadsThreshold = lowThreadsThreshold; } /** - * Get the number of reserved threads. - * - * @return number of reserved threads or or -1 for heuristically determined - * @see #setReservedThreads + * @return the number of jobs in the queue waiting for a thread */ - @ManagedAttribute("number of configured reserved threads") - public int getReservedThreads() + @ManagedAttribute("size of the job queue") + public int getQueueSize() { - return _reservedThreads; + // The idle counter encodes demand, which is the effective queue size + int idle = _counts.getLo(); + return Math.max(0, -idle); } - @ManagedAttribute("maximum number of reserved threads") + /** + * @return the maximum number (capacity) of reserved threads + * @see ReservedThreadExecutor#getCapacity() + */ + @ManagedAttribute("maximum number (capacity) of reserved threads") public int getMaxReservedThreads() { TryExecutor tryExecutor = _tryExecutor; @@ -442,6 +516,10 @@ public int getMaxReservedThreads() return 0; } + /** + * @return the number of available reserved threads + * @see ReservedThreadExecutor#getAvailable() + */ @ManagedAttribute("number of available reserved threads") public int getAvailableReservedThreads() { @@ -455,68 +533,147 @@ public int getAvailableReservedThreads() } /** - * @return The name of the this thread pool + *

The fundamental value that represents the number of threads currently known by this thread pool.

+ *

This value includes threads that have been leased to internal components, idle threads, reserved threads + * and threads that are executing transient jobs.

+ * + * @return the number of threads currently known to the pool + * @see #getReadyThreads() + * @see #getLeasedThreads() */ - @ManagedAttribute("name of the thread pool") - public String getName() + @Override + @ManagedAttribute("number of threads in the pool") + public int getThreads() { - return _name; + int threads = _counts.getHi(); + return Math.max(0, threads); } /** - * Get the priority of the pool threads. + *

The fundamental value that represents the number of threads ready to execute transient jobs.

* - * @return the priority of the pool threads. + * @return the number of threads ready to execute transient jobs + * @see #getThreads() + * @see #getLeasedThreads() + * @see #getUtilizedThreads() */ - @ManagedAttribute("priority of threads in the pool") - public int getThreadsPriority() + @ManagedAttribute("number of threads ready to execute transient jobs") + public int getReadyThreads() { - return _priority; + return getIdleThreads() + getAvailableReservedThreads(); } /** - * Get the size of the job queue. + *

The fundamental value that represents the number of threads that are leased + * to internal components, and therefore cannot be used to execute transient jobs.

* - * @return Number of jobs queued waiting for a thread + * @return the number of threads currently used by internal components + * @see #getThreads() + * @see #getReadyThreads() */ - @ManagedAttribute("size of the job queue") - public int getQueueSize() + @ManagedAttribute("number of threads used by internal components") + public int getLeasedThreads() + { + return getMaxLeasedThreads() - getMaxReservedThreads(); + } + + /** + *

The maximum number of threads that are leased to internal components, + * as some component may allocate its threads lazily.

+ * + * @return the maximum number of threads leased by internal components + * @see #getLeasedThreads() + */ + @ManagedAttribute("maximum number of threads leased to internal components") + public int getMaxLeasedThreads() + { + ThreadPoolBudget budget = _budget; + return budget == null ? 0 : budget.getLeasedThreads(); + } + + /** + *

The number of idle threads, but without including reserved threads.

+ *

Prefer {@link #getReadyThreads()} for a better representation of + * "threads ready to execute transient jobs".

+ * + * @return the number of idle threads but not reserved + * @see #getReadyThreads() + */ + @Override + @ManagedAttribute("number of idle threads but not reserved") + public int getIdleThreads() { - // The idle counter encodes demand, which is the effective queue size int idle = _counts.getLo(); - return Math.max(0, -idle); + return Math.max(0, idle); } /** - * @return whether this thread pool is using daemon threads - * @see Thread#setDaemon(boolean) + *

The number of threads executing internal and transient jobs.

+ *

Prefer {@link #getUtilizedThreads()} for a better representation of + * "threads executing transient jobs".

+ * + * @return the number of threads executing internal and transient jobs + * @see #getUtilizedThreads() */ - @ManagedAttribute("thread pool uses daemon threads") - public boolean isDaemon() + @ManagedAttribute("number of threads executing internal and transient jobs") + public int getBusyThreads() { - return _daemon; + return getThreads() - getReadyThreads(); } - @ManagedAttribute("reports additional details in the dump") - public boolean isDetailedDump() + /** + *

The number of threads executing transient jobs.

+ * + * @return the number of threads executing transient jobs + * @see #getReadyThreads() + */ + @ManagedAttribute("number of threads executing transient jobs") + public int getUtilizedThreads() { - return _detailedDump; + return getThreads() - getLeasedThreads() - getReadyThreads(); } - public void setDetailedDump(boolean detailedDump) + /** + *

The maximum number of threads available to run transient jobs.

+ * + * @return the maximum number of threads available to run transient jobs + */ + @ManagedAttribute("maximum number of threads available to run transient jobs") + public int getMaxAvailableThreads() { - _detailedDump = detailedDump; + return getMaxThreads() - getLeasedThreads(); } - @ManagedAttribute("threshold at which the pool is low on threads") - public int getLowThreadsThreshold() + /** + *

The rate between the number of {@link #getUtilizedThreads() utilized threads} + * and the maximum number of {@link #getMaxAvailableThreads() utilizable threads}.

+ *

A value of {@code 0.0D} means that the thread pool is not utilized, while a + * value of {@code 1.0D} means that the thread pool is fully utilized to execute + * transient jobs.

+ * + * @return the utilization rate of threads executing transient jobs + */ + @ManagedAttribute("utilization rate of threads executing transient jobs") + public double getUtilizationRate() { - return _lowThreadsThreshold; + return (double)getUtilizedThreads() / getMaxAvailableThreads(); } - public void setLowThreadsThreshold(int lowThreadsThreshold) + /** + *

Returns whether this thread pool is low on threads.

+ *

The current formula is:

+ *
+     * maxThreads - threads + readyThreads - queueSize <= lowThreadsThreshold
+     * 
+ * + * @return whether the pool is low on threads + * @see #getLowThreadsThreshold() + */ + @Override + @ManagedAttribute(value = "thread pool is low on threads", readonly = true) + public boolean isLowOnThreads() { - _lowThreadsThreshold = lowThreadsThreshold; + return getMaxThreads() - getThreads() + getReadyThreads() - getQueueSize() <= getLowThreadsThreshold(); } @Override @@ -593,73 +750,6 @@ public void join() throws InterruptedException } } - /** - * @return the total number of threads currently in the pool - */ - @Override - @ManagedAttribute("number of threads in the pool") - public int getThreads() - { - int threads = _counts.getHi(); - return Math.max(0, threads); - } - - /** - * @return the number of idle threads in the pool - */ - @Override - @ManagedAttribute("number of idle threads in the pool") - public int getIdleThreads() - { - int idle = _counts.getLo(); - return Math.max(0, idle); - } - - /** - * @return the number of busy threads in the pool - */ - @ManagedAttribute("number of busy threads in the pool") - public int getBusyThreads() - { - return getThreads() - getIdleThreads() - getAvailableReservedThreads(); - } - - @ManagedAttribute("number of potentially available threads in the pool") - public int getAvailableThreads() - { - return getMaxThreads() - getBusyThreads(); - } - - @ManagedAttribute("number of currently available threads in the pool") - public int getReadyThreads() - { - return getThreads() - getBusyThreads(); - } - - @ManagedAttribute("number of threads leased to components") - public int getLeasedThreads() - { - ThreadPoolBudget budget = _budget; - return budget == null ? 0 : budget.getLeasedThreads(); - } - - /** - *

Returns whether this thread pool is low on threads.

- *

The current formula is:

- *
-     * maxThreads - threads + idleThreads - queueSize <= lowThreadsThreshold
-     * 
- * - * @return whether the pool is low on threads - * @see #getLowThreadsThreshold() - */ - @Override - @ManagedAttribute(value = "thread pool is low on threads", readonly = true) - public boolean isLowOnThreads() - { - return getMaxThreads() - getThreads() + getIdleThreads() - getQueueSize() <= getLowThreadsThreshold(); - } - private void ensureThreads() { while (true) @@ -779,28 +869,6 @@ private String getCompressedStackTag(StackTraceElement[] trace) return ""; } - @Override - public String toString() - { - long count = _counts.get(); - int threads = Math.max(0, AtomicBiInteger.getHi(count)); - int idle = Math.max(0, AtomicBiInteger.getLo(count)); - int queue = getQueueSize(); - - return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]", - getClass().getSimpleName(), - _name, - hashCode(), - getState(), - getMinThreads(), - threads, - getMaxThreads(), - idle, - getReservedThreads(), - queue, - _tryExecutor); - } - private final Runnable _runnable = new Runner(); /** @@ -874,6 +942,28 @@ public String dumpThread(@Name("id") long id) return null; } + @Override + public String toString() + { + long count = _counts.get(); + int threads = Math.max(0, AtomicBiInteger.getHi(count)); + int idle = Math.max(0, AtomicBiInteger.getLo(count)); + int queue = getQueueSize(); + + return String.format("%s[%s]@%x{%s,%d<=%d<=%d,i=%d,r=%d,q=%d}[%s]", + getClass().getSimpleName(), + _name, + hashCode(), + getState(), + getMinThreads(), + threads, + getMaxThreads(), + idle, + getReservedThreads(), + queue, + _tryExecutor); + } + private class Runner implements Runnable { private Runnable idleJobPoll(long idleTimeout) throws InterruptedException diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java index bacf70a64893..610e59f798aa 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/ReservedThreadExecutor.java @@ -111,12 +111,18 @@ public Executor getExecutor() return _executor; } + /** + * @return the maximum number of reserved threads + */ @ManagedAttribute(value = "max number of reserved threads", readonly = true) public int getCapacity() { return _capacity; } + /** + * @return the number of threads available to {@link #tryExecute(Runnable)} + */ @ManagedAttribute(value = "available reserved threads", readonly = true) public int getAvailable() { @@ -196,8 +202,10 @@ public void execute(Runnable task) throws RejectedExecutionException } /** - * @param task The task to run - * @return True iff a reserved thread was available and has been assigned the task to run. + *

Executes the given task if and only if a reserved thread is available.

+ * + * @param task the task to run + * @return true if and only if a reserved thread was available and has been assigned the task to run. */ @Override public boolean tryExecute(Runnable task) diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java index 62bd0fd0e723..6ecf4282536f 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/thread/QueuedThreadPoolTest.java @@ -21,6 +21,8 @@ import java.io.Closeable; import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -194,7 +196,7 @@ public void testThreadPool() throws Exception waitForIdle(tp, 2); // Doesn't shrink to less than min threads - Thread.sleep(3 * tp.getIdleTimeout() / 2); + Thread.sleep(3L * tp.getIdleTimeout() / 2); assertThat(tp.getThreads(), is(2)); assertThat(tp.getIdleThreads(), is(2)); @@ -296,7 +298,7 @@ public void testThreadPoolFailingJobs() throws Exception waitForIdle(tp, 2); // Doesn't shrink to less than min threads - Thread.sleep(3 * tp.getIdleTimeout() / 2); + Thread.sleep(3L * tp.getIdleTimeout() / 2); waitForThreads(tp, 2); waitForIdle(tp, 2); @@ -857,6 +859,77 @@ public void testContextClassLoader() throws Exception assertThat(t.getContextClassLoader(), Matchers.equalTo(QueuedThreadPool.class.getClassLoader())); } } + + @Test + public void testThreadCounts() throws Exception + { + int maxThreads = 100; + QueuedThreadPool tp = new QueuedThreadPool(maxThreads, 0); + // Long timeout so it does not expire threads during the test. + tp.setIdleTimeout(60000); + int reservedThreads = 7; + tp.setReservedThreads(reservedThreads); + tp.start(); + int leasedThreads = 5; + tp.getThreadPoolBudget().leaseTo(new Object(), leasedThreads); + List leasedJobs = new ArrayList<>(); + for (int i = 0; i < leasedThreads; ++i) + { + RunningJob job = new RunningJob("JOB" + i); + leasedJobs.add(job); + tp.execute(job); + assertTrue(job._run.await(5, TimeUnit.SECONDS)); + } + + // Run some job to spawn threads. + for (int i = 0; i < 3; ++i) + { + tp.tryExecute(() -> {}); + } + int spawned = 13; + List jobs = new ArrayList<>(); + for (int i = 0; i < spawned; ++i) + { + RunningJob job = new RunningJob("JOB" + i); + jobs.add(job); + tp.execute(job); + assertTrue(job._run.await(5, TimeUnit.SECONDS)); + } + for (RunningJob job : jobs) + { + job._stopping.countDown(); + } + + // Wait for the threads to become idle again. + Thread.sleep(1000); + + // Submit less jobs to the queue so we have active and idle threads. + jobs.clear(); + int transientJobs = spawned / 2; + for (int i = 0; i < transientJobs; ++i) + { + RunningJob job = new RunningJob("JOB" + i); + jobs.add(job); + tp.execute(job); + assertTrue(job._run.await(5, TimeUnit.SECONDS)); + } + + try + { + assertThat(tp.getMaxReservedThreads(), Matchers.equalTo(reservedThreads)); + assertThat(tp.getLeasedThreads(), Matchers.equalTo(leasedThreads)); + assertThat(tp.getReadyThreads(), Matchers.equalTo(tp.getIdleThreads() + tp.getAvailableReservedThreads())); + assertThat(tp.getUtilizedThreads(), Matchers.equalTo(transientJobs)); + assertThat(tp.getThreads(), Matchers.equalTo(tp.getReadyThreads() + tp.getLeasedThreads() + tp.getUtilizedThreads())); + assertThat(tp.getBusyThreads(), Matchers.equalTo(tp.getUtilizedThreads() + tp.getLeasedThreads())); + } + finally + { + jobs.forEach(job -> job._stopping.countDown()); + leasedJobs.forEach(job -> job._stopping.countDown()); + tp.stop(); + } + } private int count(String s, String p) {