diff --git a/celery/backends/asynchronous.py b/celery/backends/asynchronous.py index 32475d5eaa6..cedae5013a8 100644 --- a/celery/backends/asynchronous.py +++ b/celery/backends/asynchronous.py @@ -66,18 +66,30 @@ def wait_for(self, p, wait, timeout=None): class greenletDrainer(Drainer): spawn = None _g = None + _drain_complete_event = None # event, sended (and recreated) after every drain_events iteration + + def _create_drain_complete_event(self): + """create new self._drain_complete_event object""" + pass + + def _send_drain_complete_event(self): + """raise self._drain_complete_event for wakeup .wait_for""" + pass def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._started = threading.Event() self._stopped = threading.Event() self._shutdown = threading.Event() + self._create_drain_complete_event() def run(self): self._started.set() while not self._stopped.is_set(): try: self.result_consumer.drain_events(timeout=1) + self._send_drain_complete_event() + self._create_drain_complete_event() except socket.timeout: pass self._shutdown.set() @@ -89,8 +101,14 @@ def start(self): def stop(self): self._stopped.set() + self._send_drain_complete_event() self._shutdown.wait(THREAD_TIMEOUT_MAX) + def wait_for(self, p, wait, timeout=None): + self.start() + if not p.ready: + self._drain_complete_event.wait(timeout=timeout) + @register_drainer('eventlet') class eventletDrainer(greenletDrainer): @@ -101,10 +119,12 @@ def spawn(self, func): sleep(0) return g - def wait_for(self, p, wait, timeout=None): - self.start() - if not p.ready: - self._g._exit_event.wait(timeout=timeout) + def _create_drain_complete_event(self): + from eventlet.event import Event + self._drain_complete_event = Event() + + def _send_drain_complete_event(self): + self._drain_complete_event.send() @register_drainer('gevent') @@ -116,11 +136,13 @@ def spawn(self, func): gevent.sleep(0) return g - def wait_for(self, p, wait, timeout=None): - import gevent - self.start() - if not p.ready: - gevent.wait([self._g], timeout=timeout) + def _create_drain_complete_event(self): + from gevent.event import Event + self._drain_complete_event = Event() + + def _send_drain_complete_event(self): + self._drain_complete_event.set() + self._create_drain_complete_event() class AsyncBackendMixin: