Skip to content

Commit

Permalink
Fixes #6208 - HTTP/2 max local stream count exceeded (#6220)
Browse files Browse the repository at this point in the history
* Fixes #6208 - HTTP/2 max local stream count exceeded

Backported from Jetty 10 the "new stream" event so that the Stream can be set early on the client's `HttpChannelOverHTTP2`.
In this way, when a HEADERS frame stalled due to TCP congestion is failed, the corresponding Stream is closed and the connection released to the pool, fixing the "max stream exceeded" issue.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Apr 29, 2021
1 parent e3abe7b commit 2f19c67
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 27 deletions.
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
Expand Down Expand Up @@ -195,11 +196,11 @@ protected Action process() throws Throwable

// If the stream has been reset or removed,
// don't send the frame and fail it here.
if (entry.isStale())
if (entry.shouldBeDropped())
{
if (LOG.isDebugEnabled())
LOG.debug("Stale {}", entry);
entry.failed(new EofException("reset"));
LOG.debug("Dropped {}", entry);
entry.failed(new EofException("dropped"));
pending.remove();
continue;
}
Expand Down Expand Up @@ -450,40 +451,47 @@ public void failed(Throwable x)
}

/**
* @return whether the entry is stale and must not be processed
* @return whether the entry should not be processed
*/
private boolean isStale()
{
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
private boolean shouldBeDropped()
{
switch (frame.getType())
{
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
return false;
// Frames of this type should not be dropped.
case PRIORITY:
case RST_STREAM:
case SETTINGS:
case PING:
case GO_AWAY:
case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT:
return true;
return false;
// Frames of this type follow the logic below.
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
case RST_STREAM:
break;
default:
throw new IllegalStateException();
}

// SPEC: section 6.4.
if (frame.getType() == FrameType.RST_STREAM)
return stream != null && stream.isLocal() && !stream.isCommitted();

// Frames that do not have a stream associated are dropped.
if (stream == null)
return true;

return stream.isResetOrFailed();
}

void commit()
{
if (stream != null)
stream.commit();
}

@Override
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -1221,6 +1222,8 @@ boolean hasHighPriority()
@Override
public void succeeded()
{
commit();

bytesWritten.addAndGet(frameBytes);
frameBytes = 0;

Expand Down Expand Up @@ -2072,6 +2075,8 @@ private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<S
return false;

stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP);

Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
Expand Down
Expand Up @@ -67,6 +67,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private boolean remoteReset;
private Listener listener;
private long dataLength;
private boolean committed;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
{
Expand Down Expand Up @@ -226,6 +227,18 @@ public boolean isLocallyClosed()
return closeState.get() == CloseState.LOCALLY_CLOSED;
}

@Override
public void commit()
{
committed = true;
}

@Override
public boolean isCommitted()
{
return committed;
}

@Override
public boolean isOpen()
{
Expand Down Expand Up @@ -278,6 +291,11 @@ public void process(Frame frame, Callback callback)
notIdle();
switch (frame.getType())
{
case PREFACE:
{
onNewStream(callback);
break;
}
case HEADERS:
{
onHeaders((HeadersFrame)frame, callback);
Expand Down Expand Up @@ -315,6 +333,12 @@ public void process(Frame frame, Callback callback)
}
}

private void onNewStream(Callback callback)
{
notifyNewStream(this);
callback.succeeded();
}

private void onHeaders(HeadersFrame frame, Callback callback)
{
MetaData metaData = frame.getMetaData();
Expand Down Expand Up @@ -586,6 +610,22 @@ private Callback endWrite()
}
}

private void notifyNewStream(Stream stream)
{
Listener listener = this.listener;
if (listener != null)
{
try
{
listener.onNewStream(stream);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener {}", listener, x);
}
}
}

private void notifyData(Stream stream, DataFrame frame, Callback callback)
{
Listener listener = this.listener;
Expand Down
Expand Up @@ -126,6 +126,19 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();

/**
* Marks this stream as committed.
*
* @see #isCommitted()
*/
void commit();

/**
* @return whether bytes for this stream have been sent to the remote peer.
* @see #commit()
*/
boolean isCommitted();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Expand Up @@ -138,6 +138,16 @@ public interface Stream
*/
interface Listener
{
/**
* <p>Callback method invoked when a stream is created locally by
* {@link Session#newStream(HeadersFrame, Promise, Listener)}.</p>
*
* @param stream the newly created stream
*/
public default void onNewStream(Stream stream)
{
}

/**
* <p>Callback method invoked when a HEADERS frame representing the HTTP response has been received.</p>
*
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -82,6 +83,8 @@ public Stream getStream()
public void setStream(Stream stream)
{
this.stream = stream;
if (stream != null)
((IStream)stream).setAttachment(this);
}

public boolean isFailed()
Expand Down
Expand Up @@ -74,6 +74,12 @@ protected void reset()
contentNotifier.reset();
}

@Override
public void onNewStream(Stream stream)
{
getHttpChannel().setStream(stream);
}

@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Expand Down
Expand Up @@ -177,7 +177,7 @@ private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
stream.headers(trailersFrame, callback);
}

private class HeadersPromise implements Promise<Stream>
private static class HeadersPromise implements Promise<Stream>
{
private final HttpRequest request;
private final Callback callback;
Expand All @@ -191,9 +191,6 @@ private HeadersPromise(HttpRequest request, Callback callback)
@Override
public void succeeded(Stream stream)
{
HttpChannelOverHTTP2 channel = getHttpChannel();
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
Expand Down

0 comments on commit 2f19c67

Please sign in to comment.