Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume/drain the inbound in case a cancellation is received before subscription #13493

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ public Map<String, String> toMap() {
return result;
}

public long getContentLength() {
HttpHeader httpHeader = headers.get("Content-Length");
if (httpHeader != null) {
return Long.parseLong(httpHeader.value());
}
return -1;
}

@Override
public Iterator<HttpHeader> iterator() {
return headers.values().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand All @@ -24,6 +25,7 @@
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static com.azure.cosmos.implementation.http.HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY;
Expand Down Expand Up @@ -102,6 +104,8 @@ public Mono<HttpResponse> send(final HttpRequest request) {
request.setReactorNettyRequestRecord(reactorNettyRequestRecord);
}

final AtomicReference<ReactorNettyHttpResponse> responseReference = new AtomicReference<>();

return this.httpClient
.observe((connection, state) -> {
Instant time = Instant.now();
Expand All @@ -116,12 +120,23 @@ public Mono<HttpResponse> send(final HttpRequest request) {
}
})
.keepAlive(this.httpClientConfig.isConnectionKeepAlive())
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
.send(bodySendDelegate(request))
.responseConnection(responseDelegate(request))
.single();
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
.send(bodySendDelegate(request))
.responseConnection((reactorNettyResponse, reactorNettyConnection) -> {
HttpResponse httpResponse = new ReactorNettyHttpResponse(reactorNettyResponse,
reactorNettyConnection).withRequest(request);
responseReference.set((ReactorNettyHttpResponse) httpResponse);
return Mono.just(httpResponse);
})
.doOnCancel(() -> {
ReactorNettyHttpResponse reactorNettyHttpResponse = responseReference.get();
if (reactorNettyHttpResponse != null) {
reactorNettyHttpResponse.releaseAfterCancel(request.httpMethod());
}
})
.single();
}

/**
Expand All @@ -143,17 +158,6 @@ private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bod
};
}

/**
* Delegate to receive response.
*
* @param restRequest the Rest request whose response this delegate handles
* @return a delegate upon invocation setup Rest response object
*/
private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse>> responseDelegate(final HttpRequest restRequest) {
return (reactorNettyResponse, reactorNettyConnection) ->
Mono.just(new ReactorNettyHttpResponse(reactorNettyResponse, reactorNettyConnection).withRequest(restRequest));
}

@Override
public void shutdown() {
if (this.connectionProvider != null) {
Expand All @@ -162,6 +166,9 @@ public void shutdown() {
}

private static class ReactorNettyHttpResponse extends HttpResponse {

private final AtomicReference<ReactorNettyResponseState> state = new AtomicReference<>(ReactorNettyResponseState.NOT_SUBSCRIBED);

private final HttpClientResponse reactorNettyResponse;
private final Connection reactorNettyConnection;

Expand Down Expand Up @@ -189,22 +196,33 @@ public HttpHeaders headers() {

@Override
public Flux<ByteBuf> body() {
return bodyIntern();
return bodyIntern()
.doOnSubscribe(this::updateSubscriptionState)
.map(byteBuf -> {
byteBuf.retain();
return byteBuf;
});
}

@Override
public Mono<byte[]> bodyAsByteArray() {
return bodyIntern().aggregate().asByteArray();
return bodyIntern().aggregate()
.asByteArray()
.doOnSubscribe(this::updateSubscriptionState);
}

@Override
public Mono<String> bodyAsString() {
return bodyIntern().aggregate().asString();
return bodyIntern().aggregate()
.asString()
.doOnSubscribe(this::updateSubscriptionState);
}

@Override
public Mono<String> bodyAsString(Charset charset) {
return bodyIntern().aggregate().asString(charset);
return bodyIntern().aggregate()
.asString(charset)
.doOnSubscribe(this::updateSubscriptionState);
}

private ByteBufFlux bodyIntern() {
Expand All @@ -215,5 +233,50 @@ private ByteBufFlux bodyIntern() {
Connection internConnection() {
return reactorNettyConnection;
}

private void updateSubscriptionState(Subscription subscription) {
if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.SUBSCRIBED)) {
return;
}
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject ones in cancelled state.
if (this.state.get() == ReactorNettyResponseState.CANCELLED) {
throw new IllegalStateException(
"The client response body has been released already due to cancellation.");
}
}

/**
* Called by {@link ReactorNettyClient} when a cancellation is detected
* but the content has not been subscribed to. If the subscription never
* materializes then the content will remain not drained. Or it could still
* materialize if the cancellation happened very early, or the response
* reading was delayed for some reason.
*/
private void releaseAfterCancel(HttpMethod method) {
if (mayHaveBody(method) && this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.CANCELLED)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need mayHaveBody check , what happen if we clear on all cancel irrespective of body, we can avoid extra check ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are draining content here, we want to make sure we drain it under very specific conditions, specially when the body can be present.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine ,but my doubt is if some valid response miss mayHaveBody (due to any missed scenario), then we will still face issue , vs draining non body too along with body response (Its a trade off thing )

if (logger.isDebugEnabled()) {
logger.debug("Releasing body, not yet subscribed");
}
this.bodyIntern()
.doOnNext(byteBuf -> {})
.subscribe(byteBuf -> {}, ex -> {});
}
}

private boolean mayHaveBody(HttpMethod method) {
int code = this.statusCode();
return !((code >= 100 && code < 200) || code == 204 || code == 205 ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do 304 classified as non body status code ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, 304 will be a non body status code

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So lets add that here

method.equals(HttpMethod.HEAD) || headers().getContentLength() == 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think some of 4xx error codes from Cosmos (e.g., 400 query plan) also have body.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would these be considered as error or success ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this condition is already returning true for 4xx, isn't ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is.

}
}

private enum ReactorNettyResponseState {
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
NOT_SUBSCRIBED,
SUBSCRIBED,
CANCELLED;
}
}