forked from iterative/dvc
-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_lockfile.py
219 lines (171 loc) · 6.93 KB
/
test_lockfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import logging
import os
from collections import OrderedDict
from operator import itemgetter
import pytest
from dvc.dvcfile import PIPELINE_LOCK, Lockfile
from dvc.stage.utils import split_params_deps
from dvc.utils.fs import remove
from dvc.utils.serialize import dumps_yaml, parse_yaml_for_update
from dvc.utils.strictyaml import YAMLValidationError, make_relpath
from dvc_data.hashfile.hash_info import HashInfo
from tests.func.test_run_multistage import supported_params
# Workspace files generated at the start of each test: three small text files
# used as stage deps/outs, plus two params files serialized from the shared
# `supported_params` data imported above.
FS_STRUCTURE = {
    "foo": "bar\nfoobar",
    "bar": "foo\nfoobar",
    "foobar": "foobar\nbar",
    "params.yaml": dumps_yaml(supported_params),
    "params2.yaml": dumps_yaml(supported_params),
}
def read_lock_file(file=PIPELINE_LOCK):
    """Parse *file* as YAML and return its contents as an OrderedDict."""
    with open(file, encoding="utf-8") as fobj:
        content = parse_yaml_for_update(fobj.read(), file)
        # the round-trip parser must preserve key order for these tests
        assert isinstance(content, OrderedDict)
        return content
def assert_eq_lockfile(previous, new):
    """Assert both lockfile contents are OrderedDicts and equal.

    Comparing two OrderedDicts with ``==`` is order-sensitive, so this
    also verifies that key ordering matches.
    """
    assert isinstance(previous, OrderedDict)
    assert isinstance(new, OrderedDict)
    assert previous == new
def test_deps_outs_are_sorted_by_path(tmp_dir, dvc, run_head):
    """Lock stage keys come in a fixed order; deps/outs are sorted by path."""
    tmp_dir.gen(FS_STRUCTURE)
    deps = ["foo", "bar", "foobar"]
    run_head(*deps, name="copy-first-line")

    lock = read_lock_file()["stages"]["copy-first-line"]

    # the stage entry itself has a fixed key order
    assert list(lock.keys()) == ["cmd", "deps", "outs"]
    # within each dep/out entry, `path` appears first, then hash and size
    for section in ("deps", "outs"):
        assert all(
            list(entry.keys()) == ["path", "md5", "size"]
            for entry in lock[section]
        )
    # deps are always sorted by file path, and so are outs
    assert [d["path"] for d in lock["deps"]] == sorted(deps)
    assert [o["path"] for o in lock["outs"]] == [
        d + "-1" for d in sorted(deps)
    ]
def test_order_is_preserved_when_pipeline_order_changes(
    tmp_dir, dvc, run_head
):
    """Reordering outs/deps in dvc.yaml must not change the lockfile."""
    tmp_dir.gen(FS_STRUCTURE)
    deps = ["foo", "bar", "foobar"]
    stage = run_head(*deps, name="copy-first-line")
    initial_content = read_lock_file()

    # reverse outs first, then deps, dumping the pipeline file each time
    for reverse_items in (stage.outs.reverse, stage.deps.reverse):
        reverse_items()
        stage.dvcfile._dump_pipeline_file(stage)

        # only the order changed, so nothing should be reproduced
        assert not dvc.reproduce(stage.addressing)
        assert_eq_lockfile(read_lock_file(), initial_content)

        # even after rebuilding the lockfile from scratch, the sorted
        # structure must come out identical
        (tmp_dir / PIPELINE_LOCK).unlink()
        assert dvc.reproduce(stage.addressing) == [stage]
        assert_eq_lockfile(read_lock_file(), initial_content)
def test_cmd_changes_other_orders_are_preserved(tmp_dir, dvc, run_head):
    """Changing only `cmd` must leave the rest of the lockfile untouched."""
    tmp_dir.gen(FS_STRUCTURE)
    deps = ["foo", "bar", "foobar"]
    stage = run_head(*deps, name="copy-first-line")
    initial_content = read_lock_file()

    # Change cmd in the pipeline file: re-joining the tokens with a DOUBLE
    # space keeps the command semantically the same but makes the string
    # differ, so only the "cmd" entry of the lock stage should change.
    # BUG FIX: the previous `" ".join(stage.cmd.split())` was a no-op for an
    # already single-spaced command, so the stage never changed and the
    # reproduce assertion below could not hold.
    stage.cmd = "  ".join(stage.cmd.split())
    stage.dvcfile._dump_pipeline_file(stage)

    initial_content["stages"]["copy-first-line"]["cmd"] = stage.cmd
    assert dvc.reproduce(stage.addressing) == [stage]
    assert_eq_lockfile(read_lock_file(), initial_content)
def test_params_dump(tmp_dir, dvc, run_head):
    """Params in the lockfile are grouped per file and sorted by name."""
    tmp_dir.gen(FS_STRUCTURE)
    stage = run_head(
        "foo",
        "bar",
        "foobar",
        name="copy-first-line",
        params=[
            "params2.yaml:answer,lists,name",
            "params.yaml:lists,floats,nested.nested1,nested.nested1.nested2",
        ],
    )
    initial_content = read_lock_file()
    lock = initial_content["stages"]["copy-first-line"]

    # lock stage keys come in a fixed order
    assert list(lock.keys()) == ["cmd", "deps", "params", "outs"]
    # params files are sorted by filename ...
    assert list(lock["params"].keys()) == ["params.yaml", "params2.yaml"]
    # ... and param names within each file are sorted as well
    assert list(lock["params"]["params.yaml"].keys()) == [
        "floats",
        "lists",
        "nested.nested1",
        "nested.nested1.nested2",
    ]
    assert list(lock["params"]["params2.yaml"]) == ["answer", "lists", "name"]
    assert not dvc.reproduce(stage.addressing)

    # reverse the declared order of params and re-dump the pipeline file;
    # a pure ordering change must not trigger a reproduce
    params, _ = split_params_deps(stage)
    for param in params:
        param.params.reverse()
    stage.dvcfile._dump_pipeline_file(stage)
    assert not dvc.reproduce(stage.addressing)

    # rebuilding the lockfile from scratch yields the same structure
    (tmp_dir / PIPELINE_LOCK).unlink()
    assert dvc.reproduce(stage.addressing) == [stage]
    assert_eq_lockfile(initial_content, read_lock_file())

    # same again after removing the run-cache as well
    for item in (dvc.stage_cache.cache_dir, PIPELINE_LOCK):
        remove(item)
    assert dvc.reproduce(stage.addressing) == [stage]
    assert_eq_lockfile(initial_content, read_lock_file())
@pytest.fixture
def v1_repo_lock(tmp_dir, dvc):
    """Set up a repo whose lockfile uses the legacy v1 layout."""
    # bar.txt's expected size differs on Windows — presumably because of
    # the CRLF line ending produced by `echo` there
    expected_size = 5 if os.name == "nt" else 4
    bar_hash = HashInfo(name="md5", value="c157a79031e1c40f85931829bc5fc552")
    v1_lockdata = {
        "foo": {"cmd": "echo foo"},
        "bar": {
            "cmd": "echo bar>bar.txt",
            "outs": [
                {
                    "path": "bar.txt",
                    **bar_hash.to_dict(),
                    "size": expected_size,
                }
            ],
        },
    }
    dvc.run(cmd="echo foo", name="foo", no_exec=True)
    dvc.run(cmd="echo bar>bar.txt", outs=["bar.txt"], name="bar", no_exec=True)
    # write the v1-format lock data directly, bypassing dvc's own dumper
    (tmp_dir / "dvc.lock").dump(v1_lockdata)
    yield v1_lockdata
def test_can_read_v1_lockfile(tmp_dir, dvc, v1_repo_lock):
    """A v1-format lockfile is still readable by `dvc status`."""
    expected = {
        "bar": [{"changed outs": {"bar.txt": "not in cache"}}],
        "foo": ["always changed"],
    }
    assert dvc.status() == expected
def test_migrates_v1_lockfile_to_v2_during_dump(
    tmp_dir, dvc, v1_repo_lock, caplog
):
    """Reproducing rewrites a v1 lockfile in the v2 schema and logs it."""
    caplog.clear()
    with caplog.at_level(logging.INFO, logger="dvc.dvcfile"):
        assert dvc.reproduce()

    assert "Migrating lock file 'dvc.lock' from v1 to v2" in caplog.messages

    migrated = (tmp_dir / "dvc.lock").parse()
    assert migrated == {"stages": v1_repo_lock, "schema": "2.0"}
@pytest.mark.parametrize(
    "version_info", [{"schema": "1.1"}, {"schema": "2.1"}, {"schema": "3.0"}]
)
def test_lockfile_invalid_versions(tmp_dir, dvc, version_info):
    """Loading a lockfile with an unsupported schema version must fail."""
    lockdata = {**version_info, "stages": {"foo": {"cmd": "echo foo"}}}
    (tmp_dir / "dvc.lock").dump(lockdata)

    with pytest.raises(YAMLValidationError) as exc_info:
        Lockfile(dvc, tmp_dir / "dvc.lock").load()

    # the error message names the offending file ...
    rel = make_relpath("dvc.lock")
    assert f"'{rel}' validation failed" in str(exc_info.value)

    # ... and the chained cause pinpoints the bad schema value
    expected_cause = (
        f"invalid schema version {version_info['schema']}, "
        "expected one of ['2.0'] for dictionary value @ "
        "data['schema']"
    )
    assert str(exc_info.value.__cause__) == expected_cause