forked from spring-projects/spring-framework
-
Notifications
You must be signed in to change notification settings - Fork 1
/
RSocketMessageHandler.java
496 lines (447 loc) · 19.7 KB
/
RSocketMessageHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
/*
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket.annotation.support;
import java.lang.reflect.AnnotatedElement;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.FrameType;
import io.rsocket.metadata.WellKnownMimeType;
import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.handler.CompositeMessageCondition;
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.messaging.handler.MessageCondition;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler;
import org.springframework.messaging.handler.annotation.reactive.PayloadMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.RouteMatcher;
import org.springframework.util.StringUtils;
/**
* Extension of {@link MessageMappingMessageHandler} for handling RSocket
* requests with {@link ConnectMapping @ConnectMapping} and
* {@link MessageMapping @MessageMapping} methods.
*
* <p>For server scenarios this class can be declared as a bean in Spring
* configuration and that would detect {@code @MessageMapping} methods in
* {@code @Controller} beans. What beans are checked can be changed through a
* {@link #setHandlerPredicate(Predicate) handlerPredicate}. Given an instance
* of this class, you can then use {@link #responder()} to obtain a
* {@link SocketAcceptor} adapter to register with the
* {@link io.rsocket.core.RSocketServer}.
*
* <p>For a client, possibly in the same process as a server, consider using the
* static factory method {@link #responder(RSocketStrategies, Object...)} to
* obtain a client responder to be registered via
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector
* RSocketRequester.Builder}.
*
* <p>For {@code @MessageMapping} methods, this class automatically determines
* the RSocket interaction type based on the input and output cardinality of the
* method. See the
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders">
* "Annotated Responders"</a> section of the Spring Framework reference for more details.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
public class RSocketMessageHandler extends MessageMappingMessageHandler {
private final List<Encoder<?>> encoders = new ArrayList<>();
private RSocketStrategies strategies = RSocketStrategies.create();
@Nullable
private MimeType defaultDataMimeType;
private MimeType defaultMetadataMimeType = MimeTypeUtils.parseMimeType(
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
public RSocketMessageHandler() {
setRSocketStrategies(this.strategies);
}
/**
* Configure the encoders to use for encoding handler method return values.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the encoders in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the encoders in it.
* <p>By default this is set to the
* {@linkplain org.springframework.messaging.rsocket.RSocketStrategies.Builder#encoder(Encoder[]) defaults}
* from {@code RSocketStrategies}.
*/
public void setEncoders(List<? extends Encoder<?>> encoders) {
this.encoders.clear();
this.encoders.addAll(encoders);
this.strategies = this.strategies.mutate()
.encoders(list -> {
list.clear();
list.addAll(encoders);
})
.build();
}
/**
* Return the configured {@link #setEncoders(List) encoders}.
*/
public List<? extends Encoder<?>> getEncoders() {
return this.encoders;
}
/**
* {@inheritDoc}
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the decoders in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the decoders in them.
* <p>By default this is set to the
* {@linkplain org.springframework.messaging.rsocket.RSocketStrategies.Builder#decoder(Decoder[]) defaults}
* from {@code RSocketStrategies}.
*/
@Override
public void setDecoders(List<? extends Decoder<?>> decoders) {
super.setDecoders(decoders);
this.strategies = this.strategies.mutate()
.decoders(list -> {
list.clear();
list.addAll(decoders);
})
.build();
}
/**
* {@inheritDoc}
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the route matcher in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the matcher in it.
* <p>By default this is set to the
* {@linkplain org.springframework.messaging.rsocket.RSocketStrategies.Builder#routeMatcher(RouteMatcher) defaults}
* from {@code RSocketStrategies}.
*/
@Override
public void setRouteMatcher(@Nullable RouteMatcher routeMatcher) {
super.setRouteMatcher(routeMatcher);
this.strategies = this.strategies.mutate().routeMatcher(routeMatcher).build();
}
/**
* Configure the registry for adapting various reactive types.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the registry in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the registry in it.
* <p>By default this is set to the
* {@link org.springframework.messaging.rsocket.RSocketStrategies.Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) defaults}
* from {@code RSocketStrategies}.
*/
@Override
public void setReactiveAdapterRegistry(ReactiveAdapterRegistry registry) {
super.setReactiveAdapterRegistry(registry);
this.strategies = this.strategies.mutate().reactiveAdapterStrategy(registry).build();
}
/**
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the extractor in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the extractor in it.
* <p>By default this is set to the
* {@link org.springframework.messaging.rsocket.RSocketStrategies.Builder#metadataExtractor(MetadataExtractor) defaults}
* from {@code RSocketStrategies}.
* @param extractor the extractor to use
*/
public void setMetadataExtractor(MetadataExtractor extractor) {
this.strategies = this.strategies.mutate().metadataExtractor(extractor).build();
}
/**
* Return the configured {@link #setMetadataExtractor MetadataExtractor}.
*/
public MetadataExtractor getMetadataExtractor() {
return this.strategies.metadataExtractor();
}
/**
* Configure this handler through an {@link RSocketStrategies} instance which
* can be re-used to initialize a client-side {@link RSocketRequester}.
* <p>When this property is set, in turn it sets the following:
* <ul>
* <li>{@link #setDecoders(List)}
* <li>{@link #setEncoders(List)}
* <li>{@link #setRouteMatcher(RouteMatcher)}
* <li>{@link #setMetadataExtractor(MetadataExtractor)}
* <li>{@link #setReactiveAdapterRegistry(ReactiveAdapterRegistry)}
* </ul>
* <p>By default this is set to {@link RSocketStrategies#create()} which in
* turn sets default settings for all related properties.
*/
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
this.strategies = rsocketStrategies;
this.encoders.clear();
this.encoders.addAll(this.strategies.encoders());
super.setDecoders(this.strategies.decoders());
super.setRouteMatcher(this.strategies.routeMatcher());
super.setReactiveAdapterRegistry(this.strategies.reactiveAdapterRegistry());
}
/**
* Return the {@link #setRSocketStrategies configured} {@code RSocketStrategies}.
*/
public RSocketStrategies getRSocketStrategies() {
return this.strategies;
}
/**
* Configure the default content type to use for data payloads if the
* {@code SETUP} frame did not specify one.
* <p>By default this is not set.
* @param mimeType the MimeType to use
*/
public void setDefaultDataMimeType(@Nullable MimeType mimeType) {
this.defaultDataMimeType = mimeType;
}
/**
* Return the configured
* {@link #setDefaultDataMimeType defaultDataMimeType}, or {@code null}.
*/
@Nullable
public MimeType getDefaultDataMimeType() {
return this.defaultDataMimeType;
}
/**
* Configure the default {@code MimeType} for payload data if the
* {@code SETUP} frame did not specify one.
* <p>By default this is set to {@code "message/x.rsocket.composite-metadata.v0"}
* @param mimeType the MimeType to use
*/
public void setDefaultMetadataMimeType(MimeType mimeType) {
Assert.notNull(mimeType, "'metadataMimeType' is required");
this.defaultMetadataMimeType = mimeType;
}
/**
* Return the configured
* {@link #setDefaultMetadataMimeType defaultMetadataMimeType}.
*/
public MimeType getDefaultMetadataMimeType() {
return this.defaultMetadataMimeType;
}
@Override
public void afterPropertiesSet() {
// Add argument resolver before parent initializes argument resolution
getArgumentResolverConfigurer().addCustomResolver(new RSocketRequesterMethodArgumentResolver());
super.afterPropertiesSet();
getHandlerMethods().forEach((composite, handler) -> {
if (composite.getMessageConditions().contains(RSocketFrameTypeMessageCondition.CONNECT_CONDITION)) {
MethodParameter returnType = handler.getReturnType();
if (getCardinality(returnType) > 0) {
throw new IllegalStateException(
"Invalid @ConnectMapping method. " +
"Return type must be void or a void async type: " + handler);
}
}
});
}
@Override
protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();
handlers.add(new RSocketPayloadReturnValueHandler(this.encoders, getReactiveAdapterRegistry()));
handlers.addAll(getReturnValueHandlerConfigurer().getCustomHandlers());
return handlers;
}
@Override
@Nullable
protected CompositeMessageCondition getCondition(AnnotatedElement element) {
MessageMapping ann1 = AnnotatedElementUtils.findMergedAnnotation(element, MessageMapping.class);
if (ann1 != null && ann1.value().length > 0) {
return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.EMPTY_CONDITION,
new DestinationPatternsMessageCondition(processDestinations(ann1.value()), obtainRouteMatcher()));
}
ConnectMapping ann2 = AnnotatedElementUtils.findMergedAnnotation(element, ConnectMapping.class);
if (ann2 != null) {
String[] patterns = processDestinations(ann2.value());
return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
new DestinationPatternsMessageCondition(patterns, obtainRouteMatcher()));
}
return null;
}
@Override
protected CompositeMessageCondition extendMapping(CompositeMessageCondition composite, HandlerMethod handler) {
List<MessageCondition<?>> conditions = composite.getMessageConditions();
Assert.isTrue(conditions.size() == 2 &&
conditions.get(0) instanceof RSocketFrameTypeMessageCondition &&
conditions.get(1) instanceof DestinationPatternsMessageCondition,
"Unexpected message condition types");
if (conditions.get(0) != RSocketFrameTypeMessageCondition.EMPTY_CONDITION) {
return composite;
}
int responseCardinality = getCardinality(handler.getReturnType());
int requestCardinality = 0;
for (MethodParameter parameter : handler.getMethodParameters()) {
if (getArgumentResolvers().getArgumentResolver(parameter) instanceof PayloadMethodArgumentResolver) {
requestCardinality = getCardinality(parameter);
}
}
return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.getCondition(requestCardinality, responseCardinality),
conditions.get(1));
}
private int getCardinality(MethodParameter parameter) {
Class<?> clazz = parameter.getParameterType();
ReactiveAdapter adapter = getReactiveAdapterRegistry().getAdapter(clazz);
if (adapter == null) {
return clazz.equals(void.class) ? 0 : 1;
}
else if (parameter.nested().getNestedParameterType().equals(Void.class)) {
return 0;
}
else {
return adapter.isMultiValue() ? 2 : 1;
}
}
@Override
protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message<?> message) {
FrameType frameType = RSocketFrameTypeMessageCondition.getFrameType(message);
if (frameType == FrameType.SETUP || frameType == FrameType.METADATA_PUSH) {
return; // optional handling
}
if (frameType == FrameType.REQUEST_FNF) {
// Can't propagate error to client, so just log
logger.warn("No handler for fireAndForget to '" + destination + "'");
return;
}
Set<FrameType> frameTypes = getHandlerMethods().keySet().stream()
.map(CompositeMessageCondition::getMessageConditions)
.filter(conditions -> conditions.get(1).getMatchingCondition(message) != null)
.map(conditions -> (RSocketFrameTypeMessageCondition) conditions.get(0))
.flatMap(condition -> condition.getFrameTypes().stream())
.collect(Collectors.toSet());
throw new MessageDeliveryException(frameTypes.isEmpty() ?
"No handler for destination '" + destination + "'" :
"Destination '" + destination + "' does not support " + frameType + ". " +
"Supported interaction(s): " + frameTypes);
}
/**
* Return an RSocket {@link SocketAcceptor} backed by this
* {@code RSocketMessageHandler} instance that can be plugged in as a
* {@link io.rsocket.core.RSocketConnector#acceptor(SocketAcceptor) client} or
* {@link io.rsocket.core.RSocketServer#acceptor(SocketAcceptor) server}
* RSocket responder.
* <p>The initial {@link ConnectionSetupPayload} is handled through
* {@link ConnectMapping @ConnectionMapping} methods that can be asynchronous
* and return {@code Mono<Void>} with an error signal preventing the
* connection. Such a method can also start requests to the client but that
* must be done decoupled from handling and from the current thread.
* <p>Subsequent requests on the connection can be handled with
* {@link MessageMapping MessageMapping} methods.
*/
public SocketAcceptor responder() {
return (setupPayload, sendingRSocket) -> {
MessagingRSocket responder;
try {
responder = createResponder(setupPayload, sendingRSocket);
}
catch (Throwable ex) {
return Mono.error(ex);
}
return responder.handleConnectionSetupPayload(setupPayload).then(Mono.just(responder));
};
}
private MessagingRSocket createResponder(ConnectionSetupPayload setupPayload, RSocket rsocket) {
String str = setupPayload.dataMimeType();
MimeType dataMimeType = StringUtils.hasText(str) ? MimeTypeUtils.parseMimeType(str) : this.defaultDataMimeType;
Assert.notNull(dataMimeType, "No `dataMimeType` in ConnectionSetupPayload and no default value");
Assert.isTrue(isDataMimeTypeSupported(dataMimeType), () -> "Data MimeType '" + dataMimeType + "' not supported");
str = setupPayload.metadataMimeType();
MimeType metaMimeType = StringUtils.hasText(str) ? MimeTypeUtils.parseMimeType(str) : this.defaultMetadataMimeType;
Assert.notNull(metaMimeType, "No `metadataMimeType` in ConnectionSetupPayload and no default value");
RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, metaMimeType, this.strategies);
return new MessagingRSocket(dataMimeType, metaMimeType, getMetadataExtractor(),
requester, this, obtainRouteMatcher(), this.strategies);
}
private boolean isDataMimeTypeSupported(MimeType dataMimeType) {
for (Encoder<?> encoder : getEncoders()) {
for (MimeType encodable : encoder.getEncodableMimeTypes()) {
if (encodable.isCompatibleWith(dataMimeType)) {
return true;
}
}
}
return false;
}
/**
* Static factory method to create an RSocket {@link SocketAcceptor}
* backed by handlers with annotated methods. Effectively a shortcut for:
* <pre class="code">
* RSocketMessageHandler handler = new RSocketMessageHandler();
* handler.setHandlers(handlers);
* handler.setRSocketStrategies(strategies);
* handler.afterPropertiesSet();
*
* SocketAcceptor acceptor = handler.responder();
* </pre>
* <p>This is intended for programmatic creation and registration of a
* client-side responder. For example:
* <pre class="code">
* SocketAcceptor responder =
* RSocketMessageHandler.responder(strategies, new ClientHandler());
*
* RSocketRequester.builder()
* .rsocketConnector(connector -> connector.acceptor(responder))
* .connectTcp("localhost", server.address().getPort());
* </pre>
*
* <p>Note that the given handlers do not need to have any stereotype
* annotations such as {@code @Controller} which helps to avoid overlap with
* server side handlers that may be used in the same application. However,
* for more advanced scenarios, e.g. discovering handlers through a custom
* stereotype annotation, consider declaring {@code RSocketMessageHandler}
* as a bean, and then obtain the responder from it.
* @param strategies the strategies to set on the created
* {@code RSocketMessageHandler}
* @param candidateHandlers a list of Objects and/or Classes with annotated
* handler methods; used to call {@link #setHandlers(List)} with
* on the created {@code RSocketMessageHandler}
* @return a configurer that may be passed into
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector}
* @since 5.2.6
*/
public static SocketAcceptor responder(RSocketStrategies strategies, Object... candidateHandlers) {
Assert.notEmpty(candidateHandlers, "No handlers");
List<Object> handlers = new ArrayList<>(candidateHandlers.length);
for (Object obj : candidateHandlers) {
handlers.add(obj instanceof Class<?> clazz ? BeanUtils.instantiateClass(clazz) : obj);
}
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setHandlers(handlers);
handler.setRSocketStrategies(strategies);
handler.afterPropertiesSet();
return handler.responder();
}
}