From c1e89afd17f9c8d2c3c010d36994bee8e6968773 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ionel=20Cristian=20M=C4=83rie=C8=99?= Date: Wed, 30 Jan 2019 04:08:57 +0200 Subject: [PATCH] Add cleanup_on_signal and allow registering cleanup on multiple signals. Also fix incorrect handling for default handlers of INT/TERM. --- src/pytest_cov/embed.py | 25 +++--- tests/test_pytest_cov.py | 168 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 10 deletions(-) diff --git a/src/pytest_cov/embed.py b/src/pytest_cov/embed.py index fb6a9f37..df28ae67 100644 --- a/src/pytest_cov/embed.py +++ b/src/pytest_cov/embed.py @@ -88,21 +88,26 @@ def cleanup(cov=None): multiprocessing_finish = cleanup # in case someone dared to use this internal -_previous_handler = None +_previous_handlers = {} -def _sigterm_handler(num, frame): +def _signal_cleanup_handler(signum, frame): cleanup() + _previous_handler = _previous_handlers.get(signum) if _previous_handler == signal.SIG_IGN: - pass + return elif _previous_handler: - _previous_handler(num, frame) - else: - raise SystemExit(0) + _previous_handler(signum, frame) + elif signum == signal.SIGTERM: + os._exit(128 + signum) + elif signum == signal.SIGINT: + raise KeyboardInterrupt() -def cleanup_on_sigterm(): - global _previous_handler +def cleanup_on_signal(signum): + _previous_handlers[signum] = signal.getsignal(signum) + signal.signal(signum, _signal_cleanup_handler) + - _previous_handler = signal.getsignal(signal.SIGTERM) - signal.signal(signal.SIGTERM, _sigterm_handler) +def cleanup_on_sigterm(): + cleanup_on_signal(signal.SIGTERM) diff --git a/tests/test_pytest_cov.py b/tests/test_pytest_cov.py index da44e19a..8809eaed 100644 --- a/tests/test_pytest_cov.py +++ b/tests/test_pytest_cov.py @@ -986,6 +986,174 @@ def test_run_target(): ]) assert result.ret == 0 + +@pytest.mark.skipif('sys.platform == "win32"', + reason="fork not available on Windows") +def test_cleanup_on_sigterm(testdir): + script = testdir.makepyfile(''' +import os, signal, subprocess, sys, time + +def cleanup(num, frame): + print("num == signal.SIGTERM => %s" % (num == signal.SIGTERM)) + raise Exception() + +def test_run(): + proc = subprocess.Popen([sys.executable, __file__], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + time.sleep(1) + proc.terminate() + stdout, stderr = proc.communicate() + assert not stderr + assert stdout == b"""num == signal.SIGTERM => True +captured Exception() +""" + assert proc.returncode == 0 + +if __name__ == "__main__": + signal.signal(signal.SIGTERM, cleanup) + + from pytest_cov.embed import cleanup_on_sigterm + cleanup_on_sigterm() + + try: + time.sleep(10) + except BaseException as exc: + print("captured %r" % exc) +''') + + result = testdir.runpytest('-vv', + '--cov=%s' % script.dirpath(), + '--cov-report=term-missing', + script) + + result.stdout.fnmatch_lines([ + '*- coverage: platform *, python * -*', + 'test_cleanup_on_sigterm* 26-27', + '*1 passed*' + ]) + assert result.ret == 0 + + +@pytest.mark.skipif('sys.platform == "win32"', reason="fork not available on Windows") +@pytest.mark.parametrize('setup', [ + ('signal.signal(signal.SIGTERM, signal.SIG_DFL); cleanup_on_sigterm()', '88% 18-19'), + ('cleanup_on_sigterm()', '88% 18-19'), + ('cleanup()', '75% 16-19'), +]) +def test_cleanup_on_sigterm_sig_dfl(testdir, setup): + script = testdir.makepyfile(''' +import os, signal, subprocess, sys, time + +def test_run(): + proc = subprocess.Popen([sys.executable, __file__], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + time.sleep(1) + proc.terminate() + stdout, stderr = proc.communicate() + assert not stderr + assert stdout == b"" + assert proc.returncode in [128 + signal.SIGTERM, -signal.SIGTERM] + +if __name__ == "__main__": + from pytest_cov.embed import cleanup_on_sigterm, cleanup + {0} + + try: + time.sleep(10) + except BaseException as exc: + print("captured %r" % exc) +'''.format(setup[0])) + + result = testdir.runpytest('-vv', + '--cov=%s' % script.dirpath(), + '--cov-report=term-missing', + script) + + result.stdout.fnmatch_lines([ + '*- coverage: platform *, python * -*', + 'test_cleanup_on_sigterm* %s' % setup[1], + '*1 passed*' + ]) + assert result.ret == 0 + + +@pytest.mark.skipif('sys.platform == "win32"', reason="fork not available on Windows") +def test_cleanup_on_sigterm_sig_dfl_sigint(testdir): + script = testdir.makepyfile(''' +import os, signal, subprocess, sys, time + +def test_run(): + proc = subprocess.Popen([sys.executable, __file__], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + time.sleep(1) + proc.send_signal(signal.SIGINT) + stdout, stderr = proc.communicate() + assert not stderr + assert stdout == b"""captured KeyboardInterrupt() +""" + assert proc.returncode == 0 + +if __name__ == "__main__": + from pytest_cov.embed import cleanup_on_signal + cleanup_on_signal(signal.SIGINT) + + try: + time.sleep(10) + except BaseException as exc: + print("captured %r" % exc) +''') + + result = testdir.runpytest('-vv', + '--cov=%s' % script.dirpath(), + '--cov-report=term-missing', + script) + + result.stdout.fnmatch_lines([ + '*- coverage: platform *, python * -*', + 'test_cleanup_on_sigterm* 88% 19-20', + '*1 passed*' + ]) + assert result.ret == 0 + +@pytest.mark.skipif('sys.platform == "win32"', reason="fork not available on Windows") +def test_cleanup_on_sigterm_sig_ign(testdir): + script = testdir.makepyfile(''' +import os, signal, subprocess, sys, time + +def test_run(): + proc = subprocess.Popen([sys.executable, __file__], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + time.sleep(1) + proc.send_signal(signal.SIGINT) + time.sleep(1) + proc.terminate() + stdout, stderr = proc.communicate() + assert not stderr + assert stdout == b"" + # it appears signal handling is buggy on python 2? + if sys.version_info == 3: assert proc.returncode in [128 + signal.SIGTERM, -signal.SIGTERM] + +if __name__ == "__main__": + signal.signal(signal.SIGINT, signal.SIG_IGN) + + from pytest_cov.embed import cleanup_on_signal + cleanup_on_signal(signal.SIGINT) + + try: + time.sleep(10) + except BaseException as exc: + print("captured %r" % exc) + ''') + + result = testdir.runpytest('-vv', + '--cov=%s' % script.dirpath(), + '--cov-report=term-missing', + script) + + result.stdout.fnmatch_lines([ + '*- coverage: platform *, python * -*', + 'test_cleanup_on_sigterm* 89% 23-24', + '*1 passed*' + ]) + assert result.ret == 0 + + MODULE = ''' def func(): return 1