From 21de09875601fae045f4302c4421ec552f5615b5 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 19 Jun 2020 21:40:44 +0100 Subject: [PATCH 1/2] Dispose on cancel if response not subscribed Closes gh-25216 --- .../reactive/ReactorClientHttpConnector.java | 27 ++++---- .../reactive/ReactorClientHttpResponse.java | 63 +++++++++++++++---- 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 98dd9dd8babb..e2af34a6208c 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,15 +17,13 @@ package org.springframework.http.client.reactive; import java.net.URI; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Mono; -import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; -import reactor.netty.http.client.HttpClientResponse; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -104,12 +102,23 @@ public Mono connect(HttpMethod method, URI uri, return Mono.error(new IllegalArgumentException("URI is not absolute: " + uri)); } + AtomicReference responseRef = new AtomicReference<>(); + return this.httpClient .request(io.netty.handler.codec.http.HttpMethod.valueOf(method.name())) .uri(uri.toString()) .send((request, outbound) -> requestCallback.apply(adaptRequest(method, uri, request, outbound))) - .responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc()))) - .next(); + .responseConnection((response, connection) -> { + responseRef.set(new ReactorClientHttpResponse(response, connection)); + return Mono.just((ClientHttpResponse) responseRef.get()); + }) + .next() + .doOnCancel(() -> { + ReactorClientHttpResponse response = responseRef.get(); + if (response != null && response.bodyNotSubscribed()) { + response.getConnection().dispose(); + } + }); } private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, @@ -118,10 +127,4 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl return new ReactorClientHttpRequest(method, uri, request, nettyOutbound); } - private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound, - ByteBufAllocator allocator) { - - return new ReactorClientHttpResponse(response, nettyInbound, allocator); - } - } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index d8a1ef039823..7872b5020ac9 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -17,10 +17,12 @@ package org.springframework.http.client.reactive; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import io.netty.buffer.ByteBufAllocator; import reactor.core.publisher.Flux; +import reactor.netty.Connection; import reactor.netty.NettyInbound; import reactor.netty.http.client.HttpClientResponse; @@ -29,6 +31,8 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -48,13 +52,35 @@ class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyInbound inbound; - private final AtomicBoolean rejectSubscribers = new AtomicBoolean(); + @Nullable + private final Connection connection; + // 0 - not subscribed, 1 - subscribed, 2 - cancelled + private final AtomicInteger state = new AtomicInteger(0); + + /** + * Constructor that matches the inputs from + * {@link reactor.netty.http.client.HttpClient.ResponseReceiver#responseConnection(BiFunction)}. + * @since 5.3 + */ + public ReactorClientHttpResponse(HttpClientResponse response, Connection connection) { + this.response = response; + this.inbound = connection.inbound(); + this.bufferFactory = new NettyDataBufferFactory(connection.outbound().alloc()); + this.connection = connection; + } + + /** + * Constructor with inputs extracted from a {@link Connection}. + * @deprecated as of 5.2.8 + */ + @Deprecated public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) { this.response = response; this.inbound = inbound; this.bufferFactory = new NettyDataBufferFactory(alloc); + this.connection = null; } @@ -62,17 +88,17 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou public Flux getBody() { return this.inbound.receive() .doOnSubscribe(s -> { - if (this.rejectSubscribers.get()) { - throw new IllegalStateException("The client response body can only be consumed once."); + if (!this.state.compareAndSet(0, 1)) { + // https://github.com/reactor/reactor-netty/issues/503 + // FluxReceive rejects multiple subscribers, but not after a cancel(). + // Subsequent subscribers after cancel() will not be rejected, but will hang instead. + // So we need to reject once in cancelled state. + if (this.state.get() == 2) { + throw new IllegalStateException("The client response body can only be consumed once."); + } } }) - .doOnCancel(() -> - // https://github.com/reactor/reactor-netty/issues/503 - // FluxReceive rejects multiple subscribers, but not after a cancel(). - // Subsequent subscribers after cancel() will not be rejected, but will hang instead. - // So we need to intercept and reject them in that case. - this.rejectSubscribers.set(true) - ) + .doOnCancel(() -> this.state.compareAndSet(1, 2)) .map(byteBuf -> { byteBuf.retain(); return this.bufferFactory.wrap(byteBuf); @@ -111,6 +137,21 @@ public MultiValueMap getCookies() { return CollectionUtils.unmodifiableMultiValueMap(result); } + /** + * For use by {@link ReactorClientHttpConnector}. + */ + boolean bodyNotSubscribed() { + return this.state.get() == 0; + } + + /** + * For use by {@link ReactorClientHttpConnector}. + */ + Connection getConnection() { + Assert.notNull(this.connection, "Constructor with connection wasn't used"); + return this.connection; + } + @Override public String toString() { return "ReactorClientHttpResponse{" + From f35903f23d6af3f2279fd938e53717a793466ad8 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 19 Jun 2020 21:56:58 +0100 Subject: [PATCH 2/2] Catch IllegalReferenceCountException Closes gh-22594 --- .../core/codec/AbstractSingleValueEncoder.java | 5 +++-- .../core/io/buffer/DataBufferUtils.java | 18 +++++++++++++++--- .../http/codec/EncoderHttpMessageWriter.java | 5 +++-- .../multipart/MultipartHttpMessageWriter.java | 3 ++- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java b/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java index 3a45507b03de..cdaba36bcc37 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/AbstractSingleValueEncoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.lang.Nullable; import org.springframework.util.MimeType; @@ -51,7 +52,7 @@ public final Flux encode(Publisher inputStream, DataBuf return Flux.from(inputStream) .take(1) .concatMap(value -> encode(value, bufferFactory, elementType, mimeType, hints)) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release); + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } /** diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 6db32e406127..eb2927677cf2 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import io.netty.util.IllegalReferenceCountException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; @@ -60,6 +63,8 @@ */ public abstract class DataBufferUtils { + private final static Log logger = LogFactory.getLog(DataBufferUtils.class); + private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; @@ -494,7 +499,15 @@ public static boolean release(@Nullable DataBuffer dataBuffer) { if (dataBuffer instanceof PooledDataBuffer) { PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer; if (pooledDataBuffer.isAllocated()) { - return pooledDataBuffer.release(); + try { + return pooledDataBuffer.release(); + } + catch (IllegalReferenceCountException ex) { + if (logger.isDebugEnabled()) { + logger.debug("RefCount already at 0", ex); + } + return false; + } } } return false; @@ -523,7 +536,6 @@ public static Consumer releaseConsumer() { * @return a buffer that is composed from the {@code dataBuffers} argument * @since 5.0.3 */ - @SuppressWarnings("unchecked") public static Mono join(Publisher dataBuffers) { return join(dataBuffers, -1); } diff --git a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java index a5ed28274175..5a63145b4be9 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java @@ -29,6 +29,7 @@ import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpLogging; import org.springframework.http.MediaType; @@ -126,13 +127,13 @@ public Mono write(Publisher inputStream, ResolvableType eleme .flatMap(buffer -> { message.getHeaders().setContentLength(buffer.readableByteCount()); return message.writeWith(Mono.just(buffer) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)); + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); }); } if (isStreamingMediaType(contentType)) { return message.writeAndFlushWith(body.map(buffer -> - Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release))); + Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release))); } return message.writeWith(body); diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java index 350d693cd318..1db1356751f3 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartHttpMessageWriter.java @@ -40,6 +40,7 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; import org.springframework.http.HttpEntity; @@ -247,7 +248,7 @@ private Mono writeMultipart(MultiValueMap map, Flux body = Flux.fromIterable(map.entrySet()) .concatMap(entry -> encodePartValues(boundary, entry.getKey(), entry.getValue(), bufferFactory)) .concatWith(generateLastLine(boundary, bufferFactory)) - .doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release); + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); return outputMessage.writeWith(body); }