
Workers idle even though there's queued work #4501

Open
JohnEmhoff opened this issue Feb 5, 2021 · 11 comments
Labels
needs info Needs further information from the user

Comments

@JohnEmhoff

What happened: We have a large-ish cluster (about 100 nodes), and recently, when we submit a lot of jobs (in the thousands), we notice that about 60% of the cluster sits idle. Generally, a job spawns about 20 downstream sub-jobs; these are submitted from inside the worker, which calls secede / rejoin while it waits on them. I'm fairly certain this use of secede / rejoin is involved, as you can see in the reproduction below.

What you expected to happen: The cluster uses all available resources

Minimal Complete Verifiable Example:

This requires running a scheduler, a worker with two procs, and then submitting jobs. Bear with me while I show all the pieces:

This is how I create the environment:

#!/bin/bash
python3 -m venv env
source env/bin/activate
pip install "dask[distributed,diagnostics,delayed]==2020.12.0"

...and this is the Python file (saved as stuff.py; the script below runs it) with the jobs. The two operations are:

  1. Submit a child job that sleeps for X seconds, and wait on that job, calling secede / rejoin while waiting.
  2. Print a message immediately.

import time
import dask.distributed
import sys


def long_running_job(seconds):
    print(f"Doing the actual work ({seconds}s)")
    time.sleep(seconds)
    print(f"Finished working ({seconds}s)")


def root_job(seconds):
    client = dask.distributed.Client(address="tcp://127.0.0.1:8786")
    futures = client.map(long_running_job, [seconds])
    print(f"Submitted long runing job ({seconds}s); seceding while we wait")
    dask.distributed.secede()
    client.gather(futures)
    print(f"Job done ({seconds}s); rejoining")
    dask.distributed.rejoin()


def other_job(message):
    print(message)


if __name__ == "__main__":
    client = dask.distributed.Client(address="tcp://127.0.0.1:8786")
    if sys.argv[1] == "wait":
        future = client.submit(root_job, int(sys.argv[2]))
        dask.distributed.fire_and_forget(future)
    elif sys.argv[1] == "message":
        future = client.submit(other_job, sys.argv[2])
        dask.distributed.fire_and_forget(future)

Finally, this is the script that submits jobs in a way that shows the issue we're running into:

#!/bin/bash

# start a scheduler in one terminal:
# $ dask-scheduler
# ...and a worker in another:
# $ dask-worker tcp://10.0.2.15:8786 --nthreads 1 --nprocs 2
# then run the below:

python stuff.py wait 120
sleep 1
python stuff.py wait 60
sleep 1
python stuff.py message instantaneous-job

This script submits a long job, a shorter job, and then an instantaneous job to show that there's a scheduling problem. When the jobs are submitted, the worker prints:

Submitted long running job (120s); seceding while we wait
Doing the actual work (120s)
Submitted long running job (60s); seceding while we wait
Doing the actual work (60s)
Finished working (60s)
Job done (60s); rejoining
Finished working (120s)
instantaneous-job
Job done (120s); rejoining

The problem is on the line that says Job done (60s); rejoining. At this point there's one idle worker that could be running the instantaneous job, but it doesn't -- instead it waits on the 120s job. Only after the 120s job is done (about a minute later) does the instantaneous job finally run. Hence one worker is idle for about a minute.
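As an aside, the same pattern can also be written with the worker_client context manager, which performs the secede / rejoin pair automatically on entry and exit. A minimal sketch of that equivalent formulation as a drop-in replacement for root_job in stuff.py (I haven't verified whether it changes the behaviour described above):

import dask.distributed


def root_job(seconds):
    # worker_client() secedes from the worker's thread pool on entry
    # and rejoins on exit, replacing the manual secede()/rejoin() calls.
    with dask.distributed.worker_client() as client:
        futures = client.map(long_running_job, [seconds])
        client.gather(futures)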

Anything else we need to know?:
Sorry for the length; I don't think I can cut it down any more. If the problem isn't clear let me know and I'll see if I can explain better.

Environment:

  • Dask version: 2020.12.0
  • Python version: 3.6.9
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): pip
@JohnEmhoff
Author

I just tried updating to the latest version (2021.1.1) and I think it's even worse now? The worker now prints this out (I added timestamps to the prints):

Fri Feb  5 14:13:57 2021 Submitted long running job (120s); seceding while we wait
Fri Feb  5 14:13:57 2021 Doing the actual work (120s)
Fri Feb  5 14:15:57 2021 Finished working (120s)
Fri Feb  5 14:15:57 2021 Job done (120s); rejoining
Fri Feb  5 14:15:57 2021 Submitted long running job (60s); seceding while we wait
Fri Feb  5 14:15:57 2021 Doing the actual work (60s)
Fri Feb  5 14:15:57 2021 instantaneous-job
Fri Feb  5 14:16:57 2021 Finished working (60s)
Fri Feb  5 14:16:57 2021 Job done (60s); rejoining

You can see that it waits for the first job to completely finish before it picks up the second one, despite there being a free worker (i.e. the one that called secede).
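One way to sanity-check that claim is to ask the scheduler what each worker is processing while the jobs are in flight (a diagnostic sketch, assuming the same local scheduler address as above):

import dask.distributed

client = dask.distributed.Client(address="tcp://127.0.0.1:8786")
# Maps each worker address to the task keys the scheduler currently
# has assigned to it; the seceded worker's entry is the interesting one.
print(client.processing())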

@crusaderky
Collaborator

@jrbourbeau could you migrate this ticket to dask/distributed?

@quasiben quasiben transferred this issue from dask/dask Feb 10, 2021
@gforsyth
Contributor

@JohnEmhoff -- thanks for reporting this!

I thought this might've been a problem I introduced when adding the TaskState class to worker.py, but I can also reproduce this on dask[distributed]==2.30.0, so it seems to predate that change.

Do you have a sense of when this last worked as expected?

@JohnEmhoff
Author

@gforsyth We were on dask 2.18.0 for a while and I thought everything was fine there, but it looks like I'm able to reproduce it on that version too? Maybe we didn't notice or maybe I'm not properly installing the older versions:

$ pip freeze | egrep 'dask|distributed'
dask==2.18.1
distributed==2.18.0

@gforsyth
Contributor

An observation -- if the shorter of the two jobs is started first, then the "instantaneous job" fires as soon as the first job finishes:

(I've changed the times to 20 and 10 seconds, respectively)

Submitted long running job (10s); seceding while we wait
Doing the actual work (10s)
Submitted long running job (20s); seceding while we wait
Doing the actual work (20s)
Finished working (10s)
instantaneous-job
Job done (10s); rejoining
Finished working (20s)
Job done (20s); rejoining

I haven't gone very deep on secession and rejoining, but it looks like the scheduler assigns the task to the first worker it has available, or that it thinks is available.
This seems like a case that work-stealing might normally correct -- one worker idle while the other is still occupied -- but I think the fire_and_forget part interferes with that.
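For anyone digging further, here's a rough sketch for inspecting the scheduler's view of worker occupancy directly (internal attribute names are from the distributed releases of this era and may differ in other versions):

import dask.distributed

client = dask.distributed.Client(address="tcp://127.0.0.1:8786")


def occupancy(dask_scheduler):
    # For each worker, list the keys the scheduler thinks it is
    # processing; this shows whether a seceded task still occupies
    # a slot from the scheduler's point of view.
    return {
        addr: [ts.key for ts in ws.processing]
        for addr, ws in dask_scheduler.workers.items()
    }


print(client.run_on_scheduler(occupancy))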

@JohnEmhoff
Author

Thanks for looking into this -- do you think there's a reasonable workaround while this exists?

@fjetter
Member

fjetter commented Feb 17, 2021

Haven't looked deeply into this yet, and there are differences due to the seceding / long-running jobs, but a similar issue was reported in #4471

@gforsyth
Contributor

Thanks for looking into this -- do you think there's a reasonable workaround while this exists?

I know your example is a simplified version of your real workflow, but since this flow currently leaves workers idle, I would try adding more threads to each proc. That might increase GIL contention, but it also might make things run more smoothly.
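Concretely, that would mean raising --nthreads on the dask-worker invocation above (e.g. --nthreads 4 instead of --nthreads 1), or, for a purely local test, something like this sketch with LocalCluster:

import dask.distributed

# Over-provisioned local setup: two worker processes with four threads
# each, so a thread stuck waiting on a seceded task is less likely to
# leave the whole worker idle.
cluster = dask.distributed.LocalCluster(n_workers=2, threads_per_worker=4)
client = dask.distributed.Client(cluster)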

@jakirkham
Member

Any more questions here @JohnEmhoff? 🙂

@jrbourbeau jrbourbeau added the needs info Needs further information from the user label Mar 18, 2021
@JohnEmhoff
Author

@jakirkham not especially, thanks for asking! It seems there isn't much of a workaround beyond over-provisioning, though, which isn't great at scale. Is this considered a bug? It feels like a serious gotcha in scheduling.

@chrisroat
Contributor

I am the author of the related issue, and am also forced to over-provision. Is there any direction on where to look? I'm spending some time this week learning the scheduler so as to look into this and other issues I'm having.
