diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 813672fb72c0..1e33255f6fa0 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -550,14 +550,14 @@ public boolean abort(HttpExchange exchange, Throwable failure) // respect to concurrency between request and response. Result result = exchange.terminateResponse(); terminateResponse(exchange, result); + return true; } else { if (LOG.isDebugEnabled()) LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); + return false; } - - return true; } private boolean updateResponseState(ResponseState from, ResponseState to) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index e2ff05cd6b84..0d4730857791 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -76,7 +76,7 @@ public class HttpRequest implements Request private String query; private String method = HttpMethod.GET.asString(); private HttpVersion version = HttpVersion.HTTP_1_1; - private long idleTimeout; + private long idleTimeout = -1; private long timeout; private long timeoutAt; private ContentProvider content; @@ -99,7 +99,6 @@ protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri) extractParams(query); followRedirects(client.isFollowRedirects()); - idleTimeout = client.getIdleTimeout(); HttpField acceptEncodingField = client.getAcceptEncodingField(); if (acceptEncodingField != null) headers.put(acceptEncodingField); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index a3093b994b21..5aa9787d6104 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -579,14 +579,14 @@ public boolean abort(HttpExchange exchange, Throwable failure) // respect to concurrency between request and response. Result result = exchange.terminateRequest(); terminateRequest(exchange, failure, result); + return true; } else { if (LOG.isDebugEnabled()) LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); + return false; } - - return true; } private boolean updateRequestState(RequestState from, RequestState to) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 2e344c3c840f..a6e69e8dad78 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -247,7 +247,9 @@ protected SendFailure send(HttpExchange exchange) // Save the old idle timeout to restore it. EndPoint endPoint = getEndPoint(); idleTimeout = endPoint.getIdleTimeout(); - endPoint.setIdleTimeout(request.getIdleTimeout()); + long requestIdleTimeout = request.getIdleTimeout(); + if (requestIdleTimeout >= 0) + endPoint.setIdleTimeout(requestIdleTimeout); // One channel per connection, just delegate the send. return send(channel, exchange); diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index 520375ab16f2..6ae8eb39fca7 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -166,7 +166,7 @@ public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout) { super(connection.getHttpDestination().getHttpClient().getScheduler()); this.connection = connection; - setIdleTimeout(idleTimeout); + setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout()); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 04279864c5d2..83a82d17eb79 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -341,7 +341,7 @@ public void onSettings(SettingsFrame frame, boolean reply) case SettingsFrame.MAX_CONCURRENT_STREAMS: { if (LOG.isDebugEnabled()) - LOG.debug("Updating max local concurrent streams to {} for {}", maxLocalStreams, this); + LOG.debug("Updating max local concurrent streams to {} for {}", value, this); maxLocalStreams = value; break; } @@ -561,7 +561,7 @@ public void newStream(HeadersFrame frame, Promise promise, Stream.Listen IStream stream = createLocalStream(streamId); stream.setListener(listener); - ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream)); + ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)); queued = flusher.append(entry); } // Iterate outside the synchronized block. @@ -605,7 +605,7 @@ public void push(IStream stream, Promise promise, PushPromiseFrame frame IStream pushStream = createLocalStream(streamId); pushStream.setListener(listener); - ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream)); + ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream)); queued = flusher.append(entry); } // Iterate outside the synchronized block. @@ -779,6 +779,7 @@ protected IStream createLocalStream(int streamId) } else { + localStreamCount.decrementAndGet(); throw new IllegalStateException("Duplicate stream " + streamId); } } @@ -815,6 +816,7 @@ protected IStream createRemoteStream(int streamId) } else { + remoteStreamCount.addAndGetHi(-1); onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream"); return null; } @@ -1461,21 +1463,21 @@ public void succeeded() } } - private static class PromiseCallback implements Callback + private static class StreamPromiseCallback implements Callback { - private final Promise promise; - private final C value; + private final Promise promise; + private final IStream stream; - private PromiseCallback(Promise promise, C value) + private StreamPromiseCallback(Promise promise, IStream stream) { this.promise = promise; - this.value = value; + this.stream = stream; } @Override public void succeeded() { - promise.succeeded(value); + promise.succeeded(stream); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index b9609ad49032..265f57aa8a4f 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -139,6 +139,7 @@ private boolean startWrite(Callback callback) { if (writing.compareAndSet(null, callback)) return true; + close(); callback.failed(new WritePendingException()); return false; } @@ -275,8 +276,6 @@ public void process(Frame frame, Callback callback) private void onHeaders(HeadersFrame frame, Callback callback) { - if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); MetaData metaData = frame.getMetaData(); if (metaData.isRequest() || metaData.isResponse()) { @@ -286,6 +285,10 @@ private void onHeaders(HeadersFrame frame, Callback callback) length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString()); dataLength = length >= 0 ? length : Long.MIN_VALUE; } + + if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) + session.removeStream(this); + callback.succeeded(); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index f76afeaf4457..9e879cab8915 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -163,6 +163,13 @@ public interface Listener */ public void onData(Stream stream, DataFrame frame, Callback callback); + /** + *

Callback method invoked when a RST_STREAM frame has been received for this stream.

+ * + * @param stream the stream + * @param frame the RST_FRAME received + * @param callback the callback to complete when the reset has been handled + */ public default void onReset(Stream stream, ResetFrame frame, Callback callback) { try @@ -214,6 +221,14 @@ public default boolean onIdleTimeout(Stream stream, Throwable x) return true; } + /** + *

Callback method invoked when the stream failed.

+ * + * @param stream the stream + * @param error the error code + * @param reason the error reason, or null + * @param callback the callback to complete when the failure has been handled + */ public default void onFailure(Stream stream, int error, String reason, Callback callback) { callback.succeeded(); diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index a0f1817a6bb2..94fa9776fa2d 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -171,8 +171,10 @@ public void onReset(Stream stream, ResetFrame frame) @Override public boolean onIdleTimeout(Stream stream, Throwable x) { - responseFailure(x); - return true; + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return false; + return !exchange.abort(x); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index 4950c949d3d5..1635d27f912a 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -67,7 +67,9 @@ public void succeeded(Stream stream) { channel.setStream(stream); ((IStream)stream).setAttachment(channel); - stream.setIdleTimeout(request.getIdleTimeout()); + long idleTimeout = request.getIdleTimeout(); + if (idleTimeout >= 0) + stream.setIdleTimeout(idleTimeout); if (content.hasContent() && !expects100Continue(request)) { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index fe030fa838f1..9e981ee1e807 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -18,6 +18,22 @@ package org.eclipse.jetty.http2.client.http; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + import org.eclipse.jetty.client.AbstractConnectionPool; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpDestination; @@ -43,21 +59,6 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.Test; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -370,7 +371,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r } @Test - public void testTwoConcurrentStreamsFirstTimesOut() throws Exception + public void testTwoStreamsFirstTimesOut() throws Exception { long timeout = 1000; start(1, new EmptyServerHandler() diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java index eb34e6a7e43e..a6ff741129c0 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java @@ -18,15 +18,6 @@ package org.eclipse.jetty.http.client; -import static org.eclipse.jetty.http.client.Transport.FCGI; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -65,6 +56,15 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.eclipse.jetty.http.client.Transport.FCGI; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + public class HttpClientContinueTest extends AbstractTest { @Override @@ -344,13 +344,14 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques } }); - scenario.client.setIdleTimeout(idleTimeout); + scenario.client.setIdleTimeout(2 * idleTimeout); byte[] content = new byte[1024]; final CountDownLatch latch = new CountDownLatch(1); scenario.client.newRequest(scenario.newURI()) .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) .content(new BytesContentProvider(content)) + .idleTimeout(idleTimeout, TimeUnit.MILLISECONDS) .send(new BufferingResponseListener() { @Override