From 0bd15e083163699a5a33fc21d0af23e88d098028 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 7 Sep 2021 09:57:31 +0200 Subject: [PATCH] Fixes #6693 - FastCGI review (#6694) * Fixes #6693 - FastCGI review - Removed code supporting multiplex in the client. - Removed code supporting multiplex in the server. - Reworked the server-side processing of a request, now more similar to HTTP/1.1. - Improved javadocs. Signed-off-by: Simone Bordet --- .../client/http/HttpConnectionOverFCGI.java | 106 ++++------ .../parser/BeginRequestContentParser.java | 2 +- .../jetty/fcgi/parser/ContentParser.java | 49 ++++- .../fcgi/parser/EndRequestContentParser.java | 2 +- .../jetty/fcgi/parser/HeaderParser.java | 4 +- .../fcgi/parser/ParamsContentParser.java | 11 +- .../org/eclipse/jetty/fcgi/parser/Parser.java | 24 ++- .../fcgi/parser/ResponseContentParser.java | 10 +- .../fcgi/parser/StreamContentParser.java | 15 +- .../fcgi/server/HttpChannelOverFCGI.java | 195 +++++++++++------- .../fcgi/server/HttpTransportOverFCGI.java | 3 + .../fcgi/server/ServerFCGIConnection.java | 152 +++++++++++--- .../server/AbstractHttpClientServerTest.java | 25 +-- .../jetty/fcgi/server/HttpClientTest.java | 87 ++++---- 14 files changed, 431 insertions(+), 254 deletions(-) diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index cf49a458e299..50c603d18385 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -16,12 +16,9 @@ import java.io.EOFException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,19 +56,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class); + private final RetainableByteBufferPool networkByteBufferPool; private final AutoLock lock = new AutoLock(); private final LinkedList requests = new LinkedList<>(); - private final Map activeChannels = new ConcurrentHashMap<>(); - private final Queue idleChannels = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(); private final HttpDestination destination; private final Promise promise; private final Flusher flusher; private final Delegate delegate; private final ClientParser parser; + private HttpChannelOverFCGI channel; private RetainableByteBuffer networkBuffer; private Object attachment; - private final RetainableByteBufferPool retainableByteBufferPool; public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise promise) { @@ -82,9 +78,8 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr this.delegate = new Delegate(destination); this.parser = new ClientParser(new ResponseListener()); requests.addLast(0); - HttpClient client = destination.getHttpClient(); - this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool()); + this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool()); } public HttpDestination getHttpDestination() @@ -139,7 +134,7 @@ private void reacquireNetworkBuffer() private RetainableByteBuffer newNetworkBuffer() { HttpClient client = destination.getHttpClient(); - return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers()); + return networkByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers()); } private void releaseNetworkBuffer() @@ -205,7 +200,8 @@ private void shutdown() { // Close explicitly only if we are idle, since the request may still // be in progress, otherwise close only if we can fail the responses. - if (activeChannels.isEmpty()) + HttpChannelOverFCGI channel = this.channel; + if (channel == null || channel.getRequest() == 0) close(); else failAndClose(new EOFException(String.valueOf(getEndPoint()))); @@ -223,20 +219,25 @@ public boolean onIdleExpired() protected void release(HttpChannelOverFCGI channel) { - if (activeChannels.remove(channel.getRequest()) == null) - { - channel.destroy(); - } - else + HttpChannelOverFCGI existing = this.channel; + if (existing == channel) { channel.setRequest(0); // Recycle only non-failed channels. if (channel.isFailed()) + { channel.destroy(); - else - idleChannels.offer(channel); + this.channel = null; + } destination.release(this); } + else + { + if (existing == null) + channel.destroy(); + else + throw new UnsupportedOperationException("FastCGI Multiplex"); + } } @Override @@ -290,34 +291,27 @@ protected boolean closeByHTTP(HttpFields fields) protected void abort(Throwable failure) { - for (HttpChannelOverFCGI channel : activeChannels.values()) + HttpChannelOverFCGI channel = this.channel; + if (channel != null) { HttpExchange exchange = channel.getHttpExchange(); if (exchange != null) exchange.getRequest().abort(failure); channel.destroy(); - } - activeChannels.clear(); - - HttpChannel channel = idleChannels.poll(); - while (channel != null) - { - channel.destroy(); - channel = idleChannels.poll(); + this.channel = null; } } private void failAndClose(Throwable failure) { - boolean result = false; - for (HttpChannelOverFCGI channel : activeChannels.values()) + HttpChannelOverFCGI channel = this.channel; + if (channel != null) { - result |= channel.responseFailure(failure); + boolean result = channel.responseFailure(failure); channel.destroy(); + if (result) + close(failure); } - - if (result) - close(failure); } private int acquireRequest() @@ -341,7 +335,6 @@ private void releaseRequest(int request) protected HttpChannelOverFCGI acquireHttpChannel(int id, Request request) { - HttpChannelOverFCGI channel = idleChannels.poll(); if (channel == null) channel = newHttpChannel(request); channel.setRequest(id); @@ -373,7 +366,8 @@ private Delegate(HttpDestination destination) @Override protected Iterator getHttpChannels() { - return new IteratorWrapper<>(activeChannels.values().iterator()); + HttpChannel channel = HttpConnectionOverFCGI.this.channel; + return channel == null ? Collections.emptyIterator() : Collections.singleton(channel).iterator(); } @Override @@ -382,10 +376,8 @@ public SendFailure send(HttpExchange exchange) HttpRequest request = exchange.getRequest(); normalizeRequest(request); - // FCGI may be multiplexed, so one channel for each exchange. int id = acquireRequest(); HttpChannelOverFCGI channel = acquireHttpChannel(id, request); - activeChannels.put(id, channel); return send(channel, exchange); } @@ -420,7 +412,7 @@ private class ResponseListener implements ClientParser.Listener @Override public void onBegin(int request, int code, String reason) { - HttpChannelOverFCGI channel = activeChannels.get(request); + HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel; if (channel != null) channel.responseBegin(code, reason); else @@ -430,7 +422,7 @@ public void onBegin(int request, int code, String reason) @Override public void onHeader(int request, HttpField field) { - HttpChannelOverFCGI channel = activeChannels.get(request); + HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel; if (channel != null) channel.responseHeader(field); else @@ -440,7 +432,7 @@ public void onHeader(int request, HttpField field) @Override public boolean onHeaders(int request) { - HttpChannelOverFCGI channel = activeChannels.get(request); + HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel; if (channel != null) return !channel.responseHeaders(); noChannel(request); @@ -454,7 +446,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { case STD_OUT: { - HttpChannelOverFCGI channel = activeChannels.get(request); + HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel; if (channel != null) { networkBuffer.retain(); @@ -482,7 +474,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) @Override public void onEnd(int request) { - HttpChannelOverFCGI channel = activeChannels.get(request); + HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel; if (channel != null) { if (channel.responseSuccess()) @@ -497,7 +489,7 @@ public void onEnd(int request) @Override public void onFailure(int request, Throwable failure) { - HttpChannelOverFCGI channel = activeChannels.get(request); + HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel; if (channel != null) { if (channel.responseFailure(failure)) @@ -515,32 +507,4 @@ private void noChannel(int request) LOG.debug("Channel not found for request {}", request); } } - - private static final class IteratorWrapper implements Iterator - { - private final Iterator iterator; - - private IteratorWrapper(Iterator iterator) - { - this.iterator = iterator; - } - - @Override - public boolean hasNext() - { - return iterator.hasNext(); - } - - @Override - public T next() - { - return iterator.next(); - } - - @Override - public void remove() - { - iterator.remove(); - } - } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java index ad4cb9a2aeff..9876a3176777 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/BeginRequestContentParser.java @@ -18,7 +18,7 @@ import org.eclipse.jetty.fcgi.FCGI; /** - *

Parser for the BEGIN_REQUEST frame body.

+ *

Parser for the BEGIN_REQUEST frame content.

*
  * struct begin_request_body {
  *     ushort role;
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java
index add250b90d5d..7cd56efa1eb3 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ContentParser.java
@@ -15,6 +15,23 @@
 
 import java.nio.ByteBuffer;
 
+/**
+ * 

Parser for FastCGI frame content.

+ *

Depending on the frame type specified in the FastCGI frame header, + * the FastCGI frame content has different formats and it is parsed by + * different implementation of this abstract class.

+ *

There are these frame content types:

+ *
    + *
  • {@code BEGIN_REQUEST}, to signal the begin of the request
  • + *
  • {@code PARAMS}, key/value pairs
  • + *
  • {@code STDIN}, the request body, handled as a stream
  • + *
  • {@code STDOUT}, the response body, handled as a stream
  • + *
  • {@code STDERR}, the response error, handled as a stream
  • + *
  • {@code END_REQUEST}, to signal the end of the response
  • + *
+ * + * @see Parser + */ public abstract class ContentParser { private final HeaderParser headerParser; @@ -24,9 +41,20 @@ protected ContentParser(HeaderParser headerParser) this.headerParser = headerParser; } + /** + *

Parses the bytes in the given {@code buffer} as FastCGI frame content bytes.

+ * + * @param buffer the bytes to parse + * @return the result of the parsing + */ public abstract Result parse(ByteBuffer buffer); - public void noContent() + /** + *

Invoked by the {@link Parser} when the frame content length is zero.

+ * + * @return whether the parsing should stop + */ + public boolean noContent() { throw new IllegalStateException(); } @@ -41,8 +69,25 @@ protected int getContentLength() return headerParser.getContentLength(); } + /** + *

The result of the frame content parsing.

+ */ public enum Result { - PENDING, ASYNC, COMPLETE + /** + *

Not enough bytes have been provided to the parser + * with a call to {@link ContentParser#parse(ByteBuffer)}.

+ */ + PENDING, + /** + *

The frame content has been parsed, but the application + * signalled that it wants to process the content asynchronously.

+ */ + ASYNC, + /** + *

The frame content parsing is complete, + * and the parser can now parse the padding bytes.

+ */ + COMPLETE } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java index 212eb200f7e9..df707133ecad 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java @@ -16,7 +16,7 @@ import java.nio.ByteBuffer; /** - *

Parser for the END_REQUEST frame body.

+ *

Parser for the END_REQUEST frame content.

*
  * struct end_request_body {
  *     uint applicationStatus;
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java
index 6ffb9fd5ffa4..1ff6d645b927 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/HeaderParser.java
@@ -20,7 +20,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * 

Parser for FastCGI frame headers.

+ *

Parser for the FastCGI frame header.

*
  * struct frame_header {
  *     ubyte version;
@@ -47,7 +47,7 @@ public class HeaderParser
     private int padding;
 
     /**
-     * Parses the bytes in the given {@code buffer} as FastCGI header bytes
+     * Parses the bytes in the given {@code buffer} as FastCGI frame header bytes
      *
      * @param buffer the bytes to parse
      * @return whether there were enough bytes for a FastCGI header
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java
index 41056c7d27f6..275225c16d08 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ParamsContentParser.java
@@ -22,7 +22,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * 

Parser for the PARAMS frame body.

+ *

Parser for the PARAMS frame content.

*
  * struct small_name_small_value_params_body {
  *     ubyte nameLength;
@@ -227,9 +227,9 @@ public Result parse(ByteBuffer buffer)
     }
 
     @Override
-    public void noContent()
+    public boolean noContent()
     {
-        onParams();
+        return onParams();
     }
 
     protected void onParam(String name, String value)
@@ -245,16 +245,17 @@ protected void onParam(String name, String value)
         }
     }
 
-    protected void onParams()
+    protected boolean onParams()
     {
         try
         {
-            listener.onHeaders(getRequest());
+            return listener.onHeaders(getRequest());
         }
         catch (Throwable x)
         {
             if (LOG.isDebugEnabled())
                 LOG.debug("Exception while invoking listener {}", listener, x);
+            return false;
         }
     }
 
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java
index 5db5a6ec9d91..2f2e9381581a 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java
@@ -36,6 +36,20 @@
  * 
*

Depending on the {@code type}, the content may have a different format, * so there are specialized content parsers.

+ *

A typical exchange is:

+ *
+ * BEGIN_REQUEST
+ * PARAMS (length > 0)
+ * PARAMS (length == 0 to signal end of PARAMS frames)
+ * [STDIN (length > 0 in case of request content)]
+ * STDIN (length == 0 to signal end of STDIN frames and end of request)
+ * ...
+ * STDOUT (length > 0 with HTTP headers and HTTP content)
+ * STDOUT (length == 0 to signal end of STDOUT frames)
+ * [STDERR (length > 0)]
+ * [STDERR (length == 0 to signal end of STDERR frames)]
+ * END_REQUEST
+ * 
* * @see HeaderParser * @see ContentParser @@ -70,7 +84,10 @@ public boolean parse(ByteBuffer buffer) ContentParser contentParser = findContentParser(headerParser.getFrameType()); if (headerParser.getContentLength() == 0) { - contentParser.noContent(); + padding = headerParser.getPaddingLength(); + state = State.PADDING; + if (contentParser.noContent()) + return true; } else { @@ -89,9 +106,9 @@ public boolean parse(ByteBuffer buffer) // parsing; the async operation will eventually resume parsing. return true; } + padding = headerParser.getPaddingLength(); + state = State.PADDING; } - padding = headerParser.getPaddingLength(); - state = State.PADDING; break; } case PADDING: @@ -176,7 +193,6 @@ public void onEnd(int request) @Override public void onFailure(int request, Throwable failure) { - } } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java index d1a34250e859..6673bf359749 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ResponseContentParser.java @@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory; /** - *

The parser for STDOUT type frame bodies.

- *

STDOUT frame bodies contain both the HTTP headers (but not the response line) + *

The parser for STDOUT frame content.

+ *

STDOUT frame content contain both the HTTP headers (but not the response line) * and the HTTP content (either Content-Length delimited or chunked).

*

For this reason, a special HTTP parser is used to parse the frames body. * This special HTTP parser is configured to skip the response line, and to @@ -52,9 +52,11 @@ public ResponseContentParser(HeaderParser headerParser, ClientParser.Listener li } @Override - public void noContent() + public boolean noContent() { - // Does nothing, since for responses the end of content is signaled via a FCGI_END_REQUEST frame + // Does nothing, since for responses the end of + // content is signaled via a FCGI_END_REQUEST frame. + return false; } @Override diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java index 815d41bf7753..249108d43082 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/StreamContentParser.java @@ -57,14 +57,18 @@ public Result parse(ByteBuffer buffer) int length = Math.min(contentLength, buffer.remaining()); int limit = buffer.limit(); buffer.limit(buffer.position() + length); - final ByteBuffer slice = buffer.slice(); + ByteBuffer slice = buffer.slice(); buffer.position(buffer.limit()); buffer.limit(limit); contentLength -= length; + if (contentLength <= 0) + state = State.EOF; if (onContent(slice)) return Result.ASYNC; - if (contentLength > 0) - break; + break; + } + case EOF: + { state = State.LENGTH; return Result.COMPLETE; } @@ -78,7 +82,7 @@ public Result parse(ByteBuffer buffer) } @Override - public void noContent() + public boolean noContent() { try { @@ -89,6 +93,7 @@ public void noContent() if (LOG.isDebugEnabled()) LOG.debug("Exception while invoking listener {}", listener, x); } + return false; } protected boolean onContent(ByteBuffer buffer) @@ -111,6 +116,6 @@ protected void end(int request) private enum State { - LENGTH, CONTENT + LENGTH, CONTENT, EOF } } diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java index 3d9df9e3ad3a..70752c872140 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpChannelOverFCGI.java @@ -13,11 +13,7 @@ package org.eclipse.jetty.fcgi.server; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; import java.util.Locale; -import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; @@ -35,114 +31,149 @@ import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.HttpTransport; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.StringUtil; -import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HttpChannelOverFCGI extends HttpChannel { private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverFCGI.class); + private static final HttpInput.Content EOF_CONTENT = new HttpInput.EofContent(); - private final Queue _contentQueue = new LinkedList<>(); - private final AutoLock _lock = new AutoLock(); - private HttpInput.Content _specialContent; + private final Callback asyncFillCallback = new AsyncFillCallback(); + private final ServerFCGIConnection connection; private final HttpFields.Mutable fields = HttpFields.build(); private final Dispatcher dispatcher; + private HttpInput.Content normalContent; + private HttpInput.Content specialContent; private String method; private String path; private String query; private String version; private HostPortHttpField hostPort; - public HttpChannelOverFCGI(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport) + public HttpChannelOverFCGI(ServerFCGIConnection connection, Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport) { super(connector, configuration, endPoint, transport); + this.connection = connection; this.dispatcher = new Dispatcher(connector.getServer().getThreadPool(), this); } @Override public boolean onContent(HttpInput.Content content) { - boolean b = super.onContent(content); + boolean result = super.onContent(content); - Throwable failure; - try (AutoLock l = _lock.lock()) + HttpInput.Content special = this.specialContent; + Throwable failure = special == null ? null : special.getError(); + if (failure == null) { - failure = _specialContent == null ? null : _specialContent.getError(); - if (failure == null) - _contentQueue.offer(content); + if (normalContent != null) + throw new IllegalStateException("onContent has unconsumed content"); + normalContent = content; } - if (failure != null) + else + { content.failed(failure); + } - return b; + return result; } @Override public boolean needContent() { - try (AutoLock l = _lock.lock()) + if (hasContent()) + { + if (LOG.isDebugEnabled()) + LOG.debug("needContent has immediate content {}", this); + return true; + } + + parseAndFill(); + + if (hasContent()) { - boolean hasContent = _specialContent != null || !_contentQueue.isEmpty(); if (LOG.isDebugEnabled()) - LOG.debug("needContent has content? {}", hasContent); - return hasContent; + LOG.debug("needContent has parsed content {}", this); + return true; } + + connection.getEndPoint().tryFillInterested(asyncFillCallback); + return false; + } + + private boolean hasContent() + { + return specialContent != null || normalContent != null; } @Override public HttpInput.Content produceContent() { - HttpInput.Content content; - try (AutoLock l = _lock.lock()) + if (!hasContent()) + parseAndFill(); + + if (!hasContent()) + return null; + + HttpInput.Content content = normalContent; + if (content != null) { - content = _contentQueue.poll(); - if (content == null) - content = _specialContent; + if (LOG.isDebugEnabled()) + LOG.debug("produceContent produced {} {}", content, this); + normalContent = null; + return content; } + content = specialContent; if (LOG.isDebugEnabled()) - LOG.debug("produceContent has produced {}", content); + LOG.debug("produceContent produced special {} {}", content, this); return content; } + private void parseAndFill() + { + if (LOG.isDebugEnabled()) + LOG.debug("parseAndFill {}", this); + connection.parseAndFill(); + } + @Override public boolean failAllContent(Throwable failure) { if (LOG.isDebugEnabled()) - LOG.debug("failing all content with {} {}", failure, this); - List copy; - try (AutoLock l = _lock.lock()) + LOG.debug("failing all content {}", this); + HttpInput.Content normal = normalContent; + if (normal != null) + normal.failed(failure); + HttpInput.Content special = specialContent; + if (special != null) + return special.isEof(); + while (true) { - copy = new ArrayList<>(_contentQueue); - _contentQueue.clear(); - } - copy.forEach(c -> c.failed(failure)); - boolean atEof; - try (AutoLock l = _lock.lock()) - { - atEof = _specialContent != null && _specialContent.isEof(); + HttpInput.Content content = produceContent(); + if (content == null) + return false; + special = specialContent; + if (special != null) + return special.isEof(); + content.failed(failure); } - if (LOG.isDebugEnabled()) - LOG.debug("failed all content, EOF = {}", atEof); - return atEof; } @Override public boolean failed(Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("failed " + x); - - try (AutoLock l = _lock.lock()) - { - Throwable error = _specialContent == null ? null : _specialContent.getError(); + LOG.debug("failed {}", this, x); - if (error != null && error != x) - error.addSuppressed(x); - else - _specialContent = new HttpInput.ErrorContent(x); - } + HttpInput.Content special = specialContent; + Throwable error = special == null ? null : special.getError(); + if (error != null && error != x) + error.addSuppressed(x); + else + specialContent = new HttpInput.ErrorContent(x); return getRequest().getHttpInput().onContentProducible(); } @@ -152,10 +183,7 @@ protected boolean eof() { if (LOG.isDebugEnabled()) LOG.debug("received EOF"); - try (AutoLock l = _lock.lock()) - { - _specialContent = new HttpInput.EofContent(); - } + specialContent = EOF_CONTENT; return getRequest().getHttpInput().onContentProducible(); } @@ -238,20 +266,12 @@ public boolean onIdleTimeout(Throwable timeout) private boolean doOnIdleTimeout(Throwable x) { boolean neverDispatched = getState().isIdle(); - boolean waitingForContent; - HttpInput.Content specialContent; - try (AutoLock l = _lock.lock()) - { - waitingForContent = _contentQueue.isEmpty() || _contentQueue.peek().remaining() == 0; - specialContent = _specialContent; - } + HttpInput.Content normal = this.normalContent; + boolean waitingForContent = normal == null || normal.remaining() == 0; if ((waitingForContent || neverDispatched) && specialContent == null) { x.addSuppressed(new Throwable("HttpInput idle timeout")); - try (AutoLock l = _lock.lock()) - { - _specialContent = new HttpInput.ErrorContent(x); - } + specialContent = new HttpInput.ErrorContent(x); return getRequest().getHttpInput().onContentProducible(); } return false; @@ -260,13 +280,46 @@ private boolean doOnIdleTimeout(Throwable x) @Override public void recycle() { - try (AutoLock l = _lock.lock()) + super.recycle(); + HttpInput.Content normal = normalContent; + if (normal != null) + throw new AssertionError("unconsumed content: " + normal); + specialContent = null; + } + + @Override + public void onCompleted() + { + super.onCompleted(); + HttpInput input = getRequest().getHttpInput(); + boolean consumed = input.consumeAll(); + // Assume we don't arrive here from the connection's onFillable() (which already + // calls fillInterested()), because we dispatch() when all the headers are received. + // When the request/response is completed, we must arrange to call fillInterested(). + connection.onCompleted(consumed); + } + + private class AsyncFillCallback implements Callback + { + @Override + public void succeeded() { - if (!_contentQueue.isEmpty()) - throw new AssertionError("unconsumed content: " + _contentQueue); - _specialContent = null; + if (getRequest().getHttpInput().onContentProducible()) + handle(); + } + + @Override + public void failed(Throwable x) + { + if (HttpChannelOverFCGI.this.failed(x)) + handle(); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; } - super.recycle(); } private static class Dispatcher implements Runnable diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java index 9603f4367b39..d9372136e70f 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java @@ -32,6 +32,7 @@ public class HttpTransportOverFCGI implements HttpTransport { private static final Logger LOG = LoggerFactory.getLogger(HttpTransportOverFCGI.class); + private final ServerGenerator generator; private final Flusher flusher; private final int request; @@ -48,6 +49,8 @@ public HttpTransportOverFCGI(ByteBufferPool byteBufferPool, boolean useDirectByt @Override public void send(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) { + if (LOG.isDebugEnabled()) + LOG.debug("send {} {} l={}", this, request, lastContent); boolean head = HttpMethod.HEAD.is(request.getMethod()); if (response != null) { diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index ce7d84e7c024..1069ddcc15c0 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -14,8 +14,6 @@ package org.eclipse.jetty.fcgi.server; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.fcgi.generator.Flusher; @@ -24,8 +22,9 @@ import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpInput; @@ -36,19 +35,22 @@ public class ServerFCGIConnection extends AbstractConnection { private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class); - private final ConcurrentMap channels = new ConcurrentHashMap<>(); private final Connector connector; + private final RetainableByteBufferPool networkByteBufferPool; private final boolean sendStatus200; private final Flusher flusher; private final HttpConfiguration configuration; private final ServerParser parser; private boolean useInputDirectByteBuffers; private boolean useOutputDirectByteBuffers; + private RetainableByteBuffer networkBuffer; + private HttpChannelOverFCGI channel; public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200) { super(endPoint, connector.getExecutor()); this.connector = connector; + this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool()); this.flusher = new Flusher(endPoint); this.configuration = configuration; this.sendStatus200 = sendStatus200; @@ -85,29 +87,28 @@ public void onOpen() @Override public void onFillable() { - EndPoint endPoint = getEndPoint(); - ByteBufferPool bufferPool = connector.getByteBufferPool(); - ByteBuffer buffer = bufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers()); + acquireInputBuffer(); try { while (true) { - int read = endPoint.fill(buffer); - if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read' - LOG.debug("Read {} bytes from {}", read, endPoint); + int read = fillInputBuffer(); + if (LOG.isDebugEnabled()) + LOG.debug("Read {} bytes from {} {}", read, getEndPoint(), this); if (read > 0) { - parse(buffer); + if (parse(networkBuffer.getBuffer())) + return; } else if (read == 0) { - bufferPool.release(buffer); + releaseInputBuffer(); fillInterested(); break; } else { - bufferPool.release(buffer); + releaseInputBuffer(); shutdown(); break; } @@ -117,25 +118,81 @@ else if (read == 0) { if (LOG.isDebugEnabled()) LOG.debug("Unable to fill endpoint", x); - bufferPool.release(buffer); + networkBuffer.clear(); + releaseInputBuffer(); // TODO: fail and close ? } } + /** + * This is just a "consume" method, so it must not call + * fillInterested(), but just consume what's in the network + * for the current request. + */ + void parseAndFill() + { + if (LOG.isDebugEnabled()) + LOG.debug("parseAndFill {}", this); + // This loop must run only until the request is completed. + // See also HttpConnection.parseAndFillForContent(). + while (channel != null) + { + if (parse(networkBuffer.getBuffer())) + return; + // Check if the request was completed by the parsing. + if (channel == null) + return; + if (fillInputBuffer() <= 0) + break; + } + } + + private void acquireInputBuffer() + { + if (networkBuffer == null) + networkBuffer = networkByteBufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers()); + } + + private void releaseInputBuffer() + { + boolean released = networkBuffer.release(); + if (LOG.isDebugEnabled()) + LOG.debug("releaseInputBuffer {} {}", released, this); + if (released) + networkBuffer = null; + } + + private int fillInputBuffer() + { + try + { + return getEndPoint().fill(networkBuffer.getBuffer()); + } + catch (Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not fill from {}", this, x); + return -1; + } + } + @Override protected boolean onReadTimeout(Throwable timeout) { - return channels.values().stream() - .mapToInt(channel -> channel.onIdleTimeout(timeout) ? 0 : 1) - .sum() == 0; + if (channel != null) + return channel.onIdleTimeout(timeout); + return true; } - private void parse(ByteBuffer buffer) + private boolean parse(ByteBuffer buffer) { while (buffer.hasRemaining()) { - parser.parse(buffer); + boolean result = parser.parse(buffer); + if (result) + return true; } + return false; } private void shutdown() @@ -143,17 +200,23 @@ private void shutdown() flusher.shutdown(); } + void onCompleted(boolean fillMore) + { + releaseInputBuffer(); + if (getEndPoint().isOpen() && fillMore) + fillInterested(); + } + private class ServerListener implements ServerParser.Listener { @Override public void onStart(int request, FCGI.Role role, int flags) { // TODO: handle flags - HttpChannelOverFCGI channel = new HttpChannelOverFCGI(connector, configuration, getEndPoint(), + if (channel != null) + throw new UnsupportedOperationException("FastCGI Multiplexing"); + channel = new HttpChannelOverFCGI(ServerFCGIConnection.this, connector, configuration, getEndPoint(), new HttpTransportOverFCGI(connector.getByteBufferPool(), isUseOutputDirectByteBuffers(), sendStatus200, flusher, request)); - HttpChannelOverFCGI existing = channels.putIfAbsent(request, channel); - if (existing != null) - throw new IllegalStateException(); if (LOG.isDebugEnabled()) LOG.debug("Request {} start on {}", request, channel); } @@ -161,7 +224,6 @@ public void onStart(int request, FCGI.Role role, int flags) @Override public void onHeader(int request, HttpField field) { - HttpChannelOverFCGI channel = channels.get(request); if (LOG.isDebugEnabled()) LOG.debug("Request {} header {} on {}", request, field, channel); if (channel != null) @@ -171,13 +233,14 @@ public void onHeader(int request, HttpField field) @Override public boolean onHeaders(int request) { - HttpChannelOverFCGI channel = channels.get(request); if (LOG.isDebugEnabled()) LOG.debug("Request {} headers on {}", request, channel); if (channel != null) { channel.onRequest(); channel.dispatch(); + // We have dispatched to the application, so we must stop the fill & parse loop. + return true; } return false; } @@ -185,14 +248,13 @@ public boolean onHeaders(int request) @Override public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) { - HttpChannelOverFCGI channel = channels.get(request); if (LOG.isDebugEnabled()) LOG.debug("Request {} {} content {} on {}", request, stream, buffer, channel); if (channel != null) { - ByteBuffer copy = ByteBuffer.allocate(buffer.remaining()); - copy.put(buffer).flip(); - channel.onContent(new HttpInput.Content(copy)); + channel.onContent(new FastCGIContent(buffer)); + // Signal that the content is processed asynchronously, to ensure backpressure. + return true; } return false; } @@ -200,24 +262,52 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer) @Override public void onEnd(int request) { - HttpChannelOverFCGI channel = channels.remove(request); if (LOG.isDebugEnabled()) LOG.debug("Request {} end on {}", request, channel); if (channel != null) { channel.onContentComplete(); channel.onRequestComplete(); + // Nulling out the channel signals that the + // request is complete, see also parseAndFill(). + channel = null; } } @Override public void onFailure(int request, Throwable failure) { - HttpChannelOverFCGI channel = channels.remove(request); if (LOG.isDebugEnabled()) LOG.debug("Request {} failure on {}: {}", request, channel, failure); if (channel != null) channel.onBadMessage(new BadMessageException(HttpStatus.BAD_REQUEST_400, null, failure)); + channel = null; + } + + private class FastCGIContent extends HttpInput.Content + { + public FastCGIContent(ByteBuffer content) + { + super(content); + networkBuffer.retain(); + } + + @Override + public void succeeded() + { + release(); + } + + @Override + public void failed(Throwable x) + { + release(); + } + + private void release() + { + networkBuffer.release(); + } } } } diff --git a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java index 128bcd7b07c3..2c18f80746d0 100644 --- a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java +++ b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/AbstractHttpClientServerTest.java @@ -21,6 +21,7 @@ import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.LeakTrackingByteBufferPool; import org.eclipse.jetty.io.MappedByteBufferPool; import org.eclipse.jetty.server.Handler; @@ -47,22 +48,26 @@ public abstract class AbstractHttpClientServerTest public void start(Handler handler) throws Exception { - server = new Server(); - + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); ServerFCGIConnectionFactory fcgiConnectionFactory = new ServerFCGIConnectionFactory(new HttpConfiguration()); serverBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()); connector = new ServerConnector(server, null, null, serverBufferPool, 1, Math.max(1, ProcessorUtils.availableProcessors() / 2), fcgiConnectionFactory); -// connector.setPort(9000); - server.addConnector(connector); server.setHandler(handler); server.start(); - QueuedThreadPool executor = new QueuedThreadPool(); - executor.setName(executor.getName() + "-client"); - - HttpClientTransport transport = new HttpClientTransportOverFCGI(1, ""); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSelectors(1); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + clientConnector.setExecutor(clientThreads); + if (clientBufferPool == null) + clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()); + clientConnector.setByteBufferPool(clientBufferPool); + HttpClientTransport transport = new HttpClientTransportOverFCGI(clientConnector, ""); transport.setConnectionPoolFactory(destination -> new LeakTrackingConnectionPool(destination, client.getMaxConnectionsPerDestination(), destination) { @Override @@ -72,10 +77,6 @@ protected void leaked(LeakDetector.LeakInfo leakInfo) } }); client = new HttpClient(transport); - client.setExecutor(executor); - if (clientBufferPool == null) - clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()); - client.setByteBufferPool(clientBufferPool); client.start(); } diff --git a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java index fee6a3d2617c..ebbf5e9b98cf 100644 --- a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java +++ b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.fcgi.server; -import java.io.EOFException; import java.io.IOException; import java.net.URI; import java.net.URLEncoder; @@ -48,18 +47,15 @@ import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +// @checkstyle-disable-check : AvoidEscapedUnicodeCharactersCheck public class HttpClientTest extends AbstractHttpClientServerTest { - // @checkstyle-disable-check : AvoidEscapedUnicodeCharactersCheck - @Test public void testGETResponseWithoutContent() throws Exception { @@ -76,7 +72,7 @@ public void testGETResponseWithoutContent() throws Exception @Test public void testGETResponseWithContent() throws Exception { - final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; + byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; start(new AbstractHandler() { @Override @@ -103,7 +99,7 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testGETResponseWithBigContent() throws Exception { - final byte[] data = new byte[16 * 1024 * 1024]; + byte[] data = new byte[16 * 1024 * 1024]; new Random().nextBytes(data); start(new AbstractHandler() { @@ -132,8 +128,8 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testGETWithParametersResponseWithContent() throws Exception { - final String paramName1 = "a"; - final String paramName2 = "b"; + String paramName1 = "a"; + String paramName2 = "b"; start(new AbstractHandler() { @Override @@ -164,8 +160,8 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testGETWithParametersMultiValuedResponseWithContent() throws Exception { - final String paramName1 = "a"; - final String paramName2 = "b"; + String paramName1 = "a"; + String paramName2 = "b"; start(new AbstractHandler() { @Override @@ -202,8 +198,8 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testPOSTWithParameters() throws Exception { - final String paramName = "a"; - final String paramValue = "\u20AC"; + String paramName = "a"; + String paramValue = "\u20AC"; start(new AbstractHandler() { @Override @@ -233,8 +229,8 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testPOSTWithQueryString() throws Exception { - final String paramName = "a"; - final String paramValue = "\u20AC"; + String paramName = "a"; + String paramValue = "\u20AC"; start(new AbstractHandler() { @Override @@ -265,8 +261,8 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testPUTWithParameters() throws Exception { - final String paramName = "a"; - final String paramValue = "\u20AC"; + String paramName = "a"; + String paramValue = "\u20AC"; start(new AbstractHandler() { @Override @@ -297,9 +293,9 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testPOSTWithParametersWithContent() throws Exception { - final byte[] content = {0, 1, 2, 3}; - final String paramName = "a"; - final String paramValue = "\u20AC"; + byte[] content = {0, 1, 2, 3}; + String paramName = "a"; + String paramValue = "\u20AC"; start(new AbstractHandler() { @Override @@ -318,7 +314,7 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, for (int i = 0; i < 256; ++i) { - ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?b=1") + ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort() + "/?r=" + i) .param(paramName, paramValue) .body(new BytesRequestContent(content)) .timeout(5, TimeUnit.SECONDS) @@ -326,14 +322,14 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, assertNotNull(response); assertEquals(200, response.getStatus()); - assertArrayEquals(content, response.getContent()); + assertArrayEquals(content, response.getContent(), "content mismatch for request " + i); } } @Test public void testPOSTWithContentNotifiesRequestContentListener() throws Exception { - final byte[] content = {0, 1, 2, 3}; + byte[] content = {0, 1, 2, 3}; start(new EmptyServerHandler()); ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort()) @@ -357,7 +353,7 @@ public void testPOSTWithContentTracksProgress() throws Exception { start(new EmptyServerHandler()); - final AtomicInteger progress = new AtomicInteger(); + AtomicInteger progress = new AtomicInteger(); ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort()) .onRequestContent((request, buffer) -> { @@ -385,7 +381,7 @@ public void testGZIPContentEncoding() throws Exception // appear as "leaked", so we use a normal ByteBufferPool. clientBufferPool = new MappedByteBufferPool.Tagged(); - final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; start(new AbstractHandler() { @Override @@ -411,7 +407,7 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testConnectionIdleTimeout() throws Exception { - final long idleTimeout = 1000; + long idleTimeout = 1000; start(new AbstractHandler() { @Override @@ -431,25 +427,26 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, connector.setIdleTimeout(idleTimeout); - ExecutionException x = assertThrows(ExecutionException.class, () -> - client.newRequest("localhost", connector.getLocalPort()) - .scheme(scheme) - .idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS) - .timeout(3 * idleTimeout, TimeUnit.MILLISECONDS) - .send()); - assertThat(x.getCause(), instanceOf(EOFException.class)); + // Request does not fail because idle timeouts while dispatched are ignored. + ContentResponse response1 = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS) + .timeout(3 * idleTimeout, TimeUnit.MILLISECONDS) + .send(); + assertNotNull(response1); + assertEquals(200, response1.getStatus()); connector.setIdleTimeout(5 * idleTimeout); - // Make another request to be sure the connection is recreated - ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + // Make another request to be sure the connection works fine. + ContentResponse response2 = client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .idleTimeout(4 * idleTimeout, TimeUnit.MILLISECONDS) .timeout(3 * idleTimeout, TimeUnit.MILLISECONDS) .send(); - assertNotNull(response); - assertEquals(200, response.getStatus()); + assertNotNull(response2); + assertEquals(200, response2.getStatus()); } @Test @@ -470,7 +467,7 @@ public void testSendToIPv6Address() throws Exception @Test public void testHEADWithResponseContentLength() throws Exception { - final int length = 1024; + int length = 1024; start(new AbstractHandler() { @Override @@ -507,7 +504,7 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, @Test public void testLongPollIsAbortedWhenClientIsStopped() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); start(new AbstractHandler() { @Override @@ -519,7 +516,7 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, } }); - final CountDownLatch completeLatch = new CountDownLatch(1); + CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .send(result -> @@ -576,7 +573,7 @@ public void testBigContentDelimitedByEOFWithSlowRequest() throws Exception private void testContentDelimitedByEOFWithSlowRequest(int length) throws Exception { - final byte[] data = new byte[length]; + byte[] data = new byte[length]; new Random().nextBytes(data); start(new AbstractHandler() { @@ -620,10 +617,10 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest, } }); - final AtomicInteger contentCount = new AtomicInteger(); - final AtomicReference callbackRef = new AtomicReference<>(); - final AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); - final CountDownLatch completeLatch = new CountDownLatch(1); + AtomicInteger contentCount = new AtomicInteger(); + AtomicReference callbackRef = new AtomicReference<>(); + AtomicReference contentLatch = new AtomicReference<>(new CountDownLatch(1)); + CountDownLatch completeLatch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) .onResponseContentAsync((response, content, callback) ->