From 25e945908c37f1a8def185247731293e9160ff94 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 19 Nov 2019 12:12:46 -0500 Subject: [PATCH] Fix RSocket module according changes in SF Related to https://github.com/spring-projects/spring-framework/issues/23999 * Since Spring Integration inbound endpoints are generic in their method signature we can't rely on a new `EMPTY_CONDITION` because it turns on configuration merge into just `FrameType.REQUEST_FNF` & `FrameType.REQUEST_RESPONSE`. So, use `FrameType.REQUEST_FNF`, `FrameType.REQUEST_RESPONSE`, `FrameType.REQUEST_STREAM` & `FrameType.REQUEST_CHANNEL` explicitly to cover all the valid request-response models for SI endpoints * Rework `RSocketOutboundGatewayIntegrationTests` according new logic. Plus refactor to earlier subscription into `FluxMessageChannel` to avoid potential race conditions --- .../IntegrationRSocketMessageHandler.java | 8 +- ...SocketOutboundGatewayIntegrationTests.java | 169 +++++++++--------- 2 files changed, 89 insertions(+), 88 deletions(-) diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java index a0fa75ffac9..235ebaf7471 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java @@ -32,6 +32,8 @@ import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.util.ReflectionUtils; +import io.rsocket.frame.FrameType; + /** * The {@link RSocketMessageHandler} extension for Spring Integration needs. *

@@ -79,7 +81,11 @@ public boolean detectEndpoints() { public void addEndpoint(IntegrationRSocketEndpoint endpoint) { registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD, new CompositeMessageCondition( - RSocketFrameTypeMessageCondition.REQUEST_CONDITION, + new RSocketFrameTypeMessageCondition( + FrameType.REQUEST_FNF, + FrameType.REQUEST_RESPONSE, + FrameType.REQUEST_STREAM, + FrameType.REQUEST_CHANNEL), new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR } diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java index 0eb4e4f4064..ec2d5737f2d 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java @@ -180,6 +180,15 @@ void serverEcho() { private void echo(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + StepVerifier verifier = + StepVerifier.create( + Flux.from(resultChannel) + .map(Message::getPayload) + .cast(String.class)) + .expectNext("Hello") + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "echo") @@ -187,13 +196,7 @@ private void echo(MessageChannel inputChannel, FluxMessageChannel resultChannel, .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - StepVerifier.create( - Flux.from(resultChannel) - .map(Message::getPayload) - .cast(String.class)) - .expectNext("Hello") - .thenCancel() - .verify(); + verifier.verify(); } @Test @@ -209,6 +212,15 @@ void serverEchoAsync() { private void echoAsync(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + StepVerifier verifier = + StepVerifier.create( + Flux.from(resultChannel) + .map(Message::getPayload) + .cast(String.class)) + .expectNext("Hello async") + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "echo-async") @@ -216,13 +228,7 @@ private void echoAsync(MessageChannel inputChannel, FluxMessageChannel resultCha .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - StepVerifier.create( - Flux.from(resultChannel) - .map(Message::getPayload) - .cast(String.class)) - .expectNext("Hello async") - .thenCancel() - .verify(); + verifier.verify(); } @Test @@ -238,6 +244,17 @@ void serverEchoStream() { private void echoStream(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + @SuppressWarnings("unchecked") + StepVerifier verifier = + StepVerifier.create( + Flux.from(resultChannel) + .next() + .map(Message::getPayload) + .flatMapMany((payload) -> (Flux) payload)) + .expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7") + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "echo-stream") @@ -245,22 +262,7 @@ private void echoStream(MessageChannel inputChannel, FluxMessageChannel resultCh .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - Message resultMessage = - Flux.from(resultChannel) - .blockFirst(); - - assertThat(resultMessage) - .isNotNull() - .extracting(Message::getPayload) - .isInstanceOf(Flux.class); - - @SuppressWarnings("unchecked") - Flux resultStream = (Flux) resultMessage.getPayload(); - StepVerifier.create(resultStream) - .expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7") - .thenCancel() - .verify(); - + verifier.verify(); } @Test @@ -276,6 +278,17 @@ void serverEchoChannel() { private void echoChannel(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + @SuppressWarnings("unchecked") + StepVerifier verifier = + StepVerifier.create( + Flux.from(resultChannel) + .next() + .map(Message::getPayload) + .flatMapMany((payload) -> (Flux) payload)) + .expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async") + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload(Flux.range(1, 10).map(i -> "Hello " + i)) .setHeader(ROUTE_HEADER, "echo-channel") @@ -283,21 +296,7 @@ private void echoChannel(MessageChannel inputChannel, FluxMessageChannel resultC .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - Message resultMessage = - Flux.from(resultChannel) - .blockFirst(); - - assertThat(resultMessage) - .isNotNull() - .extracting(Message::getPayload) - .isInstanceOf(Flux.class); - - @SuppressWarnings("unchecked") - Flux resultStream = (Flux) resultMessage.getPayload(); - StepVerifier.create(resultStream) - .expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async") - .thenCancel() - .verify(); + verifier.verify(); } @@ -314,26 +313,21 @@ void serverVoidReturnValue() { private void voidReturnValue(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + StepVerifier verifier = + StepVerifier.create(resultChannel) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(100)) + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "void-return-value") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel) + .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - Message resultMessage = - Flux.from(resultChannel) - .blockFirst(); - - assertThat(resultMessage) - .isNotNull() - .extracting(Message::getPayload) - .isInstanceOf(Flux.class); - - Flux resultStream = (Flux) resultMessage.getPayload(); - StepVerifier.create(resultStream) - .expectComplete() - .verify(); + verifier.verify(); } @Test @@ -349,26 +343,21 @@ void serverVoidReturnValueFromExceptionHandler() { private void voidReturnValueFromExceptionHandler(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + StepVerifier verifier = + StepVerifier.create(resultChannel) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(100)) + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("bad") .setHeader(ROUTE_HEADER, "void-return-value") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel) + .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - Message resultMessage = - Flux.from(resultChannel) - .blockFirst(); - - assertThat(resultMessage) - .isNotNull() - .extracting(Message::getPayload) - .isInstanceOf(Flux.class); - - Flux resultStream = (Flux) resultMessage.getPayload(); - StepVerifier.create(resultStream) - .expectComplete() - .verify(); + verifier.verify(); } @Test @@ -384,6 +373,15 @@ void serverHandleWithThrownException() { private void handleWithThrownException(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + StepVerifier verifier = + StepVerifier.create( + Flux.from(resultChannel) + .map(Message::getPayload) + .cast(String.class)) + .expectNext("Invalid input error handled") + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("a") .setHeader(ROUTE_HEADER, "thrown-exception") @@ -391,13 +389,7 @@ private void handleWithThrownException(MessageChannel inputChannel, FluxMessageC .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - StepVerifier.create( - Flux.from(resultChannel) - .map(Message::getPayload) - .cast(String.class)) - .expectNext("Invalid input error handled") - .thenCancel() - .verify(); + verifier.verify(); } @Test @@ -413,6 +405,15 @@ void serverHandleWithErrorSignal() { private void handleWithErrorSignal(MessageChannel inputChannel, FluxMessageChannel resultChannel, RSocketRequester rsocketRequester) { + StepVerifier verifier = + StepVerifier.create( + Flux.from(resultChannel) + .map(Message::getPayload) + .cast(String.class)) + .expectNext("Invalid input error handled") + .thenCancel() + .verifyLater(); + inputChannel.send( MessageBuilder.withPayload("a") .setHeader(ROUTE_HEADER, "error-signal") @@ -420,13 +421,7 @@ private void handleWithErrorSignal(MessageChannel inputChannel, FluxMessageChann .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); - StepVerifier.create( - Flux.from(resultChannel) - .map(Message::getPayload) - .cast(String.class)) - .expectNext("Invalid input error handled") - .thenCancel() - .verify(); + verifier.verify(); } @Test