Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(transport): Detect eventlet's Queue monkeypatch and work around it #484

Merged
merged 5 commits into from Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 20 additions & 3 deletions sentry_sdk/worker.py
Expand Up @@ -45,16 +45,33 @@ def _timed_queue_join(self, timeout):
# type: (float) -> bool
deadline = time() + timeout
queue = self._queue
queue.all_tasks_done.acquire() # type: ignore

real_all_tasks_done = getattr(
queue, "all_tasks_done", None
) # type: Optional[Any]
if real_all_tasks_done is not None:
real_all_tasks_done.acquire()
all_tasks_done = real_all_tasks_done # type: Optional[Any]
elif queue.__module__.startswith("eventlet."):
all_tasks_done = getattr(queue, "_cond", None)
else:
all_tasks_done = None

try:
while queue.unfinished_tasks: # type: ignore
delay = deadline - time()
if delay <= 0:
return False
queue.all_tasks_done.wait(timeout=delay) # type: ignore
if all_tasks_done is not None:
all_tasks_done.wait(timeout=delay)
else:
# worst case, we just poll the number of remaining tasks
sleep(0.1)

return True
finally:
queue.all_tasks_done.release() # type: ignore
if real_all_tasks_done is not None:
real_all_tasks_done.release() # type: ignore

def start(self):
# type: () -> None
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Expand Up @@ -6,3 +6,4 @@ Werkzeug==0.14.1
pytest-localserver==0.4.1
pytest-cov==2.6.0
gevent
eventlet
32 changes: 32 additions & 0 deletions tests/conftest.py
Expand Up @@ -4,6 +4,9 @@

import pytest

import gevent
import eventlet

import sentry_sdk
from sentry_sdk._compat import reraise, string_types, iteritems
from sentry_sdk.transport import Transport
Expand Down Expand Up @@ -100,6 +103,9 @@ def _capture_internal_warnings():
if "Something has already installed a non-asyncio" in str(warning.message):
continue

if "dns.hash" in str(warning.message) or "dns/namedict" in warning.filename:
continue

raise AssertionError(warning)


Expand Down Expand Up @@ -235,3 +241,29 @@ def read_event(self):

def read_flush(self):
assert self.file.readline() == b"flush\n"


# scope=session ensures that fixture is run earlier
@pytest.fixture(scope="session", params=[None, "eventlet", "gevent"])
def maybe_monkeypatched_threading(request):
if request.param == "eventlet":
try:
eventlet.monkey_patch()
except AttributeError as e:
if "'thread.RLock' object has no attribute" in str(e):
# https://bitbucket.org/pypy/pypy/issues/2962/gevent-cannot-patch-rlock-under-pypy-27-7
pytest.skip("https://github.com/eventlet/eventlet/issues/546")
else:
raise
elif request.param == "gevent":
try:
gevent.monkey.patch_all()
except Exception as e:
if "_RLock__owner" in str(e):
pytest.skip("https://github.com/gevent/gevent/issues/1380")
else:
raise
else:
assert request.param is None

return request.param
16 changes: 14 additions & 2 deletions tests/test_transport.py
Expand Up @@ -22,8 +22,19 @@ def inner(*args, **kwargs):


@pytest.mark.parametrize("debug", (True, False))
def test_transport_works(httpserver, request, capsys, caplog, debug, make_client):
@pytest.mark.parametrize("client_flush_method", ["close", "flush"])
def test_transport_works(
httpserver,
request,
capsys,
caplog,
debug,
make_client,
client_flush_method,
maybe_monkeypatched_threading,
):
httpserver.serve_content("ok", 200)

caplog.set_level(logging.DEBUG)

client = make_client(
Expand All @@ -34,7 +45,8 @@ def test_transport_works(httpserver, request, capsys, caplog, debug, make_client

add_breadcrumb(level="info", message="i like bread", timestamp=datetime.now())
capture_message("löl")
client.close()

getattr(client, client_flush_method)()

out, err = capsys.readouterr()
assert not err and not out
Expand Down
31 changes: 6 additions & 25 deletions tests/utils/test_contextvars.py
@@ -1,37 +1,18 @@
import random
import time

import pytest
import gevent


from sentry_sdk.utils import _is_threading_local_monkey_patched


def try_gevent_patch_all():
try:
gevent.monkey.patch_all()
except Exception as e:
if "_RLock__owner" in str(e):
pytest.skip("https://github.com/gevent/gevent/issues/1380")
else:
raise


def test_gevent_is_patched():
try_gevent_patch_all()
assert _is_threading_local_monkey_patched()


def test_gevent_is_not_patched():
assert not _is_threading_local_monkey_patched()

def test_thread_local_is_patched(maybe_monkeypatched_threading):
if maybe_monkeypatched_threading is None:
assert not _is_threading_local_monkey_patched()
else:
assert _is_threading_local_monkey_patched()

@pytest.mark.parametrize("with_gevent", [True, False])
def test_leaks(with_gevent):
if with_gevent:
try_gevent_patch_all()

def test_leaks(maybe_monkeypatched_threading):
import threading

# Need to explicitly call _get_contextvars because the SDK has already
Expand Down