Skip to content

Commit

Permalink
Avoid re-creating connect Mono<RSocket>
Browse files Browse the repository at this point in the history
Closes gh-25330
  • Loading branch information
rstoyanchev committed Jul 1, 2020
1 parent 79c339b commit b2a4d1c
Showing 1 changed file with 35 additions and 39 deletions.
Expand Up @@ -23,8 +23,10 @@
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.ClientTransport;
Expand Down Expand Up @@ -174,10 +176,6 @@ public Mono<RSocketRequester> connectWebSocket(URI uri) {

@Override
public Mono<RSocketRequester> connect(ClientTransport transport) {
return Mono.defer(() -> doConnect(transport));
}

private Mono<RSocketRequester> doConnect(ClientTransport transport) {
RSocketStrategies rsocketStrategies = getRSocketStrategies();
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
Expand All @@ -186,21 +184,28 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());

MimeType dataMimeType = getDataMimeType(rsocketStrategies);
Mono<Payload> setupPayload = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies);

Function<Payload, Mono<RSocket>> connectFunction;
if (rsocketConnectorPresent) {
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
.flatMap(payload ->
new RSocketConnectorHelper().connect(
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
metaMimeType, dataMimeType, payload, rsocketStrategies, transport));
connectFunction = payload -> new RSocketConnectorHelper().getRSocketMono(
this.rsocketConnectorConfigurers, this.rsocketFactoryConfigurers,
metaMimeType, dataMimeType, setupPayload, rsocketStrategies, transport, payload);
}
else {
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
.flatMap(payload ->
new RSocketFactoryHelper().connect(
this.rsocketFactoryConfigurers, metaMimeType, dataMimeType, payload,
rsocketStrategies, transport));
connectFunction = payload -> new RSocketFactoryHelper().getRSocketMono(
this.rsocketFactoryConfigurers, metaMimeType, dataMimeType,
setupPayload, rsocketStrategies, transport, payload);
}

// In RSocket 1.0.2 we can pass a Mono for the setup Payload. Until then we have to
// resolve it and then cache the Mono<RSocket> because it may be a ReconnectMono.

return setupPayload
.map(connectFunction)
.cache()
.flatMap(mono -> mono.map(rsocket ->
new DefaultRSocketRequester(rsocket, dataMimeType, metaMimeType, rsocketStrategies)));
}

private RSocketStrategies getRSocketStrategies() {
Expand Down Expand Up @@ -285,14 +290,13 @@ private Mono<Payload> getSetupPayload(
}


@SuppressWarnings("deprecation")
private static class RSocketConnectorHelper {

@SuppressWarnings("deprecation")
Mono<RSocketRequester> connect(
List<RSocketConnectorConfigurer> connectorConfigurers,
Mono<RSocket> getRSocketMono(List<RSocketConnectorConfigurer> connectorConfigurers,
List<ClientRSocketFactoryConfigurer> factoryConfigurers,
MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport) {
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) {

io.rsocket.core.RSocketConnector connector = io.rsocket.core.RSocketConnector.create();
connectorConfigurers.forEach(c -> c.configure(connector));
Expand All @@ -307,27 +311,23 @@ Mono<RSocketRequester> connect(
connector.payloadDecoder(PayloadDecoder.ZERO_COPY);
}

connector.metadataMimeType(metaMimeType.toString());
connector.dataMimeType(dataMimeType.toString());

if (setupPayload != EMPTY_SETUP_PAYLOAD) {
connector.setupPayload(setupPayload);
connector.setupPayload(payload);
}

return connector
.metadataMimeType(metaMimeType.toString())
.dataMimeType(dataMimeType.toString())
.connect(transport)
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
return connector.connect(transport);
}
}


@SuppressWarnings("deprecation")
private static class RSocketFactoryHelper {

Mono<RSocketRequester> connect(
List<ClientRSocketFactoryConfigurer> configurers,
MimeType metaMimeType, MimeType dataMimeType, Payload setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport) {
Mono<RSocket> getRSocketMono(List<ClientRSocketFactoryConfigurer> configurers,
MimeType metaMimeType, MimeType dataMimeType, Mono<Payload> setupPayload,
RSocketStrategies rsocketStrategies, ClientTransport transport, Payload payload) {

io.rsocket.RSocketFactory.ClientRSocketFactory factory = io.rsocket.RSocketFactory.connect();
configurers.forEach(c -> c.configure(factory));
Expand All @@ -336,16 +336,12 @@ Mono<RSocketRequester> connect(
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
}

factory.metadataMimeType(metaMimeType.toString());
factory.dataMimeType(dataMimeType.toString());
if (setupPayload != EMPTY_SETUP_PAYLOAD) {
factory.setupPayload(setupPayload);
factory.setupPayload(payload);
}

return factory.metadataMimeType(metaMimeType.toString())
.dataMimeType(dataMimeType.toString())
.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
return factory.transport(transport).start();
}
}

Expand Down

0 comments on commit b2a4d1c

Please sign in to comment.