-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
interpolate.py
227 lines (163 loc) · 5.7 KB
/
interpolate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import re
import typing
from collections.abc import Iterable, Mapping
from functools import singledispatch
from funcy import memoize, rpartial
from dvc.exceptions import DvcException
from dvc.utils.flatten import flatten
if typing.TYPE_CHECKING:
from typing import List, Match
from pyparsing import ParseException
from typing_extensions import NoReturn
from .context import Context
BRACE_OPEN = "${"
BRACE_CLOSE = "}"
LBRACK = "["
RBRACK = "]"
PERIOD = "."
KEYCRE = re.compile(
r"""
(?<!\\) # escape \${}
\${ # starts with ${
(?P<inner>.*?) # match every char inside
} # end with {
""",
re.VERBOSE,
)
@memoize
def get_parser():
from pyparsing import CharsNotIn, ParserElement, Suppress, ZeroOrMore
ParserElement.enablePackrat()
word = CharsNotIn(f"{PERIOD}{LBRACK}{RBRACK}")
idx = Suppress(LBRACK) + word + Suppress(RBRACK)
attr = Suppress(PERIOD) + word
parser = word + ZeroOrMore(attr ^ idx)
parser.setParseAction(PERIOD.join)
return parser
class ParseError(DvcException):
pass
def get_matches(template: str):
return list(KEYCRE.finditer(template))
def is_interpolated_string(val):
return isinstance(val, str) and bool(get_matches(val))
def normalize_key(key: str):
return key.replace(LBRACK, PERIOD).replace(RBRACK, "")
def format_and_raise_parse_error(exc) -> "NoReturn":
raise ParseError(_format_exc_msg(exc))
def embrace(s: str):
return BRACE_OPEN + s + BRACE_CLOSE
@singledispatch
def to_str(obj) -> str:
return str(obj)
@to_str.register(bool)
def _(obj: bool):
return "true" if obj else "false"
@to_str.register(dict)
def _(obj: dict):
from dvc.config import Config
config = Config().get("parsing", {})
result = ""
for k, v in flatten(obj).items():
if isinstance(v, bool):
if v:
result += f"--{k} "
else:
if config.get("bool", "store_true") == "boolean_optional":
result += f"--no-{k} "
elif isinstance(v, str):
result += f"--{k} '{v}' "
elif isinstance(v, Iterable):
for n, i in enumerate(v):
if isinstance(i, str):
i = f"'{i}'"
elif isinstance(i, Iterable):
raise ParseError(
f"Cannot interpolate nested iterable in '{k}'"
)
if config.get("list", "nargs") == "append":
result += f"--{k} {i} "
else:
result += f"{i} " if n > 0 else f"--{k} {i} "
else:
result += f"--{k} {v} "
return result.rstrip()
def _format_exc_msg(exc: "ParseException"):
from pyparsing import ParseException
from dvc.utils import colorize
exc.loc += 2 # 2 because we append `${` at the start of expr below
expr = exc.pstr
exc.pstr = embrace(exc.pstr)
error = ParseException.explain(exc, depth=0)
_, pointer, *explains = error.splitlines()
pstr = "{brace_open}{expr}{brace_close}".format(
brace_open=colorize(BRACE_OPEN, color="blue"),
expr=colorize(expr, color="magenta"),
brace_close=colorize(BRACE_CLOSE, color="blue"),
)
msg = "\n".join(explains)
pointer = colorize(pointer, color="red")
return "\n".join([pstr, pointer, colorize(msg, color="red", style="bold")])
def recurse(f):
seq = (list, tuple, set)
def wrapper(data, *args):
g = rpartial(wrapper, *args)
if isinstance(data, Mapping):
return {g(k): g(v) for k, v in data.items()}
if isinstance(data, seq):
return type(data)(map(g, data))
if isinstance(data, str):
return f(data, *args)
return data
return wrapper
def check_recursive_parse_errors(data):
func = recurse(check_expression)
return func(data)
def check_expression(s: str):
matches = get_matches(s)
for match in matches:
get_expression(match)
def parse_expr(s: str):
from pyparsing import ParseException
try:
result = get_parser().parseString(s, parseAll=True)
except ParseException as exc:
format_and_raise_parse_error(exc)
raise AssertionError("unreachable")
joined = result.asList()
assert len(joined) == 1
return joined[0]
def get_expression(match: "Match", skip_checks: bool = False):
inner = match["inner"]
return inner if skip_checks else parse_expr(inner)
def validate_value(value, key):
from .context import PRIMITIVES
not_primitive = value is not None and not isinstance(value, PRIMITIVES)
not_foreach = key is not None and "foreach" not in key
if not_primitive and not_foreach:
if isinstance(value, dict):
if key == "cmd":
return True
raise ParseError(
f"Cannot interpolate data of type '{type(value).__name__}'"
)
def str_interpolate(
template: str,
matches: "List[Match]",
context: "Context",
skip_checks: bool = False,
key=None,
):
index, buf = 0, ""
for match in matches:
start, end = match.span(0)
expr = get_expression(match, skip_checks=skip_checks)
value = context.select(expr, unwrap=True)
validate_value(value, key)
buf += template[index:start] + to_str(value)
index = end
buf += template[index:]
# regex already backtracks and avoids any `${` starting with
# backslashes(`\`). We just need to replace those by `${`.
return buf.replace(r"\${", BRACE_OPEN)
def is_exact_string(src: str, matches: "List[Match]"):
return len(matches) == 1 and src == matches[0].group(0)