Skip to content

Commit

Permalink
Updates after review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Huber committed Jan 10, 2023
1 parent 0c78545 commit e53048c
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 138 deletions.
8 changes: 3 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,8 @@
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.core.publisher.FluxOnAssembly.AssemblySnapshot;
import reactor.core.publisher.FluxOnAssembly.CheckpointHeavySnapshot;
import reactor.core.publisher.FluxOnAssembly.CheckpointLightSnapshot;
Expand All @@ -73,8 +75,6 @@
import reactor.util.function.Tuple7;
import reactor.util.function.Tuple8;
import reactor.util.function.Tuples;
import reactor.core.observability.SignalListener;
import reactor.core.observability.SignalListenerFactory;
import reactor.util.retry.Retry;

/**
Expand Down Expand Up @@ -4249,8 +4249,6 @@ public final Mono<T> single() {
* Wrap the item produced by this {@link Mono} source into an Optional
* or emit an empty Optional for an empty source.
* <p>
* <img class="marble" src="doc-files/marbles/singleForMono.svg" alt="">
* <p>
*
* @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,14 +32,6 @@
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
/**
* Wraps the item from the source into an Optional, emits
* an empty Optional instead for empty source or signals
* IndexOutOfBoundsException for a multi-item source.
*
* @param <T> the value type
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class MonoSingleOptional<T> extends InternalMonoOperator<T, Optional<T>> {

MonoSingleOptional(Mono<? extends T> source) {
Expand Down Expand Up @@ -145,7 +137,6 @@ public void onComplete() {

int c = count;
if (c == 0) {

complete(Optional.empty());
}
else if (c == 1) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
8/*
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,12 @@ public void callableValued() {
.expectNext("foo")
.verifyComplete();
}

@Test
public void callableError() {
StepVerifier.create(Mono.error(new IllegalStateException("failed")).single())
.expectErrorMessage("failed");
}

@Test
public void normalEmpty() {
Expand All @@ -59,7 +65,12 @@ public void normalValued() {
.expectNext("foo")
.verifyComplete();
}


@Test
public void normalError() {
StepVerifier.create(Mono.error(new IllegalStateException("failed")).hide().single())
.expectErrorMessage("failed");
}
// see https://github.com/reactor/reactor-core/issues/2663
@Test
void fusionMonoSingleMonoDoesntTriggerFusion() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;

import org.junit.jupiter.api.Test;

import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.test.StepVerifier;

class MonoSingleOptionalCallableTest {

@Test
void testCallableFusedEmptySource() {
Mono<Optional<Integer>> mono = Mono
.<Integer>fromSupplier(() -> null)
.singleOptional();

StepVerifier.create(mono)
.expectNext(Optional.empty())
.verifyComplete();
}

@Test
void testCallableFusedSingleEmptySourceOnBlock() {
Mono<Optional<Integer>> mono = Mono
.<Integer>fromSupplier(() -> null)
.singleOptional();

assertEquals(Optional.empty(), mono.block());
}

@Test
void testCallableFusedSingleEmptySourceOnCall() throws Exception {
Mono<Optional<Integer>> mono = Mono
.<Integer>fromSupplier(() -> null)
.singleOptional();

assertThat(mono).isInstanceOf(MonoSingleOptionalCallable.class);

assertEquals(Optional.empty(), ((Callable<?>) mono).call());
}

@Test
void sourceNull() {
assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> {
new MonoSingleOptionalCallable<>(null);
});
}

@Test
void normal() {
StepVerifier.create(new MonoSingleOptionalCallable<>(() -> 1))
.expectNext(Optional.of(1))
.verifyComplete();
}

@Test
void normalBackpressured() {
StepVerifier.create(new MonoSingleOptionalCallable<>(() -> 1), 0)
.expectSubscription()
.expectNoEvent(Duration.ofMillis(50))
.thenRequest(1)
.expectNext(Optional.of(1))
.verifyComplete();
}

//scalarCallable empty/error/just are not instantiating MonoSingleOptionalCallable and are covered in MonoSingleTest
//we still cover the case where a callable source throws

@Test
void failingCallable() {
StepVerifier.create(new MonoSingleOptionalCallable<>(() -> { throw new IllegalStateException("test"); } ))
.verifyErrorMessage("test");
}

@Test
void emptyCallable() {
StepVerifier.create(new MonoSingleOptionalCallable<>(() -> null))
.expectNext(Optional.empty())
.verifyComplete();
}

@Test
void valuedCallable() {
@SuppressWarnings("unchecked")
Callable<Integer> fluxCallable = (Callable<Integer>) Mono.fromCallable(() -> 1).flux();


StepVerifier.create(new MonoSingleOptionalCallable<>(fluxCallable))
.expectNext(Optional.of(1))
.verifyComplete();
}

@Test
void fusionMonoSingleOptionalCallableDoesntTriggerFusion() {
Mono<Optional<Integer>> fusedCase = Mono
.fromCallable(() -> 1)
.singleOptional();

assertThat(fusedCase)
.as("fusedCase assembly check")
.isInstanceOf(MonoSingleOptionalCallable.class)
.isNotInstanceOf(Fuseable.class);

assertThatCode(() -> fusedCase.filter(v -> true).block())
.as("fusedCase fused")
.doesNotThrowAnyException();
}

@Test
void scanOperator(){
MonoSingleOptionalCallable<String> test = new MonoSingleOptionalCallable<>(() -> "foo");

assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC);
}

}

0 comments on commit e53048c

Please sign in to comment.