Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6208 - HTTP/2 max local stream count exceeded #6220

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -31,6 +31,7 @@
import java.util.Set;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
Expand Down Expand Up @@ -195,11 +196,11 @@ protected Action process() throws Throwable

// If the stream has been reset or removed,
// don't send the frame and fail it here.
if (entry.isStale())
if (entry.shouldBeDropped())
{
if (LOG.isDebugEnabled())
LOG.debug("Stale {}", entry);
entry.failed(new EofException("reset"));
LOG.debug("Dropped {}", entry);
entry.failed(new EofException("dropped"));
pending.remove();
continue;
}
Expand Down Expand Up @@ -450,40 +451,47 @@ public void failed(Throwable x)
}

/**
* @return whether the entry is stale and must not be processed
* @return whether the entry should not be processed
*/
private boolean isStale()
{
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
private boolean shouldBeDropped()
{
switch (frame.getType())
{
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
return false;
// Frames of this type should not be dropped.
case PRIORITY:
case RST_STREAM:
case SETTINGS:
case PING:
case GO_AWAY:
case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT:
return true;
return false;
// Frames of this type follow the logic below.
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
case RST_STREAM:
break;
default:
throw new IllegalStateException();
}

// SPEC: section 6.4.
if (frame.getType() == FrameType.RST_STREAM)
return stream != null && stream.isLocal() && !stream.isCommitted();

// Frames that do not have a stream associated are dropped.
if (stream == null)
return true;

return stream.isResetOrFailed();
}

void commit()
{
if (stream != null)
stream.commit();
}

@Override
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -1221,6 +1222,8 @@ boolean hasHighPriority()
@Override
public void succeeded()
{
commit();

bytesWritten.addAndGet(frameBytes);
frameBytes = 0;

Expand Down Expand Up @@ -2072,6 +2075,8 @@ private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<S
return false;

stream.setListener(listener);
stream.process(new PrefaceFrame(), Callback.NOOP);

Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
Expand Down
Expand Up @@ -67,6 +67,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private boolean remoteReset;
private Listener listener;
private long dataLength;
private boolean committed;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
{
Expand Down Expand Up @@ -226,6 +227,18 @@ public boolean isLocallyClosed()
return closeState.get() == CloseState.LOCALLY_CLOSED;
}

@Override
public void commit()
{
committed = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the memory barrier on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag is set and read from the flusher thread, so the memory barrier is guaranteed by IteratingCallback.

}

@Override
public boolean isCommitted()
{
return committed;
}

@Override
public boolean isOpen()
{
Expand Down Expand Up @@ -278,6 +291,11 @@ public void process(Frame frame, Callback callback)
notIdle();
switch (frame.getType())
{
case PREFACE:
{
onNewStream(callback);
break;
}
case HEADERS:
{
onHeaders((HeadersFrame)frame, callback);
Expand Down Expand Up @@ -315,6 +333,12 @@ public void process(Frame frame, Callback callback)
}
}

private void onNewStream(Callback callback)
{
notifyNewStream(this);
callback.succeeded();
}

private void onHeaders(HeadersFrame frame, Callback callback)
{
MetaData metaData = frame.getMetaData();
Expand Down Expand Up @@ -586,6 +610,22 @@ private Callback endWrite()
}
}

private void notifyNewStream(Stream stream)
{
Listener listener = this.listener;
if (listener != null)
{
try
{
listener.onNewStream(stream);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener {}", listener, x);
}
}
}

private void notifyData(Stream stream, DataFrame frame, Callback callback)
{
Listener listener = this.listener;
Expand Down
Expand Up @@ -126,6 +126,19 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();

/**
* Marks this stream as committed.
*
* @see #isCommitted()
*/
void commit();

/**
* @return whether bytes for this stream have been sent to the remote peer.
* @see #commit()
*/
boolean isCommitted();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Expand Up @@ -138,6 +138,16 @@ public interface Stream
*/
interface Listener
{
/**
* <p>Callback method invoked when a stream is created locally by
* {@link Session#newStream(HeadersFrame, Promise, Listener)}.</p>
*
* @param stream the newly created stream
*/
public default void onNewStream(Stream stream)
{
}

/**
* <p>Callback method invoked when a HEADERS frame representing the HTTP response has been received.</p>
*
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -82,6 +83,8 @@ public Stream getStream()
public void setStream(Stream stream)
{
this.stream = stream;
if (stream != null)
((IStream)stream).setAttachment(this);
}

public boolean isFailed()
Expand Down
Expand Up @@ -74,6 +74,12 @@ protected void reset()
contentNotifier.reset();
}

@Override
public void onNewStream(Stream stream)
{
getHttpChannel().setStream(stream);
}

@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Expand Down
Expand Up @@ -177,7 +177,7 @@ private void sendTrailers(Stream stream, HttpFields trailers, Callback callback)
stream.headers(trailersFrame, callback);
}

private class HeadersPromise implements Promise<Stream>
private static class HeadersPromise implements Promise<Stream>
{
private final HttpRequest request;
private final Callback callback;
Expand All @@ -191,9 +191,6 @@ private HeadersPromise(HttpRequest request, Callback callback)
@Override
public void succeeded(Stream stream)
{
HttpChannelOverHTTP2 channel = getHttpChannel();
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);
Expand Down