Provide a How-To for customizing WebClient's TcpClient #17856

Closed
epiwd opened this issue Aug 13, 2019 · 10 comments

epiwd commented Aug 13, 2019

Description:
Spring WebFlux internals break during a DevTools restart when the WebClient bean is customized with a custom client connector.

Spring Boot version: tested on 2.1.7.RELEASE and 2.2.0.M5 with a fresh Kotlin project generated by start.spring.io.

I only added two files to what start.spring.io produced.
Controller (to show the failure case):

@Controller
class TestController(
    private val webClient: WebClient
) {
    @GetMapping("/test")
    fun test(): Mono<String> {
        val result = webClient.method(HttpMethod.GET).uri(URI.create("https://postman-echo.com/get?a=42")).retrieve()
        return result.bodyToMono(String::class.java)
    }
}

Configuration:

@Configuration
class TestConfiguration {
    @Bean
    fun webClient(webClientBuilder: WebClient.Builder): WebClient {
        val tcpClient = TcpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
            .doOnConnected {
                it.addHandlerLast(ReadTimeoutHandler(60))
            }
        val connector = ReactorClientHttpConnector(HttpClient.from(tcpClient))
        return webClientBuilder.clientConnector(connector).build()
    }
}

Replication Steps:

  1. Create the simple application as described above.
  2. Hit localhost:8080/test. The first time, you will see the data come in.
  3. Change the URL in the controller, and rebuild the project.
  4. Hit localhost:8080/test again. This time, there will be a Netty error:
2019-08-13 11:21:03.925 ERROR 14689 --- [ctor-http-nio-2] a.w.r.e.AbstractErrorWebExceptionHandler : [68419476] 500 Server Error for HTTP GET "/test"

java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Request to GET https://postman-echo.com/get?a=322 [DefaultWebClient]
	|_ checkpoint ⇢ Handler com.test.test.TestController#test() [DispatcherHandler]
	|_ checkpoint ⇢ HTTP GET "/test" [ExceptionHandlingWebHandler]
Stack trace:
		at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:855) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:340) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:333) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:766) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:472) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:69) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:322) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:120) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at reactor.netty.resources.PooledConnectionProvider$PooledConnectionAllocator.lambda$connectChannel$0(PooledConnectionProvider.java:248) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribeWith(Mono.java:4030) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribe(Mono.java:3899) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribe(Mono.java:3835) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.netty.internal.shaded.reactor.pool.SimplePool.drainLoop(SimplePool.java:200) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.internal.shaded.reactor.pool.SimplePool.drain(SimplePool.java:169) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.internal.shaded.reactor.pool.SimplePool.doAcquire(SimplePool.java:129) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.internal.shaded.reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:334) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onSubscribe(PooledConnectionProvider.java:503) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.internal.shaded.reactor.pool.SimplePool$QueueBorrowerMono.subscribe(SimplePool.java:324) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.resources.PooledConnectionProvider.disposableAcquire(PooledConnectionProvider.java:212) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.resources.PooledConnectionProvider.lambda$acquire$2(PooledConnectionProvider.java:166) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.lambda$subscribe$0(HttpClientConnect.java:332) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.resubscribe(FluxRetryPredicate.java:124) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoRetryPredicate.subscribeOrReturn(MonoRetryPredicate.java:51) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:46) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:335) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1582) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:240) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1582) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2138) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onSubscribe(MonoIgnoreThen.java:285) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:160) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:46) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:149) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2138) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1946) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1820) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.Mono.subscribe(Mono.java:3920) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:56) ~[reactor-core-3.3.0.M3.jar:3.3.0.M3]
		at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:226) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:436) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:91) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:161) ~[reactor-netty-0.9.0.M3.jar:0.9.0.M3]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328) ~[netty-codec-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302) ~[netty-codec-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511) ~[netty-transport-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.38.Final.jar:4.1.38.Final]
		at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]

A temporary workaround is to not specify a custom client connector, but it'd be nice to be able to keep it so we can hook in handlers, etc.

Thanks in advance!

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Aug 13, 2019
@wilkinsona
Member

Here's a single file that reproduces the problem:

package com.example.demo;

import java.net.URI;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;

import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;

@SpringBootApplication
public class Gh17856Application {

	public static void main(String[] args) {
		SpringApplication.run(Gh17856Application.class, args);
	}
	
	@Bean
	WebClient webClient(WebClient.Builder webClientBuilder) {
		TcpClient tcpClient = TcpClient.create();
		ClientHttpConnector connector = new ReactorClientHttpConnector(HttpClient.from(tcpClient));
		return webClientBuilder.clientConnector(connector).build();
	}

}

@RestController
class TestController {

	private final WebClient webClient;
	
	TestController(WebClient webClient) {
		this.webClient = webClient;
	}

	@GetMapping("/test")
	Mono<String> test() {
		ResponseSpec result = webClient.method(HttpMethod.GET).uri(URI.create("https://postman-echo.com/get?a=42")).retrieve();
		return result.bodyToMono(String.class);
	}

}

The SingleThreadEventExecutor is rejecting the execute call because it was shut down when the application context was closed for the DevTools-triggered restart. Closing the context calls ReactorResourceFactory.destroy(), which disposes of all the Reactor Netty resources. I've yet to figure out why the disposed executor is apparently still being used by the WebClient that's built following the restart.
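
For readers less familiar with the rejection itself, here is a minimal plain-JDK sketch of the same failure mode. It is only an analogy: it uses a `java.util.concurrent` executor rather than Netty's event loop, and the class and method names (`TerminatedExecutorSketch`, `trySubmit`) are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Plain-JDK stand-in for the failure mode; no Netty or Spring classes are used.
final class TerminatedExecutorSketch {

    // Hands a no-op task to the executor and reports whether it was accepted.
    static String trySubmit(ExecutorService executor) {
        try {
            executor.execute(() -> { });
            return "accepted";
        } catch (RejectedExecutionException ex) {
            return "rejected";
        }
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // While the executor is running, tasks are accepted.
        System.out.println(trySubmit(eventLoop)); // prints "accepted"
        // Loosely analogous to the resources being disposed when the context closes.
        eventLoop.shutdown();
        // Anything that still holds the old reference now gets a rejection.
        System.out.println(trySubmit(eventLoop)); // prints "rejected"
    }
}
```

The point of the sketch is that the rejection does not come from the new application context; it comes from some component that still holds a reference to the executor that was shut down.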

@wilkinsona
Member

This is looking like a Reactor Netty problem to me. Here's an attempt at reproducing the problem without Boot's involvement:

package com.example.demo;

import java.net.URI;

import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;

public class StandaloneReproduction {
	
	public static void main(String[] args) {
		perform();
		perform();
	}
	
	private static void perform() {
		ReactorResourceFactory factory = new ReactorResourceFactory();
		factory.afterPropertiesSet();
		WebClient.Builder builder = WebClient.builder();
		TcpClient tcpClient = TcpClient.create();
		WebClient client = builder.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))).build();
		System.out.println(client.method(HttpMethod.GET).uri(URI.create("https://postman-echo.com/get?a=42")).retrieve().bodyToMono(String.class).block());
		factory.destroy();
	}

}

With Reactor Netty 0.8, the second perform() attempt fails with an NPE:

Exception in thread "main" java.lang.NullPointerException
	at reactor.netty.resources.ColocatedEventLoopGroup.register(ColocatedEventLoopGroup.java:69)
	at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:322)
	at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:159)
	at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:120)
	at io.netty.channel.pool.SimpleChannelPool.connectChannel(SimpleChannelPool.java:263)
	at io.netty.channel.pool.SimpleChannelPool.acquireHealthyFromPoolOrNew(SimpleChannelPool.java:175)
	at io.netty.channel.pool.SimpleChannelPool.notifyHealthCheck(SimpleChannelPool.java:248)
	at io.netty.channel.pool.SimpleChannelPool.doHealthCheck(SimpleChannelPool.java:223)
	at io.netty.channel.pool.SimpleChannelPool.access$100(SimpleChannelPool.java:41)
	at io.netty.channel.pool.SimpleChannelPool$3.run(SimpleChannelPool.java:195)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1494)
		at com.example.demo.StandaloneReproduction.perform(StandaloneReproduction.java:26)
		at com.example.demo.StandaloneReproduction.main(StandaloneReproduction.java:17)

With 0.9, the failure is closer to the one reported above:

Exception in thread "main" java.lang.IllegalStateException: executor not accepting a task
	at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:60)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Request to GET https://postman-echo.com/get?a=42 [DefaultWebClient]
Stack trace:
		at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:60)
		at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:196)
		at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:50)
		at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:184)
		at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:170)
		at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
		at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474)
		at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
		at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
		at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:527)
		at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:98)
		at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
		at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:982)
		at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:505)
		at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:416)
		at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:475)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
		at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1514)
		at com.example.demo.StandaloneReproduction.perform(StandaloneReproduction.java:26)
		at com.example.demo.StandaloneReproduction.main(StandaloneReproduction.java:17)

@violetagg I'm starting to get really out of my depth here. Can you please help me out and take a look?

@violetagg
Member

violetagg commented Aug 15, 2019

@wilkinsona

The example is not quite correct.

ReactorResourceFactory factory = new ReactorResourceFactory();
factory.afterPropertiesSet();

The code above creates the global HttpResources, such as the threads and the connection pool.
However, the code below

TcpClient tcpClient = TcpClient.create();
WebClient client = builder.clientConnector(new ReactorClientHttpConnector(HttpClient.from(tcpClient))).build();

specifies the following:

  • Create a TCP client with a connection pool from the global TcpResources and use whatever threads are available. In a pure TCP use case these will be the threads from the global TcpResources, but in an HTTP use case they will be the threads from the global HttpResources.
  • Create an HTTP client from this TCP client.

So in the end you are running with a connection pool from the global TcpResources, but with threads from the global HttpResources.

With the code below you destroy the global HttpResources but not the global TcpResources.

factory.destroy();

There are two possible solutions.

Create the TCP client with the global HttpResources:

TcpClient tcpClient = TcpClient.create(factory.getConnectionProvider()).runOn(factory.getLoopResources());

Or destroy the global TcpResources:

TcpResources.disposeLoopsAndConnections();
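
One way to picture the explanation above is the following plain-JDK sketch. It is an analogy under stated assumptions, not Reactor Netty's implementation: `httpLoops` stands in for the threads owned by the global HttpResources, `tcpPool` stands in for the connection pool owned by the global TcpResources, and every name in the block is hypothetical. It assumes the pool keeps a reference to the threads it was first used with, which is one reading of the stack traces in this issue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Plain-JDK analogy only; none of these names are Reactor Netty or Spring APIs.
final class MixedGlobalResourcesSketch {

    // Stand-in for the threads owned by the "HTTP" holder; recreated on restart.
    static ExecutorService httpLoops = Executors.newSingleThreadExecutor();

    // Stand-in for the "TCP" holder's pool; it survives restarts and remembers
    // the threads it was first used with for each host.
    static final Map<String, ExecutorService> tcpPool = new ConcurrentHashMap<>();

    // Runs a no-op "request" on whichever threads the pool has cached for the host.
    static String request(String host) {
        ExecutorService loop = tcpPool.computeIfAbsent(host, h -> httpLoops);
        try {
            loop.execute(() -> { });
            return "ok";
        } catch (RejectedExecutionException ex) {
            return "rejected";
        }
    }

    // Models a restart: the "HTTP" threads are always destroyed and recreated,
    // while the pool is cleared only when alsoDisposePool is true.
    static void restart(boolean alsoDisposePool) {
        httpLoops.shutdown();
        if (alsoDisposePool) {
            tcpPool.clear();
        }
        httpLoops = Executors.newSingleThreadExecutor();
    }

    public static void main(String[] args) {
        System.out.println(request("example.org")); // prints "ok"
        restart(false); // pool keeps pointing at the threads that were shut down
        System.out.println(request("example.org")); // prints "rejected"
        restart(true);  // pool is disposed together with the threads
        System.out.println(request("example.org")); // prints "ok"
        httpLoops.shutdown(); // let the JVM exit
    }
}
```

In this model, `restart(true)` loosely corresponds to the second suggestion (disposing the TCP-side resources as well). The first suggestion corresponds to not having a separately owned pool at all, so that the pool and the threads share one lifecycle.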

@wilkinsona
Member

wilkinsona commented Aug 15, 2019

Thanks very much, @violetagg.

Applying the advice to the original problem, and aligning with the recommendation in the reference docs to use a bean to customise the client connector, results in a bean definition like the following:

@Bean
ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) {
	TcpClient tcpClient = TcpClient.create(resourceFactory.getConnectionProvider())
		.runOn(resourceFactory.getLoopResources())
		.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
		.doOnConnected((connection) -> connection.addHandlerLast(new ReadTimeoutHandler(60)));
	return new ReactorClientHttpConnector(HttpClient.from(tcpClient));
}

In this particular case a WebClient bean is also required, so a second bean is defined for that, as shown in the following example:

@Bean
WebClient webClient(WebClient.Builder builder) {
	return builder.build();
}

The documentation already mentions the ReactorResourceFactory and that, by default, it's used for server and client resources. I wonder if we should add something to tie things together a bit more and suggest/recommend injecting the ReactorResourceFactory and using it as shown above when customising the client connector. What do you think, @bclozel?

@wilkinsona wilkinsona added for: team-attention An issue we'd like other members of the team to review type: documentation A documentation update and removed status: waiting-for-triage An issue we've not yet triaged labels Aug 15, 2019
@wilkinsona wilkinsona added this to the 2.1.x milestone Aug 15, 2019
@violetagg
Member

violetagg commented Aug 15, 2019

One question: why do you create the HttpClient from a TcpClient and not directly with HttpClient.create()? The latter will, by default, use the connection pool and the threads from the HttpResources, so you do not need to specify them.

@wilkinsona
Member

I'd assumed that it was the only way to configure a connect timeout and to add a read timeout handler. If that's possible via HttpClient then it sounds like it would be a nice additional refinement.

@violetagg
Member

You can do it like this:

@Bean
ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) {
    HttpClient httpClient =
            HttpClient.create()
                      .tcpConfiguration(tcpClient ->
                              tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
                                       .doOnConnected((connection) ->
                                              connection.addHandlerLast(new ReadTimeoutHandler(60))));
    return new ReactorClientHttpConnector(httpClient);
}

@wilkinsona
Member

Thanks, @violetagg. I've just realised that one downside to the above is that it's not using the injected resourceFactory. If the ReactorResourceFactory has been customized so that it doesn't use the global resources, a problem similar to the one originally described in this issue may occur.

@wilkinsona
Member

We're going to add a how-to in the reference docs showing how to customise the TCP client while sharing resources between WebClient and server.

@wilkinsona wilkinsona removed the for: team-attention An issue we'd like other members of the team to review label Aug 21, 2019
@wilkinsona wilkinsona changed the title from "Event executor terminated after devtools restart when custom client connector added to webclient" to "Provide a How-To for customizing WebClient's TcpClient" Sep 2, 2019
@wilkinsona wilkinsona self-assigned this Sep 4, 2019
@wilkinsona wilkinsona modified the milestones: 2.1.x, 2.1.8 Sep 4, 2019
@yonhbu

yonhbu commented Jun 16, 2021

you can do it like this

@Bean
ClientHttpConnector clientHttpConnector(ReactorResourceFactory resourceFactory) {
    HttpClient httpClient =
            HttpClient.create()
                      .tcpConfiguration(tcpClient ->
                              tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 60000)
                                       .doOnConnected((connection) ->
                                              connection.addHandlerLast(new ReadTimeoutHandler(60))));
    return new ReactorClientHttpConnector(httpClient);
}

How can I unit test this method?
