Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6652 - Improve ReservedThreadExecutor dump. (#6653) #6662

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -14,13 +14,15 @@
package org.eclipse.jetty.util.thread;

import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.ProcessorUtils;
Expand All @@ -38,14 +40,14 @@


/**
* An Executor using pre-allocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available.
* <p>Threads are reserved lazily, with a new reserved threads being allocated from the
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* <p>A TryExecutor using pre-allocated/reserved threads from an external Executor.</p>
* <p>Calls to {@link #tryExecute(Runnable)} on ReservedThreadExecutor will either
* succeed with a reserved thread immediately being assigned the task, or fail if
* no reserved thread is available.</p>
* <p>Threads are reserved lazily, with new reserved threads being allocated from the external
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
* the executor.
* the external Executor.</p>
*/
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
Expand All @@ -62,7 +64,7 @@ public void run()
@Override
public String toString()
{
return "STOP!";
return "STOP";
}
};

Expand All @@ -72,7 +74,6 @@ public String toString()
private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());

private ThreadPoolBudget.Lease _lease;
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;

Expand Down Expand Up @@ -175,20 +176,25 @@ public void doStop() throws Exception

super.doStop();

// Offer STOP task to all waiting reserved threads.
for (int i = _count.getAndSetLo(-1); i-- > 0;)
// Mark this instance as stopped.
int size = _count.getAndSetLo(-1);

// Offer the STOP task to all waiting reserved threads.
for (int i = 0; i < size; ++i)
{
// yield to wait for any reserved threads that have incremented the size but not yet polled
// Yield to wait for any reserved threads that
// have incremented the size but not yet polled.
Thread.yield();
_queue.offer(STOP);
}
// Interrupt any reserved thread missed the offer so it doesn't wait too long.
for (ReservedThread reserved : _threads)
{
Thread thread = reserved._thread;
if (thread != null)
thread.interrupt();
}

// Interrupt any reserved thread missed the offer,
// so they do not wait for the whole idle timeout.
_threads.stream()
.filter(ReservedThread::isReserved)
.map(t -> t._thread)
.filter(Objects::nonNull)
.forEach(Thread::interrupt);
_threads.clear();
_count.getAndSetHi(0);
}
Expand Down Expand Up @@ -264,13 +270,16 @@ private void startReservedThread()
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
new DumpableCollection("reserved", _threads));
new DumpableCollection("threads",
_threads.stream()
.filter(ReservedThread::isReserved)
.collect(Collectors.toList())));
}

@Override
public String toString()
{
return String.format("%s@%x{s=%d/%d,p=%d}",
return String.format("%s@%x{reserved=%d/%d,pending=%d}",
getClass().getSimpleName(),
hashCode(),
_count.getLo(),
Expand All @@ -293,6 +302,11 @@ private class ReservedThread implements Runnable
private volatile State _state = State.PENDING;
private volatile Thread _thread;

private boolean isReserved()
{
return _state == State.RESERVED;
}

private Runnable reservedWait()
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -321,7 +335,6 @@ private Runnable reservedWait()
}
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;

}
catch (InterruptedException e)
{
Expand Down
Expand Up @@ -831,7 +831,7 @@ public void testDump() throws Exception
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=2,r=2,q=0"));
assertThat(dump, containsString("s=0/2"));
assertThat(dump, containsString("reserved=0/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(2));
assertThat(count(dump, " WAITING"), is(1));
Expand All @@ -846,7 +846,7 @@ public void testDump() throws Exception
dump = pool.dump();
assertThat(count(dump, " - STARTED"), is(2));
assertThat(dump, containsString(",3<=3<=4,i=1,r=2,q=0"));
assertThat(dump, containsString("s=1/2"));
assertThat(dump, containsString("reserved=1/2"));
assertThat(dump, containsString("[ReservedThreadExecutor@"));
assertThat(count(dump, " IDLE"), is(1));
assertThat(count(dump, " WAITING"), is(1));
Expand Down
Expand Up @@ -359,6 +359,5 @@ public void run()

assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
// System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
}
}