From 21de09875601fae045f4302c4421ec552f5615b5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 19 Jun 2020 21:40:44 +0100 Subject: [PATCH] Dispose on cancel if response not subscribed Closes gh-25216 --- .../reactive/ReactorClientHttpConnector.java | 27 ++++---- .../reactive/ReactorClientHttpResponse.java | 63 +++++++++++++++---- 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 98dd9dd8babb..e2af34a6208c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,15 +17,13 @@ package org.springframework.http.client.reactive; import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Mono; -import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; -import reactor.netty.http.client.HttpClientResponse; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -104,12 +102,23 @@ public Mono connect(HttpMethod method, URI uri, return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); } + AtomicReference responseRef = new AtomicReference<>(); + return this.httpClient .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())) .uri(uri.toString()) .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) - .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc()))) - .next(); + .responseConnection((response, connection) -> { + responseRef.set(new ReactorClientHttpResponse(response, connection)); + return Mono.just((ClientHttpResponse) responseRef.get()); + }) + .next() + .doOnCancel(() -> { + ReactorClientHttpResponse response = responseRef.get(); + if (response != null && response.bodyNotSubscribed()) { + response.getConnection().dispose(); + } + }); } private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, @@ -118,10 +127,4 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl return new ReactorClientHttpRequest(method, uri, request, nettyOutbound); } - private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound, - ByteBufAllocator allocator) { - - return new ReactorClientHttpResponse(response, nettyInbound, allocator); - } - } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index d8a1ef039823..7872b5020ac9 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -17,10 +17,12 @@ package org.springframework.http.client.reactive; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Flux; +import reactor.netty.Connection; import reactor.netty.NettyInbound; import reactor.netty.http.client.HttpClientResponse; @@ -29,6 +31,8 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -48,13 +52,35 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyInbound inbound; - private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); + @Nullable + private final Connection connection; + // 0 - not subscribed, 1 - subscribed, 2 - cancelled + private final AtomicInteger state = new AtomicInteger(0); + + /** + * Constructor that matches the inputs from + * {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}. + * @since 5.3 + */ + public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) { + this.response = response; + this.inbound = connection.inbound(); + this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); + this.connection = connection; + } + + /** + * Constructor with inputs extracted from a {@link Connection}. + * @deprecated as of 5.2.8 + */ + @Deprecated public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) { this.response = response; this.inbound = inbound; this.bufferFactory = new NettyDataBufferFactory(alloc); + this.connection = null; } @@ -62,17 +88,17 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou public Flux getBody() { return this.inbound.receive() .doOnSubscribe(s -> { - if (this.rejectSubscribers.get()) { - throw new IllegalStateException("The client response body can only be consumed once."); + if (!this.state.compareAndSet(0, 1)) { + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to reject once in cancelled state. + if (this.state.get() == 2) { + throw new IllegalStateException("The client response body can only be consumed once."); + } } }) - .doOnCancel(() -> - // https://github.com/reactor/reactor-netty/issues/503 - // FluxReceive rejects multiple subscribers, but not after a cancel(). - // Subsequent subscribers after cancel() will not be rejected, but will hang instead. - // So we need to intercept and reject them in that case. - this.rejectSubscribers.set(true) - ) + .doOnCancel(() -> this.state.compareAndSet(1, 2)) .map(byteBuf -> { byteBuf.retain(); return this.bufferFactory.wrap(byteBuf); @@ -111,6 +137,21 @@ public MultiValueMap getCookies() { return CollectionUtils.unmodifiableMultiValueMap(result); } + /** + * For use by {@link ReactorClientHttpConnector}. + */ + boolean bodyNotSubscribed() { + return this.state.get() == 0; + } + + /** + * For use by {@link ReactorClientHttpConnector}. + */ + Connection getConnection() { + Assert.notNull(this.connection, "Constructor with connection wasn't used"); + return this.connection; + } + @Override public String toString() { return "ReactorClientHttpResponse{" +