forked from pytest-dev/pytest-xdist
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_dsession.py
326 lines (276 loc) · 10.1 KB
/
test_dsession.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
from xdist.dsession import DSession, get_default_max_worker_restart
from xdist.report import report_collection_diff
from xdist.scheduler import EachScheduling, LoadScheduling
import py
import pytest
import execnet
# Module-level shorthand for the ``XSpec`` class exported by execnet.
XSpec = execnet.XSpec
def run(item, node, excinfo=None):
    """Build a call-phase report for ``item`` and tag it with ``node``.

    The report class (``ItemTestReport``) is taken from the plugin registered
    as "runner" on the item's config; ``excinfo`` is forwarded unchanged.
    Returns the report with its ``node`` attribute set.
    """
    report_cls = item.config.pluginmanager.getplugin("runner").ItemTestReport
    report = report_cls(item=item, excinfo=excinfo, when="call")
    report.node = node
    return report
class MockGateway:
    """Minimal gateway stand-in that only carries a string ``id``.

    Ids are handed out from a class-wide counter ("0", "1", ...), so every
    gateway created in the same process gets a distinct id.
    """

    # Next id to hand out; shared by all instances.
    _count = 0

    def __init__(self):
        self.id = str(MockGateway._count)
        # Bump the class attribute itself: ``self._count += 1`` would only
        # create an instance attribute and leave every gateway with id "0".
        MockGateway._count += 1
class MockNode:
    """Fake worker node that records what it is asked to run.

    ``sent`` accumulates everything passed to the ``send_runtest_*`` methods,
    and ``shutting_down`` reflects whether ``shutdown()`` has been called.
    """

    def __init__(self):
        self._shutdown = False
        self.gateway = MockGateway()
        self.sent = []

    @property
    def shutting_down(self):
        """True once ``shutdown()`` has been called on this node."""
        return self._shutdown

    def shutdown(self):
        """Flag the node as shutting down."""
        self._shutdown = True

    def send_runtest_all(self):
        """Record a request to run everything as the marker ``"ALL"``."""
        self.sent.append("ALL")

    def send_runtest_some(self, indices):
        """Record a request to run the given test indices."""
        self.sent += indices
def dumpqueue(queue):
    """Print and remove entries from ``queue`` until it reports size zero."""
    while True:
        if not queue.qsize():
            break
        print(queue.get())
class TestEachScheduling:
    """Exercises ``EachScheduling`` against ``MockNode`` workers."""

    def test_schedule_load_simple(self, testdir):
        """Two nodes reporting the same collection are each sent "ALL"."""
        workers = [MockNode(), MockNode()]
        scheduler = EachScheduling(testdir.parseconfig("--tx=2*popen"))
        for worker in workers:
            scheduler.add_node(worker)
        first, second = workers
        test_ids = ["a.py::test_1"]

        # Collection only counts as complete once both nodes have reported.
        assert not scheduler.collection_is_completed
        scheduler.add_node_collection(first, test_ids)
        assert not scheduler.collection_is_completed
        scheduler.add_node_collection(second, test_ids)
        assert scheduler.collection_is_completed
        for worker in workers:
            assert scheduler.node2collection[worker] == test_ids

        scheduler.schedule()
        assert scheduler.tests_finished
        for worker in workers:
            assert worker.sent == ["ALL"]

        for worker in workers:
            scheduler.mark_test_complete(worker, 0)
            assert scheduler.tests_finished

    def test_schedule_remove_node(self, testdir):
        """Removing the only node after scheduling yields a crash item."""
        worker = MockNode()
        scheduler = EachScheduling(testdir.parseconfig("--tx=popen"))
        scheduler.add_node(worker)
        test_ids = ["a.py::test_1"]

        assert not scheduler.collection_is_completed
        scheduler.add_node_collection(worker, test_ids)
        assert scheduler.collection_is_completed
        assert scheduler.node2collection[worker] == test_ids

        scheduler.schedule()
        assert scheduler.tests_finished

        crashed = scheduler.remove_node(worker)
        assert crashed
        assert scheduler.tests_finished
        assert not scheduler.nodes
class TestLoadScheduling:
    """Exercises ``LoadScheduling`` against ``MockNode`` workers."""

    def test_schedule_load_simple(self, testdir):
        """Two tests over two nodes: one test index is sent to each node."""
        scheduler = LoadScheduling(testdir.parseconfig("--tx=2*popen"))
        for _ in range(2):
            scheduler.add_node(MockNode())
        first, second = scheduler.nodes
        test_ids = ["a.py::test_1", "a.py::test_2"]

        # Collection only counts as complete once both nodes have reported.
        assert not scheduler.collection_is_completed
        scheduler.add_node_collection(first, test_ids)
        assert not scheduler.collection_is_completed
        scheduler.add_node_collection(second, test_ids)
        assert scheduler.collection_is_completed
        assert scheduler.node2collection[first] == test_ids
        assert scheduler.node2collection[second] == test_ids

        scheduler.schedule()
        assert not scheduler.pending
        assert scheduler.tests_finished
        assert len(first.sent) == 1
        assert len(second.sent) == 1
        assert first.sent == [0]
        assert second.sent == [1]

        scheduler.mark_test_complete(first, first.sent[0])
        assert scheduler.tests_finished

    def test_schedule_batch_size(self, testdir):
        """Six tests over two nodes: two each up front, the rest on completion."""
        scheduler = LoadScheduling(testdir.parseconfig("--tx=2*popen"))
        for _ in range(2):
            scheduler.add_node(MockNode())
        first, second = scheduler.nodes
        test_ids = ["xyz"] * 6
        scheduler.add_node_collection(first, test_ids)
        scheduler.add_node_collection(second, test_ids)

        scheduler.schedule()
        # NOTE: a ``not scheduler.tests_finished`` check was disabled here
        # in the original test.
        first_sent = first.sent
        second_sent = second.sent
        assert first_sent == [0, 2]
        assert second_sent == [1, 3]
        assert scheduler.pending == [4, 5]
        assert scheduler.node2pending[first] == first_sent
        assert scheduler.node2pending[second] == second_sent
        assert len(scheduler.pending) == 2

        # Each completion on the first node pulls one more pending index.
        scheduler.mark_test_complete(first, 0)
        assert first.sent == [0, 2, 4]
        assert scheduler.pending == [5]
        assert second.sent == [1, 3]

        scheduler.mark_test_complete(first, 2)
        assert first.sent == [0, 2, 4, 5]
        assert not scheduler.pending

    def test_schedule_fewer_tests_than_nodes(self, testdir):
        """Two tests over three nodes: the third node is sent nothing."""
        scheduler = LoadScheduling(testdir.parseconfig("--tx=2*popen"))
        for _ in range(3):
            scheduler.add_node(MockNode())
        first, second, third = scheduler.nodes
        test_ids = ["xyz"] * 2
        # Only the first two nodes report a collection.
        scheduler.add_node_collection(first, test_ids)
        scheduler.add_node_collection(second, test_ids)

        scheduler.schedule()
        # NOTE: a ``not scheduler.tests_finished`` check was disabled here
        # in the original test.
        first_sent = first.sent
        second_sent = second.sent
        third_sent = third.sent
        assert first_sent == [0]
        assert second_sent == [1]
        assert third_sent == []
        assert not scheduler.pending

    def test_schedule_fewer_than_two_tests_per_node(self, testdir):
        """Five tests over three nodes: everything is sent, nothing pending."""
        scheduler = LoadScheduling(testdir.parseconfig("--tx=2*popen"))
        for _ in range(3):
            scheduler.add_node(MockNode())
        first, second, third = scheduler.nodes
        test_ids = ["xyz"] * 5
        # Only the first two nodes report a collection.
        scheduler.add_node_collection(first, test_ids)
        scheduler.add_node_collection(second, test_ids)

        scheduler.schedule()
        # NOTE: a ``not scheduler.tests_finished`` check was disabled here
        # in the original test.
        first_sent = first.sent
        second_sent = second.sent
        third_sent = third.sent
        assert first_sent == [0, 3]
        assert second_sent == [1, 4]
        assert third_sent == [2]
        assert not scheduler.pending

    def test_add_remove_node(self, testdir):
        """Removing the node after scheduling returns its single test id."""
        worker = MockNode()
        scheduler = LoadScheduling(testdir.parseconfig("--tx=popen"))
        scheduler.add_node(worker)
        test_ids = ["test_file.py::test_func"]
        scheduler.add_node_collection(worker, test_ids)
        assert scheduler.collection_is_completed

        scheduler.schedule()
        assert not scheduler.pending

        crashed = scheduler.remove_node(worker)
        assert crashed == test_ids[0]

    def test_different_tests_collected(self, testdir):
        """
        Test that LoadScheduling is reporting collection errors when
        different test ids are collected by workers.
        """

        class CollectHook(object):
            """
            Dummy hook that stores collection reports.
            """

            def __init__(self):
                self.reports = []

            def pytest_collectreport(self, report):
                self.reports.append(report)

        recorder = CollectHook()
        config = testdir.parseconfig("--tx=2*popen")
        config.pluginmanager.register(recorder, "collect_hook")

        first = MockNode()
        second = MockNode()
        scheduler = LoadScheduling(config)
        for worker in (first, second):
            scheduler.add_node(worker)
        scheduler.add_node_collection(first, ["a.py::test_1"])
        scheduler.add_node_collection(second, ["a.py::test_2"])

        scheduler.schedule()
        assert len(recorder.reports) == 1
        report = recorder.reports[0]
        assert "Different tests were collected between" in report.longrepr
class TestDistReporter:
    """Checks the terminal output ``DSession`` writes for distribution events."""

    # Uses ``pytest.mark`` (not the legacy ``py.test`` alias) to match the
    # other markers in this module.
    @pytest.mark.xfail
    def test_rsync_printing(self, testdir, linecomp):
        """The rsync-start hook should print one line naming both gateways.

        NOTE(review): the xfail marker carries no reason, so why this test
        is expected to fail is not recorded here -- confirm before removing.
        """
        config = testdir.parseconfig()
        # TerminalReporter lives in ``_pytest.terminal``; the previous
        # ``_pytest.pytest_terminal`` path raised ImportError before the
        # test could exercise DSession at all.
        from _pytest.terminal import TerminalReporter

        rep = TerminalReporter(config, file=linecomp.stringio)
        config.pluginmanager.register(rep, "terminalreporter")
        dsession = DSession(config)

        class gw1:
            id = "X1"
            spec = execnet.XSpec("popen")

        class gw2:
            id = "X2"
            spec = execnet.XSpec("popen")

        # Disabled check for the new-gateway report, kept for reference:
        # class rinfo:
        #     version_info = (2, 5, 1, 'final', 0)
        #     executable = "hello"
        #     platform = "xyz"
        #     cwd = "qwe"
        # dsession.pytest_xdist_newgateway(gw1, rinfo)
        # linecomp.assert_contains_lines([
        #     "*X1*popen*xyz*2.5*"
        # ])
        dsession.pytest_xdist_rsyncstart(source="hello", gateways=[gw1, gw2])
        linecomp.assert_contains_lines(["[X1,X2] rsyncing: hello"])
def test_report_collection_diff_equal():
    """Test reporting of equal collections."""
    # The very same list is passed as both sides of the comparison.
    shared = ["aaa", "bbb", "ccc"]
    outcome = report_collection_diff(shared, shared, 1, 2)
    assert outcome is None
def test_default_max_worker_restart():
    """Check ``get_default_max_worker_restart`` for a sequence of option values.

    Expected results: ``None`` with zero processes and no explicit limit,
    8 with two processes and no explicit limit, and the integer value of an
    explicit string limit ("1" -> 1, "0" -> 0).
    """

    class _Options:
        maxworkerrestart = None
        numprocesses = 0

    class _Config:
        option = _Options

    settings = _Config.option
    assert get_default_max_worker_restart(_Config) is None

    settings.numprocesses = 2
    assert get_default_max_worker_restart(_Config) == 8

    settings.maxworkerrestart = "1"
    assert get_default_max_worker_restart(_Config) == 1

    settings.maxworkerrestart = "0"
    assert get_default_max_worker_restart(_Config) == 0
def test_report_collection_diff_different():
    """Test reporting of different collections."""
    left = ["aaa", "bbb", "ccc", "YYY"]
    right = ["aZa", "bbb", "XXX", "ccc"]
    # Expected message, assembled from one fragment per output line.
    expected = "".join(
        [
            "Different tests were collected between 1 and 2. The difference is:\n",
            "--- 1\n",
            "\n",
            "+++ 2\n",
            "\n",
            "@@ -1,4 +1,4 @@\n",
            "\n",
            "-aaa\n",
            "+aZa\n",
            " bbb\n",
            "+XXX\n",
            " ccc\n",
            "-YYY",
        ]
    )
    actual = report_collection_diff(left, right, "1", "2")
    assert actual == expected
@pytest.mark.xfail(reason="duplicate test ids not supported yet")
def test_pytest_issue419(testdir):
    """Run a module with two identically parametrized cases under ``-n1``.

    The generated module parametrizes one test with the same value twice,
    and the run is expected to report two passes.
    """
    # The function body inside the generated module must be indented under
    # its ``def``; otherwise the file is a syntax error and the run never
    # reaches the duplicate-id scenario.
    testdir.makepyfile(
        """
        import pytest
        @pytest.mark.parametrize('birth_year', [1988, 1988, ])
        def test_2011_table(birth_year):
            pass
    """
    )
    reprec = testdir.inline_run("-n1")
    reprec.assertoutcome(passed=2)
    # NOTE(review): this unconditional failure keeps the test in the xfail
    # state even if the outcome check above passes -- confirm it is still
    # wanted.
    assert 0