[QST][Bug?] Can I fit/evaluate many XGBoost models on the same cluster? #8623

rjzamora opened this issue Apr 17, 2024 · 10 comments

Description of possible bug

When I try to fit/evaluate many xgboost.dask models in parallel, one or more of the fit/evaluation futures hangs forever. The hang only occurs when threads_per_worker=1 (which is the default for GPU-enabled clusters in dask-cuda).

Is this a bug, or is the reproducer shown below known to be wrong or dangerous for a specific reason?

Reproducer

from dask.distributed import LocalCluster, Client, as_completed
from dask.delayed import delayed
from dask_ml.datasets import make_classification
from dask_ml.model_selection import train_test_split
from xgboost.dask import DaskXGBClassifier


n_workers = 8
threads_per_worker = 1  ###  Using >1 avoids the hang  ###
n_trials = n_workers


if __name__ == "__main__":
    
    # Start up the dask cluster
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
    )
    client = Client(cluster)

    # Mimic an HPO objective function (fit/predict with xgboost)
    def objective(random_state):
        X_param, y_param = make_classification(
            n_samples=1000,
            n_features=20,
            chunks=100,
            n_informative=4,
            random_state=random_state,
        )

        X_train, X_valid, y_train, y_valid = train_test_split(
            X_param, y_param, random_state=random_state
        )

        classifier = DaskXGBClassifier(
            **{
                'objective': 'binary:logistic',
                'max_depth': 4,
                'eta': 0.01,
                'subsample': 0.5,
                'min_child_weight': 0.5,
            }
        )

        classifier.fit(X_train, y_train)
        y_pred = classifier.predict(X_valid)
        return

    # Submit and compute many delayed jobs at once
    jobs = []
    delayed_objective = delayed(objective)
    for rs in range(0, n_trials):
        jobs.append(delayed_objective(rs))
    for i, future in enumerate(as_completed(client.compute(jobs))):
        print(f"Job {i} done.")
    print("All jobs done!")

Further background

There is a great blog article from Coiled that demonstrates Optuna-based HPO with XGBoost and dask. The article states: "the current xgboost.dask implementation takes over the entire Dask cluster, so running many of these at once is problematic." Note that the "problematic" practice described in that article is exactly what the reproducer above is doing. With that said, it is not clear to me why one or more workers might hang.

NOTE: I realize this is probably more of an xgboost issue than a distributed issue. However, it seems clear that significant dask/distributed knowledge is needed to pin down the actual problem. Any and all help, advice, or intuition is greatly appreciated!

@rjzamora

Just a note that the reproducer is certainly submitting tasks from the workers. Therefore this documentation is probably relevant. Perhaps there is a deadlock that can be avoided if xgboost knows it is running from a worker and can secede/rejoin when necessary?
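For concreteness, a minimal sketch of that idea, assuming the objective from the reproducer runs as a task on a worker. This only illustrates the pattern from the tasks-from-tasks documentation; it is not a proposed change inside xgboost itself:

from distributed import secede, rejoin

def objective(random_state):
    # Hand this worker thread back to the pool before blocking on the
    # nested xgboost.dask work, so single-threaded workers stay responsive.
    secede()
    try:
        # ... build the data and call DaskXGBClassifier().fit(...) as above ...
        pass
    finally:
        # Wait for a free slot in the worker's thread pool before returning.
        rejoin()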

@mrocklin commented Apr 17, 2024 via email

@rjzamora

Thanks @mrocklin! I saw this repository the other day, and actually did suggest the multi-cluster approach to the original user who ran into this issue. It is clear to me that the multi-cluster approach should work well. What is not clear to me is the specific reason that the single-cluster approach hangs.

If it is possible for XGBoost to make the single-cluster approach work well, the XGBoost maintainers seem interested to know what that change would look like.
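For reference, a minimal sketch of the multi-cluster approach: each trial gets its own short-lived LocalCluster, so no two training jobs share workers. The cluster size and the stripped-down objective body are illustrative assumptions:

from dask.distributed import LocalCluster, Client
from dask_ml.datasets import make_classification
from xgboost.dask import DaskXGBClassifier


def run_trial(random_state):
    # One private cluster per trial; nothing here competes with other trials.
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            X, y = make_classification(
                n_samples=1000, n_features=20, chunks=100, random_state=random_state
            )
            clf = DaskXGBClassifier(objective="binary:logistic", max_depth=4)
            clf.client = client  # be explicit about which cluster to train on
            clf.fit(X, y)

Each run_trial call can then be driven from a plain thread pool or an Optuna study in the driver process, since the trials no longer compete for the same workers.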

@mrocklin

I agree that it would be good if the xgboost.dask approach could happily share the cluster. If your question is "does it?" then I think that the answer is "no", at least as of when the first prototype was made. I think that you'd have to check with the current source code or folks who wrote that code to be sure.

If the question is "could it?" then sure, the answer is yes, although there are likely tricky things you'd want to figure out, like whether xgboost jobs should queue, shard the cluster, or run concurrently, and how to manage that.
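To illustrate the "shard the cluster" idea only (this is not an existing xgboost feature): since xgboost.dask, as I understand it, trains on whichever workers hold the data, pinning each trial's data to a disjoint subset of workers would effectively give each training job its own shard. The worker split below is an assumption for illustration:

from dask.distributed import Client, LocalCluster
from dask_ml.datasets import make_classification

client = Client(LocalCluster(n_workers=4, threads_per_worker=1))
workers = sorted(client.scheduler_info()["workers"])
shards = [workers[0::2], workers[1::2]]  # two disjoint worker groups


def make_pinned_data(random_state, worker_subset):
    # Keep every chunk of this trial's data on one worker shard, so the
    # training job that consumes it stays on that shard too.
    X, y = make_classification(
        n_samples=1000, n_features=20, chunks=100, random_state=random_state
    )
    return client.persist([X, y], workers=worker_subset)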

@trivialfis commented Apr 17, 2024

I think that you'd have to check with the current source code or folks who wrote that code to be sure.

Thank you for the suggestion. Is there any particular pattern I should avoid (to not hang)?

If the question is "could it?" then sure the answer is yes,

Let's say it should run sequentially, which is the simplest case. Currently, XGBoost acquires a distributed.MultiLock before launching a training task to ensure that all requested workers are ready for it. As a result, I would expect multiple training tasks to run sequentially (rather than hang) when the data is balanced across all workers.

If nested parallelism is not supported, an error would also help.
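For readers unfamiliar with it, this is roughly the reservation pattern being described (a sketch, not XGBoost's actual code; train_fn and worker_addresses are placeholders):

from distributed import MultiLock


def train_with_reserved_workers(train_fn, worker_addresses):
    # Reserve every participating worker at once before the collective
    # training step, so two training jobs cannot each grab half the workers.
    lock = MultiLock(names=list(worker_addresses))
    lock.acquire()
    try:
        return train_fn()
    finally:
        lock.release()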

@mrocklin

I'm not sure I understand the question that is being asked. If the question is "how would you design xgboost.dask so that it could be run multiple times at once?" then I'd need to think a lot longer (although maybe someone else can help). That's an interesting question that probably deserves a lot of thought.

If the question is "how do you enforce sequential work" then it probably depends on what you're doing. For example, you could use a threading.Lock or something similar around your train method to make sure that it isn't run multiple times at once. That would be a very simple solution that has nothing to do with Dask.

If you're doing something like the above then you'd want to swap out threading.Lock for dask.distributed.Lock, and then yes, you'd want to make sure that the worker threads weren't all tied up waiting: the MultiLock is waiting on all of the workers to be ready to run, but the workers are busy waiting on your initial Lock.

If your question is "is this a bug" then I think my answer is "no", you're just dealing with lots of Locks and you've set them up in such a way that they deadlock. Using something like secede/rejoin as @rjzamora described above is one way to start working through this. You'll probably have to be careful though.
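A sketch of the "enforce sequential work" idea under discussion: wrap each training run in a dask.distributed.Lock so only one xgboost.dask job trains at a time. As noted above, on single-threaded workers this still has to be combined with seceding (or worker_client, further down the thread) so that waiting on the lock does not occupy the worker's only thread. The lock name and the placeholder body are assumptions:

from distributed import Lock


def serialized_objective(random_state):
    # One cluster-wide lock: training runs queue up behind each other.
    with Lock("xgboost-training"):
        # ... run one xgboost.dask fit/predict here ...
        pass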

@trivialfis commented Apr 17, 2024

The MultiLock is waiting on all of the workers to be ready to run but the workers are busy waiting on your initial Lock.

That makes sense. So, if I understand it correctly, the observed hang is caused by the worker having a single thread, and that sole thread is being used to wait for the lock?

If your question is "is this a bug" then I think my answer is "no", you're just dealing with lots of Locks and you've set them up in such a way that they deadlock.

There's only one lock in xgboost. I can't think of another way to ensure the workers' availability.

I'm not sure I understand the question that is being asked.

Since the underlying implementation of xgboost.dask is still quite similar to the original dask-xgboost, with the addition of a multi-lock, my question is whether there's an anti-pattern that prevents xgboost from being used in this nested way. But if the hang is caused by the single worker thread being blocked, then the question is moot. Apologies for the ambiguity.

If my understanding of the underlying cause is correct, then I agree that it's not a bug but more a limitation of the current design. This will bring us back to the feature request for dask to support collective/mpi tasks so that xgboost can remove/replace the lock. Will leave that for another dask sync. ;-)

@mrocklin

So, if I understand it correctly, the observed hang is caused by the worker having a single thread, and that sole thread is being used to wait for the lock

I don't know for sure. I'd have to look at the xgboost.dask code to see what is going on.

Will leave that for another dask sync. ;-)

I recommend raising an issue. Most technical conversation for Dask happens in issues, not at the monthly meetings. Alternatively, if you want to talk synchronously with someone you could e-mail that person and ask for a conversation. (I am not the person to talk to here FWIW).

@fjetter commented Apr 18, 2024

I very briefly looked over the implementation of the xgboost dask integration. It's a little difficult to wrap my head around what's going on. However, I'm pretty sure it's because you are indeed submitting tasks from within tasks without seceding.

  1. I recommend not using the async client. The async Client API sometimes does not work very well (e.g. when the asynchronous keyword is not set properly) and can be a little off when working with collections. This is just a recommendation and probably not related to the deadlock.
  2. I recommend using worker_client, which wraps the correct secede and rejoin calls in a context manager and makes sure you get a cached client on the worker. This is typically more robust and less error-prone than implementing anything yourself with secede/rejoin (see the sketch below).
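A minimal sketch of that recommendation, assuming the objective runs as a task on a worker (the body is a placeholder):

from distributed import worker_client


def objective(random_state):
    # worker_client() hands back the client cached on the worker and
    # secedes for the duration of the block, so nested xgboost.dask work
    # does not tie up the worker's only thread.
    with worker_client() as client:
        # ... build the data and run the fit/predict from the reproducer,
        # using `client` for any nested submissions ...
        pass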

@trivialfis

I recommend raising an issue.

Thank you for the suggestion; I will try to write a feature request.
