From 6ed1b5835b84bab0fda48956cdad143080c063ae Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 25 Nov 2019 11:47:41 +0000 Subject: [PATCH] Separate step for retrieve in RSocketRequester Closes gh-24073 --- .../messaging/rsocket/RSocketRequester.java | 63 +++++++++++-------- .../rsocket/RSocketRequesterExtensions.kt | 6 +- .../rsocket/DefaultRSocketRequesterTests.java | 5 +- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index d2981ef41b76..320b45f3510b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -238,9 +238,9 @@ interface Builder { } /** - * Spec for providing input data for an RSocket request and triggering the exchange. + * Spec to declare the input for an RSocket request. */ - interface RequestSpec extends MetadataSpec { + interface RequestSpec extends MetadataSpec, RetrieveSpec { /** * Append additional metadata entries through a {@code Consumer}. @@ -262,7 +262,7 @@ interface RequestSpec extends MetadataSpec { * @param data the Object value for the payload data * @return spec to declare the expected response */ - RequestSpec data(Object data); + RetrieveSpec data(Object data); /** * Variant of {@link #data(Object)} that also accepts a hint for the @@ -274,7 +274,7 @@ interface RequestSpec extends MetadataSpec { * @param elementClass the type of values to be produced * @return spec to declare the expected response */ - RequestSpec data(Object producer, Class elementClass); + RetrieveSpec data(Object producer, Class elementClass); /** * Variant of {@link #data(Object, Class)} for when the type hint has @@ -285,7 +285,38 @@ interface RequestSpec extends MetadataSpec { * @param elementTypeRef the type of values to be produced * @return spec to declare the expected response */ - RequestSpec data(Object producer, ParameterizedTypeReference elementTypeRef); + RetrieveSpec data(Object producer, ParameterizedTypeReference elementTypeRef); + } + + + /** + * Spec for providing additional composite metadata entries. + * + * @param a self reference to the spec type + */ + interface MetadataSpec> { + + /** + * Use this to append additional metadata entries when using composite + * metadata. An {@link IllegalArgumentException} is raised if this + * method is used when not using composite metadata. + * The metadata value be a concrete value or any producer of a single + * value that can be adapted to a {@link Publisher} via + * {@link ReactiveAdapterRegistry}. + * @param metadata an Object to be encoded with a suitable + * {@link org.springframework.core.codec.Encoder Encoder}, or a + * {@link org.springframework.core.io.buffer.DataBuffer DataBuffer} + * @param mimeType the mime type that describes the metadata + */ + S metadata(Object metadata, MimeType mimeType); + } + + + /** + * Spec to declare the expected output for an RSocket request. + * @since 5.2.2 + */ + interface RetrieveSpec { /** * Perform a {@link RSocket#fireAndForget fireAndForget}. @@ -330,26 +361,4 @@ interface RequestSpec extends MetadataSpec { Flux retrieveFlux(ParameterizedTypeReference dataTypeRef); } - /** - * Spec for specifying the metadata. - * - * @param a self reference to the spec type - */ - interface MetadataSpec> { - - /** - * Use this to append additional metadata entries when using composite - * metadata. An {@link IllegalArgumentException} is raised if this - * method is used when not using composite metadata. - * The metadata value be a concrete value or any producer of a single - * value that can be adapted to a {@link Publisher} via - * {@link ReactiveAdapterRegistry}. - * @param metadata an Object to be encoded with a suitable - * {@link org.springframework.core.codec.Encoder Encoder}, or a - * {@link org.springframework.core.io.buffer.DataBuffer DataBuffer} - * @param mimeType the mime type that describes the metadata - */ - S metadata(Object metadata, MimeType mimeType); - } - } diff --git a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt index a51a69e55252..63029eb6110e 100644 --- a/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt +++ b/spring-messaging/src/main/kotlin/org/springframework/messaging/rsocket/RSocketRequesterExtensions.kt @@ -65,7 +65,7 @@ suspend fun RSocketRequester.Builder.connectWebSocketAndAwait(uri: URI): RSocket * @author Sebastien Deleuze * @since 5.2 */ -inline fun RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RequestSpec = +inline fun RSocketRequester.RequestSpec.dataWithType(producer: Any): RSocketRequester.RetrieveSpec = data(producer, object : ParameterizedTypeReference() {}) /** @@ -77,7 +77,7 @@ inline fun RSocketRequester.RequestSpec.dataWithType(producer: * @author Sebastien Deleuze * @since 5.2 */ -inline fun RSocketRequester.RequestSpec.dataWithType(publisher: Publisher): RSocketRequester.RequestSpec = +inline fun RSocketRequester.RequestSpec.dataWithType(publisher: Publisher): RSocketRequester.RetrieveSpec = data(publisher, object : ParameterizedTypeReference() {}) /** @@ -89,7 +89,7 @@ inline fun RSocketRequester.RequestSpec.dataWithType(publisher * @author Sebastien Deleuze * @since 5.2 */ -inline fun RSocketRequester.RequestSpec.dataWithType(flow: Flow): RSocketRequester.RequestSpec = +inline fun RSocketRequester.RequestSpec.dataWithType(flow: Flow): RSocketRequester.RetrieveSpec = data(flow, object : ParameterizedTypeReference() {}) diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java index 532e1d76b31e..7392d901eeb3 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java @@ -40,6 +40,7 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.messaging.rsocket.RSocketRequester.RequestSpec; +import org.springframework.messaging.rsocket.RSocketRequester.RetrieveSpec; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -90,7 +91,7 @@ public void sendMono() { testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), ""); } - private void testSendMono(Function mapper, String expectedValue) { + private void testSendMono(Function mapper, String expectedValue) { mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5)); assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget"); @@ -114,7 +115,7 @@ public void sendFlux() { testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values); } - private void testSendFlux(Function mapper, String... expectedValues) { + private void testSendFlux(Function mapper, String... expectedValues) { this.rsocket.reset(); mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5));