Skip to content

Commit

Permalink
Add metadataPush support to RSocketRequester
Browse files Browse the repository at this point in the history
Closes gh-24322
  • Loading branch information
rstoyanchev committed May 14, 2020
1 parent de37859 commit b122483
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 84 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -248,6 +248,11 @@ private Mono<Payload> firstPayload(Mono<DataBuffer> encodedData) {
.doOnDiscard(Payload.class, Payload::release);
}

@Override
public Mono<Void> sendMetadata() {
return getPayloadMono().flatMap(rsocket::metadataPush);
}

@Override
public Mono<Void> send() {
return getPayloadMono().flatMap(rsocket::fireAndForget);
Expand Down
Expand Up @@ -276,6 +276,12 @@ interface RequestSpec extends MetadataSpec<RequestSpec>, RetrieveSpec {
*/
RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);

/**
* Perform a {@link RSocket#metadataPush(Payload) metadataPush}.
* @since 5.3
*/
Mono<Void> sendMetadata();

/**
* Provide payload data for the request. This can be one of:
* <ul>
Expand Down Expand Up @@ -344,7 +350,12 @@ interface MetadataSpec<S extends MetadataSpec<S>> {
interface RetrieveSpec {

/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
* Perform a {@link RSocket#fireAndForget fireAndForget} sending the
* provided data and metadata.
* @return a completion that indicates if the payload was sent
* successfully or not. Note, however that is a one-way send and there
* is no indication of whether or how the even was handled on the
* remote end.
*/
Mono<Void> send();

Expand Down

This file was deleted.

Expand Up @@ -17,16 +17,21 @@
package org.springframework.messaging.rsocket;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
Expand All @@ -35,10 +40,13 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

Expand All @@ -51,11 +59,14 @@
*/
public class RSocketClientToServerIntegrationTests {

private static final MimeType FOO_MIME_TYPE = MimeTypeUtils.parseMimeType("messaging/x.foo");


private static AnnotationConfigApplicationContext context;

private static CloseableChannel server;

private static FireAndForgetCountingInterceptor interceptor = new FireAndForgetCountingInterceptor();
private static CountingInterceptor interceptor = new CountingInterceptor();

private static RSocketRequester requester;

Expand All @@ -65,7 +76,7 @@ public class RSocketClientToServerIntegrationTests {
public static void setupOnce() {

MimeType metadataMimeType = MimeTypeUtils.parseMimeType(
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString());
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());

context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
Expand Down Expand Up @@ -105,8 +116,7 @@ public void fireAndForget() {
.thenCancel()
.verify(Duration.ofSeconds(5));

assertThat(interceptor.getRSocketCount()).isEqualTo(1);
assertThat(interceptor.getFireAndForgetCount(0))
assertThat(interceptor.getFireAndForgetCount())
.as("Fire and forget requests did not actually complete handling on the server side")
.isEqualTo(3);
}
Expand Down Expand Up @@ -155,6 +165,24 @@ public void echoChannel() {
.verify(Duration.ofSeconds(5));
}

@Test
public void metadataPush() {
Flux.just("bar", "baz")
.concatMap(s -> requester.route("foo-updates").metadata(s, FOO_MIME_TYPE).sendMetadata())
.blockLast();

StepVerifier.create(context.getBean(ServerController.class).metadataPushPayloads)
.expectNext("bar")
.expectNext("baz")
.thenAwait(Duration.ofMillis(50))
.thenCancel()
.verify(Duration.ofSeconds(5));

assertThat(interceptor.getMetadataPushCount())
.as("Metadata pushes did not actually complete handling on the server side")
.isEqualTo(2);
}

@Test
public void voidReturnValue() {
Mono<String> result = requester.route("void-return-value").data("Hello").retrieveMono(String.class);
Expand Down Expand Up @@ -199,6 +227,9 @@ static class ServerController {

final ReplayProcessor<String> fireForgetPayloads = ReplayProcessor.create();

final ReplayProcessor<String> metadataPushPayloads = ReplayProcessor.create();


@MessageMapping("receive")
void receive(String payload) {
this.fireForgetPayloads.onNext(payload);
Expand Down Expand Up @@ -241,6 +272,11 @@ Mono<Void> voidReturnValue(String payload) {
Mono.error(new IllegalStateException("bad"));
}

@ConnectMapping("foo-updates")
public void handleMetadata(@Header("foo") String foo) {
this.metadataPushPayloads.onNext(foo);
}

@MessageExceptionHandler
Mono<String> handleException(IllegalArgumentException ex) {
return Mono.delay(Duration.ofMillis(10)).map(aLong -> ex.getMessage() + " handled");
Expand Down Expand Up @@ -270,8 +306,63 @@ public RSocketMessageHandler messageHandler() {

@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.create();
return RSocketStrategies.builder()
.metadataExtractorRegistry(registry ->
registry.metadataToExtract(FOO_MIME_TYPE, String.class, "foo"))
.build();
}
}


private static class CountingInterceptor implements RSocket, RSocketInterceptor {

private RSocket delegate;

private final AtomicInteger fireAndForgetCount = new AtomicInteger(0);

private final AtomicInteger metadataPushCount = new AtomicInteger(0);


public int getFireAndForgetCount() {
return this.fireAndForgetCount.get();
}

public int getMetadataPushCount() {
return this.metadataPushCount.get();
}

@Override
public RSocket apply(RSocket rsocket) {
Assert.isNull(this.delegate, "Unexpected RSocket connection");
this.delegate = rsocket;
return this;
}

@Override
public Mono<Void> fireAndForget(Payload payload) {
return this.delegate.fireAndForget(payload)
.doOnSuccess(aVoid -> this.fireAndForgetCount.incrementAndGet());
}

@Override
public Mono<Void> metadataPush(Payload payload) {
return this.delegate.metadataPush(payload)
.doOnSuccess(aVoid -> this.metadataPushCount.incrementAndGet());
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
return this.delegate.requestResponse(payload);
}

@Override
public Flux<Payload> requestStream(Payload payload) {
return this.delegate.requestStream(payload);
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return this.delegate.requestChannel(payloads);
}
}
}
2 changes: 2 additions & 0 deletions src/docs/asciidoc/rsocket.adoc
Expand Up @@ -582,6 +582,8 @@ values are supported by a registered `Encoder`. For example:
For `Fire-and-Forget` use the `send()` method that returns `Mono<Void>`. Note that the `Mono`
indicates only that the message was successfully sent, and not that it was handled.

For `Metadata-Push` use the `sendMetadata()` method with a `Mono<Void>` return value.



[[rsocket-annot-responders]]
Expand Down

0 comments on commit b122483

Please sign in to comment.