Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add .singleOptional() to Mono and Flux #3317

Merged
merged 6 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 32 additions & 1 deletion reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -4245,6 +4245,37 @@ public final Mono<T> single() {
return Mono.onAssembly(new MonoSingleMono<>(this));
}

/**
* Wrap the item produced by this {@link Mono} source into an Optional
* or emit an empty Optional for an empty source.
* <p>
* <img class="marble" src="doc-files/marbles/singleOptional.svg" alt="">
* <p>
*
* @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal
*/
public final Mono<Optional<T>> singleOptional() {
if (this instanceof Callable) {
if (this instanceof Fuseable.ScalarCallable) {
@SuppressWarnings("unchecked")
Fuseable.ScalarCallable<T> scalarCallable = (Fuseable.ScalarCallable<T>) this;

T v;
try {
v = scalarCallable.call();
}
catch (Exception e) {
return Mono.error(Exceptions.unwrap(e));
}
return Mono.just(Optional.ofNullable(v));
}
@SuppressWarnings("unchecked")
Callable<T> thiz = (Callable<T>)this;
return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz));
}
return Mono.onAssembly(new MonoSingleOptional<>(this));
}

/**
* Subscribe to this {@link Mono} and request unbounded demand.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.Optional;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* Emits a single item from the source wrapped into an Optional, emits
chemicL marked this conversation as resolved.
Show resolved Hide resolved
* an empty Optional instead for empty source.
*
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoSingleOptional<T> extends InternalMonoOperator<T, Optional<T>> {

MonoSingleOptional(Mono<? extends T> source) {
super(source);
}

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super Optional<T>> actual) {
return new MonoSingleOptional.SingleOptionalSubscriber<>(actual);
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}

static final class SingleOptionalSubscriber<T> extends Operators.MonoInnerProducerBase<Optional<T>> implements InnerConsumer<T> {

Subscription s;

boolean done;

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return super.scanUnsafe(key);
}

@Override
public Context currentContext() {
return actual().currentContext();
}

SingleOptionalSubscriber(CoreSubscriber<? super Optional<T>> actual) {
super(actual);
}

@Override
public void doOnRequest(long n) {
s.request(Long.MAX_VALUE);
}

@Override
public void doOnCancel() {
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
actual().onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual().currentContext());
return;
}
done = true;
complete(Optional.of(t));
chemicL marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, actual().currentContext());
return;
}
done = true;
discardTheValue();

actual().onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
complete(Optional.empty());
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;

/**
* Expects and emits a single item from the source Callable and warps it into an Optional,
* emits an empty Optional for empty source.
*
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoSingleOptionalCallable<T> extends Mono<Optional<T>>
chemicL marked this conversation as resolved.
Show resolved Hide resolved
implements Callable<Optional<T>>, SourceProducer<Optional<T>> {

final Callable<? extends T> callable;

MonoSingleOptionalCallable(Callable<? extends T> source) {
this.callable = Objects.requireNonNull(source, "source");
}

@Override
public void subscribe(CoreSubscriber<? super Optional<T>> actual) {
Operators.MonoInnerProducerBase<Optional<T>>
sds = new Operators.MonoInnerProducerBase<>(actual);

actual.onSubscribe(sds);

if (sds.isCancelled()) {
return;
}

try {
T t = callable.call();
sds.complete(Optional.ofNullable(t));
}
catch (Throwable e) {
actual.onError(Operators.onOperatorError(e, actual.currentContext()));
}

}

@Override
public Optional<T> block() {
//duration is ignored below
return block(Duration.ZERO);
}

@Override
public Optional<T> block(Duration m) {
final T v;

try {
v = callable.call();
}
catch (Throwable e) {
throw Exceptions.propagate(e);
}

return Optional.ofNullable(v);
}

@Override
public Optional<T> call() throws Exception {
final T v = callable.call();

return Optional.ofNullable(v);
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
}
}