Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address maxConcurrentStreams violation on write timeout #3908

Merged
merged 10 commits into from
Nov 23, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ private void onWrapperCompleted(HttpResponseWrapper resWrapper, int id, @Nullabl
resWrapper.onSubscriptionCancelled(cause);

if (cause != null) {
// We are not closing the connection but just send a RST_STREAM,
// so we have to remove the response manually.
removeResponse(id);
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
// Removing the response and decrementing `unfinishedResponses` isn't done immediately
// here. Instead, we rely on `Http2ResponseDecoder#onStreamClosed` to decrement
// `unfinishedResponses` after Netty decrements `numActiveStreams` in `DefaultHttp2Connection`
// so that `unfinishedResponses` is never greater than `numActiveStreams`.

// Reset the stream.
final int streamId = idToStreamId(id);
Expand Down Expand Up @@ -133,21 +134,23 @@ public void onStreamHalfClosed(Http2Stream stream) {}
public void onStreamClosed(Http2Stream stream) {
goAwayHandler.onStreamClosed(channel(), stream);

final HttpResponseWrapper res = getResponse(streamIdToId(stream.id()), true);
final HttpResponseWrapper res = removeResponse(streamIdToId(stream.id()));
if (res == null) {
return;
}

if (!goAwayHandler.receivedGoAway()) {
res.close(ClosedStreamException.get());
return;
}
if (res.isOpen()) {
if (!goAwayHandler.receivedGoAway()) {
res.close(ClosedStreamException.get());
return;
}

final int lastStreamId = conn.local().lastStreamKnownByPeer();
if (stream.id() > lastStreamId) {
res.close(UnprocessedRequestException.of(GoAwayReceivedException.get()));
} else {
res.close(ClosedStreamException.get());
final int lastStreamId = conn.local().lastStreamKnownByPeer();
if (stream.id() > lastStreamId) {
res.close(UnprocessedRequestException.of(GoAwayReceivedException.get()));
} else {
res.close(ClosedStreamException.get());
}
}

if (shouldSendGoAway()) {
Expand Down Expand Up @@ -184,8 +187,8 @@ public void onSettingsAckRead(ChannelHandlerContext ctx) {}
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
keepAliveChannelRead();
final HttpResponseWrapper res = getResponse(streamIdToId(streamId), endOfStream);
if (res == null) {
final HttpResponseWrapper res = getResponse(streamIdToId(streamId));
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late HEADERS frame for a closed stream: {}",
Expand All @@ -211,10 +214,6 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers

if (endOfStream) {
res.close();

if (shouldSendGoAway()) {
channel().close();
}
}
}

Expand All @@ -233,8 +232,8 @@ public int onDataRead(
keepAliveChannelRead();

final int dataLength = data.readableBytes();
final HttpResponseWrapper res = getResponse(streamIdToId(streamId), endOfStream);
if (res == null) {
final HttpResponseWrapper res = getResponse(streamIdToId(streamId));
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late DATA frame for a closed stream: {}",
Expand Down Expand Up @@ -271,12 +270,6 @@ public int onDataRead(

if (endOfStream) {
res.close();

if (shouldSendGoAway()) {
// The connection has reached its lifespan.
// Should send a GOAWAY frame if it did not receive or send a GOAWAY frame.
channel().close();
}
}

// All bytes have been processed.
Expand All @@ -294,8 +287,8 @@ private boolean shouldSendGoAway() {
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
keepAliveChannelRead();
final HttpResponseWrapper res = removeResponse(streamIdToId(streamId));
if (res == null) {
final HttpResponseWrapper res = getResponse(streamIdToId(streamId));
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late RST_STREAM frame for a closed stream: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ final HttpResponseWrapper getResponse(int id) {
return responses.get(id);
}

@Nullable
final HttpResponseWrapper getResponse(int id, boolean remove) {
return remove ? removeResponse(id) : getResponse(id);
}

@Nullable
final HttpResponseWrapper removeResponse(int id) {
if (closing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static com.linecorp.armeria.common.HttpStatus.OK;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.net.InetSocketAddress;
Expand All @@ -35,6 +36,8 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.metric.MoreMeters;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
Expand All @@ -54,6 +57,14 @@ protected void configure(ServerBuilder sb) {
sb.idleTimeoutMillis(0);
sb.requestTimeoutMillis(0);
sb.service("/", (ctx, req) -> HttpResponse.of(OK));
sb.annotatedService("/delayed", new Object() {

@Get
public HttpResponse delayed(@Param("seconds") long seconds) {
return HttpResponse.delayed(
HttpResponse.of(200), Duration.ofSeconds(seconds));
}
});
}

@Override
Expand Down Expand Up @@ -160,4 +171,48 @@ void shouldCloseIdleConnectionByMaxConnectionAge(SessionProtocol protocol) {
await().untilAtomic(closed, Matchers.is(1));
}
}

@EnumSource(value = SessionProtocol.class, names = "PROXY", mode = Mode.EXCLUDE)
@ParameterizedTest
void shouldCloseConnectionAfterLongRequest(SessionProtocol protocol) throws Exception {
try (ClientFactory factory = ClientFactory.builder()
.connectionPoolListener(connectionPoolListener)
.idleTimeoutMillis(0)
.maxConnectionAgeMillis(MAX_CONNECTION_AGE)
.tlsNoVerify()
.build()) {
final WebClient client = WebClient.builder(server.uri(protocol))
.factory(factory)
.responseTimeoutMillis(0)
.build();

assertThat(client.get("/delayed?seconds=4").aggregate().join().status()).isEqualTo(OK);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The request doesn't seem to be closed unlike what the method name says?

I maybe miss something, but if the MAX_CONNECTION_AGE is two seconds and the response is sent after 4 seconds, shouldn't the connection should be closed forcefully? 馃

Copy link
Contributor Author

@jrhee17 jrhee17 Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about the late check

Maybe I'm misunderstanding the specification, but I thought that even if MAX_CONNECTION_AGE is reached, the client waits for requests to finish (so it doesn't forcefully fail long-running requests). I've updated the test name to better reflect this 馃槄

I've also organized how I think MAX_CONNECTION_AGE works

  1. A timer is scheduled for MAX_CONNECTION_AGE, but is only respected if there are no running requests

    if (!isServer && !hasRequestsInProgress(ctx)) {
    logger.debug("{} Closing a {} connection exceeding the max age: {}ns",
    ctx.channel(), name, maxConnectionAgeNanos);
    ctx.channel().close();
    }

  2. Instead, every time a request may have finished, we check if MAX_CONNECTION_AGE has passed, and there are no running requests.

if (shouldSendGoAway()) {
channel().close();
}

if (shouldSendGoAway()) {
channel().close();
}

if (shouldSendGoAway()) {
// The connection has reached its lifespan.
// Should send a GOAWAY frame if it did not receive or send a GOAWAY frame.
channel().close();
}

  1. At the same time, we don't allow future requests for connections where MAX_CONNECTION_AGE has passed.
    return active && !responseDecoder.needsToDisconnectWhenFinished();

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the test name to better reflect this 馃槄

Thanks for that. 馃槃

but I thought that even if MAX_CONNECTION_AGE is reached, the client waits for requests to finish (so it doesn't forcefully fail long-running requests)

Exactly. My memory was wrong. Thanks for checking it. 馃槃


await().untilAtomic(opened, Matchers.is(1));
await().untilAtomic(closed, Matchers.is(1));
}
}

@EnumSource(value = SessionProtocol.class, names = "PROXY", mode = Mode.EXCLUDE)
@ParameterizedTest
void shouldCloseConnectionAfterLongRequestTimeout(SessionProtocol protocol) throws Exception {
try (ClientFactory factory = ClientFactory.builder()
.connectionPoolListener(connectionPoolListener)
.idleTimeoutMillis(0)
.maxConnectionAgeMillis(MAX_CONNECTION_AGE)
.tlsNoVerify()
.build()) {
final long responseTimeoutMillis = MAX_CONNECTION_AGE + 1000;
final WebClient client = WebClient.builder(server.uri(protocol))
.factory(factory)
.responseTimeoutMillis(responseTimeoutMillis)
.build();

assertThatThrownBy(() -> client.get("/delayed?seconds=10").aggregate().join().status())
.hasRootCauseInstanceOf(ResponseTimeoutException.class);

await().untilAtomic(opened, Matchers.is(1));
await().untilAtomic(closed, Matchers.is(1));
}
}
}