Skip to content

Commit

Permalink
Fix RSocketRequester API for requests without payload
Browse files Browse the repository at this point in the history
This commit makes it possible to send requests without
requiring to call data(Mono.empty()). It introduces a
dedicated MetadataSpec interface and merge ResponseSpec
into RequestSpec for more flexibility.

Closes gh-23649
  • Loading branch information
sdeleuze committed Sep 18, 2019
1 parent ab58e29 commit 5adc3d6
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 106 deletions.
Expand Up @@ -116,6 +116,12 @@ private class DefaultRequestSpec implements RequestSpec {

private final MetadataEncoder metadataEncoder;

@Nullable
private Mono<Payload> payloadMono = Mono.empty();

@Nullable
private Flux<Payload> payloadFlux = null;


public DefaultRequestSpec(String route, Object... vars) {
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
Expand All @@ -135,24 +141,26 @@ public RequestSpec metadata(Object metadata, MimeType mimeType) {
}

@Override
public RequestSpec metadata(Consumer<RequestSpec> configurer) {
public RequestSpec metadata(Consumer<MetadataSpec<?>> configurer) {
configurer.accept(this);
return this;
}

@Override
public ResponseSpec data(Object data) {
public RequestSpec data(Object data) {
Assert.notNull(data, "'data' must not be null");
return toResponseSpec(data, ResolvableType.NONE);
createPayload(data, ResolvableType.NONE);
return this;
}

@Override
public ResponseSpec data(Object producer, Class<?> elementClass) {
public RequestSpec data(Object producer, Class<?> elementClass) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementClass, "'elementClass' must not be null");
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
createPayload(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
return this;
}

@Nullable
Expand All @@ -161,15 +169,16 @@ private ReactiveAdapter getAdapter(Class<?> aClass) {
}

@Override
public ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
public RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementTypeRef, "'elementTypeRef' must not be null");
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
createPayload(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
return this;
}

private ResponseSpec toResponseSpec(Object input, ResolvableType elementType) {
private void createPayload(Object input, ResolvableType elementType) {
ReactiveAdapter adapter = getAdapter(input.getClass());
Publisher<?> publisher;
if (input instanceof Publisher) {
Expand All @@ -179,31 +188,35 @@ else if (adapter != null) {
publisher = adapter.toPublisher(input);
}
else {
Mono<Payload> payloadMono = Mono
this.payloadMono = Mono
.fromCallable(() -> encodeData(input, ResolvableType.forInstance(input), null))
.map(this::firstPayload)
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadMono);
this.payloadFlux = null;
return;
}

if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
Mono<Payload> payloadMono = Mono.when(publisher).then(emptyPayload());
return new DefaultResponseSpec(payloadMono);
this.payloadMono = Mono.when(publisher).then(emptyPayload());
this.payloadFlux = null;
return;
}

Encoder<?> encoder = elementType != ResolvableType.NONE && !Object.class.equals(elementType.resolve()) ?
strategies.encoder(elementType, dataMimeType) : null;

if (adapter != null && !adapter.isMultiValue()) {
Mono<Payload> payloadMono = Mono.from(publisher)
this.payloadMono = Mono.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.map(this::firstPayload)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadMono);
this.payloadFlux = null;
return;
}

Flux<Payload> payloadFlux = Flux.from(publisher)
this.payloadMono = null;
this.payloadFlux = Flux.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.switchOnFirst((signal, inner) -> {
DataBuffer data = signal.get();
Expand All @@ -217,7 +230,6 @@ else if (adapter != null) {
})
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadFlux);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -245,26 +257,6 @@ private Payload firstPayload(DataBuffer data) {
private Mono<Payload> emptyPayload() {
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
}
}


private class DefaultResponseSpec implements ResponseSpec {

@Nullable
private final Mono<Payload> payloadMono;

@Nullable
private final Flux<Payload> payloadFlux;

DefaultResponseSpec(Mono<Payload> payloadMono) {
this.payloadMono = payloadMono;
this.payloadFlux = null;
}

DefaultResponseSpec(Flux<Payload> payloadFlux) {
this.payloadMono = null;
this.payloadFlux = payloadFlux;
}

@Override
public Mono<Void> send() {
Expand Down Expand Up @@ -325,5 +317,4 @@ private DataBuffer retainDataAndReleasePayload(Payload payload) {
return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory());
}
}

}
Expand Up @@ -231,22 +231,10 @@ interface Builder {

}


/**
* Spec for providing input data for an RSocket request.
* Spec for providing input data for an RSocket request and triggering the exchange.
*/
interface RequestSpec {

/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
RequestSpec metadata(Object metadata, MimeType mimeType);
interface RequestSpec extends MetadataSpec<RequestSpec> {

/**
* Append additional metadata entries through a {@code Consumer}.
Expand All @@ -255,7 +243,7 @@ interface RequestSpec {
* @param configurer the configurer to apply
* @throws IllegalArgumentException if not using composite metadata.
*/
RequestSpec metadata(Consumer<RequestSpec> configurer);
RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);

/**
* Provide payload data for the request. This can be one of:
Expand All @@ -268,7 +256,7 @@ interface RequestSpec {
* @param data the Object value for the payload data
* @return spec to declare the expected response
*/
ResponseSpec data(Object data);
RequestSpec data(Object data);

/**
* Variant of {@link #data(Object)} that also accepts a hint for the
Expand All @@ -280,7 +268,7 @@ interface RequestSpec {
* @param elementClass the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, Class<?> elementClass);
RequestSpec data(Object producer, Class<?> elementClass);

/**
* Variant of {@link #data(Object, Class)} for when the type hint has
Expand All @@ -291,14 +279,7 @@ interface RequestSpec {
* @param elementTypeRef the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
}


/**
* Spect to declare the type of request and expected response.
*/
interface ResponseSpec {
RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
Expand Down Expand Up @@ -343,4 +324,23 @@ interface ResponseSpec {
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
}

/**
* Spec for specifying the metadata.
*
* @param <S> a self reference to the spec type
*/
interface MetadataSpec<S extends MetadataSpec<S>> {

/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
S metadata(Object metadata, MimeType mimeType);
}

}
Expand Up @@ -66,7 +66,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RequestSpec =
data(producer, object : ParameterizedTypeReference<T>() {})

/**
Expand All @@ -78,7 +78,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer:
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.RequestSpec =
data(publisher, object : ParameterizedTypeReference<T>() {})

/**
Expand All @@ -90,58 +90,58 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.RequestSpec =
data(flow, object : ParameterizedTypeReference<T>() {})


/**
* Coroutines variant of [RSocketRequester.ResponseSpec.send].
* Coroutines variant of [RSocketRequester.RequestSpec.send].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun RSocketRequester.ResponseSpec.sendAndAwait() {
suspend fun RSocketRequester.RequestSpec.sendAndAwait() {
send().awaitFirstOrNull()
}

/**
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveMono].
* Coroutines variant of [RSocketRequester.RequestSpec.retrieveMono].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveAndAwait(): T =
suspend inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveAndAwait(): T =
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitSingle()

/**
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveFlux].
* Coroutines variant of [RSocketRequester.RequestSpec.retrieveFlux].
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveFlow(): Flow<T> =
inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveFlow(): Flow<T> =
retrieveFlux(object : ParameterizedTypeReference<T>() {}).asFlow()

/**
* Extension for [RSocketRequester.ResponseSpec.retrieveMono] providing a `retrieveMono<Foo>()`
* Extension for [RSocketRequester.RequestSpec.retrieveMono] providing a `retrieveMono<Foo>()`
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveMono(): Mono<T> =
inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveMono(): Mono<T> =
retrieveMono(object : ParameterizedTypeReference<T>() {})


/**
* Extension for [RSocketRequester.ResponseSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
* Extension for [RSocketRequester.RequestSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveFlux(): Flux<T> =
inline fun <reified T : Any> RSocketRequester.RequestSpec.retrieveFlux(): Flux<T> =
retrieveFlux(object : ParameterizedTypeReference<T>() {})
Expand Up @@ -38,7 +38,6 @@
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
import org.springframework.messaging.rsocket.RSocketRequester.ResponseSpec;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -87,7 +86,7 @@ public void sendMono() {
testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), "");
}

private void testSendMono(Function<RequestSpec, ResponseSpec> mapper, String expectedValue) {
private void testSendMono(Function<RequestSpec, RequestSpec> mapper, String expectedValue) {
mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5));

assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget");
Expand All @@ -111,7 +110,7 @@ public void sendFlux() {
testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values);
}

private void testSendFlux(Function<RequestSpec, ResponseSpec> mapper, String... expectedValues) {
private void testSendFlux(Function<RequestSpec, RequestSpec> mapper, String... expectedValues) {
this.rsocket.reset();
mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));

Expand Down

0 comments on commit 5adc3d6

Please sign in to comment.