diff --git a/celery/backends/asynchronous.py b/celery/backends/asynchronous.py index 32475d5eaa..cedae5013a 100644 --- a/celery/backends/asynchronous.py +++ b/celery/backends/asynchronous.py @@ -66,18 +66,30 @@ def wait_for(self, p, wait, timeout=None): class greenletDrainer(Drainer): spawn = None _g = None + _drain_complete_event = None # event, sended (and recreated) after every drain_events iteration + + def _create_drain_complete_event(self): + """create new self._drain_complete_event object""" + pass + + def _send_drain_complete_event(self): + """raise self._drain_complete_event for wakeup .wait_for""" + pass def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._started = threading.Event() self._stopped = threading.Event() self._shutdown = threading.Event() + self._create_drain_complete_event() def run(self): self._started.set() while not self._stopped.is_set(): try: self.result_consumer.drain_events(timeout=1) + self._send_drain_complete_event() + self._create_drain_complete_event() except socket.timeout: pass self._shutdown.set() @@ -89,8 +101,14 @@ def start(self): def stop(self): self._stopped.set() + self._send_drain_complete_event() self._shutdown.wait(THREAD_TIMEOUT_MAX) + def wait_for(self, p, wait, timeout=None): + self.start() + if not p.ready: + self._drain_complete_event.wait(timeout=timeout) + @register_drainer('eventlet') class eventletDrainer(greenletDrainer): @@ -101,10 +119,12 @@ def spawn(self, func): sleep(0) return g - def wait_for(self, p, wait, timeout=None): - self.start() - if not p.ready: - self._g._exit_event.wait(timeout=timeout) + def _create_drain_complete_event(self): + from eventlet.event import Event + self._drain_complete_event = Event() + + def _send_drain_complete_event(self): + self._drain_complete_event.send() @register_drainer('gevent') @@ -116,11 +136,13 @@ def spawn(self, func): gevent.sleep(0) return g - def wait_for(self, p, wait, timeout=None): - import gevent - self.start() - if not p.ready: - gevent.wait([self._g], timeout=timeout) + def _create_drain_complete_event(self): + from gevent.event import Event + self._drain_complete_event = Event() + + def _send_drain_complete_event(self): + self._drain_complete_event.set() + self._create_drain_complete_event() class AsyncBackendMixin: diff --git a/t/unit/backends/test_asynchronous.py b/t/unit/backends/test_asynchronous.py index c0fe894900..6593cd53e5 100644 --- a/t/unit/backends/test_asynchronous.py +++ b/t/unit/backends/test_asynchronous.py @@ -158,7 +158,10 @@ def sleep(self): def result_consumer_drain_events(self, timeout=None): import eventlet - eventlet.sleep(0) + # `drain_events` of asynchronous backends with pubsub have to sleep + # while waiting events for not more then `interval` timeout, + # but events may coming sooner + eventlet.sleep(timeout/10) def schedule_thread(self, thread): import eventlet @@ -204,7 +207,10 @@ def sleep(self): def result_consumer_drain_events(self, timeout=None): import gevent - gevent.sleep(0) + # `drain_events` of asynchronous backends with pubsub have to sleep + # while waiting events for not more then `interval` timeout, + # but events may coming sooner + gevent.sleep(timeout/10) def schedule_thread(self, thread): import gevent