Skip to content

Commit

Permalink
Ensure Reactor Context injected with HttpClient#mapConnect is ava…
Browse files Browse the repository at this point in the history
…ilable on send request/receive response (#2595)
  • Loading branch information
violetagg committed Dec 1, 2022
1 parent 6f4612f commit 2ed18f4
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 5 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2022 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,12 +93,14 @@ public Mono<HttpClientResponse> response() {
@Override
public <V> Flux<V> response(BiFunction<? super HttpClientResponse, ? super ByteBufFlux, ? extends Publisher<V>> receiver) {
return _connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp.receive()))
.doFinally(s -> discard(resp)));
.doFinally(s -> discard(resp))
.contextWrite(resp.currentContextView()));
}

@Override
public <V> Flux<V> responseConnection(BiFunction<? super HttpClientResponse, ? super Connection, ? extends Publisher<V>> receiver) {
return _connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp)));
return _connect().flatMapMany(resp -> Flux.from(receiver.apply(resp, resp))
.contextWrite(resp.currentContextView()));
}

@Override
Expand All @@ -117,7 +119,8 @@ public ByteBufFlux responseContent() {
@Override
public <V> Mono<V> responseSingle(BiFunction<? super HttpClientResponse, ? super ByteBufMono, ? extends Mono<V>> receiver) {
return _connect().flatMap(resp -> receiver.apply(resp, resp.receive().aggregate())
.doFinally(s -> discard(resp)));
.doFinally(s -> discard(resp))
.contextWrite(resp.currentContextView()));
}

// RequestSender methods
Expand Down
Expand Up @@ -95,6 +95,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -1167,7 +1169,7 @@ void doOnError() {
}

@Test
void withConnector() {
void withConnector_1() {
disposableServer = createServer()
.handle((req, resp) ->
resp.sendString(Mono.just(req.requestHeaders()
Expand All @@ -1194,6 +1196,66 @@ void withConnector() {
.verifyComplete();
}

@ParameterizedTest
@MethodSource("dataWithConnector_2")
void withConnector_2(boolean withConnector, String expectation) {
disposableServer =
createServer().handle((req, resp) -> resp.sendString(Mono.just(req.requestHeaders().get("test"))))
.bindNow();

HttpClient client = createHttpClientForContextWithPort();
if (withConnector) {
client = client.mapConnect(c -> c.contextWrite(Context.of("test", "Second")));
}

HttpClient.ResponseReceiver<?> responseReceiver =
client.post()
.uri("/")
.send((req, out) -> Mono.deferContextual(ctx -> {
req.requestHeaders().set("test", ctx.getOrDefault("test", "Fail"));
return Mono.empty();
}));

doWithConnector_2(
responseReceiver.responseConnection((res, conn) -> Mono.deferContextual(ctx ->
conn.inbound()
.receive()
.aggregate()
.asString()
.flatMap(s -> Mono.just(s + ctx.getOrDefault("test", "Fail")))))
.contextWrite(Context.of("test", "First")),
expectation);

doWithConnector_2(
responseReceiver.response((res, bytes) -> Mono.deferContextual(ctx ->
bytes.aggregate()
.asString()
.flatMap(s -> Mono.just(s + ctx.getOrDefault("test", "Fail")))))
.contextWrite(Context.of("test", "First")),
expectation);

doWithConnector_2(
responseReceiver.responseSingle((res, bytes) -> Mono.deferContextual(ctx ->
bytes.asString()
.flatMap(s -> Mono.just(s + ctx.getOrDefault("test", "Fail")))))
.contextWrite(Context.of("test", "First")),
expectation);
}

static Object[][] dataWithConnector_2() {
return new Object[][]{
{true, "SecondSecond"},
{false, "FirstFirst"}
};
}

private void doWithConnector_2(Publisher<String> content, String expectation) {
StepVerifier.create(content)
.expectNext(expectation)
.expectComplete()
.verify(Duration.ofSeconds(5));
}

@Test
void testPreferContentLengthWhenPost() {
disposableServer =
Expand Down

0 comments on commit 2ed18f4

Please sign in to comment.