Skip to content

Commit

Permalink
Merge pull request #678 from cescoffier/fix-675
Browse files Browse the repository at this point in the history
Fix off-by-one retry count when using exponential backoff
  • Loading branch information
jponge committed Sep 9, 2021
2 parents 36c5c3b + 4448517 commit c111fa5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
Expand Up @@ -39,7 +39,7 @@ public static Function<Multi<Throwable>, Publisher<Long>> randomExponentialBacko
AtomicInteger index = new AtomicInteger();
return t -> t
.onItem().transformToUni(failure -> {
int iteration = index.incrementAndGet();
int iteration = index.getAndIncrement();
if (iteration >= numRetries) {
return Uni.createFrom().failure(
new IllegalStateException("Retries exhausted: " + iteration + "/" + numRetries,
Expand Down
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -87,7 +86,6 @@ public void testWithOtherStreamFailing() {
}

@Test
@Disabled("To be investigated - the switch to the strict serializer broken the cancellation check")
public void testWhatTheWhenStreamFailsTheUpstreamIsCancelled() {
AtomicBoolean subscribed = new AtomicBoolean();
AtomicBoolean cancelled = new AtomicBoolean();
Expand Down Expand Up @@ -318,9 +316,9 @@ public void testRetryWithRandomBackoff() {
.onFailure().retry().withBackOff(Duration.ofMillis(10), Duration.ofHours(1)).withJitter(0.1)
.atMost(4)
.subscribe().withSubscriber(AssertSubscriber.create(100));
await().until(() -> subscriber.getItems().size() >= 8);
await().until(() -> subscriber.getItems().size() >= 10);
subscriber
.assertItems(0, 1, 0, 1, 0, 1, 0, 1)
.assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries
.assertFailedWith(IllegalStateException.class, "Retries exhausted");

}
Expand All @@ -335,9 +333,9 @@ public void testRetryWithRandomBackoffAndDefaultJitter() {
.atMost(4)
.subscribe().withSubscriber(AssertSubscriber.create(100));

await().until(() -> subscriber.getItems().size() >= 8);
await().until(() -> subscriber.getItems().size() >= 10);
subscriber
.assertItems(0, 1, 0, 1, 0, 1, 0, 1)
.assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries
.assertFailedWith(IllegalStateException.class, "Retries exhausted: 4/4");
}

Expand All @@ -352,9 +350,9 @@ public void testRetryWithDefaultMax() {

await()
.atMost(1, TimeUnit.MINUTES)
.until(() -> subscriber.getItems().size() >= 8);
.until(() -> subscriber.getItems().size() >= 10);
subscriber
.assertItems(0, 1, 0, 1, 0, 1, 0, 1)
.assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries
.assertFailedWith(IllegalStateException.class, "Retries exhausted: 4/4");
}

Expand Down
@@ -1,5 +1,6 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -84,4 +85,19 @@ public void testWithMapperFailure() {
.subscribe().withSubscriber(UniAssertSubscriber.create())
.assertItem(1);
}

@Test
public void testThatNumberOfRetryIfCorrect() {
AtomicInteger calls = new AtomicInteger();

UniAssertSubscriber<Integer> subscriber = Uni.createFrom().<Integer> item(() -> {
calls.incrementAndGet();
throw new RuntimeException("boom");
})
.onFailure().retry().atMost(3)
.subscribe().withSubscriber(UniAssertSubscriber.create());

subscriber.awaitFailure();
assertThat(calls).hasValue(4); // initial subscription + 3 retries
}
}

0 comments on commit c111fa5

Please sign in to comment.