Skip to content

Commit

Permalink
Fix RSocketRequester API for requests without payload
Browse files Browse the repository at this point in the history
This commit makes it possible to send requests without
requiring to call data(Mono.empty()).

It clarifies the semantics by renaming ResponseSpec
to InteractionSpec in order to leverage more directly RSocket
vocabulary and make things more consistent now that RequestSpec
extends it.

It also improves the Consumer variant of metadata in order to prevent
calling data in the callback (only metadata is now accessible).

Closes spring-projectsgh-23649
  • Loading branch information
sdeleuze committed Sep 18, 2019
1 parent 1dfe304 commit cf33744
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 82 deletions.
Expand Up @@ -135,24 +135,24 @@ public RequestSpec metadata(Object metadata, MimeType mimeType) {
}

@Override
public RequestSpec metadata(Consumer<RequestSpec> configurer) {
public RequestSpec metadata(Consumer<MetadataSpec<?>> configurer) {
configurer.accept(this);
return this;
}

@Override
public ResponseSpec data(Object data) {
public InteractionSpec data(Object data) {
Assert.notNull(data, "'data' must not be null");
return toResponseSpec(data, ResolvableType.NONE);
return toInteractionSpec(data, ResolvableType.NONE);
}

@Override
public ResponseSpec data(Object producer, Class<?> elementClass) {
public InteractionSpec data(Object producer, Class<?> elementClass) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementClass, "'elementClass' must not be null");
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
return toInteractionSpec(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
}

@Nullable
Expand All @@ -161,15 +161,15 @@ private ReactiveAdapter getAdapter(Class<?> aClass) {
}

@Override
public ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
public InteractionSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementTypeRef, "'elementTypeRef' must not be null");
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
return toInteractionSpec(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
}

private ResponseSpec toResponseSpec(Object input, ResolvableType elementType) {
private InteractionSpec toInteractionSpec(Object input, ResolvableType elementType) {
ReactiveAdapter adapter = getAdapter(input.getClass());
Publisher<?> publisher;
if (input instanceof Publisher) {
Expand All @@ -184,12 +184,12 @@ else if (adapter != null) {
.map(this::firstPayload)
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadMono);
return new DefaultInteractionSpec(payloadMono);
}

if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
Mono<Payload> payloadMono = Mono.when(publisher).then(emptyPayload());
return new DefaultResponseSpec(payloadMono);
return new DefaultInteractionSpec(payloadMono);
}

Encoder<?> encoder = elementType != ResolvableType.NONE && !Object.class.equals(elementType.resolve()) ?
Expand All @@ -200,7 +200,7 @@ else if (adapter != null) {
.map(value -> encodeData(value, elementType, encoder))
.map(this::firstPayload)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadMono);
return new DefaultInteractionSpec(payloadMono);
}

Flux<Payload> payloadFlux = Flux.from(publisher)
Expand All @@ -217,7 +217,7 @@ else if (adapter != null) {
})
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
return new DefaultResponseSpec(payloadFlux);
return new DefaultInteractionSpec(payloadFlux);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -245,23 +245,47 @@ private Payload firstPayload(DataBuffer data) {
private Mono<Payload> emptyPayload() {
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
}
}

@Override
public Mono<Void> send() {
return data(Mono.empty()).send();
}

@Override
public <T> Mono<T> retrieveMono(Class<T> dataType) {
return data(Mono.empty()).retrieveMono(dataType);
}

@Override
public <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef) {
return data(Mono.empty()).retrieveMono(dataTypeRef);
}

@Override
public <T> Flux<T> retrieveFlux(Class<T> dataType) {
return data(Mono.empty()).retrieveFlux(dataType);
}

@Override
public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {
return data(Mono.empty()).retrieveFlux(dataTypeRef);
}
}

private class DefaultResponseSpec implements ResponseSpec {
private class DefaultInteractionSpec implements InteractionSpec {

@Nullable
private final Mono<Payload> payloadMono;

@Nullable
private final Flux<Payload> payloadFlux;

DefaultResponseSpec(Mono<Payload> payloadMono) {
DefaultInteractionSpec(Mono<Payload> payloadMono) {
this.payloadMono = payloadMono;
this.payloadFlux = null;
}

DefaultResponseSpec(Flux<Payload> payloadFlux) {
DefaultInteractionSpec(Flux<Payload> payloadFlux) {
this.payloadMono = null;
this.payloadFlux = payloadFlux;
}
Expand Down
Expand Up @@ -231,22 +231,10 @@ interface Builder {

}


/**
* Spec for providing input data for an RSocket request.
* Spec for providing input data for an RSocket request and declare the expected interaction model.
*/
interface RequestSpec {

/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
RequestSpec metadata(Object metadata, MimeType mimeType);
interface RequestSpec extends MetadataSpec<RequestSpec>, InteractionSpec {

/**
* Append additional metadata entries through a {@code Consumer}.
Expand All @@ -255,7 +243,7 @@ interface RequestSpec {
* @param configurer the configurer to apply
* @throws IllegalArgumentException if not using composite metadata.
*/
RequestSpec metadata(Consumer<RequestSpec> configurer);
RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);

/**
* Provide payload data for the request. This can be one of:
Expand All @@ -268,7 +256,7 @@ interface RequestSpec {
* @param data the Object value for the payload data
* @return spec to declare the expected response
*/
ResponseSpec data(Object data);
InteractionSpec data(Object data);

/**
* Variant of {@link #data(Object)} that also accepts a hint for the
Expand All @@ -280,7 +268,7 @@ interface RequestSpec {
* @param elementClass the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, Class<?> elementClass);
InteractionSpec data(Object producer, Class<?> elementClass);

/**
* Variant of {@link #data(Object, Class)} for when the type hint has
Expand All @@ -291,22 +279,40 @@ interface RequestSpec {
* @param elementTypeRef the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
InteractionSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
}

/**
* Spec for specifying the metadata.
*
* @param <S> a self reference to the spec type
*/
interface MetadataSpec<S extends MetadataSpec<S>> {

/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
S metadata(Object metadata, MimeType mimeType);
}

/**
* Spect to declare the type of request and expected response.
* Spec for declaring the expected interaction model.
*/
interface ResponseSpec {
interface InteractionSpec {

/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
* Perform a {@link RSocket#fireAndForget fireAndForget} interaction.
*/
Mono<Void> send();

/**
* Perform a {@link RSocket#requestResponse requestResponse} exchange.
* Perform a {@link RSocket#requestResponse requestResponse} interaction.
* <p>If the return type is {@code Mono<Void>}, the {@code Mono} will
* complete after all data is consumed.
* <p><strong>Note:</strong> This method will raise an error if
Expand All @@ -326,7 +332,7 @@ interface ResponseSpec {

/**
* Perform an {@link RSocket#requestStream requestStream} or a
* {@link RSocket#requestChannel requestChannel} exchange depending on
* {@link RSocket#requestChannel requestChannel} interaction depending on
* whether the request input is single or multi-payload.
* <p>If the return type is {@code Flux<Void>}, the {@code Flux} will
* complete after all data is consumed.
Expand Down
Expand Up @@ -66,7 +66,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.InteractionSpec =
data(producer, object : ParameterizedTypeReference<T>() {})

/**
Expand All @@ -78,7 +78,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer:
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.InteractionSpec =
data(publisher, object : ParameterizedTypeReference<T>() {})

/**
Expand All @@ -90,58 +90,58 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.ResponseSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.InteractionSpec =
data(flow, object : ParameterizedTypeReference<T>() {})


/**
* Coroutines variant of [RSocketRequester.ResponseSpec.send].
* Coroutines variant of [RSocketRequester.InteractionSpec.send].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend fun RSocketRequester.ResponseSpec.sendAndAwait() {
suspend fun RSocketRequester.InteractionSpec.sendAndAwait() {
send().awaitFirstOrNull()
}

/**
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveMono].
* Coroutines variant of [RSocketRequester.InteractionSpec.retrieveMono].
*
* @author Sebastien Deleuze
* @since 5.2
*/
suspend inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveAndAwait(): T =
suspend inline fun <reified T : Any> RSocketRequester.InteractionSpec.retrieveAndAwait(): T =
retrieveMono(object : ParameterizedTypeReference<T>() {}).awaitSingle()

/**
* Coroutines variant of [RSocketRequester.ResponseSpec.retrieveFlux].
* Coroutines variant of [RSocketRequester.InteractionSpec.retrieveFlux].
*
* @author Sebastien Deleuze
* @since 5.2
*/
@ExperimentalCoroutinesApi
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveFlow(): Flow<T> =
inline fun <reified T : Any> RSocketRequester.InteractionSpec.retrieveFlow(): Flow<T> =
retrieveFlux(object : ParameterizedTypeReference<T>() {}).asFlow()

/**
* Extension for [RSocketRequester.ResponseSpec.retrieveMono] providing a `retrieveMono<Foo>()`
* Extension for [RSocketRequester.InteractionSpec.retrieveMono] providing a `retrieveMono<Foo>()`
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveMono(): Mono<T> =
inline fun <reified T : Any> RSocketRequester.InteractionSpec.retrieveMono(): Mono<T> =
retrieveMono(object : ParameterizedTypeReference<T>() {})


/**
* Extension for [RSocketRequester.ResponseSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
* Extension for [RSocketRequester.InteractionSpec.retrieveFlux] providing a `retrieveFlux<Foo>()`
* variant leveraging Kotlin reified type parameters. This extension is not subject to type
* erasure and retains actual generic type arguments.
*
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.ResponseSpec.retrieveFlux(): Flux<T> =
inline fun <reified T : Any> RSocketRequester.InteractionSpec.retrieveFlux(): Flux<T> =
retrieveFlux(object : ParameterizedTypeReference<T>() {})
Expand Up @@ -37,8 +37,8 @@

import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester.InteractionSpec;
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
import org.springframework.messaging.rsocket.RSocketRequester.ResponseSpec;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void sendMono() {
testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), "");
}

private void testSendMono(Function<RequestSpec, ResponseSpec> mapper, String expectedValue) {
private void testSendMono(Function<RequestSpec, InteractionSpec> mapper, String expectedValue) {
mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5));

assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget");
Expand All @@ -111,7 +111,7 @@ public void sendFlux() {
testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values);
}

private void testSendFlux(Function<RequestSpec, ResponseSpec> mapper, String... expectedValues) {
private void testSendFlux(Function<RequestSpec, InteractionSpec> mapper, String... expectedValues) {
this.rsocket.reset();
mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));

Expand Down

0 comments on commit cf33744

Please sign in to comment.