Skip to content

Commit

Permalink
Fixes #6693 - FastCGI review (#6694)
Browse files Browse the repository at this point in the history
* Fixes #6693 - FastCGI review

- Removed code supporting multiplex in the client.
- Removed code supporting multiplex in the server.
- Reworked the server-side processing of a request, now more similar to HTTP/1.1.
- Improved javadocs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 7, 2021
1 parent 2231e64 commit 0bd15e0
Show file tree
Hide file tree
Showing 14 changed files with 431 additions and 254 deletions.
Expand Up @@ -16,12 +16,9 @@
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -59,19 +56,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);

private final RetainableByteBufferPool networkByteBufferPool;
private final AutoLock lock = new AutoLock();
private final LinkedList<Integer> requests = new LinkedList<>();
private final Map<Integer, HttpChannelOverFCGI> activeChannels = new ConcurrentHashMap<>();
private final Queue<HttpChannelOverFCGI> idleChannels = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final Promise<Connection> promise;
private final Flusher flusher;
private final Delegate delegate;
private final ClientParser parser;
private HttpChannelOverFCGI channel;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private final RetainableByteBufferPool retainableByteBufferPool;

public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
Expand All @@ -82,9 +78,8 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);

HttpClient client = destination.getHttpClient();
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
}

public HttpDestination getHttpDestination()
Expand Down Expand Up @@ -139,7 +134,7 @@ private void reacquireNetworkBuffer()
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = destination.getHttpClient();
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
return networkByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
}

private void releaseNetworkBuffer()
Expand Down Expand Up @@ -205,7 +200,8 @@ private void shutdown()
{
// Close explicitly only if we are idle, since the request may still
// be in progress, otherwise close only if we can fail the responses.
if (activeChannels.isEmpty())
HttpChannelOverFCGI channel = this.channel;
if (channel == null || channel.getRequest() == 0)
close();
else
failAndClose(new EOFException(String.valueOf(getEndPoint())));
Expand All @@ -223,20 +219,25 @@ public boolean onIdleExpired()

protected void release(HttpChannelOverFCGI channel)
{
if (activeChannels.remove(channel.getRequest()) == null)
{
channel.destroy();
}
else
HttpChannelOverFCGI existing = this.channel;
if (existing == channel)
{
channel.setRequest(0);
// Recycle only non-failed channels.
if (channel.isFailed())
{
channel.destroy();
else
idleChannels.offer(channel);
this.channel = null;
}
destination.release(this);
}
else
{
if (existing == null)
channel.destroy();
else
throw new UnsupportedOperationException("FastCGI Multiplex");
}
}

@Override
Expand Down Expand Up @@ -290,34 +291,27 @@ protected boolean closeByHTTP(HttpFields fields)

protected void abort(Throwable failure)
{
for (HttpChannelOverFCGI channel : activeChannels.values())
HttpChannelOverFCGI channel = this.channel;
if (channel != null)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
exchange.getRequest().abort(failure);
channel.destroy();
}
activeChannels.clear();

HttpChannel channel = idleChannels.poll();
while (channel != null)
{
channel.destroy();
channel = idleChannels.poll();
this.channel = null;
}
}

private void failAndClose(Throwable failure)
{
boolean result = false;
for (HttpChannelOverFCGI channel : activeChannels.values())
HttpChannelOverFCGI channel = this.channel;
if (channel != null)
{
result |= channel.responseFailure(failure);
boolean result = channel.responseFailure(failure);
channel.destroy();
if (result)
close(failure);
}

if (result)
close(failure);
}

private int acquireRequest()
Expand All @@ -341,7 +335,6 @@ private void releaseRequest(int request)

protected HttpChannelOverFCGI acquireHttpChannel(int id, Request request)
{
HttpChannelOverFCGI channel = idleChannels.poll();
if (channel == null)
channel = newHttpChannel(request);
channel.setRequest(id);
Expand Down Expand Up @@ -373,7 +366,8 @@ private Delegate(HttpDestination destination)
@Override
protected Iterator<HttpChannel> getHttpChannels()
{
return new IteratorWrapper<>(activeChannels.values().iterator());
HttpChannel channel = HttpConnectionOverFCGI.this.channel;
return channel == null ? Collections.emptyIterator() : Collections.singleton(channel).iterator();
}

@Override
Expand All @@ -382,10 +376,8 @@ public SendFailure send(HttpExchange exchange)
HttpRequest request = exchange.getRequest();
normalizeRequest(request);

// FCGI may be multiplexed, so one channel for each exchange.
int id = acquireRequest();
HttpChannelOverFCGI channel = acquireHttpChannel(id, request);
activeChannels.put(id, channel);

return send(channel, exchange);
}
Expand Down Expand Up @@ -420,7 +412,7 @@ private class ResponseListener implements ClientParser.Listener
@Override
public void onBegin(int request, int code, String reason)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
channel.responseBegin(code, reason);
else
Expand All @@ -430,7 +422,7 @@ public void onBegin(int request, int code, String reason)
@Override
public void onHeader(int request, HttpField field)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
channel.responseHeader(field);
else
Expand All @@ -440,7 +432,7 @@ public void onHeader(int request, HttpField field)
@Override
public boolean onHeaders(int request)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
return !channel.responseHeaders();
noChannel(request);
Expand All @@ -454,7 +446,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
{
case STD_OUT:
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
networkBuffer.retain();
Expand Down Expand Up @@ -482,7 +474,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
@Override
public void onEnd(int request)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
if (channel.responseSuccess())
Expand All @@ -497,7 +489,7 @@ public void onEnd(int request)
@Override
public void onFailure(int request, Throwable failure)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
if (channel.responseFailure(failure))
Expand All @@ -515,32 +507,4 @@ private void noChannel(int request)
LOG.debug("Channel not found for request {}", request);
}
}

private static final class IteratorWrapper<T> implements Iterator<T>
{
private final Iterator<? extends T> iterator;

private IteratorWrapper(Iterator<? extends T> iterator)
{
this.iterator = iterator;
}

@Override
public boolean hasNext()
{
return iterator.hasNext();
}

@Override
public T next()
{
return iterator.next();
}

@Override
public void remove()
{
iterator.remove();
}
}
}
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.jetty.fcgi.FCGI;

/**
* <p>Parser for the BEGIN_REQUEST frame body.</p>
* <p>Parser for the BEGIN_REQUEST frame content.</p>
* <pre>
* struct begin_request_body {
* ushort role;
Expand Down
Expand Up @@ -15,6 +15,23 @@

import java.nio.ByteBuffer;

/**
* <p>Parser for FastCGI frame content.</p>
* <p>Depending on the frame type specified in the FastCGI frame header,
* the FastCGI frame content has different formats and it is parsed by
* different implementation of this abstract class.</p>
* <p>There are these frame content types:</p>
* <ul>
* <li>{@code BEGIN_REQUEST}, to signal the begin of the request</li>
* <li>{@code PARAMS}, key/value pairs</li>
* <li>{@code STDIN}, the request body, handled as a stream</li>
* <li>{@code STDOUT}, the response body, handled as a stream</li>
* <li>{@code STDERR}, the response error, handled as a stream</li>
* <li>{@code END_REQUEST}, to signal the end of the response</li>
* </ul>
*
* @see Parser
*/
public abstract class ContentParser
{
private final HeaderParser headerParser;
Expand All @@ -24,9 +41,20 @@ protected ContentParser(HeaderParser headerParser)
this.headerParser = headerParser;
}

/**
* <p>Parses the bytes in the given {@code buffer} as FastCGI frame content bytes.</p>
*
* @param buffer the bytes to parse
* @return the result of the parsing
*/
public abstract Result parse(ByteBuffer buffer);

public void noContent()
/**
* <p>Invoked by the {@link Parser} when the frame content length is zero.</p>
*
* @return whether the parsing should stop
*/
public boolean noContent()
{
throw new IllegalStateException();
}
Expand All @@ -41,8 +69,25 @@ protected int getContentLength()
return headerParser.getContentLength();
}

/**
* <p>The result of the frame content parsing.</p>
*/
public enum Result
{
PENDING, ASYNC, COMPLETE
/**
* <p>Not enough bytes have been provided to the parser
* with a call to {@link ContentParser#parse(ByteBuffer)}.</p>
*/
PENDING,
/**
* <p>The frame content has been parsed, but the application
* signalled that it wants to process the content asynchronously.</p>
*/
ASYNC,
/**
* <p>The frame content parsing is complete,
* and the parser can now parse the padding bytes.</p>
*/
COMPLETE
}
}
Expand Up @@ -16,7 +16,7 @@
import java.nio.ByteBuffer;

/**
* <p>Parser for the END_REQUEST frame body.</p>
* <p>Parser for the END_REQUEST frame content.</p>
* <pre>
* struct end_request_body {
* uint applicationStatus;
Expand Down
Expand Up @@ -20,7 +20,7 @@
import org.slf4j.LoggerFactory;

/**
* <p>Parser for FastCGI frame headers.</p>
* <p>Parser for the FastCGI frame header.</p>
* <pre>
* struct frame_header {
* ubyte version;
Expand All @@ -47,7 +47,7 @@ public class HeaderParser
private int padding;

/**
* Parses the bytes in the given {@code buffer} as FastCGI header bytes
* Parses the bytes in the given {@code buffer} as FastCGI frame header bytes
*
* @param buffer the bytes to parse
* @return whether there were enough bytes for a FastCGI header
Expand Down

0 comments on commit 0bd15e0

Please sign in to comment.