diff --git a/src/execution/execute.ts b/src/execution/execute.ts index cda6ab8254..2e6dce1c27 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -1052,10 +1052,97 @@ async function completeAsyncIteratorValue( [], ]; let index = 0; - const streamUsage = getStreamUsage(exeContext, fieldGroup, path); // eslint-disable-next-line no-constant-condition while (true) { - if (streamUsage && index >= streamUsage.initialCount) { + const itemPath = addPath(path, index, undefined); + let iteration; + try { + // eslint-disable-next-line no-await-in-loop + iteration = await asyncIterator.next(); + } catch (rawError) { + throw locatedError(rawError, toNodes(fieldGroup), pathToArray(path)); + } + + // TODO: add test case for stream returning done before initialCount + /* c8 ignore next 3 */ + if (iteration.done) { + break; + } + + const item = iteration.value; + // TODO: add tests for stream backed by asyncIterator that returns a promise + /* c8 ignore start */ + if (isPromise(item)) { + completedResults.push( + completePromisedListItemValue( + item, + graphqlWrappedResult, + exeContext, + itemType, + fieldGroup, + info, + itemPath, + incrementalContext, + deferMap, + ), + ); + containsPromise = true; + } else if ( + /* c8 ignore stop */ + completeListItemValue( + item, + completedResults, + graphqlWrappedResult, + exeContext, + itemType, + fieldGroup, + info, + itemPath, + incrementalContext, + deferMap, + ) + // TODO: add tests for stream backed by asyncIterator that completes to a promise + /* c8 ignore start */ + ) { + containsPromise = true; + } + /* c8 ignore stop */ + index++; + } + + return containsPromise + ? /* c8 ignore start */ Promise.all(completedResults).then((resolved) => [ + resolved, + graphqlWrappedResult[1], + ]) + : /* c8 ignore stop */ graphqlWrappedResult; +} + +/** + * Complete a async iterator value by completing the result and calling + * recursively until all the results are completed. + */ +async function completeAsyncIteratorValueWithPossibleStream( + exeContext: ExecutionContext, + itemType: GraphQLOutputType, + fieldGroup: FieldGroup, + info: GraphQLResolveInfo, + path: Path, + asyncIterator: AsyncIterator, + streamUsage: StreamUsage, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, +): Promise>> { + let containsPromise = false; + const completedResults: Array = []; + const graphqlWrappedResult: GraphQLWrappedResult> = [ + completedResults, + [], + ]; + let index = 0; + // eslint-disable-next-line no-constant-condition + while (true) { + if (index >= streamUsage.initialCount) { const returnFn = asyncIterator.return; let streamRecord: SubsequentResultRecord | CancellableStreamRecord; if (returnFn === undefined) { @@ -1166,17 +1253,32 @@ function completeListValue( deferMap: ReadonlyMap | undefined, ): PromiseOrValue>> { const itemType = returnType.ofType; + const streamUsage = getStreamUsage(exeContext, fieldGroup, path); if (isAsyncIterable(result)) { const asyncIterator = result[Symbol.asyncIterator](); - return completeAsyncIteratorValue( + if (streamUsage === undefined) { + return completeAsyncIteratorValue( + exeContext, + itemType, + fieldGroup, + info, + path, + asyncIterator, + incrementalContext, + deferMap, + ); + } + + return completeAsyncIteratorValueWithPossibleStream( exeContext, itemType, fieldGroup, info, path, asyncIterator, + streamUsage, incrementalContext, deferMap, ); @@ -1188,13 +1290,27 @@ function completeListValue( ); } - return completeIterableValue( + if (streamUsage === undefined) { + return completeIterableValue( + exeContext, + itemType, + fieldGroup, + info, + path, + result, + incrementalContext, + deferMap, + ); + } + + return completeIterableValueWithPossibleStream( exeContext, itemType, fieldGroup, info, path, result, + streamUsage, incrementalContext, deferMap, ); @@ -1219,13 +1335,79 @@ function completeIterableValue( [], ]; let index = 0; - const streamUsage = getStreamUsage(exeContext, fieldGroup, path); + for (const item of items) { + // No need to modify the info object containing the path, + // since from here on it is not ever accessed by resolver functions. + const itemPath = addPath(path, index, undefined); + + if (isPromise(item)) { + completedResults.push( + completePromisedListItemValue( + item, + graphqlWrappedResult, + exeContext, + itemType, + fieldGroup, + info, + itemPath, + incrementalContext, + deferMap, + ), + ); + containsPromise = true; + } else if ( + completeListItemValue( + item, + completedResults, + graphqlWrappedResult, + exeContext, + itemType, + fieldGroup, + info, + itemPath, + incrementalContext, + deferMap, + ) + ) { + containsPromise = true; + } + index++; + } + + return containsPromise + ? Promise.all(completedResults).then((resolved) => [ + resolved, + graphqlWrappedResult[1], + ]) + : graphqlWrappedResult; +} + +function completeIterableValueWithPossibleStream( + exeContext: ExecutionContext, + itemType: GraphQLOutputType, + fieldGroup: FieldGroup, + info: GraphQLResolveInfo, + path: Path, + items: Iterable, + streamUsage: StreamUsage, + incrementalContext: IncrementalContext | undefined, + deferMap: ReadonlyMap | undefined, +): PromiseOrValue>> { + // This is specified as a simple map, however we're optimizing the path + // where the list contains no Promises by avoiding creating another Promise. + let containsPromise = false; + const completedResults: Array = []; + const graphqlWrappedResult: GraphQLWrappedResult> = [ + completedResults, + [], + ]; + let index = 0; const iterator = items[Symbol.iterator](); let iteration = iterator.next(); while (!iteration.done) { const item = iteration.value; - if (streamUsage && index >= streamUsage.initialCount) { + if (index >= streamUsage.initialCount) { const streamRecord: SubsequentResultRecord = { label: streamUsage.label, path,