Skip to content

Commit

Permalink
Support retry cancel feature fully for AsyncFeign (#1801)
Browse files Browse the repository at this point in the history
* Support retry cancel feature for AsyncFeign

* Support retry cancel feature fully for AsyncFeign

Co-authored-by: Marvin Froeder <velo@users.noreply.github.com>
  • Loading branch information
wplong11 and velo committed Nov 2, 2022
1 parent 950935d commit d09b88e
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 14 deletions.
50 changes: 36 additions & 14 deletions core/src/main/java/feign/AsynchronousMethodHandler.java
Expand Up @@ -88,18 +88,18 @@ public Object invoke(Object[] argv) throws Throwable {
private CompletableFuture<Object> executeAndDecode(RequestTemplate template,
Options options,
Retryer retryer) {
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
CancellableFuture<Object> resultFuture = new CancellableFuture<>();

executeAndDecode(template, options)
.whenComplete((response, throwable) -> {
if (throwable != null) {
if (shouldRetry(retryer, throwable, resultFuture)) {
if (!resultFuture.isDone() && shouldRetry(retryer, throwable, resultFuture)) {
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}

executeAndDecode(template, options, retryer)
.whenComplete(pipeTo(resultFuture));
resultFuture.setInner(
executeAndDecode(template, options, retryer));
}
} else {
resultFuture.complete(response);
Expand All @@ -109,6 +109,38 @@ private CompletableFuture<Object> executeAndDecode(RequestTemplate template,
return resultFuture;
}

private static class CancellableFuture<T> extends CompletableFuture<T> {
private CompletableFuture<T> inner = null;

public void setInner(CompletableFuture<T> value) {
inner = value;
inner.whenComplete(pipeTo(this));
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
final boolean result = super.cancel(mayInterruptIfRunning);
if (inner != null) {
inner.cancel(mayInterruptIfRunning);
}
return result;
}

private static <T> BiConsumer<? super T, ? super Throwable> pipeTo(CompletableFuture<T> completableFuture) {
return (value, throwable) -> {
if (completableFuture.isDone()) {
return;
}

if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
completableFuture.complete(value);
}
};
}
}

private boolean shouldRetry(Retryer retryer,
Throwable throwable,
CompletableFuture<Object> resultFuture) {
Expand Down Expand Up @@ -136,16 +168,6 @@ private boolean shouldRetry(Retryer retryer,
}
}

private static <T> BiConsumer<? super T, ? super Throwable> pipeTo(CompletableFuture<T> completableFuture) {
return (value, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
completableFuture.complete(value);
}
};
}

private CompletableFuture<Object> executeAndDecode(RequestTemplate template, Options options) {
Request request = targetRequest(template);

Expand Down
52 changes: 52 additions & 0 deletions core/src/test/java/feign/AsyncFeignTest.java
Expand Up @@ -48,16 +48,21 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okio.Buffer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.rules.ExpectedException;

public class AsyncFeignTest {
Expand Down Expand Up @@ -656,6 +661,53 @@ public void whenReturnTypeIsResponseNoErrorHandling() throws Throwable {
execs.shutdown();
}

@ParameterizedTest
@ValueSource(ints = {1, 5, 10, 100, 1000})
public void cancelRetry(final int expectedTryCount) throws Throwable {
// Arrange
final CompletableFuture<Boolean> maximumTryCompleted = new CompletableFuture<>();
final AtomicInteger actualTryCount = new AtomicInteger();
final AtomicBoolean isCancelled = new AtomicBoolean(true);

final int RUNNING_TIME_MILLIS = 100;
final ExecutorService execs = Executors.newSingleThreadExecutor();
final AsyncClient<Void> clientMock =
(request, options, requestContext) -> CompletableFuture.supplyAsync(() -> {
final int tryCount = actualTryCount.addAndGet(1);
if (tryCount < expectedTryCount) {
throw new CompletionException(new IOException());
}

if (tryCount > expectedTryCount) {
isCancelled.set(false);
throw new CompletionException(new IOException());
}

maximumTryCompleted.complete(true);
try {
Thread.sleep(RUNNING_TIME_MILLIS);
throw new IOException();
} catch (Throwable e) {
throw new CompletionException(e);
}
}, execs);
final TestInterfaceAsync sut = AsyncFeign.<Void>builder()
.client(clientMock)
.retryer(new Retryer.Default(0, Long.MAX_VALUE, expectedTryCount * 2))
.target(TestInterfaceAsync.class, "http://localhost:" + server.getPort());

// Act
final CompletableFuture<String> actual = sut.post();
maximumTryCompleted.join();
actual.cancel(true);
Thread.sleep(RUNNING_TIME_MILLIS * 5);

// Assert
assertThat(actualTryCount.get()).isEqualTo(expectedTryCount);
assertThat(isCancelled.get()).isTrue();
execs.shutdown();
}

private static class MockRetryer implements Retryer {

boolean tripped;
Expand Down
13 changes: 13 additions & 0 deletions pom.xml
Expand Up @@ -86,6 +86,7 @@
<json.version>20220924</json.version>

<junit.version>4.13.2</junit.version>
<junit5.version>5.9.1</junit5.version>
<jackson.version>2.14.0-rc3</jackson.version>
<jackson-databind.version>2.14.0-rc3</jackson-databind.version>
<assertj.version>3.23.1</assertj.version>
Expand Down Expand Up @@ -297,6 +298,12 @@
<version>${junit.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit5.version}</version>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
Expand Down Expand Up @@ -399,6 +406,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down

0 comments on commit d09b88e

Please sign in to comment.