From 0b315d7519ec28e282f138a237a3178eb751ff37 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 28 Apr 2021 13:47:00 +0200 Subject: [PATCH] Fixes #6208 - HTTP/2 max local stream count exceeded Backported from Jetty 10 the "new stream" event so that the Stream can be set early on the client's `HttpChannelOverHTTP2`. In this way, when a HEADERS frame stalled due to TCP congestion is failed, the corresponding Stream is closed and the connection released to the pool, fixing the "max stream exceeded" issue. Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/http2/HTTP2Flusher.java | 54 ++++--- .../org/eclipse/jetty/http2/HTTP2Session.java | 5 + .../org/eclipse/jetty/http2/HTTP2Stream.java | 40 +++++ .../java/org/eclipse/jetty/http2/IStream.java | 13 ++ .../org/eclipse/jetty/http2/api/Stream.java | 10 ++ .../client/http/HttpChannelOverHTTP2.java | 3 + .../client/http/HttpReceiverOverHTTP2.java | 6 + .../client/http/HttpSenderOverHTTP2.java | 5 +- .../client/http/MaxConcurrentStreamsTest.java | 137 ++++++++++++++++++ 9 files changed, 246 insertions(+), 27 deletions(-) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index c7208c2b8f9d..87d29e0d3913 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -31,6 +31,7 @@ import java.util.Set; import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.hpack.HpackException; import org.eclipse.jetty.io.ByteBufferPool; @@ -195,11 +196,11 @@ protected Action process() throws Throwable // If the stream has been reset or removed, // don't send the frame and fail it here. - if (entry.isStale()) + if (entry.shouldBeDropped()) { if (LOG.isDebugEnabled()) - LOG.debug("Stale {}", entry); - entry.failed(new EofException("reset")); + LOG.debug("Dropped {}", entry); + entry.failed(new EofException("dropped")); pending.remove(); continue; } @@ -450,40 +451,47 @@ public void failed(Throwable x) } /** - * @return whether the entry is stale and must not be processed + * @return whether the entry should not be processed */ - private boolean isStale() - { - // If it is a protocol frame, process it. - if (isProtocolFrame(frame)) - return false; - // It's an application frame; is the stream gone already? - if (stream == null) - return true; - return stream.isResetOrFailed(); - } - - private boolean isProtocolFrame(Frame frame) + private boolean shouldBeDropped() { switch (frame.getType()) { - case DATA: - case HEADERS: - case PUSH_PROMISE: - case CONTINUATION: - return false; + // Frames of this type should not be dropped. case PRIORITY: - case RST_STREAM: case SETTINGS: case PING: case GO_AWAY: case WINDOW_UPDATE: case PREFACE: case DISCONNECT: - return true; + return false; + // Frames of this type follow the logic below. + case DATA: + case HEADERS: + case PUSH_PROMISE: + case CONTINUATION: + case RST_STREAM: + break; default: throw new IllegalStateException(); } + + // SPEC: section 6.4. + if (frame.getType() == FrameType.RST_STREAM) + return stream != null && stream.isLocal() && !stream.isCommitted(); + + // Frames that do not have a stream associated are dropped. + if (stream == null) + return true; + + return stream.isResetOrFailed(); + } + + void commit() + { + if (stream != null) + stream.commit(); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 55f195331dad..793982a3c38d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -47,6 +47,7 @@ import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PingFrame; +import org.eclipse.jetty.http2.frames.PrefaceFrame; import org.eclipse.jetty.http2.frames.PriorityFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -1221,6 +1222,8 @@ boolean hasHighPriority() @Override public void succeeded() { + commit(); + bytesWritten.addAndGet(frameBytes); frameBytes = 0; @@ -2072,6 +2075,8 @@ private boolean createLocalStream(Slot slot, List frames, Promise promise.succeeded(stream), x -> { HTTP2Session.this.onStreamDestroyed(streamId); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 677181296ea9..efa05f464d83 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -67,6 +67,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private boolean remoteReset; private Listener listener; private long dataLength; + private boolean committed; public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local) { @@ -226,6 +227,18 @@ public boolean isLocallyClosed() return closeState.get() == CloseState.LOCALLY_CLOSED; } + @Override + public void commit() + { + committed = true; + } + + @Override + public boolean isCommitted() + { + return committed; + } + @Override public boolean isOpen() { @@ -278,6 +291,11 @@ public void process(Frame frame, Callback callback) notIdle(); switch (frame.getType()) { + case PREFACE: + { + onNewStream(callback); + break; + } case HEADERS: { onHeaders((HeadersFrame)frame, callback); @@ -315,6 +333,12 @@ public void process(Frame frame, Callback callback) } } + private void onNewStream(Callback callback) + { + notifyNewStream(this); + callback.succeeded(); + } + private void onHeaders(HeadersFrame frame, Callback callback) { MetaData metaData = frame.getMetaData(); @@ -586,6 +610,22 @@ private Callback endWrite() } } + private void notifyNewStream(Stream stream) + { + Listener listener = this.listener; + if (listener != null) + { + try + { + listener.onNewStream(stream); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener {}", listener, x); + } + } + } + private void notifyData(Stream stream, DataFrame frame, Callback callback) { Listener listener = this.listener; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 6aefc65a2504..025365a21b01 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -126,6 +126,19 @@ public interface IStream extends Stream, Attachable, Closeable */ boolean isResetOrFailed(); + /** + * Marks this stream as committed. + * + * @see #isCommitted() + */ + void commit(); + + /** + * @return whether bytes for this stream have been sent to the remote peer. + * @see #commit() + */ + boolean isCommitted(); + /** *

An ordered list of frames belonging to the same stream.

*/ diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index f88fd22f2dd4..66c14b2f0e5f 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -138,6 +138,16 @@ public interface Stream */ interface Listener { + /** + *

Callback method invoked when a stream is created locally by + * {@link Session#newStream(HeadersFrame, Promise, Listener)}.

+ * + * @param stream the newly created stream + */ + public default void onNewStream(Stream stream) + { + } + /** *

Callback method invoked when a HEADERS frame representing the HTTP response has been received.

* diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index ec70babac97c..c1797b62ecee 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -82,6 +83,8 @@ public Stream getStream() public void setStream(Stream stream) { this.stream = stream; + if (stream != null) + ((IStream)stream).setAttachment(this); } public boolean isFailed() diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index f6398d3d4048..2e6762d4dcfe 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -74,6 +74,12 @@ protected void reset() contentNotifier.reset(); } + @Override + public void onNewStream(Stream stream) + { + getHttpChannel().setStream(stream); + } + @Override public void onHeaders(Stream stream, HeadersFrame frame) { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index 5af4e03d0276..b3b76b8ce2ab 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -177,7 +177,7 @@ private void sendTrailers(Stream stream, HttpFields trailers, Callback callback) stream.headers(trailersFrame, callback); } - private class HeadersPromise implements Promise + private static class HeadersPromise implements Promise { private final HttpRequest request; private final Callback callback; @@ -191,9 +191,6 @@ private HeadersPromise(HttpRequest request, Callback callback) @Override public void succeeded(Stream stream) { - HttpChannelOverHTTP2 channel = getHttpChannel(); - channel.setStream(stream); - ((IStream)stream).setAttachment(channel); long idleTimeout = request.getIdleTimeout(); if (idleTimeout >= 0) stream.setIdleTimeout(idleTimeout); diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index 8d044ace4602..f00a8b1f6d22 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -28,7 +28,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -41,9 +43,14 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; @@ -51,15 +58,21 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class MaxConcurrentStreamsTest extends AbstractTest @@ -418,6 +431,118 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS)); } + @Test + public void testTCPCongestedStreamTimesOut() throws Exception + { + CountDownLatch request1Latch = new CountDownLatch(1); + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + switch (request.getURI().getPath()) + { + case "/1": + { + // Do not return to cause TCP congestion. + assertTrue(awaitLatch(request1Latch, 15, TimeUnit.SECONDS)); + MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response1, null, true), Callback.NOOP); + break; + } + case "/3": + { + MetaData.Response response3 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response3, null, true), Callback.NOOP); + break; + } + default: + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.INTERNAL_SERVER_ERROR_500, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + } + return null; + } + }); + http2.setMaxConcurrentStreams(2); + // Set the HTTP/2 flow control windows very large so we can + // cause TCP congestion, not HTTP/2 flow control congestion. + http2.setInitialSessionRecvWindow(512 * 1024 * 1024); + http2.setInitialStreamRecvWindow(512 * 1024 * 1024); + prepareServer(http2); + server.start(); + + prepareClient(); + AtomicReference clientEndPointRef = new AtomicReference<>(); + CountDownLatch clientEndPointLatch = new CountDownLatch(1); + client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client) + { + @Override + public Connection newConnection(EndPoint endPoint, Map context) throws IOException + { + clientEndPointRef.set((AbstractEndPoint)endPoint); + clientEndPointLatch.countDown(); + return super.newConnection(endPoint, context); + } + }); + client.setMaxConnectionsPerDestination(1); + client.start(); + + // First request must cause TCP congestion. + CountDownLatch response1Latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()).path("/1") + .content(new BytesContentProvider(new byte[64 * 1024 * 1024])) + .send(result -> + { + assertTrue(result.isSucceeded(), String.valueOf(result.getFailure())); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + response1Latch.countDown(); + }); + + // Wait until TCP congested. + assertTrue(clientEndPointLatch.await(5, TimeUnit.SECONDS)); + AbstractEndPoint clientEndPoint = clientEndPointRef.get(); + long start = System.nanoTime(); + while (!clientEndPoint.getWriteFlusher().isPending()) + { + long elapsed = System.nanoTime() - start; + if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 15) + throw new TimeoutException(); + Thread.sleep(100); + } + // Wait for the selector to update the SelectionKey to OP_WRITE. + Thread.sleep(1000); + + // Second request cannot be sent due to TCP congestion and times out. + assertThrows(TimeoutException.class, () -> client.newRequest("localhost", connector.getLocalPort()) + .path("/2") + .timeout(1000, TimeUnit.MILLISECONDS) + .send()); + + // Third request should succeed. + CountDownLatch response3Latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .path("/3") + .send(result -> + { + assertTrue(result.isSucceeded(), String.valueOf(result.getFailure())); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + response3Latch.countDown(); + }); + + // Wait for the third request to generate the HTTP/2 stream. + Thread.sleep(1000); + + // Resolve the TCP congestion. + request1Latch.countDown(); + + assertTrue(response1Latch.await(5, TimeUnit.SECONDS)); + assertTrue(response3Latch.await(5, TimeUnit.SECONDS)); + } + private void primeConnection() throws Exception { // Prime the connection so that the maxConcurrentStream setting arrives to the client. @@ -438,6 +563,18 @@ private void sleep(long time) } } + private boolean awaitLatch(CountDownLatch latch, long time, TimeUnit unit) + { + try + { + return latch.await(time, unit); + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + private static class Wrapper implements Session.Listener { private final Session.Listener listener;