diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java
index a0fa75ffac9..235ebaf7471 100644
--- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java
+++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java
@@ -32,6 +32,8 @@
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.ReflectionUtils;
+import io.rsocket.frame.FrameType;
+
/**
* The {@link RSocketMessageHandler} extension for Spring Integration needs.
*
@@ -79,7 +81,11 @@ public boolean detectEndpoints() {
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
new CompositeMessageCondition(
- RSocketFrameTypeMessageCondition.REQUEST_CONDITION,
+ new RSocketFrameTypeMessageCondition(
+ FrameType.REQUEST_FNF,
+ FrameType.REQUEST_RESPONSE,
+ FrameType.REQUEST_STREAM,
+ FrameType.REQUEST_CHANNEL),
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR
}
diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java
index 0eb4e4f4064..ec2d5737f2d 100644
--- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java
+++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java
@@ -180,6 +180,15 @@ void serverEcho() {
private void echo(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ StepVerifier verifier =
+ StepVerifier.create(
+ Flux.from(resultChannel)
+ .map(Message::getPayload)
+ .cast(String.class))
+ .expectNext("Hello")
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("Hello")
.setHeader(ROUTE_HEADER, "echo")
@@ -187,13 +196,7 @@ private void echo(MessageChannel inputChannel, FluxMessageChannel resultChannel,
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- StepVerifier.create(
- Flux.from(resultChannel)
- .map(Message::getPayload)
- .cast(String.class))
- .expectNext("Hello")
- .thenCancel()
- .verify();
+ verifier.verify();
}
@Test
@@ -209,6 +212,15 @@ void serverEchoAsync() {
private void echoAsync(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ StepVerifier verifier =
+ StepVerifier.create(
+ Flux.from(resultChannel)
+ .map(Message::getPayload)
+ .cast(String.class))
+ .expectNext("Hello async")
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("Hello")
.setHeader(ROUTE_HEADER, "echo-async")
@@ -216,13 +228,7 @@ private void echoAsync(MessageChannel inputChannel, FluxMessageChannel resultCha
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- StepVerifier.create(
- Flux.from(resultChannel)
- .map(Message::getPayload)
- .cast(String.class))
- .expectNext("Hello async")
- .thenCancel()
- .verify();
+ verifier.verify();
}
@Test
@@ -238,6 +244,17 @@ void serverEchoStream() {
private void echoStream(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ @SuppressWarnings("unchecked")
+ StepVerifier verifier =
+ StepVerifier.create(
+ Flux.from(resultChannel)
+ .next()
+ .map(Message::getPayload)
+ .flatMapMany((payload) -> (Flux) payload))
+ .expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7")
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("Hello")
.setHeader(ROUTE_HEADER, "echo-stream")
@@ -245,22 +262,7 @@ private void echoStream(MessageChannel inputChannel, FluxMessageChannel resultCh
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- Message> resultMessage =
- Flux.from(resultChannel)
- .blockFirst();
-
- assertThat(resultMessage)
- .isNotNull()
- .extracting(Message::getPayload)
- .isInstanceOf(Flux.class);
-
- @SuppressWarnings("unchecked")
- Flux resultStream = (Flux) resultMessage.getPayload();
- StepVerifier.create(resultStream)
- .expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7")
- .thenCancel()
- .verify();
-
+ verifier.verify();
}
@Test
@@ -276,6 +278,17 @@ void serverEchoChannel() {
private void echoChannel(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ @SuppressWarnings("unchecked")
+ StepVerifier verifier =
+ StepVerifier.create(
+ Flux.from(resultChannel)
+ .next()
+ .map(Message::getPayload)
+ .flatMapMany((payload) -> (Flux) payload))
+ .expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async")
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload(Flux.range(1, 10).map(i -> "Hello " + i))
.setHeader(ROUTE_HEADER, "echo-channel")
@@ -283,21 +296,7 @@ private void echoChannel(MessageChannel inputChannel, FluxMessageChannel resultC
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- Message> resultMessage =
- Flux.from(resultChannel)
- .blockFirst();
-
- assertThat(resultMessage)
- .isNotNull()
- .extracting(Message::getPayload)
- .isInstanceOf(Flux.class);
-
- @SuppressWarnings("unchecked")
- Flux resultStream = (Flux) resultMessage.getPayload();
- StepVerifier.create(resultStream)
- .expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async")
- .thenCancel()
- .verify();
+ verifier.verify();
}
@@ -314,26 +313,21 @@ void serverVoidReturnValue() {
private void voidReturnValue(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ StepVerifier verifier =
+ StepVerifier.create(resultChannel)
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(100))
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("Hello")
.setHeader(ROUTE_HEADER, "void-return-value")
- .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel)
+ .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- Message> resultMessage =
- Flux.from(resultChannel)
- .blockFirst();
-
- assertThat(resultMessage)
- .isNotNull()
- .extracting(Message::getPayload)
- .isInstanceOf(Flux.class);
-
- Flux> resultStream = (Flux>) resultMessage.getPayload();
- StepVerifier.create(resultStream)
- .expectComplete()
- .verify();
+ verifier.verify();
}
@Test
@@ -349,26 +343,21 @@ void serverVoidReturnValueFromExceptionHandler() {
private void voidReturnValueFromExceptionHandler(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ StepVerifier verifier =
+ StepVerifier.create(resultChannel)
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(100))
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("bad")
.setHeader(ROUTE_HEADER, "void-return-value")
- .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel)
+ .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- Message> resultMessage =
- Flux.from(resultChannel)
- .blockFirst();
-
- assertThat(resultMessage)
- .isNotNull()
- .extracting(Message::getPayload)
- .isInstanceOf(Flux.class);
-
- Flux> resultStream = (Flux>) resultMessage.getPayload();
- StepVerifier.create(resultStream)
- .expectComplete()
- .verify();
+ verifier.verify();
}
@Test
@@ -384,6 +373,15 @@ void serverHandleWithThrownException() {
private void handleWithThrownException(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ StepVerifier verifier =
+ StepVerifier.create(
+ Flux.from(resultChannel)
+ .map(Message::getPayload)
+ .cast(String.class))
+ .expectNext("Invalid input error handled")
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("a")
.setHeader(ROUTE_HEADER, "thrown-exception")
@@ -391,13 +389,7 @@ private void handleWithThrownException(MessageChannel inputChannel, FluxMessageC
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- StepVerifier.create(
- Flux.from(resultChannel)
- .map(Message::getPayload)
- .cast(String.class))
- .expectNext("Invalid input error handled")
- .thenCancel()
- .verify();
+ verifier.verify();
}
@Test
@@ -413,6 +405,15 @@ void serverHandleWithErrorSignal() {
private void handleWithErrorSignal(MessageChannel inputChannel, FluxMessageChannel resultChannel,
RSocketRequester rsocketRequester) {
+ StepVerifier verifier =
+ StepVerifier.create(
+ Flux.from(resultChannel)
+ .map(Message::getPayload)
+ .cast(String.class))
+ .expectNext("Invalid input error handled")
+ .thenCancel()
+ .verifyLater();
+
inputChannel.send(
MessageBuilder.withPayload("a")
.setHeader(ROUTE_HEADER, "error-signal")
@@ -420,13 +421,7 @@ private void handleWithErrorSignal(MessageChannel inputChannel, FluxMessageChann
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
.build());
- StepVerifier.create(
- Flux.from(resultChannel)
- .map(Message::getPayload)
- .cast(String.class))
- .expectNext("Invalid input error handled")
- .thenCancel()
- .verify();
+ verifier.verify();
}
@Test