Assign tasks to idle workers rather than queuing them #7384

Open
NakulK48 opened this issue Dec 8, 2022 · 19 comments

@NakulK48 commented Dec 8, 2022

In our workflow, an HTTP server receives a request containing a payload for processing, starts a Dask cluster, splits the payload into batches, and then submits each batch to the cluster (via fire_and_forget).

The workers are on EC2 (via Kubernetes) so naturally they don’t start up instantly - an instance needs to be found, it needs to download the Docker image, etc.

I just tried a small run which had 7 batches, and spun up a 7-worker cluster to run them. Intriguingly, even though all 7 machines were up and ready, only 3 of them were actually processing - 3 of the tasks were completed in parallel and then the next 3. Looking at the logs of the other 4 workers showed that nothing was happening at all.

I figure the scheduler must somehow be assigning all of these tasks to whichever machine happens to be available at the time they are submitted - but this doesn’t strike me as the right behaviour at all. Surely it’s only when the task is about to start that a worker node should be selected. So if only 3 nodes are up, the task has to wait to begin, but if an idle node is available, execution should start immediately on that node.

@jacobtomlinson jacobtomlinson transferred this issue from dask/dask-kubernetes Dec 9, 2022
@jacobtomlinson (Member)

I'm going to transfer this issue over to dask/distributed as it's not really related to the Kubernetes deployment tooling.

@NakulK48 could you share an example of the code you're running to submit the tasks?

@fjetter (Member) commented Dec 9, 2022

We are indeed assigning tasks greedily to workers if they are available. Once new workers join, we rebalance so that every worker has a roughly equal load. If this is not happening, something is off. Can you reproduce this with a minimal example?

Note: The latest releases introduced a change to this logic (I believe starting 2022.11.0). What version are you running?

@NakulK48 (Author) commented Dec 9, 2022

This is the full scheduler spec - we are using 2022.10, so I can try 2022.11 and see if that improves things:

  scheduler:
    service:
      ports:
      - name: tcp-comm
        port: 8786
        protocol: TCP
        targetPort: tcp-comm
      - name: http-dashboard
        port: 8787
        protocol: TCP
        targetPort: http-dashboard
      selector:
        dask.org/cluster-name: null
        dask.org/component: scheduler
      type: ClusterIP
    spec:
      serviceAccountName: research
      priorityClassName: dlf-workers
      containers:
      - args:
        - dask-scheduler
        - --host
        - 0.0.0.0
        env: []
        image: "ghcr.io/dask/dask:2022.10.0"
        livenessProbe:
          httpGet:
            path: /health
            port: http-dashboard
          initialDelaySeconds: 15
          periodSeconds: 20
        name: scheduler
        ports:
        - containerPort: 8786
          name: tcp-comm
          protocol: TCP
        - containerPort: 8787
          name: http-dashboard
          protocol: TCP
        readinessProbe:
          httpGet:
            path: /health
            port: http-dashboard
          initialDelaySeconds: 5
          periodSeconds: 10
        resources:
          limits:
            cpu: "1"
            memory: 8Gi
  

@NakulK48 (Author) commented Dec 9, 2022

Still the same issue with 2022.12 I'm afraid - three tasks started and all of the other workers are idle.

If it's relevant: this is making use of fire_and_forget (as the cluster is spun up, and the tasks are submitted, from a synchronous REST API endpoint; I'm open to alternative suggestions on handling this).

@gjoseph92 (Collaborator)

Can you create a reproducer for this in any way? Something that can run with a local cluster instead of k8s?

Are the tasks using resource restrictions or worker restrictions or anything like that? Also, do the tasks have any dependencies?

If the tasks don't have dependencies, I'd guess #7274 would solve your problem. Probably all of the tasks are being submitted to the first worker that joins, and maybe once 3 workers have joined, the load-balancing algorithm (work stealing) no longer thinks it would be worth rebalancing them to new workers.

The work-stealing load-balancing algorithm is known not to perform that well when clusters scale up.

At a minimum, screenshots or a recording of the dashboard would be helpful in diagnosing this.

@NakulK48 (Author)

Here's the dashboard, showing the idle workers:

[screenshot: Dask dashboard showing the idle workers]

Anything in particular you'd like to see? Unfortunately this wouldn't be reproducible on a local dask cluster, because you wouldn't see the same behaviour around machines spinning up at different times.

@NakulK48 (Author) commented Dec 13, 2022

Here's an interesting thing:

[screenshot: dashboard showing three tasks in the processing state on one worker]

Note how it claims all three tasks are processing (but if I look at the S3 bucket where it should be writing, only one of these is processing).

@NakulK48 (Author)

And one of the workers has -1 GPUs consumed...

[screenshot: worker panel showing -1 GPUs consumed]

@fjetter (Member) commented Dec 13, 2022

Note how it claims all three tasks are processing

processing in this view merely means that they are assigned to a worker. However, since the worker has three threads it should also process them at the same time. I recommend verifying that your code does not acquire the GIL or any other locks.


Can you please share some code that represents the computation you are running? Without this we're mostly guessing. The screenshots you showed are not sufficient to debug this.

@NakulK48 (Author)

I don't think the GIL really comes into this. The intention is very much that only one task runs on any given worker at a time, because these are single-GPU machines and each task needs the whole GPU.

  worker:
    replicas: null
    spec:
      serviceAccountName: research
      priorityClassName: dlf-workers
      containers:
      - args:
        - dask-worker
        - --name
        - $(DASK_WORKER_NAME)
        - --resources
        - GPU=1
        env: []
        image: null
        name: worker
        resources:
          limits:
            cpu: "3"
            memory: 14Gi
            nvidia.com/gpu: "1"
The tasks are submitted like this:

    with dask.annotate(resources={"GPU": 1}):
        future = client.submit(
            <snip>
        )
        LOGGER.info("Submitting task %s for run %s", task_index, run_id)
        futures.append(future)
        fire_and_forget(future)

@fjetter (Member) commented Dec 13, 2022

The intention is very much that only one task runs on any given worker at a time, because these are single-GPU machines and each task needs the whole GPU.

That's the important piece of info.

It turns out that our documentation about dask.annotate + worker resources is wrong. If you are using the Client.submit API, it will not pick up annotations from the dask.annotate context manager.

Instead, you need to provide resources explicitly to the submit call, e.g.

client.submit(foo, resources={"GPU": 1})
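
To illustrate the difference, a minimal sketch (process_batch and batch are placeholders, not names from the code above):

    # Not picked up: Client.submit ignores annotations from the dask.annotate context manager.
    with dask.annotate(resources={"GPU": 1}):
        future = client.submit(process_batch, batch)

    # Picked up: the resource restriction is passed directly to submit.
    future = client.submit(process_batch, batch, resources={"GPU": 1})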

Can you please try this and see if the execution pattern then matches your expectations?

@NakulK48 (Author) commented Dec 13, 2022

That was actually the first thing I tried! Unfortunately it had the same issue. And indeed - it doesn't try to run the tasks on a worker concurrently; it runs them sequentially, so it is picking up the resource requirement.

    for task_index, sub_request in enumerate(sub_requests):
        future = client.submit(
            worker.run_and_store_result,
            run_id=run_id,
            task_index=task_index,
            expected_task_count=task_count,
            request=sub_request,
            resources={"GPU": 1},
        )
        LOGGER.info("Submitting task %s for run %s", task_index, run_id)
        futures.append(future)
        fire_and_forget(future)

@fjetter (Member) commented Dec 13, 2022

Can you try to reproduce this issue with a local cluster and some dummy code, please?

@fjetter (Member) commented Dec 13, 2022

For reference, here is the ticket about the incorrect documentation: #7397

@NakulK48 (Author)

I wouldn't think this problem would occur locally. It's specifically a problem centred around workers that spawn at different times.

@NakulK48 (Author)

More importantly, this sounds very similar to the issues mentioned above. Is the functional workaround just to wait for all workers to be ready, and only submit tasks after that point?
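
For concreteness, a rough sketch of what that workaround might look like (using Client.wait_for_workers; the submit arguments from the earlier snippet are trimmed here):

    # Block until the expected number of workers has joined, then submit everything.
    client.wait_for_workers(n_workers=task_count)

    for task_index, sub_request in enumerate(sub_requests):
        future = client.submit(
            worker.run_and_store_result,
            request=sub_request,
            resources={"GPU": 1},
        )
        futures.append(future)
        fire_and_forget(future)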

@NakulK48 (Author)

Interestingly, with larger datasets (70-ish tasks) this does appear to do the right thing, spreading tasks evenly between workers. It's only when it tries to schedule 7 tasks on 7 workers that it allocates 3 tasks to some workers and 0 to others.

@gjoseph92 (Collaborator)

I wouldn't think this problem would occur locally. It's specifically a problem centred around workers that spawn at different times.

You could mimic this by calling scale on a LocalCluster.
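
For example, a rough sketch of such a reproducer (worker count, task count, and sleep duration are arbitrary):

    import time
    from dask.distributed import Client, LocalCluster, fire_and_forget

    # Start with a single worker, submit everything, then scale up,
    # mimicking workers that join the cluster at different times.
    cluster = LocalCluster(n_workers=1, threads_per_worker=1)
    client = Client(cluster)

    futures = [client.submit(time.sleep, 30, pure=False) for _ in range(7)]
    for f in futures:
        fire_and_forget(f)

    cluster.scale(7)  # new workers join after the tasks were already assigned
    # Watch the dashboard to see whether tasks get rebalanced onto the new workers.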

with larger datasets (70ish tasks) this does appear to do the right thing

My guess is still:

You're probably hitting the work-stealing code path to balance tasks between workers when scaling. When you increase the number of tasks, you instead hit the queuing code path. Work stealing is known to not do very well when scaling up. Queuing is much simpler and consistently does the right thing when scaling up. Note that once you set the GPU annotation correctly, you also won't use queuing; queuing is not compatible with resource restrictions.

@gjoseph92 (Collaborator)

I also think you might be confused by how resources affect scheduling: even though you say that a task requires 1 GPU, and workers only have 1 GPU, the scheduler won't consider how much GPU is currently free when deciding where to run a task - just whether the worker has a GPU resource at all.

If you explicitly set --nthreads 1 (and then explicitly pass a memory limit) to the worker process, you'll likely see better scheduling, since it doesn't seem like you were utilizing three threads anyway.
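
For reference, that would mean changing the worker args in the spec above to something like this (the memory value is only an example; it should be chosen to fit within the pod's limit):

      - args:
        - dask-worker
        - --name
        - $(DASK_WORKER_NAME)
        - --nthreads
        - "1"
        - --memory-limit
        - 12GiB
        - --resources
        - GPU=1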
