
Failing Out-of-Core Merge #1143

Open
quasiben opened this issue Mar 27, 2023 · 0 comments

I have a somewhat representative (and currently failing) example of merging two dataframes in a resource-constrained environment:

df_base = 295GB and 10674 partitions
df_other = 466GB and 2576 partitions

Each dataframe has two columns of random integers, `key` and `payload`, both int64:

In [7]: ddf_base.head()
Out[7]:
            key   payload
shuffle
0        113664  38855413
0        113671  13729563
0        113673   2885245
0        113681  14508293
0        113689   9661924
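For context, the average partition sizes implied by the totals above can be checked with a quick back-of-the-envelope calculation (plain Python; not part of the original report, and only an estimate since partitions are rarely perfectly uniform):

```python
# Rough average partition sizes implied by the reported totals.
GB = 1024**3

base_total, base_parts = 295 * GB, 10674    # df_base
other_total, other_parts = 466 * GB, 2576   # df_other

base_avg_mb = base_total / base_parts / 1024**2
other_avg_mb = other_total / other_parts / 1024**2

print(f"df_base : ~{base_avg_mb:.0f} MB per partition")   # ~28 MB
print(f"df_other: ~{other_avg_mb:.0f} MB per partition")  # ~185 MB
```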

Here's a more complete script:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf

def main():
    # 4 GPUs, one worker thread each, with a 30 GB RMM async pool per GPU
    cluster = LocalCUDACluster(protocol="ucx",
                               CUDA_VISIBLE_DEVICES="4,5,6,7",
                               threads_per_worker=1,
                               rmm_pool_size="30GB",
                               rmm_async=True,
                               rmm_release_threshold="25GB")
    client = Client(cluster)
    ddf_base = dask_cudf.read_parquet('/datasets/bzaitlen/GitRepos/random-parquet-data/df_base.parquet/')
    ddf_other = dask_cudf.read_parquet('/datasets/bzaitlen/GitRepos/random-parquet-data/df_other.parquet/')
    # inner join on the shared key column, then write the result back out
    ddf_join = ddf_base.merge(ddf_other, on=["key"], how="inner")
    ddf_join.to_parquet("/datasets/bzaitlen/GitRepos/random-parquet-data/df_join.parquet")


if __name__ == "__main__":
    main()
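Logically, the merge above is an inner hash join on `key`. A minimal pure-Python sketch of that semantics (illustration only; dask_cudf performs the same join distributed across partitions, with a shuffle on the join key):

```python
# Minimal illustration of inner-join-on-key semantics (pure Python).
def inner_join(base, other, key="key"):
    """Inner join two lists of dicts on `key` via a hash table."""
    # Build a hash index over the `other` side, keyed by the join column.
    index = {}
    for row in other:
        index.setdefault(row[key], []).append(row)
    # Probe with each `base` row; emit one joined row per match.
    joined = []
    for row in base:
        for match in index.get(row[key], []):
            joined.append({**match, **row})
    return joined

base = [{"key": 1, "payload": 10}, {"key": 2, "payload": 20}]
other = [{"key": 1, "other_payload": 99}, {"key": 3, "other_payload": 7}]
print(inner_join(base, other))
# [{'key': 1, 'other_payload': 99, 'payload': 10}]
```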

The above fails with OOM and sometimes random UCX errors (after hours of waiting) with the fairly new cuDF spilling manager, using both explicit-comms and regular tasks. Note that the script limits the run to 4 GPUs -- the equivalent of 128GB of GPU memory, significantly less than the total data size:

time DASK_EXPLICIT_COMM=False CUDF_SPILL=1 python ooc-merge.py 2>&1 | tee merge-res.txt
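To put the out-of-core aspect in numbers (32 GB per GPU is an assumption, consistent with the 128GB figure above):

```python
# Total data vs. available GPU memory in the 4-GPU configuration.
data_gb = 295 + 466          # df_base + df_other
gpu_mem_gb = 4 * 32          # 4 visible devices, assumed 32 GB each
print(f"data/GPU-memory ratio: {data_gb / gpu_mem_gb:.1f}x")  # ~5.9x
```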

The hope is that p2p shuffling provides stable infrastructure for accomplishing this out-of-core shuffle/merge.
