AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .doOnError(e -> errorCount.incrementAndGet())
        .retryWhen(Retry.fromFunction(companion -> // (1)
            companion.map(rs -> { // (2)
                if (rs.totalRetries() < 3) return rs.totalRetries(); // (3)
                else throw Exceptions.propagate(rs.failure()); // (4)
            })
        ));

(1) We customize Retry by adapting from a Function lambda rather than providing a concrete class.
(2) The companion emits RetrySignal objects, which carry the number of retries so far and the last failure.
(3) To allow for three retries, we consider indexes < 3 and return a value to emit (here, we simply return the index).
(4) To terminate the sequence in error, we throw the original exception after these three retries.
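
To make the counting concrete, the control flow of the snippet above can be imitated in plain Java. This is only a sketch under stated assumptions: it does not use Reactor, and `RetryCompanionSketch`, `Signal`, `run`, and `runWithThreeRetries` are hypothetical names introduced for illustration. It suggests that allowing three retries means the failing source is attempted four times in total (one initial attempt plus three retries), so a counter like `errorCount` would end at 4.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class RetryCompanionSketch {

    /** Hypothetical stand-in for a retry signal: retries so far plus the last failure. */
    static final class Signal {
        final long totalRetries;
        final Throwable failure;

        Signal(long totalRetries, Throwable failure) {
            this.totalRetries = totalRetries;
            this.failure = failure;
        }
    }

    /**
     * Repeatedly runs a source that always fails. After each failure the
     * companion is consulted: returning a value means "retry", throwing
     * means "terminate in error". Returns how many times the source failed.
     */
    static int run(Function<Signal, Long> companion) {
        AtomicInteger errorCount = new AtomicInteger();
        long retries = 0;
        while (true) {
            try {
                // The always-failing source.
                throw new IllegalArgumentException("boom");
            } catch (IllegalArgumentException e) {
                errorCount.incrementAndGet();
                try {
                    companion.apply(new Signal(retries, e)); // value returned: retry
                    retries++;
                } catch (RuntimeException terminal) {
                    return errorCount.get(); // companion threw: stop retrying
                }
            }
        }
    }

    /** Same decision rule as the snippet: retry while fewer than 3 retries have happened. */
    static int runWithThreeRetries() {
        return run(rs -> {
            if (rs.totalRetries < 3) return rs.totalRetries;
            throw new IllegalStateException("retries exhausted", rs.failure);
        });
    }

    public static void main(String[] args) {
        System.out.println(runWithThreeRetries()); // prints 4
    }
}
```

The companion is consulted with retry counts 0, 1, and 2 (each of which returns a value and triggers a retry) and then with 3, which throws and ends the loop.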