Add pre-deprecation warnings for read_parquet kwargs `chunksize` and `aggregate_files` (#9052)

As discussed in #9043 (for `chunksize`) and #9051 (for `aggregate_files`), I propose that we deprecate two complex and rarely used `read_parquet` arguments: `chunksize` and `aggregate_files`.

This PR simply adds "pre-deprecation" warnings for the targeted arguments (including links to the relevant issues discussing their deprecation). My goal is to find (and inform) any users who may be depending on these obscure options.
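For anyone migrating ahead of these deprecations, here is a minimal sketch of the suggested alternatives (the `data/` path, the file names, and the `read_file_group` helper are hypothetical, for illustration only):

```python
import pandas as pd
import dask.dataframe as dd

# Alternative to `chunksize`: map a fixed number of parquet
# row-groups (rather than a byte size) to each output partition.
ddf = dd.read_parquet("data/", split_row_groups=8)

# Alternative to `aggregate_files=True`: use `from_map` to define an
# explicit file-to-partition mapping. Each element of `file_groups`
# becomes one output partition.
def read_file_group(paths):
    # Concatenate a group of parquet files into one pandas DataFrame
    return pd.concat([pd.read_parquet(p) for p in paths])

file_groups = [
    ["data/part.0.parquet", "data/part.1.parquet"],
    ["data/part.2.parquet", "data/part.3.parquet"],
]
ddf = dd.from_map(read_file_group, file_groups)
```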
rjzamora committed May 10, 2022
1 parent 4e048c9 commit 2115574
Showing 2 changed files with 76 additions and 39 deletions.
34 changes: 33 additions & 1 deletion dask/dataframe/io/parquet/core.py
@@ -271,12 +271,22 @@ def read_parquet(
         complete file. If a positive integer value is given, each dataframe
         partition will correspond to that number of parquet row-groups (or fewer).
     chunksize : int or str, default None
+        WARNING: The ``chunksize`` argument will be deprecated in the future.
+        Please use ``split_row_groups`` to specify how many row-groups should be
+        mapped to each output partition. If you strongly oppose the deprecation of
+        ``chunksize``, please comment at https://github.com/dask/dask/issues/9043.
+
         The desired size of each output ``DataFrame`` partition in terms of total
         (uncompressed) parquet storage space. If specified, adjacent row-groups
         and/or files will be aggregated into the same output partition until the
         cumulative ``total_byte_size`` parquet-metadata statistic reaches this
         value. Use `aggregate_files` to enable/disable inter-file aggregation.
     aggregate_files : bool or str, default None
+        WARNING: The ``aggregate_files`` argument will be deprecated in the future.
+        Please consider using ``from_map`` to create a DataFrame collection with a
+        custom file-to-partition mapping. If you strongly oppose the deprecation of
+        ``aggregate_files``, comment at https://github.com/dask/dask/issues/9051.
+
         Whether distinct file paths may be aggregated into the same output
         partition. This parameter is only used when `chunksize` is specified
         or when `split_row_groups` is an integer >1. A setting of True means
@@ -341,6 +351,28 @@ def read_parquet(
     pyarrow.parquet.ParquetDataset
     """

+    # "Pre-deprecation" warning for `chunksize`
+    if chunksize:
+        warnings.warn(
+            "The `chunksize` argument will be deprecated in the future. "
+            "Please use `split_row_groups` to specify how many row-groups "
+            "should be mapped to each output partition.\n\n"
+            "If you strongly oppose the deprecation of `chunksize`, please "
+            "comment at https://github.com/dask/dask/issues/9043",
+            FutureWarning,
+        )
+
+    # "Pre-deprecation" warning for `aggregate_files`
+    if aggregate_files:
+        warnings.warn(
+            "The `aggregate_files` argument will be deprecated in the future. "
+            "Please consider using `from_map` to create a DataFrame collection "
+            "with a custom file-to-partition mapping.\n\n"
+            "If you strongly oppose the deprecation of `aggregate_files`, "
+            "please comment at https://github.com/dask/dask/issues/9051",
+            FutureWarning,
+        )
+
     if "read_from_paths" in kwargs:
         kwargs.pop("read_from_paths")
         warnings.warn(
Expand Down Expand Up @@ -395,7 +427,7 @@ def read_parquet(
"ignore_metadata_file": ignore_metadata_file,
"metadata_task_size": metadata_task_size,
"split_row_groups": split_row_groups,
"chunksize=": chunksize,
"chunksize": chunksize,
"aggregate_files": aggregate_files,
"parquet_file_extension": parquet_file_extension,
**kwargs,
81 changes: 43 additions & 38 deletions dask/dataframe/io/tests/test_parquet.py
@@ -2599,7 +2599,7 @@ def test_split_row_groups(tmpdir, engine):
         tmp, engine="pyarrow", row_group_size=100
     )

-    ddf3 = dd.read_parquet(tmp, engine=engine, split_row_groups=True, chunksize=1)
+    ddf3 = dd.read_parquet(tmp, engine=engine, split_row_groups=True)
     assert ddf3.npartitions == 4

     ddf3 = dd.read_parquet(
@@ -2616,7 +2616,6 @@ def test_split_row_groups(tmpdir, engine):
         engine=engine,
         calculate_divisions=True,
         split_row_groups=True,
-        chunksize=1,
     )
     assert ddf3.npartitions == 12

@@ -2678,12 +2677,13 @@ def test_split_row_groups_int_aggregate_files(tmpdir, engine, split_row_groups):

     # Read back with both `split_row_groups>1` and
     # `aggregate_files=True`
-    ddf2 = dd.read_parquet(
-        str(tmpdir),
-        engine=engine,
-        split_row_groups=split_row_groups,
-        aggregate_files=True,
-    )
+    with pytest.warns(match="argument will be deprecated"):
+        ddf2 = dd.read_parquet(
+            str(tmpdir),
+            engine=engine,
+            split_row_groups=split_row_groups,
+            aggregate_files=True,
+        )

     # Check that we are aggregating files as expected
     npartitions_expected = math.ceil((size / row_group_size) / split_row_groups)
@@ -2764,7 +2764,8 @@ def test_chunksize_empty(tmpdir, write_engine, read_engine):
     df = pd.DataFrame({"a": pd.Series(dtype="int"), "b": pd.Series(dtype="float")})
     ddf1 = dd.from_pandas(df, npartitions=1)
     ddf1.to_parquet(tmpdir, engine=write_engine, write_metadata_file=True)
-    ddf2 = dd.read_parquet(tmpdir, engine=read_engine, chunksize="1MiB")
+    with pytest.warns(match="argument will be deprecated"):
+        ddf2 = dd.read_parquet(tmpdir, engine=read_engine, chunksize="1MiB")
     assert_eq(ddf1, ddf2, check_index=False)


@@ -2798,12 +2799,13 @@ def test_chunksize_files(
         write_index=False,
     )

-    ddf2 = dd.read_parquet(
-        str(tmpdir),
-        engine=read_engine,
-        chunksize=chunksize,
-        aggregate_files=partition_on if partition_on else True,
-    )
+    with pytest.warns(match="argument will be deprecated"):
+        ddf2 = dd.read_parquet(
+            str(tmpdir),
+            engine=read_engine,
+            chunksize=chunksize,
+            aggregate_files=partition_on if partition_on else True,
+        )

     # Check that files were aggregated as expected
     if chunksize == 4096:
@@ -2846,12 +2848,13 @@ def test_chunksize_aggregate_files(tmpdir, write_engine, read_engine, aggregate_
         partition_on=partition_on,
         write_index=False,
     )
-    ddf2 = dd.read_parquet(
-        str(tmpdir),
-        engine=read_engine,
-        chunksize=chunksize,
-        aggregate_files=aggregate_files,
-    )
+    with pytest.warns(match="argument will be deprecated"):
+        ddf2 = dd.read_parquet(
+            str(tmpdir),
+            engine=read_engine,
+            chunksize=chunksize,
+            aggregate_files=aggregate_files,
+        )

     # Check that files were aggregated as expected
     if aggregate_files == "a":
@@ -2897,15 +2900,16 @@ def test_chunksize(tmpdir, chunksize, engine, metadata):
assert "_metadata" not in files
path = os.path.join(dirname, "*.parquet")

ddf2 = dd.read_parquet(
path,
engine=engine,
chunksize=chunksize,
split_row_groups=True,
calculate_divisions=True,
index="index",
aggregate_files=True,
)
with pytest.warns(match="argument will be deprecated"):
ddf2 = dd.read_parquet(
path,
engine=engine,
chunksize=chunksize,
split_row_groups=True,
calculate_divisions=True,
index="index",
aggregate_files=True,
)

assert_eq(ddf1, ddf2, check_divisions=False)

@@ -2930,14 +2934,15 @@ def test_roundtrip_pandas_chunksize(tmpdir, write_engine, read_engine):
         path, engine="pyarrow" if write_engine.startswith("pyarrow") else "fastparquet"
     )

-    ddf_read = dd.read_parquet(
-        path,
-        engine=read_engine,
-        chunksize="10 kiB",
-        calculate_divisions=True,
-        split_row_groups=True,
-        index="index",
-    )
+    with pytest.warns(match="argument will be deprecated"):
+        ddf_read = dd.read_parquet(
+            path,
+            engine=read_engine,
+            chunksize="10 kiB",
+            calculate_divisions=True,
+            split_row_groups=True,
+            index="index",
+        )

     assert_eq(pdf, ddf_read)
