Skip to content

Commit

Permalink
Dispose on cancel if response not subscribed
Browse files Browse the repository at this point in the history
Closes gh-25216
  • Loading branch information
rstoyanchev committed Jun 19, 2020
1 parent 9b615ed commit 21de098
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 23 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,15 +17,13 @@
package org.springframework.http.client.reactive;

import java.net.URI;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Mono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

Expand Down Expand Up @@ -104,12 +102,23 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri));
}

AtomicReference<ReactorClientHttpResponse> responseRef = new AtomicReference<>();

return this.httpClient
.request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name()))
.uri(uri.toString())
.send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound)))
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
.next();
.responseConnection((response, connection) -> {
responseRef.set(new ReactorClientHttpResponse(response, connection));
return Mono.just((ClientHttpResponse) responseRef.get());
})
.next()
.doOnCancel(() -> {
ReactorClientHttpResponse response = responseRef.get();
if (response != null && response.bodyNotSubscribed()) {
response.getConnection().dispose();
}
});
}

private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request,
Expand All @@ -118,10 +127,4 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
}

private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
ByteBufAllocator allocator) {

return new ReactorClientHttpResponse(response, nettyInbound, allocator);
}

}
Expand Up @@ -17,10 +17,12 @@
package org.springframework.http.client.reactive;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.netty.Connection;
import reactor.netty.NettyInbound;
import reactor.netty.http.client.HttpClientResponse;

Expand All @@ -29,6 +31,8 @@
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
Expand All @@ -48,31 +52,53 @@ class ReactorClientHttpResponse implements ClientHttpResponse {

private final NettyInbound inbound;

private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
@Nullable
private final Connection connection;

// 0 - not subscribed, 1 - subscribed, 2 - cancelled
private final AtomicInteger state = new AtomicInteger(0);


/**
* Constructor that matches the inputs from
* {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}.
* @since 5.3
*/
public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) {
this.response = response;
this.inbound = connection.inbound();
this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc());
this.connection = connection;
}

/**
* Constructor with inputs extracted from a {@link Connection}.
* @deprecated as of 5.2.8
*/
@Deprecated
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
this.response = response;
this.inbound = inbound;
this.bufferFactory = new NettyDataBufferFactory(alloc);
this.connection = null;
}


@Override
public Flux<DataBuffer> getBody() {
return this.inbound.receive()
.doOnSubscribe(s -> {
if (this.rejectSubscribers.get()) {
throw new IllegalStateException("The client response body can only be consumed once.");
if (!this.state.compareAndSet(0, 1)) {
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject once in cancelled state.
if (this.state.get() == 2) {
throw new IllegalStateException("The client response body can only be consumed once.");
}
}
})
.doOnCancel(() ->
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to intercept and reject them in that case.
this.rejectSubscribers.set(true)
)
.doOnCancel(() -> this.state.compareAndSet(1, 2))
.map(byteBuf -> {
byteBuf.retain();
return this.bufferFactory.wrap(byteBuf);
Expand Down Expand Up @@ -111,6 +137,21 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
return CollectionUtils.unmodifiableMultiValueMap(result);
}

/**
* For use by {@link ReactorClientHttpConnector}.
*/
boolean bodyNotSubscribed() {
return this.state.get() == 0;
}

/**
* For use by {@link ReactorClientHttpConnector}.
*/
Connection getConnection() {
Assert.notNull(this.connection, "Constructor with connection wasn't used");
return this.connection;
}

@Override
public String toString() {
return "ReactorClientHttpResponse{" +
Expand Down

0 comments on commit 21de098

Please sign in to comment.