
javax.net.ssl.SSLException: SSLEngine closed already for gateway requests #782

Closed · venkatnpedada opened this issue Jul 16, 2019 · 58 comments
Labels: type/bug (A general bug)
Milestone: 0.9.7.RELEASE

@venkatnpedada

venkatnpedada commented Jul 16, 2019

Cloud Gateway version: 2.1.3.RELEASE
Reactor Netty version: 0.8.5.RELEASE

In our production system, we are observing "javax.net.ssl.SSLException: SSLEngine closed already" exceptions. The rate at which the exception occurs depends on the route count and traffic.

I'm using the elastic pool (the default), and below is the stack trace from the logs:

reactor-http-nio-150 HttpClientConnect - [id: 0x21e26b17, L:/127.0.0.1:29974 - R:localhost/127.0.0.1:8872] The connection observed an error
javax.net.ssl.SSLException: SSLEngine closed already
at io.netty.handler.ssl.SslHandler.wrap(...)(Unknown Source)

I suspect the connection has already been closed on the other end, but the elastic connection pool is still trying to use the same connection, which causes this issue. It seems the Netty connection pool does not perform health checks to determine whether a connection is already broken.

@venkatnpedada venkatnpedada added the type/bug A general bug label Jul 16, 2019
@violetagg
Member

@venkatnpedada Try the latest versions: Reactor Netty 0.8.9.RELEASE, Spring Framework 5.1.8.RELEASE, Spring Boot 2.1.6.RELEASE.

@nnanda2016

@violetagg This is happening with Reactor Netty 0.8.9 (BOM: Californium-SR9). I just saw this in our production system:

reactor.netty.http.client.HttpClientConnect# [id: 0x75732029, L:/<local-ip>:53795 - R:<remote-dns-name>/<remote-ip>:443] The connection observed an error
javax.net.ssl.SSLException: SSLEngine closed already
	at io.netty.handler.ssl.SslHandler.wrap(...)(Unknown Source) ~[netty-handler-4.1.36.Final.jar!/:4.1.36.Final]


@violetagg
Member

@nnanda2016 Is it possible to provide a reproducible scenario, logs, or a TCP dump?

@violetagg violetagg added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Jul 26, 2019
@violetagg violetagg added the status/need-investigation This needs more in-depth investigation label Aug 2, 2019
@iamankur82

I am also observing the same issue on Reactor Netty 0.8.10. It is on a production server, so I am not able to take debug logs or TCP dumps.
Once the error occurs on a channel, it keeps recurring on the same channel for hours. More than 2,500 errors occurred for the same channel across 2 hours. This stopped only when the Java process was stopped and started again.

08/08/2019 05:45:05.456 +0530 | 1565223305456 [webflux-http-nio-2] WARN reactor.netty.http.client.HttpClientConnect - [id: 0xb2c5cd59, L:/:53826 - R::443] The connection observed an error [javax.net.ssl.SSLException: SSLEngine closed already at io.netty.handler.ssl.SslHandler.wrap(...)(Unknown Source)

08/08/2019 05:21:33.896 +0530 | 1565221893896 [webflux-http-nio-2] WARN reactor.netty.http.client.HttpClientConnect - [id: 0xb2c5cd59, L:/:53826 - R::443] The connection observed an error [javax.net.ssl.SSLException: SSLEngine closed already at io.netty.handler.ssl.SslHandler.wrap(...)(Unknown Source)

08/08/2019 04:47:38.915 +0530 | 1565219858915 [webflux-http-nio-2] WARN reactor.netty.http.client.HttpClientConnect - [id: 0xb2c5cd59, L:/:53826 - R::443] The connection observed an error [javax.net.ssl.SSLException: SSLEngine closed already at io.netty.handler.ssl.SslHandler.wrap(...)(Unknown Source)

@nnanda2016

nnanda2016 commented Aug 8, 2019

For now, I have applied a retry for this exception and it is helping. At least since 07/25 (when I restarted my process) my app has not failed with this exception.

.retryWhen(Retry.anyOf(MyCustomException.class, ReadTimeoutException.class, WriteTimeoutException.class, SSLException.class)
        .retryMax(maxRetryAttempt)
        .backoff(Backoff.exponential(Duration.ofMillis(1000L), Duration.ofSeconds(10), 3, true))
        .jitter(Jitter.random(0.9))
        .doOnRetry(retryContext -> logger.error("[Service Invocation failed][Retry context: {}]", retryContext))
)

What I feel is that reactor-netty is not refreshing the connection pool once such exceptions happen. So far, retry is the only way I have found to refresh the connection pool. This is okay, but there should be a better mechanism. Maybe there is one and I am just not aware of it.
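
For reference, on newer Reactor Core versions a roughly equivalent retry can be written with the built-in reactor.util.retry.Retry spec. This is only a sketch under the assumption of Reactor Core 3.3.4+ (where retryWhen accepts a Retry spec); the custom exception from the snippet above is omitted and the method name is illustrative:

    import java.time.Duration;
    import javax.net.ssl.SSLException;
    import io.netty.handler.timeout.ReadTimeoutException;
    import io.netty.handler.timeout.WriteTimeoutException;
    import reactor.core.publisher.Mono;
    import reactor.util.retry.Retry;

    // Retries transient transport failures with exponential backoff and jitter,
    // mirroring the reactor-extra based snippet above.
    static <T> Mono<T> withTransportRetry(Mono<T> call, int maxRetryAttempt) {
        return call.retryWhen(Retry.backoff(maxRetryAttempt, Duration.ofMillis(1000))
                .maxBackoff(Duration.ofSeconds(10))
                .jitter(0.9)
                .filter(t -> t instanceof SSLException
                        || t instanceof ReadTimeoutException
                        || t instanceof WriteTimeoutException));
    }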

@violetagg
Member

@iamankur82 Can you provide a reproducible example or describe the scenario?
From your logs I can say the following:
The connection is not removed from the pool because it is not closed. Typically, receiving SSLEngine closed already means that the connection is about to be closed, but in your case it is not.

[id: 0xb2c5cd59, L:/:53826 - R::443]
In the log above, a - between the local and remote address means the connection is alive; a ! means it is closed.

@iamankur82 @nnanda2016 Can you switch from elastic to fixed connection pool and tell us whether the issue still exists?
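
For anyone trying this suggestion with WebClient, a minimal sketch of switching to a fixed pool (assuming the Reactor Netty 0.8.x/0.9.x ConnectionProvider API; the pool name and size are illustrative):

    import org.springframework.http.client.reactive.ReactorClientHttpConnector;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.netty.http.client.HttpClient;
    import reactor.netty.resources.ConnectionProvider;

    // A fixed pool caps the number of connections (500 here is illustrative) instead of
    // growing elastically; extra acquisitions wait until a connection is released.
    ConnectionProvider provider = ConnectionProvider.fixed("fixed-pool", 500);
    HttpClient httpClient = HttpClient.create(provider);
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();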

@nnanda2016

@violetagg I will give it a try; the only thing is, with retry, it has not happened again in my app. I can give it a try in one of our lower environments, but I don't know when it will break. I will report back in this issue if I see the app breaking with SSLException.

@sms0070

sms0070 commented Aug 22, 2019

Hi,
Is there any update on this?
We are also facing the same issue.

@venkatnpedada
Author

@violetagg We have tried the FIXED connection pool as well, but we faced the same issue under a certain load. The issue occurs even with the latest version, 0.8.9.

@violetagg
Member

@venkatnpedada @sms0070 Please try to isolate the scenario and provide a reproducible example. Also, if you can, try 0.8.11.BUILD-SNAPSHOT; we have some fixes there that might be relevant.

@violetagg violetagg removed the status/need-investigation This needs more in-depth investigation label Aug 29, 2019
@kushagraThapar

With some load testing, I am seeing the same issue on 0.9.0.M3.
Haven't debugged the issue, but I guess it's happening when I dispose the connection after receiving the response.

Here is the stack trace:

The connection observed an error
javax.net.ssl.SSLException: SSLEngine closed already
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:834)
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778)
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:1878)
	at io.netty.handler.ssl.SslHandler.closeOutboundAndChannel(SslHandler.java:1846)
	at io.netty.handler.ssl.SslHandler.close(SslHandler.java:729)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:621)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:605)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.close(CombinedChannelDuplexHandler.java:507)
	at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:77)
	at io.netty.channel.CombinedChannelDuplexHandler.close(CombinedChannelDuplexHandler.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:621)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:605)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:467)
	at io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:968)
	at io.netty.channel.AbstractChannel.close(AbstractChannel.java:231)
	at reactor.netty.http.client.HttpClientOperations.onInboundCancel(HttpClientOperations.java:241)
	at reactor.netty.channel.FluxReceive.unsubscribeReceiver(FluxReceive.java:398)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

@violetagg
Member

@kushagraThapar Are you able to extract a reproducible example? Can you test the current 0.9.0.BUILD-SNAPSHOT, as we have fixes that might be related?

@griswold

griswold commented Sep 5, 2019

@violetagg thanks so much for all your help on this so far.

I have been trying to create a minimal repro of this issue and have struggled. We only see the issue under fairly heavy and sustained load. I will keep at it and see if I can get something working.

As you noted above, it appears that a channel is entering a state where the channel itself is open and active, but the SSLEngine associated with that channel's pipeline is closed. I have used the debugger to put a channel in that state "artificially" by closing the SSLEngine. I can confirm that, when I do this, the faulty channel stays in the pool until the remote server closes it.

I'm trying to build a toy example wherein a channel enters this state "naturally". Assuming I'm ever able to figure out how to do this, it seems that the inevitable conclusion is that using Channel#isActive as the test for the health of a channel in the pool is insufficient, because that does not consider the state of the SSLEngine. If it is at all possible for the SSLEngine to close in normal operation without the connection itself closing, I think this issue will persist.

Will continue to work on the example, but I was curious about your thoughts on that analysis.
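
To make the distinction in that analysis concrete, here is a small illustrative sketch using the plain Netty API (this is not how the pool actually performs its check, just an illustration of the two states):

    import io.netty.channel.Channel;
    import io.netty.handler.ssl.SslHandler;

    static boolean looksUsable(Channel channel) {
        // Channel#isActive only reflects the state of the TCP connection...
        boolean tcpAlive = channel.isActive();
        // ...while the SSLEngine attached to the pipeline can already be closed,
        // in which case any further write fails with "SSLEngine closed already".
        SslHandler ssl = channel.pipeline().get(SslHandler.class);
        boolean engineClosed = ssl != null
                && (ssl.engine().isOutboundDone() || ssl.engine().isInboundDone());
        return tcpAlive && !engineClosed;
    }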

@hitendrapratap

We are facing a similar issue with Reactor Netty 0.8.9.RELEASE, Spring Framework 5.1.8.RELEASE, and Spring Boot 2.1.6.RELEASE.

@gnilron

gnilron commented Sep 10, 2019

We also have the same issue; we tried switching to the fixed pool without success. We are thinking of testing 0.9.0.RC1 if that would make sense.

@rahulmakwana32

Debug loggers and tcpdump should help in understanding the actual cause of this error message and, in turn, in finding a resolution.
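
For reference, a minimal sketch of enabling Reactor Netty wire logging on a WebClient (the surrounding setup is an assumption; the relevant loggers live under reactor.netty and usually also need DEBUG level in the logging configuration):

    import org.springframework.http.client.reactive.ReactorClientHttpConnector;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.netty.http.client.HttpClient;

    // wiretap(true) logs the traffic observed on each pooled connection, which can be
    // correlated with a tcpdump on the client host to see when the peer closed the socket.
    HttpClient httpClient = HttpClient.create().wiretap(true);
    WebClient webClient = WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();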

@polsson12

polsson12 commented Oct 1, 2019

When is a pull request scheduled to be created out of this commit?
Do you know which version this commit will be released in?

I'm working on a project where we are very eager to have a fix for the SSLEngine closed already error.

@Incarnation-p-lee

Any update on this issue? I can see one branch with a commit that tries to fix it, but it has not been released since 11-7-2019. Or should we give 0.9.x a try?

@gtomassi

gtomassi commented Nov 8, 2019

Same issue here; it takes production servers down completely, since they seem to have no way to recover.

Similar behavior to the others that have commented:

  • Hard to replicate
  • Occurs only under heavier load

Additional info:

  • Amongst many simultaneous queries, it ONLY happens on large requests (>10 kB). Smaller requests are unaffected.
  • Once it starts, it continues to occur even if the load declines
  • Confirmed on 0.8.12.RELEASE & 0.8.13.RELEASE
  • Code that performs the query that always fails once symptoms begin:
    private Mono<byte[]> downloadImage(String uri) {
        return HttpClient.create()
                .secure()
                .get()
                .uri(uri)
                .responseSingle((httpClientResponse, byteBufMono) -> byteBufMono.asByteArray().subscribeOn(scheduler))
                .publishOn(scheduler).subscribeOn(scheduler);
    }

@CamielCop

I am having the same issue. I can reproduce the issue very easily. Here is what I am using:

  • reactor-netty:0.9.1.RELEASE
  • reactor-spring:1.0.1.RELEASE

My scenario is the following:

  • I am using WebClient to make post requests to 2 different servers.
  • The majority of the requests are very big, ranging from 2kB to 200kB.
  • Posts to one of the servers are successful (Majority of the posts).
  • Posts to the other server returns 500 Internal Server Error (Only a few posts).

When I run tests with smaller POST requests I do not see the error, but with large requests, the errors start to happen very quickly.

This is how I am starting the POST requests:

Mono<ApiResponseBody> mono = webClient
                    .post()
                    .uri(getVtmsTranscribeSyncEndpoint())
                    .body(BodyInserters.fromValue(vtmsRequest))
                    .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                    .header("ININ-Correlation-Id", correlationId)
                    .accept(MediaType.APPLICATION_JSON)
                    .acceptCharset(Charset.forName("UTF-8"))
                    .retrieve()
                    .bodyToMono(ApiResponseBody.class);

And:

Mono<String> mono = webClient
                .post()
                .uri(getTpPostAPIEndpoint(requestResultModel.getOrgId()))
                .body(BodyInserters.fromValue(requestResultModel))
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .header("ININ-Correlation-Id", correlationId)
                .retrieve()
                .bodyToMono(String.class);

And this is the exception I am getting:

Nov11 15:42:02.817|WARN||r.n.h.c.HttpClientConnect||reactor-http-nio-1|[id: 0xdb6a0795, L:/192.168.129.117:61379 - R:gia-voice-transcription.us-east-1.inindca.com/10.25.202.254:443] The connection observed an error
javax.net.ssl.SSLException: SSLEngine closed already
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:837)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Request to POST https://gia-voice-transcription.us-east-1.inindca.com/vtms/v1/Transcribe/Sync [DefaultWebClient]
Stack trace:
		at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:837)
		at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:800)
		at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:781)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
		at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533)
		at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
		at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
		at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
		at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
		at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1037)
		at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:293)
		at reactor.netty.http.HttpOperations.lambda$send$0(HttpOperations.java:101)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
		at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2148)
		at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.request(FluxContextStart.java:132)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:103)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
		at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onSubscribe(FluxContextStart.java:97)
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4087)
		at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:146)
		at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
		at reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:72)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2148)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1956)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1830)
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:430)
		at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:473)
		at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.run(PooledConnectionProvider.java:564)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518)
		at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.lang.Thread.run(Thread.java:748)

@violetagg
Member

@CamielCop Is it possible to provide a more complete example?

@dave-fl

dave-fl commented Dec 17, 2019

@violetagg A double-gateway test might help reproduce this. I’ve definitely seen this one. I would try with a POST of reasonable size. Maybe a regression?

@benweet

benweet commented Jan 11, 2020

Reproduced with a fixed connection pool, which I think is the default. Here is how the client connector is created (in Kotlin):

val connector = ReactorClientHttpConnector(
    from(create().runOn(LoopResources.create("reactor-webclient"))
        .option(CONNECT_TIMEOUT_MILLIS, REQUEST_TIMEOUT.toInt())
        .doOnConnected { connection ->
            connection.addHandlerLast(
                ReadTimeoutHandler(
                    REQUEST_TIMEOUT,
                    TimeUnit.MILLISECONDS
                )
            )
            connection.addHandlerLast(
                WriteTimeoutHandler(
                    REQUEST_TIMEOUT,
                    TimeUnit.MILLISECONDS
                )
            )
        }
    )
        .followRedirect(true)
)

The stack trace looks the same:

javax.net.ssl.SSLException: SSLEngine closed already
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:837)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Request to DELETE https://**************************** [DefaultWebClient]
Stack trace:
		at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:837)
		at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:800)
		at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:781)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
		at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533)
		at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
		at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
		at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
		at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125)
		at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789)
		at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757)
		at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812)
		at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1037)
		at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:293)
		at reactor.netty.http.HttpOperations.lambda$then$2(HttpOperations.java:163)
		at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:116)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
		at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:146)
		at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60)
		at reactor.core.publisher.MonoFromFluxOperator.subscribe(MonoFromFluxOperator.java:72)
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
		at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:430)
		at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:494)
		at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.run(PooledConnectionProvider.java:568)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510)
		at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
		at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.lang.Thread.run(Thread.java:748)

@violetagg violetagg added this to the 0.9.x Maintenance Backlog milestone Mar 20, 2020
violetagg added a commit that referenced this issue Apr 15, 2020
…osed when PooledConnectionProvider.DisposableAcquire#onNext

Ensure the "owner" will be set in the channel only if the channel is active

Related to #782, #981
violetagg added a commit that referenced this issue Apr 15, 2020
…osed when PooledConnectionProvider.DisposableAcquire#onNext

Ensure the "owner" will be set in the channel only if the channel is active

Related to #782, #981
@violetagg
Member

All

If you are able to test PR #1065, that would be great.

Thanks

@violetagg
Member

The cases that may lead to this exception are:

  1. There is an idle timeout on the target server. The connection may be acquired successfully from the pool, but in the meantime be closed by the target server.
    With the PR above we will try to re-acquire a new connection in some cases, depending on when the connection close happened.
    The connection pool may be configured with pool.maxIdleTime < target server max idle time, plus the LIFO leasing strategy. The LIFO strategy guarantees that if the first connection in the pool exceeds the idle timeout, all connections in the pool will be cleared, as they also exceed the idle timeout.
  2. There is a TCP keepalive time setting on the system. The connection may be acquired successfully from the pool, but in the meantime be closed by the system because of that setting.
    With the PR above we will try to re-acquire a new connection in some cases, depending on when the connection close happened.
    The connection pool may be configured with pool.maxLifeTime < TCP keepalive time, plus the LIFO leasing strategy. The LIFO strategy guarantees that if the first connection in the pool exceeds the life timeout, all connections in the pool will be cleared, as they also exceed the life timeout. (A configuration sketch follows below.)
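
A sketch of that configuration, assuming the ConnectionProvider.builder API in recent 0.9.x releases; whether .lifo() is available on your exact version is an assumption, and the durations are placeholders to be chosen below the target server's idle timeout and the system's TCP keepalive time:

    import java.time.Duration;
    import reactor.netty.http.client.HttpClient;
    import reactor.netty.resources.ConnectionProvider;

    // maxIdleTime below the target server's idle timeout addresses case 1,
    // maxLifeTime below the system TCP keepalive time addresses case 2.
    // LIFO leasing hands out the most recently used connection first, so if the
    // head of the pool has expired, the rest of the pool has expired as well.
    ConnectionProvider provider = ConnectionProvider.builder("custom")
            .maxConnections(500)                 // illustrative
            .maxIdleTime(Duration.ofSeconds(20)) // placeholder value
            .maxLifeTime(Duration.ofSeconds(60)) // placeholder value
            .lifo()                              // assumed builder option; see note above
            .build();
    HttpClient httpClient = HttpClient.create(provider);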

violetagg added a commit that referenced this issue Apr 15, 2020
…osed when PooledConnectionProvider.DisposableAcquire#onNext

Ensure the "owner" will be set in the channel only if the channel is active

Related to #782, #981
@violetagg
Member

The fix will be available in 0.9.7.RELEASE.
Meanwhile, it can be tested with 0.9.7.BUILD-SNAPSHOT.

@violetagg violetagg removed the for/user-attention This issue needs user attention (feedback, rework, etc...) label Apr 15, 2020
@violetagg violetagg modified the milestones: 0.9.x Maintenance Backlog, 0.9.7.RELEASE Apr 15, 2020
@martinritz

When will 0.9.7 be released?

@violetagg
Member

@martinritz 27.04

@kushagraThapar

kushagraThapar commented Apr 29, 2020

@violetagg - Unfortunately, I am still seeing this issue with the latest reactor-netty release, 0.9.7.RELEASE. It happens randomly, but not necessarily under load testing. We have different benchmarks for our SDK (Azure Cosmos DB), including load read testing, load write testing, read-my-writes load testing, etc. This issue happens only in some cases.

Anyway, I plan to debug this in more detail, but I was wondering if you can provide any pointers or log messages I should add somewhere to be able to debug this quickly. Thanks!

FYI, this is our reactor netty http client class : https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java

Here is the complete stack trace :

2020-04-29 12:00:22,295       [reactor-http-nio-4] WARN  reactor.netty.http.client.HttpClientConnect - [id: 0x6e7c02b4, L:/192.168.0.10:51475 - R:...] The connection observed an error
javax.net.ssl.SSLException: SSLEngine closed already
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:848) ~[netty-handler-4.1.49.Final.jar:4.1.49.Final]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoIgnoreThen] :
	reactor.core.publisher.Mono.thenEmpty(Mono.java:4486)
	reactor.netty.ReactorNetty$OutboundThen.<init>(ReactorNetty.java:553)
Error has been observed at the following site(s):
	|_  Mono.thenEmpty ⇢ at reactor.netty.ReactorNetty$OutboundThen.<init>(ReactorNetty.java:553)
	|_ Mono.fromDirect ⇢ at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.lambda$onStateChange$0(HttpClientConnect.java:437)
	|_      Mono.defer ⇢ at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:437)
Stack trace:
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:848) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:811) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:792) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:1931) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.closeOutboundAndChannel(SslHandler.java:1899) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.close(SslHandler.java:743) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.close(CombinedChannelDuplexHandler.java:505) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.ChannelOutboundHandlerAdapter.close(ChannelOutboundHandlerAdapter.java:77) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.close(CombinedChannelDuplexHandler.java:316) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:622) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:606) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:472) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.DefaultChannelPipeline.close(DefaultChannelPipeline.java:957) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannel.close(AbstractChannel.java:232) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at reactor.netty.http.client.HttpClientOperations.onInboundCancel(HttpClientOperations.java:258) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.FluxReceive.unsubscribeReceiver(FluxReceive.java:436) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.FluxReceive.lambda$new$0(FluxReceive.java:80) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.FluxReceive.cancelReceiver(FluxReceive.java:148) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.FluxReceive.cancel(FluxReceive.java:90) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.ChannelOperations.dispose(ChannelOperations.java:160) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.core.publisher.FluxCreate$SinkDisposable.cancel(FluxCreate.java:1039) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.disposeResource(MonoCreate.java:307) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.cancel(MonoCreate.java:296) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.cancel(MonoFlatMapMany.java:123) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.cancel(MonoSingle.java:101) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:152) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:137) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:137) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.terminate(Operators.java:1199) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:351) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.terminate(Operators.java:1199) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1035) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:332) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:212) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1095) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:352) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.cancel(MonoCollectList.java:146) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:160) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.cancel(FluxTakeUntil.java:138) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.cancel(MonoIgnoreElements.java:96) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.terminate(Operators.java:1199) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.cancel(FluxFlatMap.java:1035) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:332) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.unsubscribeEntry(FluxFlatMap.java:212) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FlatMapTracker.unsubscribe(FluxFlatMap.java:1095) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:352) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.cancel(MonoSingle.java:101) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.terminate(Operators.java:1199) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.cancel(FluxPeekFuseable.java:152) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.cancel(MonoSingle.java:101) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.terminate(Operators.java:1199) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.cancel(MonoPeekTerminal.java:137) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.terminate(Operators.java:1199) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.cancel(MonoFlatMap.java:180) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:167) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.cancel(MonoSingle.java:101) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.cancel(FluxMapFuseable.java:167) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.cancel(FluxRetryWhen.java:151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.SerializedSubscriber.cancel(SerializedSubscriber.java:157) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:160) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators.set(Operators.java:1119) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.cancel(FluxMergeSequential.java:593) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancelAll(FluxMergeSequential.java:278) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drainAndCancel(FluxMergeSequential.java:269) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.cancel(FluxMergeSequential.java:263) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:160) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:160) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drainLoop(Operators.java:2182) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.drain(Operators.java:2151) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.cancel(Operators.java:1963) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.cancel(FluxFlatMap.java:351) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeOrdered$MergeOrderedInnerSubscriber.cancel(FluxMergeOrdered.java:407) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeOrdered$MergeOrderedMainProducer.cancel(FluxMergeOrdered.java:196) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxWindow$WindowExactSubscriber.dispose(FluxWindow.java:231) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxWindow$WindowExactSubscriber.cancel(FluxWindow.java:224) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:160) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:802) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:689) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:986) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1737) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.UnicastProcessor.checkTerminated(UnicastProcessor.java:349) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.UnicastProcessor.drainRegular(UnicastProcessor.java:239) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.UnicastProcessor.drain(UnicastProcessor.java:324) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.UnicastProcessor.onComplete(UnicastProcessor.java:429) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxWindow$WindowExactSubscriber.onNext(FluxWindow.java:175) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeOrdered$MergeOrderedMainProducer.drain(FluxMergeOrdered.java:312) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeOrdered$MergeOrderedInnerSubscriber.onNext(FluxMergeOrdered.java:371) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:535) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:999) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:267) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:225) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onSubscribe(FluxFlatMap.java:979) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8325) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:418) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.drain(FluxMergeSequential.java:425) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.innerNext(FluxMergeSequential.java:297) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMergeSequential$MergeSequentialInner.onNext(FluxMergeSequential.java:563) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:162) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1756) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:270) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1756) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:909) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:1013) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:191) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:81) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onComplete(FluxTakeUntil.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onNext(FluxTakeUntil.java:92) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:79) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:909) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:1013) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1989) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1756) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:178) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1755) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:121) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:366) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:367) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:423) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:607) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314) [netty-handler-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) [netty-codec-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) [netty-codec-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) [netty-codec-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.49.Final.jar:4.1.49.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.49.Final.jar:4.1.49.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]

@violetagg
Copy link
Member

@kushagraThapar Here the exception is a bit different: the inbound is not closed but canceled.

at reactor.netty.http.client.HttpClientOperations.onInboundCancel

Do you use operators such as timeout, next, first or zip?
These operators cancel the subscription.
For example, see this issue with Mono.zip - #1013 (comment)
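To make the pattern concrete, here is a minimal sketch (not code from this thread; the URL is a hypothetical slow endpoint) of how a downstream timeout cancels an in-flight Reactor Netty request, which closes the TLS connection and can surface as "SSLEngine closed already" in the client logs:

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

public class TimeoutCancellationDemo {

    public static void main(String[] args) {
        // Hypothetical slow HTTPS endpoint; any request that outlives the timeout will do.
        Mono<String> response = HttpClient.create()
                .get()
                .uri("https://example.org/slow")
                .responseContent()
                .aggregate()
                .asString();

        // timeout(...) cancels the upstream subscription when the duration elapses;
        // the in-flight TLS connection is closed, which the client may log as
        // "javax.net.ssl.SSLException: SSLEngine closed already".
        response.timeout(Duration.ofMillis(100))
                .onErrorResume(e -> Mono.empty())
                .block();
    }
}
```

The same applies to next, first and zip: whichever operator decides it no longer needs the rest of the stream propagates a cancel down to the connection.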

@kushagraThapar
Copy link

kushagraThapar commented Apr 30, 2020

@violetagg - Yes indeed, we do use these operators in a lot of places in our code (next, first, delayElements, etc.). In fact, I have also occasionally seen NotYetConnectedException in our code.
We also use Mono.empty() all over the place.

What can we do differently to avoid these two issues (NotYetConnectedException and the "SSLEngine closed already" SSLException)?

@kushagraThapar
Copy link

So I tried getting rid of some of these operators, but I am still seeing the issue. Is there a way to find out which operator in the chain actually called the cancel operation on the HTTP client or the underlying channel?
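For reference, one way to narrow this down (a sketch with illustrative names, not code from the project) is to place doOnCancel/log checkpoints next to the HTTP call; the checkpoint closest to the cancelling operator reports the cancel signal:

```java
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;

public class CancellationTracingDemo {

    public static void main(String[] args) {
        // Hypothetical endpoint; the interesting part is the checkpoints below.
        Mono<String> call = HttpClient.create()
                .get()
                .uri("https://example.org/data")
                .responseContent()
                .aggregate()
                .asString()
                .doOnCancel(() -> System.out.println("HTTP call cancelled"))
                .log("after-http"); // logs onSubscribe/request/cancel signals at this point in the chain

        // Cancelling from downstream (here explicitly via dispose()) shows up
        // at every checkpoint the cancel signal travels through.
        Disposable subscription = call.subscribe();
        subscription.dispose();
    }
}
```

Sprinkling such checkpoints between the suspect operators makes it visible where in the chain the cancel originates.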

@violetagg
Copy link
Member

@kushagraThapar
Copy link

@violetagg - sure will do, thanks.

@kushagraThapar
Copy link

kushagraThapar commented May 14, 2020

@violetagg - I did some digging and, to start with, the problem is occurring because of these two operators that we use:
Flux.mergeSequential and Flux.mergeOrdered
If I take them out, I don't see the issue at all.

But the documentation for these operators doesn't explicitly describe how they cancel the subscription or dispose of elements that are no longer required. That said, I am still not sure how the cancellation happens.

My logic is related to pagination over queries, and the pagination code looks roughly like the following:

```java
private static <T extends Resource> Flux<FeedResponse<T>> getPaginatedQueryResultAsObservable(
        String continuationToken,
        BiFunction<String, Integer, RxDocumentServiceRequest> createRequestFunc,
        Function<RxDocumentServiceRequest, Mono<FeedResponse<T>>> executeFunc, Class<T> resourceType,
        int top, int maxPageSize, boolean isChangeFeed) {

    return Flux.defer(() -> {
        // Each generator step emits the next page as a Flux<FeedResponse<T>>,
        // or completes the sink when the fetcher reports there is nothing more to fetch.
        Flux<Flux<FeedResponse<T>>> generate = Flux.generate(
                () -> new Fetcher<>(createRequestFunc, executeFunc, continuationToken, isChangeFeed, top, maxPageSize),
                (tFetcher, sink) -> {
                    if (tFetcher.shouldFetchMore()) {
                        Mono<FeedResponse<T>> nextPage = tFetcher.nextPage();
                        sink.next(nextPage.flux());
                    } else {
                        logger.debug("No more results");
                        sink.complete();
                    }
                    return tFetcher;
                });

        // Concurrency of 1 keeps pages strictly in order: the next page is only
        // subscribed to after the previous inner Flux completes.
        return generate.flatMapSequential(feedResponseFlux -> feedResponseFlux, 1);
    });
}
```

Do you think the problem could be related to this pagination logic, which in turn might be causing an issue with the two operators above?

I am also wondering what happens when an end user cancels the operation while there are still elements remaining in the buffer (or left to process) - wouldn't the end user see this exception in that case?

@violetagg
Copy link
Member

@simonbasle @bsideup Can you tell us how cancellation happens in the case of Flux.mergeSequential and Flux.mergeOrdered?

@simonbasle
Copy link
Member

for mergeOrdered:

  • each source is internally subscribed by the operator
  • upon cancellation of the main, all these inner subscriptions are also cancelled
  • the values array that holds the next value from each source until all sources have produced (and so the order can be decided) is nulled out, but the values themselves are not discarded (as per doOnDiscard)

for mergeSequential:

  • each source is subscribed to in much the same way (except that they are passed to the operator as a single FluxArray<Publisher> source)
  • in case of cancellation, some sources may never have been subscribed (these are simply ignored); the sources that have been subscribed to so far are cancelled
  • once again, the queue contents are not passed to a discard hook, but should not be emitted downstream anymore at that point (a short sketch of this cancellation path follows below)
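As a rough illustration of that cancellation path (a minimal sketch with made-up interval sources; doOnCancel is only there to make the inner cancellations visible):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

public class MergeSequentialCancelDemo {

    public static void main(String[] args) throws InterruptedException {
        Flux<Long> first = Flux.interval(Duration.ofMillis(50))
                .doOnCancel(() -> System.out.println("first source cancelled"));
        Flux<Long> second = Flux.interval(Duration.ofMillis(50))
                .doOnCancel(() -> System.out.println("second source cancelled"));

        // take(1) cancels the merged subscription after the first element;
        // mergeSequential then cancels every inner source it has subscribed to so far.
        Flux.mergeSequential(first, second)
                .take(1)
                .subscribe(v -> System.out.println("got " + v));

        Thread.sleep(500); // keep the JVM alive long enough to observe the cancellations
    }
}
```

When the inner sources are HTTP calls instead of intervals, those inner cancellations are what close the pooled connections.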

@kushagraThapar
Copy link

@violetagg So, given that these operators cancel the upstream operations, I want to know what can be done in the Reactor Netty client to handle these cancellations.

Since this will be hit by any end user who cancels the operation manually while there are still elements remaining in the buffer (or left to process) - wouldn't the end user see this exception in that case?

And for cases like mergeOrdered and mergeSequential there are really no alternative Reactor operators to use.

@kushagraThapar
Copy link

@violetagg @simonbasle - I think the issue might be related to takeUntil. I found one place where we use it to wait for a certain number of documents from the backend; as soon as we receive that many documents, we proceed.

But I saw that takeUntil and takeWhile both cancel the upstream, which might be the root cause here. What alternatives can we use?
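For reference, a minimal sketch of that behavior (the Flux.range source is only a stand-in for the real backend stream):

```java
import reactor.core.publisher.Flux;

public class TakeUntilCancelDemo {

    public static void main(String[] args) {
        // Stand-in for a paginated backend query; in the real code this would be a
        // Flux<FeedResponse<T>> backed by Reactor Netty HTTP calls.
        Flux<Integer> documents = Flux.range(1, 1_000)
                .doOnCancel(() -> System.out.println("upstream cancelled"));

        // takeUntil emits elements until the predicate matches (inclusive) and then
        // cancels the upstream. With a live HTTP source, that cancellation closes the
        // connection and may be logged as "SSLEngine closed already".
        documents.takeUntil(doc -> doc >= 10)
                .subscribe(doc -> System.out.println("received " + doc));
    }
}
```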

@simonbasle
Copy link
Member

There is no real alternative to cancelling a subscription when you're only interested in a subset of the elements. Even if you could request the exact desired amount (which is not the case with takeUntil), you'd still have to cancel in order to free the source, rather than letting it hang waiting for more requests...
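A small sketch of that point (illustrative only): even when exactly the needed amount is requested up front, the subscription is still cancelled once the consumer is done, so the source does not stay open:

```java
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BoundedRequestDemo {

    public static void main(String[] args) {
        Flux<Integer> source = Flux.range(1, 1_000_000)
                .doOnCancel(() -> System.out.println("source cancelled, resources freed"));

        source.subscribe(new BaseSubscriber<Integer>() {
            private int seen;

            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3); // request exactly the amount we need
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("received " + value);
                if (++seen == 3) {
                    cancel(); // still needed, otherwise the source hangs waiting for more demand
                }
            }
        });
    }
}
```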

@kushagraThapar
Copy link

Thanks @simonbasle for the explanation. @violetagg - is there a way to handle this scenario in the Reactor Netty HTTP/TCP client?
This is what triggers the cancellation, which in turn causes the SSLException.
The errors generated in this case are also causing OutOfMemoryError for some of our customers.

@violetagg
Copy link
Member

@kushagraThapar Please create a new issue with a reproducible example so that we can see why a cancellation causes the OOM. Let's also discuss there the SSLException that occurs when you cancel (close the connection) and what your expectation is for this use case.

@kushagraThapar
Copy link

@violetagg - I have opened a new issue with repro code: #1165
