
Should we deprecate chunksize from read_parquet? #9043

Closed
rjzamora opened this issue May 5, 2022 · 19 comments · May be fixed by #9197

Comments

@rjzamora
Member

rjzamora commented May 5, 2022

Follow-up to recent issues like #8937

Recent PRs have simplified the read_parquet API a bit, but the code is still vast. After some careful consideration, I'd like to suggest that we deprecate the chunksize option.

Why to deprecate chunksize:

  • No one seems to be using it (Please do correct me if I'm wrong!)
  • The current implementation is not very efficient (and is extremely slow on remote file systems)
  • There is no case where this feature is required for stability (probably why it is never used)

cc @jcrist

@github-actions github-actions bot added the needs triage Needs a response from a contributor label May 5, 2022
@rjzamora rjzamora added dataframe io parquet and removed needs triage Needs a response from a contributor labels May 5, 2022
jsignell pushed a commit that referenced this issue May 10, 2022
…` and ``aggregate_files`` (#9052)

As discussed in #9043 (for `chunksize`) and #9051 (for `aggregate_files`), I propose that we deprecate two complex and rarely-utilized arguments from `read_parquet`: `chunksize` and `aggregate_files`.

This PR simply adds "pre-deprecation" warnings for the targeted arguments (including links to the relevant Issues discussing their deprecation). My goal is to find (and inform) any users who may be depending on these obscure options.
@alienscience

We use chunksize quite a lot because some of the data we have to read is not under our control and has a large number of partitions, i.e. one parquet sub-directory contains many files. I believe the large number of partitions causes the task graph to grow and Dask workers to die.

Since moving over to using the following:

import dask.dataframe as dd  # `path` and `some_filter` are defined elsewhere in our pipeline

df = dd.read_parquet(path,
                     engine="pyarrow",
                     chunksize="200MB",
                     aggregate_files=True,
                     gather_statistics=True,
                     filters=some_filter)

Reading the data, and processing it, has become faster and more reliable.

My guess is that, in the future, the replacement will be split_row_groups=N, where N is a number we reach by trial and error?

@mrocklin
Member

mrocklin commented May 18, 2022 via email

@rjzamora
Member Author

rjzamora commented May 18, 2022

Rather than deprecate could we lean in and automate?

For example we might randomly sample the size of row groups, and from that
and a default chunk size determine how many row groups to include in every
task.

Yes - This is similar to what Merlin/NVTabular does, and is much easier to support than the current chunksize behavior.

Thanks for the example @alienscience. It seems like you are relying on the current relationship between chunksize and aggregate_files to combine multiple small files into each partition. In that case, I will try to think of a way to preserve your needs (for this Issue and #9051).

@g4brielvs

g4brielvs commented Jun 1, 2022

We use chunksize in our data pipelines at the Development Data Partnership to re-partition and maximize the number of active workers.

We read a small number of large Apache Parquet files that, expanded in memory, can easily exceed 32GB each. Using a 64-core/512GB machine, for example, we could only use 16 workers (32GB per worker) while 48 workers sat idle. The idea is to use chunksize to distribute the workload to all available workers. In addition, the parameter gives a more predictable estimate of how much memory is needed, and the re-partitioning makes the output data more manageable, requiring less memory per worker to read and/or compute.

I admit there might be better alternatives, and I confess I was a bit reluctant to use a parameter that looked obscure and undocumented, with a current implementation that is not very efficient. But I'm very keen to learn and contribute, and any recommendations would be immensely appreciated.

In summary, there seems to be a use case where reading the input data puts the user in out-of-memory territory, but using the power of Dask to break it into smaller pieces could be a solution.

Greatly appreciative of your work and maintaining Dask! Thank you so much for creating this issue for discussion.

@rjzamora
Member Author

rjzamora commented Jun 1, 2022

Thank you for commenting @g4brielvs! This is good information. Hopefully we can preserve the features you are finding useful as we clean things up. A few questions with this motivation in mind:

  1. Are you ever using chunksize in combination with aggregate_files? (Note that I am hoping the answer is "no," but no worries if you are)

  2. Do your Parquet files have consistent row-group sizes (aside from the residual row-group at the end of each file)? That is, if you were to inspect the row-group metadata (as shown in the script below), would your first row group be a reasonable estimate for the average row group size in your dataset?

import pandas as pd
import pyarrow.dataset as ds

path = "/your/dataset/path"
sizes = []
first_row_group = None
for file_frag in ds.dataset(path).get_fragments():
    for rg_frag in file_frag.split_by_row_group():
        row_group = rg_frag.row_groups[0]
        if first_row_group is None:
            first_row_group = row_group
        sizes.append(row_group.total_byte_size)
print(f"First row-group size: {first_row_group.total_byte_size} bytes\n")
print(pd.DataFrame({"sizes": sizes}).describe())

Example Output

First row-group size: 496594 bytes

               sizes
count     180.000000
mean   446733.600000
std    111765.168816
min    197458.000000
25%    496546.000000
50%    496570.000000
75%    496594.000000
max    496634.000000

I ask this because I am hoping that we can capture the benefits of chunksize without reading/parsing the metadata for every row-group in the dataset. Instead, we should be able to sample the first row group (optionally more), and use this as an average row-group size estimate for the split_row_groups setting.
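A minimal sketch of that sampling idea (assuming pyarrow, a 128 MiB partition-size target, and an illustrative helper name; this is not an existing Dask API):

import pyarrow.dataset as ds

def estimate_split_row_groups(path, target_size=128 * 1024**2):
    # Look only at the first row group of the first file fragment
    first_frag = next(ds.dataset(path).get_fragments())
    first_rg = first_frag.split_by_row_group()[0].row_groups[0]
    # Aggregate enough row groups to approach the target partition size
    return max(int(target_size // first_rg.total_byte_size), 1)

split_row_groups = estimate_split_row_groups("/your/dataset/path")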

@patefon

patefon commented Jun 16, 2022

I wish I could use chunksize, but I'm not sure that I'm doing it correctly.
I'm not sure that split_row_groups works for my case either.

  • dask=2022.5.2
  • dask-core=2022.5.2
  • pyarrow=7.0.0
  • s3fs=2022.1.0

===

My case: an imbalanced parquet file on S3:

  1. 1 row group
  2. The Parquet file is split into partitions (parquet parts, which are actually parquet files too), but in an irregular way (this is out of my control)

Example: 30 parts, each ~70MB of compressed numbers, and ONE of ~1GB.

I want to read this parquet dataset from S3 (S3FS) with Dask using N workers (processes), where each worker has the same amount M of RAM. By default, Dask tries to parallelize the read operations across all workers (1 worker per parquet part); 29 parts are read successfully, but the last one (1GB) is likewise assigned to a single worker, which fails to read it because of memory.

I'd be happy if Dask could read that big file in chunks (chunksize), or at least multiply the number of partitions by a factor passed with split_row_groups.

@patefon

patefon commented Jun 16, 2022

Sample output from the code above, in response to the question:

  1. Do your Parquet files have consistent row-group sizes (aside from the residual row-group at the end of each file)? That is, if you were to inspect the row-group metadata (as shown in the script below), would your first row group be a reasonable estimate for the average row group size in your dataset?

First row-group size: 96876297 bytes

              sizes
count  3.600000e+01
mean   9.698928e+07
std    2.057642e+07
min    5.790649e+07
25%    9.672966e+07
50%    1.036962e+08
75%    1.116984e+08
max    1.225265e+08

@patefon

patefon commented Jun 16, 2022

UPD: I think I found a workaround for my case (sorry for going off-topic).

Thanks @rjzamora for the implementation.

So, with from_map I found a way to read parquet directly from PyArrow's dataset.to_batches().
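For reference, a hedged sketch of one possible from_map pattern (not necessarily the exact approach above; the path is illustrative): map over the PyArrow file fragments and convert each one to pandas inside its own task.

import dask.dataframe as dd
import pyarrow.dataset as ds

dataset = ds.dataset("s3://bucket/dataset/")  # illustrative path
fragments = list(dataset.get_fragments())

def read_fragment(frag):
    # Each task reads one parquet file fragment into a pandas DataFrame
    return frag.to_table().to_pandas()

ddf = dd.from_map(read_fragment, fragments)

Depending on the dataset layout, the mapped elements could instead be row-group fragments (via split_by_row_group()) or record batches.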

@rjzamora
Member Author

With from_map I found a way to read parquet directly from PyArrow's dataset.to_batches()

Great! Please do let us know if you run into any issues with from_map. The purpose of that API is to make custom IO as easy as possible.

Regarding chunksize: I have been experimenting for the past two weeks with ways to simplify the read_parquet code without dropping this option (nor aggregate_files). It is taking me longer than I had hoped, but I do expect to have something ready soon.

@mrocklin
Member

we should be able to sample the first row group (optionally more), and use this as an average row-group size estimate for the split_row_groups setting

I've run into this a couple of times recently. In trying to read the nyc-taxi data on the NYC-TLC S3 bucket I now get 12 large partitions rather than many, which was unpleasant. I was also just speaking with a customer who routinely has 100 GB partitions and was sad about the new default (although also quite happy about not running into metadata issues).

If it was the case that reading metadata for a single file/row-group was reliably easy then I would be in favor of at least that (maybe sampling would be good as well). Are there good arguments against this? Client/worker mismatches in environment and data access?

@rjzamora
Member Author

If it was the case that reading metadata for a single file/row-group was reliably easy then I would be in favor of at least that (maybe sampling would be good as well). Are there good arguments against this? Client/worker mismatches in environment and data access?

I do think the most reliable approach for getting good partition sizes is to simply read the first row-group in the dataset upfront, and to use the memory_usage of that row-group to set a reasonable default value for split_row_groups. This is what Merlin's Dataset API does by default.

The possible drawbacks are that (1) this approach is a bit slower than the current default, (2) the first row group may not be representative of all row-groups in the dataset, and (3) it's yet another new feature and behavior change that some users may not like :)

With these drawbacks in mind, I suppose we could start by using a simple split_row_groups="auto" option to explore this idea?
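A minimal sketch of that "read the first row group upfront" idea (assuming pyarrow, an illustrative path, and a 128 MiB in-memory partition target; this is not an existing Dask option):

import pyarrow.dataset as ds

target = 128 * 1024**2  # illustrative in-memory partition target
first_frag = next(ds.dataset("/your/dataset/path").get_fragments())
first_rg_frag = first_frag.split_by_row_group()[0]
# In-memory footprint of the first row group as a pandas DataFrame
rg_nbytes = first_rg_frag.to_table().to_pandas().memory_usage(deep=True).sum()
split_row_groups = max(int(target // rg_nbytes), 1)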

@mrocklin
Member

With these drawbacks in mind, I suppose we could start by using a simple split_row_groups="auto" option to explore this idea

Adding the functionality seems like a general good to me. Making it default introduces some questions. I would defer to you and @ian-r-rose on what makes sense there. My current pain points me to want to make it default, but that might be short-sighted if I don't have all of the context in-brain.

@mrocklin
Member

@ian-r-rose any concerns that you'd like to raise here? I'll propose a basic "look at the metadata for the first partition" policy. Concerns?

@ian-r-rose ian-r-rose self-assigned this Oct 28, 2022
@ian-r-rose
Collaborator

Apologies for the slow response -- I don't have any objections to peeking at the statistics for the first row group and making a decision based on some reasonable heuristics about whether to read in parquet files per-row-group or per-file. split_row_groups="auto" seems like a reasonable API for that as well. It's unfortunate that the API surface area for read_parquet is so large as to make it difficult to describe in under, say, an hour. But as the responses to this issue show, that probably reflects a lot of real-world variability in how parquet datasets are written. Sigh...

It's likely that our estimates of the expansion factor from parquet files to in-memory dataframes would be pretty rough, as they would be dependent on both features of the data (dictionary encoding, length encoding, etc) and on output formats (e.g., are we using pyarrow strings?, cf #9617). So I'd imagine this would be something like a 75% solution, and would require some extra thought for some users.

An interesting wrinkle here is that a good choice for whether to split by row group or not is a function of how much memory your individual workers have. So implementing the "auto" feature could be a good target for #9522, where we actually construct the graph differently if there is an active Client with information available about the cluster.

@mrocklin
Member

mrocklin commented Nov 4, 2022

Given the feedback I think that we should pursue this. @rjzamora is this something you're interested in?

An interesting wrinkle here is that a good choice for whether to split by row group or not is a function of how much memory your individual workers have

I kinda agree with this, but not entirely. I think that ideal chunk size has more to do with the CPU/memory ratio than it does with total memory. Most machines today have a pretty consistent ratio of 4 GB/core.

@rjzamora
Member Author

rjzamora commented Nov 4, 2022

is this something you're interested in?

Sure - I'd be interested (unless someone else is eager to jump in). I do think it makes sense for us to try to agree on the desired API first...

My immediate thought is to change the existing chunksize argument to be the desired partition size, with the default set in a similar way as read_csv. We would also add support for split_row_groups="infer" (and maybe make it the default?), which would result in the size of the first row-group being used to set split_row_groups to an integer value.

It may also make sense for chunksize to correspond to the uncompressed storage size to be mapped to each partition. Since dictionary encoding typically makes this number significantly smaller than the in-memory size of a pandas/cudf DataFrame, this approach may make behavior slightly less intuitive. However, parquet metadata would certainly be faster to process than "real" data (and the default chunksize could be set conservatively).

Some drawbacks to either of these split_row_groups="infer" approaches are that:

  1. Users with carefully-tuned file sizes will once again need to set split_row_groups=False explicitly if they want to avoid the overhead of sampling data and/or metadata from the first file.
  2. We are still missing an option to specify that files (rather than row-groups) should be aggregated together with a regular pattern.

@mrocklin
Member

mrocklin commented Nov 5, 2022

I don't have strong opinions. I'd encourage you to push ahead if no one else jumps in in the next few days

@rjzamora
Member Author

rjzamora commented Nov 8, 2022

Update: My initial proposal in #9637 is to add a (default) split_row_groups="auto" option, and to use the uncompressed total_byte_size statistics in the first file to set split_row_groups automatically. This approach means that we are not setting split_row_groups by the expected in-memory partition size. However, we are likely to avoid oversized partitions without reading in any data.

@hayesgb hayesgb assigned jrbourbeau and unassigned ian-r-rose Nov 9, 2022
@rjzamora
Member Author

Update: #9637 is now merged - The "final" decision there was to continue deprecating chunksize by effectively replacing it with a blocksize argument that is now set to 128 MiB by default (note that this name is more consistent with read_csv). Therefore, we will indeed "deprecate" chunksize, but the same functionality will be preserved.
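For illustration, usage after that change would look roughly like the sketch below (based on the description above; the path is illustrative, and the read_parquet docstring has the authoritative signature):

import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://bucket/dataset/",  # illustrative path
    blocksize="128MiB",      # replaces chunksize; target bytes per partition
)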
