Skip to content

Commit

Permalink
Fixes #2796 - Max local stream count exceeded when request fails.
Browse files Browse the repository at this point in the history
Now releasing the connection only after the stream has been reset, so
we are sure that the stream has been closed and its count decremented.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 13, 2018
1 parent 5ee856c commit 35541d0
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 13 deletions.
Expand Up @@ -102,23 +102,21 @@ public void release()
}

@Override
public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure)
public void exchangeTerminated(HttpExchange exchange, Result result)
{
Stream stream = getStream();
boolean aborted = super.abort(exchange, requestFailure, responseFailure);
if (aborted)
super.exchangeTerminated(exchange, result);
if (result.isSucceeded())
{
release();
}
else
{
Stream stream = getStream();
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback());
else
release();
}
return aborted;
}

@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
release();
}

@Override
Expand All @@ -129,4 +127,27 @@ public String toString()
sender,
receiver);
}

private class ReleaseCallback implements Callback
{
@Override
public void succeeded()
{
release();
}

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
release();
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
}
Expand Up @@ -367,6 +367,39 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
Assert.assertTrue(failures.toString(), failures.isEmpty());
}

@Test
public void testTwoConcurrentStreamsFirstTimesOut() throws Exception
{
long timeout = 1000;
start(1, new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
if (target.endsWith("/1"))
sleep(2 * timeout);
}
});
client.setMaxConnectionsPerDestination(1);

CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.path("/1")
.timeout(timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
if (result.isFailed())
latch.countDown();
});

ContentResponse response2 = client.newRequest("localhost", connector.getLocalPort())
.path("/2")
.send();

Assert.assertEquals(HttpStatus.OK_200, response2.getStatus());
Assert.assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down

0 comments on commit 35541d0

Please sign in to comment.