Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-457 - Improve performance for I/O bound list resolvers by using parallel execution #458

Merged
merged 2 commits into from Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions tartiflette/__init__.py
Expand Up @@ -33,6 +33,7 @@ async def create_engine(
query_cache_decorator: Optional[Callable] = UNDEFINED_VALUE,
json_loader: Optional[Callable[[str], Dict[str, Any]]] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
) -> "Engine":
"""
Create an engine by analyzing the SDL and connecting it with the imported
Expand All @@ -56,6 +57,8 @@ async def create_engine(
:param json_loader: A callable that will replace default python
json module.loads for ast_json loading
:param custom_default_arguments_coercer: callable that will replace the
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
tartiflette `default_arguments_coercer
:type sdl: Union[str, List[str]]
:type schema_name: str
Expand All @@ -66,6 +69,7 @@ async def create_engine(
:type query_cache_decorator: Optional[Callable]
:type json_loader: Optional[Callable[[str], Dict[str, Any]]]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
:return: a Cooked Engine instance
:rtype: Engine

Expand All @@ -90,6 +94,7 @@ async def create_engine(
query_cache_decorator=query_cache_decorator,
json_loader=json_loader,
custom_default_arguments_coercer=custom_default_arguments_coercer,
coerce_list_concurrently=coerce_list_concurrently,
)

return e
18 changes: 15 additions & 3 deletions tartiflette/coercers/outputs/compute.py
@@ -1,18 +1,25 @@
from functools import partial
from typing import Callable

from tartiflette.coercers.outputs.list_coercer import list_coercer
from tartiflette.coercers.outputs.list_coercer import (
list_coercer_concurrently,
list_coercer_sequentially,
)
from tartiflette.coercers.outputs.non_null_coercer import non_null_coercer

__all__ = ("get_output_coercer",)


def get_output_coercer(graphql_type: "GraphQLType") -> Callable:
def get_output_coercer(
graphql_type: "GraphQLType", concurrently: bool
) -> Callable:
"""
Computes and returns the output coercer to use for the filled in schema
type.
:param graphql_type: the schema type for which compute the coercer
:param concurrently: whether list should be coerced concurrently
:type graphql_type: GraphQLType
:type concurrently: bool
:return: the computed coercer wrap with directives if defined
:rtype: Callable
"""
Expand All @@ -22,7 +29,12 @@ def get_output_coercer(graphql_type: "GraphQLType") -> Callable:
wrapped_type = inner_type.wrapped_type
if inner_type.is_list_type:
wrapper_coercers.append(
partial(list_coercer, item_type=wrapped_type)
partial(
list_coercer_concurrently
if concurrently
else list_coercer_sequentially,
item_type=wrapped_type,
)
)
elif inner_type.is_non_null_type:
wrapper_coercers.append(non_null_coercer)
Expand Down
65 changes: 63 additions & 2 deletions tartiflette/coercers/outputs/list_coercer.py
@@ -1,15 +1,17 @@
import asyncio

from typing import Any, Callable, List

from tartiflette.coercers.common import Path
from tartiflette.coercers.outputs.null_coercer import null_coercer_wrapper
from tartiflette.resolver.factory import complete_value_catching_error
from tartiflette.utils.errors import extract_exceptions_from_results

__all__ = ("list_coercer",)
__all__ = ("list_coercer_sequentially", "list_coercer_concurrently")


@null_coercer_wrapper
async def list_coercer(
async def list_coercer_sequentially(
result: Any,
info: "ResolveInfo",
execution_context: "ExecutionContext",
Expand Down Expand Up @@ -65,3 +67,62 @@ async def list_coercer(
raise exceptions

return results


@null_coercer_wrapper
async def list_coercer_concurrently(
result: Any,
info: "ResolveInfo",
execution_context: "ExecutionContext",
field_nodes: List["FieldNode"],
path: "Path",
item_type: "GraphQLOutputType",
inner_coercer: Callable,
) -> List[Any]:
"""
Computes the value of a list.
:param result: resolved value
:param info: information related to the execution and the resolved field
:param execution_context: instance of the query execution context
:param field_nodes: AST nodes related to the resolved field
:param path: the path traveled until this resolver
:param item_type: GraphQLType of list items
:param inner_coercer: the pre-computed coercer to use on the result
:type result: Any
:type info: ResolveInfo
:type execution_context: ExecutionContext
:type field_nodes: List[FieldNode]
:type path: Path
:type item_type: GraphQLOutputType
:type inner_coercer: Callable
:return: the computed value
:rtype: List[Any]
"""
# pylint: disable=too-many-locals
if not isinstance(result, list):
raise TypeError(
"Expected Iterable, but did not find one for field "
f"{info.parent_type.name}.{info.field_name}."
)

results = await asyncio.gather(
*[
complete_value_catching_error(
item,
info,
execution_context,
field_nodes,
Path(path, index),
item_type,
inner_coercer,
)
for index, item in enumerate(result)
],
return_exceptions=True,
)

exceptions = extract_exceptions_from_results(results)
if exceptions:
raise exceptions

return results
13 changes: 13 additions & 0 deletions tartiflette/engine.py
Expand Up @@ -139,6 +139,8 @@ class Engine:
Tartiflette GraphQL engine.
"""

# pylint: disable=too-many-instance-attributes

def __init__(
self,
sdl=None,
Expand All @@ -150,6 +152,7 @@ def __init__(
query_cache_decorator=UNDEFINED_VALUE,
json_loader=None,
custom_default_arguments_coercer=None,
coerce_list_concurrently=None,
) -> None:
"""
Creates an uncooked Engine instance.
Expand All @@ -163,6 +166,7 @@ def __init__(
self._custom_default_arguments_coercer = (
custom_default_arguments_coercer
)
self._coerce_list_concurrently = coerce_list_concurrently
self._modules = modules
self._query_cache_decorator = (
query_cache_decorator
Expand All @@ -189,6 +193,7 @@ async def cook(
query_cache_decorator: Optional[Callable] = UNDEFINED_VALUE,
json_loader: Optional[Callable[[str], Dict[str, Any]]] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
schema_name: Optional[str] = None,
) -> None:
"""
Expand All @@ -213,6 +218,8 @@ async def cook(
json module.loads for ast_json loading
:param custom_default_arguments_coercer: callable that will replace the
tartiflette `default_arguments_coercer`
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
:param schema_name: name of the SDL
:type sdl: Union[str, List[str]]
:type error_coercer: Callable[[Exception, Dict[str, Any]], Dict[str, Any]]
Expand All @@ -222,6 +229,7 @@ async def cook(
:type query_cache_decorator: Optional[Callable]
:type json_loader: Optional[Callable[[str], Dict[str, Any]]]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
:type schema_name: Optional[str]
"""
# pylint: disable=too-many-arguments,too-many-locals
Expand Down Expand Up @@ -294,6 +302,11 @@ async def cook(
custom_default_resolver,
custom_default_type_resolver,
custom_default_arguments_coercer,
(
coerce_list_concurrently
if coerce_list_concurrently is not None
else self._coerce_list_concurrently
),
)
self._build_response = partial(
build_response, error_coercer=self._error_coercer
Expand Down
5 changes: 5 additions & 0 deletions tartiflette/resolver/resolver.py
Expand Up @@ -38,23 +38,27 @@ def __init__(
schema_name: str = "default",
type_resolver: Optional[Callable] = None,
arguments_coercer: Optional[Callable] = None,
concurrently: Optional[bool] = None,
) -> None:
"""
:param name: name of the field to wrap
:param schema_name: name of the schema to which link the resolver
:param type_resolver: callable to use to resolve the type of an
abstract type
:param arguments_coercer: the callable to use to coerce field arguments
:param concurrently: whether or not list will be coerced concurrently
:type name: str
:type schema_name: str
:type type_resolver: Optional[Callable]
:type arguments_coercer: Optional[Callable]
:type concurrently: Optional[bool]
"""
self.name = name
self._type_resolver = type_resolver
self._implementation = None
self._schema_name = schema_name
self._arguments_coercer = arguments_coercer
self._concurrently = concurrently

def bake(self, schema: "GraphQLSchema") -> None:
"""
Expand All @@ -71,6 +75,7 @@ def bake(self, schema: "GraphQLSchema") -> None:
field = schema.get_field_by_name(self.name)
field.raw_resolver = self._implementation
field.query_arguments_coercer = self._arguments_coercer
field.query_concurrently = self._concurrently

field_wrapped_type = get_wrapped_type(
get_graphql_type(schema, field.gql_type)
Expand Down
5 changes: 5 additions & 0 deletions tartiflette/schema/bakery.py
Expand Up @@ -33,6 +33,7 @@ async def bake(
custom_default_resolver: Optional[Callable] = None,
custom_default_type_resolver: Optional[Callable] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
) -> "GraphQLSchema":
"""
Bakes and returns a GraphQLSchema instance.
Expand All @@ -44,10 +45,13 @@ async def bake(
to deduct the type of a result)
:param custom_default_arguments_coercer: callable that will replace the
tartiflette `default_arguments_coercer`
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
:type schema_name: str
:type custom_default_resolver: Optional[Callable]
:type custom_default_type_resolver: Optional[Callable]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
:return: a baked GraphQLSchema instance
:rtype: GraphQLSchema
"""
Expand All @@ -56,5 +60,6 @@ async def bake(
custom_default_resolver,
custom_default_type_resolver,
custom_default_arguments_coercer,
coerce_list_concurrently,
)
return schema
10 changes: 10 additions & 0 deletions tartiflette/schema/schema.py
Expand Up @@ -179,6 +179,7 @@ def __init__(self, name: str = "default") -> None:
self.name = name
self.default_type_resolver: Optional[Callable] = None
self.default_arguments_coercer: Optional[Callable] = None
self.coerce_list_concurrently: Optional[bool] = None

# Operation type names
self.query_operation_name: str = _DEFAULT_QUERY_OPERATION_NAME
Expand Down Expand Up @@ -1115,6 +1116,7 @@ async def bake(
custom_default_resolver: Optional[Callable] = None,
custom_default_type_resolver: Optional[Callable] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
) -> None:
"""
Bake the final schema (it should not change after this) used for
Expand All @@ -1126,16 +1128,24 @@ async def bake(
to deduct the type of a result)
:param custom_default_arguments_coercer: callable that will replace the
tartiflette `default_arguments_coercer`
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
:type custom_default_resolver: Optional[Callable]
:type custom_default_type_resolver: Optional[Callable]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
"""
self.default_type_resolver = (
custom_default_type_resolver or default_type_resolver
)
self.default_arguments_coercer = (
custom_default_arguments_coercer or gather_arguments_coercer
)
self.coerce_list_concurrently = (
coerce_list_concurrently
if coerce_list_concurrently is not None
else True
)
self._inject_introspection_fields()

self._validate_extensions() # Validate this before bake
Expand Down
5 changes: 5 additions & 0 deletions tartiflette/subscription/subscription.py
Expand Up @@ -40,19 +40,23 @@ def __init__(
name: str,
schema_name: str = "default",
arguments_coercer: Optional[Callable] = None,
concurrently: Optional[bool] = None,
) -> None:
"""
:param name: name of the subscription field
:param schema_name: name of the schema to which link the subscription
:param arguments_coercer: callable to use to coerce field arguments
:param concurrently: whether list should be coerced concurrently
:type name: str
:type schema_name: str
:type arguments_coercer: Optional[Callable]
:type concurrently: Optional[bool]
"""
self.name = name
self._implementation = None
self._schema_name = schema_name
self._arguments_coercer = arguments_coercer
self._concurrently = concurrently

def bake(self, schema: "GraphQLSchema") -> None:
"""
Expand Down Expand Up @@ -81,6 +85,7 @@ def bake(self, schema: "GraphQLSchema") -> None:

field.subscribe = self._implementation
field.subscription_arguments_coercer = self._arguments_coercer
field.subscription_concurrently = self._concurrently

def __call__(self, implementation: Callable) -> Callable:
"""
Expand Down
16 changes: 15 additions & 1 deletion tartiflette/types/field.py
Expand Up @@ -64,6 +64,11 @@ def __init__(
self.query_arguments_coercer: Optional[Callable] = None
self.subscription_arguments_coercer: Optional[Callable] = None

# Concurrently
self.concurrently: Optional[bool] = None
self.query_concurrently: Optional[bool] = None
self.subscription_concurrently: Optional[bool] = None

# Introspection attributes
self.isDeprecated: bool = False # pylint: disable=invalid-name
self.args: List["GraphQLArgument"] = []
Expand Down Expand Up @@ -160,6 +165,13 @@ def bake(
else:
self.arguments_coercer = schema.default_arguments_coercer

if self.subscription_concurrently is not None:
self.concurrently = self.subscription_concurrently
elif self.query_concurrently is not None:
self.concurrently = self.query_concurrently
else:
self.concurrently = schema.coerce_list_concurrently

# Directives
directives_definition = compute_directive_nodes(
schema, self.directives
Expand Down Expand Up @@ -192,7 +204,9 @@ def bake(
is_resolver=True,
with_default=True,
),
output_coercer=get_output_coercer(self.graphql_type),
output_coercer=get_output_coercer(
self.graphql_type, self.concurrently
),
)

for argument in self.arguments.values():
Expand Down