Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
poutsma committed Jul 6, 2023
1 parent ce75a4a commit 8a09a51
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest

private final HttpClient httpClient;

private final Executor executor;

private final HttpMethod method;

private final URI uri;
Expand All @@ -55,13 +57,12 @@ final class ReactorNettyClientRequest extends AbstractStreamingClientHttpRequest

private final Duration readTimeout;

private final Executor executor = new SimpleAsyncTaskExecutor();


public ReactorNettyClientRequest(HttpClient httpClient, URI uri, HttpMethod method, Duration exchangeTimeout,
Duration readTimeout) {
public ReactorNettyClientRequest(HttpClient httpClient, Executor executor, URI uri, HttpMethod method,
Duration exchangeTimeout, Duration readTimeout) {

this.httpClient = httpClient;
this.executor = executor;
this.method = method;
this.uri = uri;
this.exchangeTimeout = exchangeTimeout;
Expand Down Expand Up @@ -120,11 +121,10 @@ else if (cause instanceof IOException ioEx) {


private Publisher<ByteBuf> bodyToPublisher(Body body, ByteBufAllocator allocator) {
Flow.Publisher<ByteBuf> flow = OutputStreamPublisher.create(
return FlowAdapters.toPublisher(OutputStreamPublisher.create(
outputStream -> body.writeTo(StreamUtils.nonClosing(outputStream)),
new ByteBufMapper(allocator),
this.executor);
return FlowAdapters.toPublisher(flow);
this.executor));
}


Expand All @@ -147,7 +147,6 @@ public ByteBuf map(int b) {

@Override
public ByteBuf map(byte[] b, int off, int len) {
System.out.println("Allocating " + len);
ByteBuf byteBuf = this.allocator.buffer(len);
byteBuf.writeBytes(b, off, len);
return byteBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.Executor;

import io.netty.channel.ChannelOption;
import reactor.netty.http.client.HttpClient;

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;

Expand All @@ -36,17 +38,44 @@ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactor

private final HttpClient httpClient;


private Duration exchangeTimeout = Duration.ofSeconds(5);

private Duration readTimeout = Duration.ofSeconds(10);

private Executor executor;


/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
* with a default {@link HttpClient} that has compression enabled.
*/
public ReactorNettyClientRequestFactory() {
this(HttpClient.create().compress(true));
}

/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
* based on the given {@link HttpClient}.
* @param httpClient the client to base on
*/
public ReactorNettyClientRequestFactory(HttpClient httpClient) {
this(httpClient, new SimpleAsyncTaskExecutor());
}

/**
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
* based on the given {@link HttpClient} and {@code Executor}. The executor
* is used to bridge blocking {@code OutputStream} write operations to a
* reactive stream.
* @param httpClient the client to base on
* @param executor the executor to execute blocking write calls
*/
public ReactorNettyClientRequestFactory(HttpClient httpClient, Executor executor) {
Assert.notNull(httpClient, "HttpClient must not be null");
Assert.notNull(executor, "Executor must not be null");
this.httpClient = httpClient;
this.executor = executor;
}

/**
Expand Down Expand Up @@ -116,6 +145,6 @@ public void setExchangeTimeout(Duration exchangeTimeout) {

@Override
public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
return new ReactorNettyClientRequest(this.httpClient, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
return new ReactorNettyClientRequest(this.httpClient, this.executor, uri, httpMethod, this.exchangeTimeout, this.readTimeout);
}
}

0 comments on commit 8a09a51

Please sign in to comment.