-
Notifications
You must be signed in to change notification settings - Fork 575
/
lazy.py
173 lines (147 loc) · 5.6 KB
/
lazy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Most of this work is copyright (C) 2013-2021 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
#
# END HEADER
from inspect import getfullargspec
from typing import MutableMapping
from weakref import WeakKeyDictionary
from hypothesis.internal.reflection import (
arg_string,
convert_keyword_arguments,
convert_positional_arguments,
get_pretty_function_description,
)
from hypothesis.strategies._internal.strategies import SearchStrategy
# Memo used by unwrap_strategies(): maps a strategy to the strategy it
# unwraps to. Keys are held weakly (WeakKeyDictionary), and the mapping is
# cleared by unwrap_strategies() whenever its recursion depth drops to zero.
unwrap_cache: MutableMapping[SearchStrategy, SearchStrategy] = WeakKeyDictionary()
# Number of unwrap_strategies() calls that have passed the cache lookup and
# not yet finished; incremented and decremented by that function.
unwrap_depth = 0
def unwrap_strategies(s):
    """Follow ``wrapped_strategy`` links from *s* and return the innermost one.

    Anything that is not a ``SearchStrategy`` is returned unchanged.  For a
    strategy, ``s.wrapped_strategy`` is unwrapped recursively; if that step
    raises ``AttributeError`` (for instance because *s* has no
    ``wrapped_strategy`` attribute), *s* itself is returned.

    When an unwrapped result is found, the ``force_has_reusable_values``
    attribute of *s* is copied onto it where both objects have/accept that
    attribute.

    Results are memoised in the module-level ``unwrap_cache`` for the
    duration of the outermost call; the cache is emptied once the recursion
    depth tracked in ``unwrap_depth`` returns to zero.
    """
    global unwrap_depth

    if not isinstance(s, SearchStrategy):
        return s

    try:
        cached = unwrap_cache[s]
    except KeyError:
        pass
    else:
        return cached

    # Register s as its own result before recursing, so that reaching s again
    # further down the chain hits the cache instead of recursing forever.
    unwrap_cache[s] = s

    try:
        unwrap_depth += 1
        try:
            unwrapped = unwrap_strategies(s.wrapped_strategy)
            unwrap_cache[s] = unwrapped

            # If both objects already carry the flag it must agree; a missing
            # attribute on either side is tolerated.
            try:
                assert (
                    unwrapped.force_has_reusable_values
                    == s.force_has_reusable_values
                )
            except AttributeError:
                pass

            # Propagate the flag from the wrapper to the unwrapped result,
            # again ignoring objects that lack (or reject) the attribute.
            try:
                unwrapped.force_has_reusable_values = s.force_has_reusable_values
            except AttributeError:
                pass
        except AttributeError:
            # Nothing to unwrap (or unwrapping failed with AttributeError).
            return s
        else:
            return unwrapped
    finally:
        unwrap_depth -= 1
        if unwrap_depth <= 0:
            # Outermost call is done: drop all memoised results.
            unwrap_cache.clear()
        assert unwrap_depth >= 0
def _repr_filter(condition):
    """Return the ``.filter(...)`` repr suffix describing *condition*.

    The text inside the parentheses is whatever
    ``get_pretty_function_description`` produces for *condition*.
    """
    description = get_pretty_function_description(condition)
    return f".filter({description})"
class LazyStrategy(SearchStrategy):
    """A strategy standing in for the result of ``function(*args, **kwargs)``.

    The call is not made at construction time.  It happens the first time
    ``wrapped_strategy`` is read; the result, with any recorded filters
    applied, is then stored and reused.  Drawing, validation, and most
    other queries are forwarded to that wrapped strategy.
    """

    def __init__(self, function, args, kwargs, filters=(), *, force_repr=None):
        """Record the deferred call.

        ``function``, ``args`` and ``kwargs`` describe the call to make later.
        ``filters`` is a tuple of predicates applied, in order, to the
        strategy that call returns.  ``force_repr``, if given, is used as the
        repr instead of one computed from the call.
        """
        super().__init__()
        self.function = function
        self.__args = args
        self.__kwargs = kwargs
        self.__filters = filters
        # Both of these are filled in on first use.
        self.__wrapped_strategy = None
        self.__representation = force_repr

    @property
    def supports_find(self):
        """Forward ``supports_find`` from the wrapped strategy."""
        inner = self.wrapped_strategy
        return inner.supports_find

    def calc_is_empty(self, recur):
        """Return ``recur`` applied to the wrapped strategy."""
        inner = self.wrapped_strategy
        return recur(inner)

    def calc_has_reusable_values(self, recur):
        """Return ``recur`` applied to the wrapped strategy."""
        inner = self.wrapped_strategy
        return recur(inner)

    def calc_is_cacheable(self, recur):
        """Return False if any strategy argument reports ``is_cacheable`` False.

        Both positional and keyword argument values are inspected; values
        that are not strategies are ignored.  ``recur`` is accepted but unused.
        """
        return not any(
            isinstance(value, SearchStrategy) and not value.is_cacheable
            for group in (self.__args, self.__kwargs.values())
            for value in group
        )

    @property
    def wrapped_strategy(self):
        """Build (once) and return the strategy this object defers to."""
        if self.__wrapped_strategy is not None:
            return self.__wrapped_strategy

        unwrapped_args = tuple(map(unwrap_strategies, self.__args))
        unwrapped_kwargs = {
            name: unwrap_strategies(value) for name, value in self.__kwargs.items()
        }

        # The function is always called with the original arguments; that
        # result is only kept when unwrapping changed none of them.
        base = self.function(*self.__args, **self.__kwargs)
        nothing_unwrapped = (
            unwrapped_args == self.__args and unwrapped_kwargs == self.__kwargs
        )
        if nothing_unwrapped:
            self.__wrapped_strategy = base
        else:
            self.__wrapped_strategy = self.function(
                *unwrapped_args, **unwrapped_kwargs
            )

        # Apply the recorded filters in the order they were added.
        for condition in self.__filters:
            self.__wrapped_strategy = self.__wrapped_strategy.filter(condition)
        return self.__wrapped_strategy

    def filter(self, condition):
        """Return a new ``LazyStrategy`` for the same call with one more filter.

        The new object's repr is this object's repr followed by the
        ``.filter(...)`` suffix for *condition*.
        """
        extended_filters = self.__filters + (condition,)
        description = f"{self!r}{_repr_filter(condition)}"
        return LazyStrategy(
            self.function,
            self.__args,
            self.__kwargs,
            extended_filters,
            force_repr=description,
        )

    def do_validate(self):
        """Check the deferred call produced a strategy, then validate it."""
        inner = self.wrapped_strategy
        assert isinstance(
            inner, SearchStrategy
        ), f"{self!r} returned non-strategy {inner!r}"
        inner.validate()

    def __repr__(self):
        """Return a call-like description, computing and storing it on first use.

        Keyword arguments whose value is the very same object as the
        function's declared default are left out of the description.
        """
        if self.__representation is not None:
            return self.__representation

        argspec = getfullargspec(self.function)

        # Collect name -> default for every parameter that declares one.
        defaults = dict(argspec.kwonlydefaults or {})
        if argspec.defaults is not None:
            # Positional defaults belong to the trailing positional
            # parameters, so pair the two sequences from the end.
            defaults.update(zip(reversed(argspec.args), reversed(argspec.defaults)))

        if len(argspec.args) > 1 or argspec.defaults:
            normalise = convert_positional_arguments
        else:
            normalise = convert_keyword_arguments
        _args, _kwargs = normalise(self.function, self.__args, self.__kwargs)

        # Drop keyword arguments that are identical (by identity) to the
        # declared default.
        kwargs_for_repr = {
            name: value
            for name, value in dict(_kwargs).items()
            if not (name in defaults and defaults[name] is value)
        }

        function_name = self.function.__name__
        rendered_args = arg_string(
            self.function, _args, kwargs_for_repr, reorder=False
        )
        rendered_filters = "".join(map(_repr_filter, self.__filters))
        self.__representation = "{}({}){}".format(
            function_name, rendered_args, rendered_filters
        )
        return self.__representation

    def do_draw(self, data):
        """Draw a value by asking *data* to draw from the wrapped strategy."""
        inner = self.wrapped_strategy
        return data.draw(inner)

    @property
    def label(self):
        """Forward ``label`` from the wrapped strategy."""
        inner = self.wrapped_strategy
        return inner.label