Skip to content

Commit

Permalink
Fixes #6208 - HTTP/2 max local stream count exceeded (#6220)
Browse files Browse the repository at this point in the history
Forward port of #6220 from jetty-9.4.x to jetty-10.0.x.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
(cherry-picked from commit 2f19c67)
  • Loading branch information
sbordet committed Apr 29, 2021
1 parent f05fc25 commit 2cb2462
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 26 deletions.
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.hpack.HpackException;
import org.eclipse.jetty.io.ByteBufferPool;
Expand Down Expand Up @@ -192,11 +193,11 @@ protected Action process() throws Throwable

// If the stream has been reset or removed,
// don't send the frame and fail it here.
if (entry.isStale())
if (entry.shouldBeDropped())
{
if (LOG.isDebugEnabled())
LOG.debug("Stale {}", entry);
entry.failed(new EofException("reset"));
LOG.debug("Dropped {}", entry);
entry.failed(new EofException("dropped"));
pending.remove();
continue;
}
Expand Down Expand Up @@ -447,40 +448,47 @@ public void failed(Throwable x)
}

/**
* @return whether the entry is stale and must not be processed
* @return whether the entry should not be processed
*/
private boolean isStale()
{
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
private boolean shouldBeDropped()
{
switch (frame.getType())
{
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
return false;
// Frames of this type should not be dropped.
case PRIORITY:
case RST_STREAM:
case SETTINGS:
case PING:
case GO_AWAY:
case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT:
return true;
return false;
// Frames of this type follow the logic below.
case DATA:
case HEADERS:
case PUSH_PROMISE:
case CONTINUATION:
case RST_STREAM:
break;
default:
throw new IllegalStateException();
}

// SPEC: section 6.4.
if (frame.getType() == FrameType.RST_STREAM)
return stream != null && stream.isLocal() && !stream.isCommitted();

// Frames that do not have a stream associated are dropped.
if (stream == null)
return true;

return stream.isResetOrFailed();
}

void commit()
{
if (stream != null)
stream.commit();
}

@Override
Expand Down
Expand Up @@ -1270,6 +1270,8 @@ boolean hasHighPriority()
@Override
public void succeeded()
{
commit();

bytesWritten.addAndGet(frameBytes);
frameBytes = 0;

Expand Down
Expand Up @@ -75,6 +75,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private long dataDemand;
private boolean dataInitial;
private boolean dataProcess;
private boolean committed;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local)
{
Expand Down Expand Up @@ -253,6 +254,18 @@ public boolean isLocallyClosed()
return closeState.get() == CloseState.LOCALLY_CLOSED;
}

@Override
public void commit()
{
committed = true;
}

@Override
public boolean isCommitted()
{
return committed;
}

@Override
public boolean isOpen()
{
Expand Down
Expand Up @@ -129,6 +129,19 @@ public interface IStream extends Stream, Attachable, Closeable
*/
boolean isResetOrFailed();

/**
* Marks this stream as committed.
*
* @see #isCommitted()
*/
void commit();

/**
* @return whether bytes for this stream have been sent to the remote peer.
* @see #commit()
*/
boolean isCommitted();

/**
* <p>An ordered list of frames belonging to the same stream.</p>
*/
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
Expand Down Expand Up @@ -61,11 +62,12 @@ protected void prepareServer(ConnectionFactory connectionFactory)

protected void prepareClient()
{
http2Client = new HTTP2Client();
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client));
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
this.client.setExecutor(clientExecutor);
ClientConnector connector = new ClientConnector();
connector.setExecutor(clientExecutor);
http2Client = new HTTP2Client(connector);
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client));
}

@AfterEach
Expand Down
Expand Up @@ -23,7 +23,9 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand All @@ -36,25 +38,38 @@
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class MaxConcurrentStreamsTest extends AbstractTest
Expand Down Expand Up @@ -411,6 +426,118 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
}

@Test
public void testTCPCongestedStreamTimesOut() throws Exception
{
CountDownLatch request1Latch = new CountDownLatch(1);
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/1":
{
// Do not return to cause TCP congestion.
assertTrue(awaitLatch(request1Latch, 15, TimeUnit.SECONDS));
MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response1, null, true), Callback.NOOP);
break;
}
case "/3":
{
MetaData.Response response3 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response3, null, true), Callback.NOOP);
break;
}
default:
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.INTERNAL_SERVER_ERROR_500, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
// Return a Stream listener that consumes the content.
return new Stream.Listener.Adapter();
}
});
http2.setMaxConcurrentStreams(2);
// Set the HTTP/2 flow control windows very large so we can
// cause TCP congestion, not HTTP/2 flow control congestion.
http2.setInitialSessionRecvWindow(512 * 1024 * 1024);
http2.setInitialStreamRecvWindow(512 * 1024 * 1024);
prepareServer(http2);
server.start();

prepareClient();
AtomicReference<AbstractEndPoint> clientEndPointRef = new AtomicReference<>();
CountDownLatch clientEndPointLatch = new CountDownLatch(1);
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client)
{
@Override
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
{
clientEndPointRef.set((AbstractEndPoint)endPoint);
clientEndPointLatch.countDown();
return super.newConnection(endPoint, context);
}
});
client.setMaxConnectionsPerDestination(1);
client.start();

// First request must cause TCP congestion.
CountDownLatch response1Latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort()).path("/1")
.body(new BytesRequestContent(new byte[64 * 1024 * 1024]))
.send(result ->
{
assertTrue(result.isSucceeded(), String.valueOf(result.getFailure()));
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
response1Latch.countDown();
});

// Wait until TCP congested.
assertTrue(clientEndPointLatch.await(5, TimeUnit.SECONDS));
AbstractEndPoint clientEndPoint = clientEndPointRef.get();
long start = System.nanoTime();
while (!clientEndPoint.getWriteFlusher().isPending())
{
long elapsed = System.nanoTime() - start;
assertThat(TimeUnit.NANOSECONDS.toSeconds(elapsed), Matchers.lessThan(15L));
Thread.sleep(100);
}
// Wait for the selector to update the SelectionKey to OP_WRITE.
Thread.sleep(1000);

// Second request cannot be sent due to TCP congestion and times out.
assertThrows(TimeoutException.class, () -> client.newRequest("localhost", connector.getLocalPort())
.path("/2")
.timeout(1000, TimeUnit.MILLISECONDS)
.send());

// Third request should succeed.
CountDownLatch response3Latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.path("/3")
.send(result ->
{
assertTrue(result.isSucceeded(), String.valueOf(result.getFailure()));
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
response3Latch.countDown();
});

// Wait for the third request to generate the HTTP/2 stream.
Thread.sleep(1000);

// Resolve the TCP congestion.
request1Latch.countDown();

assertTrue(response1Latch.await(15, TimeUnit.SECONDS));
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand All @@ -431,6 +558,18 @@ private void sleep(long time)
}
}

private boolean awaitLatch(CountDownLatch latch, long time, TimeUnit unit)
{
try
{
return latch.await(time, unit);
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}

private static class Wrapper implements Session.Listener
{
private final Session.Listener listener;
Expand Down

0 comments on commit 2cb2462

Please sign in to comment.