Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support retry cancel feature fully for AsyncFeign #1801

Merged
merged 3 commits into from Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 36 additions & 14 deletions core/src/main/java/feign/AsynchronousMethodHandler.java
Expand Up @@ -88,18 +88,18 @@ public Object invoke(Object[] argv) throws Throwable {
private CompletableFuture<Object> executeAndDecode(RequestTemplate template,
Options options,
Retryer retryer) {
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
CancellableFuture<Object> resultFuture = new CancellableFuture<>();

executeAndDecode(template, options)
.whenComplete((response, throwable) -> {
if (throwable != null) {
if (shouldRetry(retryer, throwable, resultFuture)) {
if (!resultFuture.isDone() && shouldRetry(retryer, throwable, resultFuture)) {
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}

executeAndDecode(template, options, retryer)
.whenComplete(pipeTo(resultFuture));
resultFuture.setInner(
executeAndDecode(template, options, retryer));
}
} else {
resultFuture.complete(response);
Expand All @@ -109,6 +109,38 @@ private CompletableFuture<Object> executeAndDecode(RequestTemplate template,
return resultFuture;
}

private static class CancellableFuture<T> extends CompletableFuture<T> {
private CompletableFuture<T> inner = null;

public void setInner(CompletableFuture<T> value) {
inner = value;
inner.whenComplete(pipeTo(this));
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
final boolean result = super.cancel(mayInterruptIfRunning);
if (inner != null) {
inner.cancel(mayInterruptIfRunning);
}
return result;
}

private static <T> BiConsumer<? super T, ? super Throwable> pipeTo(CompletableFuture<T> completableFuture) {
return (value, throwable) -> {
if (completableFuture.isDone()) {
return;
}

if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
completableFuture.complete(value);
}
};
}
}

private boolean shouldRetry(Retryer retryer,
Throwable throwable,
CompletableFuture<Object> resultFuture) {
Expand Down Expand Up @@ -136,16 +168,6 @@ private boolean shouldRetry(Retryer retryer,
}
}

private static <T> BiConsumer<? super T, ? super Throwable> pipeTo(CompletableFuture<T> completableFuture) {
return (value, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
completableFuture.complete(value);
}
};
}

private CompletableFuture<Object> executeAndDecode(RequestTemplate template, Options options) {
Request request = targetRequest(template);

Expand Down
52 changes: 52 additions & 0 deletions core/src/test/java/feign/AsyncFeignTest.java
Expand Up @@ -48,16 +48,21 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okio.Buffer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.rules.ExpectedException;

public class AsyncFeignTest {
Expand Down Expand Up @@ -656,6 +661,53 @@ public void whenReturnTypeIsResponseNoErrorHandling() throws Throwable {
execs.shutdown();
}

@ParameterizedTest
@ValueSource(ints = {1, 5, 10, 100, 1000})
public void cancelRetry(final int expectedTryCount) throws Throwable {
// Arrange
final CompletableFuture<Boolean> maximumTryCompleted = new CompletableFuture<>();
final AtomicInteger actualTryCount = new AtomicInteger();
final AtomicBoolean isCancelled = new AtomicBoolean(true);

final int RUNNING_TIME_MILLIS = 100;
final ExecutorService execs = Executors.newSingleThreadExecutor();
final AsyncClient<Void> clientMock =
(request, options, requestContext) -> CompletableFuture.supplyAsync(() -> {
final int tryCount = actualTryCount.addAndGet(1);
if (tryCount < expectedTryCount) {
throw new CompletionException(new IOException());
}

if (tryCount > expectedTryCount) {
isCancelled.set(false);
throw new CompletionException(new IOException());
}

maximumTryCompleted.complete(true);
try {
Thread.sleep(RUNNING_TIME_MILLIS);
throw new IOException();
} catch (Throwable e) {
throw new CompletionException(e);
}
}, execs);
final TestInterfaceAsync sut = AsyncFeign.<Void>builder()
.client(clientMock)
.retryer(new Retryer.Default(0, Long.MAX_VALUE, expectedTryCount * 2))
.target(TestInterfaceAsync.class, "http://localhost:" + server.getPort());

// Act
final CompletableFuture<String> actual = sut.post();
maximumTryCompleted.join();
actual.cancel(true);
Thread.sleep(RUNNING_TIME_MILLIS * 5);

// Assert
assertThat(actualTryCount.get()).isEqualTo(expectedTryCount);
assertThat(isCancelled.get()).isTrue();
execs.shutdown();
}

private static class MockRetryer implements Retryer {

boolean tripped;
Expand Down
13 changes: 13 additions & 0 deletions pom.xml
Expand Up @@ -86,6 +86,7 @@
<json.version>20220924</json.version>

<junit.version>4.13.2</junit.version>
<junit5.version>5.9.1</junit5.version>
<jackson.version>2.14.0-rc3</jackson.version>
<jackson-databind.version>2.14.0-rc3</jackson-databind.version>
<assertj.version>3.23.1</assertj.version>
Expand Down Expand Up @@ -297,6 +298,12 @@
<version>${junit.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit5.version}</version>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
Expand Down Expand Up @@ -399,6 +406,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down