Address maxConcurrentStreams violation on write timeout (#3908)
**Background**

Currently, armeria maintains a state `HttpResponseDecoder#unfinishedResponses` to check how many in-flight requests are being processed for a connection.

Armeria uses this value to check whether the existing connections already occupy too many concurrent streams, and creates a new connection if necessary.

On the other hand, Netty maintains its own state to check how many in-flight requests are being processed for a connection (`DefaultHttp2Connection.DefaultEndpoint#numActiveStreams`).

Netty checks this value before creating a stream, and throws an `Http2Exception$StreamException` if creating it would exceed `MAX_CONCURRENT_STREAMS`.

**Problem Statement**

Currently, when a `WriteTimeoutException` is triggered, Armeria decrements `unfinishedResponses` and removes the response. (A `WriteTimeoutException` is thrown when a request header isn't written within the configured `writeTimeoutMillis`.)
However, Netty may not be aware that Armeria has failed the response. Consequently, Netty's `numActiveStreams` can be greater than Armeria's `unfinishedResponses`, which may cause a violation of `MAX_CONCURRENT_STREAMS` for additional requests on the connection.
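
The drift described above can be illustrated with a toy model. This is only a sketch and not Armeria or Netty code: the class, its fields, and the limit of `1` are hypothetical stand-ins for `numActiveStreams`, `unfinishedResponses`, and `MAX_CONCURRENT_STREAMS`.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model (not Armeria or Netty code) of two per-connection counters drifting apart. */
public class StreamCounterDrift {
    // Hypothetical limit, chosen small so the violation is easy to trigger.
    static final int MAX_CONCURRENT_STREAMS = 1;

    // Stands in for Netty's numActiveStreams.
    final Set<Integer> transportActiveStreams = new HashSet<>();
    // Stands in for Armeria's unfinishedResponses.
    final Set<Integer> clientUnfinishedResponses = new HashSet<>();

    /** Client-side admission check: it only looks at the client's own bookkeeping. */
    boolean clientThinksStreamAvailable() {
        return clientUnfinishedResponses.size() < MAX_CONCURRENT_STREAMS;
    }

    /** Opens a stream; throws when the transport-level limit would be exceeded. */
    void open(int streamId) {
        if (transportActiveStreams.size() >= MAX_CONCURRENT_STREAMS) {
            throw new IllegalStateException("MAX_CONCURRENT_STREAMS exceeded for stream " + streamId);
        }
        transportActiveStreams.add(streamId);
        clientUnfinishedResponses.add(streamId);
    }

    /** Old behavior: a write timeout removes the response on the client side only. */
    void writeTimeoutOldBehavior(int streamId) {
        clientUnfinishedResponses.remove(streamId);
    }

    /** The transport finally closes the stream; both sides forget it. */
    void streamClosed(int streamId) {
        transportActiveStreams.remove(streamId);
        clientUnfinishedResponses.remove(streamId);
    }

    public static void main(String[] args) {
        StreamCounterDrift conn = new StreamCounterDrift();
        conn.open(1);
        conn.writeTimeoutOldBehavior(1);
        System.out.println("client thinks a stream is available: " + conn.clientThinksStreamAvailable());
        try {
            conn.open(3);
        } catch (IllegalStateException e) {
            System.out.println("transport rejected: " + e.getMessage());
        }
    }
}
```

In this model the client-side check passes after the write timeout, yet the transport still counts stream 1 as active and rejects the next stream.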

**Motivation**
Netty always calls `Http2ResponseDecoder.onStreamClosed` before decrementing `numActiveStreams`.
If we want `numActiveStreams` to be in sync with `unfinishedResponses`, I propose that we modify the timing of decrementing `unfinishedResponses` to `Http2ResponseDecoder.onStreamClosed`.

In detail, when a `WriteTimeoutException` is scheduled

https://github.com/line/armeria/blob/117a21e17ec9e30b0c3c2d74d16fdde3cab62434/core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java#L171-L173

the response is closed.

https://github.com/line/armeria/blob/117a21e17ec9e30b0c3c2d74d16fdde3cab62434/core/src/main/java/com/linecorp/armeria/client/HttpRequestSubscriber.java#L318

Consequently, after the stream processes the `close` event, `whenComplete` is triggered.

https://github.com/line/armeria/blob/117a21e17ec9e30b0c3c2d74d16fdde3cab62434/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java#L83-L90

And the response is removed (and `unfinishedResponses` is decremented)

https://github.com/line/armeria/blob/117a21e17ec9e30b0c3c2d74d16fdde3cab62434/core/src/main/java/com/linecorp/armeria/client/Http2ResponseDecoder.java#L101

However, as far as netty is concerned, the request may have been written and may still be processing.

**Misc**

Reproduced the `maxConcurrentStreams` violation when a `WriteTimeoutException` occurs at 225a684

**Modifications**

- Remove the `removeResponse` call from `Http2ResponseDecoder.onWrapperCompleted`, and rely on `onStreamClosed` to remove the response and decrement `unfinishedResponses`.
- When receiving callbacks for `onHeadersRead`, `onDataRead`, `onRstStreamRead`, also check if `resWrapper` had been closed. This preserves behavior since `res` was previously removed on `WriteTimeoutException`, resulting in `res == null`.
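
A minimal sketch of the two modifications above, using a toy model rather than the real decoder. `DeferredRemoval`, `Response`, and every method name here are hypothetical; only the idea (close on timeout, remove on stream close, treat a closed response like a missing one) comes from this change.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not Armeria code) of deferring response removal until the stream is closed. */
public class DeferredRemoval {
    /** Hypothetical stand-in for a response wrapper with an open/closed flag. */
    static final class Response {
        private boolean open = true;

        boolean isOpen() {
            return open;
        }

        void close() {
            open = false;
        }
    }

    private final Map<Integer, Response> responses = new HashMap<>();

    void register(int streamId) {
        responses.put(streamId, new Response());
    }

    /** Plays the role of the unfinished-response count. */
    int unfinishedResponses() {
        return responses.size();
    }

    /** A write timeout closes the response but leaves it registered. */
    void onWriteTimeout(int streamId) {
        final Response res = responses.get(streamId);
        if (res != null) {
            res.close();
        }
    }

    /** Frame callbacks treat a closed response the same way as a missing one. */
    boolean shouldProcessFrame(int streamId) {
        final Response res = responses.get(streamId);
        return res != null && res.isOpen();
    }

    /** Only the stream-closed callback removes the entry and lowers the count. */
    void onStreamClosed(int streamId) {
        responses.remove(streamId);
    }
}
```

With this shape, the count stays at 1 after a write timeout and only drops once the stream-closed callback fires, while late frames for the timed-out response are still ignored.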


*Update*

I realized that if we simply don't process values when headers/data/rst are received, then we might not send a `GoAway` and close the connection when `disconnectWhenFinished = true` due to df43379.

I've verified this behavior from test cases added in 8018da1

I've modified further such that:
- Only remove responses when `onStreamClosed` is called.
- Remove calls to `channel().close();` if `shouldSendGoAway()` is true for `onDataRead`, `onHeadersRead` since `onStreamClosed` will handle this instead.
- Modify `onStreamClosed` to try to close the `ResponseWrapper` only if the underlying `delegate` is open.
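
The list above can be sketched as a toy model in which the connection-close decision lives in a single stream-closed hook. This is not Armeria code: the class and its members are hypothetical, and the close condition is a deliberate simplification of the real `shouldSendGoAway()` check.

```java
/** Toy model (not Armeria code) of closing a connection from a single stream-closed hook. */
public class CloseOnStreamClosed {
    private int activeStreams;
    private boolean disconnectWhenFinished;
    private boolean connectionClosed;

    void openStream() {
        activeStreams++;
    }

    void setDisconnectWhenFinished() {
        disconnectWhenFinished = true;
    }

    boolean isConnectionClosed() {
        return connectionClosed;
    }

    /** A frame carrying endOfStream only ends the stream; it no longer closes the channel itself. */
    void onEndOfStream() {
        onStreamClosed();
    }

    /** A reset ends the stream through the same hook, so it can also lead to a close. */
    void onRstStream() {
        onStreamClosed();
    }

    /** The single place that decides whether the connection should go away. */
    private void onStreamClosed() {
        activeStreams--;
        // Simplified assumption: close once the connection is marked and no streams remain.
        if (disconnectWhenFinished && activeStreams == 0) {
            connectionClosed = true;
        }
    }
}
```

Because both termination paths go through the same hook, a reset stream can now lead to the connection being closed as well.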

d1183d8

There is a slight change in behavior: a `GoAway` may now be triggered from `onRstStreamRead` as well. Let me know if this change shouldn't be made 🙏

Result:

- Closes #3858
jrhee17 committed Nov 23, 2021
1 parent 43a6758 commit 8763394
Showing 3 changed files with 77 additions and 34 deletions.
@@ -96,9 +96,10 @@ private void onWrapperCompleted(HttpResponseWrapper resWrapper, int id, @Nullabl
resWrapper.onSubscriptionCancelled(cause);

if (cause != null) {
// We are not closing the connection but just send a RST_STREAM,
// so we have to remove the response manually.
removeResponse(id);
// Removing the response and decrementing `unfinishedResponses` isn't done immediately
// here. Instead, we rely on `Http2ResponseDecoder#onStreamClosed` to decrement
// `unfinishedResponses` after Netty decrements `numActiveStreams` in `DefaultHttp2Connection`
// so that `unfinishedResponses` is never greater than `numActiveStreams`.

// Reset the stream.
final int streamId = idToStreamId(id);
@@ -133,21 +134,23 @@ public void onStreamHalfClosed(Http2Stream stream) {}
public void onStreamClosed(Http2Stream stream) {
goAwayHandler.onStreamClosed(channel(), stream);

final HttpResponseWrapper res = getResponse(streamIdToId(stream.id()), true);
final HttpResponseWrapper res = removeResponse(streamIdToId(stream.id()));
if (res == null) {
return;
}

if (!goAwayHandler.receivedGoAway()) {
res.close(ClosedStreamException.get());
return;
}
if (res.isOpen()) {
if (!goAwayHandler.receivedGoAway()) {
res.close(ClosedStreamException.get());
return;
}

final int lastStreamId = conn.local().lastStreamKnownByPeer();
if (stream.id() > lastStreamId) {
res.close(UnprocessedRequestException.of(GoAwayReceivedException.get()));
} else {
res.close(ClosedStreamException.get());
final int lastStreamId = conn.local().lastStreamKnownByPeer();
if (stream.id() > lastStreamId) {
res.close(UnprocessedRequestException.of(GoAwayReceivedException.get()));
} else {
res.close(ClosedStreamException.get());
}
}

if (shouldSendGoAway()) {
@@ -184,8 +187,8 @@ public void onSettingsAckRead(ChannelHandlerContext ctx) {}
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
keepAliveChannelRead();
final HttpResponseWrapper res = getResponse(streamIdToId(streamId), endOfStream);
if (res == null) {
final HttpResponseWrapper res = getResponse(streamIdToId(streamId));
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late HEADERS frame for a closed stream: {}",
@@ -211,10 +214,6 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers

if (endOfStream) {
res.close();

if (shouldSendGoAway()) {
channel().close();
}
}
}

@@ -233,8 +232,8 @@ public int onDataRead(
keepAliveChannelRead();

final int dataLength = data.readableBytes();
final HttpResponseWrapper res = getResponse(streamIdToId(streamId), endOfStream);
if (res == null) {
final HttpResponseWrapper res = getResponse(streamIdToId(streamId));
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late DATA frame for a closed stream: {}",
@@ -271,12 +270,6 @@ public int onDataRead(

if (endOfStream) {
res.close();

if (shouldSendGoAway()) {
// The connection has reached its lifespan.
// Should send a GOAWAY frame if it did not receive or send a GOAWAY frame.
channel().close();
}
}

// All bytes have been processed.
@@ -294,8 +287,8 @@ private boolean shouldSendGoAway() {
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
keepAliveChannelRead();
final HttpResponseWrapper res = removeResponse(streamIdToId(streamId));
if (res == null) {
final HttpResponseWrapper res = getResponse(streamIdToId(streamId));
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late RST_STREAM frame for a closed stream: {}",
@@ -93,11 +93,6 @@ final HttpResponseWrapper getResponse(int id) {
return responses.get(id);
}

@Nullable
final HttpResponseWrapper getResponse(int id, boolean remove) {
return remove ? removeResponse(id) : getResponse(id);
}

@Nullable
final HttpResponseWrapper removeResponse(int id) {
if (closing) {
@@ -18,6 +18,7 @@

import static com.linecorp.armeria.common.HttpStatus.OK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.net.InetSocketAddress;
@@ -35,6 +36,8 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
@@ -54,6 +57,14 @@ protected void configure(ServerBuilder sb) {
sb.idleTimeoutMillis(0);
sb.requestTimeoutMillis(0);
sb.service("/", (ctx, req) -> HttpResponse.of(OK));
sb.annotatedService("/delayed", new Object() {

@Get
public HttpResponse delayed(@Param("seconds") long seconds) {
return HttpResponse.delayed(
HttpResponse.of(200), Duration.ofSeconds(seconds));
}
});
}

@Override
@@ -160,4 +171,48 @@ void shouldCloseIdleConnectionByMaxConnectionAge(SessionProtocol protocol) {
await().untilAtomic(closed, Matchers.is(1));
}
}

@EnumSource(value = SessionProtocol.class, names = "PROXY", mode = Mode.EXCLUDE)
@ParameterizedTest
void shouldCloseConnectionAfterLongRequest(SessionProtocol protocol) throws Exception {
try (ClientFactory factory = ClientFactory.builder()
.connectionPoolListener(connectionPoolListener)
.idleTimeoutMillis(0)
.maxConnectionAgeMillis(MAX_CONNECTION_AGE)
.tlsNoVerify()
.build()) {
final WebClient client = WebClient.builder(server.uri(protocol))
.factory(factory)
.responseTimeoutMillis(0)
.build();

assertThat(client.get("/delayed?seconds=4").aggregate().join().status()).isEqualTo(OK);

await().untilAtomic(opened, Matchers.is(1));
await().untilAtomic(closed, Matchers.is(1));
}
}

@EnumSource(value = SessionProtocol.class, names = "PROXY", mode = Mode.EXCLUDE)
@ParameterizedTest
void shouldCloseConnectionAfterLongRequestTimeout(SessionProtocol protocol) throws Exception {
try (ClientFactory factory = ClientFactory.builder()
.connectionPoolListener(connectionPoolListener)
.idleTimeoutMillis(0)
.maxConnectionAgeMillis(MAX_CONNECTION_AGE)
.tlsNoVerify()
.build()) {
final long responseTimeoutMillis = MAX_CONNECTION_AGE + 1000;
final WebClient client = WebClient.builder(server.uri(protocol))
.factory(factory)
.responseTimeoutMillis(responseTimeoutMillis)
.build();

assertThatThrownBy(() -> client.get("/delayed?seconds=10").aggregate().join().status())
.hasRootCauseInstanceOf(ResponseTimeoutException.class);

await().untilAtomic(opened, Matchers.is(1));
await().untilAtomic(closed, Matchers.is(1));
}
}
}
