# test_publish.py
# Forked from release-engineering/pubtools-pulplib.
import logging
import pytest
from pubtools.pulplib import (
Repository,
YumRepository,
Task,
Distributor,
DetachedException,
PublishOptions,
TaskFailedException,
)
def test_detached():
    """publish raises if called on a detached repo.

    The repository here is constructed directly and never given a client,
    so publish() is expected to raise DetachedException.
    """
    with pytest.raises(DetachedException):
        Repository(id="some-repo").publish()
def test_publish_no_distributors(client):
    """publish succeeds and returns no tasks if repo contains no distributors."""
    repo = Repository(id="some-repo")

    # Attach the client fixture by writing straight into the instance dict,
    # bypassing normal attribute assignment on the model object.
    repo.__dict__["_client"] = client

    # With no distributors there is nothing to publish, so the resolved
    # future holds an empty task list.
    assert repo.publish().result() == []
def test_publish_distributors(fast_poller, requests_mocker, client):
    """publish succeeds and returns tasks from each applicable distributor.

    The repo carries three distributors; the assertions below verify that
    only yum_distributor and cdn_distributor are published, one after the
    other, each followed by a task search, and that nothing is requested
    for other_distributor.
    """
    publish_url = (
        "https://pulp.example.com/pulp/api/v2/repositories/some-repo/actions/publish/"
    )
    search_url = "https://pulp.example.com/pulp/api/v2/tasks/search/"

    repo = YumRepository(
        id="some-repo",
        distributors=(
            Distributor(id="yum_distributor", type_id="yum_distributor"),
            Distributor(id="other_distributor", type_id="other_distributor"),
            Distributor(id="cdn_distributor", type_id="rpm_rsync_distributor"),
        ),
    )
    repo.__dict__["_client"] = client

    # Responses are consumed in order: first publish spawns two tasks,
    # second publish spawns one.
    requests_mocker.post(
        publish_url,
        [
            {"json": {"spawned_tasks": [{"task_id": "task1"}, {"task_id": "task2"}]}},
            {"json": {"spawned_tasks": [{"task_id": "task3"}]}},
        ],
    )

    # Likewise, one search response per batch of spawned tasks.
    requests_mocker.post(
        search_url,
        [
            {
                "json": [
                    {"task_id": "task1", "state": "finished"},
                    {"task_id": "task2", "state": "skipped"},
                ]
            },
            {"json": [{"task_id": "task3", "state": "finished"}]},
        ],
    )

    # It should have succeeded, with the tasks as retrieved from Pulp
    assert sorted(repo.publish().result()) == [
        Task(id="task1", succeeded=True, completed=True),
        Task(id="task2", succeeded=True, completed=True),
        Task(id="task3", succeeded=True, completed=True),
    ]

    # It should have first issued a request to publish yum_distributor
    req = requests_mocker.request_history
    assert req[0].url == publish_url
    assert req[0].json() == {"id": "yum_distributor", "override_config": {}}

    # Then polled for resulting tasks to succeed
    assert req[1].url == search_url
    assert req[1].json() == {
        "criteria": {"filters": {"task_id": {"$in": ["task1", "task2"]}}}
    }

    # Then published the next distributor
    assert req[2].url == publish_url
    assert req[2].json() == {"id": "cdn_distributor", "override_config": {}}

    # Then waited for those tasks to finish too
    assert req[3].url == search_url
    assert req[3].json() == {"criteria": {"filters": {"task_id": {"$in": ["task3"]}}}}

    # And there should have been no more requests
    assert len(req) == 4
def test_publish_with_options(requests_mocker, client):
    """publish passes expected config into distributors based on publish options.

    The override_config sent to each distributor is checked separately,
    since the two distributor types are expected to receive different
    subsets of keys for the same PublishOptions.
    """
    publish_url = (
        "https://pulp.example.com/pulp/api/v2/repositories/some-repo/actions/publish/"
    )
    search_url = "https://pulp.example.com/pulp/api/v2/tasks/search/"

    repo = YumRepository(
        id="some-repo",
        distributors=(
            Distributor(id="yum_distributor", type_id="yum_distributor"),
            Distributor(id="cdn_distributor", type_id="rpm_rsync_distributor"),
        ),
    )
    repo.__dict__["_client"] = client

    # One publish response and one search response per distributor,
    # consumed in order.
    requests_mocker.post(
        publish_url,
        [
            {"json": {"spawned_tasks": [{"task_id": "task1"}]}},
            {"json": {"spawned_tasks": [{"task_id": "task2"}]}},
        ],
    )
    requests_mocker.post(
        search_url,
        [
            {"json": [{"task_id": "task1", "state": "finished"}]},
            {"json": [{"task_id": "task2", "state": "finished"}]},
        ],
    )

    options = PublishOptions(clean=True, force=True, origin_only=True)

    # It should have succeeded, with the tasks as retrieved from Pulp
    assert sorted(repo.publish(options).result()) == [
        Task(id="task1", succeeded=True, completed=True),
        Task(id="task2", succeeded=True, completed=True),
    ]

    req = requests_mocker.request_history

    # The yum_distributor request should have set force_full, but not
    # delete since it's not recognized by that distributor
    assert req[0].json()["override_config"] == {"force_full": True}

    # The cdn_distributor request should have set force_full, delete
    # and content_units_only.
    # Index 2 because index 1 is the task search for the first publish.
    assert req[2].json()["override_config"] == {
        "force_full": True,
        "delete": True,
        "content_units_only": True,
    }
def test_publish_fail(fast_poller, requests_mocker, client):
    """publish raises TaskFailedException if publish task fails"""
    repo = YumRepository(
        id="some-repo",
        distributors=(Distributor(id="yum_distributor", type_id="yum_distributor"),),
    )
    repo.__dict__["_client"] = client

    requests_mocker.post(
        "https://pulp.example.com/pulp/api/v2/repositories/some-repo/actions/publish/",
        json={"spawned_tasks": [{"task_id": "task1"}]},
    )

    # The spawned task is always reported in the "error" state.
    requests_mocker.post(
        "https://pulp.example.com/pulp/api/v2/tasks/search/",
        json=[{"task_id": "task1", "state": "error"}],
    )

    publish_f = repo.publish()

    # It should raise this exception
    with pytest.raises(TaskFailedException) as error:
        publish_f.result()

    # These checks must sit outside the `with` block: statements placed
    # after the raising call inside it would never execute.

    # The exception should have a reference to the task which failed
    assert error.value.task.id == "task1"
    assert "Task task1 failed" in str(error.value)
def test_publish_broken_response(fast_poller, requests_mocker, client):
    """publish raises an exception if Pulp /publish/ responded with parseable
    JSON, but not of the expected structure
    """
    repo = YumRepository(
        id="some-repo",
        distributors=(Distributor(id="yum_distributor", type_id="yum_distributor"),),
    )
    repo.__dict__["_client"] = client

    # spawned_tasks holds a bare string where the other tests in this file
    # supply {"task_id": ...} objects.
    requests_mocker.post(
        "https://pulp.example.com/pulp/api/v2/repositories/some-repo/actions/publish/",
        json={"spawned_tasks": ["oops, not a valid response"]},
    )

    publish_f = repo.publish()

    # It should raise some kind of exception due to the invalid spawned_tasks structure
    assert publish_f.exception()
def test_publish_retries(fast_poller, requests_mocker, client, caplog):
    """publish retries distributors as they fail.

    The second publish attempt (cdn_distributor) yields a task in the
    "error" state; the test verifies that the same distributor is triggered
    again, that only successful tasks are returned, and that the retry is
    logged.
    """
    publish_url = (
        "https://pulp.example.com/pulp/api/v2/repositories/some-repo/actions/publish/"
    )
    search_url = "https://pulp.example.com/pulp/api/v2/tasks/search/"

    # Capture WARNING and above so the retry message can be inspected.
    caplog.set_level(logging.WARNING)

    repo = YumRepository(
        id="some-repo",
        distributors=(
            Distributor(id="yum_distributor", type_id="yum_distributor"),
            Distributor(id="cdn_distributor", type_id="cdn_distributor"),
        ),
    )
    repo.__dict__["_client"] = client

    # Three publish responses: yum, cdn (will fail), cdn again (retry).
    requests_mocker.post(
        publish_url,
        [
            {"json": {"spawned_tasks": [{"task_id": "task1"}]}},
            {"json": {"spawned_tasks": [{"task_id": "task2"}]}},
            {"json": {"spawned_tasks": [{"task_id": "task3"}]}},
        ],
    )

    requests_mocker.post(
        search_url,
        [
            {"json": [{"task_id": "task1", "state": "finished"}]},
            {"json": [{"task_id": "task2", "state": "error"}]},
            {"json": [{"task_id": "task3", "state": "finished"}]},
        ],
    )

    publish_f = repo.publish()

    # It should succeed
    tasks = publish_f.result()

    # It should return only the *successful* tasks
    assert sorted([t.id for t in tasks]) == ["task1", "task3"]

    # Pick out the HTTP requests triggering distributors
    publish_reqs = [
        req for req in requests_mocker.request_history if req.url == publish_url
    ]
    publish_distributors = [req.json()["id"] for req in publish_reqs]

    # It should have triggered cdn_distributor twice, since the first attempt failed
    assert publish_distributors == [
        "yum_distributor",
        "cdn_distributor",
        "cdn_distributor",
    ]

    # The retry should have been logged
    messages = caplog.messages
    assert (
        messages[-1].splitlines()[0] == "Retrying due to error: Task task2 failed [1/6]"
    )