[POC] Distributed checkpointing API for dask collections #8483

Draft · rjzamora wants to merge 3 commits into main

Conversation

rjzamora (Member)

This PR includes a rough POC for a distributed checkpointing API.

Background

I have come across several RAPIDS + Dask projects that are forced to perform a global "checkpoint" in the middle of their workflow by writing a Parquet dataset to shared storage. Due to extreme memory pressure, it is not practical for these workflows to execute their end-to-end data pipeline as a single task graph. P2p shuffling can sometimes help a subset of these workflows. However, explicit IO tends to be much faster and more stable when fast/shared storage is available. The problem with this naive checkpointing approach is that "shared" storage is not always fast and/or available. It is much more common to have fast node-local storage.
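For concreteness, the naive pattern looks roughly like the sketch below (paths are placeholders, not taken from any particular project):

import dask.dataframe as dd

# Stage 1: memory-intensive ETL, materialized to *shared* storage
ddf = dd.read_parquet("/shared/input/")
ddf = ddf.shuffle("id")
ddf.to_parquet("/shared/checkpoint/")

# Stage 2: a brand-new task graph that starts from the checkpoint
ddf = dd.read_parquet("/shared/checkpoint/")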

We are currently exploring the possibility of adding an API for distributed checkpointing in RAPIDS. By "distributed", I mean that workers must be able to write to and read from node-local storage. The idea is similar to persisting a collection and forcing workers to spill all partitions. However, this API would also allow the user to completely restart the cluster after checkpointing.

I'm not sure if this kind of API is generally desired. However, I wanted to share the idea early in case it is.

Example Usage

import dask.dataframe as dd

from distributed import LocalCluster, Client
from distributed.checkpoint import DataFrameCheckpoint

client = Client(LocalCluster())

# Start memory-intensive ETL
ddf = dd.read_parquet(...)
ddf = ddf.shuffle("id")

# Persist the data to worker-local storage.
# This could also look something like: `ddf.checkpoint(path)`
ckpt = DataFrameCheckpoint.create(ddf, "./my_checkpoint")

# Can delete existing collections and even restart the client
del ddf
client.restart()

# Load new DataFrame collection from the checkpoint
ddf2 = ckpt.load()  # Still a lazy DataFrame collection
...
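The POC's internals aren't shown here, but the worker-local case has to remember which worker wrote which partition and pin the corresponding reads back to that worker. Below is a minimal sketch of that general mechanism using only public distributed APIs; the helper names, manifest format, and pandas-level reads are assumptions for illustration, not the POC's actual code.

import os
import pandas as pd
from distributed import Client, futures_of, get_worker

def _write_partition(df: pd.DataFrame, path: str, index: int):
    # Runs on whichever worker already holds the partition and records
    # which worker wrote which file (hypothetical helper).
    os.makedirs(path, exist_ok=True)
    fn = os.path.join(path, f"part.{index}.parquet")
    df.to_parquet(fn)
    return get_worker().address, fn

def checkpoint(client: Client, ddf, path: str):
    # Materialize the collection, then write each partition from the
    # worker that holds it (dependency-based scheduling keeps this local).
    parts = futures_of(ddf.persist())
    writes = [client.submit(_write_partition, f, path, i) for i, f in enumerate(parts)]
    return client.gather(writes)  # [(worker_address, filename), ...]

def load(client: Client, manifest):
    # Reads *must* be pinned: each file only exists on the node that wrote
    # it, hence the resilience and scheduling trade-offs this entails.
    return [
        client.submit(pd.read_parquet, fn, workers=[addr], allow_other_workers=False)
        for addr, fn in manifest
    ]

A real implementation would of course wrap the loaded pieces back into a lazy DataFrame collection, as `ckpt.load()` does in the example above.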


Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files (−1) · 26 suites (−1) · 9h 23m 7s ⏱️ (−41m 1s)
3 966 tests (+1): 3 844 ✅ (±0), 109 💤 (±0), 13 ❌ (+1)
48 487 runs (−1 378): 46 114 ✅ (−1 317), 2 227 💤 (−63), 146 ❌ (+2)

For more details on these failures, see this check.

Results for commit fe789b0. ± Comparison against base commit ade01b3.

fjetter (Member) commented Jan 31, 2024

I think I do not understand exactly what problem this is solving and why this API is better than writing to and reading from an ordinary parquet dataset.
OTOH I see a lot of things that can go wrong by hard-pinning tasks to individual workers. Besides resilience, which is obviously gone, this also changes scheduler heuristics. Most notably, task queuing is disabled, which is particularly important for these kinds of data-loading tasks.

rjzamora (Member, Author) commented

Thanks for engaging, @fjetter!

Important Disclaimer: Overall, I'm not expecting there to be an appetite or need for this kind of feature in mainline dask/distributed. I get the sense that the key challenges it aims to solve are slightly RAPIDS-specific, and so the primary purpose of this POC is for general awareness and discussion. I don't personally want to introduce any new APIs at all. Even if a bunch of people responded to this POC with "+1", I'd still be pretty hesitant to merge anything in the near future.

I think I do not understand exactly what problem this is solving and why this API is better than writing to and reading from an ordinary parquet dataset.

Writing an ordinary parquet dataset requires you to use shared storage. This API would be used in cases where such storage is not guaranteed.

Further context: Most dask/dask users shouldn't need something like this. Worker spilling and "p2p" shuffling should replace the need for this kind of checkpointing in most cases. In RAPIDS, effective spilling is much more of a challenge (i.e. OOM errors are still common), and the pyarrow components of "p2p" are painfully slow. In order to get around these two issues, we use this kind of checkpointing as a form of "manual" spilling.

OTOH I see a lot of things that can go wrong by hard-pinning tasks to individual workers. Besides resilience, which is obviously gone, this also changes scheduler heuristics. Most notably, task queuing is disabled, which is particularly important for these kinds of data-loading tasks.

Yes. If the user is simply checkpointing to shared storage, there is no reason to pin tasks to workers. A "real" implementation would effectively fall back to something like dd.read_parquet whenever possible. We only need to sacrifice resilience and scheduler heuristics when the checkpoint location is worker-local storage (e.g. local SSDs with GPUDirect Storage support). Even in that case, the logic used in this POC can obviously be improved a bit.
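In pseudo-code, that fallback might look something like the following; the shared-storage check is a hypothetical attribute used here for illustration, not something the POC exposes:

import dask.dataframe as dd

def load_checkpoint(ckpt):
    # Hypothetical dispatch: only pay the pinning cost when we have to.
    if ckpt.is_shared_storage:  # assumed attribute, not in the POC
        # Shared storage: a plain read keeps resilience, work stealing,
        # and task queuing intact.
        return dd.read_parquet(ckpt.path)
    # Worker-local storage: fall back to worker-pinned reads.
    return ckpt.load()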
