AsyncResult.get() has at least 1 second latency while executing under gevent #7052

Closed
mrmaxi opened this issue Nov 7, 2021 · 1 comment · Fixed by #7089
Comments

@mrmaxi
Contributor

mrmaxi commented Nov 7, 2021

Checklist

  • I have checked the issues list
    for similar or identical enhancement to an existing feature.
  • I have checked the pull requests list
    for existing proposed enhancements.
  • I have checked the commit log
    to find out if the same enhancement was already implemented in the
    master branch.
  • I have included all related issues and possible duplicate issues in this issue
    (If there are none, check this box anyway).

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

  • None

Brief Summary

When using an asynchronous backend such as Redis with pub/sub, the result of a short task (0.1 s long, for example) is available immediately.

Once I add gevent, the result of the same short task only becomes available after 1 s, 10 times slower!

To clarify: in both cases I used an identical separate worker running in Docker with the standard prefork pool.
celery -A tasks worker --prefetch-multiplier=1 --concurrency=10

This behavior is related to how GH-5974 was implemented.
AsyncResult.get() in a gevent context has at least 1 second of latency because:

I've updated the code to have a specific implementation for gevent and eventlet that will cause wait_for to only return every "timeout" # of seconds, rather than returning immediately

I would like the task result to be available as soon as possible after the worker has submitted it. I think wait_for should return immediately after the socket operation result_consumer.drain_events completes, rather than only once every "timeout" # of seconds.

This won't cause overhead: the socket operation still doesn't block the event loop, so other greenlets can keep executing.
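
Roughly, the current gevent drainer behaves like this (a simplified, hypothetical sketch; names follow celery.backends.asynchronous but this is not the exact source):

import socket
import threading
import gevent

class geventDrainer:
    """Sketch of the CURRENT behavior (simplified, hypothetical)."""

    def __init__(self, result_consumer):
        self.result_consumer = result_consumer  # backend consumer (assumed)
        self._started = threading.Event()
        self._stopped = threading.Event()
        self._g = None

    def start(self):
        if not self._started.is_set():
            self._g = gevent.spawn(self.run)
            self._started.wait()

    def run(self):
        self._started.set()
        while not self._stopped.is_set():
            try:
                # Wakes up as soon as the backend socket delivers a result...
                self.result_consumer.drain_events(timeout=1)
            except socket.timeout:
                pass

    def wait_for(self, p, wait, timeout=None):
        self.start()
        if not p.ready:
            # ...but the waiter joins the run() greenlet, which never
            # finishes, so every call sleeps the full timeout (1 s by
            # default) before the caller can re-check p.ready.
            gevent.wait([self._g], timeout=timeout)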

Design

Currently, wait_for waits (with a 1-second timeout) for greenletDrainer.run to finish; of course, it never finishes, so wait_for only returns after the 1-second timeout expires.
If wait_for instead waited for the end of the socket operation _pubsub.get_message rather than for greenletDrainer.run, latency would be lower and the event loop would still not be blocked, so other greenlets could keep executing.

I can prepare a PR with changes in backends.asynchronous (greenletDrainer, eventletDrainer, geventDrainer) so that every time result_consumer.drain_events returns (in the greenletDrainer run loop), a notification is sent to wake up all greenlets waiting for results in wait_for; see the sketch below.
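
A rough sketch of that change, modifying the drainer sketched above (again simplified and hypothetical; the real patch may differ):

import socket
import threading
import gevent

class geventDrainer:
    """Sketch of the PROPOSED behavior (simplified, hypothetical)."""

    def __init__(self, result_consumer):
        self.result_consumer = result_consumer  # backend consumer (assumed)
        self._started = threading.Event()
        self._stopped = threading.Event()
        # Waiters in wait_for() block on this; re-armed after each drain.
        self._drain_complete_event = threading.Event()

    def start(self):
        if not self._started.is_set():
            gevent.spawn(self.run)
            self._started.wait()

    def run(self):
        self._started.set()
        while not self._stopped.is_set():
            try:
                self.result_consumer.drain_events(timeout=1)
                # Wake every greenlet currently waiting in wait_for() as
                # soon as events were drained, then arm a fresh event for
                # the next drain cycle.
                self._drain_complete_event.set()
                self._drain_complete_event = threading.Event()
            except socket.timeout:
                pass

    def wait_for(self, p, wait, timeout=None):
        self.start()
        if not p.ready:
            # Returns as soon as a drain cycle completes, instead of only
            # after the full 1-second timeout expires.
            self._drain_complete_event.wait(timeout=timeout)

Under monkey.patch_all() the threading.Event wait is cooperative, so other greenlets keep running while a caller blocks in wait_for.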

Architectural Considerations

None

Proposed Behavior

Asynchronous backends (for example, Redis) will be as efficient in a gevent context as they are without it.

Proposed UI/UX

Diagrams

N/A

Alternatives

Another workaround may be to pass the interval argument into BaseResultConsumer._wait_for_pending so that it is forwarded into drain_events_until and then into wait_for.
In that case it would be possible to define your own timeout for each wait_for iteration when executing AsyncResult.get(interval=0.05), as in the example below.
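
For example (hypothetical usage, assuming interval were actually forwarded all the way down to wait_for; sleep is the task from the reproduction below):

from tasks import sleep

# Poll for the result every 50 ms instead of the hard-coded 1 s.
result = sleep.delay(0.1)
print(result.get(interval=0.05))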

How to reproduce

  1. Run Redis and RabbitMQ in Docker:
docker run -d -p 6379:6379 --name redis redis
docker run -d -p 5672:5672 --name rabbit rabbitmq:3

  2. Create tasks.py, using your own IP address for the broker and backend:

from celery import Celery
import time

app = Celery('tasks', broker='pyamqp://guest@192.168.1.40//', backend='redis://192.168.1.40')

@app.task(bind=True)
def sleep(self, timeout):
    print(f'task {self.request.id} sleep({timeout}) started')
    time.sleep(timeout)
    print(f'task {self.request.id} sleep({timeout}) finished')
    return f'task {self.request.id} sleep({timeout}) finished'

  3. Create a Dockerfile:

FROM python:3.8

RUN pip install celery redis

WORKDIR /root
CMD bash

  4. Build the image and run the worker:

docker build . -t worker
docker run -it --entrypoint celery -v $(pwd):/root worker -A tasks worker --prefetch-multiplier=1 --concurrency=10 --loglevel DEBUG

  5. Create gevent_example.py:

from gevent import monkey; monkey.patch_all()  # patch before other imports
import gevent
from datetime import datetime
from tasks import sleep

st = datetime.now()


def test(timeout):
    print(f'{datetime.now() - st} sleep.delay({timeout})')
    a = sleep.delay(timeout)
    gevent.sleep(0)
    res = a.get()
    print(f'{datetime.now()-st} {res}')
    return str(res)


jobs = [gevent.spawn(test, timeout) for timeout in [0.1, 0.1, 0.1, 0.1, 0.1, 0.1]]
gevent.sleep(0)
gevent.joinall(jobs)
print(f'{datetime.now()-st} finish')

  6. Now run gevent_example.py:

0:00:00.015999 sleep.delay(0.1)
0:00:00.055000 sleep.delay(0.1)
0:00:00.055000 sleep.delay(0.1)
0:00:00.055999 sleep.delay(0.1)
0:00:00.055999 sleep.delay(0.1)
0:00:00.056997 sleep.delay(0.1)
0:00:01.226389 task 3a4074c3-294b-4bfc-9139-8ed8724ef564 sleep(0.1) finished
0:00:01.226389 task e69e3232-f925-4a9e-afbc-13add65e5806 sleep(0.1) finished
0:00:01.226389 task 23c40dd4-6083-4d04-aac9-3c23e8508912 sleep(0.1) finished
0:00:01.241475 task df35ca20-8e7d-47b0-9fe8-13fe6b1a7a65 sleep(0.1) finished
0:00:01.241475 task 76bf5b84-fe4b-4791-9686-2dd3018221b0 sleep(0.1) finished
0:00:01.241475 task 2fc0ae38-2521-4371-8caf-88d36ea53eff sleep(0.1) finished
0:00:01.241475 finish

You can see that the first 0.1 s task completed only after ~1.23 s (0:00:01.226389).
But if you comment out the monkey-patching line:

# from gevent import monkey; monkey.patch_all()
0:00:00.015999 sleep.delay(0.1)
0:00:00.226367 sleep.delay(0.1)
0:00:00.228763 sleep.delay(0.1)
0:00:00.231746 sleep.delay(0.1)
0:00:00.233513 sleep.delay(0.1)
0:00:00.234733 sleep.delay(0.1)
0:00:00.338647 task 38f76488-d956-4671-be14-e43df9a593aa sleep(0.1) finished
0:00:00.349064 task 5222feed-6a49-4a55-8a47-f516c3642c30 sleep(0.1) finished
0:00:00.354673 task 87529c14-50a2-4ab3-8e3f-b1f9d7241c42 sleep(0.1) finished
0:00:00.361298 task af68327d-6c37-44e4-9ffa-388ef7c8a8e7 sleep(0.1) finished
0:00:00.363136 task 390980d6-e99d-40b4-afa5-241080ade7a4 sleep(0.1) finished
0:00:00.364271 task f9115418-3175-4caa-98d2-2b39be4ace41 sleep(0.1) finished
0:00:00.364271 finish

You can see that the first 0.1 s task completed after just ~0.34 s (0:00:00.338647).

With the proposed changes in backends.asynchronous this becomes:

0:00:00.016001 sleep.delay(0.1)
0:00:00.054004 sleep.delay(0.1)
0:00:00.055003 sleep.delay(0.1)
0:00:00.055003 sleep.delay(0.1)
0:00:00.056002 sleep.delay(0.1)
0:00:00.056002 sleep.delay(0.1)
0:00:00.322748 task 00a71b72-c12d-40fe-b7e1-7526120c25ad sleep(0.1) finished
0:00:00.325041 task fa31d127-35c9-42f8-a5a4-0d1e29172b59 sleep(0.1) finished
0:00:00.346190 task 3ecb8a4e-78e2-471d-b6ac-e84f9b2c44ac sleep(0.1) finished
0:00:00.349314 task eaa27232-37b1-42ed-87f5-e92c19e87aed sleep(0.1) finished
0:00:00.351648 task 48ed06c8-3683-4353-88e1-f35aeb870332 sleep(0.1) finished
0:00:00.353663 task 8e95e4b7-6988-4b00-b93d-d4240eb38645 sleep(0.1) finished
0:00:00.353663 finish

You can see that the first 0.1 s task completed after just ~0.32 s (0:00:00.322748).

@open-collective-bot

Hey @mrmaxi 👋,
Thank you for opening an issue. We will get back to you as soon as we can.
Also, check out our Open Collective and consider backing us - every little helps!

We also offer priority support for our sponsors.
If you require immediate assistance please consider sponsoring us.

mrmaxi added a commit to mrmaxi/celery that referenced this issue Nov 14, 2021
wakeup waiters in `wait_for` after every `drain_events` occurs instead of only after 1 seconds timeout
mrmaxi added a commit to mrmaxi/celery that referenced this issue Nov 14, 2021
Wakeup waiters in `wait_for` after every `drain_events` occurs instead of only after 1 seconds timeout.

Does not block event loop, because `drain_events` of asynchronous backends with pubsub commonly sleeping for some nonzero time while waiting events.
@auvipy auvipy added this to the 5.2.x milestone Nov 15, 2021
auvipy pushed a commit that referenced this issue Nov 15, 2021
Wakeup waiters in `wait_for` after every `drain_events` occurs instead of only after 1 seconds timeout.

Does not block event loop, because `drain_events` of asynchronous backends with pubsub commonly sleeping for some nonzero time while waiting events.
@auvipy auvipy linked a pull request Nov 15, 2021 that will close this issue
@auvipy auvipy closed this as completed Nov 15, 2021
craigjbass added a commit to uktrade/celery that referenced this issue Jan 25, 2022