diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index d0e9df4cc50d..aedaa2e0bef5 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -26,6 +26,7 @@ import java.util.Set; import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.FrameType; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.hpack.HpackException; import org.eclipse.jetty.io.ByteBufferPool; @@ -192,11 +193,11 @@ protected Action process() throws Throwable // If the stream has been reset or removed, // don't send the frame and fail it here. - if (entry.isStale()) + if (entry.shouldBeDropped()) { if (LOG.isDebugEnabled()) - LOG.debug("Stale {}", entry); - entry.failed(new EofException("reset")); + LOG.debug("Dropped {}", entry); + entry.failed(new EofException("dropped")); pending.remove(); continue; } @@ -447,40 +448,47 @@ public void failed(Throwable x) } /** - * @return whether the entry is stale and must not be processed + * @return whether the entry should not be processed */ - private boolean isStale() - { - // If it is a protocol frame, process it. - if (isProtocolFrame(frame)) - return false; - // It's an application frame; is the stream gone already? - if (stream == null) - return true; - return stream.isResetOrFailed(); - } - - private boolean isProtocolFrame(Frame frame) + private boolean shouldBeDropped() { switch (frame.getType()) { - case DATA: - case HEADERS: - case PUSH_PROMISE: - case CONTINUATION: - return false; + // Frames of this type should not be dropped. case PRIORITY: - case RST_STREAM: case SETTINGS: case PING: case GO_AWAY: case WINDOW_UPDATE: case PREFACE: case DISCONNECT: - return true; + return false; + // Frames of this type follow the logic below. + case DATA: + case HEADERS: + case PUSH_PROMISE: + case CONTINUATION: + case RST_STREAM: + break; default: throw new IllegalStateException(); } + + // SPEC: section 6.4. + if (frame.getType() == FrameType.RST_STREAM) + return stream != null && stream.isLocal() && !stream.isCommitted(); + + // Frames that do not have a stream associated are dropped. + if (stream == null) + return true; + + return stream.isResetOrFailed(); + } + + void commit() + { + if (stream != null) + stream.commit(); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index e72e980e1c52..b7a7df6fcd1c 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -1270,6 +1270,8 @@ boolean hasHighPriority() @Override public void succeeded() { + commit(); + bytesWritten.addAndGet(frameBytes); frameBytes = 0; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 8805fcb3a638..8df7f805b5ce 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -75,6 +75,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private long dataDemand; private boolean dataInitial; private boolean dataProcess; + private boolean committed; public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, MetaData.Request request, boolean local) { @@ -253,6 +254,18 @@ public boolean isLocallyClosed() return closeState.get() == CloseState.LOCALLY_CLOSED; } + @Override + public void commit() + { + committed = true; + } + + @Override + public boolean isCommitted() + { + return committed; + } + @Override public boolean isOpen() { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 2ef24ef9c256..177afc6fa196 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -129,6 +129,19 @@ public interface IStream extends Stream, Attachable, Closeable */ boolean isResetOrFailed(); + /** + * Marks this stream as committed. + * + * @see #isCommitted() + */ + void commit(); + + /** + * @return whether bytes for this stream have been sent to the remote peer. + * @see #commit() + */ + boolean isCommitted(); + /** *

An ordered list of frames belonging to the same stream.

*/ diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java index de632ad95561..0efdbb6d841c 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java @@ -18,6 +18,7 @@ import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; @@ -61,11 +62,12 @@ protected void prepareServer(ConnectionFactory connectionFactory) protected void prepareClient() { - http2Client = new HTTP2Client(); - client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client)); QueuedThreadPool clientExecutor = new QueuedThreadPool(); clientExecutor.setName("client"); - this.client.setExecutor(clientExecutor); + ClientConnector connector = new ClientConnector(); + connector.setExecutor(clientExecutor); + http2Client = new HTTP2Client(connector); + client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client)); } @AfterEach diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index 4cb78cffd23c..4b6d68e0e1e8 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -23,7 +23,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -36,9 +38,14 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BytesRequestContent; +import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; @@ -46,15 +53,23 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.ClientConnectionFactory; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class MaxConcurrentStreamsTest extends AbstractTest @@ -411,6 +426,118 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS)); } + @Test + public void testTCPCongestedStreamTimesOut() throws Exception + { + CountDownLatch request1Latch = new CountDownLatch(1); + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + switch (request.getURI().getPath()) + { + case "/1": + { + // Do not return to cause TCP congestion. + assertTrue(awaitLatch(request1Latch, 15, TimeUnit.SECONDS)); + MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response1, null, true), Callback.NOOP); + break; + } + case "/3": + { + MetaData.Response response3 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response3, null, true), Callback.NOOP); + break; + } + default: + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.INTERNAL_SERVER_ERROR_500, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + } + // Return a Stream listener that consumes the content. + return new Stream.Listener.Adapter(); + } + }); + http2.setMaxConcurrentStreams(2); + // Set the HTTP/2 flow control windows very large so we can + // cause TCP congestion, not HTTP/2 flow control congestion. + http2.setInitialSessionRecvWindow(512 * 1024 * 1024); + http2.setInitialStreamRecvWindow(512 * 1024 * 1024); + prepareServer(http2); + server.start(); + + prepareClient(); + AtomicReference clientEndPointRef = new AtomicReference<>(); + CountDownLatch clientEndPointLatch = new CountDownLatch(1); + client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client) + { + @Override + public Connection newConnection(EndPoint endPoint, Map context) throws IOException + { + clientEndPointRef.set((AbstractEndPoint)endPoint); + clientEndPointLatch.countDown(); + return super.newConnection(endPoint, context); + } + }); + client.setMaxConnectionsPerDestination(1); + client.start(); + + // First request must cause TCP congestion. + CountDownLatch response1Latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()).path("/1") + .body(new BytesRequestContent(new byte[64 * 1024 * 1024])) + .send(result -> + { + assertTrue(result.isSucceeded(), String.valueOf(result.getFailure())); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + response1Latch.countDown(); + }); + + // Wait until TCP congested. + assertTrue(clientEndPointLatch.await(5, TimeUnit.SECONDS)); + AbstractEndPoint clientEndPoint = clientEndPointRef.get(); + long start = System.nanoTime(); + while (!clientEndPoint.getWriteFlusher().isPending()) + { + long elapsed = System.nanoTime() - start; + assertThat(TimeUnit.NANOSECONDS.toSeconds(elapsed), Matchers.lessThan(15L)); + Thread.sleep(100); + } + // Wait for the selector to update the SelectionKey to OP_WRITE. + Thread.sleep(1000); + + // Second request cannot be sent due to TCP congestion and times out. + assertThrows(TimeoutException.class, () -> client.newRequest("localhost", connector.getLocalPort()) + .path("/2") + .timeout(1000, TimeUnit.MILLISECONDS) + .send()); + + // Third request should succeed. + CountDownLatch response3Latch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .path("/3") + .send(result -> + { + assertTrue(result.isSucceeded(), String.valueOf(result.getFailure())); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + response3Latch.countDown(); + }); + + // Wait for the third request to generate the HTTP/2 stream. + Thread.sleep(1000); + + // Resolve the TCP congestion. + request1Latch.countDown(); + + assertTrue(response1Latch.await(15, TimeUnit.SECONDS)); + assertTrue(response3Latch.await(5, TimeUnit.SECONDS)); + } + private void primeConnection() throws Exception { // Prime the connection so that the maxConcurrentStream setting arrives to the client. @@ -431,6 +558,18 @@ private void sleep(long time) } } + private boolean awaitLatch(CountDownLatch latch, long time, TimeUnit unit) + { + try + { + return latch.await(time, unit); + } + catch (InterruptedException x) + { + throw new RuntimeException(x); + } + } + private static class Wrapper implements Session.Listener { private final Session.Listener listener;