Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume/drain the inbound in case a cancellation is received before subscription #13493

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand All @@ -24,6 +25,7 @@
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static com.azure.cosmos.implementation.http.HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY;
Expand Down Expand Up @@ -102,6 +104,8 @@ public Mono<HttpResponse> send(final HttpRequest request) {
request.setReactorNettyRequestRecord(reactorNettyRequestRecord);
}

final AtomicReference<ReactorNettyHttpResponse> responseReference = new AtomicReference<>();

return this.httpClient
.observe((connection, state) -> {
Instant time = Instant.now();
Expand All @@ -116,12 +120,23 @@ public Mono<HttpResponse> send(final HttpRequest request) {
}
})
.keepAlive(this.httpClientConfig.isConnectionKeepAlive())
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
.send(bodySendDelegate(request))
.responseConnection(responseDelegate(request))
.single();
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
.send(bodySendDelegate(request))
.responseConnection((reactorNettyResponse, reactorNettyConnection) -> {
HttpResponse httpResponse = new ReactorNettyHttpResponse(reactorNettyResponse,
reactorNettyConnection).withRequest(request);
responseReference.set((ReactorNettyHttpResponse) httpResponse);
return Mono.just(httpResponse);
})
.doOnCancel(() -> {
ReactorNettyHttpResponse reactorNettyHttpResponse = responseReference.get();
if (reactorNettyHttpResponse != null) {
reactorNettyHttpResponse.releaseAfterCancel(request.httpMethod());
}
})
.single();
}

/**
Expand All @@ -143,17 +158,6 @@ private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bod
};
}

/**
* Delegate to receive response.
*
* @param restRequest the Rest request whose response this delegate handles
* @return a delegate upon invocation setup Rest response object
*/
private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse>> responseDelegate(final HttpRequest restRequest) {
return (reactorNettyResponse, reactorNettyConnection) ->
Mono.just(new ReactorNettyHttpResponse(reactorNettyResponse, reactorNettyConnection).withRequest(restRequest));
}

@Override
public void shutdown() {
if (this.connectionProvider != null) {
Expand All @@ -162,6 +166,9 @@ public void shutdown() {
}

private static class ReactorNettyHttpResponse extends HttpResponse {

private final AtomicReference<ReactorNettyResponseState> state = new AtomicReference<>(ReactorNettyResponseState.NOT_SUBSCRIBED);

private final HttpClientResponse reactorNettyResponse;
private final Connection reactorNettyConnection;

Expand Down Expand Up @@ -189,22 +196,33 @@ public HttpHeaders headers() {

@Override
public Flux<ByteBuf> body() {
return bodyIntern();
return bodyIntern()
.doOnSubscribe(this::updateSubscriptionState)
.map(byteBuf -> {
byteBuf.retain();
return byteBuf;
});
}

@Override
public Mono<byte[]> bodyAsByteArray() {
return bodyIntern().aggregate().asByteArray();
return bodyIntern().aggregate()
.asByteArray()
.doOnSubscribe(this::updateSubscriptionState);
}

@Override
public Mono<String> bodyAsString() {
return bodyIntern().aggregate().asString();
return bodyIntern().aggregate()
.asString()
.doOnSubscribe(this::updateSubscriptionState);
}

@Override
public Mono<String> bodyAsString(Charset charset) {
return bodyIntern().aggregate().asString(charset);
return bodyIntern().aggregate()
.asString(charset)
.doOnSubscribe(this::updateSubscriptionState);
}

private ByteBufFlux bodyIntern() {
Expand All @@ -215,5 +233,44 @@ private ByteBufFlux bodyIntern() {
Connection internConnection() {
return reactorNettyConnection;
}

private void updateSubscriptionState(Subscription subscription) {
if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.SUBSCRIBED)) {
return;
}
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject ones in cancelled state.
if (this.state.get() == ReactorNettyResponseState.CANCELLED) {
throw new IllegalStateException(
"The client response body has been released already due to cancellation.");
}
}

/**
* Called by {@link ReactorNettyClient} when a cancellation is detected
* but the content has not been subscribed to. If the subscription never
* materializes then the content will remain not drained. Or it could still
* materialize if the cancellation happened very early, or the response
* reading was delayed for some reason.
*/
private void releaseAfterCancel(HttpMethod method) {
if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.CANCELLED)) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing body, not yet subscribed");
}
this.bodyIntern()
.doOnNext(byteBuf -> {})
.subscribe(byteBuf -> {}, ex -> {});
}
}
}

private enum ReactorNettyResponseState {
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
NOT_SUBSCRIBED,
SUBSCRIBED,
CANCELLED;
}
}