/
RSocketRequester.java
364 lines (319 loc) · 13.9 KB
/
RSocketRequester.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.rsocket;
import java.net.URI;
import java.util.function.Consumer;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.Decoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.MimeType;
/**
* A thin wrapper around a sending {@link RSocket} with a fluent API accepting
* and returning higher level Objects for input and for output, along with
* methods to prepare routing and other metadata.
*
* @author Rossen Stoyanchev
* @author Brian Clozel
* @since 5.2
*/
public interface RSocketRequester {
/**
* Return the underlying sending RSocket.
*/
RSocket rsocket();
/**
* Return the data {@code MimeType} selected for the underlying RSocket
* at connection time. On the client side this is configured via
* {@link RSocketRequester.Builder#dataMimeType(MimeType)} while on the
* server side it's obtained from the {@link ConnectionSetupPayload}.
*/
MimeType dataMimeType();
/**
* Return the metadata {@code MimeType} selected for the underlying RSocket
* at connection time. On the client side this is configured via
* {@link RSocketRequester.Builder#metadataMimeType(MimeType)} while on the
* server side it's obtained from the {@link ConnectionSetupPayload}.
*/
MimeType metadataMimeType();
/**
* Begin to specify a new request with the given route to a remote handler.
* <p>The route can be a template with placeholders, e.g.
* {@code "flight.{code}"} in which case the supplied route variables are
* formatted via {@code toString()} and expanded into the template.
* If a formatted variable contains a "." it is replaced with the escape
* sequence "%2E" to avoid treating it as separator by the responder .
* <p>If the connection is set to use composite metadata, the route is
* encoded as {@code "message/x.rsocket.routing.v0"}. Otherwise the route
* is encoded according to the mime type for the connection.
* @param route the route expressing a remote handler mapping
* @param routeVars variables to be expanded into the route template
* @return a spec for further defining and executing the request
*/
RequestSpec route(String route, Object... routeVars);
/**
* Begin to specify a new request with the given metadata value, which can
* be a concrete value or any producer of a single value that can be adapted
* to a {@link Publisher} via {@link ReactiveAdapterRegistry}.
* @param metadata the metadata value to encode
* @param mimeType the mime type that describes the metadata;
* This is required for connection using composite metadata. Otherwise the
* value is encoded according to the mime type for the connection and this
* argument may be left as {@code null}.
*/
RequestSpec metadata(Object metadata, @Nullable MimeType mimeType);
/**
* Obtain a builder to create a client {@link RSocketRequester} by connecting
* to an RSocket server.
*/
static RSocketRequester.Builder builder() {
return new DefaultRSocketRequesterBuilder();
}
/**
* Wrap an existing {@link RSocket}. Typically used in client or server
* responders to wrap the {@code RSocket} for the remote side.
*/
static RSocketRequester wrap(
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
RSocketStrategies strategies) {
return new DefaultRSocketRequester(rsocket, dataMimeType, metadataMimeType, strategies);
}
/**
* Builder to create a requester by connecting to a server.
*/
interface Builder {
/**
* Configure the payload data MimeType to specify on the {@code SETUP}
* frame that applies to the whole connection.
* <p>If not set, this will be initialized to the MimeType of the first
* {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default}
* {@code Decoder}, or otherwise the MimeType of the first decoder.
*/
RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
/**
* Configure the payload metadata MimeType to specify on the {@code SETUP}
* frame that applies to the whole connection.
* <p>By default this is set to
* {@code "message/x.rsocket.composite-metadata.v0"} in which case the
* route, if provided, is encoded as a {@code "message/x.rsocket.routing.v0"}
* composite metadata entry. If this is set to any other MimeType, it is
* assumed that's the MimeType for the route, if provided.
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/**
* Set the data for the setup payload. The data will be encoded
* according to the configured {@link #dataMimeType(MimeType)}.
* The data be a concrete value or any producer of a single value that
* can be adapted to a {@link Publisher} via {@link ReactiveAdapterRegistry}.
* <p>By default this is not set.
*/
RSocketRequester.Builder setupData(Object data);
/**
* Set the route for the setup payload. The rules for formatting and
* encoding the route are the same as those for a request route as
* described in {@link #route(String, Object...)}.
* <p>By default this is not set.
*/
RSocketRequester.Builder setupRoute(String route, Object... routeVars);
/**
* Add metadata entry to the setup payload. Composite metadata must be
* in use if this is called more than once or in addition to
* {@link #setupRoute(String, Object...)}. The metadata value be a
* concrete value or any producer of a single value that can be adapted
* to a {@link Publisher} via {@link ReactiveAdapterRegistry}.
*/
RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType);
/**
* Provide {@link RSocketStrategies} to use.
* <p>By default this is based on default settings of
* {@link RSocketStrategies.Builder} but may be further customized via
* {@link #rsocketStrategies(Consumer)}.
*/
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
/**
* Customize the {@link RSocketStrategies}.
* <p>By default this starts out as {@link RSocketStrategies#builder()}.
* However if strategies were {@link #rsocketStrategies(RSocketStrategies) set}
* explicitly, then they are {@link RSocketStrategies#mutate() mutated}.
*/
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <ul>
* <li>The data and metadata mime types cannot be set directly
* on the {@code ClientRSocketFactory} and will be overridden. Use the
* shortcuts {@link #dataMimeType(MimeType)} and
* {@link #metadataMimeType(MimeType)} on this builder instead.
* <li>The frame decoder also cannot be set directly and instead is set
* to match the configured {@code DataBufferFactory}.
* <li>For the
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload)
* setupPayload}, consider using methods on this builder to specify the
* route, other metadata, and data as Object values to be encoded.
* <li>To configure client side responding, see
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
* </ul>
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
/**
* Configure this builder through a {@code Consumer}. This enables
* libraries such as Spring Security to provide shortcuts for applying
* a set of related customizations.
* @param configurer the configurer to apply
*/
RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configurer);
/**
* Connect to the server over TCP.
* @param host the server host
* @param port the server port
* @return an {@code RSocketRequester} for the connection
* @see TcpClientTransport
*/
Mono<RSocketRequester> connectTcp(String host, int port);
/**
* Connect to the server over WebSocket.
* @param uri the RSocket server endpoint URI
* @return an {@code RSocketRequester} for the connection
* @see WebsocketClientTransport
*/
Mono<RSocketRequester> connectWebSocket(URI uri);
/**
* Connect to the server with the given {@code ClientTransport}.
* @param transport the client transport to use
* @return an {@code RSocketRequester} for the connection
*/
Mono<RSocketRequester> connect(ClientTransport transport);
}
/**
* Spec to declare the input for an RSocket request.
*/
interface RequestSpec extends MetadataSpec<RequestSpec>, RetrieveSpec {
/**
* Append additional metadata entries through a {@code Consumer}.
* This enables libraries such as Spring Security to provide shortcuts
* for applying a set of customizations.
* @param configurer the configurer to apply
* @throws IllegalArgumentException if not using composite metadata.
*/
RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);
/**
* Provide payload data for the request. This can be one of:
* <ul>
* <li>Concrete value
* <li>{@link Publisher} of value(s)
* <li>Any other producer of value(s) that can be adapted to a
* {@link Publisher} via {@link ReactiveAdapterRegistry}
* </ul>
* @param data the Object value for the payload data
* @return spec to declare the expected response
*/
RetrieveSpec data(Object data);
/**
* Variant of {@link #data(Object)} that also accepts a hint for the
* types of values that will be produced. The class hint is used to
* find a compatible {@code Encoder} once, up front vs per value.
* @param producer the source of payload data value(s). This must be a
* {@link Publisher} or another producer adaptable to a
* {@code Publisher} via {@link ReactiveAdapterRegistry}
* @param elementClass the type of values to be produced
* @return spec to declare the expected response
*/
RetrieveSpec data(Object producer, Class<?> elementClass);
/**
* Variant of {@link #data(Object, Class)} for when the type hint has
* to have a generic type. See {@link ParameterizedTypeReference}.
* @param producer the source of payload data value(s). This must be a
* {@link Publisher} or another producer adaptable to a
* {@code Publisher} via {@link ReactiveAdapterRegistry}
* @param elementTypeRef the type of values to be produced
* @return spec to declare the expected response
*/
RetrieveSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
}
/**
* Spec for providing additional composite metadata entries.
*
* @param <S> a self reference to the spec type
*/
interface MetadataSpec<S extends MetadataSpec<S>> {
/**
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* The metadata value be a concrete value or any producer of a single
* value that can be adapted to a {@link Publisher} via
* {@link ReactiveAdapterRegistry}.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
*/
S metadata(Object metadata, MimeType mimeType);
}
/**
* Spec to declare the expected output for an RSocket request.
* @since 5.2.2
*/
interface RetrieveSpec {
/**
* Perform a {@link RSocket#fireAndForget fireAndForget}.
*/
Mono<Void> send();
/**
* Perform a {@link RSocket#requestResponse requestResponse} exchange.
* <p>If the return type is {@code Mono<Void>}, the {@code Mono} will
* complete after all data is consumed.
* <p><strong>Note:</strong> This method will raise an error if
* the request payload is a multi-valued {@link Publisher} as there is
* no many-to-one RSocket interaction.
* @param dataType the expected data type for the response
* @param <T> parameter for the expected data type
* @return the decoded response
*/
<T> Mono<T> retrieveMono(Class<T> dataType);
/**
* Variant of {@link #retrieveMono(Class)} for when the dataType has
* to have a generic type. See {@link ParameterizedTypeReference}.
*/
<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
/**
* Perform an {@link RSocket#requestStream requestStream} or a
* {@link RSocket#requestChannel requestChannel} exchange depending on
* whether the request input is single or multi-payload.
* <p>If the return type is {@code Flux<Void>}, the {@code Flux} will
* complete after all data is consumed.
* @param dataType the expected type for values in the response
* @param <T> parameterize the expected type of values
* @return the decoded response
*/
<T> Flux<T> retrieveFlux(Class<T> dataType);
/**
* Variant of {@link #retrieveFlux(Class)} for when the dataType has
* to have a generic type. See {@link ParameterizedTypeReference}.
*/
<T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
}
}