Skip to content

Commit

Permalink
Allow async metadata in RSocketRequester
Browse files Browse the repository at this point in the history
This commit allows single-value async producers for the values of
metadata entries in both the SETUP and for requests. The same is also
enabled for data in the SETUP frame.

Close gh-23640
  • Loading branch information
rstoyanchev committed Nov 20, 2019
1 parent 82f4e93 commit 996f729
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 121 deletions.
Expand Up @@ -114,22 +114,20 @@ private DataBufferFactory bufferFactory() {

private class DefaultRequestSpec implements RequestSpec {

private final MetadataEncoder metadataEncoder;
private final MetadataEncoder metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);

@Nullable
private Mono<Payload> payloadMono = emptyPayload();
private Mono<Payload> payloadMono;

@Nullable
private Flux<Payload> payloadFlux = null;
private Flux<Payload> payloadFlux;


public DefaultRequestSpec(String route, Object... vars) {
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
this.metadataEncoder.route(route, vars);
}

public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
this.metadataEncoder = new MetadataEncoder(metadataMimeType(), strategies);
this.metadataEncoder.metadata(metadata, mimeType);
}

Expand Down Expand Up @@ -188,17 +186,14 @@ else if (adapter != null) {
publisher = adapter.toPublisher(input);
}
else {
this.payloadMono = Mono
.fromCallable(() -> encodeData(input, ResolvableType.forInstance(input), null))
.map(this::firstPayload)
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
ResolvableType type = ResolvableType.forInstance(input);
this.payloadMono = firstPayload(Mono.fromCallable(() -> encodeData(input, type, null)));
this.payloadFlux = null;
return;
}

if (isVoid(elementType) || (adapter != null && adapter.isNoValue())) {
this.payloadMono = Mono.when(publisher).then(emptyPayload());
this.payloadMono = firstPayload(Mono.when(publisher).then(Mono.just(emptyDataBuffer)));
this.payloadFlux = null;
return;
}
Expand All @@ -207,29 +202,29 @@ else if (adapter != null) {
strategies.encoder(elementType, dataMimeType) : null;

if (adapter != null && !adapter.isMultiValue()) {
this.payloadMono = Mono.from(publisher)
Mono<DataBuffer> data = Mono.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.map(this::firstPayload)
.switchIfEmpty(emptyPayload());
.defaultIfEmpty(emptyDataBuffer);
this.payloadMono = firstPayload(data);
this.payloadFlux = null;
return;
}

this.payloadMono = null;
this.payloadFlux = Flux.from(publisher)
.map(value -> encodeData(value, elementType, encoder))
.defaultIfEmpty(emptyDataBuffer)
.switchOnFirst((signal, inner) -> {
DataBuffer data = signal.get();
if (data != null) {
return Mono.fromCallable(() -> firstPayload(data))
return firstPayload(Mono.fromCallable(() -> data))
.concatWith(inner.skip(1).map(PayloadUtils::createPayload));
}
else {
return inner.map(PayloadUtils::createPayload);
}
})
.doOnDiscard(Payload.class, Payload::release)
.switchIfEmpty(emptyPayload());
.doOnDiscard(Payload.class, Payload::release);
}

@SuppressWarnings("unchecked")
Expand All @@ -242,26 +237,25 @@ private <T> DataBuffer encodeData(T value, ResolvableType elementType, @Nullable
value, bufferFactory(), elementType, dataMimeType, EMPTY_HINTS);
}

private Payload firstPayload(DataBuffer data) {
DataBuffer metadata;
try {
metadata = this.metadataEncoder.encode();
}
catch (Throwable ex) {
DataBufferUtils.release(data);
throw ex;
}
return PayloadUtils.createPayload(data, metadata);
}

private Mono<Payload> emptyPayload() {
return Mono.fromCallable(() -> firstPayload(emptyDataBuffer));
/**
* Create the 1st request payload with encoded data and metadata.
* @param encodedData the encoded payload data; expected to not be empty!
*/
private Mono<Payload> firstPayload(Mono<DataBuffer> encodedData) {
return Mono.zip(encodedData, this.metadataEncoder.encode())
.map(tuple -> PayloadUtils.createPayload(tuple.getT1(), tuple.getT2()))
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
.doOnDiscard(Payload.class, Payload::release);
}

@Override
public Mono<Void> send() {
Assert.state(this.payloadMono != null, "No RSocket interaction model for one-way send with Flux");
return this.payloadMono.flatMap(rsocket::fireAndForget);
return getPayloadMonoRequired().flatMap(rsocket::fireAndForget);
}

private Mono<Payload> getPayloadMonoRequired() {
Assert.state(this.payloadFlux == null, "No RSocket interaction model for Flux request to Mono response.");
return this.payloadMono != null ? this.payloadMono : firstPayload(Mono.just(emptyDataBuffer));
}

@Override
Expand All @@ -286,8 +280,7 @@ public <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef) {

@SuppressWarnings("unchecked")
private <T> Mono<T> retrieveMono(ResolvableType elementType) {
Assert.notNull(this.payloadMono, "No RSocket interaction model for Flux request to Mono response.");
Mono<Payload> payloadMono = this.payloadMono.flatMap(rsocket::requestResponse);
Mono<Payload> payloadMono = getPayloadMonoRequired().flatMap(rsocket::requestResponse);

if (isVoid(elementType)) {
return (Mono<T>) payloadMono.then();
Expand Down
Expand Up @@ -33,6 +33,7 @@
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono;

import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
Expand All @@ -57,6 +58,8 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {

private static final Map<String, Object> HINTS = Collections.emptyMap();

private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];


@Nullable
private MimeType dataMimeType;
Expand Down Expand Up @@ -175,50 +178,14 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
factory.dataMimeType(dataMimeType.toString());
factory.metadataMimeType(metaMimeType.toString());

Payload setupPayload = getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies);
if (setupPayload != null) {
factory.setupPayload(setupPayload);
}

return factory.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies));
}

@Nullable
private Payload getSetupPayload(MimeType dataMimeType, MimeType metaMimeType, RSocketStrategies strategies) {
DataBuffer metadata = null;
if (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata)) {
metadata = new MetadataEncoder(metaMimeType, strategies)
.metadataAndOrRoute(this.setupMetadata, this.setupRoute, this.setupRouteVars)
.encode();
}
DataBuffer data = null;
if (this.setupData != null) {
try {
ResolvableType type = ResolvableType.forClass(this.setupData.getClass());
Encoder<Object> encoder = strategies.encoder(type, dataMimeType);
Assert.notNull(encoder, () -> "No encoder for " + dataMimeType + ", " + type);
data = encoder.encodeValue(this.setupData, strategies.dataBufferFactory(), type, dataMimeType, HINTS);
}
catch (Throwable ex) {
if (metadata != null) {
DataBufferUtils.release(metadata);
}
throw ex;
}
}
if (metadata == null && data == null) {
return null;
}
metadata = metadata != null ? metadata : emptyBuffer(strategies);
data = data != null ? data : emptyBuffer(strategies);
return PayloadUtils.createPayload(data, metadata);
}

private DataBuffer emptyBuffer(RSocketStrategies strategies) {
return strategies.dataBufferFactory().wrap(new byte[0]);
return getSetupPayload(dataMimeType, metaMimeType, rsocketStrategies)
.doOnNext(factory::setupPayload)
.then(Mono.defer(() ->
factory.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, metaMimeType, rsocketStrategies))
));
}

private RSocketStrategies getRSocketStrategies() {
Expand Down Expand Up @@ -261,4 +228,45 @@ private static MimeType getMimeType(Decoder<?> decoder) {
return mimeType.getParameters().isEmpty() ? mimeType : new MimeType(mimeType, Collections.emptyMap());
}

private Mono<Payload> getSetupPayload(
MimeType dataMimeType, MimeType metaMimeType, RSocketStrategies strategies) {

Object data = this.setupData;
boolean hasMetadata = (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata));
if (!hasMetadata && data == null) {
return Mono.empty();
}

Mono<DataBuffer> dataMono = Mono.empty();
if (data != null) {
ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(data.getClass());
Assert.isTrue(adapter == null || !adapter.isMultiValue(), "Expected single value: " + data);
Mono<?> mono = (adapter != null ? Mono.from(adapter.toPublisher(data)) : Mono.just(data));
dataMono = mono.map(value -> {
ResolvableType type = ResolvableType.forClass(value.getClass());
Encoder<Object> encoder = strategies.encoder(type, dataMimeType);
Assert.notNull(encoder, () -> "No encoder for " + dataMimeType + ", " + type);
return encoder.encodeValue(value, strategies.dataBufferFactory(), type, dataMimeType, HINTS);
});
}

Mono<DataBuffer> metaMono = Mono.empty();
if (hasMetadata) {
metaMono = new MetadataEncoder(metaMimeType, strategies)
.metadataAndOrRoute(this.setupMetadata, this.setupRoute, this.setupRouteVars)
.encode();
}

Mono<DataBuffer> emptyBuffer = Mono.fromCallable(() ->
strategies.dataBufferFactory().wrap(EMPTY_BYTE_ARRAY));

dataMono = dataMono.switchIfEmpty(emptyBuffer);
metaMono = metaMono.switchIfEmpty(emptyBuffer);

return Mono.zip(dataMono, metaMono)
.map(tuple -> PayloadUtils.createPayload(tuple.getT1(), tuple.getT2()))
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
.doOnDiscard(Payload.class, Payload::release);
}

}

0 comments on commit 996f729

Please sign in to comment.