Skip to content

Commit

Permalink
Merge pull request #4149 from eclipse/jetty-9.4.x-4121-qtp-threadfactory
Browse files Browse the repository at this point in the history
Issue #4121 - ThreadFactory support in QTP
  • Loading branch information
joakime committed Oct 2, 2019
2 parents ed9031b + b121ba7 commit c19d33d
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 4 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -45,7 +46,7 @@
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;

@ManagedObject("A thread pool")
public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadPool, Dumpable, TryExecutor
public class QueuedThreadPool extends ContainerLifeCycle implements ThreadFactory, SizedThreadPool, Dumpable, TryExecutor
{
private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
private static Runnable NOOP = () ->
Expand All @@ -67,6 +68,7 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
private final Object _joinLock = new Object();
private final BlockingQueue<Runnable> _jobs;
private final ThreadGroup _threadGroup;
private final ThreadFactory _threadFactory;
private String _name = "qtp" + hashCode();
private int _idleTimeout;
private int _maxThreads;
Expand Down Expand Up @@ -114,7 +116,17 @@ public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads")
this(maxThreads, minThreads, idleTimeout, -1, queue, threadGroup);
}

public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads, @Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads, @Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup)
{
this(maxThreads, minThreads, idleTimeout, reservedThreads, queue, threadGroup, null);
}

public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads") int minThreads,
@Name("idleTimeout") int idleTimeout, @Name("reservedThreads") int reservedThreads,
@Name("queue") BlockingQueue<Runnable> queue, @Name("threadGroup") ThreadGroup threadGroup,
@Name("threadFactory") ThreadFactory threadFactory)
{
if (maxThreads < minThreads)
throw new IllegalArgumentException("max threads (" + maxThreads + ") less than min threads (" + minThreads + ")");
Expand All @@ -131,6 +143,7 @@ public QueuedThreadPool(@Name("maxThreads") int maxThreads, @Name("minThreads")
_jobs = queue;
_threadGroup = threadGroup;
setThreadPoolBudget(new ThreadPoolBudget(this));
_threadFactory = threadFactory == null ? this : threadFactory;
}

@Override
Expand Down Expand Up @@ -639,7 +652,7 @@ protected void startThread()
boolean started = false;
try
{
Thread thread = newThread(_runnable);
Thread thread = _threadFactory.newThread(_runnable);
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
Expand Down Expand Up @@ -669,7 +682,8 @@ private boolean addCounts(int deltaThreads, int deltaIdle)
}
}

protected Thread newThread(Runnable runnable)
@Override
public Thread newThread(Runnable runnable)
{
Thread thread = new Thread(_threadGroup, runnable);
thread.setDaemon(isDaemon());
Expand Down
@@ -0,0 +1,104 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.util.thread;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.util.MultiException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class ThreadFactoryTest
{
@Test
public void testThreadFactory() throws Exception
{
ThreadGroup threadGroup = new ThreadGroup("my-group");
MyThreadFactory threadFactory = new MyThreadFactory(threadGroup);

QueuedThreadPool qtp = new QueuedThreadPool(200, 10, 2000, 0, null, threadGroup, threadFactory);
try
{
qtp.start();

int testThreads = 2000;
CountDownLatch threadsLatch = new CountDownLatch(testThreads);
MultiException mex = new MultiException();

for (int i = 0; i < testThreads; i++)
{
qtp.execute(() ->
{
try
{
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(20, 500));
Thread thread = Thread.currentThread();

if (!thread.getName().startsWith("My-"))
{
mex.add(new AssertionError("Thread " + thread.getName() + " does not start with 'My-'"));
}

if (!thread.getThreadGroup().getName().equalsIgnoreCase("my-group"))
{
mex.add(new AssertionError("Thread Group " + thread.getThreadGroup().getName() + " is not 'my-group'"));
}

threadsLatch.countDown();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
});
}

assertTrue(threadsLatch.await(5, TimeUnit.SECONDS), "Did not see all tasks finish");
mex.ifExceptionThrow();
}
finally
{
qtp.stop();
}
}

public static class MyThreadFactory implements ThreadFactory
{
private final ThreadGroup threadGroup;

public MyThreadFactory(ThreadGroup threadGroup)
{
this.threadGroup = threadGroup;
}

@Override
public Thread newThread(Runnable runnable)
{
Thread thread = new Thread(threadGroup, runnable);
thread.setDaemon(false);
thread.setPriority(Thread.MIN_PRIORITY);
thread.setName("My-" + thread.getId());
return thread;
}
}
}

0 comments on commit c19d33d

Please sign in to comment.