[K8S] livenessProbe and readinessProbe for celery beat and workers #4079
Hi,
I'm using Kubernetes to deploy my Python application. Kubernetes provides a livenessProbe and a readinessProbe, see here.
How can I check whether my celery beat or celery worker is alive and in a correct state?
The PID alone is not a solution because it cannot be used to catch a deadlock, for example.
Thanks in advance for your help,
Best regards,

Comments
Celery has a monitoring API you can use. If you have specific problems or feature requests, please open a separate issue. |
Would this work?
|
@7wonders You'd need to extract the celery node name first. This readinessProbe will fail if any celery instance fails, which is not what you want. |
@thedrow Hmm, I think it's actually that it will succeed even if the local node has failed but another one is OK, which is also not a great outcome. |
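For illustration, a probe that pings only the worker in its own pod might look like the sketch below; the app name `proj` is a placeholder, and it assumes the default `celery@<hostname>` node-name format (Kubernetes sets `HOSTNAME` to the pod name):

```yaml
readinessProbe:
  exec:
    command:
      - sh
      - -c
      # ping only this pod's worker, not every node on the broker
      - celery -A proj inspect ping -d "celery@$HOSTNAME" --timeout 10
```

Note that the exit-code behaviour of `inspect ping` has varied across Celery versions, so verify that it actually fails when no reply arrives on the version you run.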
Looks like
|
Beware that in some apps, running this command can take a few seconds at full CPU, and the Kubernetes default is to run it every 10 seconds. It is thus much safer to use a high periodSeconds (ours is set to 300). |
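Concretely, the stanza that comment suggests might look like this (the app name and values are placeholders):

```yaml
livenessProbe:
  exec:
    command: ["sh", "-c", "celery -A proj inspect ping -d celery@$HOSTNAME"]
  initialDelaySeconds: 60
  periodSeconds: 300   # keep this high: the ping can burn a full CPU for seconds
  timeoutSeconds: 10
```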
@redbaron did that command work for you? If it works, what are your settings for the liveness and readiness probes? |
For some reason, this readiness probe is nowhere near satisfactory for us. The inspect call responds non-deterministically even with no load on our cluster. We run it in this form: celery inspect ping -b "redis://archii-redis-master:6379" -d celery@archii-task-crawl-integration-7d96d86b9d-jwtq7. And with normal ping intervals (10 seconds), our cluster is completely killed by the CPU Celery requires. |
Readiness probes aren't necessary; Celery is never used in a Service. I just set |
Readiness probes are still useful even if they're not used in a service. Specifically, when you do a deployment of workers and want to make sure your deployment was successful, you usually use |
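For example, one common way to gate a deploy on worker readiness (the deployment name is a placeholder):

```sh
kubectl rollout status deployment/celery-worker --timeout=300s
```

`rollout status` only reports success once the new pods pass their readiness probes.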
My solution was:

```yaml
readinessProbe:
  exec:
    command:
      [
        "/usr/local/bin/python",
        "-c",
        "import os; from celery.task.control import inspect; from <APP> import celery_app; exit(0 if os.environ['HOSTNAME'] in ','.join(inspect(app=celery_app).stats().keys()) else 1)"
      ]
```

Others seem to not work 🤷‍♂️ |
Thanks @yardensachs! |
|
good to know |
We ended up ripping celery inspect ping out from our liveness probes because we found that under heavier load, the ping would just hang for minutes at a time even though the jobs were processing fine and there was no backlog. I have a feeling it had something to do with using eventlet, but we're continuing to look into it. |
@WillPlatnick That won't happen with 5.0 because Celery will be async so there will be reserved capacity for control coroutines. |
I'm having trouble with
Anyone else encountering this? There isn't a |
Can I ask what you are using instead of |
@mcyprian We got rid of the liveness probe. My gut is telling me it has something to do with eventlet, but we haven't made it a priority to figure it out. |
We hit the same CPU problem with the Redis broker.
Can Flower monitor the tasks indirectly? |
Note that the above Kubernetes configuration for the readiness probe from @beje2k15's comment has a bug which causes the readiness probe to never fail. Here's one way to fix it:

```yaml
command:
  - sh
  - -c
  - test -e /tmp/worker_ready
```
|
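For context, that fixed command as a complete probe stanza might read (the 10-second period is illustrative):

```yaml
readinessProbe:
  exec:
    command: ["sh", "-c", "test -e /tmp/worker_ready"]
  periodSeconds: 10
```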
The approach is good, but it requires you to spam your broker with heartbeat garbage.

```python
from pathlib import Path

from celery import Celery, bootsteps
from celery.signals import worker_ready, worker_shutdown

HEARTBEAT_FILE = Path("/tmp/worker_heartbeat")
READINESS_FILE = Path("/tmp/worker_ready")


class LivenessProbe(bootsteps.StartStopStep):
    requires = {"celery.worker.components:Timer"}

    def __init__(self, worker, **kwargs):
        self.requests = []
        self.tref = None

    def start(self, worker):
        # Touch the heartbeat file every second while the worker is alive.
        self.tref = worker.timer.call_repeatedly(
            1.0, self.update_heartbeat_file, (worker,), priority=10,
        )

    def stop(self, worker):
        HEARTBEAT_FILE.unlink(missing_ok=True)

    def update_heartbeat_file(self, worker):
        HEARTBEAT_FILE.touch()


@worker_ready.connect
def worker_ready(**_):
    READINESS_FILE.touch()


@worker_shutdown.connect
def worker_shutdown(**_):
    READINESS_FILE.unlink(missing_ok=True)


app = Celery("appname")
app.steps["worker"].add(LivenessProbe)
```

The liveness file will be created once the connection to the broker is successfully established, so you could also use that for readiness. |
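A matching pair of Kubernetes probes for this bootstep (my sketch, not part of the original comment; the one-minute freshness window assumes the 1-second touch interval above):

```yaml
readinessProbe:
  exec:
    command: ["sh", "-c", "test -e /tmp/worker_ready"]
  periodSeconds: 10
livenessProbe:
  exec:
    # fail if the heartbeat file is missing or older than one minute
    command: ["sh", "-c", "find /tmp/worker_heartbeat -mmin -1 | grep -q ."]
  periodSeconds: 30
```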
In my experience, yes. Broker connection only guarantees the pod can connect to the broker, but it's not uncommon for a Celery worker to be in a broken state even when that passes (a stopped queue with RabbitMQ is a scenario that comes to mind). We use a file to only do these checks every 5 minutes though, as they are a bit expensive. |
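One way to implement that rate limiting with a stamp file (a sketch; the paths, app name, and 5-minute window are assumptions):

```sh
#!/bin/sh
# Probe wrapper: skip the expensive inspect if the last success
# was under 5 minutes ago.
STAMP=/tmp/last_inspect_ok
if [ -n "$(find "$STAMP" -mmin -5 2>/dev/null)" ]; then
  exit 0
fi
celery -A proj inspect ping -d "celery@$HOSTNAME" --timeout 10 || exit 1
touch "$STAMP"
```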
How about beat? The above healthcheck setting doesn't seem to work with beat. |
You can do something similar where beat runs a "health" task and writes a timestamp to a file, and you just check that, as sketched below. |
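A sketch of that idea for the beat pod (the task name, path, and interval are assumptions; `after_task_publish` fires in the publishing process, which for a scheduled task is beat itself):

```python
from pathlib import Path

from celery import Celery
from celery.signals import after_task_publish

app = Celery("appname")
BEAT_HEALTH_FILE = Path("/tmp/beat_health")

# Have beat publish a no-op health task once a minute.
app.conf.beat_schedule = {
    "beat-health": {"task": "tasks.beat_health", "schedule": 60.0},
}


@after_task_publish.connect(sender="tasks.beat_health")
def beat_health_published(**_):
    # Runs inside the beat process on every publish, so the file's
    # mtime tracks beat's scheduling loop.
    BEAT_HEALTH_FILE.touch()
```

The liveness probe then checks freshness, e.g. `find /tmp/beat_health -mmin -2 | grep .`.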
As others have suggested, we've started using |
@joekohlsdorf Thanks a lot for your solution to implement the probes! It might be worth mentioning that

```python
HEARTBEAT_FILE = Path("/tmp/worker_heartbeat")
READINESS_FILE = Path("/tmp/worker_ready")
```
|
I really like the signal approach plus file checks, and it's what I was planning to implement as well; then I Googled around and was pleasantly surprised to find these solutions posted here. Celery also has the option to write out a pid file when it starts. Is the |
Great approach! Finally something that works and doesn't look hacky. Any thoughts on how to do the same for beat? For readiness I use:

```python
from celery.signals import beat_init


@beat_init.connect
def beat_ready(**_):
    READINESS_FILE.touch()
```

But I just can't find a solution for the liveness check... |
Now that this is tagged for a milestone, which hints it'll be in Celery at some point: how necessary is that timer liveness probe? If the worker can write that file every second, it's healthy, no doubt about it. But if a container is running one process and that process crashes, then the container dies with it. In a Kubernetes context the pod will be removed and restarted. Where I'm going with this is: can we remove that entire timer mechanism and boil the check down to

```yaml
command:
  - sh
  - -c
  - test -e /tmp/worker_ready
```

My thought process is that the file either always exists (the worker is working) or the process is dead, in which case the file won't be there because the container isn't up. |
It seems the heartbeat using the file-based approach does not work with the gevent pool when the worker is blocking. Does anyone have a solution that works for gevent as well? |
Has anyone tried KEDA? |
We're about to, for scaling workers by queue size. We just need to switch brokers to RabbitMQ first. How does KEDA relate to readiness/liveness probes? |
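For reference, KEDA addresses scaling rather than health, so it complements the probes discussed here rather than replacing them. A RabbitMQ queue-length ScaledObject looks roughly like this (names and thresholds are placeholders):

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: celery-worker-scaler
spec:
  scaleTargetRef:
    name: celery-worker        # the worker Deployment
  minReplicaCount: 1
  maxReplicaCount: 10
  triggers:
    - type: rabbitmq
      metadata:
        queueName: celery      # Celery's default queue
        mode: QueueLength
        value: "50"            # target messages per replica
      authenticationRef:
        name: rabbitmq-auth    # TriggerAuthentication holding the connection string
```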
@some1ataplace please do us all a favour and don't just post random chatgpt crap on github |
I also can't configure beat. There is an init signal without a cleanup option (on_shutdown) and no liveness check. Has anyone figured this out? |
We tried a lot of the ideas proposed here, like @beje2k15's solution with heartbeat signals. Sadly, due to bugs like #7276, the worker can respond to heartbeats (so heartbeat-based checks pass) even when it is in a broken state. The only fully reliable solution for us is to send our own heartbeat task with celery beat, handle it with the celery worker, and check when it was last published and processed:

```python
from pathlib import Path

from celery.signals import (
    after_task_publish,
    beat_init,
    task_success,
    worker_ready,
    worker_shutdown,
)

from .tasks import celery_heartbeat

HEARTBEAT_FILE = Path("/tmp/celery_live")
READINESS_FILE = Path("/tmp/celery_ready")


######
# celery worker readiness and liveness checks
@worker_ready.connect
def worker_ready(**_):
    READINESS_FILE.touch()


@worker_shutdown.connect
def worker_shutdown(**_):
    for f in (HEARTBEAT_FILE, READINESS_FILE):
        f.unlink(missing_ok=True)


@task_success.connect(sender=celery_heartbeat)
def heartbeat(**_):
    HEARTBEAT_FILE.touch()


######
# celery beat readiness and liveness checks
@beat_init.connect
def beat_ready(**_):
    READINESS_FILE.touch()


@after_task_publish.connect(sender="healthcheck.tasks.celery_heartbeat")
def task_published(**_):
    HEARTBEAT_FILE.touch()
```

This is our code for the liveness probe. We check whether the heartbeat file was modified within the last minute:

```yaml
livenessProbe:
  initialDelaySeconds: 90
  periodSeconds: 30
  failureThreshold: 3
  timeoutSeconds: 3
  exec:
    command:
      - /bin/sh
      - -c
      - find /tmp/celery_live -mmin -1 | grep .
```
|
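The beat schedule that publishes the heartbeat task isn't shown above; it would be something along these lines (the 30-second interval is chosen to stay inside the probe's one-minute `find` window):

```python
app.conf.beat_schedule = {
    "celery-heartbeat": {
        "task": "healthcheck.tasks.celery_heartbeat",
        "schedule": 30.0,
    },
}
```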
@grapo Thanks for the detailed answer, but I think your case of only having one worker won't do for most of the crowd. You use Celery to be able to scale, right? Nevertheless, thanks for sharing! |
What @GitRon said. I am working on a scenario where there will be at least 4-6 workers, and I need a readiness probe that indicates that all of them are ready to process tasks. |
Testing whether the broker is alive from the worker makes no sense to me. It will lead to all workers restarting unnecessarily if the broker has a temporary issue unrelated to the Celery workers; workers can recover from broker failure without a restart. If you really want full roundtrip monitoring you can use the

All of these checks still don't mean that you are able to process real tasks, but there is really no good way to check this on the worker. You can count processed tasks, but that doesn't mean anything because your system might only process one task per month. I recommend also monitoring queue size and ack rate on your broker.

Timers can be used to do all kinds of cool things; I have code which emits currently active tasks and general worker statistics as statsd metrics. This is way better than Flower or inspect. |
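A sketch of that timer idea, modeled on the bootstep from the earlier comment (the statsd client, metric name, and 10-second interval are assumptions, not the poster's actual code):

```python
from celery import Celery, bootsteps
from celery.worker import state as worker_state
from statsd import StatsClient  # assumes the `statsd` package

statsd = StatsClient("localhost", 8125)


class StatsEmitter(bootsteps.StartStopStep):
    """Periodically emit the number of currently active tasks."""

    requires = {"celery.worker.components:Timer"}

    def __init__(self, worker, **kwargs):
        self.tref = None

    def start(self, worker):
        self.tref = worker.timer.call_repeatedly(
            10.0, self.emit_stats, (worker,), priority=10,
        )

    def stop(self, worker):
        if self.tref is not None:
            self.tref.cancel()

    def emit_stats(self, worker):
        # active_requests is the set of requests currently executing
        statsd.gauge("celery.active_tasks", len(worker_state.active_requests))


app = Celery("appname")
app.steps["worker"].add(StatsEmitter)
```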
This worked best for me
It relies on your base image being able to do |