WebClient using default reactor-netty client leaks netty ByteBufs on timeout #22384

Closed
danielra opened this issue Feb 7, 2019 · 20 comments
Labels
for: external-project Needs a fix in external project

Comments

@danielra

danielra commented Feb 7, 2019

Affects: 5.1.4.RELEASE

I see buffer leak reports from Netty's ResourceLeakDetector when a Mono.timeout cancels a subscription to the Mono returned from bodyToMono. I've created a demo project with a test case which seems to reliably reproduce the issue. A separate test case is included which uses the reactor-netty HttpClient directly instead of using it through the WebClient wrapper - and I have not seen any leak reports in this case with the current version of reactor-netty (v0.8.4.RELEASE).

Demo project with mentioned test cases can be found here:
https://github.com/danielra/buffer-leak-repro-reactor-netty
The specific test case which reproduces the leak can be seen here:
https://github.com/danielra/buffer-leak-repro-reactor-netty/blob/master/src/test/java/com/example/demo/DemoApplicationTests.java#L59

The tests can be run via ./gradlew clean test --debug. Please note that the tests "pass" unconditionally, but buffer leak reports can be viewed in the console output.

Here is an example of the leak report log messages I have observed from this test case:
webclient_buffer_leak_example_log.txt


@rstoyanchev
Contributor

Thanks for the report. This has been tagged as a Reactor Netty issue. Please follow reactor/reactor-netty#603.

@rstoyanchev rstoyanchev added for: external-project Needs a fix in external project and removed status: waiting-for-triage An issue we've not yet triaged or decided on labels Feb 8, 2019
@danielra
Author

Thanks. I will follow that issue as you suggest. Interesting that the issue here is actually with reactor-netty, when the problem doesn't seem to reproduce when using their HttpClient directly - only when using reactor-netty indirectly via WebClient. Thanks for looking into it and forwarding appropriately!

@rstoyanchev rstoyanchev reopened this Feb 28, 2019
@rstoyanchev
Contributor

rstoyanchev commented Mar 11, 2019

Interesting that the issue here is actually with reactor-netty, when the problem doesn't seem to reproduce when using their HttpClient directly - only when using reactor-netty indirectly via WebClient.

Are you sure about this? I see the exact opposite. When I run the tests with @Ignore on reproBufferLeakOnTimeout, I see the leaks; conversely, if I put @Ignore on noReproBufferLeakOnTimeout, the leak reports go away.

This is also supported by the source code. The Reactor Netty ByteBufFlux#aggregate accumulates and retains buffers without any provision for cancellation. By contrast, the StringDecoder does use onDiscard.

So as far as I can see we're okay in the Spring Framework. /cc @violetagg
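
The contrast drawn above, between an aggregator that retains buffers with no provision for cancellation and one that releases them through a discard hook, can be illustrated with a toy model. This is a minimal sketch in plain Java and is not Reactor Netty or Spring code: `CountedBuffer`, `Aggregator`, and `leakedAfterCancel` are hypothetical names invented for illustration, and the reference counting is simplified to a plain integer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: none of these types are Netty, Reactor, or Spring classes.
public class DiscardHookSketch {

    /** Hypothetical stand-in for a reference-counted buffer such as a pooled ByteBuf. */
    static final class CountedBuffer {
        private int refCnt = 1;                 // a new buffer starts with one reference
        void retain()  { refCnt++; }
        void release() { refCnt--; }
        boolean isLeaked() { return refCnt > 0; }
    }

    /** Accumulates buffers; releases them on cancel only if a discard hook was supplied. */
    static final class Aggregator {
        private final List<CountedBuffer> held = new ArrayList<>();
        private final Consumer<CountedBuffer> onDiscard; // null models "no provision for cancellation"

        Aggregator(Consumer<CountedBuffer> onDiscard) {
            this.onDiscard = onDiscard;
        }

        void onNext(CountedBuffer buffer) {
            buffer.retain();   // the aggregator takes its own reference
            buffer.release();  // the upstream drops its reference after handing off
            held.add(buffer);
        }

        void cancel() {
            if (onDiscard != null) {
                held.forEach(onDiscard); // hand every accumulated buffer to the hook
            }
            held.clear();
        }
    }

    /** Feeds three buffers, cancels before completion, and counts buffers never released. */
    static long leakedAfterCancel(Consumer<CountedBuffer> onDiscard) {
        List<CountedBuffer> all = new ArrayList<>();
        Aggregator aggregator = new Aggregator(onDiscard);
        for (int i = 0; i < 3; i++) {
            CountedBuffer buffer = new CountedBuffer();
            all.add(buffer);
            aggregator.onNext(buffer);
        }
        aggregator.cancel(); // e.g. a timeout fires before the response completes
        return all.stream().filter(CountedBuffer::isLeaked).count();
    }

    public static void main(String[] args) {
        System.out.println("leaked without discard hook: " + leakedAfterCancel(null));
        System.out.println("leaked with discard hook:    " + leakedAfterCancel(CountedBuffer::release));
    }
}
```

In this model every buffer accumulated before the cancel stays at a non-zero reference count unless the hook releases it, which is the kind of condition a leak detector reports once the buffer is garbage-collected.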

@violetagg
Member

@rstoyanchev I'm not able to reproduce the issue.

Only these exceptions appear in the log files

  1. Handle "IllegalReferenceCountException: refCnt: 0, decrement: 1" when ByteBufFlux#aggregate reactor/reactor-netty#636

io.netty.util.IllegalReferenceCountException: refCnt: 0
	at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1441)
	at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1373)
	at io.netty.buffer.PooledHeapByteBuf.nioBuffer(PooledHeapByteBuf.java:298)
	at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1224)
	at org.springframework.core.io.buffer.NettyDataBuffer.asByteBuffer(NettyDataBuffer.java:266)
	at org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:207)
	at org.springframework.core.codec.StringDecoder.decodeDataBuffer(StringDecoder.java:59)
	at org.springframework.core.codec.AbstractDataBufferDecoder.lambda$decodeToMono$1(AbstractDataBufferDecoder.java:68)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:334)
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:384)
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
	at io.netty.channel.CombinedChannelDuplexHandler.chan
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at java.base/java.lang.Thread.run(Thread.java:834)

I'm using Netty 4.1.34.Final and Reactor Netty 0.8.6.BUILD-SNAPSHOT

@danielra
Author

I just took another look at this. I've upgraded the spring-boot version from 2.1.2.RELEASE to 2.1.3.RELEASE, and thus spring-framework 5.1.5.RELEASE and reactor-netty 0.8.5.RELEASE.

It looks like I may have just gotten "lucky" in my initial runs of my test cases and only happened to see leak reports when running the one that uses the WebClient. I do still see leak messages emitted when running both test cases (but only sometimes).

I ran the following 20 times (running just the test case which uses a WebClient which uses reactor-netty):

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient

and here is the output from the 6 of those times where leak reports were emitted:
webclient-test-log-with-leak_1.txt
webclient-test-log-with-leak_2.txt
webclient-test-log-with-leak_3.txt
webclient-test-log-with-leak_4.txt
webclient-test-log-with-leak_5.txt
webclient-test-log-with-leak_6.txt

I ran the following 20 times (running just the test case which uses reactor-netty directly):

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty

and here is the output from the 4 of those times where leak reports were emitted:
reactor-netty-test-log-with-leak_1.txt
reactor-netty-test-log-with-leak_2.txt
reactor-netty-test-log-with-leak_3.txt
reactor-netty-test-log-with-leak_4.txt

There is some variety to the leak reports. One general observation is that in the direct reactor-netty cases, the leak reports were always accompanied by IllegalReferenceCountExceptions like the one noted above. In contrast, none of the WebClient test runs which emitted leak reports contained this exception (though I did see this exception on a few runs which did not emit leak reports). I don't know whether this detail is a coincidence or not.

I've pushed a few minor changes to the test cases along with the version update noted above.

@rstoyanchev
Contributor

rstoyanchev commented Mar 14, 2019

I'm using Netty 4.1.34.Final and Reactor Netty 0.8.6.BUILD-SNAPSHOT

@danielra did you notice the versions mentioned? Can you also try with:

ext['reactor-bom.version'] = 'Californium-BUILD-SNAPSHOT'

@danielra
Author

Thank you for calling that out! I did miss updating that version. I have done so now (and pushed the change), and on my first attempt with the webClient test, I observed this leak report emitted via:

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient

webclient-test-log-with-leak-updated-reactor_1.txt
It took a few runs, but I also observed a leak report in the test case that uses the reactor-netty client directly via:

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty

reactor-netty-test-log-with-leak-updated-reactor_1.txt

FWIW, the pattern of correlation with IllegalReferenceCountExceptions occurring when leak reports are emitted by the direct reactorNetty test (but not in the webClient test) persisted.

@danielra
Author

Sorry, that was with the updated reactor-bom but Netty 4.1.33.Final instead of Netty 4.1.34.Final. I've updated the netty version as well and captured the following two leak reports:

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient

webclient-test-log-with-leak-updated-reactor-and-netty_1.txt

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty

reactor-netty-test-log-with-leak-updated-reactor-and-netty_1.txt

@violetagg
Member

@danielra Can you try 0.8.6.BUILD-SNAPSHOT now? I committed the fix for reactor/reactor-netty#636.

@violetagg
Member

Created an issue for the exception that I observed when running the test with WebClient #22594

@danielra
Author

Thanks!

I just tried a few runs of the test cases with the 0.8.6.BUILD-SNAPSHOT version of reactor-netty, and pushed that dependency change to the demo project. Here is example output from each test case from runs which emitted leak reports:

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.webClient

webclient-test-log-with-leak-updated-reactor-and-netty-and-reactor-netty_1.txt

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty

reactor-netty-test-log-with-leak-updated-reactor-and-netty-and-reactor-netty_1.txt

@violetagg
Member

violetagg commented Mar 15, 2019

@danielra According to the log, you are still using a 0.8.6.BUILD-SNAPSHOT without the fix above:

08:57:04.793 [DEBUG] [TestEventLogger]          io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:604)
08:57:04.793 [DEBUG] [TestEventLogger]          io.netty.buffer.CompositeByteBuf$Component.transferTo(CompositeByteBuf.java:1794)
08:57:04.794 [DEBUG] [TestEventLogger]          io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:464)
08:57:04.794 [DEBUG] [TestEventLogger]          io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:266)
08:57:04.794 [DEBUG] [TestEventLogger]          io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:222)
08:57:04.794 [DEBUG] [TestEventLogger]          io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:165)
08:57:04.794 [DEBUG] [TestEventLogger]          io.netty.buffer.WrappedCompositeByteBuf.addComponent(WrappedCompositeByteBuf.java:493)
08:57:04.794 [DEBUG] [TestEventLogger]          io.netty.buffer.AdvancedLeakAwareCompositeByteBuf.addComponent(AdvancedLeakAwareCompositeByteBuf.java:885)
08:57:04.794 [DEBUG] [TestEventLogger]          reactor.netty.ByteBufFlux.lambda$aggregate$7(ByteBufFlux.java:259)

reactor/reactor-netty@32be1c7

@violetagg
Member

@rstoyanchev I see the stack below. Have you seen something like this before?

2019-03-15 11:08:33.670 ERROR 35536 --- [r-http-kqueue-4] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:94)
	io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:208)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:321)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:319)
	reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:538)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:158)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
	io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:362)
	io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:181)
	io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:259)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.lang.Thread.run(Thread.java:748)
#2:
	org.springframework.http.client.reactive.ReactorClientHttpResponse.lambda$getBody$2(ReactorClientHttpResponse.java:77)
	reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
	reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:205)
	reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:321)
	reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:319)
	reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:538)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:158)

@danielra
Author

Oh, sorry. I thought specifying the version as 0.8.6.BUILD-SNAPSHOT would be enough to pick up your recent change.

I've now instead cloned the reactor-netty project and included the version built from the head of master (which does include the linked change) via an includeBuild entry in my demo project's settings.gradle file. Running the test case in this way, I still saw similar looking output after a few tries:

./gradlew clean test --debug --tests com.example.demo.DemoApplicationTests.reactorNetty

reactor-netty-test-log-with-leak-updated-reactor-and-netty-and-reactor-netty_2.txt

@rstoyanchev
Contributor

@violetagg with 0.8.6 snapshots I see this from webClient tests:

 #1:
Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
...
 #2:
Hint: Caller of readInbound() will handle the message from this point.
...

 #3:
Hint: 'DefaultChannelPipeline$TailContext#0' will handle the message from this point.
...
 #4
io.netty.buffer.AdvancedLeakAwareByteBuf.ensureWritable(AdvancedLeakAwareByteBuf.java:136)

And this with reactorNetty test:

 #1
Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
...

as well as this:

 #1:
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
...
 #2:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
 #3:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
 #4:
io.netty.buffer.AdvancedLeakAwareByteBuf.getByte(AdvancedLeakAwareByteBuf.java:154)
...
 #5
io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:106)
...
 #6:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
 #7:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
 #8:
io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670)
...
 #9:
io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedByte(AdvancedLeakAwareByteBuf.java:160)
...
 #10:
Hint: 'reactor.left.httpCodec' will handle the message from this point.
...
 #11:
Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
...

As for something like:

#2:
	org.springframework.http.client.reactive.ReactorClientHttpResponse.lambda$getBody$2(ReactorClientHttpResponse.java:77)

The only way I can explain it in the given scenario, i.e. decodeToMono(String.class), is some issue here, which would imply something potentially wrong with the doOnDiscard hook. I could try to simulate and prove this by feeding DataBufferUtils#join or StringDecoder with pooled buffers via Flux#interval. Are you seeing this consistently? I have not yet. And do you no longer see any other leaks?

@rstoyanchev
Contributor

The only way I can explain it in the given scenario, i.e. decodeToMono(String.class), is some issue here, which would imply something potentially wrong with the doOnDiscard hook.

I have tried and failed to reproduce the issue with DataBufferUtils only using a test like this:

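@Test
public void joinWithCancellation() {

  // Emit 50 small buffers, one every 40 ms, and join them into a single DataBuffer.
  Mono<DataBuffer> bufferFlux = DataBufferUtils.join(
      Flux.interval(Duration.ofMillis(40)).take(50).map(aLong -> stringBuffer("foo" + aLong)));

  // Cancel after a random 81-115 ms, while join is still accumulating buffers.
  StepVerifier.create(bufferFlux)
      .thenAwait(Duration.ofMillis(ThreadLocalRandom.current().nextInt(81, 116)))
      .thenCancel()
      .verify();
}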

This is in DataBufferUtilsTests so at the end of the test there is a verifyAllocations check.

@danielra since you have a reproducible test case, I suggest creating a new issue under https://github.com/reactor/reactor-netty.

@violetagg
Member

Here is the issue for Reactor Netty reactor/reactor-netty#700

@jkjome

jkjome commented Apr 23, 2020

I am using the latest version of Spring Boot (2.2.6) and even though this issue is supposed to have been fixed, I'm seeing the same memory leak being reported: "LEAK: ByteBuf.release() was not called before it's garbage-collected". Is there some place in the code below where I am supposed to be, somehow, calling ByteBuf.release(), even though it seems like Spring and Netty internal classes should be taking care of this? Or have I, otherwise, written the code improperly and should be doing it some other, better, way?

Edit: I should mention, I found the following two "Hint" messages in the stacktrace....

Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
Hint: 'reactor.left.httpCodec' will handle the message from this point.

public StreamingResponseBody getSomeClientCertProtectedPdf(final HttpServletResponse response) {
    final String urlStr = "https://somedomain.com/somepathto/somepdf.pdf";
    final HttpClient httpClient = HttpClient.create().secure(spec -> {
        final KeyManagerFactory keyManagerFactory = ....;
        final TrustManagerFactory trustManagerFactory = ....;
        spec.sslContext(SslContextBuilder.forClient()
                        .keyManager(keyManagerFactory)
                        .trustManager(trustManagerFactory)
                        .build());
    });
    final WebClient client = WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(httpClient))
        .baseUrl(urlStr).build();
    final Flux<DataBuffer> dataBufferFlux = client.get()
            .accept(MediaType.APPLICATION_PDF)
            .retrieve()
            .bodyToFlux(DataBuffer.class);
    return out -> DataBufferUtils.write(dataBufferFlux, response.getOutputStream()).blockLast(Duration.ofSeconds(20));
}

@rstoyanchev
Contributor

rstoyanchev commented Apr 24, 2020

@jkjome it may sound similar, but in your scenario you're getting raw data from the WebClient, and that means you now have the responsibility to release buffers. You then use DataBufferUtils#write and its Javadoc says:

Does not close the output stream when the flux is terminated, and does not release the data buffers in the source. If releasing is required, then subscribe to the returned Flux with a releaseConsumer().

You can follow that recommendation or use bodyToFlux(byte[].class) or bodyToFlux(ByteBuffer.class), which will release the underlying ByteBufs and give you a copy.

@jkjome

jkjome commented Apr 24, 2020

@rstoyanchev Thanks for your help! I finally found something that worked to get rid of ByteBuf.release() memory leak messages....

return out -> DataBufferUtils.write(dataBufferFlux, out).doOnNext(DataBufferUtils.releaseConsumer()).blockLast(Duration.ofSeconds(20));
