Skip to content

Commit

Permalink
Issue #2796 - Max local stream count exceeded when request fails.
Browse files Browse the repository at this point in the history
Bound release of the channel to stream close event.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Oct 11, 2018
1 parent cec84cf commit ec2b5b1
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 21 deletions.
Expand Up @@ -763,7 +763,7 @@ protected IStream createLocalStream(int streamId)
int localCount = localStreamCount.get();
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
throw new IllegalStateException("Max local stream count " + maxCount + " exceeded");
throw new IllegalStateException("Max local stream count " + maxCount + " exceeded" + System.lineSeparator() + dump());
if (localStreamCount.compareAndSet(localCount, localCount + 1))
break;
}
Expand Down
Expand Up @@ -510,6 +510,13 @@ public void close()
}
}

@Override
public void onClose()
{
super.onClose();
notifyClosed(this);
}

private void updateStreamCount(int deltaStream, int deltaClosing)
{
((HTTP2Session)session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
Expand Down Expand Up @@ -615,6 +622,21 @@ private void notifyFailure(Stream stream, FailureFrame frame, Callback callback)
}
}

private void notifyClosed(Stream stream)
{
Listener listener = this.listener;
if (listener == null)
return;
try
{
listener.onClosed(stream);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}

@Override
public String dump()
{
Expand Down
Expand Up @@ -234,6 +234,15 @@ public default void onFailure(Stream stream, int error, String reason, Callback
callback.succeeded();
}

/**
* <p>Callback method invoked after the stream has been closed.</p>
*
* @param stream the stream
*/
public default void onClosed(Stream stream)
{
}

/**
* <p>Empty implementation of {@link Listener}</p>
*/
Expand Down
Expand Up @@ -101,6 +101,11 @@ public void release()
connection.release(this);
}

void onStreamClosed(Stream stream)
{
connection.onStreamClosed(stream, this);
}

@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
Expand Down
Expand Up @@ -36,11 +36,16 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

private final Set<HttpChannel> activeChannels = ConcurrentHashMap.newKeySet();
private final Queue<HttpChannelOverHTTP2> idleChannels = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -87,23 +92,31 @@ protected HttpChannelOverHTTP2 newHttpChannel()

protected void release(HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("Released {}", channel);
// Only non-push channels are released.
if (activeChannels.remove(channel))
{
channel.setStream(null);
// Recycle only non-failed channels.
if (channel.isFailed())
channel.destroy();
else
idleChannels.offer(channel);
getHttpDestination().release(this);
}
else
{
channel.destroy();
}
}

void onStreamClosed(Stream stream, HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed for {}", stream, channel);
channel.setStream(null);
getHttpDestination().release(this);
}

@Override
public boolean onIdleTimeout(long idleTimeout)
{
Expand Down
Expand Up @@ -184,6 +184,12 @@ public void onFailure(Stream stream, int error, String reason, Callback callback
callback.succeeded();
}

@Override
public void onClosed(Stream stream)
{
getHttpChannel().onStreamClosed(stream);
}

private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(new DataInfo(exchange, frame, callback));
Expand Down
Expand Up @@ -18,10 +18,6 @@

package org.eclipse.jetty.http.client;

import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -30,11 +26,11 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand Down Expand Up @@ -68,6 +64,9 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class HttpClientLoadTest extends AbstractTest<HttpClientLoadTest.LoadTransportScenario>
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
Expand Down Expand Up @@ -186,7 +185,7 @@ private void test(final CountDownLatch latch, final List<String> failures)
// Choose a random method
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;

boolean ssl = scenario.isTransportSecure();
boolean ssl = scenario.transport.isTlsBased();

// Choose randomly whether to close the connection on the client or on the server
boolean clientClose = false;
Expand All @@ -196,13 +195,17 @@ private void test(final CountDownLatch latch, final List<String> failures)
if (!ssl && random.nextInt(100) < 5)
serverClose = true;

int maxContentLength = 64 * 1024;
long clientTimeout = 0;
// if (!ssl && random.nextInt(100) < 5)
// clientTimeout = random.nextInt(500) + 500;

int maxContentLength = 1024 * 1024;
int contentLength = random.nextInt(maxContentLength) + 1;

test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
test(scenario.getScheme(), host, method.asString(), clientClose, serverClose, clientTimeout, contentLength, true, latch, failures);
}

private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, long clientTimeout, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures)
{
long requestId = requestCount.incrementAndGet();
Request request = scenario.client.newRequest(host, scenario.getNetworkConnectorLocalPortInt().orElse(0))
Expand All @@ -215,6 +218,12 @@ private void test(String scheme, String host, String method, boolean clientClose
else if (serverClose)
request.header("X-Close", "true");

if (clientTimeout > 0)
{
request.header("X-Timeout", String.valueOf(clientTimeout));
request.idleTimeout(clientTimeout, TimeUnit.MILLISECONDS);
}

switch (method)
{
case "GET":
Expand Down Expand Up @@ -254,12 +263,18 @@ public void onComplete(Result result)
{
if (result.isFailed())
{
result.getFailure().printStackTrace();
failures.add("Result failed " + result);
Throwable failure = result.getFailure();
if (!(clientTimeout > 0 && failure instanceof TimeoutException))
{
failure.printStackTrace();
failures.add("Result failed " + result);
}
}
else
{
if (checkContentLength && contentLength.get() != 0)
failures.add("Content length mismatch " + contentLength);
}

if (checkContentLength && contentLength.get() != 0)
failures.add("Content length mismatch " + contentLength);

requestLatch.countDown();
latch.countDown();
Expand Down Expand Up @@ -288,8 +303,14 @@ private boolean await(CountDownLatch latch, long time, TimeUnit unit)
private class LoadHandler extends AbstractHandler
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);

String timeout = request.getHeader("X-Timeout");
if (timeout != null)
sleep(2 * Integer.parseInt(timeout));

String method = request.getMethod().toUpperCase(Locale.ENGLISH);
switch (method)
{
Expand All @@ -313,8 +334,17 @@ public void handle(String target, org.eclipse.jetty.server.Request baseRequest,

if (Boolean.parseBoolean(request.getHeader("X-Close")))
response.setHeader("Connection", "close");
}

baseRequest.setHandled(true);
private void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException ignored)
{
}
}
}

Expand All @@ -329,8 +359,9 @@ public LoadTransportScenario(Transport transport, AtomicLong connectionLeaks) th
}

@Override
public Connector newServerConnector( Server server) throws Exception {
if (transport == UNIX_SOCKET)
public Connector newServerConnector( Server server)
{
if (transport == Transport.UNIX_SOCKET)
{
UnixSocketConnector
unixSocketConnector = new UnixSocketConnector( server, provideServerConnectionFactory( transport ));
Expand Down

0 comments on commit ec2b5b1

Please sign in to comment.