Skip to content

Commit

Permalink
Fix empty payload handling in RSocketRequester
Browse files Browse the repository at this point in the history
Closes gh-24088
  • Loading branch information
rstoyanchev committed Nov 27, 2019
1 parent 5a552f1 commit 26d800c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
Expand Up @@ -57,7 +57,7 @@ final class DefaultRSocketRequester implements RSocketRequester {

private final RSocketStrategies strategies;

private final DataBuffer emptyDataBuffer;
private final Mono<DataBuffer> emptyBufferMono;


DefaultRSocketRequester(
Expand All @@ -73,7 +73,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
this.dataMimeType = dataMimeType;
this.metadataMimeType = metadataMimeType;
this.strategies = strategies;
this.emptyDataBuffer = this.strategies.dataBufferFactory().wrap(new byte[0]);
this.emptyBufferMono = Mono.just(this.strategies.dataBufferFactory().wrap(new byte[0]));
}


Expand Down Expand Up @@ -193,7 +193,7 @@ else if (adapter != null) {
}

if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
this.payloadMono = firstPayload(Mono.when(publisher).then(Mono.just(emptyDataBuffer)));
this.payloadMono = Mono.when(publisher).then(firstPayload(emptyBufferMono));
this.payloadFlux = null;
return;
}
Expand All @@ -204,7 +204,7 @@ else if (adapter != null) {
if (adapter != null && !adapter.isMultiValue()) {
Mono<DataBuffer> data = Mono.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.defaultIfEmpty(emptyDataBuffer);
.switchIfEmpty(emptyBufferMono);
this.payloadMono = firstPayload(data);
this.payloadFlux = null;
return;
Expand All @@ -213,7 +213,7 @@ else if (adapter != null) {
this.payloadMono = null;
this.payloadFlux = Flux.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.defaultIfEmpty(emptyDataBuffer)
.switchIfEmpty(emptyBufferMono)
.switchOnFirst((signal, inner) -> {
DataBuffer data = signal.get();
if (data != null) {
Expand Down Expand Up @@ -250,12 +250,7 @@ private Mono<Payload> firstPayload(Mono<DataBuffer> encodedData) {

@Override
public Mono<Void> send() {
return getPayloadMonoRequired().flatMap(rsocket::fireAndForget);
}

private Mono<Payload> getPayloadMonoRequired() {
Assert.state(this.payloadFlux == null, "No RSocket interaction model for Flux request to Mono response.");
return this.payloadMono != null ? this.payloadMono : firstPayload(Mono.just(emptyDataBuffer));
return getPayloadMono().flatMap(rsocket::fireAndForget);
}

@Override
Expand All @@ -268,19 +263,9 @@ public <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef) {
return retrieveMono(ResolvableType.forType(dataTypeRef));
}

@Override
public <T> Flux<T> retrieveFlux(Class<T> dataType) {
return retrieveFlux(ResolvableType.forClass(dataType));
}

@Override
public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
return retrieveFlux(ResolvableType.forType(dataTypeRef));
}

@SuppressWarnings("unchecked")
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
Mono<Payload> payloadMono = getPayloadMonoRequired().flatMap(rsocket::requestResponse);
Mono<Payload> payloadMono = getPayloadMono().flatMap(rsocket::requestResponse);

if (isVoid(elementType)) {
return (Mono<T>) payloadMono.then();
Expand All @@ -291,11 +276,22 @@ private <T> Mono<T> retrieveMono(ResolvableType elementType) {
.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}

@Override
public <T> Flux<T> retrieveFlux(Class<T> dataType) {
return retrieveFlux(ResolvableType.forClass(dataType));
}

@Override
public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
return retrieveFlux(ResolvableType.forType(dataTypeRef));
}

@SuppressWarnings("unchecked")
private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
Flux<Payload> payloadFlux = this.payloadMono != null ?
this.payloadMono.flatMapMany(rsocket::requestStream) :
rsocket.requestChannel(this.payloadFlux);

Flux<Payload> payloadFlux = (this.payloadFlux != null ?
rsocket.requestChannel(this.payloadFlux) :
getPayloadMono().flatMapMany(rsocket::requestStream));

if (isVoid(elementType)) {
return payloadFlux.thenMany(Flux.empty());
Expand All @@ -306,6 +302,11 @@ private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}

private Mono<Payload> getPayloadMono() {
Assert.state(this.payloadFlux == null, "No RSocket interaction with Flux request and Mono response.");
return this.payloadMono != null ? this.payloadMono : firstPayload(emptyBufferMono);
}

private DataBuffer retainDataAndReleasePayload(Payload payload) {
return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
}
Expand Down
Expand Up @@ -145,15 +145,6 @@ public void sendWithoutData() {
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
}

@Test
public void sendMonoWithoutData() {
this.requester.route("toA").retrieveMono(String.class).block(Duration.ofSeconds(5));

assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestResponse");
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
}

@Test
public void testSendWithAsyncMetadata() {

Expand Down Expand Up @@ -205,6 +196,15 @@ public void retrieveMonoVoid() {
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestResponse");
}

@Test
public void retrieveMonoWithoutData() {
this.requester.route("toA").retrieveMono(String.class).block(Duration.ofSeconds(5));

assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestResponse");
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
}

@Test
public void retrieveFlux() {
String[] values = new String[] {"bodyA", "bodyB", "bodyC"};
Expand All @@ -227,11 +227,20 @@ public void retrieveFluxVoid() {
assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestStream");
}

@Test
public void retrieveFluxWithoutData() {
this.requester.route("toA").retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));

assertThat(this.rsocket.getSavedMethodName()).isEqualTo("requestStream");
assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA");
assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("");
}

@Test
public void fluxToMonoIsRejected() {
assertThatIllegalStateException()
.isThrownBy(() -> this.requester.route("").data(Flux.just("a", "b")).retrieveMono(String.class))
.withMessage("No RSocket interaction model for Flux request to Mono response.");
.withMessage("No RSocket interaction with Flux request and Mono response.");
}

private Payload toPayload(String value) {
Expand Down

0 comments on commit 26d800c

Please sign in to comment.