Skip to content

Commit

Permalink
Fixes #6251 - Use CyclicTimeout for HTTP2Streams. (#6267)
Browse files Browse the repository at this point in the history
* Fixes #6251 - Use CyclicTimeout for HTTP2Streams.

Introduced CyclicTimeouts to manage many entities that may timeout.
Rewritten HttpDestination request timeouts using CyclicTimeouts.
HTTP2Stream does not inherit from IdleTimeout anymore; now a
CyclicTimeouts in HTTP2Session manages the stream timeouts.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed May 16, 2021
1 parent 804630d commit 27db8ed
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,11 @@ public void send()
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
long timeoutAt = request.getTimeoutAt();
if (timeoutAt != -1)
long timeoutAt = exchange.getExpireNanoTime();
if (timeoutAt != Long.MAX_VALUE)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(request, timeoutAt);
_totalTimeout.schedule(exchange.getRequest(), timeoutAt);
}
send(exchange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
Expand All @@ -31,7 +30,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
Expand Down Expand Up @@ -255,10 +254,7 @@ public void send(HttpExchange exchange)
{
if (enqueue(exchanges, exchange))
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
requestTimeouts.schedule(expiresAt);

requestTimeouts.schedule(exchange);
if (!client.isRunning() && exchanges.remove(exchange))
{
request.abort(new RejectedExecutionException(client + " is stopping"));
Expand Down Expand Up @@ -531,61 +527,25 @@ public interface Multiplexed
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
*/
private class RequestTimeouts extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);

private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
public void onTimeoutExpired()
protected Iterator<HttpExchange> iterator()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);

long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(earliest);

// Scan the message queue to abort expired exchanges
// and to find the exchange that expire the earliest.
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
long expiresAt = request.getTimeoutAt();
if (expiresAt == -1)
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < earliest)
earliest = expiresAt;
}

if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
return exchanges.iterator();
}

private void schedule(long expiresAt)
@Override
protected boolean onExpired(HttpExchange exchange)
{
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
{
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpExchange
public class HttpExchange implements CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpExchange.class);

Expand Down Expand Up @@ -89,6 +90,12 @@ public Throwable getResponseFailure()
}
}

@Override
public long getExpireNanoTime()
{
return request.getTimeoutAt();
}

/**
* <p>Associates the given {@code channel} to this exchange.</p>
* <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class HttpRequest implements Request
private boolean versionExplicit;
private long idleTimeout = -1;
private long timeout;
private long timeoutAt;
private long timeoutAt = Long.MAX_VALUE;
private Content content;
private boolean followRedirects;
private List<HttpCookie> cookies;
Expand Down Expand Up @@ -821,11 +821,12 @@ private void sendAsync(BiConsumer<HttpRequest, List<Response.ResponseListener>>
void sent()
{
long timeout = getTimeout();
timeoutAt = timeout > 0 ? System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout) : -1;
if (timeout > 0)
timeoutAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
}

/**
* @return The nanoTime at which the timeout expires or -1 if there is no timeout.
* @return The nanoTime at which the timeout expires or {@link Long#MAX_VALUE} if there is no timeout.
* @see #timeout(long, TimeUnit)
*/
long getTimeoutAt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.AtomicBiInteger;
Expand Down Expand Up @@ -89,12 +91,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicLong bytesWritten = new AtomicLong();
private final Scheduler scheduler;
private final EndPoint endPoint;
private final Generator generator;
private final Session.Listener listener;
private final FlowControlStrategy flowControl;
private final HTTP2Flusher flusher;
private final StreamTimeouts streamTimeouts;
private int maxLocalStreams;
private int maxRemoteStreams;
private long streamIdleTimeout;
Expand All @@ -105,12 +107,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio

public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{
this.scheduler = scheduler;
this.endPoint = endPoint;
this.generator = generator;
this.listener = listener;
this.flowControl = flowControl;
this.flusher = new HTTP2Flusher(this);
this.streamTimeouts = new StreamTimeouts(scheduler);
this.maxLocalStreams = -1;
this.maxRemoteStreams = -1;
this.localStreamIds.set(initialStreamId);
Expand Down Expand Up @@ -613,7 +615,7 @@ public Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Con

protected IStream newStream(int streamId, MetaData.Request request, boolean local)
{
return new HTTP2Stream(scheduler, this, streamId, request, local);
return new HTTP2Stream(this, streamId, request, local);
}

@Override
Expand Down Expand Up @@ -989,6 +991,11 @@ public void onFrame(Frame frame)
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
}

void scheduleTimeout(HTTP2Stream stream)
{
streamTimeouts.schedule(stream);
}

private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -1026,6 +1033,7 @@ public void onFlushed(long bytes) throws IOException
private void terminate(Throwable cause)
{
flusher.terminate(cause);
streamTimeouts.destroy();
disconnect();
}

Expand Down Expand Up @@ -2303,4 +2311,25 @@ private class Slot
private volatile List<HTTP2Flusher.Entry> entries;
}
}

private class StreamTimeouts extends CyclicTimeouts<HTTP2Stream>
{
private StreamTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
protected Iterator<HTTP2Stream> iterator()
{
return streams.values().stream().map(HTTP2Stream.class::cast).iterator();
}

@Override
protected boolean onExpired(HTTP2Stream stream)
{
stream.onIdleExpired(new TimeoutException("Idle timeout " + stream.getIdleTimeout() + " ms elapsed"));
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MathUtils;
import org.eclipse.jetty.util.Promise;
Expand All @@ -48,7 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpable
public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Stream.class);

Expand All @@ -74,10 +74,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private boolean dataInitial;
private boolean dataProcess;
private boolean committed;
private long idleTimeout;
private long expireNanoTime = Long.MAX_VALUE;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
public HTTP2Stream(ISession session, int streamId, MetaData.Request request, boolean local)
{
super(scheduler);
this.session = session;
this.streamId = streamId;
this.request = request;
Expand All @@ -86,6 +87,12 @@ public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData
this.dataInitial = true;
}

@Deprecated
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
{
this(session, streamId, request, local);
}

@Override
public int getId()
{
Expand Down Expand Up @@ -268,13 +275,39 @@ public boolean isCommitted()
return committed;
}

@Override
public boolean isOpen()
{
return !isClosed();
}

@Override
public void notIdle()
{
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0)
expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout);
}

@Override
public long getExpireNanoTime()
{
return expireNanoTime;
}

@Override
public long getIdleTimeout()
{
return idleTimeout;
}

@Override
public void setIdleTimeout(long idleTimeout)
{
this.idleTimeout = idleTimeout;
notIdle();
((HTTP2Session)session).scheduleTimeout(this);
}

protected void onIdleExpired(TimeoutException timeout)
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -683,10 +716,8 @@ public void close()
}
}

@Override
public void onClose()
{
super.onClose();
notifyClosed(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
* session there is a CyclicTimeout and at the beginning of the
* request processing the timeout is canceled (via cancel()), but at
* the end of the request processing the timeout is re-scheduled.</p>
* <p>Another typical scenario is for a parent entity to manage
* the timeouts of many children entities; the timeout is scheduled
* for the child entity that expires the earlier; when the timeout
* expires, the implementation scans the children entities to find
* the expired child entities and to find the next child entity
* that expires the earlier. </p>
* <p>This implementation has a {@link Timeout} holding the time
* at which the scheduled task should fire, and a linked list of
* {@link Wakeup}, each holding the actual scheduled task.</p>
Expand All @@ -59,6 +53,8 @@
* When the Wakeup task fires, it will see that the Timeout is now
* in the future and will attach a new Wakeup with the future time
* to the Timeout, and submit a scheduler task for the new Wakeup.</p>
*
* @see CyclicTimeouts
*/
public abstract class CyclicTimeout implements Destroyable
{
Expand Down

0 comments on commit 27db8ed

Please sign in to comment.