Skip to content

Commit

Permalink
HttpHandlerConnector uses non-blocking thread
Browse files Browse the repository at this point in the history
Closes gh-23936
  • Loading branch information
rstoyanchev committed Nov 6, 2019
1 parent 627a9be commit 5d2fc2f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Expand Up @@ -25,6 +25,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpCookie;
Expand Down Expand Up @@ -75,6 +76,13 @@ public HttpHandlerConnector(HttpHandler handler) {
public Mono<ClientHttpResponse> connect(HttpMethod httpMethod, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

return Mono.defer(() -> doConnect(httpMethod, uri, requestCallback))
.subscribeOn(Schedulers.parallel());
}

private Mono<ClientHttpResponse> doConnect(
HttpMethod httpMethod, URI uri, Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

MonoProcessor<ClientHttpResponse> result = MonoProcessor.create();

MockClientHttpRequest mockClientRequest = new MockClientHttpRequest(httpMethod, uri);
Expand Down
Expand Up @@ -24,6 +24,7 @@

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
Expand Down Expand Up @@ -51,7 +52,7 @@ public class HttpHandlerConnectorTests {


@Test
public void adaptRequest() throws Exception {
public void adaptRequest() {

TestHttpHandler handler = new TestHttpHandler(response -> {
response.setStatusCode(HttpStatus.OK);
Expand Down Expand Up @@ -79,7 +80,7 @@ public void adaptRequest() throws Exception {
}

@Test
public void adaptResponse() throws Exception {
public void adaptResponse() {

ResponseCookie cookie = ResponseCookie.from("custom-cookie", "c0").build();

Expand All @@ -104,6 +105,22 @@ public void adaptResponse() throws Exception {
assertThat(DataBufferTestUtils.dumpString(buffer, UTF_8)).isEqualTo("Custom body");
}

@Test // gh-23936
public void handlerOnNonBlockingThread() {

TestHttpHandler handler = new TestHttpHandler(response -> {

assertThat(Schedulers.isInNonBlockingThread()).isTrue();

This comment has been minimized.

Copy link
@bsideup

bsideup Nov 6, 2019

👍


response.setStatusCode(HttpStatus.OK);
return response.setComplete();
});

new HttpHandlerConnector(handler)
.connect(HttpMethod.POST, URI.create("/path"), request -> request.writeWith(Mono.empty()))
.block(Duration.ofSeconds(5));
}

private DataBuffer toDataBuffer(String body) {
return new DefaultDataBufferFactory().wrap(body.getBytes(UTF_8));
}
Expand Down

0 comments on commit 5d2fc2f

Please sign in to comment.