Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address maxConcurrentStreams violation on write timeout #3908

Merged
merged 10 commits into from
Nov 23, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ private void onWrapperCompleted(HttpResponseWrapper resWrapper, int id, @Nullabl
resWrapper.onSubscriptionCancelled(cause);

if (cause != null) {
// We are not closing the connection but just send a RST_STREAM,
// so we have to remove the response manually.
removeResponse(id);
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
// Removing the response and decrementing {@code unfinishedResponses} isn't done immediately
// here. Instead, we rely on {@code Http2ResponseDecoder#onStreamClosed} to decrement
// `unfinishedResponses` to match the timing where netty decrements {@code numActiveStreams}.
Copy link
Member

@minwoox minwoox Nov 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about?

Suggested change
// `unfinishedResponses` to match the timing where netty decrements {@code numActiveStreams}.
// `unfinishedResponses` after Netty decrements `numActiveStreams` in `DefaultHttp2Connection` so that `unfinishedResponses` is never greater than `numActiveStreams`.

(We don't have to use {@code ...} in the comment. 馃槈 )


// Reset the stream.
final int streamId = idToStreamId(id);
Expand Down Expand Up @@ -185,7 +185,7 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers
boolean endOfStream) throws Http2Exception {
keepAliveChannelRead();
final HttpResponseWrapper res = getResponse(streamIdToId(streamId), endOfStream);
if (res == null) {
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late HEADERS frame for a closed stream: {}",
Expand Down Expand Up @@ -234,7 +234,7 @@ public int onDataRead(

final int dataLength = data.readableBytes();
final HttpResponseWrapper res = getResponse(streamIdToId(streamId), endOfStream);
if (res == null) {
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late DATA frame for a closed stream: {}",
Expand Down Expand Up @@ -295,7 +295,7 @@ private boolean shouldSendGoAway() {
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
keepAliveChannelRead();
final HttpResponseWrapper res = removeResponse(streamIdToId(streamId));
if (res == null) {
if (res == null || !res.isOpen()) {
if (conn.streamMayHaveExisted(streamId)) {
if (logger.isDebugEnabled()) {
logger.debug("{} Received a late RST_STREAM frame for a closed stream: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ void clientTimeout() throws InterruptedException {
assertThat(loggingEventCaptor.getAllValues()).noneMatch(event -> {
return event.getLevel() == Level.WARN &&
event.getThrowableProxy() != null &&
event.getThrowableProxy().getMessage() != null &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert?

event.getThrowableProxy().getMessage().contains("call already closed");
});
}
Expand Down