Skip to content

Commit

Permalink
introducing Async.ofExpectedSize (#2839)
Browse files Browse the repository at this point in the history
A typed interface to build CompletableFuture.allOf without intermediate
List allocations.
  • Loading branch information
dfa1 committed Jun 8, 2022
1 parent 3379470 commit 9da87e2
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 17 deletions.
126 changes: 116 additions & 10 deletions src/main/java/graphql/execution/Async.java
Expand Up @@ -6,6 +6,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -19,29 +20,134 @@
@SuppressWarnings("FutureReturnValueIgnored")
public class Async {

@FunctionalInterface
public interface CFFactory<T, U> {
CompletableFuture<U> apply(T input, int index, List<U> previousResults);
public interface CombinedBuilder<T> {

void add(CompletableFuture<T> completableFuture);

CompletableFuture<List<T>> await();
}

public static <U> CompletableFuture<List<U>> each(List<CompletableFuture<U>> futures) {
CompletableFuture<List<U>> overallResult = new CompletableFuture<>();
/**
* Combines 1 or more CF. It is a wrapper around CompletableFuture.allOf.
*/
public static <T> CombinedBuilder<T> ofExpectedSize(int expectedSize) {
if (expectedSize == 0) {
return new Empty<>();
} else if (expectedSize == 1) {
return new Single<>();
} else {
return new Many<>(expectedSize);
}
}

private static class Empty<T> implements CombinedBuilder<T> {

private int ix;

@Override
public void add(CompletableFuture<T> completableFuture) {
this.ix++;
}


@Override
public CompletableFuture<List<T>> await() {
Assert.assertTrue(ix == 0, () -> "expected size was " + 0 + " got " + ix);
return CompletableFuture.completedFuture(Collections.emptyList());
}
}

private static class Single<T> implements CombinedBuilder<T> {

// avoiding array allocation as there is only 1 CF
private CompletableFuture<T> completableFuture;
private int ix;

@Override
public void add(CompletableFuture<T> completableFuture) {
this.completableFuture = completableFuture;
this.ix++;
}

@Override
public CompletableFuture<List<T>> await() {
Assert.assertTrue(ix == 1, () -> "expected size was " + 1 + " got " + ix);

CompletableFuture<List<T>> overallResult = new CompletableFuture<>();
completableFuture
.whenComplete((ignored, exception) -> {
if (exception != null) {
overallResult.completeExceptionally(exception);
return;
}
List<T> results = Collections.singletonList(completableFuture.join());
overallResult.complete(results);
});
return overallResult;
}
}

private static class Many<T> implements CombinedBuilder<T> {

private final CompletableFuture<T>[] array;
private int ix;

@SuppressWarnings("unchecked")
CompletableFuture<U>[] arrayOfFutures = futures.toArray(new CompletableFuture[0]);
CompletableFuture
.allOf(arrayOfFutures)
private Many(int size) {
this.array = new CompletableFuture[size];
this.ix = 0;
}

@Override
public void add(CompletableFuture<T> completableFuture) {
array[ix++] = completableFuture;
}

@Override
public CompletableFuture<List<T>> await() {
Assert.assertTrue(ix == array.length, () -> "expected size was " + array.length + " got " + ix);

CompletableFuture<List<T>> overallResult = new CompletableFuture<>();
CompletableFuture.allOf(array)
.whenComplete((ignored, exception) -> {
if (exception != null) {
overallResult.completeExceptionally(exception);
return;
}
List<U> results = new ArrayList<>(arrayOfFutures.length);
for (CompletableFuture<U> future : arrayOfFutures) {
List<T> results = new ArrayList<>(array.length);
for (CompletableFuture<T> future : array) {
results.add(future.join());
}
overallResult.complete(results);
});
return overallResult;
}

}

@FunctionalInterface
public interface CFFactory<T, U> {
CompletableFuture<U> apply(T input, int index, List<U> previousResults);
}

public static <U> CompletableFuture<List<U>> each(List<CompletableFuture<U>> futures) {
CompletableFuture<List<U>> overallResult = new CompletableFuture<>();

@SuppressWarnings("unchecked")
CompletableFuture<U>[] arrayOfFutures = futures.toArray(new CompletableFuture[0]);
CompletableFuture
.allOf(arrayOfFutures)
.whenComplete((ignored, exception) -> {
if (exception != null) {
overallResult.completeExceptionally(exception);
return;
}
List<U> results = new ArrayList<>(arrayOfFutures.length);
for (CompletableFuture<U> future : arrayOfFutures) {
results.add(future.join());
}
overallResult.complete(results);
});
return overallResult;
}

Expand Down
12 changes: 7 additions & 5 deletions src/main/java/graphql/execution/AsyncExecutionStrategy.java
Expand Up @@ -12,7 +12,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import static graphql.collect.ImmutableKit.map;

/**
* The standard graphql execution strategy that runs fields asynchronously non-blocking.
Expand Down Expand Up @@ -47,7 +46,7 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont

MergedSelectionSet fields = parameters.getFields();
Set<String> fieldNames = fields.keySet();
List<CompletableFuture<FieldValueInfo>> futures = new ArrayList<>(fieldNames.size());
Async.CombinedBuilder<FieldValueInfo> futures = Async.ofExpectedSize(fields.size());
List<String> resolvedFields = new ArrayList<>(fieldNames.size());
for (String fieldName : fieldNames) {
MergedField currentField = fields.getSubField(fieldName);
Expand All @@ -63,15 +62,18 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
executionStrategyCtx.onDispatched(overallResult);

Async.each(futures).whenComplete((completeValueInfos, throwable) -> {
futures.await().whenComplete((completeValueInfos, throwable) -> {
BiConsumer<List<ExecutionResult>, Throwable> handleResultsConsumer = handleResults(executionContext, resolvedFields, overallResult);
if (throwable != null) {
handleResultsConsumer.accept(null, throwable.getCause());
return;
}
List<CompletableFuture<ExecutionResult>> executionResultFuture = map(completeValueInfos, FieldValueInfo::getFieldValue);
Async.CombinedBuilder<ExecutionResult> executionResultFutures = Async.ofExpectedSize(completeValueInfos.size());
for (FieldValueInfo completeValueInfo : completeValueInfos) {
executionResultFutures.add(completeValueInfo.getFieldValue());
}
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
Async.each(executionResultFuture).whenComplete(handleResultsConsumer);
executionResultFutures.await().whenComplete(handleResultsConsumer);
}).exceptionally((ex) -> {
// if there are any issues with combining/handling the field results,
// complete the future at all costs and bubble up any thrown exception so
Expand Down
Expand Up @@ -11,8 +11,8 @@
import java.util.concurrent.CompletableFuture;

/**
* Async non-blocking execution, but serial: only one field at the the time will be resolved.
* See {@link AsyncExecutionStrategy} for a non serial (parallel) execution of every field.
* Async non-blocking execution, but serial: only one field at the time will be resolved.
* See {@link AsyncExecutionStrategy} for a non-serial (parallel) execution of every field.
*/
@PublicApi
public class AsyncSerialExecutionStrategy extends AbstractAsyncExecutionStrategy {
Expand Down

0 comments on commit 9da87e2

Please sign in to comment.