/
_hooks.py
325 lines (263 loc) · 11.2 KB
/
_hooks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
"""
Internal hook annotation, representation and calling machinery.
"""
import inspect
import sys
import warnings
class HookspecMarker:
    """Factory for decorators that tag functions as hook specifications.

    Instantiate it with a ``project_name``; the resulting object can then be
    used as a decorator.  Tagged functions carry an attribute named
    ``<project_name>_spec`` which :py:meth:`.PluginManager.add_hookspecs` is
    meant to discover when the :py:class:`.PluginManager` was created with the
    same ``project_name``.
    """

    def __init__(self, project_name):
        self.project_name = project_name

    def __call__(
        self, function=None, firstresult=False, historic=False, warn_on_impl=None
    ):
        """Tag ``function`` as a hook specification, or build a decorator.

        When ``function`` is given, the spec options are attached to it right
        away and the function itself is returned.  When it is omitted, a
        decorator is returned that attaches the supplied options to whatever
        function it is later applied to.

        If ``firstresult`` is ``True`` the 1:N hook call (N being the number of
        registered hook implementation functions) will stop at I<=N when the
        I'th function returns a non-``None`` result.

        If ``historic`` is ``True`` calls to a hook will be memorized and
        replayed on later registered plugins.

        ``warn_on_impl`` is stored unchanged in the spec options.

        :raises ValueError: when the options are applied to a function while
            both ``historic`` and ``firstresult`` are true.
        """

        def setattr_hookspec_opts(func):
            # The combination is rejected at application time, i.e. only once
            # an actual function is being tagged.
            if historic and firstresult:
                raise ValueError("cannot have a historic firstresult hook")
            spec_opts = {
                "firstresult": firstresult,
                "historic": historic,
                "warn_on_impl": warn_on_impl,
            }
            setattr(func, self.project_name + "_spec", spec_opts)
            return func

        if function is None:
            return setattr_hookspec_opts
        return setattr_hookspec_opts(function)
class HookimplMarker:
    """Factory for decorators that tag functions as hook implementations.

    Instantiate it with a ``project_name``; the resulting object can then be
    used as a decorator.  Tagged functions carry an attribute named
    ``<project_name>_impl`` which :py:meth:`.PluginManager.register` is meant
    to discover when the :py:class:`.PluginManager` was created with the same
    ``project_name``.
    """

    def __init__(self, project_name):
        self.project_name = project_name

    def __call__(
        self,
        function=None,
        hookwrapper=False,
        optionalhook=False,
        tryfirst=False,
        trylast=False,
        specname=None,
    ):
        """Tag ``function`` as a hook implementation, or build a decorator.

        When ``function`` is given, the implementation options are attached to
        it right away and the function itself is returned.  When it is omitted,
        a decorator is returned that attaches the supplied options to whatever
        function it is later applied to.

        If ``optionalhook`` is ``True`` a missing matching hook specification
        will not result in an error (by default it is an error if no matching
        spec is found).

        If ``tryfirst`` is ``True`` this hook implementation will run as early
        as possible in the chain of N hook implementations for a specification.

        If ``trylast`` is ``True`` this hook implementation will run as late as
        possible in the chain of N hook implementations.

        If ``hookwrapper`` is ``True`` the hook implementation needs to execute
        exactly one ``yield``.  The code before the ``yield`` is run early,
        before any non-hookwrapper function is run.  The code after the
        ``yield`` is run after all non-hookwrapper functions have run.  The
        ``yield`` receives a :py:class:`.callers._Result` object representing
        the exception or result outcome of the inner calls (including other
        hookwrapper calls).

        If ``specname`` is provided, it will be used instead of the function
        name when matching this hook implementation to a hook specification
        during registration.
        """

        def setattr_hookimpl_opts(func):
            impl_opts = {
                "hookwrapper": hookwrapper,
                "optionalhook": optionalhook,
                "tryfirst": tryfirst,
                "trylast": trylast,
                "specname": specname,
            }
            setattr(func, self.project_name + "_impl", impl_opts)
            return func

        if function is not None:
            return setattr_hookimpl_opts(function)
        return setattr_hookimpl_opts
def normalize_hookimpl_opts(opts):
    """Fill in, in place, every hook implementation option missing from ``opts``.

    Options already present keep their value; the boolean flags default to
    ``False`` and ``specname`` defaults to ``None``.  Nothing is returned.
    """
    defaults = (
        ("tryfirst", False),
        ("trylast", False),
        ("hookwrapper", False),
        ("optionalhook", False),
        ("specname", None),
    )
    for option, fallback in defaults:
        opts.setdefault(option, fallback)
# True when running on PyPy, detected through the ``sys.pypy_version_info``
# attribute.
_PYPY = hasattr(sys, "pypy_version_info")


def varnames(func):
    """Return the argument names of a function, method, class or callable.

    The result is a pair ``(args, kwargs)`` of tuples: the names of the
    positional parameters without a default value, and the names of the
    positional parameters that do have one.

    For a class, its ``__init__`` method is inspected; for any other
    non-routine object, its ``__call__`` attribute (if any) is inspected.
    The implicit instance parameter of methods is left out.  Objects that
    cannot be introspected yield ``((), ())``.
    """
    nothing = ((), ())

    if inspect.isclass(func):
        try:
            func = func.__init__
        except AttributeError:
            return nothing
    elif not inspect.isroutine(func):
        # Possibly a callable object: look at its ``__call__`` instead.
        try:
            func = getattr(func, "__call__", func)
        except Exception:
            return nothing

    # ``func`` has to be a function or method by now, otherwise there is
    # nothing we can parse.
    try:
        spec = inspect.getfullargspec(func)
    except TypeError:
        return nothing

    args = tuple(spec.args)
    kwargs = ()
    if spec.defaults:
        # Defaults always belong to the trailing parameters.
        cut = -len(spec.defaults)
        args, kwargs = args[:cut], tuple(args[cut:])

    # Strip any implicit instance argument.
    # pypy3 uses "obj" instead of "self" for default dunder methods.
    implicit_names = ("self", "obj") if _PYPY else ("self",)
    if args:
        drop_first = inspect.ismethod(func)
        if not drop_first:
            qualname = getattr(func, "__qualname__", ())
            drop_first = "." in qualname and args[0] in implicit_names
        if drop_first:
            args = args[1:]

    return args, kwargs
class _HookRelay:
"""hook holder object for performing 1:N hook calls where N is the number
of registered plugins.
"""
class _HookCaller:
def __init__(self, name, hook_execute, specmodule_or_class=None, spec_opts=None):
self.name = name
self._wrappers = []
self._nonwrappers = []
self._hookexec = hook_execute
self._call_history = None
self.spec = None
if specmodule_or_class is not None:
assert spec_opts is not None
self.set_specification(specmodule_or_class, spec_opts)
def has_spec(self):
return self.spec is not None
def set_specification(self, specmodule_or_class, spec_opts):
assert not self.has_spec()
self.spec = HookSpec(specmodule_or_class, self.name, spec_opts)
if spec_opts.get("historic"):
self._call_history = []
def is_historic(self):
return self._call_history is not None
def _remove_plugin(self, plugin):
def remove(wrappers):
for i, method in enumerate(wrappers):
if method.plugin == plugin:
del wrappers[i]
return True
if remove(self._wrappers) is None:
if remove(self._nonwrappers) is None:
raise ValueError(f"plugin {plugin!r} not found")
def get_hookimpls(self):
# Order is important for _hookexec
return self._nonwrappers + self._wrappers
def _add_hookimpl(self, hookimpl):
"""Add an implementation to the callback chain."""
if hookimpl.hookwrapper:
methods = self._wrappers
else:
methods = self._nonwrappers
if hookimpl.trylast:
methods.insert(0, hookimpl)
elif hookimpl.tryfirst:
methods.append(hookimpl)
else:
# find last non-tryfirst method
i = len(methods) - 1
while i >= 0 and methods[i].tryfirst:
i -= 1
methods.insert(i + 1, hookimpl)
def __repr__(self):
return f"<_HookCaller {self.name!r}>"
def __call__(self, *args, **kwargs):
if args:
raise TypeError("hook calling supports only keyword arguments")
assert not self.is_historic()
# This is written to avoid expensive operations when not needed.
if self.spec:
for argname in self.spec.argnames:
if argname not in kwargs:
notincall = tuple(set(self.spec.argnames) - kwargs.keys())
warnings.warn(
"Argument(s) {} which are declared in the hookspec "
"can not be found in this hook call".format(notincall),
stacklevel=2,
)
break
firstresult = self.spec.opts.get("firstresult")
else:
firstresult = False
return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
def call_historic(self, result_callback=None, kwargs=None):
"""Call the hook with given ``kwargs`` for all registered plugins and
for all plugins which will be registered afterwards.
If ``result_callback`` is not ``None`` it will be called for for each
non-``None`` result obtained from a hook implementation.
"""
self._call_history.append((kwargs or {}, result_callback))
# Historizing hooks don't return results.
# Remember firstresult isn't compatible with historic.
res = self._hookexec(self.name, self.get_hookimpls(), kwargs, False)
if result_callback is None:
return
for x in res or []:
result_callback(x)
def call_extra(self, methods, kwargs):
"""Call the hook with some additional temporarily participating
methods using the specified ``kwargs`` as call parameters."""
old = list(self._nonwrappers), list(self._wrappers)
for method in methods:
opts = dict(hookwrapper=False, trylast=False, tryfirst=False)
hookimpl = HookImpl(None, "<temp>", method, opts)
self._add_hookimpl(hookimpl)
try:
return self(**kwargs)
finally:
self._nonwrappers, self._wrappers = old
def _maybe_apply_history(self, method):
"""Apply call history to a new hookimpl if it is marked as historic."""
if self.is_historic():
for kwargs, result_callback in self._call_history:
res = self._hookexec(self.name, [method], kwargs, False)
if res and result_callback is not None:
result_callback(res[0])
class HookImpl:
    """A single hook implementation together with its owning plugin.

    Besides the explicit attributes, every entry of ``hook_impl_opts`` is also
    exposed as an instance attribute of the same name.
    """

    def __init__(self, plugin, plugin_name, function, hook_impl_opts):
        positional, keyword = varnames(function)
        self.function = function
        self.argnames = positional
        self.kwargnames = keyword
        self.plugin = plugin
        self.plugin_name = plugin_name
        self.opts = hook_impl_opts
        # Done last: an option sharing the name of one of the attributes set
        # above overrides that attribute.
        vars(self).update(hook_impl_opts)

    def __repr__(self):
        return "<HookImpl plugin_name={!r}, plugin={!r}>".format(
            self.plugin_name, self.plugin
        )
class HookSpec:
    """A hook specification: the attribute ``name`` looked up on ``namespace``.

    Stores the specification function, its argument names as reported by
    ``varnames``, the raw options, and the ``"warn_on_impl"`` option (``None``
    when absent).
    """

    def __init__(self, namespace, name, opts):
        function = getattr(namespace, name)
        positional, keyword = varnames(function)
        self.namespace = namespace
        self.function = function
        self.name = name
        self.argnames = positional
        self.kwargnames = keyword
        self.opts = opts
        self.warn_on_impl = opts.get("warn_on_impl")