Skip to content

Commit

Permalink
fix: reduce latency of AsyncResult.get under gevent (celery#7052)
Browse files Browse the repository at this point in the history
wakeup waiters in `wait_for` after every `drain_events` occurs instead of only after 1 seconds timeout
  • Loading branch information
mrmaxi committed Nov 14, 2021
1 parent fe37cd8 commit 4e1fc5a
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions celery/backends/asynchronous.py
Expand Up @@ -66,18 +66,30 @@ def wait_for(self, p, wait, timeout=None):
class greenletDrainer(Drainer):
spawn = None
_g = None
_drain_complete_event = None # event, sended (and recreated) after every drain_events iteration

def _create_drain_complete_event(self):
"""create new self._drain_complete_event object"""
pass

def _send_drain_complete_event(self):
"""raise self._drain_complete_event for wakeup .wait_for"""
pass

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._started = threading.Event()
self._stopped = threading.Event()
self._shutdown = threading.Event()
self._create_drain_complete_event()

def run(self):
self._started.set()
while not self._stopped.is_set():
try:
self.result_consumer.drain_events(timeout=1)
self._send_drain_complete_event()
self._create_drain_complete_event()
except socket.timeout:
pass
self._shutdown.set()
Expand All @@ -89,8 +101,14 @@ def start(self):

def stop(self):
self._stopped.set()
self._send_drain_complete_event()
self._shutdown.wait(THREAD_TIMEOUT_MAX)

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._drain_complete_event.wait(timeout=timeout)


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):
Expand All @@ -101,10 +119,12 @@ def spawn(self, func):
sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._g._exit_event.wait(timeout=timeout)
def _create_drain_complete_event(self):
from eventlet.event import Event
self._drain_complete_event = Event()

def _send_drain_complete_event(self):
self._drain_complete_event.send()


@register_drainer('gevent')
Expand All @@ -116,11 +136,13 @@ def spawn(self, func):
gevent.sleep(0)
return g

def wait_for(self, p, wait, timeout=None):
import gevent
self.start()
if not p.ready:
gevent.wait([self._g], timeout=timeout)
def _create_drain_complete_event(self):
from gevent.event import Event
self._drain_complete_event = Event()

def _send_drain_complete_event(self):
self._drain_complete_event.set()
self._create_drain_complete_event()


class AsyncBackendMixin:
Expand Down

0 comments on commit 4e1fc5a

Please sign in to comment.