Skip to content

Commit

Permalink
Refine solution for 21de09
Browse files Browse the repository at this point in the history
The following was reported after the change and is related to it:
reactor/reactor-netty#1170. An HTTP HEAD with the body
not consumed. Connection is disposed and closed leading to subsequent request to
fail. Adding toBodilessEntity() helps.

This change does not close the connection but rather drains the body which does
not impact subsequent re-use of the connection. This however may compete with a
late subscriber actually attempting to read the response. At that point there is
little choice but to raise an ISE with a more specific description.

See spring-projectsgh-25216
  • Loading branch information
rstoyanchev authored and xcl(徐程林) committed Aug 16, 2020
1 parent 88824e9 commit 3838c17
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 24 deletions.
Expand Up @@ -115,8 +115,8 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
.next()
.doOnCancel(() -> {
ReactorClientHttpResponse response = responseRef.get();
if (response != null && response.bodyNotSubscribed()) {
response.getConnection().dispose();
if (response != null) {
response.releaseAfterCancel(method);
}
});
}
Expand Down
Expand Up @@ -21,6 +21,8 @@
import java.util.function.BiFunction;

import io.netty.buffer.ByteBufAllocator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;
import reactor.netty.NettyInbound;
Expand All @@ -29,10 +31,10 @@
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
Expand All @@ -46,16 +48,18 @@
*/
class ReactorClientHttpResponse implements ClientHttpResponse {

private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class);

private final NettyDataBufferFactory bufferFactory;

private final HttpClientResponse response;

private final NettyInbound inbound;

@Nullable
private final Connection connection;
private final String logPrefix;

// 0 - not subscribed, 1 - subscribed, 2 - cancelled
// 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe)
private final AtomicInteger state = new AtomicInteger(0);


Expand All @@ -68,7 +72,7 @@ public ReactorClientHttpResponse(HttpClientResponse response, Connection connect
this.response = response;
this.inbound = connection.inbound();
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
this.connection = connection;
this.logPrefix = (logger.isDebugEnabled() ? "[" + connection.channel().id().asShortText() + "] " : "");
}

/**
Expand All @@ -80,22 +84,28 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou
this.response = response;
this.inbound = inbound;
this.bufferFactory = new NettyDataBufferFactory(alloc);
this.connection = null;
this.logPrefix = "";
}


@Override
public Flux<DataBuffer> getBody() {
return this.inbound.receive()
.doOnSubscribe(s -> {
if (!this.state.compareAndSet(0, 1)) {
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject once in cancelled state.
if (this.state.get() == 2) {
throw new IllegalStateException("The client response body can only be consumed once.");
}
if (this.state.compareAndSet(0, 1)) {
return;
}
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject once in cancelled state.
if (this.state.get() == 2) {
throw new IllegalStateException(
"The client response body can only be consumed once.");
}
else if (this.state.get() == 3) {
throw new IllegalStateException(
"The client response body has been released already due to cancellation.");
}
})
.doOnCancel(() -> this.state.compareAndSet(1, 2))
Expand Down Expand Up @@ -127,6 +137,7 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
this.response.cookies().values().stream().flatMap(Collection::stream)
.forEach(c ->

result.add(c.name(), ResponseCookie.fromClientResponse(c.name(), c.value())
.domain(c.domain())
.path(c.path())
Expand All @@ -138,18 +149,25 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
}

/**
* For use by {@link ReactorClientHttpConnector}.
* Called by {@link ReactorClientHttpConnector} when a cancellation is detected
* but the content has not been subscribed to. If the subscription never
* materializes then the content will remain not drained. Or it could still
* materialize if the cancellation happened very early, or the response
* reading was delayed for some reason.
*/
boolean bodyNotSubscribed() {
return this.state.get() == 0;
void releaseAfterCancel(HttpMethod method) {
if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) {
if (logger.isDebugEnabled()) {
logger.debug(this.logPrefix + "Releasing body, not yet subscribed.");
}
this.inbound.receive().doOnNext(byteBuf -> {}).subscribe(byteBuf -> {}, ex -> {});
}
}

/**
* For use by {@link ReactorClientHttpConnector}.
*/
Connection getConnection() {
Assert.notNull(this.connection, "Constructor with connection wasn't used");
return this.connection;
private boolean mayHaveBody(HttpMethod method) {
int code = this.getRawStatusCode();
return !((code >= 100 && code < 200) || code == 204 || code == 205 ||
method.equals(HttpMethod.HEAD) || getHeaders().getContentLength() == 0);
}

@Override
Expand Down

0 comments on commit 3838c17

Please sign in to comment.