-
Notifications
You must be signed in to change notification settings - Fork 344
/
test_transmit_worker_process.py
54 lines (38 loc) · 1.43 KB
/
test_transmit_worker_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import os
import io
from ansible_runner.streaming import Transmitter, Worker, Processor
def test_remote_job_interface(tmpdir, test_data_dir):
    """Chain Transmitter -> Worker -> Processor through in-memory buffers.

    The transmitter writes its payload for the ``debug`` data directory
    into one ``BytesIO``; the worker reads that buffer and writes to a
    second one; the processor reads the second buffer.  After each of the
    worker and processor stages the top-level contents of that stage's
    private data directory are checked.
    """
    worker_root = str(tmpdir.mkdir('for_worker'))
    processor_root = str(tmpdir.mkdir('for_process'))
    source_dir = os.path.join(test_data_dir, 'debug')

    # --- transmit stage -------------------------------------------------
    to_worker = io.BytesIO()

    # Intended AWX and Tower use case
    transmitter = Transmitter(
        _output=to_worker,
        private_data_dir=source_dir,
        playbook='debug.yml',
    )

    print(transmitter.kwargs)
    assert transmitter.kwargs.get('playbook', '') == 'debug.yml'

    status, rc = transmitter.run()
    assert rc in (None, 0)
    assert status == 'unstarted'

    # Rewind so the worker below starts reading from the first byte.
    to_worker.seek(0)
    payload = to_worker.getvalue()
    assert payload  # should not be blank at least
    assert b'zipfile' in payload

    # --- worker stage ---------------------------------------------------
    from_worker = io.BytesIO()
    worker = Worker(
        _input=to_worker,
        _output=from_worker,
        private_data_dir=worker_root,
    )
    worker.run()

    # On failure, show the bytes that were handed to the worker.
    worker_entries = set(os.listdir(worker_root))
    assert worker_entries == {'artifacts', 'inventory', 'project'}, to_worker.getvalue()

    # --- process stage --------------------------------------------------
    # Again, rewind before handing the buffer to the next reader.
    from_worker.seek(0)
    processor = Processor(
        _input=from_worker,
        private_data_dir=processor_root,
    )
    processor.run()

    processor_entries = set(os.listdir(processor_root))
    assert processor_entries == {'artifacts'}, to_worker.getvalue()