Skip to content

Commit

Permalink
Issue #8211 - fix ISE from HttpChannelState if failure during process
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 28, 2022
1 parent d05d663 commit 5253641
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 2 deletions.
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.http.HttpFields;
Expand Down Expand Up @@ -50,6 +51,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -378,6 +380,76 @@ public boolean onIdleTimeout(Stream stream, Throwable x)
assertFalse(((HTTP2Session)session).isDisconnected());
}

@Test
public void testForgivingFailedClientEnforcingStreamIdleTimeout() throws Exception
{
final int idleTimeout = 1000;
AtomicReference<Throwable> failure = new AtomicReference<>();
AtomicBoolean failedDidNotThrow = new AtomicBoolean(false);
start(new Handler.Processor()
{
@Override
public void process(Request request, Response response, Callback callback)
{
sleep(2 * idleTimeout);
try
{
callback.succeeded();
}
catch (Throwable t)
{
failure.set(t);
callback.failed(t);
failedDidNotThrow.set(true);
}
}
});

Session session = newClientSession(new Session.Listener.Adapter());

final CountDownLatch dataLatch = new CountDownLatch(1);
final CountDownLatch timeoutLatch = new CountDownLatch(1);
MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(metaData, null, true);
session.newStream(requestFrame, new Promise.Adapter<>()
{
@Override
public void succeeded(Stream stream)
{
stream.setIdleTimeout(idleTimeout);
}
}, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
dataLatch.countDown();
}

@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
assertThat(x, Matchers.instanceOf(TimeoutException.class));
timeoutLatch.countDown();
return true;
}
});

assertTrue(timeoutLatch.await(5, TimeUnit.SECONDS));
// We must not receive any DATA frame.
assertFalse(dataLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Stream must be gone.
assertTrue(session.getStreams().isEmpty());
// Session must not be closed, nor disconnected.
assertFalse(session.isClosed());
assertFalse(((HTTP2Session)session).isDisconnected());

// Callback.succeeded() should have thrown an exception, but Callback.failed() should not have thrown.
assertNotNull(failure.get());
assertTrue(failedDidNotThrow.get());
}

@Test
public void testServerEnforcingStreamIdleTimeout() throws Exception
{
Expand Down
Expand Up @@ -663,7 +663,15 @@ public void run()
failure = x;
}
if (failure != null)
request._callback.failed(failure);
{
try (AutoLock ignored = _lock.lock())
{
if (request._httpChannel != null && !request._httpChannel._completed)
request._callback.failed(failure);
else if (LOG.isDebugEnabled())
LOG.debug("Process failed", failure);
}
}

HttpStream stream;
boolean completeStream;
Expand Down Expand Up @@ -1364,6 +1372,9 @@ public void failed(Throwable failure)
boolean completeStream;
try (AutoLock ignored = _request._lock.lock())
{
request = _request;
if (request._httpChannel != null && request._httpChannel._error != null && TypeUtil.isAssociated(request._httpChannel._error.getCause(), failure))
return;
lockedOnComplete();

httpChannelState = _request._httpChannel;
Expand All @@ -1373,7 +1384,6 @@ public void failed(Throwable failure)
// Verify whether we can write an error response.
writeErrorResponse = !httpChannelState._stream.isCommitted();
stream = httpChannelState._stream;
request = _request;

// Consume any input.
Throwable unconsumed = stream.consumeAvailable();
Expand Down

0 comments on commit 5253641

Please sign in to comment.