From b1224835be7e591fc784c565a7efdb7f09d29d26 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 14 May 2020 13:54:45 +0100 Subject: [PATCH] Add metadataPush support to RSocketRequester Closes gh-24322 --- .../rsocket/DefaultRSocketRequester.java | 7 +- .../messaging/rsocket/RSocketRequester.java | 13 ++- .../FireAndForgetCountingInterceptor.java | 77 ------------- ...RSocketClientToServerIntegrationTests.java | 101 +++++++++++++++++- src/docs/asciidoc/rsocket.adoc | 2 + 5 files changed, 116 insertions(+), 84 deletions(-) delete mode 100644 spring-messaging/src/test/java/org/springframework/messaging/rsocket/FireAndForgetCountingInterceptor.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index be879bbcfbeb..1e543f020923 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -248,6 +248,11 @@ private Mono firstPayload(Mono encodedData) { .doOnDiscard(Payload.class, Payload::release); } + @Override + public Mono sendMetadata() { + return getPayloadMono().flatMap(rsocket::metadataPush); + } + @Override public Mono send() { return getPayloadMono().flatMap(rsocket::fireAndForget); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 78cd8b3f028e..1188b5431a73 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -276,6 +276,12 @@ interface RequestSpec extends MetadataSpec, RetrieveSpec { */ RequestSpec metadata(Consumer> configurer); + /** + * Perform a {@link RSocket#metadataPush(Payload) metadataPush}. + * @since 5.3 + */ + Mono sendMetadata(); + /** * Provide payload data for the request. This can be one of: *
    @@ -344,7 +350,12 @@ interface MetadataSpec> { interface RetrieveSpec { /** - * Perform a {@link RSocket#fireAndForget fireAndForget}. + * Perform a {@link RSocket#fireAndForget fireAndForget} sending the + * provided data and metadata. + * @return a completion that indicates if the payload was sent + * successfully or not. Note, however that is a one-way send and there + * is no indication of whether or how the even was handled on the + * remote end. */ Mono send(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/FireAndForgetCountingInterceptor.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/FireAndForgetCountingInterceptor.java deleted file mode 100644 index b11aac45543c..000000000000 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/FireAndForgetCountingInterceptor.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2002-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.rsocket; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; - -import io.rsocket.AbstractRSocket; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.plugins.RSocketInterceptor; -import io.rsocket.util.RSocketProxy; -import reactor.core.publisher.Mono; - -/** - * Intercept received RSockets and count successfully completed requests seen - * on the server side. This is useful for verifying fire-and-forget - * interactions. - * - * @author Rossen Stoyanchev - */ -class FireAndForgetCountingInterceptor extends AbstractRSocket implements RSocketInterceptor { - - private final List rsockets = new CopyOnWriteArrayList<>(); - - - public int getRSocketCount() { - return this.rsockets.size(); - } - - public int getFireAndForgetCount(int index) { - return this.rsockets.get(index).getFireAndForgetCount(); - } - - - @Override - public RSocket apply(RSocket rsocket) { - CountingDecorator decorator = new CountingDecorator(rsocket); - this.rsockets.add(decorator); - return decorator; - } - - - private static class CountingDecorator extends RSocketProxy { - - private final AtomicInteger fireAndForget = new AtomicInteger(0); - - CountingDecorator(RSocket delegate) { - super(delegate); - } - - public int getFireAndForgetCount() { - return this.fireAndForget.get(); - } - - @Override - public Mono fireAndForget(Payload payload) { - return super.fireAndForget(payload).doOnSuccess(aVoid -> this.fireAndForget.incrementAndGet()); - } - } - -} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index 195f66445661..a4426c6f116d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -17,16 +17,21 @@ package org.springframework.messaging.rsocket; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import io.rsocket.Payload; +import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketServer; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.metadata.WellKnownMimeType; +import io.rsocket.plugins.RSocketInterceptor; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -35,10 +40,13 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; +import org.springframework.messaging.rsocket.annotation.ConnectMapping; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.stereotype.Controller; +import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -51,11 +59,14 @@ */ public class RSocketClientToServerIntegrationTests { + private static final MimeType FOO_MIME_TYPE = MimeTypeUtils.parseMimeType("messaging/x.foo"); + + private static AnnotationConfigApplicationContext context; private static CloseableChannel server; - private static FireAndForgetCountingInterceptor interceptor = new FireAndForgetCountingInterceptor(); + private static CountingInterceptor interceptor = new CountingInterceptor(); private static RSocketRequester requester; @@ -65,7 +76,7 @@ public class RSocketClientToServerIntegrationTests { public static void setupOnce() { MimeType metadataMimeType = MimeTypeUtils.parseMimeType( - WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString()); + WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()); context = new AnnotationConfigApplicationContext(ServerConfig.class); RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); @@ -105,8 +116,7 @@ public void fireAndForget() { .thenCancel() .verify(Duration.ofSeconds(5)); - assertThat(interceptor.getRSocketCount()).isEqualTo(1); - assertThat(interceptor.getFireAndForgetCount(0)) + assertThat(interceptor.getFireAndForgetCount()) .as("Fire and forget requests did not actually complete handling on the server side") .isEqualTo(3); } @@ -155,6 +165,24 @@ public void echoChannel() { .verify(Duration.ofSeconds(5)); } + @Test + public void metadataPush() { + Flux.just("bar", "baz") + .concatMap(s -> requester.route("foo-updates").metadata(s, FOO_MIME_TYPE).sendMetadata()) + .blockLast(); + + StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads) + .expectNext("bar") + .expectNext("baz") + .thenAwait(Duration.ofMillis(50)) + .thenCancel() + .verify(Duration.ofSeconds(5)); + + assertThat(interceptor.getMetadataPushCount()) + .as("Metadata pushes did not actually complete handling on the server side") + .isEqualTo(2); + } + @Test public void voidReturnValue() { Mono result = requester.route("void-return-value").data("Hello").retrieveMono(String.class); @@ -199,6 +227,9 @@ static class ServerController { final ReplayProcessor fireForgetPayloads = ReplayProcessor.create(); + final ReplayProcessor metadataPushPayloads = ReplayProcessor.create(); + + @MessageMapping("receive") void receive(String payload) { this.fireForgetPayloads.onNext(payload); @@ -241,6 +272,11 @@ Mono voidReturnValue(String payload) { Mono.error(new IllegalStateException("bad")); } + @ConnectMapping("foo-updates") + public void handleMetadata(@Header("foo") String foo) { + this.metadataPushPayloads.onNext(foo); + } + @MessageExceptionHandler Mono handleException(IllegalArgumentException ex) { return Mono.delay(Duration.ofMillis(10)).map(aLong -> ex.getMessage() + " handled"); @@ -270,8 +306,63 @@ public RSocketMessageHandler messageHandler() { @Bean public RSocketStrategies rsocketStrategies() { - return RSocketStrategies.create(); + return RSocketStrategies.builder() + .metadataExtractorRegistry(registry -> + registry.metadataToExtract(FOO_MIME_TYPE, String.class, "foo")) + .build(); } } + + private static class CountingInterceptor implements RSocket, RSocketInterceptor { + + private RSocket delegate; + + private final AtomicInteger fireAndForgetCount = new AtomicInteger(0); + + private final AtomicInteger metadataPushCount = new AtomicInteger(0); + + + public int getFireAndForgetCount() { + return this.fireAndForgetCount.get(); + } + + public int getMetadataPushCount() { + return this.metadataPushCount.get(); + } + + @Override + public RSocket apply(RSocket rsocket) { + Assert.isNull(this.delegate, "Unexpected RSocket connection"); + this.delegate = rsocket; + return this; + } + + @Override + public Mono fireAndForget(Payload payload) { + return this.delegate.fireAndForget(payload) + .doOnSuccess(aVoid -> this.fireAndForgetCount.incrementAndGet()); + } + + @Override + public Mono metadataPush(Payload payload) { + return this.delegate.metadataPush(payload) + .doOnSuccess(aVoid -> this.metadataPushCount.incrementAndGet()); + } + + @Override + public Mono requestResponse(Payload payload) { + return this.delegate.requestResponse(payload); + } + + @Override + public Flux requestStream(Payload payload) { + return this.delegate.requestStream(payload); + } + + @Override + public Flux requestChannel(Publisher payloads) { + return this.delegate.requestChannel(payloads); + } + } } diff --git a/src/docs/asciidoc/rsocket.adoc b/src/docs/asciidoc/rsocket.adoc index cc5ee2ea1d41..0ea88e33bd57 100644 --- a/src/docs/asciidoc/rsocket.adoc +++ b/src/docs/asciidoc/rsocket.adoc @@ -582,6 +582,8 @@ values are supported by a registered `Encoder`. For example: For `Fire-and-Forget` use the `send()` method that returns `Mono`. Note that the `Mono` indicates only that the message was successfully sent, and not that it was handled. +For `Metadata-Push` use the `sendMetadata()` method with a `Mono` return value. + [[rsocket-annot-responders]]