Skip to content

Commit

Permalink
Fixes #6652 - Improve ReservedThreadExecutor dump.
Browse files Browse the repository at this point in the history
Incorporate suggestions from review.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 24, 2021
1 parent 128ede5 commit 85611e5
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
Expand Up @@ -19,7 +19,6 @@
package org.eclipse.jetty.util.thread;

import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand All @@ -35,6 +34,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

Expand All @@ -43,14 +43,14 @@
import static org.eclipse.jetty.util.AtomicBiInteger.getLo;

/**
* An Executor using pre-allocated/reserved Threads from a wrapped Executor.
* <p>Calls to {@link #execute(Runnable)} on a {@link ReservedThreadExecutor} will either succeed
* with a Thread immediately being assigned the Runnable task, or fail if no Thread is
* available.
* <p>Threads are reserved lazily, with a new reserved threads being allocated from the
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* <p>A TryExecutor using pre-allocated/reserved threads from an external Executor.</p>
* <p>Calls to {@link #tryExecute(Runnable)} on ReservedThreadExecutor will either
* succeed with a reserved thread immediately being assigned the task, or fail if
* no reserved thread is available.</p>
* <p>Threads are reserved lazily, with new reserved threads being allocated from the external
* {@link Executor} passed to the constructor. Whenever 1 or more reserved threads have been
* idle for more than {@link #getIdleTimeoutMs()} then one reserved thread will return to
* the executor.
* the external Executor.</p>
*/
@ManagedObject("A pool for reserved threads")
public class ReservedThreadExecutor extends AbstractLifeCycle implements TryExecutor, Dumpable
Expand All @@ -67,7 +67,7 @@ public void run()
@Override
public String toString()
{
return "STOP!";
return "STOP";
}
};

Expand All @@ -77,7 +77,6 @@ public String toString()
private final SynchronousQueue<Runnable> _queue = new SynchronousQueue<>(false);
private final AtomicBiInteger _count = new AtomicBiInteger(); // hi=pending; lo=size;
private final AtomicLong _lastEmptyTime = new AtomicLong(System.nanoTime());

private ThreadPoolBudget.Lease _lease;
private long _idleTimeNanos = DEFAULT_IDLE_TIMEOUT;

Expand Down Expand Up @@ -194,7 +193,10 @@ public void doStop() throws Exception

// Interrupt any reserved thread missed the offer,
// so they do not wait for the whole idle timeout.
_threads.stream().filter(ReservedThread::isReserved).map(t -> t._thread).forEach(Thread::interrupt);
_threads.stream()
.filter(ReservedThread::isReserved)
.map(t -> t._thread)
.forEach(Thread::interrupt);
_threads.clear();
_count.getAndSetHi(0);
}
Expand Down Expand Up @@ -269,7 +271,10 @@ private void startReservedThread()
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this,
_threads.stream().filter(ReservedThread::isReserved).collect(Collectors.toList()));
new DumpableCollection("threads",
_threads.stream()
.filter(ReservedThread::isReserved)
.collect(Collectors.toList())));
}

@Override
Expand Down Expand Up @@ -300,7 +305,7 @@ private class ReservedThread implements Runnable

private boolean isReserved()
{
return Objects.equals(_state, State.RESERVED);
return _state == State.RESERVED;
}

private Runnable reservedWait()
Expand Down Expand Up @@ -331,7 +336,6 @@ private Runnable reservedWait()
}
_state = size >= 0 ? State.IDLE : State.STOPPED;
return STOP;

}
catch (InterruptedException e)
{
Expand Down
Expand Up @@ -360,13 +360,10 @@ public void run()
while (!reserved.tryExecute(() -> {}))
Thread.yield();

System.err.println(reserved.dump());

reserved.stop();
pool.stop();

assertThat(usedReserved.get(), greaterThan(0));
assertThat(usedReserved.get() + usedPool.get(), is(LOOPS));
// System.err.printf("reserved=%d pool=%d total=%d%n", usedReserved.get(), usedPool.get(), LOOPS);
}
}

0 comments on commit 85611e5

Please sign in to comment.