diff --git a/core/src/main/java/feign/AsynchronousMethodHandler.java b/core/src/main/java/feign/AsynchronousMethodHandler.java index 1c91a16b4..82466fc15 100644 --- a/core/src/main/java/feign/AsynchronousMethodHandler.java +++ b/core/src/main/java/feign/AsynchronousMethodHandler.java @@ -88,18 +88,18 @@ public Object invoke(Object[] argv) throws Throwable { private CompletableFuture executeAndDecode(RequestTemplate template, Options options, Retryer retryer) { - CompletableFuture resultFuture = new CompletableFuture<>(); + CancellableFuture resultFuture = new CancellableFuture<>(); executeAndDecode(template, options) .whenComplete((response, throwable) -> { if (throwable != null) { - if (shouldRetry(retryer, throwable, resultFuture)) { + if (!resultFuture.isDone() && shouldRetry(retryer, throwable, resultFuture)) { if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } - executeAndDecode(template, options, retryer) - .whenComplete(pipeTo(resultFuture)); + resultFuture.setInner( + executeAndDecode(template, options, retryer)); } } else { resultFuture.complete(response); @@ -109,6 +109,38 @@ private CompletableFuture executeAndDecode(RequestTemplate template, return resultFuture; } + private static class CancellableFuture extends CompletableFuture { + private CompletableFuture inner = null; + + public void setInner(CompletableFuture value) { + inner = value; + inner.whenComplete(pipeTo(this)); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + final boolean result = super.cancel(mayInterruptIfRunning); + if (inner != null) { + inner.cancel(mayInterruptIfRunning); + } + return result; + } + + private static BiConsumer pipeTo(CompletableFuture completableFuture) { + return (value, throwable) -> { + if (completableFuture.isDone()) { + return; + } + + if (throwable != null) { + completableFuture.completeExceptionally(throwable); + } else { + completableFuture.complete(value); + } + }; + } + } + private boolean shouldRetry(Retryer retryer, Throwable throwable, CompletableFuture resultFuture) { @@ -136,16 +168,6 @@ private boolean shouldRetry(Retryer retryer, } } - private static BiConsumer pipeTo(CompletableFuture completableFuture) { - return (value, throwable) -> { - if (throwable != null) { - completableFuture.completeExceptionally(throwable); - } else { - completableFuture.complete(value); - } - }; - } - private CompletableFuture executeAndDecode(RequestTemplate template, Options options) { Request request = targetRequest(template); diff --git a/core/src/test/java/feign/AsyncFeignTest.java b/core/src/test/java/feign/AsyncFeignTest.java index 6ccfc714d..bd29ed906 100644 --- a/core/src/test/java/feign/AsyncFeignTest.java +++ b/core/src/test/java/feign/AsyncFeignTest.java @@ -48,16 +48,21 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okio.Buffer; import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.junit.rules.ExpectedException; public class AsyncFeignTest { @@ -656,6 +661,53 @@ public void whenReturnTypeIsResponseNoErrorHandling() throws Throwable { execs.shutdown(); } + @ParameterizedTest + @ValueSource(ints = {1, 5, 10, 100, 1000}) + public void cancelRetry(final int expectedTryCount) throws Throwable { + // Arrange + final CompletableFuture maximumTryCompleted = new CompletableFuture<>(); + final AtomicInteger actualTryCount = new AtomicInteger(); + final AtomicBoolean isCancelled = new AtomicBoolean(true); + + final int RUNNING_TIME_MILLIS = 100; + final ExecutorService execs = Executors.newSingleThreadExecutor(); + final AsyncClient clientMock = + (request, options, requestContext) -> CompletableFuture.supplyAsync(() -> { + final int tryCount = actualTryCount.addAndGet(1); + if (tryCount < expectedTryCount) { + throw new CompletionException(new IOException()); + } + + if (tryCount > expectedTryCount) { + isCancelled.set(false); + throw new CompletionException(new IOException()); + } + + maximumTryCompleted.complete(true); + try { + Thread.sleep(RUNNING_TIME_MILLIS); + throw new IOException(); + } catch (Throwable e) { + throw new CompletionException(e); + } + }, execs); + final TestInterfaceAsync sut = AsyncFeign.builder() + .client(clientMock) + .retryer(new Retryer.Default(0, Long.MAX_VALUE, expectedTryCount * 2)) + .target(TestInterfaceAsync.class, "http://localhost:" + server.getPort()); + + // Act + final CompletableFuture actual = sut.post(); + maximumTryCompleted.join(); + actual.cancel(true); + Thread.sleep(RUNNING_TIME_MILLIS * 5); + + // Assert + assertThat(actualTryCount.get()).isEqualTo(expectedTryCount); + assertThat(isCancelled.get()).isTrue(); + execs.shutdown(); + } + private static class MockRetryer implements Retryer { boolean tripped; diff --git a/pom.xml b/pom.xml index 351abb116..6b951ee04 100644 --- a/pom.xml +++ b/pom.xml @@ -86,6 +86,7 @@ 20220924 4.13.2 + 5.9.1 2.14.0-rc3 2.14.0-rc3 3.23.1 @@ -297,6 +298,12 @@ ${junit.version} + + org.junit.jupiter + junit-jupiter-params + ${junit5.version} + + org.hamcrest hamcrest @@ -399,6 +406,12 @@ test + + org.junit.jupiter + junit-jupiter-params + test + + org.assertj assertj-core