Add a configuration option to disable prefetch completely #7106
Replies: 26 comments 17 replies
-
This is a recurrent misconception. There are two kinds of prefetch at play here: one for the main worker process and a second one for the worker threads (or spawned processes). We can take a step back and remember that a Celery worker has a main process, which is the one that takes messages from the broker. In this case your output is showing the tasks that each worker is pulling from the broker. All of this means that if you want to make sure workers are not too greedy, you have to play with the prefetch multiplier.
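For concreteness, here is a minimal sketch of the knob being discussed, assuming a made-up app name ('proj') and a local Redis broker; the same knob is also available as the --prefetch-multiplier option of celery worker:

```
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')

# each pool process gets up to this many messages reserved for it in advance;
# the effective prefetch window is roughly concurrency * worker_prefetch_multiplier
app.conf.worker_prefetch_multiplier = 1

# note: a value of 0 means "unlimited prefetch", not "no prefetch"
```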
-
Thanks so much for the info! Sadly, we've already set the prefetch multiplier to 1. In my original message, under the celery inspect active and celery inspect reserved output, we can look at this worker:
Active
Reserved
This worker pulled a task from the broker even though it already had 4 processes running (I have 4 worker processes per worker node). This is exactly what we don't want to happen: it makes that reserved task wait a long time before it starts executing while there are plenty of other idle workers waiting for tasks. What I would like is for the worker to wait until one of its processes is finished, then go to the broker and grab a task. I want to completely disable prefetching.
-
Oh, I think I get what you're saying. In that case, we really need a way to disable prefetching altogether. AFAIK, this is not possible, right? Why not? Is there any way I could monkey-patch or configure things so that prefetching only happens when the worker has a free thread?
-
It should be possible, but I don't think anyone has requested that we implement it. I'm currently scheduling this for a future milestone. If you want to dive in and write a patch, feel free to submit a PR.
-
We have also been struggling with this (workers prefetching tasks from the queue even though all pool processes / subprocesses are currently busy). We note from the documentation that it is recommended to set acks_late together with a prefetch multiplier of 1, which we cannot do (see below). The code below is a loader (invoked via Celery's custom-loader mechanism) that monkey patches the prefetch check instead. It seems to work with celery version 4.4.7, prefork pool. But use at your own risk. Hope it helps somebody. Advice appreciated.

from celery.loaders.app import AppLoader
class SingleTaskLoader(AppLoader):

    def on_worker_init(self):
        # called when the worker starts, before logging setup
        super().on_worker_init()

        """
        Celery depends on kombu for messaging.
        The criteria for whether a celery worker can accept more messages
        from a queue is at kombu.transport.virtual.base.QoS.can_consume().
        The standard implementation depends on `<QoS>.prefetch_count`,
        which is a function of `--concurrency` * `--prefetch-multiplier`.
        If `<QoS>.prefetch_count == 0`, prefetching of messages is unrestricted.
        The standard implementation also depends on a count of received
        messages `<QoS>._delivered` vs ack-ed messages `<QoS>._dirty`,
        and hence the prefetch behaviour is different depending on the
        ACKS_LATE setting.

        https://docs.celeryproject.org/en/stable/userguide/optimizing.html#reserve-one-task-at-a-time
        https://github.com/celery/celery/issues/6500
        https://github.com/celery/celery/issues/2788
        https://stackoverflow.com/questions/16040039/understanding-celery-task-prefetching

        Here we override kombu.transport.virtual.base.QoS.can_consume()
        to run a delegate function, instead of the builtin implementation.
        """

        def can_consume(_self):
            """
            Override for kombu.transport.virtual.base.QoS.can_consume:
            run a delegate function, instead of the builtin implementation.
            """
            return getattr(_self, 'delegate_can_consume', lambda: False)()

        import kombu.transport.virtual.base
        kombu.transport.virtual.base.QoS.can_consume = can_consume

        """
        Celery instances are built from "blueprints":
        https://docs.celeryproject.org/en/latest/userguide/extending.html#blueprints

        > The Worker is the first blueprint to start...
        > When the worker is fully started it continues with the Consumer
        > blueprint, that sets up how tasks are executed,
        > connects to the broker and starts the message consumers.

        In the Consumer blueprint, celery.worker.consumer.tasks.Tasks is
        responsible for setting
        `<Consumer>.task_consumer = kombu.messaging.Consumer(...)`,
        hence `<Consumer>.task_consumer.channel` is of type
        kombu.transport.virtual.base.Channel,
        hence `<Consumer>.task_consumer.channel.qos` is of type
        kombu.transport.virtual.base.QoS.

        Here we add a new bootstep to the celery Consumer,
        to set `<QoS>.delegate_can_consume`.
        """

        from celery import bootsteps
        from celery.worker import state as worker_state

        class Set_QoS_Delegate(bootsteps.StartStopStep):

            requires = {'celery.worker.consumer.tasks:Tasks'}

            def start(self, c):
                def can_consume():
                    """
                    Delegate for QoS.can_consume:
                    only fetch a message from the queue if the worker has
                    no other messages.
                    """
                    # note: reserved_requests includes active_requests
                    return len(worker_state.reserved_requests) == 0

                c.task_consumer.channel.qos.delegate_can_consume = can_consume

        # add bootstep
        self.app.steps['consumer'].add(Set_QoS_Delegate)
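As far as I know, one way to point a worker at a custom loader like this is the loader argument of the app constructor (the CELERY_LOADER environment variable is another route); 'myapp.loader' below is a placeholder for wherever SingleTaskLoader actually lives:

```
from celery import Celery

# 'myapp.loader:SingleTaskLoader' is a hypothetical path to the class above
app = Celery('myapp',
             broker='redis://localhost:6379/0',
             loader='myapp.loader:SingleTaskLoader')
```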
-
That is an interesting monkey patch you've got there. If you can come up with a patch that does this properly, we'll include it as an option. EDIT:
-
@samdoolin @george-miller If I'm getting you correctly, you'd like the worker to consume messages only when it is free to process them. I tried reproducing this with a single worker with acks_late=true & worker_prefetch_multiplier=1 and it seems to be fine (consuming only when it can). What am I missing?
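For reference, the configuration being described, as a small sketch (app name and broker URL are placeholders):

```
from celery import Celery

app = Celery('proj', broker='redis://localhost:6379/0')
app.conf.update(
    task_acks_late=True,           # acknowledge only after the task has run
    worker_prefetch_multiplier=1,  # reserve at most one message per pool process
)
```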
-
@galCohen88 thank you for your reply. Our problem is that we cannot set acks_late (our tasks are not idempotent; see below). The implementation of kombu.transport.virtual.base.QoS.can_consume is such that the command line prefetch-multiplier argument cannot prevent prefetching on its own: with early acks, a message is acknowledged as soon as it is handed to a pool process, so it no longer counts against the prefetch limit and the worker fetches another one even while every process is busy. Hence, with acks_late disabled, we still end up with reserved tasks waiting behind busy processes.
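To make the mechanism above concrete, here is a rough paraphrase of that check, written for illustration only; it mirrors the shape of the logic in kombu.transport.virtual.base.QoS rather than reproducing its source:

```
class QoSSketch:
    """Illustrative paraphrase of the virtual transport QoS bookkeeping."""

    def __init__(self, prefetch_count=0):
        self.prefetch_count = prefetch_count  # 0 means unrestricted, not disabled
        self._delivered = {}  # delivery_tag -> message handed to the worker
        self._dirty = set()   # delivery tags already acked or rejected

    def can_consume(self):
        pcount = self.prefetch_count
        # with early acks a message is acknowledged as soon as it is accepted,
        # so it leaves this window and the worker happily fetches another one
        return not pcount or len(self._delivered) - len(self._dirty) < pcount
```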
-
@thedrow thank you for your reply. I agree that it's a bit of a grungy monkey patch. I could inherit from the concrete kombu channel of the transport type that I'm going to use (e.g. kombu.transport.redis.Channel), and then override the concrete QoS implementation of that channel. But I couldn't find a legitimate way to patch into the base QoS class (kombu.transport.virtual.base.QoS), which would then work irrespective of transport.
-
@samdoolin Can you give me some context? Why can't acks_late be set to true? Is it mission critical / specific to your use case?
-
Unfortunately our tasks are not idempotent, and so per the documentation we should not use acks_late.
-
The concrete Transport class is resolved by the method kombu.connection.Connection.get_transport_cls, so perhaps something along these lines would be a less intrusive way to inject an overridden QoS:

import kombu.connection

class QoS_Mixin:
    def can_consume(self):  # override
        ...

class Connection(kombu.connection.Connection):
    def get_transport_cls(self):
        transport_cls = super().get_transport_cls()
        channel_cls = transport_cls.Channel
        qos_cls = channel_cls.QoS
        QoS = type('QoS', (QoS_Mixin, qos_cls), {})
        Channel = type('Channel', (channel_cls,), {'QoS': QoS})
        Transport = type('Transport', (transport_cls,), {'Channel': Channel})
        return Transport

Perhaps I should instead override the setting.
-
Maybe that's the right solution.
-
Hi @samdoolin, I am facing a similar issue with tasks which cannot be used with acks_late. More information on my issue can be found in this Stack Overflow question: https://stackoverflow.com/questions/69987419/celery-prefetched-tasks-stuck-behind-other-tasks-on-ecs-cluster
-
We have been running with a Redis broker, with code for the loader similar to my post back in April (updated below), and we start up the worker with that loader. For other backends I think a similar approach should be possible, though it may need to target that transport's concrete QoS class. It still feels like an unpalatable hack, but it has been working for us.

from celery.loaders.app import AppLoader

class SingleTaskLoader(AppLoader):
    def on_worker_init(self):
        # called when the worker starts, before logging setup
        super().on_worker_init()

        """
        STEP 1:
        monkey patch kombu.transport.virtual.base.QoS.can_consume()
        to prefer to run a delegate function,
        instead of the builtin implementation.
        """
        import kombu.transport.virtual
        builtin_can_consume = kombu.transport.virtual.QoS.can_consume

        def can_consume(self):
            """
            monkey patch for kombu.transport.virtual.QoS.can_consume:
            if self.delegate_can_consume exists, run it instead
            """
            if delegate := getattr(self, 'delegate_can_consume', False):
                return delegate()
            else:
                return builtin_can_consume(self)

        kombu.transport.virtual.QoS.can_consume = can_consume

        """
        STEP 2:
        add a bootstep to the celery Consumer blueprint
        to supply the delegate function above.
        """
        from celery import bootsteps
        from celery.worker import state as worker_state

        class Set_QoS_Delegate(bootsteps.StartStopStep):

            requires = {'celery.worker.consumer.tasks:Tasks'}

            def start(self, c):
                def can_consume():
                    """
                    delegate for QoS.can_consume:
                    only fetch a message from the queue if the worker has
                    no other messages
                    """
                    # note: reserved_requests includes active_requests
                    return len(worker_state.reserved_requests) == 0

                # types...
                # c: celery.worker.consumer.consumer.Consumer
                # c.task_consumer: kombu.messaging.Consumer
                # c.task_consumer.channel: kombu.transport.virtual.Channel
                # c.task_consumer.channel.qos: kombu.transport.virtual.QoS
                c.task_consumer.channel.qos.delegate_can_consume = can_consume

        # add bootstep to Consumer blueprint
        self.app.steps['consumer'].add(Set_QoS_Delegate)
-
Which version of Celery are you using? And do you use any concurrency? I am currently running 5.1.1 together with Flask. For some reason, with this loader the worker only ever picks up a single task at a time, even though I start it with a higher concurrency.
Only without the loader does it indeed start reaching the configured concurrency.
-
The delegate can be adjusted so that the worker reserves up to as many tasks as the pool's concurrency:

def can_consume():
    return len(worker_state.reserved_requests) < c.controller.concurrency
-
Ah, I was under the impression that you also intended to have more than a single concurrent task at the same time! Anyway, this indeed works as expected!
-
This is a problem for me too :( We have our own more reliable retry mechanism (based on the DB outbox pattern) and so really don't want the retry behaviour associated with acks_late.
We use rabbitmq - are you saying we'd need to monkey patch something else there?
-
@Diggsey the docstring for
-
Was thinking of transferring this issue to a discussion before any concrete consensus is reached.
-
Hello there, I have found that the prefetch-related limit takes ETA/countdown tasks into account as well. Let's suppose we have a main worker with concurrency 2, prefetch limit 1, and acks_late. The Main Process gets 1 task from RabbitMQ, but it's scheduled in the future, so this task goes to the worker's in-memory ETA heap and the prefetch limit is bumped so that another task can be reserved. @samdoolin I think the same caveat applies to the loader approach above. Edit: see my follow-up below on how the prefetch limit is actually set per transport.
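For context, an ETA/countdown task here means one submitted roughly like this; the app, broker URL, and my_task are placeholders:

```
from celery import Celery

app = Celery('proj', broker='amqp://guest:guest@localhost//')  # placeholder

@app.task
def my_task():
    ...

# the message is delivered to the MainProcess right away, but execution is
# deferred; until the ETA is reached the task is held in the worker's memory
my_task.apply_async(countdown=3600)            # run roughly an hour from now
# or: my_task.apply_async(eta=some_datetime)   # run at an absolute time
```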
-
@samdoolin I tried your solution with RabbitMQ, but I'm unable to get the same behaviour there. Is the change to kombu.transport.virtual.base.QoS even used by the amqp transport?
-
After a bit of research, I have discovered that the Consumer Prefetch is a RabbitMQ feature. Both librabbitmq and py-amqp (Kombu transports) set the prefetch limit through the RabbitMQ API, never calling a Python-side can_consume check. In my opinion, the virtual implementation must preserve the original behaviour, and therefore we can't develop a "disable prefetch" feature as discussed before. However, from an AMQP perspective, the right approach is to acknowledge a task only after it has been processed (i.e. use acks_late).

ETA/Countdown tasks

The ETA/Countdown implementation complicates everything a bit. When the MainProcess reserves a task with an ETA in the future, it stores the task in memory and fetches the next one. It repeats this process until it finds a task without an ETA or one of the ETAs is reached. This means that a MainProcess can reserve an infinite number of tasks if all the tasks have an ETA in the future. In case the prefetch limit is set, the MainProcess increases its value by one every time it needs to reserve an additional task, bypassing the original value of the prefetch multiplier. This is a serious issue no matter if you're using the prefetch limit or not. Here is a case example:
Exploring ETA solutions

IMHO, the MainProcess must not reserve tasks that it can't process. Moreover, it must never bypass the prefetch limits, to avoid the issues described above. So here are my first two thoughts.

A native RabbitMQ solution is the RabbitMQ Delayed Message Plugin (github, blog 1, blog 2). It has some limitations (see GitHub) but it delegates the ETA responsibility to RabbitMQ, bypassing the current Celery implementation. I think it could be an optional feature compatible with the current Celery API (although it would require more configuration).

A general solution for all brokers would require changing the current algorithm. Instead of storing the ETA tasks in memory, they could be requeued using the Celery 'retry' feature when the ETA isn't yet met. However, they can't be requeued using the native RabbitMQ reject/nack, because the tasks would be fetched and requeued in an infinite loop (see docs). With this algorithm, a task scheduled first may be processed later, but the current algorithm has the same problem.
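As a hedged illustration of the "delegate the delay to the broker" idea, and not something Celery does today: with the Delayed Message Plugin enabled, a publisher can ask RabbitMQ itself to hold a message back. The exchange name, queue name, and broker URL below are made up:

```
import kombu

# exchange type provided by the rabbitmq_delayed_message_exchange plugin
delayed = kombu.Exchange('tasks.delayed', type='x-delayed-message',
                         arguments={'x-delayed-type': 'direct'})
queue = kombu.Queue('tasks', exchange=delayed, routing_key='tasks')

with kombu.Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer()
    producer.publish(
        {'task': 'example', 'args': []},
        exchange=delayed, routing_key='tasks', declare=[queue],
        headers={'x-delay': 30000},  # the broker delivers the message after ~30 s
    )
```

Whether something like this could be hidden behind the existing apply_async(eta=...) API is the open question.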
-
Yeah, that's in line with what I found whilst investigating the issue. It looks like the AMQP protocol (and therefore RabbitMQ) is simply unsuitable for "at most once delivery" if you also need low latency, which is insane given that this seems to be one of the primary use cases of RabbitMQ. It seems to stem from the fact that AMQP is a "push" based protocol rather than a "pull" based one, but doesn't implement any kind of back-pressure system. And if you need "at least once delivery", it's a pain to configure correctly and the tooling for managing persistent data within RabbitMQ is just inadequate. So it seems the ideal use case for RabbitMQ is "less or more than once delivery"... Can't say I'll be using it again any time soon.
-
We use the solution proposed by @samdoolin, but not limited to 1 concurrent task: we allow several concurrent tasks while still disabling prefetch. In some scenarios a Celery worker can spawn several processes, all of which run for longer than the server-side Redis timeout configuration. After that happens, the next time the Celery worker tries to fetch tasks it encounters "Connection to broker lost". We tried configuring a heartbeat to the Redis broker, but might have done it incorrectly, as it didn't resolve the issue (on Kubernetes, with Memorystore Redis in GCP). FYI to anyone having the same issue.
-
Environment & Settings
Celery version: 4.3.0 (rhubarb)
celery report Output:
Steps to Reproduce
Required Dependencies
Python Packages
pip freeze Output:
Other Dependencies
N/A
Minimally Reproducible Test Case
Expected Behavior
I was hoping that by specifying -Ofair, tasks would not be reserved by workers, as stated here: https://docs.celeryproject.org/en/v4.3.0/userguide/optimizing.html?highlight=optimization#prefork-pool-prefetch-settings
Actual Behavior
Workers have reserved tasks while other workers are free to do work.
I am going to paste the state we are seeing in prod when a lot of tasks come in. Notice how query-runner-celery-datalake-interval-refresh-784bb858c8-qskvj has a reserved task while celery@query-runner-celery-datalake-interval-refresh-784bb858c8-dgmf9 is empty and could run that task. Please note the name of the worker is the name of the queue they are reading from.
celery inspect active and celery inspect reserved
Output:
Also, here is the output of celery inspect stats where you can see fair is getting in properly.
celery inspect stats for worker query-runner-celery-datalake-interval-refresh-784bb858c8-dgmf9
Output:``` -> celery@query-runner-celery-datalake-interval-refresh-784bb858c8-dgmf9: OK { "broker": { "alternates": [], "connect_timeout": 4, "failover_strategy": "round-robin", "heartbeat": 120.0, "hostname": "query-runner-redis-master", "insist": false, "login_method": null, "port": 6379, "ssl": false, "transport": "redis", "transport_options": {}, "uri_prefix": null, "userid": null, "virtual_host": "0" }, "clock": "8600271", "pid": 8, "pool": { "max-concurrency": 4, "max-tasks-per-child": "N/A", "processes": [ 13, 14, 15, 16 ], "put-guarded-by-semaphore": false, "timeouts": [ 0, 0 ], "writes": { "all": "42.86%, 42.86%, 14.29%", "avg": "33.33%", "inqueues": { "active": 0, "total": 4 }, "raw": "3, 3, 1", "strategy": "fair", "total": 7 } }, "prefetch_count": 4, "rusage": { "idrss": 0, "inblock": 72400, "isrss": 0, "ixrss": 0, "majflt": 158, "maxrss": 68800, "minflt": 69557, "msgrcv": 0, "msgsnd": 0, "nivcsw": 2942, "nsignals": 0, "nswap": 0, "nvcsw": 334278, "oublock": 1584, "stime": 7.775837, "utime": 152.889586 }, "total": { "tasks.query.run_query": 7 } } ```
Generally, we have a lot of long-running tasks. This issue is making our tasks take double the amount of time because they are unnecessarily waiting behind other tasks without utilizing the other workers.