Change split_row_groups default to "infer" #9637

Merged (26 commits into dask:main on Feb 21, 2023)

Conversation

rjzamora
Member

@rjzamora rjzamora commented Nov 8, 2022

This PR proposes new "infer" and "adaptive" options for split_row_groups, as well as a new blocksize argument (defaulting to "128 MiB") to replace chunksize. Using split_row_groups="infer" (the new default) results in the uncompressed storage sizes specified in the metadata of the first file being used to set split_row_groups to either "adaptive" or False (depending on blocksize). The immediate result is that users should be much less likely to get large/problematic partition sizes by default. For users with unbalanced row-group sizes, split_row_groups="adaptive" can be set directly to ensure that none of the output partitions will exceed blocksize (according to the uncompressed storage sizes recorded in the parquet metadata).
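As a usage sketch of the options described above (the dataset path below is hypothetical, not from this PR):

import dask.dataframe as dd

# New default: use the first file's metadata to decide whether to map whole
# files or row-group ranges to output partitions
df = dd.read_parquet("s3://bucket/dataset/")  # split_row_groups="infer", blocksize="128 MiB"

# Force adaptive splitting so that no partition exceeds blocksize
# (according to the uncompressed sizes recorded in the parquet metadata)
df = dd.read_parquet(
    "s3://bucket/dataset/",
    split_row_groups="adaptive",
    blocksize="256 MiB",
)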

After this PR is revised/merged, I propose that we do something similar for aggregate_files (probably a mixture of aggregate_files="adaptive" and the changes in #9197). For example, ...

The difference is that the current chunksize option results in us (slowly) choosing a distinct number of row-groups to aggregate together for every individual partition. I suppose we can still support this (slower) fine-grained partitioning approach when the user specifies split_row_groups=True (or aggregate_files=True) and chunksize=<something>.

@mrocklin
Member

mrocklin commented Nov 8, 2022

Oh cool. I'm excited about this.

🤔 who should review this? @ian-r-rose is out

Any concerns you want to call out @rjzamora ? I notice that this is in draft mode.

Member

@jrbourbeau jrbourbeau left a comment

Thanks @rjzamora -- I'll take a look at this

@jrbourbeau jrbourbeau self-requested a review November 8, 2022 22:59
@rjzamora rjzamora marked this pull request as ready for review November 8, 2022 23:01
@rjzamora
Member Author

rjzamora commented Nov 8, 2022

I'll take a look at this

Thanks @jrbourbeau !

Any concerns you want to call out @rjzamora ? I notice that this is in draft mode.

It should be ready for feedback. My main points of uncertainty are:

  1. Should we remove the deprecation plan (and the corresponding FutureWarning) for "legacy" chunksize usage? With the introduction of split_row_groups="auto", we are able to do something much faster than what chunksize was doing before. However, there may still be messy datasets where each partition really does need to correspond to a distinct row-group count (from 1+ files). I propose that we still support that behavior if the user explicitly specifies split_row_groups=True (rather than an integer or "auto") and chunksize=<something> (see the sketch after this list).
  2. Are we okay with chunksize corresponding to an uncompressed storage size, or do we want to use the real in-memory size? Although I was originally planning for in-memory size, I eventually decided on using a value that is available in the parquet metadata (for performance reasons).
  3. What should the default chunksize be when split_row_groups="auto"? This PR proposes the same as the default blocksize used in read_csv (1/10 the per-core memory).
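A sketch of the explicit, fine-grained combination mentioned in point 1 (hypothetical path; assumes the legacy behavior is kept):

import dask.dataframe as dd

# Opt in to the slower, fine-grained aggregation: iterate the row-group
# statistics and pack adjacent row-groups into each partition until ~1 GiB
# of uncompressed storage is reached (aggregate_files controls whether a
# partition may span file boundaries)
df = dd.read_parquet(
    "s3://bucket/dataset/",
    split_row_groups=True,
    chunksize="1 GiB",
    aggregate_files=True,
)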

Member

@fjetter fjetter left a comment

However, there may still be messy datasets where each partition really does need to correspond to a distinct row-group count (from 1+ files)

What use case would that be? Honestly, I don't think we should offer this functionality at all. Row groups are more or less random chunks with statistics to optimize data access. Why would we want to guarantee a one-to-one mapping to dask partitions?
Even if that is a use case, how frequent is it really? Is this edge case really worth the complexity?

Are we okay with chunksize corresponding to an uncompressed storage size, or do we want to use the real in-memory size? Although I was originally planning for in-memory size, I eventually decided on using a value that is available in the parquet metadata (for performance reasons).

I don't see a problem with this as long as it is clearly documented. I'd love some utility functions that could tell the user what the average compression ratio for a given dataset is (even if it is just sampled) to help advanced users make good decisions.
In my experience, compressed vs uncompressed is maybe a factor of 2-3 unless the user really knows what they are doing and write a dataset highly compressed (I've seen parquet files with 20-100 compression ratios but this is very rare and this kind of expert user can use the API appropriately).

What should the default chuksize be when split_row_groups="auto"? This PR proposes the same as the default blocksize used in read_csv (1/10 the per-core memory).

I was surprised to see the 1/10 implementation. This will break down for all kinds of distributed compute scenarios. Even for a LocalCluster this might be a bit much.

One data point: I have a 16GB / 4 CPU notebook. That would create 400MB uncompressed data partitions, which will easily blow up to a GB or more after decompression. That's a bit heavy.
This will, of course, be caught by min(blocksize, int(64e6)), i.e. realistically I get 64MB * compression ratio almost all the time, unless somebody has many CPUs with virtually no memory.

So, two things

  1. I think the logic we're applying here is overkill. psutil + rounding down to 64MB sounds smart (sketched after this list), but in the end it's a hard-coded 64MB, so why not be explicit? Even without the rounding, the psutil-based value will lead to confusing results when executing on a distributed cluster: why would the client's memory size have any bearing on worker memory? Lastly, the truly optimal chunksize should consider cluster size as well. Given all that, why not drop the "auto" calculation and just set "64/100MiB" as the default? In my experience this is better UX than some magical/opaque "auto".
  2. 64MB uncompressed feels OK. That'll create 100-500MB partitions, which I think is a comfortable place for a default value.
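For reference, a rough sketch of the 1/10-per-core heuristic under discussion (mirroring the read_csv-style logic, not necessarily the exact code in this PR):

import psutil

# 1/10 of the per-core memory, capped at 64 MB -- which is why, in practice,
# the psutil machinery almost always lands on the hard-coded 64 MB anyway
total_memory = psutil.virtual_memory().total
cpu_count = psutil.cpu_count()
blocksize = min(int(total_memory // cpu_count / 10), int(64e6))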

Comment on lines 278 to 300
split_row_groups : "auto", bool, or int, default "auto"
If True, then each output dataframe partition will correspond to a single
parquet-file row-group. If False, each partition will correspond to a
complete file. If a positive integer value is given, each dataframe
partition will correspond to that number of parquet row-groups (or fewer).
If "auto" (the default), the uncompressed storage size of all row-groups
in the first file will be used to automatically set a value that is
consistent with ``chunksize``.
chunksize : int or str, default None
WARNING: The ``chunksize`` argument will be deprecated in the future.
Please use ``split_row_groups`` to specify how many row-groups should be
mapped to each output partition. If you strongly oppose the deprecation of
``chunksize``, please comment at https://github.com/dask/dask/issues/9043".

The desired size of each output ``DataFrame`` partition in terms of total
(uncompressed) parquet storage space. If specified, adjacent row-groups
and/or files will be aggregated into the same output partition until the
cumulative ``total_byte_size`` parquet-metadata statistic reaches this
value. Use `aggregate_files` to enable/disable inter-file aggregation.
(uncompressed) parquet storage space. If ``split_row_groups='auto'``,
this argument will default to 1/10 the per-core system memory. The metadata
of the first file will then be used to choose an ``split_row_groups`` value
that is consistent with ``chunksize``.

WARNING: Using the ``chunksize`` argument in the absence of
``split_row_groups='auto'`` is often slow on large and/or remote datasets.

If ``split_row_groups`` is set to ``True``, ``chunksize`` defaults to
``None``. If ``chunksize`` is set to an explicit value, adjacent row-groups
will be aggregated into the same output partition until the cumulative
``total_byte_size`` parquet-metadata statistic reaches that value.
Use `aggregate_files` to enable/disable inter-file aggregation.
Member

High-level-API-wise, I don't feel great about supporting two kwargs that are so strongly entangled.

Isn't chunksize already sufficient? The fact that we're splitting the files per row group almost feels like an implementation detail.

chunksize
"auto": Generate reasonably large partitions, e.g. 50-100MB or larger. Fuse or split files as appropriate.
int/str: Specify the size in bytes, e.g. 100 * 1024**2 or "100 MiB". Fuse or split files as appropriate.
False: Disable all fusing/splitting and serve files as-is.

The same argument applies to aggregate_files. Do we need this? (Out of scope for this PR, of course.)

@rjzamora
Member Author

Thanks for the review @fjetter !

What use case would that be? Honestly, I don't think we should offer this functionality at all. Row groups are more or less random chunks with statistics to optimize data access. Why would we want to guarantee a one-to-one mapping to dask partitions?
Even if that is a use case, how frequent is it really? Is this edge case really worth the complexity?

I probably agree with you, but I’m not sure I understand this question: “Why would we want to guarantee a one-to-one mapping to dask partitions?”



To clarify exactly what I am talking about here, let's pretend we have a simple Parquet dataset with two files, and each file has 3 x 500MB row-groups:

File-0: [rg-0, rg-1, rg-2]
File-1: [rg-0, rg-1, rg-2]

Case A - The result I am talking about supporting (chunksize="1GB", split_row_groups=True) would be:

Partition-0: [(File-0, [rg-0, rg-1])]
Partition-1: [(File-0, [rg-2]), (File-1, [rg-0])]
Partition-2: [(File-1, [rg-1, rg-2])]

Case B - The (faster) result implemented in this PR (chunksize="1GB", split_row_groups="auto") would be:

Partition-0: [(File-0, [rg-0, rg-1])]
Partition-1: [(File-0, [rg-2])]
Partition-2: [(File-1, [rg-0, rg-1])]
Partition-3: [(File-1, [rg-2])]

We currently support "case A", but it doesn't get used much (and currently returns a "pre-deprecation" warning), because iterating through the row-group statistics can be slow. I'd be very happy to drop support for this altogether, but it also seems like a reasonable result to want.



I don't see a problem with this as long as it is clearly documented. I'd love some utility functions that could tell the user what the average compression ratio for a given dataset is (even if it is just sampled) to help advanced users make good decisions.
In my experience, compressed vs uncompressed is maybe a factor of 2-3 unless the user really knows what they are doing and write a dataset highly compressed (I've seen parquet files with 20-100 compression ratios but this is very rare and this kind of expert user can use the API appropriately).

Utility functions do sound useful here. However, just to clarify one point: Compression shouldn’t really come into play here. The parquet metadata we use corresponds to the parquet storage size without compression (even if the data ends up being compressed). Therefore, the distinction between the specified chunksize and the “real” memory usage will have more to do with dictionary encoding and other variations in the memory layout (e.g. “object” vs string).
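For example, the relevant statistic can be inspected directly with pyarrow (file name hypothetical); total_byte_size reports the uncompressed storage footprint even when the column chunks are compressed on disk:

import pyarrow.parquet as pq

md = pq.ParquetFile("part.0.parquet").metadata  # hypothetical file name
rg = md.row_group(0)
uncompressed = rg.total_byte_size  # what the chunksize/blocksize logic compares against
compressed = sum(rg.column(i).total_compressed_size for i in range(rg.num_columns))
print(uncompressed, compressed)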

I was surprised to see the 1/10 implementation. This will break down for all kinds of distributed compute scenarios. Even for a LocalCluster this might be a bit much.




The 1/10 implementation is just the precedent set in other places (like dd.read_csv). Do you think we should overhaul auto-partitioning in other places as well? I agree that there is no guarantee that the workers and client will have a comparable memory/core ratio, so perhaps using/documenting a reasonable default is better than playing with psutil to produce a system-dependent default.




To partially summarize my thoughts here: I think I agree that the auto-partitioning logic is overkill. I also want to make sure it is clear that we don’t need to worry about compression (unless you are using compression to refer to memory-layout details like dictionary encoding).

@fjetter
Member

fjetter commented Dec 1, 2022

Utility functions do sound useful here. However, just to clarify one point: Compression shouldn’t really come into play here. The parquet metadata we use corresponds to the parquet storage size without compression (even if the data ends up being compressed).

Never mind then.

To clarify exactly what I am talking about here, let's pretend we have a simple Parquet dataset with two files, and each file has 3 x 500MB row-groups:

Case A / Case B

Thanks for the clarification. I was under the impression that we'd always do A, even now.

The one-to-one mapping I'm talking about is a use case that would break out every RG into a dedicated partition, regardless of actual RG sizes:

Partition-0: [(File-0, [rg-0])]
Partition-1: [(File-0, [rg-1])]
Partition-2: [(File-0, [rg-2])]
Partition-3: [(File-1, [rg-0])]
Partition-4: [(File-1, [rg-1])]
Partition-5: [(File-1, [rg-2])]

This example would still be fine because 500MB is a nice partition size, but in general I think RGs are smaller (at least I've been working very successfully with much smaller RGs), and this would produce horrible results.

Isn't chunksize=None, split_row_groups=True/auto doing this, or is this not possible?

My bigger point is actually: why do we have a split_row_groups at all? What's the point of defining a chunk size if we do not split row groups?

The 1/10 implementation is just the precedent set in other places (like dd.read_csv). Do you think we should overhaul auto-partitioning in other places as well? I agree that there is no guarantee that the workers and client will have a comparable memory/core ratio, so perhaps using/documenting a reasonable default is better than playing with psutil to produce a system-dependent default.

If that's what we're using, yes, I believe we should. The psutil logic is smarter than it needs to be, since in almost all situations it will default to 64MB. From a UX perspective, I'd rather set the default to 64MB and write a sentence in the docs (or point to an entire page weighing the pros/cons).

@fjetter
Member

fjetter commented Dec 1, 2022

This argument also makes me wonder: what datasets are we using to measure this? 500MB RGs sound insanely large in my experience. I wonder if we have a severe misalignment here.

@fjetter
Member

fjetter commented Dec 14, 2022

FWIW I'm not blocking on this. Just wanted to learn more about what we do and why. I think this is a good step even if we pursue something different long term (e.g. #9637)

@rjzamora
Member Author

FWIW I'm not blocking on this. Just wanted to learn more about what we do and why. I think this is a good step even if we pursue something different long term (e.g. #9637)

That makes sense. I do ultimately want to move in a different direction, but agree that this PR is clearly an improvement.

The only lingering question I have about this particular PR is how to merge it without effectively "blocking" the possibility of further improvement. For example, I'd personally like to continue with the plan to deprecate chunksize (under its current meaning), because (1) the current implementation is too ugly to maintain, and (2) chunksize usually corresponds to a row count (rather than a byte size) in pandas.

Perhaps the answer is to continue deprecating chunksize, and to add a distinct blocksize argument whose sole purpose (for now) is to control split_row_groups when it is set to "auto".

@rjzamora rjzamora changed the title Change split_row_groups default to "auto" Change split_row_groups default to "infer" Jan 26, 2023
@fjetter
Member

fjetter commented Feb 9, 2023

I'd be happy about this but to_parquet should be changed to automatically split into sensible row groups by default

Off topic but this is a -1 from me. Finding sensible row group sizes depends on many different factors like data types, number of columns, entropy of the data, cardinality, access patterns, etc.

I agree that 67_108_864 is too large but finding a good default that suits everyone is not possible. Hence, pyarrow is choosing a very boring but very safe default.
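For reference, row-group sizing is already a write-time knob; a minimal pyarrow sketch (values purely illustrative):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"a": range(1_000_000)})
# Cap each row-group at 100k rows instead of relying on the writer's default
pq.write_table(table, "out.parquet", row_group_size=100_000)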

@fjetter
Member

fjetter commented Feb 9, 2023

My backlog for reviews is currently a bit full. @jrbourbeau do you have capacity for this?

@@ -11,7 +11,7 @@ dataframe:
   shuffle-compression: null # compression for on disk-shuffling. Partd supports ZLib, BZ2, SNAPPY
   parquet:
     metadata-task-size-local: 512 # Number of files per local metadata-processing task
-    metadata-task-size-remote: 16 # Number of files per remote metadata-processing task
+    metadata-task-size-remote: 1 # Number of files per remote metadata-processing task
Member Author

Setting this greater than 1 is only an optimization when you are dealing with many small files. However, it is more likely that we will need to parse metadata when files are large, and we need to split them by row-group. For large remote files, metadata_task_size=1 is often a better choice.
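For users who want a different trade-off (e.g. many small remote files), this remains a regular dask config option; a minimal sketch:

import dask

# Batch 16 remote files per metadata-processing task again, which can be
# faster when the dataset consists of many small files
dask.config.set({"dataframe.parquet.metadata-task-size-remote": 16})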

@rjzamora
Member Author

Small update: I'd like to get this in before the next release.

cc @martindurant - In case you have thoughts

assert "_metadata" not in files
path = os.path.join(dirname, "*.parquet")

# Use (default) split_row_groups="auto"
Member

Default is actually "infer" ?

@martindurant
Member

The description, conversation and reasoning above all seem sound to me.
The names "infer" and "auto" mean more or less the same in my mind, so I wonder if better names could be found - but I don't have an immediate better suggestion.

I feel like test_blocksize should ideally have an exact result to measure against. If we specify no compression, no nulls, and no dict encoding, then the sizes of the column chunks we play with should be nearly deterministic ("nearly" because the thrift header stuff adds a little). Actually writing tests like this might be time-consuming, however; but it would be nice to at least check that our idea is reasonable:

import pandas as pd
import fastparquet

df = pd.DataFrame({"a": range(10000)})

# written without compression via fastparquet
df.to_parquet("out.parquet", index=False, engine="fastparquet", compression=None)
pf = fastparquet.ParquetFile("out.parquet")
pf.row_groups[0].total_byte_size  # => 80039  ~= 10000 * 8

# written without compression or dictionary encoding via pyarrow
df.to_parquet("out2.parquet", index=False, engine="pyarrow", compression=None, use_dictionary=False)
pf = fastparquet.ParquetFile("out2.parquet")
pf.row_groups[0].total_byte_size  # => 80075

(note that pyarrow always wants to use dictionary encoding, which would have made this data appear ~20% bigger)

@github-actions github-actions bot added the documentation Improve or add to documentation label Feb 21, 2023
@rjzamora
Member Author

Thanks for the review @martindurant !

The names "infer" and "auto" mean more or less the same in my mind

I also struggled with this naming question a bit. The default setting of "infer" is meant to mean: "use a metadata sample from the first file to decide if we should be partitioning by file, or if we should be using a row-group range for each partition." I like the "infer" name for this a bit more than "auto", because we are using a metadata sample to "infer" properties for the rest of the dataset.

I am less comfortable with the meaning of "auto". Right now, it means that we will "individualize" the number of row groups for each partition, depending on the size of each row-group. Perhaps the name for this should be something like "variable", "adaptive", or "dynamic"?

@martindurant
Member

adaptive or dynamic sounds better to me

@rjzamora
Member Author

adaptive or dynamic sounds better to me

Okay - Thanks for the quick feedback! Going with "adaptive"

@rjzamora rjzamora changed the title Change split_row_groups default to "infer" Change split_row_groups default to "infer" Feb 21, 2023
@rjzamora rjzamora merged commit 556d3df into dask:main Feb 21, 2023
@rjzamora rjzamora deleted the auto-split-row-groups branch February 21, 2023 20:12
mrocklin pushed a commit to dask/dask-expr that referenced this pull request Feb 22, 2023
Updates `ReadParquet` to use metadata-parsing and IO logic from `dask.dataframe.io.parquet`.

Requires dask/dask#9637 (only because my environment was using that PR when I put this together).