
Adaptive doesn't scale quickly for long-running jobs #3627

Closed
kazimuth opened this issue Mar 23, 2020 · 5 comments

kazimuth commented Mar 23, 2020

I'm using dask to coordinate some long-running machine learning jobs. I've set up an adaptive cluster (with dask_jobqueue) that has a minimum of 5 workers and a maximum of 10. Each task I dispatch takes about two hours to run and consistently uses ~100% of the CPU available to it. However, the adaptive cluster doesn't seem to want to add any more workers. It sits at the minimum number and never increases. Is there some way to modify the scheduling policy so that the cluster scales up more aggressively?

I'm aware this isn't exactly the sort of job dask is designed to schedule -- my impression is that it prefers smaller, faster tasks. I think it might be possible to modify Adaptive to use a different policy that's better suited to long-running jobs, but I spent some time digging into the source and got kind of lost. Any pointers would be helpful :)

My current workaround is to ignore Adaptive and scale the cluster by hand, but I feel bad about taking up nodes longer than I need.
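
For context, here is a minimal sketch of the setup described above. The cluster class (SLURMCluster), its resource arguments, and the placeholder task are assumptions for illustration, not details from this report:

import time

from dask.distributed import Client
from dask_jobqueue import SLURMCluster  # the specific cluster class is an assumption


def train_model(config):
    # Stand-in for the ~2 hour, CPU-bound task described above
    time.sleep(2 * 60 * 60)
    return config


cluster = SLURMCluster(cores=1, memory="4GB", walltime="04:00:00")
cluster.adapt(minimum=5, maximum=10)  # the adaptive bounds from the report
client = Client(cluster)

futures = client.map(train_model, range(20))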

fjetter (Member) commented Mar 24, 2020

The decision to scale up or down is currently coupled to the measurement of a task's runtime (similar tasks are grouped, see TaskPrefix).
As long as the runtime hasn't been measured at least once, the cluster cannot estimate how long the entire computation graph will take and it will not scale up (see here; it boils down to Scheduler.total_occupancy if you want to go down the rabbit hole). You should see the cluster scale up once the first job finishes.

We're facing the same issue, see #3516

A workaround for this is to configure default task durations which are used as long as there are no measurements available, e.g.

distributed:
  scheduler:
    default-task-durations:
      my-function: 2h  # This should be the same name you see on the dashboard
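
For reference, the same value can also be set from Python with dask.config.set. This is only a sketch, and whether such an override is picked up dynamically (e.g. by a separately started scheduler) is discussed further down in this thread:

import dask

# Same key as the YAML above; "my-function" must match the task prefix shown on the dashboard
dask.config.set({"distributed.scheduler.default-task-durations.my-function": "2h"})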

fjetter (Member) commented Apr 3, 2020

Will close this issue as a duplicate of #3516.

I can also recommend having a look at #3642, which introduces an option to define a default runtime for unknown tasks; it might offer a practical way to resolve your issue as well.

chrisroat (Contributor) commented Jan 27, 2021

A workaround for this is to configure default task durations which are used as long as there are no measurements available, e.g.

distributed:
  scheduler:
    default-task-durations:
      my-function: 2h  # This should be the same name you see on the dashboard

Does this get picked up dynamically, say when I start a cluster scheduler via dask-gateway, or for a particular graph? Meaning, does it work to set it in a context like the one below, or is it a startup-only configuration? How are underscores vs. dashes handled (as I know they are generally interchangeable)?

with dask.config.set({'distributed.scheduler.default-task-durations.segment_chunk': '1h'}):
  dask.compute(...)

fjetter (Member) commented Jan 27, 2021

Does this get picked up dynamically, say when I start a cluster scheduler via dask-gateway, or for a particular graph?

AFAIK, yes, it is dynamic; the value of the config is read during task initialization.

How are underscores vs. dashes handled (as I know they are generally interchangeable)?

Very good question. I believe they are not interchangeable here; theoretically, it should be whatever distributed.utils.key_split returns.

If you set key names yourself:

In [13]: import uuid; from distributed.utils import key_split

In [14]: key_split(f"my-func-name-{uuid.uuid4()}")
Out[14]: 'my-func-name'

In [15]: key_split(f"my-func_name-{uuid.uuid4()}")
Out[15]: 'my'

If you're executing your own functions via client.submit and/or dask.delayed, this second snippet is probably more relevant. From my understanding, user-defined functions will never have a dash in their prefix since the function name is taken literally; the dashes come in with dask internals where the computations are "handwritten".

In [16]: def my_func_name():
    ...:     pass

In [17]: from dask.utils import funcname

In [19]: key_split(funcname(my_func_name) + "-" + str(uuid.uuid4()))
Out[19]: 'my_func_name'
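
Putting those two pieces together, here is a sketch using the segment_chunk name from the earlier snippet (the function body and the 1h value are placeholders):

import uuid

import dask
from dask.utils import funcname
from distributed.utils import key_split


def segment_chunk(block):
    # Placeholder for the real long-running task
    return block


# default-task-durations is keyed by whatever key_split returns for the task key;
# for a plain Python function that is the literal function name, underscores included.
prefix = key_split(funcname(segment_chunk) + "-" + str(uuid.uuid4()))
assert prefix == "segment_chunk"

with dask.config.set({f"distributed.scheduler.default-task-durations.{prefix}": "1h"}):
    pass  # dask.compute(...) would go here, as in the earlier snippet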

chrisroat (Contributor) commented:

Is there a correct incantation of this that would rectify the problem in #4471?
