Skip to content

Commit

Permalink
Fixes #8405 - onAllDataRead() is called twice under h2 if the stream … (
Browse files Browse the repository at this point in the history
#10174)

* Fixes #8405 - onAllDataRead() is called twice under h2 if the stream times out

Per Servlet semantic, HTTP/2 stream timeout should be ignored.

The code was trying to fail the read via `_contentDemander.onTimeout()`, but
then it was still calling `onContentProducible()`, which was returning `true`
because the state of the read was IDLE (all the request content was read) and
the request was suspended.

Now the code checks if the read was really failed; if it is not, then
`onContentProducible()` is not called and so the idle timeout is ignored.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jul 31, 2023
1 parent e268917 commit 87c24e7
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 41 deletions.
Expand Up @@ -127,7 +127,7 @@ public Runnable onRequest(HeadersFrame frame)

boolean connect = request instanceof MetaData.ConnectRequest;
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !_expect100Continue && !connect;
!endStream && !_expect100Continue && !connect;

// Delay the demand of DATA frames for CONNECT with :protocol
// or for normal requests expecting 100 continue.
Expand All @@ -146,10 +146,10 @@ public Runnable onRequest(HeadersFrame frame)
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}, delayed={}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
_delayedUntilContent, System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), fields);
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
_delayedUntilContent, System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), fields);
}

return _delayedUntilContent ? null : this;
Expand Down Expand Up @@ -179,9 +179,9 @@ public Runnable onPushRequest(MetaData.Request request)
{
Stream stream = getStream();
LOG.debug("HTTP2 PUSH Request #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), request.getFields());
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), request.getFields());
}

return this;
Expand Down Expand Up @@ -222,8 +222,8 @@ protected void commit(MetaData.Response info)
{
Stream stream = getStream();
LOG.debug("HTTP2 Commit Response #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getHttpVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getHttpVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
}
}

Expand Down Expand Up @@ -276,13 +276,13 @@ public InvocationType getInvocationType()
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, woken: {}, needed: {}, handle: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
woken,
needed,
handle);
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
woken,
needed,
handle);
}

boolean wasDelayed = _delayedUntilContent;
Expand Down Expand Up @@ -622,8 +622,8 @@ public Runnable onTrailer(HeadersFrame frame)
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}, trailers:{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), trailers);
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), trailers);
}

// This will generate EOF -> need to call onContentProducible.
Expand All @@ -645,7 +645,7 @@ public boolean isIdle()
@Override
public boolean onTimeout(Throwable failure, Consumer<Runnable> consumer)
{
final boolean delayed = _delayedUntilContent;
boolean delayed = _delayedUntilContent;
_delayedUntilContent = false;

boolean reset = isIdle();
Expand All @@ -655,10 +655,9 @@ public boolean onTimeout(Throwable failure, Consumer<Runnable> consumer)
getHttpTransport().onStreamTimeout(failure);

failure.addSuppressed(new Throwable("HttpInput idle timeout"));
_contentDemander.onTimeout(failure);
boolean needed = getRequest().getHttpInput().onContentProducible();

if (needed || delayed)
boolean readFailed = _contentDemander.onTimeout(failure);
boolean handle = readFailed && getRequest().getHttpInput().onContentProducible();
if (handle || delayed)
{
consumer.accept(this::handleWithContext);
reset = false;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -96,7 +97,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -1321,12 +1324,12 @@ public void onError(Throwable x)
CountDownLatch clientLatch = new CountDownLatch(1);

String expected =
"S1" +
"S2" +
"S3S3" +
"S4" +
"S5" +
"S6";
"S1" +
"S2" +
"S3S3" +
"S4" +
"S5" +
"S6";

scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
Expand Down Expand Up @@ -1509,13 +1512,13 @@ public void onError(Throwable x)
CountDownLatch clientLatch = new CountDownLatch(1);

String expected =
"0S" +
"1S" +
"2S" +
"3S" +
"4S" +
"5S" +
"6S";
"0S" +
"1S" +
"2S" +
"3S" +
"4S" +
"5S" +
"6S";

scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
Expand Down Expand Up @@ -1629,10 +1632,10 @@ public void onError(Throwable x)
CountDownLatch clientLatch = new CountDownLatch(1);

String expected =
"0S" +
"2S" +
"4S" +
"6S";
"0S" +
"2S" +
"4S" +
"6S";

scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
Expand Down Expand Up @@ -1739,6 +1742,69 @@ public int read(byte[] b, int off, int len)
assertThat(failures, empty());
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testOnAllDataReadCalledOnceThenIdleTimeout(Transport transport) throws Exception
{
init(transport);
AtomicInteger allDataReadCount = new AtomicInteger();
AtomicReference<Throwable> errorRef = new AtomicReference<>();
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse resp) throws IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);

ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
}

@Override
public void onAllDataRead()
{
allDataReadCount.incrementAndGet();
}

@Override
public void onError(Throwable x)
{
// There should be no errors because request body has
// been successfully read and idle timeouts are ignored.
errorRef.set(x);
}
});

// Never reply to the request, let it idle timeout.
// The Servlet semantic is that the idle timeout will
// be ignored so the client will timeout the request.
}
});
long idleTimeout = 1000;
scenario.setConnectionIdleTimeout(2 * idleTimeout);
scenario.setRequestIdleTimeout(idleTimeout);

assertThrows(TimeoutException.class, () -> scenario.client.newRequest(scenario.newURI())
.path(scenario.servletPath)
.timeout(2 * idleTimeout, TimeUnit.MILLISECONDS)
.send()
);

assertNull(errorRef.get());
assertEquals(1, allDataReadCount.get());
}

private static class Listener implements ReadListener, WriteListener
{
private final Executor executor = Executors.newFixedThreadPool(32);
Expand Down

0 comments on commit 87c24e7

Please sign in to comment.