From 9da87e2f88d8abd9e5c2cb51ec8b3149822ad21b Mon Sep 17 00:00:00 2001 From: Davide Angelocola Date: Wed, 8 Jun 2022 11:50:14 +0200 Subject: [PATCH] introducing Async.ofExpectedSize (#2839) A typed interface to build CompletableFuture.allOf without intermediate List allocations. --- src/main/java/graphql/execution/Async.java | 126 ++++++++++++++++-- .../execution/AsyncExecutionStrategy.java | 12 +- .../AsyncSerialExecutionStrategy.java | 4 +- 3 files changed, 125 insertions(+), 17 deletions(-) diff --git a/src/main/java/graphql/execution/Async.java b/src/main/java/graphql/execution/Async.java index d33f5afcfa..d30dc792d6 100644 --- a/src/main/java/graphql/execution/Async.java +++ b/src/main/java/graphql/execution/Async.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -19,29 +20,134 @@ @SuppressWarnings("FutureReturnValueIgnored") public class Async { - @FunctionalInterface - public interface CFFactory { - CompletableFuture apply(T input, int index, List previousResults); + public interface CombinedBuilder { + + void add(CompletableFuture completableFuture); + + CompletableFuture> await(); } - public static CompletableFuture> each(List> futures) { - CompletableFuture> overallResult = new CompletableFuture<>(); + /** + * Combines 1 or more CF. It is a wrapper around CompletableFuture.allOf. + */ + public static CombinedBuilder ofExpectedSize(int expectedSize) { + if (expectedSize == 0) { + return new Empty<>(); + } else if (expectedSize == 1) { + return new Single<>(); + } else { + return new Many<>(expectedSize); + } + } + + private static class Empty implements CombinedBuilder { + + private int ix; + + @Override + public void add(CompletableFuture completableFuture) { + this.ix++; + } + + + @Override + public CompletableFuture> await() { + Assert.assertTrue(ix == 0, () -> "expected size was " + 0 + " got " + ix); + return CompletableFuture.completedFuture(Collections.emptyList()); + } + } + + private static class Single implements CombinedBuilder { + + // avoiding array allocation as there is only 1 CF + private CompletableFuture completableFuture; + private int ix; + + @Override + public void add(CompletableFuture completableFuture) { + this.completableFuture = completableFuture; + this.ix++; + } + + @Override + public CompletableFuture> await() { + Assert.assertTrue(ix == 1, () -> "expected size was " + 1 + " got " + ix); + + CompletableFuture> overallResult = new CompletableFuture<>(); + completableFuture + .whenComplete((ignored, exception) -> { + if (exception != null) { + overallResult.completeExceptionally(exception); + return; + } + List results = Collections.singletonList(completableFuture.join()); + overallResult.complete(results); + }); + return overallResult; + } + } + + private static class Many implements CombinedBuilder { + + private final CompletableFuture[] array; + private int ix; @SuppressWarnings("unchecked") - CompletableFuture[] arrayOfFutures = futures.toArray(new CompletableFuture[0]); - CompletableFuture - .allOf(arrayOfFutures) + private Many(int size) { + this.array = new CompletableFuture[size]; + this.ix = 0; + } + + @Override + public void add(CompletableFuture completableFuture) { + array[ix++] = completableFuture; + } + + @Override + public CompletableFuture> await() { + Assert.assertTrue(ix == array.length, () -> "expected size was " + array.length + " got " + ix); + + CompletableFuture> overallResult = new CompletableFuture<>(); + CompletableFuture.allOf(array) .whenComplete((ignored, exception) -> { if (exception != null) { overallResult.completeExceptionally(exception); return; } - List results = new ArrayList<>(arrayOfFutures.length); - for (CompletableFuture future : arrayOfFutures) { + List results = new ArrayList<>(array.length); + for (CompletableFuture future : array) { results.add(future.join()); } overallResult.complete(results); }); + return overallResult; + } + + } + + @FunctionalInterface + public interface CFFactory { + CompletableFuture apply(T input, int index, List previousResults); + } + + public static CompletableFuture> each(List> futures) { + CompletableFuture> overallResult = new CompletableFuture<>(); + + @SuppressWarnings("unchecked") + CompletableFuture[] arrayOfFutures = futures.toArray(new CompletableFuture[0]); + CompletableFuture + .allOf(arrayOfFutures) + .whenComplete((ignored, exception) -> { + if (exception != null) { + overallResult.completeExceptionally(exception); + return; + } + List results = new ArrayList<>(arrayOfFutures.length); + for (CompletableFuture future : arrayOfFutures) { + results.add(future.join()); + } + overallResult.complete(results); + }); return overallResult; } diff --git a/src/main/java/graphql/execution/AsyncExecutionStrategy.java b/src/main/java/graphql/execution/AsyncExecutionStrategy.java index 3800657680..6181182c5b 100644 --- a/src/main/java/graphql/execution/AsyncExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncExecutionStrategy.java @@ -12,7 +12,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import static graphql.collect.ImmutableKit.map; /** * The standard graphql execution strategy that runs fields asynchronously non-blocking. @@ -47,7 +46,7 @@ public CompletableFuture execute(ExecutionContext executionCont MergedSelectionSet fields = parameters.getFields(); Set fieldNames = fields.keySet(); - List> futures = new ArrayList<>(fieldNames.size()); + Async.CombinedBuilder futures = Async.ofExpectedSize(fields.size()); List resolvedFields = new ArrayList<>(fieldNames.size()); for (String fieldName : fieldNames) { MergedField currentField = fields.getSubField(fieldName); @@ -63,15 +62,18 @@ public CompletableFuture execute(ExecutionContext executionCont CompletableFuture overallResult = new CompletableFuture<>(); executionStrategyCtx.onDispatched(overallResult); - Async.each(futures).whenComplete((completeValueInfos, throwable) -> { + futures.await().whenComplete((completeValueInfos, throwable) -> { BiConsumer, Throwable> handleResultsConsumer = handleResults(executionContext, resolvedFields, overallResult); if (throwable != null) { handleResultsConsumer.accept(null, throwable.getCause()); return; } - List> executionResultFuture = map(completeValueInfos, FieldValueInfo::getFieldValue); + Async.CombinedBuilder executionResultFutures = Async.ofExpectedSize(completeValueInfos.size()); + for (FieldValueInfo completeValueInfo : completeValueInfos) { + executionResultFutures.add(completeValueInfo.getFieldValue()); + } executionStrategyCtx.onFieldValuesInfo(completeValueInfos); - Async.each(executionResultFuture).whenComplete(handleResultsConsumer); + executionResultFutures.await().whenComplete(handleResultsConsumer); }).exceptionally((ex) -> { // if there are any issues with combining/handling the field results, // complete the future at all costs and bubble up any thrown exception so diff --git a/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java b/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java index 889156ce85..ac6c0855af 100644 --- a/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java @@ -11,8 +11,8 @@ import java.util.concurrent.CompletableFuture; /** - * Async non-blocking execution, but serial: only one field at the the time will be resolved. - * See {@link AsyncExecutionStrategy} for a non serial (parallel) execution of every field. + * Async non-blocking execution, but serial: only one field at the time will be resolved. + * See {@link AsyncExecutionStrategy} for a non-serial (parallel) execution of every field. */ @PublicApi public class AsyncSerialExecutionStrategy extends AbstractAsyncExecutionStrategy {