
[WIP] Revise aggregate_files behavior in read_parquet #9197

Draft · rjzamora wants to merge 12 commits into main
Conversation

rjzamora (Member) commented on Jun 17, 2022:

Closes #9043
Closes #9051
Closes #8829

This PR (mostly) preserves the existing chunksize/aggregate_files options in read_parquet by adding two new arguments:

  • sort_input_paths (default True): Whether Dask should re-order the files in the dataset to use "natural" ordering. This feature could be added in a separate PR, but I wanted to make sure that the new design allows for such an argument to exist.
  • file_groups (default None): A dictionary mapping paths to "file-group" indices, or a list of directory-partitioned column names that must match for two or more files to belong to the same "file group" (see the sketch after this list for the dict form). This PR introduces the file-group concept to dd.read_parquet. The meaning is simple: two files must belong to the same file group for Dask to consider aggregating them into the same output DataFrame partition. Matching file-group membership is necessary, but not sufficient, for file aggregation; there must also be some other option (like aggregate_files=int|True or chunksize) to specify how files should be aggregated within each group. The engines are always allowed to reorder paths by file group to improve file-aggregation behavior (even if sort_input_paths=False). Note that I originally added this option in order to drop support for str arguments to aggregate_files in favor of int support (explained below). However, it is also much more flexible and powerful than the original aggregate_files=<str> behavior.
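Here is a minimal sketch of how the dict form of file_groups might look. The paths are hypothetical placeholders (not part of this PR or any real dataset), and the exact behavior is only as described above:

import dask.dataframe as dd

# Hypothetical file_groups dict: files mapped to the same index belong to the
# same file group, so only those files may be aggregated into one output partition.
file_groups = {
    "data/part.0.parquet": 0,
    "data/part.1.parquet": 0,  # may be combined with part.0.parquet
    "data/part.2.parquet": 1,  # never combined with group-0 files
    "data/part.3.parquet": 1,
}

ddf = dd.read_parquet(
    "data/",
    engine="pyarrow",
    file_groups=file_groups,
    aggregate_files=True,  # still needed to enable aggregation within a group
)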

In addition to these new arguments, I also modified the existing aggregate_files argument to accept only bool or int types. That is, aggregate_files is now the "file equivalent" of split_row_groups: specifying aggregate_files=100 means that up to 100 files from the same file group may be aggregated into the same output partition.

The most important result of this PR is likely the support for aggregate_files=<int> (in combination with file_groups=). For example, in main, one would need to use chunksize (or split_row_groups) with aggregate_files="year" to read a single large partition for each distinct year in a partitioned NYC-taxi dataset:

import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://ursa-labs-taxi-data/",
    engine="pyarrow",
    storage_options={"anon": True},
    dataset={
        "partitioning": ["year", "month"],
        "partition_base_dir": "ursa-labs-taxi-data",
    },
    chunksize="6GB",
    aggregate_files="year",
)
# (takes several seconds on my workstation - but scales poorly with the total number of row groups)

However, with this branch, file aggregation can be much faster and simpler:

ddf = dd.read_parquet(
    "s3://ursa-labs-taxi-data/",
    engine="pyarrow",
    storage_options={"anon": True},
    dataset={
        "partitioning": ["year", "month"],
        "partition_base_dir": "ursa-labs-taxi-data",
    },
    file_groups=["year"],
    aggregate_files=True,
)
# (takes less than 1s on my workstation)
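For completeness, a sketch of the aggregate_files=<int> form described above. The value 2 is arbitrary and purely illustrative; it would mean that at most two files from the same "year" group are combined into a single output partition:

ddf = dd.read_parquet(
    "s3://ursa-labs-taxi-data/",
    engine="pyarrow",
    storage_options={"anon": True},
    dataset={
        "partitioning": ["year", "month"],
        "partition_base_dir": "ursa-labs-taxi-data",
    },
    file_groups=["year"],
    aggregate_files=2,  # combine at most 2 files per "year" group (illustrative)
)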

@ian-r-rose self-requested a review on August 2, 2022
charlesbluca (Member) commented:

@rjzamora bumping this - are there any blockers here, or is this ready for review?
