-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
test_copier.py
142 lines (117 loc) · 5.4 KB
/
test_copier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import pathlib
from unittest import mock
from unittest.mock import Mock
import pytest
import lightning_app
from lightning_app.storage.copier import _Copier, _copy_files
from lightning_app.storage.path import Path
from lightning_app.storage.requests import _ExistsRequest, _GetRequest
from lightning_app.testing.helpers import _MockQueue
class MockPatch:
@staticmethod
def _handle_get_request(work, request):
return Path._handle_get_request(work, request)
@staticmethod
def _handle_exists_request(work, request):
return Path._handle_exists_request(work, request)
@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir")
@mock.patch("lightning_app.storage.path.pathlib.Path.stat")
@mock.patch("lightning_app.storage.copier._filesystem")
def test_copier_copies_all_files(fs_mock, stat_mock, dir_mock, tmpdir):
"""Test that the Copier calls the copy with the information provided in the request."""
stat_mock().st_size = 0
dir_mock.return_value = False
copy_request_queue = _MockQueue()
copy_response_queue = _MockQueue()
work = mock.Mock()
work.name = MockPatch()
work._paths = {"file": dict(source="src", path="file", hash="123", destination="dest", name="name")}
with mock.patch.dict(os.environ, {"SHARED_MOUNT_DIRECTORY": str(tmpdir / ".shared")}):
copier = _Copier(work, copy_request_queue=copy_request_queue, copy_response_queue=copy_response_queue)
request = _GetRequest(source="src", path="file", hash="123", destination="dest", name="name")
copy_request_queue.put(request)
copier.run_once()
fs_mock().put.assert_called_once_with("file", tmpdir / ".shared" / "123")
@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir")
@mock.patch("lightning_app.storage.path.pathlib.Path.stat")
def test_copier_handles_exception(stat_mock, dir_mock, monkeypatch):
"""Test that the Copier captures exceptions from the file copy and forwards them through the queue without
raising it."""
stat_mock().st_size = 0
dir_mock.return_value = False
copy_request_queue = _MockQueue()
copy_response_queue = _MockQueue()
fs = mock.Mock()
fs.exists.return_value = False
fs.put = mock.Mock(side_effect=OSError("Something went wrong"))
monkeypatch.setattr(lightning_app.storage.copier, "_filesystem", mock.Mock(return_value=fs))
work = mock.Mock()
work.name = MockPatch()
work._paths = {"file": dict(source="src", path="file", hash="123", destination="dest", name="name")}
copier = _Copier(work, copy_request_queue=copy_request_queue, copy_response_queue=copy_response_queue)
request = _GetRequest(source="src", path="file", hash="123", destination="dest", name="name")
copy_request_queue.put(request)
copier.run_once()
response = copy_response_queue.get()
assert type(response.exception) == OSError
assert response.exception.args[0] == "Something went wrong"
def test_copier_existence_check(tmpdir):
"""Test that the Copier responds to an existence check request."""
copy_request_queue = _MockQueue()
copy_response_queue = _MockQueue()
work = mock.Mock()
work.name = MockPatch()
work._paths = {
"file": dict(source="src", path=str(tmpdir / "notexists"), hash="123", destination="dest", name="name")
}
copier = _Copier(work, copy_request_queue=copy_request_queue, copy_response_queue=copy_response_queue)
# A Path that does NOT exist
request = _ExistsRequest(source="src", path=str(tmpdir / "notexists"), destination="dest", name="name", hash="123")
copy_request_queue.put(request)
copier.run_once()
response = copy_response_queue.get()
assert response.exists is False
# A Path that DOES exist
request = _ExistsRequest(source="src", path=str(tmpdir), destination="dest", name="name", hash="123")
copy_request_queue.put(request)
copier.run_once()
response = copy_response_queue.get()
assert response.exists is True
def test_copy_files(tmpdir):
"""Test that the `test_copy_files` utility can handle both files and folders when the destination does not
exist."""
# copy from a src that does not exist
src = pathlib.Path(tmpdir, "dir1")
dst = pathlib.Path(tmpdir, "dir2")
with pytest.raises(FileNotFoundError):
_copy_files(src, dst)
# copy to a dst dir that does not exist
src.mkdir()
(src / "empty.txt").touch()
assert not dst.exists()
_copy_files(src, dst)
assert dst.is_dir()
# copy to a destination dir that already exists (no error should be raised)
_copy_files(src, dst)
assert dst.is_dir()
# copy file to a dst that does not exist
src = pathlib.Path(tmpdir, "dir3", "src-file.txt")
dst = pathlib.Path(tmpdir, "dir4", "dst-file.txt")
src.parent.mkdir(parents=True)
src.touch()
assert not dst.exists()
_copy_files(src, dst)
assert dst.is_file()
def test_copy_files_with_exception(tmpdir):
"""Test that the `test_copy_files` utility properly raises exceptions from within the ThreadPoolExecutor."""
fs_mock = Mock()
fs_mock().put = Mock(side_effect=ValueError("error from thread"))
src = pathlib.Path(tmpdir, "src")
src.mkdir()
assert src.is_dir()
pathlib.Path(src, "file.txt").touch()
dst = pathlib.Path(tmpdir, "dest")
with mock.patch("lightning_app.storage.copier._filesystem", fs_mock):
with pytest.raises(ValueError, match="error from thread"):
_copy_files(src, dst)