From 03fa48919e2ec3a3fcff0372081df0ea64fc1a92 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 13 Jul 2020 16:51:09 +0300 Subject: [PATCH] Switch to Reactor Dysprosium snapshots See gh-25376 --- build.gradle | 3 ++- .../reactive/ReactorClientHttpResponse.java | 21 +++++-------------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/build.gradle b/build.gradle index fc60b1896463..3afbe10402c0 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,7 @@ configure(allprojects) { project -> imports { mavenBom "com.fasterxml.jackson:jackson-bom:2.10.4" mavenBom "io.netty:netty-bom:4.1.50.Final" - mavenBom "io.projectreactor:reactor-bom:Dysprosium-SR8" + mavenBom "io.projectreactor:reactor-bom:Dysprosium-BUILD-SNAPSHOT" mavenBom "io.rsocket:rsocket-bom:1.0.1" mavenBom "org.eclipse.jetty:jetty-bom:9.4.30.v20200611" mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.72" @@ -279,6 +279,7 @@ configure(allprojects) { project -> repositories { mavenCentral() maven { url "https://repo.spring.io/libs-spring-framework-build" } + maven { url "https://repo.spring.io/snapshot" } // Reactor } } configurations.all { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 962bd8e9e5d6..38be13e461a9 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -34,7 +34,6 @@ import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; -import org.springframework.lang.Nullable; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -51,18 +50,17 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private static final Log logger = LogFactory.getLog(ReactorClientHttpResponse.class); - private final NettyDataBufferFactory bufferFactory; - private final HttpClientResponse response; private final NettyInbound inbound; - @Nullable - private final String logPrefix; + private final NettyDataBufferFactory bufferFactory; - // 0 - not subscribed, 1 - subscribed, 2 - cancelled, 3 - cancelled via connector (before subscribe) + // 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe) private final AtomicInteger state = new AtomicInteger(0); + private final String logPrefix; + /** * Constructor that matches the inputs from @@ -96,20 +94,11 @@ public Flux getBody() { if (this.state.compareAndSet(0, 1)) { return; } - // https://github.com/reactor/reactor-netty/issues/503 - // FluxReceive rejects multiple subscribers, but not after a cancel(). - // Subsequent subscribers after cancel() will not be rejected, but will hang instead. - // So we need to reject once in cancelled state. if (this.state.get() == 2) { - throw new IllegalStateException( - "The client response body can only be consumed once."); - } - else if (this.state.get() == 3) { throw new IllegalStateException( "The client response body has been released already due to cancellation."); } }) - .doOnCancel(() -> this.state.compareAndSet(1, 2)) .map(byteBuf -> { byteBuf.retain(); return this.bufferFactory.wrap(byteBuf); @@ -157,7 +146,7 @@ public MultiValueMap getCookies() { * reading was delayed for some reason. */ void releaseAfterCancel(HttpMethod method) { - if (mayHaveBody(method) && this.state.compareAndSet(0, 3)) { + if (mayHaveBody(method) && this.state.compareAndSet(0, 2)) { if (logger.isDebugEnabled()) { logger.debug(this.logPrefix + "Releasing body, not yet subscribed."); }