LEAK: ByteBuf.release() was not called before it's garbage-collected. #1130

Closed
shikouchen opened this issue Jun 1, 2020 · 19 comments
Labels
for/springframework This belongs to the Spring Framework project

Comments

shikouchen commented Jun 1, 2020

Expected Behavior

The problem occurred when I used WebClient to retrieve data from a remote server over HTTP.

I checked the system's memory usage; it was around 50%.
The remote server, which also receives requests from other clients, looked fine as well, so the problem is probably not on the remote server side.

I use the same spring-boot and netty versions for two microservices, but only the one that handles far more requests shows this error. Could it be related to the number of requests?

Actual Behavior

time:2020-05-31T21:27:25.319+09:00	level:ERROR	thread:reactor-http-epoll-6	logger:io.netty.util.ResourceLeakDetector	message:LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:91)
org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:75)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2082)
reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1956)
reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
reactor.core.publisher.FluxJust.subscribe(FluxJust.java:70)
reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:435)
reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:514)
reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onStateChange(PooledConnectionProvider.java:523)
reactor.netty.resources.PooledConnectionProvider$PooledConnection.onStateChange(PooledConnectionProvider.java:429)
reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:60)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:412)
io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:211)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216)
io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895)
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620)
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653)
io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)

Steps to Reproduce

The Kotlin code I used is shown below, but I cannot reproduce the error on demand; it only occurs occasionally.

fun example() {
    WebClient.builder()
        .baseUrl("http://localhost:8080/graphql")
        .build()
        .post()
        .header(HttpHeaders.CONTENT_TYPE, "application/graphql")
        .bodyValue(query)
        .retrieve()
        .onStatus(HttpStatus::isError) {
            it.bodyToMono<String>().map { response ->
                ExternalServerError(platform, "Failed. body: $response")
            }
        }
        .bodyToMono<String>()
        .timeout(Duration.ofSeconds(3))
        .awaitFirstOrNull()
        ?: throw ExternalServerError("remote server error")
}

Possible Solution

Your Environment

  • openjdk version: 1.8.0_222
  • spring-boot version: 2.2.6
  • reactor-netty: 0.9.6.RELEASE
  • OS: CentOS
@shikouchen shikouchen added status/need-triage A new issue that still need to be evaluated as a whole type/bug A general bug labels Jun 1, 2020
violetagg (Member) commented

@shikouchen There were fixes in Reactor Core (https://github.com/reactor/reactor-core/releases/tag/v3.3.5.RELEASE) related to leaks. Can you upgrade to Spring Boot 2.2.7 and test again?

@violetagg violetagg added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed status/need-triage A new issue that still need to be evaluated as a whole labels Jun 1, 2020

lesongjia commented Jun 5, 2020

@violetagg I have the same issue. I upgraded to Spring Boot 2.2.7 with OpenJDK 11, but the error still occurs, and our JVM metrics show direct buffer usage growing over time.
(screenshot attached in the original issue)

violetagg (Member) commented

@lesongjia Will you be able to provide a reproducible example?

lesongjia commented Jun 5, 2020

@violetagg We see this in both our testing environment and production, with Spring Boot 2.2.7. With the advanced leak detection option enabled, we found more useful logs:

Recent access records: 
#1:
	Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#2:
	Hint: 'ReadTimeoutHandler' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
Created at:
	io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
	io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
	io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:107)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:332)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)

This log is consistent with the problem being triggered by the timeout() operator.
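
For readers who want the same level of detail: the "advanced leak detection option" mentioned above presumably refers to Netty's leak detector level, which the Netty wiki documents as the `io.netty.leakDetection.level` system property (levels: disabled, simple, advanced, paranoid). A minimal sketch of setting it from code follows. The class name `LeakDetectionLevelSketch` and the method `enableAdvanced` are hypothetical, and the sketch assumes it runs before any Netty class is loaded, because the property is read when the leak detector initializes.

```java
final class LeakDetectionLevelSketch {

    // System property name documented by Netty for the leak detector level.
    static final String LEVEL_PROPERTY = "io.netty.leakDetection.level";

    /** Sets the level to "advanced" and returns the value now stored in the property. */
    static String enableAdvanced() {
        // Assumption: called at startup, before Netty initializes its leak detector.
        System.setProperty(LEVEL_PROPERTY, "advanced");
        return System.getProperty(LEVEL_PROPERTY);
    }

    public static void main(String[] args) {
        System.out.println(LEVEL_PROPERTY + "=" + enableAdvanced());
    }
}
```

Passing `-Dio.netty.leakDetection.level=advanced` on the JVM command line has the same effect and avoids any class-loading order concerns.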

violetagg (Member) commented

@lesongjia Can you also enable Reactor Netty logs?

either logging.level.reactor.netty=debug or at least logging.level.reactor.netty.channel.FluxReceive=debug

lesongjia commented

@violetagg After turning on debug logging for Reactor Netty, I looked at the logs for the one thread and channel involved when this issue happened. They show that multiple requests were handled within 10 minutes, and I do think our 1-minute timeout was triggered during that window. What is interesting to me are the last few log entries before the leak message:

2020-06-08 14:35:51.965 EDT
LEAK: ByteBuf.release() was not called before it's garbage-collected. 

2020-06-08 14:31:50.291 EDT
[id: 0x0f9d0570, L:/10.36.23.97:48846 ! R:www.googleapis.com/108.177.111.95:443] onStateChange(... [disconnecting]

2020-06-08 14:31:50.291 EDT
[id: 0x0f9d0570, L:/10.36.23.97:48846 ! R:www.googleapis.com/108.177.111.95:443] Non Removed handler: WriteTimeoutHandler, context: ChannelHandlerContext

2020-06-08 14:31:50.290 EDT
[id: 0x0f9d0570, L:/10.36.23.97:48846 ! R:www.googleapis.com/108.177.111.95:443] Non Removed handler: ReadTimeoutHandler, context: ChannelHandlerContext

Also note the 4-minute gap between the onStateChange entry and the LEAK entry; I am not sure whether this indicates anything. In the LEAK log below, hint #3 says that ReadTimeoutHandler will handle the message from this point. I wonder whether it has a problem. We do configure our WebClient to use it.

LEAK Log:

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	Hint: [id: 0x0f9d0570, L:/10.36.23.97:48846 - R:www.googleapis.com/108.177.111.95:443] Buffered ByteBufHolder in Inbound Flux Queue
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:349)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352)
	reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:629)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#2:
	Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
#3:
	Hint: 'ReadTimeoutHandler' will handle the message from this point.
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:86)
	io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:25)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)
Created at:
	io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
	io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
	io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:107)
	io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:332)
	io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1518)
	io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1267)
	io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1314)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.base/java.lang.Thread.run(Thread.java:834)

Our WebClient config

    @Bean
    @Primary
    WebClient webClient(final WebClient.Builder builder) {
        final Duration timeOutValue = Duration.ofSeconds(60);
        final TcpClient tcpClient = TcpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) timeOutValue.toMillis())
                .doOnConnected(connection -> connection.addHandlerLast(new ReadTimeoutHandler((int) timeOutValue.getSeconds()))
                        .addHandlerLast(new WriteTimeoutHandler((int) timeOutValue.getSeconds())));
        return builder
                .clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient)))
                .build();
    }

@violetagg
Member

violetagg commented Jun 9, 2020

@lesongjia Is it possible to also share how you use WebClient? From the stack trace above I can see that the incoming message was received but not consumed.

@lesongjia

lesongjia commented Jun 9, 2020

@violetagg Sure, we have two usages; they are similar.

//first
private <T> Mono<T> makeRawRequest(final String uri, final Credential credential, Class<T> entityClass, Object... uriVariables) {
    return webClient.get()
            .uri(uri, uriVariables)
            .header(HttpHeaders.AUTHORIZATION, credential.getHeaderPrefix() + " " + credential.getAuthorization())
            .retrieve()
            .bodyToMono(entityClass)
            .onErrorMap(WebClientResponseException.Forbidden.class,
                    e -> e.getResponseBodyAsString().contains("Not Authorized to access this resource/api")
                            ? new NotAuthorizedToAccessApiException("Not authorized to access api. This may be due to trying to retrieve an external entity", e) : e)
            .onErrorMap(WebClientResponseException.Unauthorized.class,
                    e -> e.getResponseBodyAsString().contains("Access denied. You are not authorized to read records")
                            ? new NotAuthorizedToAccessApiException("Integration does not have access to api", e) : e)
            .onErrorMap(WebClientResponseException.Forbidden.class,
                    e -> e.getResponseBodyAsString().contains("dailyLimitExceeded")
                            ? new DailyLimitException(e) : e)
            .timeout(Duration.ofMinutes(1))
            .retryWhen(Retry.anyOf(TimeoutException.class)
                    .doOnRetry(context -> log.warn("request timed out. Retrying attempt: {}, uri: {}, variables: {}", context.iteration(), uri, uriVariables))
                    .retryMax(3))
            .doOnError(e -> !(e instanceof WebClientResponseException), e -> OverflowSafeLoggingUtil.warn("Non client error making api call - uri: {} variables: {}", uri, uriVariables, e))
            .doOnError(WebClientResponseException.class, e -> log.warn("Client error making api call- uri: {} variables: {}, body: {}", uri, uriVariables, e.getResponseBodyAsString()));
}


//second
private <T> Mono<T> makeRawRequest(final String uri, final Credential credential, final Class<T> entityClass, final Object... uriVariables) {
    return webClient.get()
            .uri(uri, uriVariables)
            .header(HttpHeaders.AUTHORIZATION, credential.getHeaderPrefix() + " " + credential.getAuthorization())
            .retrieve()
            .bodyToMono(entityClass)
            .doOnError(this::isUnExpectedWebClientResponseException,
                    e -> log.warn("Error - uri: {}, uri variables {}, body: {}", uri, uriVariables,
                            ((WebClientResponseException) e).getResponseBodyAsString()))
            .retryWhen(apiRetryStrategy);
}

@lesongjia

We also use onErrorMap later, when we subscribe to the Mono that makeRawRequest returns. For example:

private <T> Mono<T> makeRequest(final String uri, final Integration integration, CredentialType credentialType, @Nullable String userId, Class<T> entityClass, Object... uriVariables) {
    return Mono.defer(
            () -> credentialClient.retrieveCredential(credentialType, integration.getTenantId(), integration.getIntegrationId(), userId))
            .flatMap(credential -> makeRawRequest(uri, credential, entityClass, uriVariables))
            .doOnError(WebClientResponseException.Unauthorized.class,
                    e -> credentialClient.clearCache(credentialType, integration.getTenantId(), integration.getIntegrationId(), userId))
            .doOnError(IOException.class, e -> LogWithContext.log(integration, log, LogLevel.WARN, "Received IOException on Google Api Call: " + e.getMessage(), null))
            .retryWhen(retryStrategy)
            .onErrorMap(RetryExhaustedException.class, Throwable::getCause);
}

@rstoyanchev
Contributor

I have the same issue. I also tried upgrading to Spring Boot 2.2.7 with OpenJDK 11.

@lesongjia did you just discover this, or was it working fine up to some previous version? It would help to have either something to run or, if possible, to narrow down when the issue might have appeared (if it was a regression).

@lesongjia

@rstoyanchev We just discovered this with an older version, Spring Boot 2.1.14, and we were able to reproduce it locally. After we upgraded to Spring Boot 2.2.7, the issue disappeared locally, but after releasing to production it still occurs there.

Meanwhile, we also discovered another problem on our end with Mono.zip vs Mono.zipDelayError. So we are testing it out right now and I will follow up here.

A quick question we would like explained: what is the difference between configuring a ReadTimeoutHandler on the WebClient and using Mono.timeout()? As the code example I shared shows, we did both. Does anything stand out to you that we are doing wrong with the WebClient?

@violetagg
Member

A quick question we would like explained: what is the difference between configuring a ReadTimeoutHandler on the WebClient and using Mono.timeout()? As the code example I shared shows, we did both. Does anything stand out to you that we are doing wrong with the WebClient?

When you use Mono.timeout, it applies to the whole execution, that is, from connecting to the remote peer to receiving the result. ReadTimeoutHandler, by contrast, is triggered only if nothing can be read from the remote peer for the configured period of time.
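
The difference between the two timeouts can be sketched with a toy model. This is plain Java, not Reactor Netty code: `TimeoutModel`, `simulate`, and the `Outcome` names are hypothetical, and the model assumes the overall timer starts at subscription while the idle timer starts once the connection is established and is reset by every read.

```java
/** Toy model of an overall deadline vs. a read-idle timeout (not Reactor Netty code). */
final class TimeoutModel {

    enum Outcome { COMPLETED, READ_IDLE_TIMEOUT, TOTAL_TIMEOUT }

    /**
     * All times are milliseconds since subscription.
     * arrivalsMillis holds the ascending arrival times of data chunks;
     * the last arrival is assumed to complete the response.
     */
    static Outcome simulate(long connectedAtMillis, long[] arrivalsMillis,
                            long readIdleMillis, long totalMillis) {
        // The overall timer runs from subscription, so it also covers connecting.
        if (connectedAtMillis > totalMillis) {
            return Outcome.TOTAL_TIMEOUT;
        }
        long lastRead = connectedAtMillis; // the idle timer starts once connected
        for (long arrival : arrivalsMillis) {
            long idleDeadline = lastRead + readIdleMillis;
            if (arrival > Math.min(idleDeadline, totalMillis)) {
                // Whichever deadline comes first is the one that fires.
                return idleDeadline <= totalMillis
                        ? Outcome.READ_IDLE_TIMEOUT : Outcome.TOTAL_TIMEOUT;
            }
            lastRead = arrival; // every read resets the idle timer
        }
        return Outcome.COMPLETED;
    }

    public static void main(String[] args) {
        // A steady trickle of data never trips the idle timer but exceeds the overall deadline.
        System.out.println(simulate(100, new long[] {400, 800, 1200}, 500, 1000)); // TOTAL_TIMEOUT
        // A long gap between reads trips the idle timer before the overall deadline.
        System.out.println(simulate(100, new long[] {200, 900}, 500, 1000)); // READ_IDLE_TIMEOUT
    }
}
```

In this model a slow but steady response is stopped only by the overall timeout, while a stalled peer is caught by the idle timeout, which is why the two settings are not interchangeable.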

@lesongjia

Thanks for the explanation @violetagg. Our fix looks good in the non-prod env. Once everything is clear in our production env, I will follow up here with a summary.

@lesongjia

After we replaced Mono.zip with Mono.zipDelayError (see #1013), the direct buffer leak in the JVM was fixed. However, the LEAK logs still appear occasionally in the prod env, though not in non-prod. Also, I believe the underlying problem in our service, which may have been caused by a memory leak, has stopped happening as well. Therefore, we are lowering the priority of this issue.

One point to help others with the same issue: improper use of operators can cause system resource leaks, especially when subscriptions are canceled. So be careful with such operators.
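
As a rough illustration of how early termination can strand a reference-counted resource, here is a minimal sketch in plain Java. It uses neither Reactor nor Netty: `LeakModel`, `Buf`, `combineLeaky`, and `combineSafe` are hypothetical names, and the model only assumes that a combining step stops at the first failure while earlier sources have already produced buffers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Toy model of a leak caused by abandoning results on early termination. */
final class LeakModel {

    /** Hypothetical stand-in for a reference-counted buffer. */
    static final class Buf {
        static final AtomicInteger LIVE = new AtomicInteger(); // buffers not yet released
        private boolean released;

        Buf() {
            LIVE.incrementAndGet();
        }

        void release() {
            if (!released) {
                released = true;
                LIVE.decrementAndGet();
            }
        }
    }

    /** Stops at the first failure and abandons the buffers produced so far. */
    static List<Buf> combineLeaky(List<Supplier<Buf>> sources) {
        List<Buf> out = new ArrayList<>();
        for (Supplier<Buf> source : sources) {
            out.add(source.get()); // an exception here strands everything in 'out'
        }
        return out;
    }

    /** Stops at the first failure too, but releases what was already produced. */
    static List<Buf> combineSafe(List<Supplier<Buf>> sources) {
        List<Buf> out = new ArrayList<>();
        try {
            for (Supplier<Buf> source : sources) {
                out.add(source.get());
            }
            return out;
        } catch (RuntimeException e) {
            out.forEach(Buf::release); // clean up before propagating the failure
            throw e;
        }
    }
}
```

In the leaky variant, a failure in the second source leaves the first buffer unreleased, which is the kind of situation a leak detector would report once the buffer is garbage-collected.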

One suggestion (not sure if it is possible): emit a warning when an operator is used in a way that may cause a leak or an unexpected cancellation of a subscription.

Thanks for the help @violetagg!

@violetagg
Member

Will you be able to test with the components recommended here: spring-projects/spring-framework#25216 (comment)?

@violetagg
Member

An additional fix was made in Spring Framework: spring-projects/spring-framework@21d0696

I'm closing this one for now. If the issue appears even with the new Spring Framework version, we can reopen it.

@violetagg violetagg added for/springframework This belongs to the Spring Framework project and removed for/user-attention This issue needs user attention (feedback, rework, etc...) type/bug A general bug labels Jun 24, 2020
@vmadd03

vmadd03 commented Aug 7, 2020

Hi, we saw the same "LEAK: ByteBuf.release() was not called before it's garbage-collected" issue. Can you tell me how to fix this? We are using the same Reactor Netty and making WebClient calls.

@violetagg
Member

@vmadd03 Please create a new issue, specify the component versions, and if possible provide a reproducible example.

@violetagg
Member

Issue #1254 was created for the comment above
