diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index e2af34a6208c..4b635cfba42c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -115,8 +115,8 @@ public Mono connect(HttpMethod method, URI uri, .next() .doOnCancel(() -> { ReactorClientHttpResponse response = responseRef.get(); - if (response != null && response.bodyNotSubscribed()) { - response.getConnection().dispose(); + if (response != null) { + response.releaseAfterCancel(method); } }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 7872b5020ac9..678a68f72f66 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -21,6 +21,8 @@ import java.util.function.BiFunction; import io.netty.buffer.ByteBufAllocator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import reactor.netty.Connection; import reactor.netty.NettyInbound; @@ -29,10 +31,10 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.lang.Nullable; -import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -46,6 +48,8 @@ */ class ReactorClientHttpResponse implements ClientHttpResponse { + private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class); + private final NettyDataBufferFactory bufferFactory; private final HttpClientResponse response; @@ -53,9 +57,9 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyInbound inbound; @Nullable - private final Connection connection; + private final String logPrefix; - // 0 - not subscribed, 1 - subscribed, 2 - cancelled + // 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe) private final AtomicInteger state = new AtomicInteger(0); @@ -68,7 +72,7 @@ public ReactorClientHttpResponse(HttpClientResponse response, Connection connect this.response = response; this.inbound = connection.inbound(); this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); - this.connection = connection; + this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : ""); } /** @@ -80,7 +84,7 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou this.response = response; this.inbound = inbound; this.bufferFactory = new NettyDataBufferFactory(alloc); - this.connection = null; + this.logPrefix = ""; } @@ -88,14 +92,20 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou public Flux getBody() { return this.inbound.receive() .doOnSubscribe(s -> { - if (!this.state.compareAndSet(0, 1)) { - // https://github.com/reactor/reactor-netty/issues/503 - // FluxReceive rejects multiple subscribers, but not after a cancel(). - // Subsequent subscribers after cancel() will not be rejected, but will hang instead. - // So we need to reject once in cancelled state. - if (this.state.get() == 2) { - throw new IllegalStateException("The client response body can only be consumed once."); - } + if (this.state.compareAndSet(0, 1)) { + return; + } + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to reject once in cancelled state. + if (this.state.get() == 2) { + throw new IllegalStateException( + "The client response body can only be consumed once."); + } + else if (this.state.get() == 3) { + throw new IllegalStateException( + "The client response body has been released already due to cancellation."); } }) .doOnCancel(() -> this.state.compareAndSet(1, 2)) @@ -127,6 +137,7 @@ public MultiValueMap getCookies() { MultiValueMap result = new LinkedMultiValueMap<>(); this.response.cookies().values().stream().flatMap(Collection::stream) .forEach(c -> + result.add(c.name(), ResponseCookie.fromClientResponse(c.name(), c.value()) .domain(c.domain()) .path(c.path()) @@ -138,18 +149,25 @@ public MultiValueMap getCookies() { } /** - * For use by {@link ReactorClientHttpConnector}. + * Called by {@link ReactorClientHttpConnector} when a cancellation is detected + * but the content has not been subscribed to. If the subscription never + * materializes then the content will remain not drained. Or it could still + * materialize if the cancellation happened very early, or the response + * reading was delayed for some reason. */ - boolean bodyNotSubscribed() { - return this.state.get() == 0; + void releaseAfterCancel(HttpMethod method) { + if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) { + if (logger.isDebugEnabled()) { + logger.debug(this.logPrefix + "Releasing body, not yet subscribed."); + } + this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {}); + } } - /** - * For use by {@link ReactorClientHttpConnector}. - */ - Connection getConnection() { - Assert.notNull(this.connection, "Constructor with connection wasn't used"); - return this.connection; + private boolean mayHaveBody(HttpMethod method) { + int code = this.getRawStatusCode(); + return !((code >= 100 && code < 200) || code == 204 || code == 205 || + method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0); } @Override