Skip to content

Commit

Permalink
Merge pull request #458 from tartiflette/ISSUE-457
Browse files Browse the repository at this point in the history
ISSUE-457 - Improve performance for I/O bound list resolvers by using parallel execution
  • Loading branch information
abusi committed Dec 7, 2020
2 parents fcdefb3 + 4a86953 commit 397feb5
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 6 deletions.
5 changes: 5 additions & 0 deletions tartiflette/__init__.py
Expand Up @@ -33,6 +33,7 @@ async def create_engine(
query_cache_decorator: Optional[Callable] = UNDEFINED_VALUE,
json_loader: Optional[Callable[[str], Dict[str, Any]]] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
) -> "Engine":
"""
Create an engine by analyzing the SDL and connecting it with the imported
Expand All @@ -56,6 +57,8 @@ async def create_engine(
:param json_loader: A callable that will replace default python
json module.loads for ast_json loading
:param custom_default_arguments_coercer: callable that will replace the
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
tartiflette `default_arguments_coercer
:type sdl: Union[str, List[str]]
:type schema_name: str
Expand All @@ -66,6 +69,7 @@ async def create_engine(
:type query_cache_decorator: Optional[Callable]
:type json_loader: Optional[Callable[[str], Dict[str, Any]]]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
:return: a Cooked Engine instance
:rtype: Engine
Expand All @@ -90,6 +94,7 @@ async def create_engine(
query_cache_decorator=query_cache_decorator,
json_loader=json_loader,
custom_default_arguments_coercer=custom_default_arguments_coercer,
coerce_list_concurrently=coerce_list_concurrently,
)

return e
18 changes: 15 additions & 3 deletions tartiflette/coercers/outputs/compute.py
@@ -1,18 +1,25 @@
from functools import partial
from typing import Callable

from tartiflette.coercers.outputs.list_coercer import list_coercer
from tartiflette.coercers.outputs.list_coercer import (
list_coercer_concurrently,
list_coercer_sequentially,
)
from tartiflette.coercers.outputs.non_null_coercer import non_null_coercer

__all__ = ("get_output_coercer",)


def get_output_coercer(graphql_type: "GraphQLType") -> Callable:
def get_output_coercer(
graphql_type: "GraphQLType", concurrently: bool
) -> Callable:
"""
Computes and returns the output coercer to use for the filled in schema
type.
:param graphql_type: the schema type for which compute the coercer
:param concurrently: whether list should be coerced concurrently
:type graphql_type: GraphQLType
:type concurrently: bool
:return: the computed coercer wrap with directives if defined
:rtype: Callable
"""
Expand All @@ -22,7 +29,12 @@ def get_output_coercer(graphql_type: "GraphQLType") -> Callable:
wrapped_type = inner_type.wrapped_type
if inner_type.is_list_type:
wrapper_coercers.append(
partial(list_coercer, item_type=wrapped_type)
partial(
list_coercer_concurrently
if concurrently
else list_coercer_sequentially,
item_type=wrapped_type,
)
)
elif inner_type.is_non_null_type:
wrapper_coercers.append(non_null_coercer)
Expand Down
65 changes: 63 additions & 2 deletions tartiflette/coercers/outputs/list_coercer.py
@@ -1,15 +1,17 @@
import asyncio

from typing import Any, Callable, List

from tartiflette.coercers.common import Path
from tartiflette.coercers.outputs.null_coercer import null_coercer_wrapper
from tartiflette.resolver.factory import complete_value_catching_error
from tartiflette.utils.errors import extract_exceptions_from_results

__all__ = ("list_coercer",)
__all__ = ("list_coercer_sequentially", "list_coercer_concurrently")


@null_coercer_wrapper
async def list_coercer(
async def list_coercer_sequentially(
result: Any,
info: "ResolveInfo",
execution_context: "ExecutionContext",
Expand Down Expand Up @@ -65,3 +67,62 @@ async def list_coercer(
raise exceptions

return results


@null_coercer_wrapper
async def list_coercer_concurrently(
result: Any,
info: "ResolveInfo",
execution_context: "ExecutionContext",
field_nodes: List["FieldNode"],
path: "Path",
item_type: "GraphQLOutputType",
inner_coercer: Callable,
) -> List[Any]:
"""
Computes the value of a list.
:param result: resolved value
:param info: information related to the execution and the resolved field
:param execution_context: instance of the query execution context
:param field_nodes: AST nodes related to the resolved field
:param path: the path traveled until this resolver
:param item_type: GraphQLType of list items
:param inner_coercer: the pre-computed coercer to use on the result
:type result: Any
:type info: ResolveInfo
:type execution_context: ExecutionContext
:type field_nodes: List[FieldNode]
:type path: Path
:type item_type: GraphQLOutputType
:type inner_coercer: Callable
:return: the computed value
:rtype: List[Any]
"""
# pylint: disable=too-many-locals
if not isinstance(result, list):
raise TypeError(
"Expected Iterable, but did not find one for field "
f"{info.parent_type.name}.{info.field_name}."
)

results = await asyncio.gather(
*[
complete_value_catching_error(
item,
info,
execution_context,
field_nodes,
Path(path, index),
item_type,
inner_coercer,
)
for index, item in enumerate(result)
],
return_exceptions=True,
)

exceptions = extract_exceptions_from_results(results)
if exceptions:
raise exceptions

return results
13 changes: 13 additions & 0 deletions tartiflette/engine.py
Expand Up @@ -139,6 +139,8 @@ class Engine:
Tartiflette GraphQL engine.
"""

# pylint: disable=too-many-instance-attributes

def __init__(
self,
sdl=None,
Expand All @@ -150,6 +152,7 @@ def __init__(
query_cache_decorator=UNDEFINED_VALUE,
json_loader=None,
custom_default_arguments_coercer=None,
coerce_list_concurrently=None,
) -> None:
"""
Creates an uncooked Engine instance.
Expand All @@ -163,6 +166,7 @@ def __init__(
self._custom_default_arguments_coercer = (
custom_default_arguments_coercer
)
self._coerce_list_concurrently = coerce_list_concurrently
self._modules = modules
self._query_cache_decorator = (
query_cache_decorator
Expand All @@ -189,6 +193,7 @@ async def cook(
query_cache_decorator: Optional[Callable] = UNDEFINED_VALUE,
json_loader: Optional[Callable[[str], Dict[str, Any]]] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
schema_name: Optional[str] = None,
) -> None:
"""
Expand All @@ -213,6 +218,8 @@ async def cook(
json module.loads for ast_json loading
:param custom_default_arguments_coercer: callable that will replace the
tartiflette `default_arguments_coercer`
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
:param schema_name: name of the SDL
:type sdl: Union[str, List[str]]
:type error_coercer: Callable[[Exception, Dict[str, Any]], Dict[str, Any]]
Expand All @@ -222,6 +229,7 @@ async def cook(
:type query_cache_decorator: Optional[Callable]
:type json_loader: Optional[Callable[[str], Dict[str, Any]]]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
:type schema_name: Optional[str]
"""
# pylint: disable=too-many-arguments,too-many-locals
Expand Down Expand Up @@ -294,6 +302,11 @@ async def cook(
custom_default_resolver,
custom_default_type_resolver,
custom_default_arguments_coercer,
(
coerce_list_concurrently
if coerce_list_concurrently is not None
else self._coerce_list_concurrently
),
)
self._build_response = partial(
build_response, error_coercer=self._error_coercer
Expand Down
5 changes: 5 additions & 0 deletions tartiflette/resolver/resolver.py
Expand Up @@ -38,23 +38,27 @@ def __init__(
schema_name: str = "default",
type_resolver: Optional[Callable] = None,
arguments_coercer: Optional[Callable] = None,
concurrently: Optional[bool] = None,
) -> None:
"""
:param name: name of the field to wrap
:param schema_name: name of the schema to which link the resolver
:param type_resolver: callable to use to resolve the type of an
abstract type
:param arguments_coercer: the callable to use to coerce field arguments
:param concurrently: whether or not list will be coerced concurrently
:type name: str
:type schema_name: str
:type type_resolver: Optional[Callable]
:type arguments_coercer: Optional[Callable]
:type concurrently: Optional[bool]
"""
self.name = name
self._type_resolver = type_resolver
self._implementation = None
self._schema_name = schema_name
self._arguments_coercer = arguments_coercer
self._concurrently = concurrently

def bake(self, schema: "GraphQLSchema") -> None:
"""
Expand All @@ -71,6 +75,7 @@ def bake(self, schema: "GraphQLSchema") -> None:
field = schema.get_field_by_name(self.name)
field.raw_resolver = self._implementation
field.query_arguments_coercer = self._arguments_coercer
field.query_concurrently = self._concurrently

field_wrapped_type = get_wrapped_type(
get_graphql_type(schema, field.gql_type)
Expand Down
5 changes: 5 additions & 0 deletions tartiflette/schema/bakery.py
Expand Up @@ -33,6 +33,7 @@ async def bake(
custom_default_resolver: Optional[Callable] = None,
custom_default_type_resolver: Optional[Callable] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
) -> "GraphQLSchema":
"""
Bakes and returns a GraphQLSchema instance.
Expand All @@ -44,10 +45,13 @@ async def bake(
to deduct the type of a result)
:param custom_default_arguments_coercer: callable that will replace the
tartiflette `default_arguments_coercer`
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
:type schema_name: str
:type custom_default_resolver: Optional[Callable]
:type custom_default_type_resolver: Optional[Callable]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
:return: a baked GraphQLSchema instance
:rtype: GraphQLSchema
"""
Expand All @@ -56,5 +60,6 @@ async def bake(
custom_default_resolver,
custom_default_type_resolver,
custom_default_arguments_coercer,
coerce_list_concurrently,
)
return schema
10 changes: 10 additions & 0 deletions tartiflette/schema/schema.py
Expand Up @@ -179,6 +179,7 @@ def __init__(self, name: str = "default") -> None:
self.name = name
self.default_type_resolver: Optional[Callable] = None
self.default_arguments_coercer: Optional[Callable] = None
self.coerce_list_concurrently: Optional[bool] = None

# Operation type names
self.query_operation_name: str = _DEFAULT_QUERY_OPERATION_NAME
Expand Down Expand Up @@ -1115,6 +1116,7 @@ async def bake(
custom_default_resolver: Optional[Callable] = None,
custom_default_type_resolver: Optional[Callable] = None,
custom_default_arguments_coercer: Optional[Callable] = None,
coerce_list_concurrently: Optional[bool] = None,
) -> None:
"""
Bake the final schema (it should not change after this) used for
Expand All @@ -1126,16 +1128,24 @@ async def bake(
to deduct the type of a result)
:param custom_default_arguments_coercer: callable that will replace the
tartiflette `default_arguments_coercer`
:param coerce_list_concurrently: whether or not list will be coerced
concurrently
:type custom_default_resolver: Optional[Callable]
:type custom_default_type_resolver: Optional[Callable]
:type custom_default_arguments_coercer: Optional[Callable]
:type coerce_list_concurrently: Optional[bool]
"""
self.default_type_resolver = (
custom_default_type_resolver or default_type_resolver
)
self.default_arguments_coercer = (
custom_default_arguments_coercer or gather_arguments_coercer
)
self.coerce_list_concurrently = (
coerce_list_concurrently
if coerce_list_concurrently is not None
else True
)
self._inject_introspection_fields()

self._validate_extensions() # Validate this before bake
Expand Down
5 changes: 5 additions & 0 deletions tartiflette/subscription/subscription.py
Expand Up @@ -40,19 +40,23 @@ def __init__(
name: str,
schema_name: str = "default",
arguments_coercer: Optional[Callable] = None,
concurrently: Optional[bool] = None,
) -> None:
"""
:param name: name of the subscription field
:param schema_name: name of the schema to which link the subscription
:param arguments_coercer: callable to use to coerce field arguments
:param concurrently: whether list should be coerced concurrently
:type name: str
:type schema_name: str
:type arguments_coercer: Optional[Callable]
:type concurrently: Optional[bool]
"""
self.name = name
self._implementation = None
self._schema_name = schema_name
self._arguments_coercer = arguments_coercer
self._concurrently = concurrently

def bake(self, schema: "GraphQLSchema") -> None:
"""
Expand Down Expand Up @@ -81,6 +85,7 @@ def bake(self, schema: "GraphQLSchema") -> None:

field.subscribe = self._implementation
field.subscription_arguments_coercer = self._arguments_coercer
field.subscription_concurrently = self._concurrently

def __call__(self, implementation: Callable) -> Callable:
"""
Expand Down
16 changes: 15 additions & 1 deletion tartiflette/types/field.py
Expand Up @@ -64,6 +64,11 @@ def __init__(
self.query_arguments_coercer: Optional[Callable] = None
self.subscription_arguments_coercer: Optional[Callable] = None

# Concurrently
self.concurrently: Optional[bool] = None
self.query_concurrently: Optional[bool] = None
self.subscription_concurrently: Optional[bool] = None

# Introspection attributes
self.isDeprecated: bool = False # pylint: disable=invalid-name
self.args: List["GraphQLArgument"] = []
Expand Down Expand Up @@ -160,6 +165,13 @@ def bake(
else:
self.arguments_coercer = schema.default_arguments_coercer

if self.subscription_concurrently is not None:
self.concurrently = self.subscription_concurrently
elif self.query_concurrently is not None:
self.concurrently = self.query_concurrently
else:
self.concurrently = schema.coerce_list_concurrently

# Directives
directives_definition = compute_directive_nodes(
schema, self.directives
Expand Down Expand Up @@ -192,7 +204,9 @@ def bake(
is_resolver=True,
with_default=True,
),
output_coercer=get_output_coercer(self.graphql_type),
output_coercer=get_output_coercer(
self.graphql_type, self.concurrently
),
)

for argument in self.arguments.values():
Expand Down

0 comments on commit 397feb5

Please sign in to comment.