-
Notifications
You must be signed in to change notification settings - Fork 122
/
_callers.py
182 lines (164 loc) · 7.11 KB
/
_callers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
"""
Call loop machinery
"""
from __future__ import annotations
from typing import cast
from typing import Generator
from typing import Mapping
from typing import NoReturn
from typing import Sequence
from typing import Tuple
from typing import Union
import warnings
from ._hooks import HookImpl
from ._result import HookCallError
from ._result import Result
from ._warnings import PluggyTeardownRaisedWarning
# Need to distinguish between old- and new-style hook wrappers.
# Wrapping with a tuple is the fastest type-safe way I found to do it.
Teardown = Union[
Tuple[Generator[None, Result[object], None], HookImpl],
Generator[None, object, object],
]
def _raise_wrapfail(
wrap_controller: (
Generator[None, Result[object], None] | Generator[None, object, object]
),
msg: str,
) -> NoReturn:
co = wrap_controller.gi_code
raise RuntimeError(
"wrap_controller at %r %s:%d %s"
% (co.co_name, co.co_filename, co.co_firstlineno, msg)
)
def _warn_teardown_exception(
hook_name: str, hook_impl: HookImpl, e: BaseException
) -> None:
msg = "A plugin raised an exception during an old-style hookwrapper teardown.\n"
msg += f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n"
msg += f"{type(e).__name__}: {e}\n"
msg += "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning" # noqa: E501
warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=5)
def _multicall(
hook_name: str,
hook_impls: Sequence[HookImpl],
caller_kwargs: Mapping[str, object],
firstresult: bool,
) -> object | list[object]:
"""Execute a call into multiple python functions/methods and return the
result(s).
``caller_kwargs`` comes from HookCaller.__call__().
"""
__tracebackhide__ = True
results: list[object] = []
exception = None
only_new_style_wrappers = True
try: # run impl and wrapper setup functions in a loop
teardowns: list[Teardown] = []
try:
for hook_impl in reversed(hook_impls):
try:
args = [caller_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in caller_kwargs:
raise HookCallError(
f"hook call must provide argument {argname!r}"
)
if hook_impl.hookwrapper:
only_new_style_wrappers = False
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
wrapper_gen = cast(Generator[None, Result[object], None], res)
next(wrapper_gen) # first yield
teardowns.append((wrapper_gen, hook_impl))
except StopIteration:
_raise_wrapfail(wrapper_gen, "did not yield")
elif hook_impl.wrapper:
try:
# If this cast is not valid, a type error is raised below,
# which is the desired response.
res = hook_impl.function(*args)
function_gen = cast(Generator[None, object, object], res)
next(function_gen) # first yield
teardowns.append(function_gen)
except StopIteration:
_raise_wrapfail(function_gen, "did not yield")
else:
res = hook_impl.function(*args)
if res is not None:
results.append(res)
if firstresult: # halt further impl calls
break
except BaseException as exc:
exception = exc
finally:
# Fast path - only new-style wrappers, no Result.
if only_new_style_wrappers:
if firstresult: # first result hooks return a single value
result = results[0] if results else None
else:
result = results
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
try:
if exception is not None:
teardown.throw(exception) # type: ignore[union-attr]
else:
teardown.send(result) # type: ignore[union-attr]
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close() # type: ignore[union-attr]
except StopIteration as si:
result = si.value
exception = None
continue
except BaseException as e:
exception = e
continue
_raise_wrapfail(teardown, "has second yield") # type: ignore[arg-type]
if exception is not None:
raise exception
else:
return result
# Slow path - need to support old-style wrappers.
else:
if firstresult: # first result hooks return a single value
outcome: Result[object | list[object]] = Result(
results[0] if results else None, exception
)
else:
outcome = Result(results, exception)
# run all wrapper post-yield blocks
for teardown in reversed(teardowns):
if isinstance(teardown, tuple):
try:
teardown[0].send(outcome)
except StopIteration:
pass
except BaseException as e:
_warn_teardown_exception(hook_name, teardown[1], e)
raise
else:
_raise_wrapfail(teardown[0], "has second yield")
else:
try:
if outcome._exception is not None:
teardown.throw(outcome._exception)
else:
teardown.send(outcome._result)
# Following is unreachable for a well behaved hook wrapper.
# Try to force finalizers otherwise postponed till GC action.
# Note: close() may raise if generator handles GeneratorExit.
teardown.close()
except StopIteration as si:
outcome.force_result(si.value)
continue
except BaseException as e:
outcome.force_exception(e)
continue
_raise_wrapfail(teardown, "has second yield")
return outcome.get_result()