From 87e574900533e0affcd3f5a84aca96c3a58d4aeb Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 28 Aug 2019 09:38:41 +0200 Subject: [PATCH] fix(transport): Detect eventlet's Queue monkeypatch and work around it (#484) * fix(transport): Detect eventlet's Queue monkeypatch and work around it * fix: Remove nonsense changes * doc: Add comment * fix: Fix tests under PyPy * ref: Unify greenlet/eventlet fixtures --- sentry_sdk/worker.py | 23 ++++++++++++++++++++--- test-requirements.txt | 1 + tests/conftest.py | 32 ++++++++++++++++++++++++++++++++ tests/test_transport.py | 16 ++++++++++++++-- tests/utils/test_contextvars.py | 31 ++++++------------------------- 5 files changed, 73 insertions(+), 30 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 92ba8f184f..304a77faf8 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -45,16 +45,33 @@ def _timed_queue_join(self, timeout): # type: (float) -> bool deadline = time() + timeout queue = self._queue - queue.all_tasks_done.acquire() # type: ignore + + real_all_tasks_done = getattr( + queue, "all_tasks_done", None + ) # type: Optional[Any] + if real_all_tasks_done is not None: + real_all_tasks_done.acquire() + all_tasks_done = real_all_tasks_done # type: Optional[Any] + elif queue.__module__.startswith("eventlet."): + all_tasks_done = getattr(queue, "_cond", None) + else: + all_tasks_done = None + try: while queue.unfinished_tasks: # type: ignore delay = deadline - time() if delay <= 0: return False - queue.all_tasks_done.wait(timeout=delay) # type: ignore + if all_tasks_done is not None: + all_tasks_done.wait(timeout=delay) + else: + # worst case, we just poll the number of remaining tasks + sleep(0.1) + return True finally: - queue.all_tasks_done.release() # type: ignore + if real_all_tasks_done is not None: + real_all_tasks_done.release() # type: ignore def start(self): # type: () -> None diff --git a/test-requirements.txt b/test-requirements.txt index d4fafb53dc..7df9102ce8 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,3 +6,4 @@ Werkzeug==0.15.3 pytest-localserver==0.4.1 pytest-cov==2.6.0 gevent +eventlet diff --git a/tests/conftest.py b/tests/conftest.py index bd0b40ad8f..0f10f037e7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,9 @@ import pytest +import gevent +import eventlet + import sentry_sdk from sentry_sdk._compat import reraise, string_types, iteritems from sentry_sdk.transport import Transport @@ -100,6 +103,9 @@ def _capture_internal_warnings(): if "Something has already installed a non-asyncio" in str(warning.message): continue + if "dns.hash" in str(warning.message) or "dns/namedict" in warning.filename: + continue + raise AssertionError(warning) @@ -235,3 +241,29 @@ def read_event(self): def read_flush(self): assert self.file.readline() == b"flush\n" + + +# scope=session ensures that fixture is run earlier +@pytest.fixture(scope="session", params=[None, "eventlet", "gevent"]) +def maybe_monkeypatched_threading(request): + if request.param == "eventlet": + try: + eventlet.monkey_patch() + except AttributeError as e: + if "'thread.RLock' object has no attribute" in str(e): + # https://bitbucket.org/pypy/pypy/issues/2962/gevent-cannot-patch-rlock-under-pypy-27-7 + pytest.skip("https://github.com/eventlet/eventlet/issues/546") + else: + raise + elif request.param == "gevent": + try: + gevent.monkey.patch_all() + except Exception as e: + if "_RLock__owner" in str(e): + pytest.skip("https://github.com/gevent/gevent/issues/1380") + else: + raise + else: + assert request.param is None + + return request.param diff --git a/tests/test_transport.py b/tests/test_transport.py index 3a1d8c88ae..a90aea5162 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -22,8 +22,19 @@ def inner(*args, **kwargs): @pytest.mark.parametrize("debug", (True, False)) -def test_transport_works(httpserver, request, capsys, caplog, debug, make_client): +@pytest.mark.parametrize("client_flush_method", ["close", "flush"]) +def test_transport_works( + httpserver, + request, + capsys, + caplog, + debug, + make_client, + client_flush_method, + maybe_monkeypatched_threading, +): httpserver.serve_content("ok", 200) + caplog.set_level(logging.DEBUG) client = make_client( @@ -34,7 +45,8 @@ def test_transport_works(httpserver, request, capsys, caplog, debug, make_client add_breadcrumb(level="info", message="i like bread", timestamp=datetime.now()) capture_message("löl") - client.close() + + getattr(client, client_flush_method)() out, err = capsys.readouterr() assert not err and not out diff --git a/tests/utils/test_contextvars.py b/tests/utils/test_contextvars.py index 3a926c08c9..62344f1409 100644 --- a/tests/utils/test_contextvars.py +++ b/tests/utils/test_contextvars.py @@ -1,37 +1,18 @@ import random import time -import pytest -import gevent - from sentry_sdk.utils import _is_threading_local_monkey_patched -def try_gevent_patch_all(): - try: - gevent.monkey.patch_all() - except Exception as e: - if "_RLock__owner" in str(e): - pytest.skip("https://github.com/gevent/gevent/issues/1380") - else: - raise - - -def test_gevent_is_patched(): - try_gevent_patch_all() - assert _is_threading_local_monkey_patched() - - -def test_gevent_is_not_patched(): - assert not _is_threading_local_monkey_patched() - +def test_thread_local_is_patched(maybe_monkeypatched_threading): + if maybe_monkeypatched_threading is None: + assert not _is_threading_local_monkey_patched() + else: + assert _is_threading_local_monkey_patched() -@pytest.mark.parametrize("with_gevent", [True, False]) -def test_leaks(with_gevent): - if with_gevent: - try_gevent_patch_all() +def test_leaks(maybe_monkeypatched_threading): import threading # Need to explicitly call _get_contextvars because the SDK has already