Heterogeneous CUDA Cluster #1161

Open
jackyko1991 opened this issue Apr 14, 2023 · 6 comments


jackyko1991 commented Apr 14, 2023

I am trying to build a customized cluster with multiple CPU workers and a single GPU worker.

To build this heterogeneous cluster, I used Dask's SpecCluster.

On our workstation we have 16 CPU cores, 1 GPU and 128GB RAM. We want to distribute the resources evenly with the specification:

worker   nthreads   memory   GPU
cpu-0    4          32GB     0
cpu-1    4          32GB     0
cpu-2    4          32GB     0
gpu-0    4          32GB     1
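
The even split in the table can be computed before any worker is created. A minimal sketch, assuming a hypothetical helper `split_resources` (it is not part of Dask or Dask-CUDA) and an optional `headroom` fraction for the share of system memory handed to the workers:

```python
def split_resources(cpu_count, memory_bytes, worker_count, headroom=1.0):
    """Divide threads and memory evenly across workers.

    Hypothetical helper for illustration only; `headroom` is the
    fraction of system memory that the workers may use in total.
    """
    if worker_count <= 0:
        raise ValueError("worker_count must be positive")
    nthreads = cpu_count // worker_count
    memory_limit = int(memory_bytes * headroom) // worker_count
    return nthreads, memory_limit


GIB = 1024 ** 3

# 16 cores and 128 GiB shared by 3 CPU workers + 1 GPU worker
nthreads, memory_limit = split_resources(16, 128 * GIB, worker_count=4)
print(nthreads, memory_limit // GIB)
```

Passing `headroom=0.9` would reproduce the 90% safety margin used in the script that follows.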

To achieve this in dask I use the following script:

import dask
from dask.distributed import Client, Scheduler, Worker, Nanny, SpecCluster
from dask_cuda.worker_spec import worker_spec  # builds the GPU worker spec used below
import multiprocessing
import psutil

# gather device info
cpu_count = multiprocessing.cpu_count()
memory_count = psutil.virtual_memory().total
print("CPU count:", cpu_count)
print("System memory:",memory_count)

specs = {
    "cpu":{
        "scale":3,
        "resources":{
        }
    },
    "gpu":{
        "scale":1,
        "resources":{
            "CUDA_VISIBLE_DEVICES": [0]
        }
    }
}

worker_count = 0
for v in specs.values():
    worker_count += v["scale"]

nthreads = cpu_count//worker_count
memory_limit = int(memory_count*0.9)//worker_count # set to use 90% of the system memory to avoid crashing

print("number of workers:", worker_count)
print("threads per worker:", nthreads)
print("memory limit per worker:", round(memory_limit/(1024*1024*1024),2), "GB")

workers = {}

for k, v in specs.items():
    for i in range(v["scale"]):
        if "CUDA_VISIBLE_DEVICES" in v["resources"].keys():
            workers["{}-{}".format(k,i)] = worker_spec(threads_per_worker=nthreads, memory_limit=memory_limit, CUDA_VISIBLE_DEVICES=v["resources"]["CUDA_VISIBLE_DEVICES"])[0]
        else:
            workers["{}-{}".format(k,i)] = {
                "cls":Nanny,
                "options":{
                    "nthreads": nthreads,
                    "memory_limit": memory_limit
                    }
             }     
            
workers

"""
{'cpu-0': {'cls': distributed.nanny.Nanny,
  'options': {'nthreads': 4, 'memory_limit': 30317854924}},
 'cpu-1': {'cls': distributed.nanny.Nanny,
  'options': {'nthreads': 4, 'memory_limit': 30317854924}},
 'cpu-2': {'cls': distributed.nanny.Nanny,
  'options': {'nthreads': 4, 'memory_limit': 30317854924}},
 'gpu-0': {'cls': distributed.nanny.Nanny,
  'options': {'env': {'CUDA_VISIBLE_DEVICES': '0'},
   'interface': None,
   'protocol': None,
   'nthreads': 4,
   'data': dict,
   'dashboard_address': ':8787',
   'plugins': [<dask_cuda.utils.CPUAffinity at 0x7f35c8ea7880>],
   'silence_logs': True,
   'memory_limit': 134746021888.0,
   'preload': 'dask_cuda.initialize',
   'preload_argv': '--create-cuda-context'}}}
"""


scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
cluster = SpecCluster(scheduler=scheduler, workers=workers)
client = Client(cluster)
client

With ordinary Dask resource management, we are supposed to use resource annotations to select the worker:

# specify the worker for the compute process
with dask.annotate(resources={'GPU': 1}):
    res = dask.compute(*futures)

Can we do something similar for a CUDA worker?

I expected four CPU threads to have affinity to the single GPU, but the GPU turns out to be attached to all of the CPU workers. Also, the memory limit is not applied to the GPU worker.

[Screenshot: Jupyter cluster widget listing the GPU on every worker]

Can dask_cuda build this type of cluster?

@jackyko1991
Author

jackyko1991 commented Apr 14, 2023

memory_limit = MEMORY_LIMIT / get_gpu_count()

This line seems to override the user-specified per-worker memory limit. Should that be changed?
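
If `worker_spec` does overwrite the limit as this line suggests, one possible workaround is to patch the returned spec before handing it to SpecCluster. A minimal sketch, assuming the spec has the `{key: {"cls": ..., "options": {...}}}` shape printed earlier in this thread; `with_memory_limit` is a hypothetical helper, and `sample_spec` is a hand-written stand-in for real `worker_spec` output:

```python
def with_memory_limit(spec, memory_limit):
    """Return a copy of a worker spec with `memory_limit` set on every worker.

    Hypothetical helper: only the outer dict and each "options" dict are
    copied, so objects stored inside the options are shared, not duplicated.
    """
    return {
        key: {**worker, "options": {**worker["options"], "memory_limit": memory_limit}}
        for key, worker in spec.items()
    }


# Stand-in for the dict returned by worker_spec (the string replaces the Nanny class)
sample_spec = {
    0: {"cls": "Nanny", "options": {"nthreads": 4, "memory_limit": 134746021888.0}},
}

patched = with_memory_limit(sample_spec, 30317854924)
print(patched[0]["options"]["memory_limit"])
```

The original dict is left untouched, which makes it easy to compare the two specs while debugging.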

@quasiben
Member

Dask-CUDA will probably not handle this kind of automated cluster creation. Instead, we (@jacobtomlinson) have explored inferring hardware and auto-annotating the cluster in https://github.com/jacobtomlinson/dask-agent

I also played with this idea a bit in rapidsai/cudf#11599 but have since paused these experiments.

What you have above does not seem correct -- I think you are trying to build 5 workers total: 4 CPU and 1 GPU. Is that correct?

@jacobtomlinson
Member

The Jupyter widget may be incorrectly showing a GPU for non-GPU workers. Although technically there is nothing stopping the regular workers from accessing the GPU so it's not incorrect that they can see the GPU. It might be worth setting CUDA_VISIBLE_DEVICES="" for those workers to obscure the GPU from them, not that it will matter as the annotation will steer GPU tasks to the GPU worker.
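
The suggestion to hide the GPU from the regular workers could look like the sketch below. It assumes the worker class accepts an `env` option, as the Nanny specs printed earlier in this thread do; `cpu_worker_spec` and `PlaceholderNanny` are hypothetical names used so the snippet runs without a cluster, and in practice `distributed.Nanny` would be passed instead:

```python
def cpu_worker_spec(worker_cls, nthreads, memory_limit):
    """Build a SpecCluster-style worker entry whose process sees no GPU.

    Hypothetical helper; an empty CUDA_VISIBLE_DEVICES hides all CUDA devices
    from the worker process.
    """
    return {
        "cls": worker_cls,
        "options": {
            "nthreads": nthreads,
            "memory_limit": memory_limit,
            "env": {"CUDA_VISIBLE_DEVICES": ""},
        },
    }


class PlaceholderNanny:
    """Stand-in for distributed.Nanny so the example has no dependencies."""


workers = {
    f"cpu-{i}": cpu_worker_spec(PlaceholderNanny, nthreads=4, memory_limit=30317854924)
    for i in range(3)
}
print(sorted(workers))
```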

@jackyko1991
Author

@quasiben From my tests, utilizing all CPUs only requires using all workers, whether or not they have a GPU. What I am trying to do here is give a single GPU worker affinity to some CPU threads, so that calling the GPU worker does not interfere with other CPU processes. This kind of setup sometimes gives me more efficient asynchronous calls of Dask tasks.

So should that be 3 CPU workers + 1 GPU worker?

@jackyko1991
Author

jackyko1991 commented Apr 18, 2023

@jacobtomlinson I tested the following:

CUDA_VISIBLE_DEVICES passed to Jupyter: 2,3

ws = worker_spec(threads_per_worker=nthreads)
ws[2]["options"]["env"]["CUDA_VISIBLE_DEVICES"] = ""
ws 

"""
{2: {'cls': distributed.nanny.Nanny,
  'options': {'env': {'CUDA_VISIBLE_DEVICES': ''},
   'interface': None,
   'protocol': None,
   'nthreads': 4,
   'data': dict,
   'dashboard_address': ':8787',
   'plugins': [<dask_cuda.utils.CPUAffinity at 0x7fb9ece55930>],
   'silence_logs': True,
   'memory_limit': 67592159232.0,
   'preload': 'dask_cuda.initialize',
   'preload_argv': '--create-cuda-context'}},
 3: {'cls': distributed.nanny.Nanny,
  'options': {'env': {'CUDA_VISIBLE_DEVICES': '3,2'},
   'interface': None,
   'protocol': None,
   'nthreads': 4,
   'data': dict,
   'dashboard_address': ':8787',
   'plugins': [<dask_cuda.utils.CPUAffinity at 0x7fb9ece55660>],
   'silence_logs': True,
   'memory_limit': 67592159232.0,
   'preload': 'dask_cuda.initialize',
   'preload_argv': '--create-cuda-context'}}}
"""

This still creates a worker bound to the GPU:
[Screenshot: Jupyter cluster widget still listing a GPU on the worker]

I then tried the following on a 4-GPU, 20-core system:

  1. Use the environment variable CUDA_VISIBLE_DEVICES=2,3 when launching the notebook, for GPU process isolation.
  2. Set up one CUDA worker per GPU. With plain Dask resource management, an annotation could supposedly request two GPUs for one worker, but that does not seem suitable for dask-cuda.
# Note that scale times GPUs per worker should never exceed the length of the specified CUDA_VISIBLE_DEVICES
specs = {
    "cpu":{
        "scale":3,
        "resources":{
        }
    },
    "gpu":{
        "scale":2,
        "CUDA_VISIBLE_DEVICES": [2,3],
        "resources":{
            "GPU": 1  # must be 1 under dask-cuda
        }
    }
}

assert specs["gpu"]["scale"]*specs["gpu"]["resources"]["GPU"] <= len(specs["gpu"]["CUDA_VISIBLE_DEVICES"]), "Number of gpu workers (scale) times GPU per worker should not exceed length of CUDA_VISIBLE_DEVICES"

worker_count = 0
for v in specs.values():
    worker_count += v["scale"]

nthreads = cpu_count//worker_count
memory_limit = int(memory_count*0.9)//worker_count # set to use 90% of the system memory to avoid crashing

print("number of workers:", worker_count)
print("threads per worker:", nthreads)
print("memory limit per worker:", round(memory_limit/(1024*1024*1024),2), "GB")
print("GPU workers:", specs["gpu"]["scale"])
print("GPU per worker:", specs["gpu"]["resources"]["GPU"])

"""
number of workers: 5
threads per worker: 4
memory limit per worker: 45.32 GB
GPU workers: 2
GPU per worker: 1
"""
workers = {}

for k, v in specs.items():
    for i in range(v["scale"]):
        if "CUDA_VISIBLE_DEVICES" in v.keys():
            CUDA_VISIBLE_DEVICES = v["CUDA_VISIBLE_DEVICES"]
            gpu_per_worker = v["resources"]["GPU"]
            assert gpu_per_worker == 1, "GPUs per worker must be 1 for dask-cuda"
            ws = worker_spec(
                threads_per_worker=nthreads, 
                CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES[gpu_per_worker*i:gpu_per_worker*(i+1)]
                )
            workers["{}-{}".format(k,i)] = ws[v["CUDA_VISIBLE_DEVICES"][i]]
            workers["{}-{}".format(k,i)]["options"]["resources"]={"GPU":v["resources"]["GPU"]}
            workers["{}-{}".format(k,i)]["options"]["memory_limit"]=memory_limit
        else:
            workers["{}-{}".format(k,i)] = {
                "cls":Nanny,
                "options":{
                    "nthreads": nthreads,
                    "memory_limit": memory_limit
                    }
             }     
            
scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
cluster = SpecCluster(scheduler=scheduler, workers=workers)
client = Client(cluster)
client
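
The device slicing inside the loop above can be isolated into a small function that also performs the capacity check. This is a sketch only: `assign_devices` is a hypothetical helper, not a Dask-CUDA function, and it simply restates the slicing and assertion from the script.

```python
def assign_devices(devices, n_workers, gpus_per_worker=1):
    """Split a list of CUDA device ids into one slice per GPU worker.

    Hypothetical helper; raises if the workers would need more devices
    than CUDA_VISIBLE_DEVICES provides.
    """
    needed = n_workers * gpus_per_worker
    if needed > len(devices):
        raise ValueError(
            f"{n_workers} workers x {gpus_per_worker} GPU(s) need {needed} devices, "
            f"but only {len(devices)} are visible"
        )
    return [
        devices[i * gpus_per_worker:(i + 1) * gpus_per_worker]
        for i in range(n_workers)
    ]


# Two GPU workers sharing the devices exposed by CUDA_VISIBLE_DEVICES=2,3
assignment = assign_devices([2, 3], n_workers=2)
print(assignment)
```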

Even though all the workers appear to be bound to GPUs, the Dask annotation:

# specify the worker for the compute process
with dask.annotate(resources={'GPU': 1}):
    res = dask.compute(*futures)

can properly restrict CUDA processes to the GPU workers only.
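
Why the annotation is enough can be seen with a toy model of resource matching. This is an illustration of the idea only, not the Dask scheduler's implementation; `eligible_workers` and the `worker_resources` mapping are hypothetical and mirror the `resources={"GPU": 1}` option set on the GPU workers above.

```python
def eligible_workers(worker_resources, required):
    """Return the workers whose declared resources cover the request.

    Toy model: a worker qualifies only if it declares at least the
    requested amount of every required resource.
    """
    return [
        name
        for name, available in worker_resources.items()
        if all(available.get(res, 0) >= amount for res, amount in required.items())
    ]


# CPU workers declare no resources; each GPU worker declares one "GPU"
worker_resources = {
    "cpu-0": {},
    "cpu-1": {},
    "cpu-2": {},
    "gpu-0": {"GPU": 1},
    "gpu-1": {"GPU": 1},
}

gpu_only = eligible_workers(worker_resources, {"GPU": 1})
print(gpu_only)
```

A task with no resource requirement matches every worker in this model, which is consistent with CPU tasks being free to run anywhere.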

@jacobtomlinson
Member

I wouldn't suggest having more than one GPU per worker. My point was more that the screenshots you're sharing, which show the GPU listed on the non-GPU workers, are likely showing a UI bug.
