Skip to content

Commit

Permalink
Optimize parallel execution when it's only one task
Browse files Browse the repository at this point in the history
See discussion in issue #190.

Also, run coverage in Python 3.10, since running in Python 3.11
reports false negatives in test_lists (probably a bug in coverage).
  • Loading branch information
Cito committed Feb 25, 2023
1 parent 7fd3ce4 commit 1062cbb
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/graphql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@
# Validate GraphQL schema.
validate_schema,
assert_valid_schema,
# Uphold the spec rules about naming
# Uphold the spec rules about naming
assert_name,
assert_enum_value_name,
# Types
Expand Down
40 changes: 25 additions & 15 deletions src/graphql/execution/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def execute_fields(
if is_awaitable(result):
append_awaitable(response_name)

# If there are no coroutines, we can just return the object
# If there are no coroutines, we can just return the object.
if not awaitable_fields:
return results

Expand All @@ -450,12 +450,17 @@ def execute_fields(
# will yield this same map, but with any coroutines awaited in parallel and
# replaced with the values they yielded.
async def get_results() -> Dict[str, Any]:
results.update(
zip(
awaitable_fields,
await gather(*(results[field] for field in awaitable_fields)),
if len(awaitable_fields) == 1:
# If there is only one field, avoid the overhead of parallelization.
field = awaitable_fields[0]
results[field] = await results[field]
else:
results.update(
zip(
awaitable_fields,
await gather(*(results[field] for field in awaitable_fields)),
)

This comment has been minimized.

Copy link
@jkimbo

jkimbo Feb 26, 2023

Member

@Cito can I suggest moving this logic into an async_gather method on the ExecutionContext class so that it can be overridden by a custom ExecutionContext class? That way the only change needed here would be to do this:

            results.update(
                zip(
                    awaitable_fields,
                    await self.async_gather([results[field] for field in awaitable_fields]),
                )
            )

Could also be a static method on the class.

This comment has been minimized.

Copy link
@Cito

Cito Feb 26, 2023

Author Member

Yes, that's a good idea @jkimbo. I already thought about making this more configurable. Will try to work on this again next weekend.

)
)
return results

return get_results()
Expand Down Expand Up @@ -758,13 +763,18 @@ async def await_completed(item: Any, item_path: Path) -> Any:

# noinspection PyShadowingNames
async def get_completed_results() -> List[Any]:
for index, result in zip(
awaitable_indices,
await gather(
*(completed_results[index] for index in awaitable_indices)
),
):
completed_results[index] = result
if len(awaitable_indices) == 1:
# If there is only one index, avoid the overhead of parallelization.
index = awaitable_indices[0]
completed_results[index] = await completed_results[index]
else:
for index, result in zip(
awaitable_indices,
await gather(
*(completed_results[index] for index in awaitable_indices)
),
):
completed_results[index] = result
return completed_results

return get_completed_results()
Expand Down Expand Up @@ -907,7 +917,7 @@ def complete_object_value(

# If there is an `is_type_of()` predicate function, call it with the current
# result. If `is_type_of()` returns False, then raise an error rather than
# continuing execution.
# continuing execution.
if return_type.is_type_of:
is_type_of = return_type.is_type_of(result, info)

Expand Down Expand Up @@ -943,7 +953,7 @@ def collect_subfields(
# We cannot use the field_nodes themselves as key for the cache, since they
# are not hashable as a list. We also do not want to use the field_nodes
# themselves (converted to a tuple) as keys, since hashing them is slow.
# Therefore we use the ids of the field_nodes as keys. Note that we do not
# Therefore, we use the ids of the field_nodes as keys. Note that we do not
# use the id of the list, since we want to hit the cache for all lists of
# the same nodes, not only for the same list of nodes. Also, the list id may
# even be reused, in which case we would get wrong results from the cache.
Expand Down
15 changes: 15 additions & 0 deletions tests/execution/test_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ def _complete(list_field):
Data(list_field),
)

def accepts_a_list_as_a_list_value():
result = _complete([])
assert result == ({"listField": []}, None)
list_field = ["just an apple"]
result = _complete(list_field)
assert result == ({"listField": list_field}, None)
list_field = ["apple", "banana", "coconut"]
result = _complete(list_field)
assert result == ({"listField": list_field}, None)

def accepts_a_tuple_as_a_list_value():
list_field = ("apple", "banana", "coconut")
result = _complete(list_field)
assert result == ({"listField": list(list_field)}, None)

def accepts_a_set_as_a_list_value():
# Note that sets are not ordered in Python.
list_field = {"apple", "banana", "coconut"}
Expand Down
40 changes: 40 additions & 0 deletions tests/execution/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,27 @@ async def wait(self) -> bool:


def describe_parallel_execution():
@mark.asyncio
async def resolve_single_field():
# make sure that the special case of resolving a single field works
async def resolve(*_args):
return True

schema = GraphQLSchema(
GraphQLObjectType(
"Query",
{
"foo": GraphQLField(GraphQLBoolean, resolve=resolve),
},
)
)

awaitable_result = execute(schema, parse("{foo}"))
assert isinstance(awaitable_result, Awaitable)
result = await awaitable_result

assert result == ({"foo": True}, None)

@mark.asyncio
async def resolve_fields_in_parallel():
barrier = Barrier(2)
Expand All @@ -58,6 +79,25 @@ async def resolve(*_args):

assert result == ({"foo": True, "bar": True}, None)

@mark.asyncio
async def resolve_single_element_list():
# make sure that the special case of resolving a single element list works
async def resolve(*_args):
return [True]

schema = GraphQLSchema(
GraphQLObjectType(
"Query",
{"foo": GraphQLField(GraphQLList(GraphQLBoolean), resolve=resolve)},
)
)

awaitable_result = execute(schema, parse("{foo}"))
assert isinstance(awaitable_result, Awaitable)
result = await awaitable_result

assert result == ({"foo": [True]}, None)

@mark.asyncio
async def resolve_list_in_parallel():
barrier = Barrier(2)
Expand Down
4 changes: 2 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ deps =
commands =
# to also run the time-consuming tests: tox -e py310 -- --run-slow
# to run the benchmarks: tox -e py310 -- -k benchmarks --benchmark-enable
py37,py38.py39,py310,pypy39: pytest tests {posargs}
py311: pytest tests {posargs: --cov-report=term-missing --cov=graphql --cov=tests --cov-fail-under=100}
py37,py38,py39,py311,pypy39: pytest tests {posargs}
py310: pytest tests {posargs: --cov-report=term-missing --cov=graphql --cov=tests --cov-fail-under=100}

0 comments on commit 1062cbb

Please sign in to comment.