Revise meta creation in arrow parquet engine (#9672)
rjzamora committed Nov 23, 2022
1 parent c4111ff commit 07c57d8
Showing 2 changed files with 49 additions and 36 deletions.
68 changes: 32 additions & 36 deletions dask/dataframe/io/parquet/arrow.py
@@ -16,20 +16,14 @@
Engine,
_get_aggregation_depth,
_normalize_index_columns,
_parse_pandas_metadata,
_process_open_file_options,
_row_groups_to_parts,
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import (
_get_pyarrow_dtypes,
_is_local_fs,
_meta_from_dtypes,
_open_input_files,
)
from dask.dataframe.io.utils import _get_pyarrow_dtypes, _is_local_fs, _open_input_files
from dask.dataframe.utils import clear_known_categories
from dask.delayed import Delayed
from dask.utils import getargspec, natural_sort_key
@@ -978,33 +972,40 @@ def _create_dd_meta(cls, dataset_info):
physical_column_names = dataset_info.get("physical_schema", schema).names
columns = None

# Set index and column names using
# pandas metadata (when available)
# Use pandas metadata to update categories
pandas_metadata = _get_pandas_metadata(schema)
if pandas_metadata:
(
index_names,
column_names,
storage_name_mapping,
column_index_names,
) = _parse_pandas_metadata(pandas_metadata)
if categories is None:
categories = []
for col in pandas_metadata["columns"]:
if (col["pandas_type"] == "categorical") and (
col["name"] not in categories
):
categories.append(col["name"])
else:
# No pandas metadata implies no index, unless selected by the user
index_names = []
column_names = physical_column_names
storage_name_mapping = {k: k for k in column_names}
column_index_names = [None]

# Use _arrow_table_to_pandas to generate meta
arrow_to_pandas = dataset_info["kwargs"].get("arrow_to_pandas", {}).copy()
meta = cls._arrow_table_to_pandas(
schema.empty_table(),
categories,
arrow_to_pandas=arrow_to_pandas,
)
index_names = list(meta.index.names)
column_names = list(meta.columns)
if index_names and index_names != [None]:
# Reset the index if non-null index name
meta.reset_index(inplace=True)

# Use index specified in the pandas metadata if
# the index column was not specified by the user
if index is None and index_names:
# Pandas metadata has provided the index name for us
index = index_names

# Set proper index for meta
index_cols = index or ()
if index_cols and index_cols != [None]:
meta.set_index(index_cols, inplace=True)

# Ensure that there is no overlap between partition columns
# and explicit column storage
if partitions:
@@ -1023,33 +1024,28 @@ def _create_dd_meta(cls, dataset_info):
)
)

# Get all available column names
column_names, index_names = _normalize_index_columns(
columns, column_names + partitions, index, index_names
)

all_columns = index_names + column_names

# Check that categories are included in columns
if categories and not set(categories).intersection(all_columns):
raise ValueError(
"categories not in available columns.\n"
"categories: {} | columns: {}".format(categories, list(all_columns))
)

dtypes = _get_pyarrow_dtypes(schema, categories)
dtypes = {storage_name_mapping.get(k, k): v for k, v in dtypes.items()}

index_cols = index or ()
meta = _meta_from_dtypes(all_columns, dtypes, index_cols, column_index_names)
if categories:
# Check that categories are included in columns
if not set(categories).intersection(all_columns):
raise ValueError(
"categories not in available columns.\n"
"categories: {} | columns: {}".format(categories, list(all_columns))
)

# Make sure all categories are set to "unknown".
# Cannot include index names in the `cols` argument.
meta = clear_known_categories(
meta, cols=[c for c in categories if c not in meta.index.names]
)

if partition_obj:

# Update meta dtypes for partitioned columns
for partition in partition_obj:
if isinstance(index, list) and partition.name == index[0]:
# Index from directory structure
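The core of the change is visible in the hunk above: instead of rebuilding dtypes by hand from the pandas metadata (via _parse_pandas_metadata and _meta_from_dtypes), the engine now converts an empty Arrow table carrying the dataset schema to pandas with the user's arrow_to_pandas options, so the zero-row meta frame gets exactly the dtypes that compute-time conversion will produce. A minimal sketch of the idea, using only pyarrow and pandas (the make_meta helper name is illustrative, not dask's API):

import pandas as pd
import pyarrow as pa

def make_meta(schema: pa.Schema, arrow_to_pandas: dict) -> pd.DataFrame:
    # schema.empty_table() builds a zero-row Table with the file schema;
    # to_pandas() applies the same conversion options used at read time,
    # so the resulting dtypes match what an actual read will return.
    return schema.empty_table().to_pandas(**arrow_to_pandas)

schema = pa.schema({"A": pa.timestamp("ns")})
meta = make_meta(schema, {"timestamp_as_object": True})
print(meta.dtypes)  # "A" comes back as object, matching compute-time behavior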
17 changes: 17 additions & 0 deletions dask/dataframe/io/tests/test_parquet.py
@@ -3130,6 +3130,23 @@ def _arrow_table_to_pandas(
dd.read_parquet(str(tmpdir), engine=ArrowEngineWithTimestampClamp).compute()


@PYARROW_MARK
def test_arrow_to_pandas(tmpdir, engine):
# Test that dtypes are correct when arrow_to_pandas is used
# (See: https://github.com/dask/dask/issues/9664)

df = pd.DataFrame({"A": [pd.Timestamp("2000-01-01")]})
path = str(tmpdir.join("test.parquet"))
df.to_parquet(path, engine=engine)

arrow_to_pandas = {"timestamp_as_object": True}
expect = pq.ParquetFile(path).read().to_pandas(**arrow_to_pandas)
got = dd.read_parquet(path, engine="pyarrow", arrow_to_pandas=arrow_to_pandas)

assert_eq(expect, got)
assert got.A.dtype == got.compute().A.dtype


@pytest.mark.parametrize(
"write_cols",
[["part", "col"], ["part", "kind", "col"]],
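For users hitting the dtype mismatch from issue #9664, the new test above captures the fixed behavior: the lazy frame's declared dtypes now agree with the computed result when arrow_to_pandas options are passed. A small usage sketch under the same assumptions (file path and column name are illustrative):

import pandas as pd
import dask.dataframe as dd

# Write a tiny parquet file with a timestamp column (path is illustrative).
pd.DataFrame({"A": [pd.Timestamp("2000-01-01")]}).to_parquet("example.parquet")

# Ask pyarrow to convert timestamps to Python objects at read time.
ddf = dd.read_parquet(
    "example.parquet",
    engine="pyarrow",
    arrow_to_pandas={"timestamp_as_object": True},
)

# With the revised meta creation, the declared dtype matches the computed one.
assert ddf.A.dtype == ddf.compute().A.dtype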
