Skip to content

Commit

Permalink
Merge #2595 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Dec 1, 2022
2 parents f1ff064 + 20ffb8f commit 40809fa
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
Expand Up @@ -95,12 +95,14 @@ public Mono<HttpClientResponse> response() {
@Override
public <V> Flux<V> response(BiFunction<? super HttpClientResponse, ? super BufferFlux, ? extends Publisher<V>> receiver) {
return _connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp.receive()))
.doFinally(s -> discard(resp)));
.doFinally(s -> discard(resp))
.contextWrite(resp.currentContextView()));
}

@Override
public <V> Flux<V> responseConnection(BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher<V>> receiver) {
return _connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp)));
return _connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp))
.contextWrite(resp.currentContextView()));
}

@Override
Expand All @@ -119,7 +121,8 @@ public BufferFlux responseContent() {
@Override
public <V> Mono<V> responseSingle(BiFunction<? super HttpClientResponse, ? super BufferMono, ? extends Mono<V>> receiver) {
return _connect().flatMap(resp -> receiver.apply(resp, resp.receive().aggregate())
.doFinally(s -> discard(resp)));
.doFinally(s -> discard(resp))
.contextWrite(resp.currentContextView()));
}

// RequestSender methods
Expand Down
Expand Up @@ -94,6 +94,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -1166,7 +1168,7 @@ void doOnError() {
}

@Test
void withConnector() {
void withConnector_1() {
disposableServer = createServer()
.handle((req, resp) ->
resp.sendString(Mono.just(getHeader(req.requestHeaders(), "test",
Expand All @@ -1193,6 +1195,67 @@ void withConnector() {
.verifyComplete();
}

@ParameterizedTest
@MethodSource("dataWithConnector_2")
void withConnector_2(boolean withConnector, String expectation) {
disposableServer =
createServer().handle((req, resp) -> resp.sendString(Mono.just(getHeader(req.requestHeaders(), "test",
"header not found from server"))))
.bindNow();

HttpClient client = createHttpClientForContextWithPort();
if (withConnector) {
client = client.mapConnect(c -> c.contextWrite(Context.of("test", "Second")));
}

HttpClient.ResponseReceiver<?> responseReceiver =
client.post()
.uri("/")
.send((req, out) -> Mono.deferContextual(ctx -> {
req.requestHeaders().set("test", ctx.getOrDefault("test", "Fail"));
return Mono.empty();
}));

doWithConnector_2(
responseReceiver.responseConnection((res, conn) -> Mono.deferContextual(ctx ->
conn.inbound()
.receive()
.aggregate()
.asString()
.flatMap(s -> Mono.just(s + ctx.getOrDefault("test", "Fail")))))
.contextWrite(Context.of("test", "First")),
expectation);

doWithConnector_2(
responseReceiver.response((res, bytes) -> Mono.deferContextual(ctx ->
bytes.aggregate()
.asString()
.flatMap(s -> Mono.just(s + ctx.getOrDefault("test", "Fail")))))
.contextWrite(Context.of("test", "First")),
expectation);

doWithConnector_2(
responseReceiver.responseSingle((res, bytes) -> Mono.deferContextual(ctx ->
bytes.asString()
.flatMap(s -> Mono.just(s + ctx.getOrDefault("test", "Fail")))))
.contextWrite(Context.of("test", "First")),
expectation);
}

static Object[][] dataWithConnector_2() {
return new Object[][]{
{true, "SecondSecond"},
{false, "FirstFirst"}
};
}

private void doWithConnector_2(Publisher<String> content, String expectation) {
StepVerifier.create(content)
.expectNext(expectation)
.expectComplete()
.verify(Duration.ofSeconds(5));
}

@Test
void testPreferContentLengthWhenPost() {
disposableServer =
Expand Down

0 comments on commit 40809fa

Please sign in to comment.