Skip to content

Commit

Permalink
fix: reduce latency of AsyncResult.get under gevent (celery#7052)
Browse files Browse the repository at this point in the history
Wakeup waiters in `wait_for` after every `drain_events` occurs instead of only after 1 seconds timeout.

Does not block event loop, because `drain_events` of asynchronous backends with pubsub commonly sleeping for some nonzero time while waiting events.
  • Loading branch information
mrmaxi committed Nov 14, 2021
1 parent fe37cd8 commit 8ce5ab8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
40 changes: 31 additions & 9 deletions celery/backends/asynchronous.py
Expand Up @@ -66,18 +66,30 @@ def wait_for(self, p, wait, timeout=None):
class greenletDrainer(Drainer):
spawn = None
_g = None
_drain_complete_event = None # event, sended (and recreated) after every drain_events iteration

def _create_drain_complete_event(self):
"""create new self._drain_complete_event object"""
pass

def _send_drain_complete_event(self):
"""raise self._drain_complete_event for wakeup .wait_for"""
pass

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._started = threading.Event()
self._stopped = threading.Event()
self._shutdown = threading.Event()
self._create_drain_complete_event()

def run(self):
self._started.set()
while not self._stopped.is_set():
try:
self.result_consumer.drain_events(timeout=1)
self._send_drain_complete_event()
self._create_drain_complete_event()
except socket.timeout:
pass
self._shutdown.set()
Expand All @@ -89,8 +101,14 @@ def start(self):

def stop(self):
self._stopped.set()
self._send_drain_complete_event()
self._shutdown.wait(THREAD_TIMEOUT_MAX)

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._drain_complete_event.wait(timeout=timeout)


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):
Expand All @@ -101,10 +119,12 @@ def spawn(self, func):
sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._g._exit_event.wait(timeout=timeout)
def _create_drain_complete_event(self):
from eventlet.event import Event
self._drain_complete_event = Event()

def _send_drain_complete_event(self):
self._drain_complete_event.send()


@register_drainer('gevent')
Expand All @@ -116,11 +136,13 @@ def spawn(self, func):
gevent.sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
import gevent
self.start()
if not p.ready:
gevent.wait([self._g], timeout=timeout)
def _create_drain_complete_event(self):
from gevent.event import Event
self._drain_complete_event = Event()

def _send_drain_complete_event(self):
self._drain_complete_event.set()
self._create_drain_complete_event()


class AsyncBackendMixin:
Expand Down
10 changes: 8 additions & 2 deletions t/unit/backends/test_asynchronous.py
Expand Up @@ -158,7 +158,10 @@ def sleep(self):

def result_consumer_drain_events(self, timeout=None):
import eventlet
eventlet.sleep(0)
# `drain_events` of asynchronous backends with pubsub have to sleep
# while waiting events for not more then `interval` timeout,
# but events may coming sooner
eventlet.sleep(timeout/10)

def schedule_thread(self, thread):
import eventlet
Expand Down Expand Up @@ -204,7 +207,10 @@ def sleep(self):

def result_consumer_drain_events(self, timeout=None):
import gevent
gevent.sleep(0)
# `drain_events` of asynchronous backends with pubsub have to sleep
# while waiting events for not more then `interval` timeout,
# but events may coming sooner
gevent.sleep(timeout/10)

def schedule_thread(self, thread):
import gevent
Expand Down

0 comments on commit 8ce5ab8

Please sign in to comment.