
Nested canvas - fetching result hangs #5060

Open · greuff opened this issue Sep 19, 2018 · 11 comments

greuff commented Sep 19, 2018

Hello, I'm not quite sure if this is a bug or if I'm missing something essential. We're trying to drive a non-trivial canvas and occasionally experience lockups when fetching the result, and I'm trying to hunt down the cause. We're using Celery 4.2.1 and a current Redis (from the redis Docker image) as both backend and broker.

I stripped out a whole lot of code and came up with this little test script. The tasks are stripped down to do nothing; they just return the input given to them (all of the tasks return results).

    tileinfos = [{'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'},
                 {'technology': 'x', 'tilespec': 'y'}]

    res = group(
        chain(
            tasks.generateTile.s(tileinfo),
            group(
                [
                    chain(tasks.generatePCL.s('signal'), tasks.entwine.s('signal')),
                    chain(tasks.generatePCL.s('sinr'), tasks.entwine.s('sinr')),
                    chain(tasks.generatePCL.s('safezone'), tasks.entwine.s('safezone')),
                    chain(tasks.generatePCL.s('cellcolor'), tasks.entwine.s('cellcolor'))
                ]
            ),
            tasks.merge.s()
        ) for tileinfo in tileinfos
    ).apply_async()
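For reference, Celery upgrades a chain that contains a group followed by another task into a chord, so the inner part of each per-tile chain above should be roughly equivalent to the following explicit form (a sketch only, reusing the same task names; untested against this setup):

    from celery import chain, chord

    # Rough equivalent of one per-tile chain: the group of generatePCL -> entwine
    # chains becomes the chord header, and tasks.merge becomes the chord body.
    per_tile = chain(
        tasks.generateTile.s(tileinfo),
        chord(
            [
                chain(tasks.generatePCL.s('signal'), tasks.entwine.s('signal')),
                chain(tasks.generatePCL.s('sinr'), tasks.entwine.s('sinr')),
                chain(tasks.generatePCL.s('safezone'), tasks.entwine.s('safezone')),
                chain(tasks.generatePCL.s('cellcolor'), tasks.entwine.s('cellcolor'))
            ],
            tasks.merge.s()
        )
    )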

Now, when I fetch the result in the next instruction, the script hangs, although all celery tasks completed successfully, as confirmed with flower, celery event monitor, etc.:

print(res.get())    # This hangs forever

The curious part is that when I do the following instead, the result is fetched and the script succeeds:

    from celery.exceptions import TimeoutError  # raised by res.get(timeout=...)

    while True:
        try:
            print("Successful: {}".format(res.successful()))
            print("Failed: {}".format(res.failed()))
            print("Waiting: {}".format(res.waiting()))
            print("Ready: {}".format(res.ready()))
            print("Completed count: {}".format(res.completed_count()))

            pipelineResult = res.get(timeout=1)
            print("Pipeline finished!")
            print(pipelineResult)
            break
        except TimeoutError:
            print("I am still alive and waiting for pipeline results...")

Note that sometimes more than one loop iteration is required before res.get eventually returns the result, even though all tasks are already completed, as if it were driving some kind of state machine that lags behind.

Even stranger: when I comment out the print statements that call waiting() etc. on the GroupResult object, the script again loops forever and never finishes:

while True:
    try:
        # print("Successful: {}".format(res.successful()))
        # print("Failed: {}".format(res.failed()))
        # print("Waiting: {}".format(res.waiting()))
        # print("Ready: {}".format(res.ready()))
        # print("Completed count: {}".format(res.completed_count()))

        pipelineResult = res.get(timeout=1)
        print("Pipeline finished!")
        print(pipelineResult)
        break
    except TimeoutError:
        print("I am still alive and waiting for pipeline results...")

Output:

I am still alive and waiting for pipeline results...
I am still alive and waiting for pipeline results...
I am still alive and waiting for pipeline results...
I am still alive and waiting for pipeline results...
....

I looked at the redis-cli monitor command, and there is a lot of subscribing/unsubscribing going on. I put the working version of this script in a loop and let it run 1000 times in a row - it completed successfully every time. I also confirmed with redis-cli pubsub channels that the number of channels stays stable (at around 150).

Also, when the tileinfos array only contains one item, the script doesn't hang at all.

Now, I'm not sure whether I did something wrong, especially in constructing the canvas, or whether there is a bug. I understand that we nest chains into groups into chains into groups quite heavily, and I'm not sure whether that's supported at all. What I don't understand, however, is why res.get() blocks forever, or why it seems to matter that res.waiting() etc. are called before res.get().

greuff (Author) commented Sep 19, 2018

Additional info: the observed hang of res.get() only occurs when the tileinfos array (and therefore the outer group) has more than 4 elements. Up to 4 elements the call doesn't lock up!

greuff (Author) commented Sep 19, 2018

It seems to have to do with the total number of tasks in the canvas. I simplified the canvas to:

res = group(
    tasks.generateTile.s(tileinfo) for tileinfo in tileinfos
).apply_async()

and I observe the same hang when I have more than 28 tasks in the group (i.e., more than 28 tileinfo elements in the tileinfos array). Up to and including 28 tasks the script works fine! The workaround is again to fetch the result in the loop.
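A quick way to probe that threshold (a sketch only, with a dummy tileinfo payload and an arbitrary 10-second deadline; it assumes the tasks module is importable the same way as in the scripts above):

    from celery import group
    from celery.exceptions import TimeoutError

    from pipeline import tasks

    def probe(sizes=(27, 28, 29, 30), deadline=10):
        """Submit groups of increasing size and report whether get() returns in time."""
        for n in sizes:
            res = group(
                tasks.generateTile.s({'technology': 'x', 'tilespec': 'y'}) for _ in range(n)
            ).apply_async()
            try:
                res.get(timeout=deadline)
                print("{} tasks: ok".format(n))
            except TimeoutError:
                print("{} tasks: get() timed out after {}s".format(n, deadline))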

For the sake of completeness, this is the definition of the task:

import subprocess

from celery.exceptions import MaxRetriesExceededError
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)  # assuming a standard task logger


@app.task(name='tasks.generateTile', bind=True, max_retries=3, acks_late=True, track_started=True)
def generateTile(self, tileinfo):
    if 'error' in tileinfo:
        return tileinfo
    try:
        logger.info('>>> generateTile {}'.format(tileinfo['tilespec']))

        tilespec = tileinfo['tilespec']

        # Complete success!
        logger.info('<<< generateTile {}'.format(tileinfo['tilespec']))
        return tileinfo
    except subprocess.CalledProcessError as e:
        logger.error(e.output)
        try:
            self.retry(countdown=2)
        except MaxRetriesExceededError:
            return {'error': 'exception'}
    except Exception as e:
        logger.error(e)
        try:
            self.retry(countdown=2)
        except MaxRetriesExceededError:
            return {'error': 'exception'}

and this is how I set up the app:

import os

from celery import Celery

app = Celery('pipeline',
             broker=os.environ['CELERY_BROKER_URL'],
             backend=os.environ['CELERY_BACKEND_URL'],
             include=['pipeline.tasks'])

Helumpago commented:

I think I'm seeing the same issue.

I have a toy problem where I'm trying to generate a bunch of random numbers. I have a trivial task that just returns a single random number. My aim is to use celery.group to call this task multiple times, then get the list of results.

My task is:

import random

@app.task
def random_weight():
    return str(random.random())

The script I'm using to generate the numbers is:

import celery

from weighter.tasks import random_weight

def main():
    n = 11
    weights = celery.group(random_weight.s() for _ in range(n))
    print("The weights are: {}".format(weights.apply_async().get()))

if __name__ == "__main__":
    main()

Running the task with n <= 11 works correctly, but n > 11 hangs.

The logs show that all the tasks are succeeding (even for n > 11). It just seems like .get() isn't being informed that everything is done.

I am using RabbitMQ as my broker and Redis as my result backend. This problem does not occur when using RPC as the backend.
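For reference, that combination looks roughly like this when configuring the app (a sketch with placeholder URLs, not the exact setup used here):

    from celery import Celery

    # Placeholder connection strings: RabbitMQ as the broker, Redis as the result backend.
    app = Celery(
        'weighter',
        broker='amqp://guest:guest@localhost:5672//',
        backend='redis://localhost:6379/0',
    )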

I have not tried @greuff's workaround yet. Thanks for the detailed description, though!

Versions:

Celery: 4.2.0
Redis: 5.0.3
RabbitMQ: 3.7.13


Helumpago commented Mar 13, 2019

I tried @greuff's workaround (periodically re-trying .get()), and I'm getting identical results. As long as I call res.successful(), res.failed(), res.waiting(), res.ready(), or res.completed_count(), I get a valid result.

Additionally, it looks like the while loop can be removed:

def main():
    n = 100
    weights = celery.group(random_weight.s() for _ in range(n))
    res = weights.apply_async()

    print("Successful: {}".format(res.successful()))
    pipelineResult = res.get(timeout=600000)
    print("The weights are: {}".format(pipelineResult))

Notice the massive timeout argument. If I remove that, everything hangs. If it is set, I get a response in a fraction of a second.

Also, if I drop the res.successful() call, everything hangs, even with the timeout.


Edit:

Notice the massive timeout argument. If I remove that, everything hangs. If it is set, I get a response in a fraction of a second.

Never mind about this. I was just getting lucky with the race condition. The timeout argument does not appear to have much of an effect on the chances of things hanging. Calling res.successful() definitely does have an effect on the chances of success.

Because of the race condition, I'd highly recommend keeping the full while loop approach.
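For anyone copying the workaround, it can be wrapped in a small helper (a sketch only; the poke at the result state before each get() attempt is the part that seems to matter, per the discussion above):

    from celery.exceptions import TimeoutError

    def get_with_workaround(res, poll_timeout=1):
        """Poll a (Group)Result until get() returns, poking its state before each attempt."""
        while True:
            res.successful()  # touching the result state first appears to unstick get()
            try:
                return res.get(timeout=poll_timeout)
            except TimeoutError:
                print("Still waiting for results...")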

Helumpago commented:

I just switched to using memcached as my results backend. I haven't seen it hang yet.
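For anyone wanting to try the same, the only change from the RabbitMQ/Redis sketch above is the result backend URL (placeholder host/port; Celery's cache backend needs a memcached client library installed):

    from celery import Celery

    # Same app layout as before, with memcached as the result backend.
    app = Celery(
        'weighter',
        broker='amqp://guest:guest@localhost:5672//',
        backend='cache+memcached://127.0.0.1:11211/',
    )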

greuff (Author) commented Mar 18, 2019

@Helumpago thanks for chiming in. We don't have a real solution yet, but the workaround with the loop seems to be stable. We haven't had the chance to try another backend yet.

yueyongyue commented:

I'm hitting the same issue: res.ready() hangs.

auvipy modified the milestones: 4.5, 4.4.x (Dec 16, 2019)
sobolevn commented:

The same, this hangs:

>>> from app import add
>>> result = add.apply_async((2, 2), link=add.s(8))
>>> result.get()
4
>>> result.children
[<AsyncResult: 328b14d3-cef2-4748-a52d-26dd67efd91e>]
>>> result.children[0].get()  # hangs

Repro:

from celery import Celery

app = Celery(
    'tasks',
    broker='pyamqp://guest:guest@rabbitmq:5672',
    backend='rpc://',
)


@app.task
def add(first: int, second: int) -> int:
    print(first + second)
    return first + second

Setup:

version: '3.7'
services:
  rabbitmq:
    image: 'rabbitmq:3.8-management-alpine'
    restart: unless-stopped

  worker:
    build: .
    command: celery --app=app:app worker
    restart: unless-stopped

And the Dockerfile:

FROM python:3.8.6-slim-buster

LABEL maintainer="sobolevn@wemake.services"
LABEL vendor="wemake.services"

WORKDIR /code

RUN pip install 'celery==5.0.2'

# Copy source files:
COPY app.py /code/

auvipy modified the milestones: 4.4.x, 5.1.0 (Dec 13, 2020)
maybe-sybr (Contributor) commented Dec 14, 2020

Brief reading makes me think that the original issue might have been related to the inner group being upgraded to a chord, and then perhaps an issue with chord size counting which we fixed in #6354. But seeing it not stall for smaller sizes of the iterable driving the comprehension is a bit suspicious. I'd have to try some of these MRTCs on top of master to get an idea.

I'm also a bit blown away by @sobolevn's comment. The only thing I can think of there is the promise confusion we fixed in #6411, but that landed in 5.0.1, so behaviour like that presenting in such a trivial task is surprising. I'd be pretty confident in saying they're probably caused by different things; .ready() or .get() hanging doesn't really tell us too much, unfortunately. I'll see if I can get some ideas and either split out some of these reports into more specific issues or add a deeper dive here if they turn out to be related.

Edit: Weird, it looks like maybe linked tasks don't actually save their result to the RPC backend. With a redis broker/backend this works fine:

import celery

app = celery.Celery(name="foo", broker="redis://", backend="redis://")

@app.task
def add(a, b):
    return a + b

>>> import foo
>>> s = foo.add.s(2, 2)
>>> s.link(foo.add.s(8))
foo.add(8)
>>> r = s.apply_async()
>>> r.get()
4
>>> r.children[0].get()
12
>>> r.children[0].id
'31111d85-0626-49ee-9574-f3aa978c0f06'
[2020-12-14 11:15:30,200: INFO/MainProcess] Received task: foo.add[e6b4e680-38f2-460b-918f-cdff6533d2eb]
[2020-12-14 11:15:30,200: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7f4d21b495e0> (args:('foo.add', 'e6b4e680-38f2-460b-918f-cdff6533d2eb', {'lang': 'py', 'task': 'foo.add', 'id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'parent_id': None, 'argsrepr': '(2, 2)', 'kwargsrepr': '{}', 'origin': 'gen33309@host', 'reply_to': '30a087dd-981b-3e27-ac04-a67d8a219165', 'correlation_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'hostname': 'celery@host', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [2, 2], 'kwargs': {}}, b'[[2, 2], {}, {"callbacks": [{"task": "foo.add", "args": [8], "kwargs": {}, "options": {}, "subtask_type": null, "immutable": false, "chord_size": null}], "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2020-12-14 11:15:30,201: DEBUG/MainProcess] Task accepted: foo.add[e6b4e680-38f2-460b-918f-cdff6533d2eb] pid:33307
[2020-12-14 11:15:30,223: INFO/MainProcess] Received task: foo.add[31111d85-0626-49ee-9574-f3aa978c0f06]
[2020-12-14 11:15:30,223: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x7f4d21b495e0> (args:('foo.add', '31111d85-0626-49ee-9574-f3aa978c0f06', {'lang': 'py', 'task': 'foo.add', 'id': '31111d85-0626-49ee-9574-f3aa978c0f06', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'parent_id': 'e6b4e680-38f2-460b-918f-cdff6533d2eb', 'argsrepr': '(4, 8)', 'kwargsrepr': '{}', 'origin': 'gen33307@host', 'reply_to': 'e9ede7ae-7150-3af2-a641-915176135280', 'correlation_id': '31111d85-0626-49ee-9574-f3aa978c0f06', 'hostname': 'celery@host', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [4, 8], 'kwargs': {}}, b'[[4, 8], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2020-12-14 11:15:30,224: DEBUG/MainProcess] Task accepted: foo.add[31111d85-0626-49ee-9574-f3aa978c0f06] pid:33300
[2020-12-14 11:15:30,224: INFO/ForkPoolWorker-8] Task foo.add[e6b4e680-38f2-460b-918f-cdff6533d2eb] succeeded in 0.022559431999980006s: 4
[2020-12-14 11:15:30,242: INFO/ForkPoolWorker-1] Task foo.add[31111d85-0626-49ee-9574-f3aa978c0f06] succeeded in 0.01803768400077388s: 12

But the same thing with the RPC backend stalls on the child result object, as @sobolevn describes.

maybe-sybr (Contributor) commented:

I've not had any time to dig further into this. From memory and reading my previous comment, it does seem like this is still currently broken and should be looked into further. I'm going to unassign myself since I certainly won't have the time to do so over the next couple of months. Pinging @thedrow and @auvipy for redistribution if possible.

maybe-sybr removed their assignment (Feb 20, 2021)
auvipy (Member) commented Feb 21, 2021

No worries, we will revisit this after the 5.1 release.

auvipy modified the milestones: 5.1.0, 5.2 (Feb 21, 2021)
auvipy modified the milestones: 5.2, 5.2.x (Oct 30, 2021)
auvipy modified the milestones: 5.2.x, 5.3.x (Jun 29, 2022)
auvipy self-assigned this (Jun 29, 2022)
auvipy modified the milestones: 5.3.x, 5.4 (Nov 12, 2023)