Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AsyncResult behave more like in older versions in cross-thread usage #1743

Merged
merged 5 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/changes/1739.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Make ``AsyncResult`` print a warning when it detects improper
cross-thread usage instead of hanging.

``AsyncResult`` has *never* been safe to use from multiple threads.
It, like most gevent objects, is intended to work with greenlets from
a single thread. Using ``AsyncResult`` from multiple threads has
undefined semantics. The safest way to communicate between threads is
using an event loop async watcher.

Those undefined semantics changed in recent gevent versions, making it
more likely that an abused ``AsyncResult`` would misbehave in ways
that could cause the program to hang.

Now, when ``AsyncResult`` detects a situation that would hang, it
prints a warning to stderr. Note that this is best-effort, and hangs
are still possible, especially under PyPy 7.3.3.

At the same time, ``AsyncResult`` is tuned to behave more like it did
in older versions, meaning that the hang is once again much less
likely. If you were getting lucky and using ``AsyncResult``
successfully across threads, this may restore your luck.
94 changes: 87 additions & 7 deletions src/gevent/_abstract_linkable.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from __future__ import print_function

import sys
from gc import get_objects

from greenlet import greenlet
from greenlet import error as greenlet_error

from gevent._compat import thread_mod_name
Expand Down Expand Up @@ -40,6 +42,15 @@ class _FakeNotifier(object):
def __init__(self):
self.pending = False

def get_roots_and_hubs():
from gevent.hub import Hub # delay import
return {
x.parent: x
for x in get_objects()
if isinstance(x, Hub)
}


class AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying
# protocol common to both repeatable events (Event, Semaphore) and
Expand Down Expand Up @@ -328,13 +339,82 @@ def _notify_links(self, arrived_while_waiting):
# must have more links than we started with. We need to schedule the
# wakeup.
self._check_and_notify()
# If we added unswitched greenlets, however, don't add them back to the links yet.
# We wouldn't be able to call them in this hub anyway.
# TODO: Instead of just adding these back to self._links, we should try to detect their
# "home" hub and mode the callback to that hub. As it stands, there's a chance that
# if no greenlet tries to acquire/release this object in that hub, these objects
# will never get to run.
self._links.extend(unswitched)
if unswitched:
self._handle_unswitched_notifications(unswitched)


def _handle_unswitched_notifications(self, unswitched):
# Given a list of callable objects that raised
# ``greenlet.error`` when we called them: If we can determine
# that it is a parked greenlet (the callablle is a
# ``greenlet.switch`` method) and we can determine the hub
# that the greenlet belongs to (either its parent, or, in the
# case of a main greenlet, find a hub with the same parent as
# this greenlet object) then:

# Move this to be a callback in that thread.
# (This relies on holding the GIL *or* ``Hub.loop.run_callback`` being
# thread-safe! Note that the CFFI implementations are definitely
# NOT thread-safe. TODO: Make them? Or an alternative?)
#
# Otherwise, print some error messages.

# TODO: Inline this for individual links. That handles the
# "only while ready" case automatically. Be careful about locking in that case.
#
# TODO: Add a 'strict' mode that prevents doing this dance, since it's
# inherently not safe.
root_greenlets = None
printed_tb = False
only_while_ready = not self._notify_all

while unswitched:
if only_while_ready and not self.ready():
self.__print_unswitched_warning(unswitched, printed_tb)
break

link = unswitched.pop(0)

hub = None # Also serves as a "handled?" flag
# Is it a greenlet.switch method?
if (getattr(link, '__name__', None) == 'switch'
and isinstance(getattr(link, '__self__', None), greenlet)):
glet = link.__self__
parent = glet.parent

while parent is not None:
if hasattr(parent, 'loop'): # Assuming the hub.
hub = glet.parent
break
parent = glet.parent

if hub is None:
if root_greenlets is None:
root_greenlets = get_roots_and_hubs()
hub = root_greenlets.get(glet)

if hub is not None:
hub.loop.run_callback(link, self)
if hub is None:
# We couldn't handle it
self.__print_unswitched_warning(link, printed_tb)
printed_tb = True


def __print_unswitched_warning(self, link, printed_tb):
print('gevent: error: Unable to switch to greenlet', link,
'from', self, '; crossing thread boundaries is not allowed.',
file=sys.stderr)

if not printed_tb:
printed_tb = True
print(
'gevent: error: '
'This is a result of using gevent objects from multiple threads,',
'and is a bug in the calling code.', file=sys.stderr)

import traceback
traceback.print_stack()

def _quiet_unlink_all(self, obj):
if obj is None:
Expand Down
6 changes: 6 additions & 0 deletions src/gevent/_gevent_c_abstract_linkable.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ cdef InvalidThreadUseError
cdef Timeout
cdef _get_thread_ident
cdef bint _greenlet_imported
cdef get_objects

cdef extern from "greenlet/greenlet.h":

Expand All @@ -32,6 +33,8 @@ cdef inline void greenlet_init():

cdef void _init()

cdef dict get_roots_and_hubs()

cdef class _FakeNotifier(object):
cdef bint pending

Expand Down Expand Up @@ -66,6 +69,9 @@ cdef class AbstractLinkable(object):
@cython.nonecheck(False)
cpdef _notify_links(self, list arrived_while_waiting)

cdef _handle_unswitched_notifications(self, list unswitched)
cdef __print_unswitched_warning(self, link, bint printed_tb)

cpdef _drop_lock_for_switch_out(self)
cpdef _acquire_lock_for_switch_in(self)

Expand Down
25 changes: 25 additions & 0 deletions src/gevent/_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,31 @@ def __enter__(self):
def __exit__(self, t, v, tb):
self.release()

def _handle_unswitched_notifications(self, unswitched):
# If we fail to switch to a greenlet in another thread to send
# a notification, just re-queue it, in the hopes that the
# other thread will eventually run notifications itself.
#
# We CANNOT do what the ``super()`` does and actually allow
# this notification to get run sometime in the future by
# scheduling a callback in the other thread. The algorithm
# that we use to handle cross-thread locking/unlocking was
# designed before the schedule-a-callback mechanism was
# implemented. If we allow this to be run as a callback, we
# can find ourself the victim of ``InvalidSwitchError`` (or
# worse, silent corruption) because the switch can come at an
# unexpected time: *after* the destination thread has already
# acquired the lock.
#
# This manifests in a fairly reliable test failure,
# ``gevent.tests.test__semaphore``
# ``TestSemaphoreMultiThread.test_dueling_threads_with_hub``,
# but ONLY when running in PURE_PYTHON mode.
#
# TODO: Maybe we can rewrite that part of the algorithm to be friendly to
# running the callbacks?
self._links.extend(unswitched)

def __add_link(self, link):
if not self._notifier:
self.rawlink(link)
Expand Down
53 changes: 38 additions & 15 deletions src/gevent/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class Event(AbstractLinkable): # pylint:disable=undefined-variable
one or more others. It has the same interface as
:class:`threading.Event` but works across greenlets.

.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.

An event object manages an internal flag that can be set to true
with the :meth:`set` method and reset to false with the
:meth:`clear` method. The :meth:`wait` method blocks until the
Expand Down Expand Up @@ -166,22 +170,30 @@ def _reset_internal_locks(self): # pragma: no cover


class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
"""A one-time event that stores a value or an exception.
"""
A one-time event that stores a value or an exception.

Like :class:`Event` it wakes up all the waiters when :meth:`set`
or :meth:`set_exception` is called. Waiters may receive the passed
value or exception by calling :meth:`get` instead of :meth:`wait`.
An :class:`AsyncResult` instance cannot be reset.

Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception`
is called. Waiters may receive the passed value or exception by calling :meth:`get`
instead of :meth:`wait`. An :class:`AsyncResult` instance cannot be reset.
.. important::
This object is for communicating among greenlets within the
same thread *only*! Do not try to use it to communicate across threads.

To pass a value call :meth:`set`. Calls to :meth:`get` (those that are currently blocking as well as
those made in the future) will return the value:
To pass a value call :meth:`set`. Calls to :meth:`get` (those that
are currently blocking as well as those made in the future) will
return the value::

>>> from gevent.event import AsyncResult
>>> result = AsyncResult()
>>> result.set(100)
>>> result.get()
100

To pass an exception call :meth:`set_exception`. This will cause :meth:`get` to raise that exception:
To pass an exception call :meth:`set_exception`. This will cause
:meth:`get` to raise that exception::

>>> result = AsyncResult()
>>> result.set_exception(RuntimeError('failure'))
Expand All @@ -190,7 +202,8 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
...
RuntimeError: failure

:class:`AsyncResult` implements :meth:`__call__` and thus can be used as :meth:`link` target:
:class:`AsyncResult` implements :meth:`__call__` and thus can be
used as :meth:`link` target::

>>> import gevent
>>> result = AsyncResult()
Expand All @@ -202,23 +215,33 @@ class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
ZeroDivisionError

.. note::

The order and timing in which waiting greenlets are awakened is not determined.
As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in a
undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
(those not waiting to be awakened) may run between the current greenlet yielding and
the waiting greenlets being awakened. These details may change in the future.

.. versionchanged:: 1.1
The exact order in which waiting greenlets are awakened is not the same
as in 1.0.

The exact order in which waiting greenlets
are awakened is not the same as in 1.0.

.. versionchanged:: 1.1
Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
merged.

Callbacks :meth:`linked <rawlink>` to this object are required to
be hashable, and duplicates are merged.

.. versionchanged:: 1.5a3
Waiting greenlets are now awakened in the order in which they waited.

Waiting greenlets are now awakened in the order in which they
waited.

.. versionchanged:: 1.5a3
The low-level ``rawlink`` method (most users won't use this) now automatically
unlinks waiters before calling them.

The low-level ``rawlink`` method
(most users won't use this) now automatically unlinks waiters
before calling them.
"""

__slots__ = ('_value', '_exc_info', '_imap_task_index')
Expand Down
14 changes: 14 additions & 0 deletions src/gevent/monkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ def ignores(*args, **kwargs):

# maps module name -> {attribute name: original item}
# e.g. "time" -> {"sleep": built-in function sleep}
# NOT A PUBLIC API. However, third-party monkey-patchers may be using
# it? TODO: Provide better API for them.
saved = {}


Expand Down Expand Up @@ -229,6 +231,18 @@ def is_object_patched(mod_name, item_name):
return is_module_patched(mod_name) and item_name in saved[mod_name]


def is_anything_patched():
# Check if this module has done any patching in the current process.
# This is currently only used in gevent tests.
#
# Not currently a documented, public API, because I'm not convinced
# it is 100% reliable in the event of third-party patch functions that
# don't use ``saved``.
#
# .. versionadded:: NEXT
return bool(saved)


def _get_original(name, items):
d = saved.get(name, {})
values = []
Expand Down
4 changes: 4 additions & 0 deletions src/gevent/testing/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,7 @@ def assertEqualFlakyRaceCondition(self, a, b):

def assertStartsWith(self, it, has_prefix):
self.assertTrue(it.startswith(has_prefix), (it, has_prefix))

def assertNotMonkeyPatched(self):
from gevent import monkey
self.assertFalse(monkey.is_anything_patched())