Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6208 - HTTP/2 max local stream count exceeded (#6220) #6225

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
Expand Down Expand Up @@ -192,11 +193,11 @@ protected Action process() throws Throwable

// If the stream has been reset or removed,
// don't send the frame and fail it here.
if (entry.isStale())
if (entry.shouldBeDropped())
{
if (LOG.isDebugEnabled())
LOG.debug("Stale {}", entry);
entry.failed(new EofException("reset"));
LOG.debug("Dropped {}", entry);
entry.failed(new EofException("dropped"));
pending.remove();
continue;
}
Expand Down Expand Up @@ -447,40 +448,47 @@ public void failed(Throwable x)
}

/**
* @return whether the entry is stale and must not be processed
* @return whether the entry should not be processed
*/
private boolean isStale()
{
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
private boolean shouldBeDropped()
{
switch (frame.getType())
{
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
return false;
// Frames of this type should not be dropped.
case PRIORITY:
case RST_STREAM:
case SETTINGS:
case PING:
case GO_AWAY:
case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT:
return true;
return false;
// Frames of this type follow the logic below.
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
case RST_STREAM:
break;
default:
throw new IllegalStateException();
}

// SPEC: section 6.4.
if (frame.getType() == FrameType.RST_STREAM)
return stream != null && stream.isLocal() && !stream.isCommitted();

// Frames that do not have a stream associated are dropped.
if (stream == null)
return true;

return stream.isResetOrFailed();
}

void commit()
{
if (stream != null)
stream.commit();
}

@Override
Expand Down
Expand Up @@ -1270,6 +1270,8 @@ boolean hasHighPriority()
@Override
public void succeeded()
{
commit();

bytesWritten.addAndGet(frameBytes);
frameBytes = 0;

Expand Down
Expand Up @@ -75,6 +75,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private long dataDemand;
private boolean dataInitial;
private boolean dataProcess;
private boolean committed;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
{
Expand Down Expand Up @@ -253,6 +254,18 @@ public boolean isLocallyClosed()
return closeState.get() == CloseState.LOCALLY_CLOSED;
}

@Override
public void commit()
{
committed = true;
}

@Override
public boolean isCommitted()
{
return committed;
}

@Override
public boolean isOpen()
{
Expand Down
Expand Up @@ -129,6 +129,19 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();

/**
* Marks this stream as committed.
*
* @see #isCommitted()
*/
void commit();

/**
* @return whether bytes for this stream have been sent to the remote peer.
* @see #commit()
*/
boolean isCommitted();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
Expand Down Expand Up @@ -61,11 +62,12 @@ protected void prepareServer(ConnectionFactory connectionFactory)

protected void prepareClient()
{
http2Client = new HTTP2Client();
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client));
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
this.client.setExecutor(clientExecutor);
ClientConnector connector = new ClientConnector();
connector.setExecutor(clientExecutor);
http2Client = new HTTP2Client(connector);
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client));
}

@AfterEach
Expand Down
Expand Up @@ -23,7 +23,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand All @@ -36,25 +38,38 @@
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MaxConcurrentStreamsTest extends AbstractTest
Expand Down Expand Up @@ -411,6 +426,118 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
}

@Test
public void testTCPCongestedStreamTimesOut() throws Exception
{
CountDownLatch request1Latch = new CountDownLatch(1);
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/1":
{
// Do not return to cause TCP congestion.
assertTrue(awaitLatch(request1Latch, 15, TimeUnit.SECONDS));
MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response1, null, true), Callback.NOOP);
break;
}
case "/3":
{
MetaData.Response response3 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response3, null, true), Callback.NOOP);
break;
}
default:
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.INTERNAL_SERVER_ERROR_500, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
// Return a Stream listener that consumes the content.
return new Stream.Listener.Adapter();
}
});
http2.setMaxConcurrentStreams(2);
// Set the HTTP/2 flow control windows very large so we can
// cause TCP congestion, not HTTP/2 flow control congestion.
http2.setInitialSessionRecvWindow(512 * 1024 * 1024);
http2.setInitialStreamRecvWindow(512 * 1024 * 1024);
prepareServer(http2);
server.start();

prepareClient();
AtomicReference<AbstractEndPoint> clientEndPointRef = new AtomicReference<>();
CountDownLatch clientEndPointLatch = new CountDownLatch(1);
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client)
{
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
clientEndPointRef.set((AbstractEndPoint)endPoint);
clientEndPointLatch.countDown();
return super.newConnection(endPoint, context);
}
});
client.setMaxConnectionsPerDestination(1);
client.start();

// First request must cause TCP congestion.
CountDownLatch response1Latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort()).path("/1")
.body(new BytesRequestContent(new byte[64 * 1024 * 1024]))
.send(result ->
{
assertTrue(result.isSucceeded(), String.valueOf(result.getFailure()));
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
response1Latch.countDown();
});

// Wait until TCP congested.
assertTrue(clientEndPointLatch.await(5, TimeUnit.SECONDS));
AbstractEndPoint clientEndPoint = clientEndPointRef.get();
long start = System.nanoTime();
while (!clientEndPoint.getWriteFlusher().isPending())
{
long elapsed = System.nanoTime() - start;
assertThat(TimeUnit.NANOSECONDS.toSeconds(elapsed), Matchers.lessThan(15L));
Thread.sleep(100);
}
// Wait for the selector to update the SelectionKey to OP_WRITE.
Thread.sleep(1000);

// Second request cannot be sent due to TCP congestion and times out.
assertThrows(TimeoutException.class, () -> client.newRequest("localhost", connector.getLocalPort())
.path("/2")
.timeout(1000, TimeUnit.MILLISECONDS)
.send());

// Third request should succeed.
CountDownLatch response3Latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.path("/3")
.send(result ->
{
assertTrue(result.isSucceeded(), String.valueOf(result.getFailure()));
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
response3Latch.countDown();
});

// Wait for the third request to generate the HTTP/2 stream.
Thread.sleep(1000);

// Resolve the TCP congestion.
request1Latch.countDown();

assertTrue(response1Latch.await(15, TimeUnit.SECONDS));
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand All @@ -431,6 +558,18 @@ private void sleep(long time)
}
}

private boolean awaitLatch(CountDownLatch latch, long time, TimeUnit unit)
{
try
{
return latch.await(time, unit);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}

private static class Wrapper implements Session.Listener
{
private final Session.Listener listener;
Expand Down