Skip to content

Commit

Permalink
Separate step for retrieve in RSocketRequester
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Nov 25, 2019
1 parent b234c77 commit 6ed1b58
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ interface Builder {
}

/**
* Spec for providing input data for an RSocket request and triggering the exchange.
* Spec to declare the input for an RSocket request.
*/
interface RequestSpec extends MetadataSpec<RequestSpec> {
interface RequestSpec extends MetadataSpec<RequestSpec>, RetrieveSpec {

/**
* Append additional metadata entries through a {@code Consumer}.
Expand All @@ -262,7 +262,7 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
* @param data the Object value for the payload data
* @return spec to declare the expected response
*/
RequestSpec data(Object data);
RetrieveSpec data(Object data);

/**
* Variant of {@link #data(Object)} that also accepts a hint for the
Expand All @@ -274,7 +274,7 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
* @param elementClass the type of values to be produced
* @return spec to declare the expected response
*/
RequestSpec data(Object producer, Class<?> elementClass);
RetrieveSpec data(Object producer, Class<?> elementClass);

/**
* Variant of {@link #data(Object, Class)} for when the type hint has
Expand All @@ -285,7 +285,38 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
* @param elementTypeRef the type of values to be produced
* @return spec to declare the expected response
*/
RequestSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
RetrieveSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
}


/**
* Spec for providing additional composite metadata entries.
*
* @param <S> a self reference to the spec type
*/
interface MetadataSpec<S extends MetadataSpec<S>> {

/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* The metadata value be a concrete value or any producer of a single
* value that can be adapted to a {@link Publisher} via
* {@link ReactiveAdapterRegistry}.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
S metadata(Object metadata, MimeType mimeType);
}


/**
* Spec to declare the expected output for an RSocket request.
* @since 5.2.2
*/
interface RetrieveSpec {

/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
Expand Down Expand Up @@ -330,26 +361,4 @@ interface RequestSpec extends MetadataSpec<RequestSpec> {
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
}

/**
* Spec for specifying the metadata.
*
* @param <S> a self reference to the spec type
*/
interface MetadataSpec<S extends MetadataSpec<S>> {

/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* The metadata value be a concrete value or any producer of a single
* value that can be adapted to a {@link Publisher} via
* {@link ReactiveAdapterRegistry}.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
S metadata(Object metadata, MimeType mimeType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RequestSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RetrieveSpec =
data(producer, object : ParameterizedTypeReference<T>() {})

/**
Expand All @@ -77,7 +77,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(producer:
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.RequestSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher: Publisher<T>): RSocketRequester.RetrieveSpec =
data(publisher, object : ParameterizedTypeReference<T>() {})

/**
Expand All @@ -89,7 +89,7 @@ inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(publisher
* @author Sebastien Deleuze
* @since 5.2
*/
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.RequestSpec =
inline fun <reified T : Any> RSocketRequester.RequestSpec.dataWithType(flow: Flow<T>): RSocketRequester.RetrieveSpec =
data(flow, object : ParameterizedTypeReference<T>() {})


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec;
import org.springframework.messaging.rsocket.RSocketRequester.RetrieveSpec;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

Expand Down Expand Up @@ -90,7 +91,7 @@ public void sendMono() {
testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), "");
}

private void testSendMono(Function<RequestSpec, RequestSpec> mapper, String expectedValue) {
private void testSendMono(Function<RequestSpec, RetrieveSpec> mapper, String expectedValue) {
mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5));

assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget");
Expand All @@ -114,7 +115,7 @@ public void sendFlux() {
testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values);
}

private void testSendFlux(Function<RequestSpec, RequestSpec> mapper, String... expectedValues) {
private void testSendFlux(Function<RequestSpec, RetrieveSpec> mapper, String... expectedValues) {
this.rsocket.reset();
mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));

Expand Down

0 comments on commit 6ed1b58

Please sign in to comment.