Skip to content

Commit

Permalink
Use method signature to refine RSocket @MessageMapping
Browse files Browse the repository at this point in the history
Before this change an @MessageMapping could be matched to any RSocket
interaction type, which is arguably too flexible, makes it difficult to
reason what would happen in case of a significant mismatch of
cardinality, e.g. request for Fire-And-Forget (1-to-0) mapped to a
method that returns Flux, and could result in payloads being ignored,
or not seen unintentionally.

This commit checks @ConnectMapping method on startup and rejects them
if they return any values (sync or async). It also refines each
@MessageMapping to match only the RSocket interaction type it fits
based on the input and output cardinality of the handler method.
Subsequently if a request is not matched, we'll do a second search to
identify partial matches (by route only) and raise a helpful error that
explains which interaction type is actually supported.

The reference docs has been updated to explain the options.

Closes gh-23999
  • Loading branch information
rstoyanchev committed Nov 18, 2019
1 parent 769a15c commit 842b424
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 66 deletions.
Expand Up @@ -64,18 +64,34 @@
* authenticated user.</li>
* </ul>
*
* <p>How the return value is handled depends on the processing scenario. For
* STOMP over WebSocket, it is turned into a message and sent to a default response
* destination or to a custom destination specified with an {@link SendTo @SendTo}
* or {@link org.springframework.messaging.simp.annotation.SendToUser @SendToUser}
* annotation. For RSocket, the response is used to reply to the stream request.
* <p>Return value handling depends on the processing scenario:
* <ul>
* <li>STOMP over WebSocket -- the value is turned into a message and sent to a
* default response destination or to a custom destination specified with an
* {@link SendTo @SendTo} or
* {@link org.springframework.messaging.simp.annotation.SendToUser @SendToUser}
* annotation.
* <li>RSocket -- the response is used to reply to the stream request.
* </ul>
*
* <p>Specializations of this annotation including
* {@link org.springframework.messaging.simp.annotation.SubscribeMapping @SubscribeMapping} or
* <p>Specializations of this annotation include
* {@link org.springframework.messaging.simp.annotation.SubscribeMapping @SubscribeMapping}
* (e.g. STOMP subscriptions) and
* {@link org.springframework.messaging.rsocket.annotation.ConnectMapping @ConnectMapping}
* further narrow the mapping by message type. Both can be combined with a
* type-level {@code @MessageMapping} for declaring a common pattern prefix
* (or prefixes).
* (e.g. RSocket connections). Both narrow the primary mapping further and also match
* against the message type. Both can be combined with a type-level
* {@code @MessageMapping} that declares a common pattern prefix (or prefixes).
*
* <p>For further details on the use of this annotation in different contexts,
* see the following sections of the Spring Framework reference:
* <ul>
* <li>STOMP over WebSocket
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-stomp-handle-annotations">
* "Annotated Controllers"</a>.
* <li>RSocket
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders">
* "Annotated Responders"</a>.
* </ul>
*
* <p><b>NOTE:</b> When using controller interfaces (e.g. for AOP proxying),
* make sure to consistently put <i>all</i> your mapping annotations - such as
Expand Down
Expand Up @@ -232,6 +232,15 @@ public MultiValueMap<String, T> getDestinationLookup() {
return CollectionUtils.unmodifiableMultiValueMap(this.destinationLookup);
}

/**
* Return the argument resolvers initialized during {@link #afterPropertiesSet()}.
* Primarily for internal use in sub-classes.
* @since 5.2.2
*/
protected HandlerMethodArgumentResolverComposite getArgumentResolvers() {
return this.invocableHelper.getArgumentResolvers();
}


@Override
public void afterPropertiesSet() {
Expand Down Expand Up @@ -377,6 +386,7 @@ protected final void registerHandlerMethod(Object handler, Method method, T mapp
oldHandlerMethod.getBean() + "' bean method\n" + oldHandlerMethod + " mapped.");
}

mapping = extendMapping(mapping, newHandlerMethod);
this.handlerMethods.put(mapping, newHandlerMethod);

for (String pattern : getDirectLookupMappings(mapping)) {
Expand All @@ -402,6 +412,21 @@ private HandlerMethod createHandlerMethod(Object handler, Method method) {
return handlerMethod;
}

/**
* This method is invoked just before mappings are added. It allows
* sub-classes to update the mapping with the {@link HandlerMethod} in mind.
* This can be useful when the method signature is used to refine the
* mapping, e.g. based on the cardinality of input and output.
* <p>By default this method returns the mapping that is passed in.
* @param mapping the mapping to be added
* @param handlerMethod the target handler for the mapping
* @return a new mapping or the same
* @since 5.2.2
*/
protected T extendMapping(T mapping, HandlerMethod handlerMethod) {
return mapping;
}

/**
* Return String-based destinations for the given mapping, if any, that can
* be used to find matches with a direct lookup (i.e. non-patterns).
Expand All @@ -414,7 +439,13 @@ private HandlerMethod createHandlerMethod(Object handler, Method method) {

@Override
public Mono<Void> handleMessage(Message<?> message) throws MessagingException {
Match<T> match = getHandlerMethod(message);
Match<T> match = null;
try {
match = getHandlerMethod(message);
}
catch (Exception ex) {
return Mono.error(ex);
}
if (match == null) {
// handleNoMatch would have been invoked already
return Mono.empty();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,7 @@
* @author Rossen Stoyanchev
* @since 5.2
*/
class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver {
public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver {

protected final Log logger = LogFactory.getLog(getClass());

Expand Down Expand Up @@ -125,7 +125,7 @@ public Mono<Object> resolveArgument(MethodParameter parameter, Message<?> messag
* the given method parameter.
*/
@Nullable
private HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
public HandlerMethodArgumentResolver getArgumentResolver(MethodParameter parameter) {
HandlerMethodArgumentResolver result = this.argumentResolverCache.get(parameter);
if (result == null) {
for (HandlerMethodArgumentResolver methodArgumentResolver : this.argumentResolvers) {
Expand Down
Expand Up @@ -80,6 +80,14 @@ public void addArgumentResolvers(List<? extends HandlerMethodArgumentResolver> r
this.argumentResolvers.addResolvers(resolvers);
}

/**
* Return the configured resolvers.
* @since 5.2.2
*/
public HandlerMethodArgumentResolverComposite getArgumentResolvers() {
return this.argumentResolvers;
}

/**
* Add the return value handlers to use for message handling and exception
* handling methods.
Expand Down
Expand Up @@ -56,6 +56,13 @@ public class RSocketFrameTypeMessageCondition extends AbstractMessageCondition<R
}


static final RSocketFrameTypeMessageCondition CONNECT_CONDITION =

This comment has been minimized.

Copy link
@artembilan

artembilan Nov 19, 2019

Member

@rstoyanchev ,

This is braking change.
It was public since 5.2.0 and Spring Integration relies on this constant: https://build.spring.io/browse/INT-MASTERSPRING40-895/.

Anyway even if not SI, some other projects may use it already, therefore breaking change in the current 5.2.2.

Thanks

This comment has been minimized.

Copy link
@rstoyanchev

rstoyanchev Nov 19, 2019

Author Contributor

@artembilan I didn't realize these were used, and they were so recently introduced. I've restored them and deprecated the one that matches all interaction types, in favor of the new ones that are more specific and should be used instead.

Thanks for the feedback.

new RSocketFrameTypeMessageCondition(FrameType.SETUP, FrameType.METADATA_PUSH);

static final RSocketFrameTypeMessageCondition EMPTY_CONDITION = new RSocketFrameTypeMessageCondition();

This comment has been minimized.

Copy link
@artembilan

artembilan Nov 19, 2019

Member

Would be also great to make this new one as public as well since there is no REQUEST_CONDITION any more.
Although that one should be at least deprecated instead of removal what is breaking change.




private final Set<FrameType> frameTypes;


Expand All @@ -68,6 +75,10 @@ public RSocketFrameTypeMessageCondition(Collection<FrameType> frameTypes) {
this.frameTypes = Collections.unmodifiableSet(new LinkedHashSet<>(frameTypes));
}

private RSocketFrameTypeMessageCondition() {
this.frameTypes = Collections.emptySet();
}


public Set<FrameType> getFrameTypes() {
return this.frameTypes;
Expand Down Expand Up @@ -124,18 +135,71 @@ public int compareTo(RSocketFrameTypeMessageCondition other, Message<?> message)
}


/** Condition to match the initial SETUP frame and subsequent metadata pushes. */
public static final RSocketFrameTypeMessageCondition CONNECT_CONDITION =
new RSocketFrameTypeMessageCondition(
FrameType.SETUP,
FrameType.METADATA_PUSH);
/**
* Return a condition for matching the RSocket request interaction type with
* that is selected based on the delcared request and response cardinality
* of some handler method.
* <p>The table below shows the selections made:
* <table>
* <tr>
* <th>Request Cardinality</th>
* <th>Response Cardinality</th>
* <th>Interaction Types</th>
* </tr>
* <tr>
* <td>0,1</td>
* <td>0</td>
* <td>Fire-And-Forget, Request-Response</td>
* </tr>
* <tr>
* <td>0,1</td>
* <td>1</td>
* <td>Request-Response</td>
* </tr>
* <tr>
* <td>0,1</td>
* <td>2</td>
* <td>Request-Stream</td>
* </tr>
* <tr>
* <td>2</td>
* <td>Any</td>
* <td>Request-Channel</td>
* </tr>
* </table>
* @param cardinalityIn -- the request cardinality: 1 for a single payload,
* 2 for many payloads, and 0 if input is not handled.
* @param cardinalityOut -- the response cardinality: 0 for no output
* payloads, 1 for a single payload, and 2 for many payloads.
* @return a condition to use for matching the interaction type
* @since 5.2.2
*/
public static RSocketFrameTypeMessageCondition getCondition(int cardinalityIn, int cardinalityOut) {
switch (cardinalityIn) {
case 0:
case 1:
switch (cardinalityOut) {
case 0: return FF_RR_CONDITION;
case 1: return RR_CONDITION;
case 2: return RS_CONDITION;
default: throw new IllegalStateException("Invalid cardinality: " + cardinalityOut);
}
case 2:
return RC_CONDITION;
default:
throw new IllegalStateException("Invalid cardinality: " + cardinalityIn);
}
}


private static final RSocketFrameTypeMessageCondition FF_CONDITION = from(FrameType.REQUEST_FNF);
private static final RSocketFrameTypeMessageCondition RR_CONDITION = from(FrameType.REQUEST_RESPONSE);
private static final RSocketFrameTypeMessageCondition RS_CONDITION = from(FrameType.REQUEST_STREAM);
private static final RSocketFrameTypeMessageCondition RC_CONDITION = from(FrameType.REQUEST_CHANNEL);
private static final RSocketFrameTypeMessageCondition FF_RR_CONDITION = FF_CONDITION.combine(RR_CONDITION);

/** Condition to match one of the 4 stream request types. */
public static final RSocketFrameTypeMessageCondition REQUEST_CONDITION =
new RSocketFrameTypeMessageCondition(
FrameType.REQUEST_FNF,
FrameType.REQUEST_RESPONSE,
FrameType.REQUEST_STREAM,
FrameType.REQUEST_CHANNEL);
private static RSocketFrameTypeMessageCondition from(FrameType... frameTypes) {
return new RSocketFrameTypeMessageCondition(frameTypes);
}

}

0 comments on commit 842b424

Please sign in to comment.