Skip to content

Commit

Permalink
Issue jetty#4105 - atomically increment idle count when starting new …
Browse files Browse the repository at this point in the history
…thread in QTP (jetty#4118)

* Issue jetty#4105 - starting a thread in QTP now increments idle count

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue jetty#4105 - improve comments in test

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Sep 25, 2019
1 parent 39ee316 commit ba728ee
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 13 deletions.
Expand Up @@ -176,7 +176,6 @@ protected void doStop() throws Exception
removeBean(_tryExecutor);
_tryExecutor = TryExecutor.NO_TRY;


// Signal the Runner threads that we are stopping
int threads = _counts.getAndSetHi(Integer.MIN_VALUE);

Expand Down Expand Up @@ -483,7 +482,7 @@ public void setLowThreadsThreshold(int lowThreadsThreshold)
public void execute(Runnable job)
{
// Determine if we need to start a thread, use and idle thread or just queue this job
boolean startThread;
int startThread;
while (true)
{
// Get the atomic counts
Expand All @@ -501,10 +500,10 @@ public void execute(Runnable job)

// Start a thread if we have insufficient idle threads to meet demand
// and we are not at max threads.
startThread = (idle <= 0 && threads < _maxThreads);
startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;

// The job will be run by an idle thread when available
if (!_counts.compareAndSet(counts, threads + (startThread ? 1 : 0), idle - 1))
if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
continue;

break;
Expand All @@ -513,7 +512,7 @@ public void execute(Runnable job)
if (!_jobs.offer(job))
{
// reverse our changes to _counts.
if (addCounts(startThread ? -1 : 0, 1))
if (addCounts(-startThread, 1 - startThread))
LOG.warn("{} rejected {}", this, job);
throw new RejectedExecutionException(job.toString());
}
Expand All @@ -522,7 +521,7 @@ public void execute(Runnable job)
LOG.debug("queue {} startThread={}", job, startThread);

// Start a thread if one was needed
if (startThread)
while (startThread-- > 0)
startThread();
}

Expand Down Expand Up @@ -617,7 +616,7 @@ private void ensureThreads()
if (threads < _minThreads || (idle < 0 && threads < _maxThreads))
{
// Then try to start a thread.
if (_counts.compareAndSet(counts, threads + 1, idle))
if (_counts.compareAndSet(counts, threads + 1, idle + 1))
startThread();
// Otherwise continue to check state again.
continue;
Expand Down Expand Up @@ -645,7 +644,7 @@ protected void startThread()
finally
{
if (!started)
addCounts(-1, 0); // threads, idle
addCounts(-1, -1); // threads, idle
}
}

Expand Down Expand Up @@ -859,20 +858,16 @@ public void run()
LOG.debug("Runner started for {}", QueuedThreadPool.this);

Runnable job = null;

try
{
// All threads start idle (not yet taken a job)
if (!addCounts(0, 1))
return;

while (true)
{
// If we had a job, signal that we are idle again
if (job != null)
{
if (!addCounts(0, 1))
break;
job = null;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
Expand Down
Expand Up @@ -45,6 +45,61 @@ public class QueuedThreadPoolTest extends AbstractThreadPoolTest
private static final Logger LOG = Log.getLogger(QueuedThreadPoolTest.class);
private final AtomicInteger _jobs = new AtomicInteger();

private static class TestQueuedThreadPool extends QueuedThreadPool
{
private final AtomicInteger _started;
private final CountDownLatch _enteredRemoveThread;
private final CountDownLatch _exitRemoveThread;

public TestQueuedThreadPool(AtomicInteger started, CountDownLatch enteredRemoveThread, CountDownLatch exitRemoveThread)
{
_started = started;
_enteredRemoveThread = enteredRemoveThread;
_exitRemoveThread = exitRemoveThread;
}

public void superStartThread()
{
super.startThread();
}

@Override
protected void startThread()
{
switch (_started.incrementAndGet())
{
case 1:
case 2:
case 3:
super.startThread();
break;

case 4:
// deliberately not start thread
break;

default:
throw new IllegalStateException("too many threads started");
}
}

@Override
protected void removeThread(Thread thread)
{
try
{
_enteredRemoveThread.countDown();
_exitRemoveThread.await();
}
catch (Exception e)
{
throw new RuntimeException(e);
}

super.removeThread(thread);
}
}

private class RunningJob implements Runnable
{
final CountDownLatch _run = new CountDownLatch(1);
Expand Down Expand Up @@ -450,6 +505,63 @@ public void testShrink() throws Exception
tp.stop();
}

@Test
public void testEnsureThreads() throws Exception
{
AtomicInteger started = new AtomicInteger(0);

CountDownLatch enteredRemoveThread = new CountDownLatch(1);
CountDownLatch exitRemoveThread = new CountDownLatch(1);
TestQueuedThreadPool tp = new TestQueuedThreadPool(started, enteredRemoveThread, exitRemoveThread);

tp.setMinThreads(2);
tp.setMaxThreads(10);
tp.setIdleTimeout(400);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);

tp.start();
waitForIdle(tp, 2);
waitForThreads(tp, 2);

RunningJob job1 = new RunningJob();
RunningJob job2 = new RunningJob();
RunningJob job3 = new RunningJob();
tp.execute(job1);
tp.execute(job2);
tp.execute(job3);

waitForThreads(tp, 3);
waitForIdle(tp, 0);

// We stop job3, the thread becomes idle, thread decides to shrink, and then blocks in removeThread().
job3.stop();
assertTrue(enteredRemoveThread.await(5, TimeUnit.SECONDS));
waitForThreads(tp, 3);
waitForIdle(tp, 1);

// Executing job4 will not start a new thread because we already have 1 idle thread.
RunningJob job4 = new RunningJob();
tp.execute(job4);

// Allow thread to exit from removeThread().
// The 4th thread is not actually started in our startThread() until tp.superStartThread() is called.
// Delay by 1000ms to check that ensureThreads is only starting one thread even though it is slow to start.
assertThat(started.get(), is(3));
exitRemoveThread.countDown();
Thread.sleep(1000);

// Now startThreads() should have been called 4 times.
// Actually start the thread, and job4 should be run.
assertThat(started.get(), is(4));
tp.superStartThread();
assertTrue(job4._run.await(5, TimeUnit.SECONDS));

job1.stop();
job2.stop();
job4.stop();
tp.stop();
}

@Test
public void testMaxStopTime() throws Exception
{
Expand Down

0 comments on commit ba728ee

Please sign in to comment.