From 7191e9af76828eaf8f0a8f5bbe348bfb1bed886c Mon Sep 17 00:00:00 2001 From: "Craig J. Bass" <1889973+craigjbass@users.noreply.github.com> Date: Tue, 25 Jan 2022 14:21:39 +0000 Subject: [PATCH] Revert "fix: reduce latency of AsyncResult.get under gevent (#7052)" This reverts commit cc5569222db3c1e5bee3a70d679f747940988fec. --- celery/backends/asynchronous.py | 40 +++++++--------------------- t/unit/backends/test_asynchronous.py | 12 ++------- 2 files changed, 11 insertions(+), 41 deletions(-) diff --git a/celery/backends/asynchronous.py b/celery/backends/asynchronous.py index cedae5013a..32475d5eaa 100644 --- a/celery/backends/asynchronous.py +++ b/celery/backends/asynchronous.py @@ -66,30 +66,18 @@ def wait_for(self, p, wait, timeout=None): class greenletDrainer(Drainer): spawn = None _g = None - _drain_complete_event = None # event, sended (and recreated) after every drain_events iteration - - def _create_drain_complete_event(self): - """create new self._drain_complete_event object""" - pass - - def _send_drain_complete_event(self): - """raise self._drain_complete_event for wakeup .wait_for""" - pass def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._started = threading.Event() self._stopped = threading.Event() self._shutdown = threading.Event() - self._create_drain_complete_event() def run(self): self._started.set() while not self._stopped.is_set(): try: self.result_consumer.drain_events(timeout=1) - self._send_drain_complete_event() - self._create_drain_complete_event() except socket.timeout: pass self._shutdown.set() @@ -101,14 +89,8 @@ def start(self): def stop(self): self._stopped.set() - self._send_drain_complete_event() self._shutdown.wait(THREAD_TIMEOUT_MAX) - def wait_for(self, p, wait, timeout=None): - self.start() - if not p.ready: - self._drain_complete_event.wait(timeout=timeout) - @register_drainer('eventlet') class eventletDrainer(greenletDrainer): @@ -119,12 +101,10 @@ def spawn(self, func): sleep(0) return g - def _create_drain_complete_event(self): - from eventlet.event import Event - self._drain_complete_event = Event() - - def _send_drain_complete_event(self): - self._drain_complete_event.send() + def wait_for(self, p, wait, timeout=None): + self.start() + if not p.ready: + self._g._exit_event.wait(timeout=timeout) @register_drainer('gevent') @@ -136,13 +116,11 @@ def spawn(self, func): gevent.sleep(0) return g - def _create_drain_complete_event(self): - from gevent.event import Event - self._drain_complete_event = Event() - - def _send_drain_complete_event(self): - self._drain_complete_event.set() - self._create_drain_complete_event() + def wait_for(self, p, wait, timeout=None): + import gevent + self.start() + if not p.ready: + gevent.wait([self._g], timeout=timeout) class AsyncBackendMixin: diff --git a/t/unit/backends/test_asynchronous.py b/t/unit/backends/test_asynchronous.py index 479fd85583..c0fe894900 100644 --- a/t/unit/backends/test_asynchronous.py +++ b/t/unit/backends/test_asynchronous.py @@ -158,11 +158,7 @@ def sleep(self): def result_consumer_drain_events(self, timeout=None): import eventlet - - # `drain_events` of asynchronous backends with pubsub have to sleep - # while waiting events for not more then `interval` timeout, - # but events may coming sooner - eventlet.sleep(timeout/10) + eventlet.sleep(0) def schedule_thread(self, thread): import eventlet @@ -208,11 +204,7 @@ def sleep(self): def result_consumer_drain_events(self, timeout=None): import gevent - - # `drain_events` of asynchronous backends with pubsub have to sleep - # while waiting events for not more then `interval` timeout, - # but events may coming sooner - gevent.sleep(timeout/10) + gevent.sleep(0) def schedule_thread(self, thread): import gevent