Skip to content

Commit

Permalink
add test_process_isolation_executable_transmit
Browse files Browse the repository at this point in the history
test_process_isolation_executable_transmit will validate that transmit will not fail due to missing process isolation executable while still properly pass on the related kwargs to the output

Signed-off-by: Hao Liu <haoli@redhat.com>
  • Loading branch information
TheRealHaoLiu committed Jun 2, 2022
1 parent 5370620 commit 0e67829
Showing 1 changed file with 48 additions and 0 deletions.
48 changes: 48 additions & 0 deletions test/integration/test_transmit_worker_process.py
Expand Up @@ -168,6 +168,54 @@ def process_method(results_sockfile_read):

self.check_artifacts(str(process_dir), job_type)

@pytest.mark.parametrize("process_isolation_executable", ['exist', 'does_not_exist'])
def test_process_isolation_executable_transmit(self, tmp_path, process_isolation_executable, mocker):
"""
Case transmit should not fail if process isolation executable does not exist
"""
def mock_check_isolation_executable_installed(executable):
if executable == "exist":
return True

mocker.patch.object(ansible_runner.interface, 'check_isolation_executable_installed', mock_check_isolation_executable_installed)

job_kwargs = self.get_job_kwargs('run')
job_kwargs['process_isolation'] = True
job_kwargs['process_isolation_executable'] = process_isolation_executable

outgoing_buffer_file = tmp_path / 'buffer_out'
outgoing_buffer_file.touch()
outgoing_buffer = outgoing_buffer_file.open('b+r')

transmitter = ansible_runner.interface.run(
streamer='transmit',
_output=outgoing_buffer,
**job_kwargs,
)

# valide process_isolation kwargs are passed to transmitter
assert transmitter.kwargs['process_isolation'] == job_kwargs['process_isolation']
assert transmitter.kwargs['process_isolation_executable'] == job_kwargs['process_isolation_executable']

# validate that transmit did not fail due to missing process isolation executable
assert transmitter.rc in (None, 0)

# validate that transmit buffer is not empty
outgoing_buffer.seek(0)
sent = outgoing_buffer.read()
assert sent # should not be blank at least

# validate buffer contains kwargs
assert b'kwargs' in sent

# validate kwargs in buffer contain correct process_isolation and process_isolation_executable
for line in sent.decode('utf-8').split('\n'):
if "kwargs" in line:
kwargs = json.loads(line).get("kwargs", {})
assert kwargs['process_isolation'] is True
assert kwargs['process_isolation_executable'] == process_isolation_executable
break


@pytest.fixture
def transmit_stream(project_fixtures, tmp_path):
Expand Down

0 comments on commit 0e67829

Please sign in to comment.