Skip to content

Commit

Permalink
Fixes #6208 - HTTP/2 max local stream count exceeded
Browse files Browse the repository at this point in the history
Backported from Jetty 10 the "new stream" event so that the Stream can be set early on the client's `HttpChannelOverHTTP2`.
In this way, when a HEADERS frame stalled due to TCP congestion is failed, the corresponding Stream is closed and the connection released to the pool, fixing the "max stream exceeded" issue.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Apr 28, 2021
1 parent e3abe7b commit 0b315d7
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 27 deletions.
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
Expand Down Expand Up @@ -195,11 +196,11 @@ protected Action process() throws Throwable

// If the stream has been reset or removed,
// don't send the frame and fail it here.
if (entry.isStale())
if (entry.shouldBeDropped())
{
if (LOG.isDebugEnabled())
LOG.debug("Stale {}", entry);
entry.failed(new EofException("reset"));
LOG.debug("Dropped {}", entry);
entry.failed(new EofException("dropped"));
pending.remove();
continue;
}
Expand Down Expand Up @@ -450,40 +451,47 @@ public void failed(Throwable x)
}

/**
* @return whether the entry is stale and must not be processed
* @return whether the entry should not be processed
*/
private boolean isStale()
{
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
private boolean shouldBeDropped()
{
switch (frame.getType())
{
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
return false;
// Frames of this type should not be dropped.
case PRIORITY:
case RST_STREAM:
case SETTINGS:
case PING:
case GO_AWAY:
case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT:
return true;
return false;
// Frames of this type follow the logic below.
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
case RST_STREAM:
break;
default:
throw new IllegalStateException();
}

// SPEC: section 6.4.
if (frame.getType() == FrameType.RST_STREAM)
return stream != null && stream.isLocal() && !stream.isCommitted();

// Frames that do not have a stream associated are dropped.
if (stream == null)
return true;

return stream.isResetOrFailed();
}

void commit()
{
if (stream != null)
stream.commit();
}

@Override
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -1221,6 +1222,8 @@ boolean hasHighPriority()
@Override
public void succeeded()
{
commit();

bytesWritten.addAndGet(frameBytes);
frameBytes = 0;

Expand Down Expand Up @@ -2072,6 +2075,8 @@ private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<S
return false;

stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP);

Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
Expand Down
Expand Up @@ -67,6 +67,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private boolean remoteReset;
private Listener listener;
private long dataLength;
private boolean committed;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
{
Expand Down Expand Up @@ -226,6 +227,18 @@ public boolean isLocallyClosed()
return closeState.get() == CloseState.LOCALLY_CLOSED;
}

@Override
public void commit()
{
committed = true;
}

@Override
public boolean isCommitted()
{
return committed;
}

@Override
public boolean isOpen()
{
Expand Down Expand Up @@ -278,6 +291,11 @@ public void process(Frame frame, Callback callback)
notIdle();
switch (frame.getType())
{
case PREFACE:
{
onNewStream(callback);
break;
}
case HEADERS:
{
onHeaders((HeadersFrame)frame, callback);
Expand Down Expand Up @@ -315,6 +333,12 @@ public void process(Frame frame, Callback callback)
}
}

private void onNewStream(Callback callback)
{
notifyNewStream(this);
callback.succeeded();
}

private void onHeaders(HeadersFrame frame, Callback callback)
{
MetaData metaData = frame.getMetaData();
Expand Down Expand Up @@ -586,6 +610,22 @@ private Callback endWrite()
}
}

private void notifyNewStream(Stream stream)
{
Listener listener = this.listener;
if (listener != null)
{
try
{
listener.onNewStream(stream);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener {}", listener, x);
}
}
}

private void notifyData(Stream stream, DataFrame frame, Callback callback)
{
Listener listener = this.listener;
Expand Down
Expand Up @@ -126,6 +126,19 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();

/**
* Marks this stream as committed.
*
* @see #isCommitted()
*/
void commit();

/**
* @return whether bytes for this stream have been sent to the remote peer.
* @see #commit()
*/
boolean isCommitted();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Expand Up @@ -138,6 +138,16 @@ public interface Stream
*/
interface Listener
{
/**
* <p>Callback method invoked when a stream is created locally by
* {@link Session#newStream(HeadersFrame, Promise, Listener)}.</p>
*
* @param stream the newly created stream
*/
public default void onNewStream(Stream stream)
{
}

/**
* <p>Callback method invoked when a HEADERS frame representing the HTTP response has been received.</p>
*
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -82,6 +83,8 @@ public Stream getStream()
public void setStream(Stream stream)
{
this.stream = stream;
if (stream != null)
((IStream)stream).setAttachment(this);
}

public boolean isFailed()
Expand Down
Expand Up @@ -74,6 +74,12 @@ protected void reset()
contentNotifier.reset();
}

@Override
public void onNewStream(Stream stream)
{
getHttpChannel().setStream(stream);
}

@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Expand Down
Expand Up @@ -177,7 +177,7 @@ private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
stream.headers(trailersFrame, callback);
}

private class HeadersPromise implements Promise<Stream>
private static class HeadersPromise implements Promise<Stream>
{
private final HttpRequest request;
private final Callback callback;
Expand All @@ -191,9 +191,6 @@ private HeadersPromise(HttpRequest request, Callback callback)
@Override
public void succeeded(Stream stream)
{
HttpChannelOverHTTP2 channel = getHttpChannel();
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
Expand Down

0 comments on commit 0b315d7

Please sign in to comment.