Skip to content

Commit

Permalink
fix(transport): Detect eventlet's Queue monkeypatch and work around it (
Browse files Browse the repository at this point in the history
#484)

* fix(transport): Detect eventlet's Queue monkeypatch and work around it

* fix: Remove nonsense changes

* doc: Add comment

* fix: Fix tests under PyPy

* ref: Unify greenlet/eventlet fixtures
  • Loading branch information
untitaker committed Aug 28, 2019
1 parent ebc00b2 commit 87e5749
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 30 deletions.
23 changes: 20 additions & 3 deletions sentry_sdk/worker.py
Expand Up @@ -45,16 +45,33 @@ def _timed_queue_join(self, timeout):
# type: (float) -> bool
deadline = time() + timeout
queue = self._queue
queue.all_tasks_done.acquire() # type: ignore

real_all_tasks_done = getattr(
queue, "all_tasks_done", None
) # type: Optional[Any]
if real_all_tasks_done is not None:
real_all_tasks_done.acquire()
all_tasks_done = real_all_tasks_done # type: Optional[Any]
elif queue.__module__.startswith("eventlet."):
all_tasks_done = getattr(queue, "_cond", None)
else:
all_tasks_done = None

try:
while queue.unfinished_tasks: # type: ignore
delay = deadline - time()
if delay <= 0:
return False
queue.all_tasks_done.wait(timeout=delay) # type: ignore
if all_tasks_done is not None:
all_tasks_done.wait(timeout=delay)
else:
# worst case, we just poll the number of remaining tasks
sleep(0.1)

return True
finally:
queue.all_tasks_done.release() # type: ignore
if real_all_tasks_done is not None:
real_all_tasks_done.release() # type: ignore

def start(self):
# type: () -> None
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Expand Up @@ -6,3 +6,4 @@ Werkzeug==0.15.3
pytest-localserver==0.4.1
pytest-cov==2.6.0
gevent
eventlet
32 changes: 32 additions & 0 deletions tests/conftest.py
Expand Up @@ -4,6 +4,9 @@

import pytest

import gevent
import eventlet

import sentry_sdk
from sentry_sdk._compat import reraise, string_types, iteritems
from sentry_sdk.transport import Transport
Expand Down Expand Up @@ -100,6 +103,9 @@ def _capture_internal_warnings():
if "Something has already installed a non-asyncio" in str(warning.message):
continue

if "dns.hash" in str(warning.message) or "dns/namedict" in warning.filename:
continue

raise AssertionError(warning)


Expand Down Expand Up @@ -235,3 +241,29 @@ def read_event(self):

def read_flush(self):
assert self.file.readline() == b"flush\n"


# scope=session ensures that fixture is run earlier
@pytest.fixture(scope="session", params=[None, "eventlet", "gevent"])
def maybe_monkeypatched_threading(request):
if request.param == "eventlet":
try:
eventlet.monkey_patch()
except AttributeError as e:
if "'thread.RLock' object has no attribute" in str(e):
# https://bitbucket.org/pypy/pypy/issues/2962/gevent-cannot-patch-rlock-under-pypy-27-7
pytest.skip("https://github.com/eventlet/eventlet/issues/546")
else:
raise
elif request.param == "gevent":
try:
gevent.monkey.patch_all()
except Exception as e:
if "_RLock__owner" in str(e):
pytest.skip("https://github.com/gevent/gevent/issues/1380")
else:
raise
else:
assert request.param is None

return request.param
16 changes: 14 additions & 2 deletions tests/test_transport.py
Expand Up @@ -22,8 +22,19 @@ def inner(*args, **kwargs):


@pytest.mark.parametrize("debug", (True, False))
def test_transport_works(httpserver, request, capsys, caplog, debug, make_client):
@pytest.mark.parametrize("client_flush_method", ["close", "flush"])
def test_transport_works(
httpserver,
request,
capsys,
caplog,
debug,
make_client,
client_flush_method,
maybe_monkeypatched_threading,
):
httpserver.serve_content("ok", 200)

caplog.set_level(logging.DEBUG)

client = make_client(
Expand All @@ -34,7 +45,8 @@ def test_transport_works(httpserver, request, capsys, caplog, debug, make_client

add_breadcrumb(level="info", message="i like bread", timestamp=datetime.now())
capture_message("löl")
client.close()

getattr(client, client_flush_method)()

out, err = capsys.readouterr()
assert not err and not out
Expand Down
31 changes: 6 additions & 25 deletions tests/utils/test_contextvars.py
@@ -1,37 +1,18 @@
import random
import time

import pytest
import gevent


from sentry_sdk.utils import _is_threading_local_monkey_patched


def try_gevent_patch_all():
try:
gevent.monkey.patch_all()
except Exception as e:
if "_RLock__owner" in str(e):
pytest.skip("https://github.com/gevent/gevent/issues/1380")
else:
raise


def test_gevent_is_patched():
try_gevent_patch_all()
assert _is_threading_local_monkey_patched()


def test_gevent_is_not_patched():
assert not _is_threading_local_monkey_patched()

def test_thread_local_is_patched(maybe_monkeypatched_threading):
if maybe_monkeypatched_threading is None:
assert not _is_threading_local_monkey_patched()
else:
assert _is_threading_local_monkey_patched()

@pytest.mark.parametrize("with_gevent", [True, False])
def test_leaks(with_gevent):
if with_gevent:
try_gevent_patch_all()

def test_leaks(maybe_monkeypatched_threading):
import threading

# Need to explicitly call _get_contextvars because the SDK has already
Expand Down

0 comments on commit 87e5749

Please sign in to comment.