diff --git a/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-all-virtual.xml b/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-all-virtual.xml new file mode 100644 index 000000000000..2719a4011a3d --- /dev/null +++ b/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-all-virtual.xml @@ -0,0 +1,15 @@ + + + + + + + + + + org.eclipse.jetty + + Virtual threads enabled. Using virtual threads for all tasks. + + + diff --git a/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-virtual.xml b/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-virtual.xml index 71dc9444ba8d..77109d6f83b7 100644 --- a/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-virtual.xml +++ b/jetty-core/jetty-server/src/main/config/etc/jetty-threadpool-virtual.xml @@ -37,7 +37,7 @@ org.eclipse.jetty - Virtual threads are enabled. + Virtual threads enabled. Using virtual threads for application tasks. diff --git a/jetty-core/jetty-server/src/main/config/modules/threadpool-all-virtual.mod b/jetty-core/jetty-server/src/main/config/modules/threadpool-all-virtual.mod new file mode 100644 index 000000000000..1f9526b4f30e --- /dev/null +++ b/jetty-core/jetty-server/src/main/config/modules/threadpool-all-virtual.mod @@ -0,0 +1,19 @@ +[description] +Enables and configures the Server ThreadPool with support for virtual threads to be used for all threads. +There is some risk of CPU pinning with this configuration. Only supported in Java 21 or later. + +[depends] +logging + +[provides] +threadpool + +[xml] +etc/jetty-threadpool-all-virtual.xml + +[ini-template] +# tag::documentation[] +## Platform threads name prefix. +#jetty.threadPool.namePrefix=vtp + +# end::documentation[] diff --git a/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod b/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod index 54a2912b4f13..88ab2a0c2749 100644 --- a/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod +++ b/jetty-core/jetty-server/src/main/config/modules/threadpool-virtual.mod @@ -1,5 +1,6 @@ [description] -Enables and configures the Server ThreadPool with support for virtual threads in Java 21 or later. +Enables and configures the Server ThreadPool with support for virtual threads to be used for blocking tasks. +Only supported in Java 21 or later. [depends] logging diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java index 8218e6174ba3..08ffd1f000a3 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/VirtualThreads.java @@ -32,21 +32,9 @@ public class VirtualThreads { private static final Logger LOG = LoggerFactory.getLogger(VirtualThreads.class); - private static final Executor executor = probeVirtualThreadExecutor(); + private static final Executor executor = getNamedVirtualThreadsExecutor(null); private static final Method isVirtualThread = probeIsVirtualThread(); - private static Executor probeVirtualThreadExecutor() - { - try - { - return (Executor)Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null); - } - catch (Throwable x) - { - return null; - } - } - private static Method probeIsVirtualThread() { try @@ -131,7 +119,8 @@ public static Executor getNamedVirtualThreadsExecutor(String namePrefix) { Class builderClass = Class.forName("java.lang.Thread$Builder"); Object threadBuilder = Thread.class.getMethod("ofVirtual").invoke(null); - threadBuilder = builderClass.getMethod("name", String.class, long.class).invoke(threadBuilder, namePrefix, 0L); + if (StringUtil.isNotBlank(namePrefix)) + threadBuilder = builderClass.getMethod("name", String.class, long.class).invoke(threadBuilder, namePrefix, 0L); ThreadFactory factory = (ThreadFactory)builderClass.getMethod("factory").invoke(threadBuilder); return (Executor)Executors.class.getMethod("newThreadPerTaskExecutor", ThreadFactory.class).invoke(null, factory); } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java index 2078d92609ea..71ebd2534c50 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java @@ -449,7 +449,7 @@ public void setReservedThreads(int reservedThreads) } /** - * @return the name of the this thread pool + * @return the name of this thread pool */ @ManagedAttribute("name of the thread pool") public String getName() @@ -460,7 +460,7 @@ public String getName() /** *

Sets the name of this thread pool, used as a prefix for the thread names.

* - * @param name the name of the this thread pool + * @param name the name of this thread pool */ public void setName(String name) { @@ -835,7 +835,7 @@ public void join() throws InterruptedException while (isStopping()) { - Thread.sleep(1); + Thread.onSpinWait(); } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TrackingExecutor.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TrackingExecutor.java new file mode 100644 index 000000000000..9714d5490741 --- /dev/null +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/TrackingExecutor.java @@ -0,0 +1,102 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.Dumpable; + +@ManagedObject("Tracking Executor wrapper") +public class TrackingExecutor implements Executor, Dumpable +{ + private final Executor _threadFactoryExecutor; + private final Set _threads = ConcurrentHashMap.newKeySet(); + private boolean _detailed; + + public TrackingExecutor(Executor executor, boolean detailed) + { + _threadFactoryExecutor = executor; + _detailed = detailed; + } + + @Override + public void execute(Runnable task) + { + _threadFactoryExecutor.execute(() -> + { + Thread thread = Thread.currentThread(); + try + { + _threads.add(thread); + task.run(); + } + finally + { + _threads.remove(thread); + } + }); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + Object[] threads = _threads.stream().map(DumpableThread::new).toArray(); + Dumpable.dumpObjects(out, indent, _threadFactoryExecutor.toString() + " size=" + threads.length, threads); + } + + public void setDetailedDump(boolean detailedDump) + { + _detailed = detailedDump; + } + + @ManagedAttribute("reports additional details in the dump") + public boolean isDetailedDump() + { + return _detailed; + } + + public int size() + { + return _threads.size(); + } + + private class DumpableThread implements Dumpable + { + private final Thread _thread; + + private DumpableThread(Thread thread) + { + _thread = thread; + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + if (_detailed) + { + Object[] stack = _thread.getStackTrace(); + Dumpable.dumpObjects(out, indent, _thread.toString(), stack); + } + else + { + Dumpable.dumpObject(out, _thread); + } + } + } +} diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/VirtualThreadPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/VirtualThreadPool.java new file mode 100644 index 000000000000..bd90805b98da --- /dev/null +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/VirtualThreadPool.java @@ -0,0 +1,226 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; + +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.VirtualThreads; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.component.Dumpable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An implementation of {@link ThreadPool} interface that does not pool, but instead uses {@link VirtualThreads}. + */ +@ManagedObject("A thread non-pool for virtual threads") +public class VirtualThreadPool extends ContainerLifeCycle implements ThreadPool, Dumpable, TryExecutor, VirtualThreads.Configurable +{ + private static final Logger LOG = LoggerFactory.getLogger(VirtualThreadPool.class); + + private final AutoLock.WithCondition _joinLock = new AutoLock.WithCondition(); + private String _name = null; + private Executor _virtualExecutor; + private Thread _main; + private boolean _externalExecutor; + private boolean _tracking; + private boolean _detailedDump; + + public VirtualThreadPool() + { + if (!VirtualThreads.areSupported()) + throw new IllegalStateException("Virtual Threads not supported"); + } + + /** + * @return the name of this thread pool + */ + @ManagedAttribute("name of the thread pool") + public String getName() + { + return _name; + } + + /** + *

Sets the name of this thread pool, used as a prefix for the thread names.

+ * + * @param name the name of this thread pool + */ + public void setName(String name) + { + if (isRunning()) + throw new IllegalStateException(getState()); + if (StringUtil.isBlank(name) && name != null) + throw new IllegalArgumentException("Blank name"); + _name = name; + } + + /** + * Get if this pool is tracking virtual threads. + * @return {@code true} if the virtual threads will be tracked. + * @see TrackingExecutor + */ + @ManagedAttribute("virtual threads are tracked") + public boolean isTracking() + { + return _tracking; + } + + public void setTracking(boolean tracking) + { + if (isRunning()) + throw new IllegalStateException(getState()); + _tracking = tracking; + } + + @ManagedAttribute("reports additional details in the dump") + public boolean isDetailedDump() + { + return _detailedDump; + } + + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + if (_virtualExecutor instanceof TrackingExecutor trackingExecutor) + trackingExecutor.setDetailedDump(detailedDump); + } + + @Override + protected void doStart() throws Exception + { + _main = new Thread("jetty-virtual-thread-pool-keepalive") + { + @Override + public void run() + { + try (AutoLock.WithCondition l = _joinLock.lock()) + { + while (isRunning()) + { + l.await(); + } + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + }; + _main.start(); + + if (_virtualExecutor == null) + { + _externalExecutor = false; + _virtualExecutor = Objects.requireNonNull(StringUtil.isBlank(_name) + ? VirtualThreads.getDefaultVirtualThreadsExecutor() + : VirtualThreads.getNamedVirtualThreadsExecutor(_name)); + } + if (_tracking && !(_virtualExecutor instanceof TrackingExecutor)) + _virtualExecutor = new TrackingExecutor(_virtualExecutor, _detailedDump); + addBean(_virtualExecutor); + super.doStart(); + } + + @Override + protected void doStop() throws Exception + { + super.doStop(); + removeBean(_virtualExecutor); + if (!_externalExecutor) + _virtualExecutor = null; + _main = null; + + try (AutoLock.WithCondition l = _joinLock.lock()) + { + l.signalAll(); + } + } + + @Override + public Executor getVirtualThreadsExecutor() + { + return _virtualExecutor; + } + + @Override + public void setVirtualThreadsExecutor(Executor executor) + { + if (isRunning()) + throw new IllegalStateException(getState()); + _externalExecutor = executor != null; + _virtualExecutor = executor; + } + + @Override + public void join() throws InterruptedException + { + try (AutoLock.WithCondition l = _joinLock.lock()) + { + while (isRunning()) + { + l.await(); + } + } + + while (isStopping()) + { + Thread.onSpinWait(); + } + } + + @Override + public int getThreads() + { + return _virtualExecutor instanceof TrackingExecutor tracking ? tracking.size() : -1; + } + + @Override + public int getIdleThreads() + { + return 0; + } + + @Override + public boolean isLowOnThreads() + { + return false; + } + + @Override + public boolean tryExecute(Runnable task) + { + try + { + _virtualExecutor.execute(task); + return true; + } + catch (RejectedExecutionException e) + { + LOG.warn("tryExecute {} failed", _name, e); + } + return false; + } + + @Override + public void execute(Runnable task) + { + _virtualExecutor.execute(task); + } +} diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/VirtualThreadPoolTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/VirtualThreadPoolTest.java new file mode 100644 index 000000000000..1f9ef396a77f --- /dev/null +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/VirtualThreadPoolTest.java @@ -0,0 +1,195 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.StringUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledForJreRange; +import org.junit.jupiter.api.condition.JRE; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DisabledForJreRange(max = JRE.JAVA_20) +public class VirtualThreadPoolTest +{ + @Test + public void testNamed() throws Exception + { + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.setName("namedV"); + vtp.start(); + + CompletableFuture name = new CompletableFuture<>(); + vtp.execute(() -> name.complete(Thread.currentThread().getName())); + + assertThat(name.get(5, TimeUnit.SECONDS), startsWith("namedV")); + + vtp.stop(); + } + + @Test + public void testJoin() throws Exception + { + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.start(); + + CountDownLatch running = new CountDownLatch(1); + CountDownLatch joined = new CountDownLatch(1); + + vtp.execute(() -> + { + try + { + running.countDown(); + vtp.join(); + joined.countDown(); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + }); + + assertTrue(running.await(5, TimeUnit.SECONDS)); + assertThat(joined.getCount(), is(1L)); + vtp.stop(); + assertTrue(joined.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testExecute() throws Exception + { + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.start(); + + CountDownLatch ran = new CountDownLatch(1); + vtp.execute(ran::countDown); + assertTrue(ran.await(5, TimeUnit.SECONDS)); + vtp.stop(); + } + + @Test + public void testTry() throws Exception + { + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.start(); + + CountDownLatch ran = new CountDownLatch(1); + assertTrue(vtp.tryExecute(ran::countDown)); + assertTrue(ran.await(5, TimeUnit.SECONDS)); + vtp.stop(); + } + + @Test + public void testTrackingDump() throws Exception + { + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.setTracking(true); + vtp.start(); + + assertThat(vtp.getVirtualThreadsExecutor(), instanceOf(TrackingExecutor.class)); + TrackingExecutor trackingExecutor = (TrackingExecutor)vtp.getVirtualThreadsExecutor(); + assertThat(trackingExecutor.size(), is(0)); + + CountDownLatch running = new CountDownLatch(4); + Waiter waiter = new Waiter(running, false); + Waiter spinner = new Waiter(running, true); + try + { + vtp.execute(waiter); + vtp.execute(spinner); + vtp.execute(waiter); + vtp.execute(spinner); + + assertTrue(running.await(5, TimeUnit.SECONDS)); + assertThat(trackingExecutor.size(), is(4)); + + vtp.setDetailedDump(false); + String dump = vtp.dump(); + assertThat(count(dump, "VirtualThread[#"), is(4)); + assertThat(count(dump, "/runnable@"), is(2)); + assertThat(count(dump, "waiting"), is(2)); + assertThat(count(dump, "VirtualThreadPoolTest.java"), is(0)); + + vtp.setDetailedDump(true); + dump = vtp.dump(); + assertThat(count(dump, "VirtualThread[#"), is(4)); + assertThat(count(dump, "/runnable@"), is(2)); + assertThat(count(dump, "waiting"), is(2)); + assertThat(count(dump, "VirtualThreadPoolTest.java"), is(4)); + assertThat(count(dump, "CountDownLatch.await("), is(2)); + } + finally + { + waiter.countDown(); + spinner.countDown(); + vtp.stop(); + } + } + + public static int count(String str, String subStr) + { + if (StringUtil.isEmpty(str)) + return 0; + + int count = 0; + int idx = 0; + + while ((idx = str.indexOf(subStr, idx)) != -1) + { + count++; + idx += subStr.length(); + } + + return count; + } + + private static class Waiter extends CountDownLatch implements Runnable + { + private final CountDownLatch _running; + private final boolean _spin; + + public Waiter(CountDownLatch running, boolean spin) + { + super(1); + _running = running; + _spin = spin; + } + + @Override + public void run() + { + try + { + _running.countDown(); + while (_spin && getCount() > 0) + Thread.onSpinWait(); + if (!await(10, TimeUnit.SECONDS)) + throw new IllegalStateException(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } +} diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategyTest.java similarity index 93% rename from jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java rename to jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategyTest.java index f8874ae2de1b..636622b55468 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/AdaptiveExecutionStrategyTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/AdaptiveExecutionStrategyTest.java @@ -11,7 +11,7 @@ // ======================================================================== // -package org.eclipse.jetty.util.thread; +package org.eclipse.jetty.util.thread.strategy; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -20,7 +20,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.logging.StacklessLogging; -import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; +import org.eclipse.jetty.util.thread.ExecutionStrategy; +import org.eclipse.jetty.util.thread.Invocable; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ReservedThreadExecutor; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java index a12c1f6527df..86f0b1cc726e 100644 --- a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/strategy/ExecutionStrategyTest.java @@ -15,6 +15,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; @@ -23,10 +24,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; +import org.eclipse.jetty.util.VirtualThreads; +import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.ExecutionStrategy.Producer; +import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.VirtualThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -39,22 +45,27 @@ public class ExecutionStrategyTest { - public static Stream strategies() + public static Stream pooledStrategies() { return Stream.of( + QueuedThreadPool.class, + ExecutorThreadPool.class, + VirtualThreads.getDefaultVirtualThreadsExecutor() == null ? null : VirtualThreadPool.class) + .filter(Objects::nonNull) + .flatMap(tp -> Stream.of( ProduceExecuteConsume.class, ExecuteProduceConsume.class, - AdaptiveExecutionStrategy.class - ).map(Arguments::of); + AdaptiveExecutionStrategy.class) + .map(s -> Arguments.of(tp, s))); } - QueuedThreadPool _threads = new QueuedThreadPool(20); - List strategies = new ArrayList<>(); + List _lifeCycles = new ArrayList<>(); protected ExecutionStrategy newExecutionStrategy(Class strategyClass, Producer producer, Executor executor) throws Exception { ExecutionStrategy strategy = strategyClass.getDeclaredConstructor(Producer.class, Executor.class).newInstance(producer, executor); - strategies.add(strategy); + _lifeCycles.add(executor); + _lifeCycles.add(strategy); LifeCycle.start(strategy); return strategy; } @@ -62,15 +73,12 @@ protected ExecutionStrategy newExecutionStrategy(Class LifeCycle.stop(strategy)); - _threads.stop(); + _lifeCycles.forEach(LifeCycle::stop); } public abstract static class TestProducer implements Producer @@ -83,9 +91,11 @@ public String toString() } @ParameterizedTest - @MethodSource("strategies") - public void idleTest(Class strategyClass) throws Exception + @MethodSource("pooledStrategies") + public void idleTest(Class threadPoolClass, Class strategyClass) throws Exception { + ThreadPool threadPool = threadPoolClass.getDeclaredConstructor().newInstance(); + LifeCycle.start(threadPool); AtomicInteger count = new AtomicInteger(0); Producer producer = new TestProducer() { @@ -97,16 +107,20 @@ public Runnable produce() } }; - ExecutionStrategy strategy = newExecutionStrategy(strategyClass, producer, _threads); + ExecutionStrategy strategy = newExecutionStrategy(strategyClass, producer, threadPool); strategy.produce(); assertThat(count.get(), greaterThan(0)); + + LifeCycle.stop(threadPool); } @ParameterizedTest - @MethodSource("strategies") - public void simpleTest(Class strategyClass) throws Exception + @MethodSource("pooledStrategies") + public void simpleTest(Class threadPoolClass, Class strategyClass) throws Exception { - final int TASKS = 3 * _threads.getMaxThreads(); + ThreadPool threadPool = threadPoolClass.getDeclaredConstructor().newInstance(); + LifeCycle.start(threadPool); + final int TASKS = 1000; final CountDownLatch latch = new CountDownLatch(TASKS); Producer producer = new TestProducer() { @@ -117,14 +131,14 @@ public Runnable produce() { if (tasks-- > 0) { - return () -> latch.countDown(); + return latch::countDown; } return null; } }; - ExecutionStrategy strategy = newExecutionStrategy(strategyClass, producer, _threads); + ExecutionStrategy strategy = newExecutionStrategy(strategyClass, producer, threadPool); for (int p = 0; latch.getCount() > 0 && p < TASKS; p++) { @@ -136,16 +150,20 @@ public Runnable produce() { // Dump state on failure return String.format("Timed out waiting for latch: %s%ntasks=%d latch=%d%n%s", - strategy, TASKS, latch.getCount(), _threads.dump()); + strategy, TASKS, latch.getCount(), threadPool instanceof Dumpable dumpable ? dumpable.dump() : ""); }); + + LifeCycle.stop(threadPool); } @ParameterizedTest - @MethodSource("strategies") - public void blockingProducerTest(Class strategyClass) throws Exception + @MethodSource("pooledStrategies") + public void blockingProducerTest(Class threadPoolClass, Class strategyClass) throws Exception { - final int TASKS = 3 * _threads.getMaxThreads(); - final BlockingQueue q = new ArrayBlockingQueue<>(_threads.getMaxThreads()); + ThreadPool threadPool = threadPoolClass.getDeclaredConstructor().newInstance(); + LifeCycle.start(threadPool); + final int TASKS = 256; + final BlockingQueue q = new ArrayBlockingQueue<>(1024); Producer producer = new TestProducer() { @@ -158,12 +176,12 @@ public Runnable produce() if (id >= 0) { - while (_threads.isRunning()) + while (((LifeCycle)threadPool).isRunning()) { try { final CountDownLatch latch = q.take(); - return () -> latch.countDown(); + return latch::countDown; } catch (InterruptedException e) { @@ -176,32 +194,30 @@ public Runnable produce() } }; - ExecutionStrategy strategy = newExecutionStrategy(strategyClass, producer, _threads); + ExecutionStrategy strategy = newExecutionStrategy(strategyClass, producer, threadPool); strategy.dispatch(); final CountDownLatch latch = new CountDownLatch(TASKS); - _threads.execute(new Runnable() + threadPool.execute(() -> { - @Override - public void run() + try { - try - { - for (int t = TASKS; t-- > 0; ) - { - Thread.sleep(20); - q.offer(latch); - } - } - catch (Exception e) + for (int t = TASKS; t-- > 0; ) { - e.printStackTrace(); + Thread.sleep(5); + q.offer(latch); } } + catch (Exception e) + { + e.printStackTrace(); + } }); assertTrue(latch.await(30, TimeUnit.SECONDS), String.format("Timed out waiting for latch: %s%ntasks=%d latch=%d q=%d%n%s", - strategy, TASKS, latch.getCount(), q.size(), _threads.dump())); + strategy, TASKS, latch.getCount(), q.size(), threadPool instanceof Dumpable dumpable ? dumpable.dump() : "")); + + LifeCycle.stop(threadPool); } }