From 5716511fb1204cf39b563a25e47b7bff6fe15172 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 27 Aug 2019 09:43:50 +0200 Subject: [PATCH 1/5] fix(transport): Detect eventlet's Queue monkeypatch and work around it --- sentry_sdk/worker.py | 38 +++++++++++++++++++++++++++----------- test-requirements.txt | 1 + tests/conftest.py | 3 +++ tests/test_transport.py | 27 ++++++++++++++++++++++++--- 4 files changed, 55 insertions(+), 14 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 92ba8f184f..86c4361f36 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,7 +1,7 @@ import os -from threading import Thread, Lock -from time import sleep, time +import threading +import time from sentry_sdk._compat import queue, check_thread_support from sentry_sdk.utils import logger @@ -23,8 +23,8 @@ def __init__(self): # type: () -> None check_thread_support() self._queue = queue.Queue(-1) # type: Queue[Any] - self._lock = Lock() - self._thread = None # type: Optional[Thread] + self._lock = threading.Lock() + self._thread = None # type: Optional[threading.Thread] self._thread_for_pid = None # type: Optional[int] @property @@ -43,24 +43,40 @@ def _ensure_thread(self): def _timed_queue_join(self, timeout): # type: (float) -> bool - deadline = time() + timeout + deadline = time.time() + timeout queue = self._queue - queue.all_tasks_done.acquire() # type: ignore + + real_all_tasks_done = getattr( + queue, "all_tasks_done", None + ) # type: Optional[Any] + if real_all_tasks_done is not None: + real_all_tasks_done.acquire() + all_tasks_done = real_all_tasks_done # type: Optional[Any] + else: + # eventlet + all_tasks_done = getattr(queue, "_cond", None) + try: while queue.unfinished_tasks: # type: ignore - delay = deadline - time() + delay = deadline - time.time() if delay <= 0: return False - queue.all_tasks_done.wait(timeout=delay) # type: ignore + if all_tasks_done is not None: + all_tasks_done.wait(timeout=delay) + else: + # worst case, we just poll the number of remaining tasks + sleep(0.1) + return True finally: - queue.all_tasks_done.release() # type: ignore + if real_all_tasks_done is not None: + real_all_tasks_done.release() # type: ignore def start(self): # type: () -> None with self._lock: if not self.is_alive: - self._thread = Thread( + self._thread = threading.Thread( target=self._target, name="raven-sentry.BackgroundWorker" ) self._thread.setDaemon(True) @@ -112,4 +128,4 @@ def _target(self): logger.error("Failed processing job", exc_info=True) finally: self._queue.task_done() - sleep(0) + # time.sleep(0) diff --git a/test-requirements.txt b/test-requirements.txt index 64a551c367..a89b39e6f7 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,3 +6,4 @@ Werkzeug==0.14.1 pytest-localserver==0.4.1 pytest-cov==2.6.0 gevent +eventlet diff --git a/tests/conftest.py b/tests/conftest.py index bd0b40ad8f..8b26387c71 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -100,6 +100,9 @@ def _capture_internal_warnings(): if "Something has already installed a non-asyncio" in str(warning.message): continue + if "dns.hash" in str(warning.message) or "dns/namedict" in warning.filename: + continue + raise AssertionError(warning) diff --git a/tests/test_transport.py b/tests/test_transport.py index 3a1d8c88ae..7e3d6c180b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -4,14 +4,21 @@ from datetime import datetime +import eventlet import pytest -from sentry_sdk import Hub, Client, add_breadcrumb, capture_message + +@pytest.fixture(scope="session", params=[True, False]) +def eventlet_maybe_patched(request): + if request.param: + eventlet.monkey_patch() @pytest.fixture(params=[True, False]) def make_client(request): def inner(*args, **kwargs): + from sentry_sdk import Client + client = Client(*args, **kwargs) if request.param: client = pickle.loads(pickle.dumps(client)) @@ -22,8 +29,21 @@ def inner(*args, **kwargs): @pytest.mark.parametrize("debug", (True, False)) -def test_transport_works(httpserver, request, capsys, caplog, debug, make_client): +@pytest.mark.parametrize("client_flush_method", ["close", "flush"]) +def test_transport_works( + httpserver, + request, + capsys, + caplog, + debug, + make_client, + client_flush_method, + eventlet_maybe_patched, +): httpserver.serve_content("ok", 200) + + from sentry_sdk import Hub, Client, add_breadcrumb, capture_message + caplog.set_level(logging.DEBUG) client = make_client( @@ -34,7 +54,8 @@ def test_transport_works(httpserver, request, capsys, caplog, debug, make_client add_breadcrumb(level="info", message="i like bread", timestamp=datetime.now()) capture_message("löl") - client.close() + + getattr(client, client_flush_method)() out, err = capsys.readouterr() assert not err and not out From 7edb91271a16a72419b784dd736a4dac0c4373d5 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 27 Aug 2019 10:07:08 +0200 Subject: [PATCH 2/5] fix: Remove nonsense changes --- sentry_sdk/worker.py | 16 ++++++++-------- tests/test_transport.py | 6 ++---- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 86c4361f36..6199e77b3c 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,7 +1,7 @@ import os -import threading -import time +from threading import Thread, Lock +from time import sleep, time from sentry_sdk._compat import queue, check_thread_support from sentry_sdk.utils import logger @@ -23,8 +23,8 @@ def __init__(self): # type: () -> None check_thread_support() self._queue = queue.Queue(-1) # type: Queue[Any] - self._lock = threading.Lock() - self._thread = None # type: Optional[threading.Thread] + self._lock = Lock() + self._thread = None # type: Optional[Thread] self._thread_for_pid = None # type: Optional[int] @property @@ -43,7 +43,7 @@ def _ensure_thread(self): def _timed_queue_join(self, timeout): # type: (float) -> bool - deadline = time.time() + timeout + deadline = time() + timeout queue = self._queue real_all_tasks_done = getattr( @@ -58,7 +58,7 @@ def _timed_queue_join(self, timeout): try: while queue.unfinished_tasks: # type: ignore - delay = deadline - time.time() + delay = deadline - time() if delay <= 0: return False if all_tasks_done is not None: @@ -76,7 +76,7 @@ def start(self): # type: () -> None with self._lock: if not self.is_alive: - self._thread = threading.Thread( + self._thread = Thread( target=self._target, name="raven-sentry.BackgroundWorker" ) self._thread.setDaemon(True) @@ -128,4 +128,4 @@ def _target(self): logger.error("Failed processing job", exc_info=True) finally: self._queue.task_done() - # time.sleep(0) + sleep(0) diff --git a/tests/test_transport.py b/tests/test_transport.py index 7e3d6c180b..959ea8dffe 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -7,6 +7,8 @@ import eventlet import pytest +from sentry_sdk import Hub, Client, add_breadcrumb, capture_message + @pytest.fixture(scope="session", params=[True, False]) def eventlet_maybe_patched(request): @@ -17,8 +19,6 @@ def eventlet_maybe_patched(request): @pytest.fixture(params=[True, False]) def make_client(request): def inner(*args, **kwargs): - from sentry_sdk import Client - client = Client(*args, **kwargs) if request.param: client = pickle.loads(pickle.dumps(client)) @@ -42,8 +42,6 @@ def test_transport_works( ): httpserver.serve_content("ok", 200) - from sentry_sdk import Hub, Client, add_breadcrumb, capture_message - caplog.set_level(logging.DEBUG) client = make_client( From af4bebb62b470991e15c58d3ff10ad2fc23e1c1c Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 27 Aug 2019 10:09:35 +0200 Subject: [PATCH 3/5] doc: Add comment --- tests/test_transport.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_transport.py b/tests/test_transport.py index 959ea8dffe..bb13923c02 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -10,6 +10,7 @@ from sentry_sdk import Hub, Client, add_breadcrumb, capture_message +# scope=session ensures that fixture is run earlier @pytest.fixture(scope="session", params=[True, False]) def eventlet_maybe_patched(request): if request.param: From 8f1207a3ad036a3576cd170209f515bc4b191bc8 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 27 Aug 2019 12:11:30 +0200 Subject: [PATCH 4/5] fix: Fix tests under PyPy --- tests/test_transport.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_transport.py b/tests/test_transport.py index bb13923c02..b64f54f866 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -14,7 +14,12 @@ @pytest.fixture(scope="session", params=[True, False]) def eventlet_maybe_patched(request): if request.param: - eventlet.monkey_patch() + try: + eventlet.monkey_patch() + except AttributeError as e: + if "'thread.RLock' object has no attribute" in str(e): + # https://bitbucket.org/pypy/pypy/issues/2962/gevent-cannot-patch-rlock-under-pypy-27-7 + pytest.skip("https://github.com/eventlet/eventlet/issues/546") @pytest.fixture(params=[True, False]) From 72aa688449f620ba158c043c231bd14e31964576 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Tue, 27 Aug 2019 13:47:38 +0200 Subject: [PATCH 5/5] ref: Unify greenlet/eventlet fixtures --- sentry_sdk/worker.py | 5 +++-- tests/conftest.py | 29 +++++++++++++++++++++++++++++ tests/test_transport.py | 15 +-------------- tests/utils/test_contextvars.py | 31 ++++++------------------------- 4 files changed, 39 insertions(+), 41 deletions(-) diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index 6199e77b3c..304a77faf8 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -52,9 +52,10 @@ def _timed_queue_join(self, timeout): if real_all_tasks_done is not None: real_all_tasks_done.acquire() all_tasks_done = real_all_tasks_done # type: Optional[Any] - else: - # eventlet + elif queue.__module__.startswith("eventlet."): all_tasks_done = getattr(queue, "_cond", None) + else: + all_tasks_done = None try: while queue.unfinished_tasks: # type: ignore diff --git a/tests/conftest.py b/tests/conftest.py index 8b26387c71..0f10f037e7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,9 @@ import pytest +import gevent +import eventlet + import sentry_sdk from sentry_sdk._compat import reraise, string_types, iteritems from sentry_sdk.transport import Transport @@ -238,3 +241,29 @@ def read_event(self): def read_flush(self): assert self.file.readline() == b"flush\n" + + +# scope=session ensures that fixture is run earlier +@pytest.fixture(scope="session", params=[None, "eventlet", "gevent"]) +def maybe_monkeypatched_threading(request): + if request.param == "eventlet": + try: + eventlet.monkey_patch() + except AttributeError as e: + if "'thread.RLock' object has no attribute" in str(e): + # https://bitbucket.org/pypy/pypy/issues/2962/gevent-cannot-patch-rlock-under-pypy-27-7 + pytest.skip("https://github.com/eventlet/eventlet/issues/546") + else: + raise + elif request.param == "gevent": + try: + gevent.monkey.patch_all() + except Exception as e: + if "_RLock__owner" in str(e): + pytest.skip("https://github.com/gevent/gevent/issues/1380") + else: + raise + else: + assert request.param is None + + return request.param diff --git a/tests/test_transport.py b/tests/test_transport.py index b64f54f866..a90aea5162 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -4,24 +4,11 @@ from datetime import datetime -import eventlet import pytest from sentry_sdk import Hub, Client, add_breadcrumb, capture_message -# scope=session ensures that fixture is run earlier -@pytest.fixture(scope="session", params=[True, False]) -def eventlet_maybe_patched(request): - if request.param: - try: - eventlet.monkey_patch() - except AttributeError as e: - if "'thread.RLock' object has no attribute" in str(e): - # https://bitbucket.org/pypy/pypy/issues/2962/gevent-cannot-patch-rlock-under-pypy-27-7 - pytest.skip("https://github.com/eventlet/eventlet/issues/546") - - @pytest.fixture(params=[True, False]) def make_client(request): def inner(*args, **kwargs): @@ -44,7 +31,7 @@ def test_transport_works( debug, make_client, client_flush_method, - eventlet_maybe_patched, + maybe_monkeypatched_threading, ): httpserver.serve_content("ok", 200) diff --git a/tests/utils/test_contextvars.py b/tests/utils/test_contextvars.py index 3a926c08c9..62344f1409 100644 --- a/tests/utils/test_contextvars.py +++ b/tests/utils/test_contextvars.py @@ -1,37 +1,18 @@ import random import time -import pytest -import gevent - from sentry_sdk.utils import _is_threading_local_monkey_patched -def try_gevent_patch_all(): - try: - gevent.monkey.patch_all() - except Exception as e: - if "_RLock__owner" in str(e): - pytest.skip("https://github.com/gevent/gevent/issues/1380") - else: - raise - - -def test_gevent_is_patched(): - try_gevent_patch_all() - assert _is_threading_local_monkey_patched() - - -def test_gevent_is_not_patched(): - assert not _is_threading_local_monkey_patched() - +def test_thread_local_is_patched(maybe_monkeypatched_threading): + if maybe_monkeypatched_threading is None: + assert not _is_threading_local_monkey_patched() + else: + assert _is_threading_local_monkey_patched() -@pytest.mark.parametrize("with_gevent", [True, False]) -def test_leaks(with_gevent): - if with_gevent: - try_gevent_patch_all() +def test_leaks(maybe_monkeypatched_threading): import threading # Need to explicitly call _get_contextvars because the SDK has already