Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection reuse leads to Connection prematurely closed BEFORE response #1170

Closed
mp911de opened this issue Jun 22, 2020 · 6 comments
Closed
Labels
for/springframework This belongs to the Spring Framework project

Comments

@mp911de
Copy link

mp911de commented Jun 22, 2020

Actual Behavior

Reusing pooled connections cause Connection prematurely closed BEFORE response

Steps to Reproduce

The issue happens in the scope of a CI build of Spring Data Elasticsearch (https://github.com/spring-projects/spring-data-elasticsearch/)

Possible Solution

Using HttpClient.newConnection().compress(true) is a workaround.

Your Environment

  • Reactor version(s) used: 3.4.0 snapshots, Reactor Netty 1.0 snapshots
  • Other relevant libraries versions (eg. netty, ...): 4.1.50.Final
  • JVM version (javar -version): 1.8.0_232
  • OS and version (eg uname -a): MacOS, Darwin Kernel Version 19.5.0
@mp911de mp911de added status/need-triage A new issue that still need to be evaluated as a whole type/bug A general bug labels Jun 22, 2020
@violetagg violetagg removed the status/need-triage A new issue that still need to be evaluated as a whole label Jun 22, 2020
@violetagg
Copy link
Member

@mp911de @rstoyanchev @simonbasle The connection is closed by Spring Framework WebClient's new change for disposing the connection when a cancellation happens
spring-projects/spring-framework#25216 (comment)

2020-06-22 14:43:34,868 DEBUG esources.DefaultPooledConnectionProvider: 254 - [id: 0xe5472f79, L:/127.0.0.1:64381 - R:localhost/127.0.0.1:9200] onStateChange(POST{uri=/_bulk?refresh=true&timeout=1m, connection=PooledConnection{channel=[id: 0xe5472f79, L:/127.0.0.1:64381 - R:localhost/127.0.0.1:9200]}}, [request_sent])
2020-06-22 14:43:34,874 DEBUG     reactor.netty.http.client.HttpClient: 145 - [id: 0xe5472f79, L:/127.0.0.1:64381 - R:localhost/127.0.0.1:9200] CLOSE
2020-06-22 14:43:34,874 DEBUG esources.DefaultPooledConnectionProvider: 259 - [id: 0xe5472f79, L:/127.0.0.1:64381 ! R:localhost/127.0.0.1:9200] CLOSE received
java.lang.Exception: CLOSE received
	at reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.lambda$registerClose$2(DefaultPooledConnectionProvider.java:300)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1158)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:760)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:736)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:607)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1352)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
	at io.netty.handler.logging.LoggingHandler.close(LoggingHandler.java:247)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.close(CombinedChannelDuplexHandler.java:505)
	at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:77)
	at io.netty.channel.CombinedChannelDuplexHandler.close(CombinedChannelDuplexHandler.java:316)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472)
	at io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957)
	at io.netty.channel.AbstractChannel.close(AbstractChannel.java:232)
	at reactor.netty.DisposableChannel.dispose(DisposableChannel.java:70)
	at reactor.netty.channel.ChannelOperations.dispose(ChannelOperations.java:180)
	at org.springframework.http.client.reactive.ReactorClientHttpConnector.lambda$connect$4(ReactorClientHttpConnector.java:120)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:783)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:790)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.cancel(FluxPeekFuseable.java:790)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.cancel(FluxMap.java:286)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:137)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.cancel(FluxOnAssembly.java:500)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2209)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2178)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1990)
	at reactor.core.publisher.Operators.terminate(Operators.java:1222)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2209)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2178)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1990)
	at reactor.core.publisher.Operators.terminate(Operators.java:1222)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180)
	at reactor.core.publisher.Operators.terminate(Operators.java:1222)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:167)
	at reactor.core.publisher.Operators.terminate(Operators.java:1222)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2209)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2178)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1990)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:123)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:75)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:385)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
	at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:365)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:383)

@violetagg violetagg added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Jun 22, 2020
@rstoyanchev
Copy link
Contributor

So there must be a cancel before the response content is subscribed to? Is it known where the cancel signal comes from and what was the outcome before?

@violetagg
Copy link
Member

violetagg commented Jun 22, 2020

I see this, unfortunately I do not know what the test is supposed to do

at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1990)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:123)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:75)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)

@rstoyanchev
Copy link
Contributor

@mp911de can you clarify the scenario?

@mp911de
Copy link
Author

mp911de commented Jun 22, 2020

Cancellation happens by MonoNext when the first element gets emitted. We found out that this case correlates with preceding HEAD requests of which we don't consume the response body. Cancellation leads to response.getConnection().dispose() in ReactorClientHttpConnector. Consuming the body (e.g. ClientResponse.toBodilessEntity()) causes the issue to disappear.

@violetagg violetagg removed the for/user-attention This issue needs user attention (feedback, rework, etc...) label Jun 22, 2020
rstoyanchev added a commit to spring-projects/spring-framework that referenced this issue Jun 23, 2020
The following was reported after the change and is related to it:
reactor/reactor-netty#1170. An HTTP HEAD with the body
not consumed. Connection is disposed and closed leading to subsequent request to
fail. Adding toBodilessEntity() helps.

This change does not close the connection but rather drains the body which does
not impact subsequent re-use of the connection. This however may compete with a
late subscriber actually attempting to read the response. At that point there is
little choice but to raise an ISE with a more specific description.

See gh-25216
@rstoyanchev
Copy link
Contributor

This can be closed now, following spring-projects/spring-framework@21d0696

@mp911de mp911de closed this as completed Jun 23, 2020
@violetagg violetagg added for/springframework This belongs to the Spring Framework project and removed type/bug A general bug labels Jun 24, 2020
FelixFly pushed a commit to FelixFly/spring-framework that referenced this issue Aug 16, 2020
The following was reported after the change and is related to it:
reactor/reactor-netty#1170. An HTTP HEAD with the body
not consumed. Connection is disposed and closed leading to subsequent request to
fail. Adding toBodilessEntity() helps.

This change does not close the connection but rather drains the body which does
not impact subsequent re-use of the connection. This however may compete with a
late subscriber actually attempting to read the response. At that point there is
little choice but to raise an ISE with a more specific description.

See spring-projectsgh-25216
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for/springframework This belongs to the Spring Framework project
Projects
None yet
Development

No branches or pull requests

3 participants