Skip to content

Commit

Permalink
[doc][dask] Update notes about k8s. (#10271)
Browse files Browse the repository at this point in the history
  • Loading branch information
trivialfis committed May 13, 2024
1 parent 75fe2ff commit 871fabe
Showing 1 changed file with 55 additions and 17 deletions.
72 changes: 55 additions & 17 deletions doc/tutorials/dask.rst
Original file line number Diff line number Diff line change
Expand Up @@ -237,53 +237,91 @@ For most of the use cases with GPUs, the `Dask-CUDA <https://docs.rapids.ai/api/
Working with other clusters
***************************

Using Dask's ``LocalCluster`` is convenient for getting started quickly on a single-machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.
Using Dask's ``LocalCluster`` is convenient for getting started quickly on a local machine. Once you're ready to scale your work, though, there are a number of ways to deploy Dask on a distributed cluster. You can use `Dask-CUDA <https://docs.rapids.ai/api/dask-cuda/stable/quickstart.html>`_, for example, for GPUs and you can use Dask Cloud Provider to `deploy Dask clusters in the cloud <https://docs.dask.org/en/stable/deploying.html#cloud>`_. See the `Dask documentation for a more comprehensive list <https://docs.dask.org/en/stable/deploying.html#distributed-computing>`_.

In the example below, a ``KubeCluster`` is used for `deploying Dask on Kubernetes <https://docs.dask.org/en/stable/deploying-kubernetes.html>`_:

.. code-block:: python
from dask_kubernetes import KubeCluster # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator import KubeCluster # Need to install the ``dask-kubernetes`` package
from dask_kubernetes.operator.kubecluster.kubecluster import CreateMode
from dask.distributed import Client
from xgboost import dask as dxgb
import dask
import dask.array as da
dask.config.set({"kubernetes.scheduler-service-type": "LoadBalancer",
"kubernetes.scheduler-service-wait-timeout": 360,
"distributed.comm.timeouts.connect": 360})
def main():
'''Connect to a remote kube cluster with GPU nodes and run training on it.'''
'''Connect to a remote kube cluster with GPU nodes and run training on it.'''
m = 1000
n = 10
kWorkers = 2 # assuming you have 2 GPU nodes on that cluster.
# You need to work out the worker-spec yourself. See document in dask_kubernetes for
# its usage. Here we just want to show that XGBoost works on various clusters.
cluster = KubeCluster.from_yaml('worker-spec.yaml', deploy_mode='remote')
cluster.scale(kWorkers) # scale to use all GPUs
with Client(cluster) as client:
X = da.random.random(size=(m, n), chunks=100)
y = da.random.random(size=(m, ), chunks=100)
# See notes below for why we use pre-allocated cluster.
with KubeCluster(
name="xgboost-test",
image="my-image-name:latest",
n_workers=kWorkers,
create_mode=CreateMode.CONNECT_ONLY,
shutdown_on_close=False,
) as cluster:
with Client(cluster) as client:
X = da.random.random(size=(m, n), chunks=100)
y = X.sum(axis=1)
regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
regressor.client = client
regressor.set_params(tree_method='hist', device="cuda")
regressor.fit(X, y, eval_set=[(X, y)])
regressor = dxgb.DaskXGBRegressor(n_estimators=10, missing=0.0)
regressor.client = client
regressor.set_params(tree_method='hist', device="cuda")
regressor.fit(X, y, eval_set=[(X, y)])
if __name__ == '__main__':
# Launch the kube cluster on somewhere like GKE, then run this as client process.
# main function will connect to that cluster and start training xgboost model.
main()
Different cluster classes might have subtle differences like network configuration, or
specific cluster implementation might contains bugs that we are not aware of. Open an
issue if such case is found and there's no documentation on how to resolve it in that
cluster implementation.

An interesting aspect of the Kubernetes cluster is that the pods may become available
after the Dask workflow has begun, which can cause issues with distributed XGBoost since
XGBoost expects the nodes used by input data to remain unchanged during training. To use
Kubernetes clusters, it is necessary to wait for all the pods to be online before
submitting XGBoost tasks. One can either create a wait function in Python or simply
pre-allocate a cluster with k8s tools (like ``kubectl``) before running dask workflows. To
pre-allocate a cluster, we can first generate the cluster spec using dask kubernetes:

.. code-block:: python
import json
from dask_kubernetes.operator import make_cluster_spec
spec = make_cluster_spec(name="xgboost-test", image="my-image-name:latest", n_workers=16)
with open("cluster-spec.json", "w") as fd:
json.dump(spec, fd, indent=2)
.. code-block:: sh
kubectl apply -f ./cluster-spec.json
Check whether the pods are available:

.. code-block:: sh
kubectl get pods
Once all pods have been initialized, the Dask XGBoost workflow can be run, as in the
previous example. It is important to ensure that the cluster sets the parameter
``create_mode=CreateMode.CONNECT_ONLY`` and optionally ``shutdown_on_close=False`` if you
do not want to shut down the cluster after a single job.

*******
Threads
*******
Expand Down

0 comments on commit 871fabe

Please sign in to comment.