diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 813672fb72c0..1e33255f6fa0 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -550,14 +550,14 @@ public boolean abort(HttpExchange exchange, Throwable failure) // respect to concurrency between request and response. Result result = exchange.terminateResponse(); terminateResponse(exchange, result); + return true; } else { if (LOG.isDebugEnabled()) LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); + return false; } - - return true; } private boolean updateResponseState(ResponseState from, ResponseState to) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index e2ff05cd6b84..0d4730857791 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -76,7 +76,7 @@ public class HttpRequest implements Request private String query; private String method = HttpMethod.GET.asString(); private HttpVersion version = HttpVersion.HTTP_1_1; - private long idleTimeout; + private long idleTimeout = -1; private long timeout; private long timeoutAt; private ContentProvider content; @@ -99,7 +99,6 @@ protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri) extractParams(query); followRedirects(client.isFollowRedirects()); - idleTimeout = client.getIdleTimeout(); HttpField acceptEncodingField = client.getAcceptEncodingField(); if (acceptEncodingField != null) headers.put(acceptEncodingField); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index a3093b994b21..5aa9787d6104 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -579,14 +579,14 @@ public boolean abort(HttpExchange exchange, Throwable failure) // respect to concurrency between request and response. Result result = exchange.terminateRequest(); terminateRequest(exchange, failure, result); + return true; } else { if (LOG.isDebugEnabled()) LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); + return false; } - - return true; } private boolean updateRequestState(RequestState from, RequestState to) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 2e344c3c840f..a6e69e8dad78 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -247,7 +247,9 @@ protected SendFailure send(HttpExchange exchange) // Save the old idle timeout to restore it. EndPoint endPoint = getEndPoint(); idleTimeout = endPoint.getIdleTimeout(); - endPoint.setIdleTimeout(request.getIdleTimeout()); + long requestIdleTimeout = request.getIdleTimeout(); + if (requestIdleTimeout >= 0) + endPoint.setIdleTimeout(requestIdleTimeout); // One channel per connection, just delegate the send. return send(channel, exchange); diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index 520375ab16f2..6ae8eb39fca7 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -166,7 +166,7 @@ public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout) { super(connection.getHttpDestination().getHttpClient().getScheduler()); this.connection = connection; - setIdleTimeout(idleTimeout); + setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout()); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 18a59f95b2d6..9c570c37d54e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -562,7 +562,7 @@ public void newStream(HeadersFrame frame, Promise promise, Stream.Listen IStream stream = createLocalStream(streamId); stream.setListener(listener); - ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream)); + ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream)); queued = flusher.append(entry); } // Iterate outside the synchronized block. @@ -606,7 +606,7 @@ public void push(IStream stream, Promise promise, PushPromiseFrame frame IStream pushStream = createLocalStream(streamId); pushStream.setListener(listener); - ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream)); + ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream)); queued = flusher.append(entry); } // Iterate outside the synchronized block. @@ -764,7 +764,8 @@ protected IStream createLocalStream(int streamId) int localCount = localStreamCount.get(); int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) - throw new IllegalStateException("Max local stream count " + maxCount + " exceeded"); + // TODO: remove the dump() in the exception message. + throw new IllegalStateException("Max local stream count " + maxCount + " exceeded" + System.lineSeparator() + dump()); if (localStreamCount.compareAndSet(localCount, localCount + 1)) break; } @@ -780,6 +781,7 @@ protected IStream createLocalStream(int streamId) } else { + localStreamCount.decrementAndGet(); throw new IllegalStateException("Duplicate stream " + streamId); } } @@ -816,6 +818,7 @@ protected IStream createRemoteStream(int streamId) } else { + remoteStreamCount.addAndGetHi(-1); onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream"); return null; } @@ -1461,21 +1464,21 @@ public void succeeded() } } - private static class PromiseCallback implements Callback + private static class StreamPromiseCallback implements Callback { - private final Promise promise; - private final C value; + private final Promise promise; + private final IStream stream; - private PromiseCallback(Promise promise, C value) + private StreamPromiseCallback(Promise promise, IStream stream) { this.promise = promise; - this.value = value; + this.stream = stream; } @Override public void succeeded() { - promise.succeeded(value); + promise.succeeded(stream); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 140023878801..715581e53ce7 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -138,6 +138,7 @@ private boolean startWrite(Callback callback) { if (writing.compareAndSet(null, callback)) return true; + close(); callback.failed(new WritePendingException()); return false; } @@ -275,8 +276,6 @@ public void process(Frame frame, Callback callback) private void onHeaders(HeadersFrame frame, Callback callback) { - if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); MetaData metaData = frame.getMetaData(); if (metaData.isRequest() || metaData.isResponse()) { @@ -286,6 +285,10 @@ private void onHeaders(HeadersFrame frame, Callback callback) length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString()); dataLength = length >= 0 ? length : Long.MIN_VALUE; } + + if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) + session.removeStream(this); + callback.succeeded(); } @@ -507,6 +510,13 @@ public void close() } } + @Override + public void onClose() + { + super.onClose(); + notifyClosed(this); + } + private void updateStreamCount(int deltaStream, int deltaClosing) { ((HTTP2Session)session).updateStreamCount(isLocal(), deltaStream, deltaClosing); @@ -612,6 +622,21 @@ private void notifyFailure(Stream stream, FailureFrame frame, Callback callback) } } + private void notifyClosed(Stream stream) + { + Listener listener = this.listener; + if (listener == null) + return; + try + { + listener.onClosed(stream); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + @Override public String dump() { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java index f76afeaf4457..6ddbf7b0350c 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Stream.java @@ -163,6 +163,13 @@ public interface Listener */ public void onData(Stream stream, DataFrame frame, Callback callback); + /** + *

Callback method invoked when a RST_STREAM frame has been received for this stream.

+ * + * @param stream the stream + * @param frame the RST_FRAME received + * @param callback the callback to complete when the reset has been handled + */ public default void onReset(Stream stream, ResetFrame frame, Callback callback) { try @@ -214,11 +221,28 @@ public default boolean onIdleTimeout(Stream stream, Throwable x) return true; } + /** + *

Callback method invoked when the stream failed.

+ * + * @param stream the stream + * @param error the error code + * @param reason the error reason, or null + * @param callback the callback to complete when the failure has been handled + */ public default void onFailure(Stream stream, int error, String reason, Callback callback) { callback.succeeded(); } + /** + *

Callback method invoked after the stream has been closed.

+ * + * @param stream the stream + */ + public default void onClosed(Stream stream) + { + } + /** *

Empty implementation of {@link Listener}

*/ diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index f9cfa6c2949c..27c6022b6425 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -101,6 +102,11 @@ public void release() connection.release(this); } + void onStreamClosed(IStream stream) + { + connection.onStreamClosed(stream, this); + } + @Override public void exchangeTerminated(HttpExchange exchange, Result result) { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 4d4a3cfebe8d..7863d3381caf 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -35,12 +35,17 @@ import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Sweeper; public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable { + private static final Logger LOG = Log.getLogger(HttpConnection.class); + private final Set activeChannels = ConcurrentHashMap.newKeySet(); private final Queue idleChannels = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -87,16 +92,15 @@ protected HttpChannelOverHTTP2 newHttpChannel() protected void release(HttpChannelOverHTTP2 channel) { - // Only non-push channels are released. + if (LOG.isDebugEnabled()) + LOG.debug("Released {}", channel); if (activeChannels.remove(channel)) { - channel.setStream(null); // Recycle only non-failed channels. if (channel.isFailed()) channel.destroy(); else idleChannels.offer(channel); - getHttpDestination().release(this); } else { @@ -104,6 +108,16 @@ protected void release(HttpChannelOverHTTP2 channel) } } + void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} closed for {}", stream, channel); + channel.setStream(null); + // Only non-push channels are released. + if (stream.isLocal()) + getHttpDestination().release(this); + } + @Override public boolean onIdleTimeout(long idleTimeout) { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index a0f1817a6bb2..5f1a6751239b 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -38,6 +38,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; @@ -171,8 +172,10 @@ public void onReset(Stream stream, ResetFrame frame) @Override public boolean onIdleTimeout(Stream stream, Throwable x) { - responseFailure(x); - return true; + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return false; + return !exchange.abort(x); } @Override @@ -182,6 +185,12 @@ public void onFailure(Stream stream, int error, String reason, Callback callback callback.succeeded(); } + @Override + public void onClosed(Stream stream) + { + getHttpChannel().onStreamClosed((IStream)stream); + } + private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback) { contentNotifier.offer(new DataInfo(exchange, frame, callback)); diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java index 4950c949d3d5..1635d27f912a 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpSenderOverHTTP2.java @@ -67,7 +67,9 @@ public void succeeded(Stream stream) { channel.setStream(stream); ((IStream)stream).setAttachment(channel); - stream.setIdleTimeout(request.getIdleTimeout()); + long idleTimeout = request.getIdleTimeout(); + if (idleTimeout >= 0) + stream.setIdleTimeout(idleTimeout); if (content.hasContent() && !expects100Continue(request)) { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index fe030fa838f1..9e981ee1e807 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -18,6 +18,22 @@ package org.eclipse.jetty.http2.client.http; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + import org.eclipse.jetty.client.AbstractConnectionPool; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpDestination; @@ -43,21 +59,6 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.Test; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -370,7 +371,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r } @Test - public void testTwoConcurrentStreamsFirstTimesOut() throws Exception + public void testTwoStreamsFirstTimesOut() throws Exception { long timeout = 1000; start(1, new EmptyServerHandler() diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java index eb34e6a7e43e..a6ff741129c0 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java @@ -18,15 +18,6 @@ package org.eclipse.jetty.http.client; -import static org.eclipse.jetty.http.client.Transport.FCGI; -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assumptions.assumeTrue; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -65,6 +56,15 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.eclipse.jetty.http.client.Transport.FCGI; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + public class HttpClientContinueTest extends AbstractTest { @Override @@ -344,13 +344,14 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques } }); - scenario.client.setIdleTimeout(idleTimeout); + scenario.client.setIdleTimeout(2 * idleTimeout); byte[] content = new byte[1024]; final CountDownLatch latch = new CountDownLatch(1); scenario.client.newRequest(scenario.newURI()) .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) .content(new BytesContentProvider(content)) + .idleTimeout(idleTimeout, TimeUnit.MILLISECONDS) .send(new BufferingResponseListener() { @Override diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java index 0d2d9fa00139..814d9bd011b5 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientLoadTest.java @@ -18,11 +18,8 @@ package org.eclipse.jetty.http.client; -import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -30,11 +27,11 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; -import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -68,6 +65,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class HttpClientLoadTest extends AbstractTest { private final Logger logger = Log.getLogger(HttpClientLoadTest.class); @@ -186,7 +186,7 @@ private void test(final CountDownLatch latch, final List failures) // Choose a random method HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST; - boolean ssl = scenario.isTransportSecure(); + boolean ssl = scenario.transport.isTlsBased(); // Choose randomly whether to close the connection on the client or on the server boolean clientClose = false; @@ -196,13 +196,17 @@ private void test(final CountDownLatch latch, final List failures) if (!ssl && random.nextInt(100) < 5) serverClose = true; + long clientTimeout = 0; +// if (!ssl && random.nextInt(100) < 5) +// clientTimeout = random.nextInt(500) + 500; + int maxContentLength = 64 * 1024; int contentLength = random.nextInt(maxContentLength) + 1; - test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures); + test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, clientTimeout, contentLength, true, latch, failures); } - private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List failures) + private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List failures) { long requestId = requestCount.incrementAndGet(); Request request = scenario.client.newRequest(host, scenario.getNetworkConnectorLocalPortInt().orElse(0)) @@ -215,6 +219,12 @@ private void test(String scheme, String host, String method, boolean clientClose else if (serverClose) request.header("X-Close", "true"); + if (clientTimeout > 0) + { + request.header("X-Timeout", String.valueOf(clientTimeout)); + request.idleTimeout(clientTimeout, TimeUnit.MILLISECONDS); + } + switch (method) { case "GET": @@ -254,12 +264,18 @@ public void onComplete(Result result) { if (result.isFailed()) { - result.getFailure().printStackTrace(); - failures.add("Result failed " + result); + Throwable failure = result.getFailure(); + if (!(clientTimeout > 0 && failure instanceof TimeoutException)) + { + failure.printStackTrace(); + failures.add("Result failed " + result); + } + } + else + { + if (checkContentLength && contentLength.get() != 0) + failures.add("Content length mismatch " + contentLength); } - - if (checkContentLength && contentLength.get() != 0) - failures.add("Content length mismatch " + contentLength); requestLatch.countDown(); latch.countDown(); @@ -288,8 +304,14 @@ private boolean await(CountDownLatch latch, long time, TimeUnit unit) private class LoadHandler extends AbstractHandler { @Override - public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { + baseRequest.setHandled(true); + + String timeout = request.getHeader("X-Timeout"); + if (timeout != null) + sleep(2 * Integer.parseInt(timeout)); + String method = request.getMethod().toUpperCase(Locale.ENGLISH); switch (method) { @@ -313,8 +335,18 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, if (Boolean.parseBoolean(request.getHeader("X-Close"))) response.setHeader("Connection", "close"); + } - baseRequest.setHandled(true); + private void sleep(long time) throws InterruptedIOException + { + try + { + Thread.sleep(time); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } } } @@ -329,8 +361,9 @@ public LoadTransportScenario(Transport transport, AtomicLong connectionLeaks) th } @Override - public Connector newServerConnector( Server server) throws Exception { - if (transport == UNIX_SOCKET) + public Connector newServerConnector( Server server) + { + if (transport == Transport.UNIX_SOCKET) { UnixSocketConnector unixSocketConnector = new UnixSocketConnector( server, provideServerConnectionFactory( transport ));