diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/ExponentialBackoff.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/ExponentialBackoff.java index 275d2167f..bcef2941f 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/ExponentialBackoff.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/ExponentialBackoff.java @@ -39,7 +39,7 @@ public static Function, Publisher> randomExponentialBacko AtomicInteger index = new AtomicInteger(); return t -> t .onItem().transformToUni(failure -> { - int iteration = index.incrementAndGet(); + int iteration = index.getAndIncrement(); if (iteration >= numRetries) { return Uni.createFrom().failure( new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries, diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java index 950702cc0..0f8b72d64 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java @@ -11,7 +11,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.smallrye.mutiny.Multi; @@ -87,7 +86,6 @@ public void testWithOtherStreamFailing() { } @Test - @Disabled("To be investigated - the switch to the strict serializer broken the cancellation check") public void testWhatTheWhenStreamFailsTheUpstreamIsCancelled() { AtomicBoolean subscribed = new AtomicBoolean(); AtomicBoolean cancelled = new AtomicBoolean(); @@ -318,9 +316,9 @@ public void testRetryWithRandomBackoff() { .onFailure().retry().withBackOff(Duration.ofMillis(10), Duration.ofHours(1)).withJitter(0.1) .atMost(4) .subscribe().withSubscriber(AssertSubscriber.create(100)); - await().until(() -> subscriber.getItems().size() >= 8); + await().until(() -> subscriber.getItems().size() >= 10); subscriber - .assertItems(0, 1, 0, 1, 0, 1, 0, 1) + .assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries .assertFailedWith(IllegalStateException.class, "Retries exhausted"); } @@ -335,9 +333,9 @@ public void testRetryWithRandomBackoffAndDefaultJitter() { .atMost(4) .subscribe().withSubscriber(AssertSubscriber.create(100)); - await().until(() -> subscriber.getItems().size() >= 8); + await().until(() -> subscriber.getItems().size() >= 10); subscriber - .assertItems(0, 1, 0, 1, 0, 1, 0, 1) + .assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries .assertFailedWith(IllegalStateException.class, "Retries exhausted: 4/4"); } @@ -352,9 +350,9 @@ public void testRetryWithDefaultMax() { await() .atMost(1, TimeUnit.MINUTES) - .until(() -> subscriber.getItems().size() >= 8); + .until(() -> subscriber.getItems().size() >= 10); subscriber - .assertItems(0, 1, 0, 1, 0, 1, 0, 1) + .assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries .assertFailedWith(IllegalStateException.class, "Retries exhausted: 4/4"); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryTest.java index bd40a8f00..488393322 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnFailureRetryTest.java @@ -1,5 +1,6 @@ package io.smallrye.mutiny.operators; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.concurrent.atomic.AtomicInteger; @@ -84,4 +85,19 @@ public void testWithMapperFailure() { .subscribe().withSubscriber(UniAssertSubscriber.create()) .assertItem(1); } + + @Test + public void testThatNumberOfRetryIfCorrect() { + AtomicInteger calls = new AtomicInteger(); + + UniAssertSubscriber subscriber = Uni.createFrom(). item(() -> { + calls.incrementAndGet(); + throw new RuntimeException("boom"); + }) + .onFailure().retry().atMost(3) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + subscriber.awaitFailure(); + assertThat(calls).hasValue(4); // initial subscription + 3 retries + } }