Skip to content
This repository has been archived by the owner on Feb 15, 2023. It is now read-only.

Commit

Permalink
Revert "fix: reduce latency of AsyncResult.get under gevent (celery#7052
Browse files Browse the repository at this point in the history
)"

This reverts commit cc55692.
  • Loading branch information
craigjbass committed Jan 25, 2022
1 parent 56275f5 commit 7191e9a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 41 deletions.
40 changes: 9 additions & 31 deletions celery/backends/asynchronous.py
Expand Up @@ -66,30 +66,18 @@ def wait_for(self, p, wait, timeout=None):
class greenletDrainer(Drainer):
spawn = None
_g = None
_drain_complete_event = None # event, sended (and recreated) after every drain_events iteration

def _create_drain_complete_event(self):
"""create new self._drain_complete_event object"""
pass

def _send_drain_complete_event(self):
"""raise self._drain_complete_event for wakeup .wait_for"""
pass

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._started = threading.Event()
self._stopped = threading.Event()
self._shutdown = threading.Event()
self._create_drain_complete_event()

def run(self):
self._started.set()
while not self._stopped.is_set():
try:
self.result_consumer.drain_events(timeout=1)
self._send_drain_complete_event()
self._create_drain_complete_event()
except socket.timeout:
pass
self._shutdown.set()
Expand All @@ -101,14 +89,8 @@ def start(self):

def stop(self):
self._stopped.set()
self._send_drain_complete_event()
self._shutdown.wait(THREAD_TIMEOUT_MAX)

def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._drain_complete_event.wait(timeout=timeout)


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):
Expand All @@ -119,12 +101,10 @@ def spawn(self, func):
sleep(0)
return g

def _create_drain_complete_event(self):
from eventlet.event import Event
self._drain_complete_event = Event()

def _send_drain_complete_event(self):
self._drain_complete_event.send()
def wait_for(self, p, wait, timeout=None):
self.start()
if not p.ready:
self._g._exit_event.wait(timeout=timeout)


@register_drainer('gevent')
Expand All @@ -136,13 +116,11 @@ def spawn(self, func):
gevent.sleep(0)
return g

def _create_drain_complete_event(self):
from gevent.event import Event
self._drain_complete_event = Event()

def _send_drain_complete_event(self):
self._drain_complete_event.set()
self._create_drain_complete_event()
def wait_for(self, p, wait, timeout=None):
import gevent
self.start()
if not p.ready:
gevent.wait([self._g], timeout=timeout)


class AsyncBackendMixin:
Expand Down
12 changes: 2 additions & 10 deletions t/unit/backends/test_asynchronous.py
Expand Up @@ -158,11 +158,7 @@ def sleep(self):

def result_consumer_drain_events(self, timeout=None):
import eventlet

# `drain_events` of asynchronous backends with pubsub have to sleep
# while waiting events for not more then `interval` timeout,
# but events may coming sooner
eventlet.sleep(timeout/10)
eventlet.sleep(0)

def schedule_thread(self, thread):
import eventlet
Expand Down Expand Up @@ -208,11 +204,7 @@ def sleep(self):

def result_consumer_drain_events(self, timeout=None):
import gevent

# `drain_events` of asynchronous backends with pubsub have to sleep
# while waiting events for not more then `interval` timeout,
# but events may coming sooner
gevent.sleep(timeout/10)
gevent.sleep(0)

def schedule_thread(self, thread):
import gevent
Expand Down

2 comments on commit 7191e9a

@mrmaxi
Copy link

@mrmaxi mrmaxi commented on 7191e9a Feb 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello!
I accidentally noticed that you has revert my commit. Could you explain why, please? Is this any trouble with it?

@craigjbass
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were bisecting various change sets to narrow down a production issue

Please sign in to comment.