From 57714adfe1098f9e8a01a96011dfff787e929a32 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 12:56:41 -0700 Subject: [PATCH 01/12] Add failing test for use_nullable_dtypes --- dask/dataframe/io/tests/test_parquet.py | 66 +++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 57aeb4defc2..c9093d9dd22 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -594,6 +594,72 @@ def test_roundtrip_from_pandas(tmpdir, write_engine, read_engine): assert_eq(dfp, ddf) +@write_read_engines() +def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): + """ + Test round-tripping nullable extension dtypes. Parquet engines will + typically add dtype metadata for this. + """ + if read_engine == "fastparquet" or write_engine == "fastparquet": + pytest.xfail("not working yet") + + fn = str(tmpdir.join("test.parquet")) + df = pd.DataFrame( + { + "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), + "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), + "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), + "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string[python]"), + } + ) + ddf = dd.from_pandas(df, npartitions=2) + ddf.to_parquet( + fn, engine="pyarrow" if write_engine.startswith("pyarrow") else "fastparquet" + ) + ddf2 = dd.read_parquet(fn, engine=read_engine) + print(ddf2.dtypes) + assert_eq(df, ddf2) + + +@PYARROW_MARK +def test_pyarrow_use_nullable_dtypes(tmpdir): + """ + Test reading a parquet file without pandas metadata, + but forcing use of nullable dtypes where appropriate + """ + fn = str(tmpdir) + df = pd.DataFrame( + { + "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), + "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), + "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), + "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string[python]"), + } + ) + ddf = dd.from_pandas(df, npartitions=2) + + @dask.delayed + def write_partition(df, i): + "Write a parquet file without the pandas metadata" + import pyarrow as pa + import pyarrow.parquet as pq + + table = pa.Table.from_pandas(df).replace_schema_metadata({}) + pq.write_table(table, fn + f"/part.{i}.parquet") + + # Create a pandas-metadata-free partitioned parquet. 
By default it will + # not read into nullable extension dtypes + partitions = ddf.to_delayed() + dask.compute([write_partition(p, i) for i, p in enumerate(partitions)]) + + with pytest.raises(AssertionError): + ddf2 = dd.read_parquet(fn, engine="pyarrow", use_nullable_dtypes=False) + assert_eq(df, ddf2) + + ddf2 = dd.read_parquet(fn, engine="pyarrow", use_nullable_dtypes=True) + assert_eq(df, ddf2) + + @write_read_engines() def test_categorical(tmpdir, write_engine, read_engine): tmp = str(tmpdir) From bcd51cf1b3b02f8b256b0171c5a614a1659a0523 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 14:35:28 -0700 Subject: [PATCH 02/12] WIP use_nullable_dtypes --- dask/dataframe/io/parquet/arrow.py | 34 ++++++++++++++++++++++++++++-- dask/dataframe/io/parquet/core.py | 34 ++++++++++++++++++++++++++---- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 86327ec2d5a..59c220b57ba 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -11,6 +11,7 @@ from dask.base import tokenize from dask.core import flatten +from dask.dataframe._compat import PANDAS_GT_120 from dask.dataframe.backends import pyarrow_schema_dispatch from dask.dataframe.io.parquet.utils import ( Engine, @@ -43,6 +44,29 @@ partitioning_supported = _pa_version >= parse_version("5.0.0") del _pa_version +PYARROW_NULLABLE_DTYPE_MAPPING = { + pa.int8(): pd.Int8Dtype(), + pa.int16(): pd.Int16Dtype(), + pa.int32(): pd.Int32Dtype(), + pa.int64(): pd.Int64Dtype(), + pa.uint8(): pd.UInt8Dtype(), + pa.uint16(): pd.UInt16Dtype(), + pa.uint32(): pd.UInt32Dtype(), + pa.uint64(): pd.UInt64Dtype(), + pa.bool_(): pd.BooleanDtype(), + pa.string(): pd.StringDtype(), +} + +if PANDAS_GT_120: + PYARROW_NULLABLE_DTYPE_MAPPING[pa.float32()] = pd.Float32Dtype() + PYARROW_NULLABLE_DTYPE_MAPPING[pa.float64()] = pd.Float64Dtype() + + +def mapper(t): + print(t, PYARROW_NULLABLE_DTYPE_MAPPING.get(t)) + return PYARROW_NULLABLE_DTYPE_MAPPING.get(t) + + # # Helper Utilities # @@ -381,6 +405,7 @@ def read_partition( pieces, columns, index, + use_nullable_dtypes=False, categories=(), partitions=(), filters=None, @@ -451,7 +476,9 @@ def read_partition( arrow_table = pa.concat_tables(tables) # Convert to pandas - df = cls._arrow_table_to_pandas(arrow_table, categories, **kwargs) + df = cls._arrow_table_to_pandas( + arrow_table, categories, use_nullable_dtypes=use_nullable_dtypes, **kwargs + ) # For pyarrow.dataset api, need to convert partition columns # to categorigal manually for integer types. 
@@ -1547,11 +1574,14 @@ def _read_table( @classmethod def _arrow_table_to_pandas( - cls, arrow_table: pa.Table, categories, **kwargs + cls, arrow_table: pa.Table, categories, use_nullable_dtypes=False, **kwargs ) -> pd.DataFrame: _kwargs = kwargs.get("arrow_to_pandas", {}) _kwargs.update({"use_threads": False, "ignore_metadata": False}) + if use_nullable_dtypes: + _kwargs["types_mapper"] = PYARROW_NULLABLE_DTYPE_MAPPING.get + return arrow_table.to_pandas(categories=categories, **_kwargs) @classmethod diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 36ca140c92b..e0352899a2a 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -45,6 +45,7 @@ def __init__( meta, columns, index, + use_nullable_dtypes, kwargs, common_kwargs, ): @@ -53,6 +54,7 @@ def __init__( self.meta = meta self._columns = columns self.index = index + self.use_nullable_dtypes = use_nullable_dtypes # `kwargs` = user-defined kwargs to be passed # identically for all partitions. @@ -78,6 +80,7 @@ def project_columns(self, columns): self.meta, columns, self.index, + self.use_nullable_dtypes, None, # Already merged into common_kwargs self.common_kwargs, ) @@ -101,6 +104,7 @@ def __call__(self, part): ], self.columns, self.index, + self.use_nullable_dtypes, self.common_kwargs, ) @@ -181,6 +185,7 @@ def read_parquet( index=None, storage_options=None, engine="auto", + use_nullable_dtypes=False, calculate_divisions=None, ignore_metadata_file=False, metadata_task_size=None, @@ -433,6 +438,7 @@ def read_parquet( "index": index, "storage_options": storage_options, "engine": engine, + "use_nullable_dtypes": use_nullable_dtypes, "calculate_divisions": calculate_divisions, "ignore_metadata_file": ignore_metadata_file, "metadata_task_size": metadata_task_size, @@ -540,6 +546,7 @@ def read_parquet( meta, columns, index, + use_nullable_dtypes, {}, # All kwargs should now be in `common_kwargs` common_kwargs, ) @@ -578,7 +585,9 @@ def check_multi_support(engine): return hasattr(engine, "multi_support") and engine.multi_support() -def read_parquet_part(fs, engine, meta, part, columns, index, kwargs): +def read_parquet_part( + fs, engine, meta, part, columns, index, use_nullable_dtypes, kwargs +): """Read a part of a parquet dataset This function is used by `read_parquet`.""" @@ -587,7 +596,14 @@ def read_parquet_part(fs, engine, meta, part, columns, index, kwargs): # Part kwargs expected func = engine.read_partition dfs = [ - func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw)) + func( + fs, + rg, + columns.copy(), + index, + use_nullable_dtypes, + **toolz.merge(kwargs, kw), + ) for (rg, kw) in part ] df = concat(dfs, axis=0) if len(dfs) > 1 else dfs[0] @@ -595,14 +611,24 @@ def read_parquet_part(fs, engine, meta, part, columns, index, kwargs): # No part specific kwargs, let engine read # list of parts at once df = engine.read_partition( - fs, [p[0] for p in part], columns.copy(), index, **kwargs + fs, + [p[0] for p in part], + columns.copy(), + index, + use_nullable_dtypes, + **kwargs, ) else: # NOTE: `kwargs` are the same for all parts, while `part_kwargs` may # be different for each part. 
rg, part_kwargs = part df = engine.read_partition( - fs, rg, columns, index, **toolz.merge(kwargs, part_kwargs) + fs, + rg, + columns, + index, + use_nullable_dtypes, + **toolz.merge(kwargs, part_kwargs), ) if meta.columns.name: From 58659d2d626a0c6b2fdb689f0a0c0d85eada182c Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 14:54:41 -0700 Subject: [PATCH 03/12] apply use_nullable_dtypes to meta --- dask/dataframe/io/parquet/arrow.py | 7 ++++--- dask/dataframe/io/parquet/core.py | 1 + dask/dataframe/io/tests/test_parquet.py | 6 +++--- dask/dataframe/io/utils.py | 10 ++++++++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 59c220b57ba..f52e136e7c3 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -351,6 +351,7 @@ def read_metadata( paths, categories=None, index=None, + use_nullable_dtypes=False, gather_statistics=None, filters=None, split_row_groups=False, @@ -380,7 +381,7 @@ def read_metadata( ) # Stage 2: Generate output `meta` - meta = cls._create_dd_meta(dataset_info) + meta = cls._create_dd_meta(dataset_info, use_nullable_dtypes) # Stage 3: Generate parts and stats parts, stats, common_kwargs = cls._construct_collection_plan(dataset_info) @@ -991,7 +992,7 @@ def _collect_dataset_info( } @classmethod - def _create_dd_meta(cls, dataset_info): + def _create_dd_meta(cls, dataset_info, use_nullable_dtypes=False): """Use parquet schema and hive-partition information (stored in dataset_info) to construct DataFrame metadata. """ @@ -1063,7 +1064,7 @@ def _create_dd_meta(cls, dataset_info): "categories: {} | columns: {}".format(categories, list(all_columns)) ) - dtypes = _get_pyarrow_dtypes(schema, categories) + dtypes = _get_pyarrow_dtypes(schema, categories, use_nullable_dtypes) dtypes = {storage_name_mapping.get(k, k): v for k, v in dtypes.items()} index_cols = index or () diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index e0352899a2a..9877ac84320 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -481,6 +481,7 @@ def read_parquet( paths, categories=categories, index=index, + use_nullable_dtypes=use_nullable_dtypes, gather_statistics=calculate_divisions, filters=filters, split_row_groups=split_row_groups, diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index c9093d9dd22..5adf0fcf6f4 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -657,7 +657,7 @@ def write_partition(df, i): assert_eq(df, ddf2) ddf2 = dd.read_parquet(fn, engine="pyarrow", use_nullable_dtypes=True) - assert_eq(df, ddf2) + assert_eq(df, ddf2, check_index=False) @write_read_engines() @@ -3185,11 +3185,11 @@ def clamp_arrow_datetimes(cls, arrow_table: pa.Table) -> pa.Table: @classmethod def _arrow_table_to_pandas( - cls, arrow_table: pa.Table, categories, **kwargs + cls, arrow_table: pa.Table, categories, use_nullable_dtypes=False, **kwargs ) -> pd.DataFrame: fixed_arrow_table = cls.clamp_arrow_datetimes(arrow_table) return super()._arrow_table_to_pandas( - fixed_arrow_table, categories, **kwargs + fixed_arrow_table, categories, use_nullable_dtypes, **kwargs ) # this should not fail, but instead produce timestamps that are in the valid range diff --git a/dask/dataframe/io/utils.py b/dask/dataframe/io/utils.py index 5af144b98f3..7271d1062bd 100644 --- a/dask/dataframe/io/utils.py +++ b/dask/dataframe/io/utils.py 
@@ -18,8 +18,14 @@ def _is_local_fs(fs): return fs and isinstance(fs, LocalFileSystem) -def _get_pyarrow_dtypes(schema, categories): +def _get_pyarrow_dtypes(schema, categories, use_nullable_dtypes=False): """Convert a pyarrow.Schema object to pandas dtype dict""" + if use_nullable_dtypes: + from dask.dataframe.io.parquet.arrow import PYARROW_NULLABLE_DTYPE_MAPPING + + type_mapper = PYARROW_NULLABLE_DTYPE_MAPPING.get + else: + type_mapper = lambda t: t.to_pandas_dtype() # Check for pandas metadata has_pandas_metadata = schema.metadata is not None and b"pandas" in schema.metadata @@ -53,7 +59,7 @@ def _get_pyarrow_dtypes(schema, categories): numpy_dtype = pandas_metadata_dtypes[field.name] else: try: - numpy_dtype = field.type.to_pandas_dtype() + numpy_dtype = type_mapper(field.type) except NotImplementedError: continue # Skip this field (in case we aren't reading it anyway) From cd9a6c6a9f1c41d620993cbb3bad3fa0ed898dcd Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 15:11:28 -0700 Subject: [PATCH 04/12] clean up --- dask/dataframe/io/parquet/fastparquet.py | 2 ++ dask/dataframe/io/parquet/utils.py | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/dask/dataframe/io/parquet/fastparquet.py b/dask/dataframe/io/parquet/fastparquet.py index 67d6bcbd902..ea204c45346 100644 --- a/dask/dataframe/io/parquet/fastparquet.py +++ b/dask/dataframe/io/parquet/fastparquet.py @@ -820,6 +820,7 @@ def read_metadata( paths, categories=None, index=None, + use_nullable_dtypes=False, gather_statistics=None, filters=None, split_row_groups=False, @@ -889,6 +890,7 @@ def read_partition( pieces, columns, index, + use_nullable_dtypes=False, categories=(), root_cats=None, root_file_scheme=None, diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index c955c774d66..1a55bbd84cc 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -17,6 +17,7 @@ def read_metadata( paths, categories=None, index=None, + use_nullable_dtypes=False, gather_statistics=None, filters=None, **kwargs, @@ -37,6 +38,9 @@ def read_metadata( The column name(s) to be used as the index. If set to ``None``, pandas metadata (if available) can be used to reset the value in this function + use_nullable_dtypes: boolean + Whether to use pandas nullable dtypes (like "string" or "Int64") + where appropriate when reading parquet files. gather_statistics: bool Whether or not to gather statistics to calculate divisions for the output DataFrame collection. @@ -73,7 +77,9 @@ def read_metadata( raise NotImplementedError() @classmethod - def read_partition(cls, fs, piece, columns, index, **kwargs): + def read_partition( + cls, fs, piece, columns, index, use_nullable_dtypes=False, **kwargs + ): """Read a single piece of a Parquet dataset into a Pandas DataFrame This function is called many times in individual tasks @@ -88,6 +94,9 @@ def read_partition(cls, fs, piece, columns, index, **kwargs): List of column names to pull out of that row group index: str, List[str], or False The index name(s). + use_nullable_dtypes: boolean + Whether to use pandas nullable dtypes (like "string" or "Int64") + where appropriate when reading parquet files. **kwargs: Includes `"kwargs"` values stored within the `parts` output of `engine.read_metadata`. 
May also include arguments to be From 32d45eb3d834299f98f55c756d8d8b0571c3318f Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 15:46:38 -0700 Subject: [PATCH 05/12] Raise when using fastparquet --- dask/dataframe/io/parquet/fastparquet.py | 4 ++++ dask/dataframe/io/tests/test_parquet.py | 25 ++++++++++++++++-------- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/dask/dataframe/io/parquet/fastparquet.py b/dask/dataframe/io/parquet/fastparquet.py index ea204c45346..1ccf821c23c 100644 --- a/dask/dataframe/io/parquet/fastparquet.py +++ b/dask/dataframe/io/parquet/fastparquet.py @@ -831,6 +831,10 @@ def read_metadata( parquet_file_extension=None, **kwargs, ): + if use_nullable_dtypes: + raise ValueError( + "`use_nullable_dtypes` is not supported by the fastparquet engine" + ) # Stage 1: Collect general dataset information dataset_info = cls._collect_dataset_info( diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 5adf0fcf6f4..12016219084 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -601,7 +601,7 @@ def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): typically add dtype metadata for this. """ if read_engine == "fastparquet" or write_engine == "fastparquet": - pytest.xfail("not working yet") + pytest.xfail("https://github.com/dask/fastparquet/issues/465") fn = str(tmpdir.join("test.parquet")) df = pd.DataFrame( @@ -622,7 +622,7 @@ def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): @PYARROW_MARK -def test_pyarrow_use_nullable_dtypes(tmpdir): +def test_use_nullable_dtypes(tmpdir, engine): """ Test reading a parquet file without pandas metadata, but forcing use of nullable dtypes where appropriate @@ -652,12 +652,21 @@ def write_partition(df, i): partitions = ddf.to_delayed() dask.compute([write_partition(p, i) for i, p in enumerate(partitions)]) - with pytest.raises(AssertionError): - ddf2 = dd.read_parquet(fn, engine="pyarrow", use_nullable_dtypes=False) - assert_eq(df, ddf2) - - ddf2 = dd.read_parquet(fn, engine="pyarrow", use_nullable_dtypes=True) - assert_eq(df, ddf2, check_index=False) + # Not supported by fastparquet + if engine == "fastparquet": + with pytest.raises(ValueError, match="not supported"): + ddf2 = dd.read_parquet(fn, engine=engine, use_nullable_dtypes=True) + + # Works in pyarrow + elif "arrow" in engine: + # Doesn't round-trip by default when we aren't using nullable dtypes + with pytest.raises(AssertionError): + ddf2 = dd.read_parquet(fn, engine=engine, use_nullable_dtypes=False) + assert_eq(df, ddf2) + + # Round trip works when we use nullable dtypes + ddf2 = dd.read_parquet(fn, engine=engine, use_nullable_dtypes=True) + assert_eq(df, ddf2, check_index=False) @write_read_engines() From db090371118a7b75b5d64034588e0ae78b17d300 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 16:08:43 -0700 Subject: [PATCH 06/12] Add spark test --- dask/tests/test_spark_compat.py | 43 +++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/dask/tests/test_spark_compat.py b/dask/tests/test_spark_compat.py index 65436acf685..936b4f7c0ce 100644 --- a/dask/tests/test_spark_compat.py +++ b/dask/tests/test_spark_compat.py @@ -11,6 +11,9 @@ pytest.importorskip("pyarrow") pytest.importorskip("fastparquet") +import numpy as np +import pandas as pd + from dask.dataframe.utils import assert_eq pytestmark = pytest.mark.skipif( @@ -106,3 +109,43 @@ def 
test_roundtrip_parquet_dask_to_spark(spark_session, npartitions, tmpdir, eng sdf = sdf.assign(timestamp=sdf.timestamp.dt.tz_localize("UTC")) assert_eq(sdf, ddf, check_index=False) + + +def test_roundtrip_parquet_spark_to_dask_extension_dtypes(spark_session, tmpdir): + tmpdir = str(tmpdir) + npartitions = 5 + + size = 20 + pdf = pd.DataFrame( + { + "a": range(size), + "b": np.random.random(size=size), + "c": [True, False] * (size // 2), + "d": ["alice", "bob"] * (size // 2), + } + ) + # Note: since we set use_nullable_dtypes=True below, we are expecting *all* + # of the resulting series to use those dtypes. If there is a mix of nullable + # and non-nullable dtypes here, then that will result in dtype mismatches + # in the finale frame. + pdf = pdf.astype( + { + "a": "Int64", + "b": "Float64", + "c": "boolean", + "d": "string[python]", + } + ) + # # Ensure all columns are extension dtypes + assert all([pd.api.types.is_extension_array_dtype(dtype) for dtype in pdf.dtypes]) + + sdf = spark_session.createDataFrame(pdf) + # We are not overwriting any data, but spark complains if the directory + # already exists (as tmpdir does) and we don't set overwrite + sdf.repartition(npartitions).write.parquet(tmpdir, mode="overwrite") + + ddf = dd.read_parquet(tmpdir, engine="pyarrow", use_nullable_dtypes=True) + assert all( + [pd.api.types.is_extension_array_dtype(dtype) for dtype in ddf.dtypes] + ), ddf.dtypes + assert_eq(ddf, pdf, check_index=False) From 203a1f24183ade57408d5833a2f79c9670964f25 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Wed, 2 Nov 2022 16:14:33 -0700 Subject: [PATCH 07/12] Remove debug code --- dask/dataframe/io/parquet/arrow.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index f52e136e7c3..96530d58c69 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -61,12 +61,6 @@ PYARROW_NULLABLE_DTYPE_MAPPING[pa.float32()] = pd.Float32Dtype() PYARROW_NULLABLE_DTYPE_MAPPING[pa.float64()] = pd.Float64Dtype() - -def mapper(t): - print(t, PYARROW_NULLABLE_DTYPE_MAPPING.get(t)) - return PYARROW_NULLABLE_DTYPE_MAPPING.get(t) - - # # Helper Utilities # From c77a88e9aa46dfc22fba5fd5a09e07f4b74fe1a8 Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 3 Nov 2022 11:53:31 -0700 Subject: [PATCH 08/12] Don't specify storage backend for string dtypes in tests, as that's not available in pandas <= 1.2.0. 
--- dask/dataframe/io/tests/test_parquet.py | 4 ++-- dask/tests/test_spark_compat.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 12016219084..3e6151c6a9c 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -609,7 +609,7 @@ def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), - "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string[python]"), + "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string"), } ) ddf = dd.from_pandas(df, npartitions=2) @@ -633,7 +633,7 @@ def test_use_nullable_dtypes(tmpdir, engine): "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), - "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string[python]"), + "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string"), } ) ddf = dd.from_pandas(df, npartitions=2) diff --git a/dask/tests/test_spark_compat.py b/dask/tests/test_spark_compat.py index 936b4f7c0ce..41e21d05f6e 100644 --- a/dask/tests/test_spark_compat.py +++ b/dask/tests/test_spark_compat.py @@ -133,7 +133,7 @@ def test_roundtrip_parquet_spark_to_dask_extension_dtypes(spark_session, tmpdir) "a": "Int64", "b": "Float64", "c": "boolean", - "d": "string[python]", + "d": "string", } ) # # Ensure all columns are extension dtypes From f2e1d2454058cfc76fee18e07b178edac5f18a0c Mon Sep 17 00:00:00 2001 From: Ian Rose Date: Thu, 3 Nov 2022 11:59:52 -0700 Subject: [PATCH 09/12] Speficy use_nullable_dtypes as kwarg --- dask/dataframe/io/parquet/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 9877ac84320..a5cd8cdc52f 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -602,7 +602,7 @@ def read_parquet_part( rg, columns.copy(), index, - use_nullable_dtypes, + use_nullable_dtypes=use_nullable_dtypes, **toolz.merge(kwargs, kw), ) for (rg, kw) in part @@ -616,7 +616,7 @@ def read_parquet_part( [p[0] for p in part], columns.copy(), index, - use_nullable_dtypes, + use_nullable_dtypes=use_nullable_dtypes, **kwargs, ) else: @@ -628,7 +628,7 @@ def read_parquet_part( rg, columns, index, - use_nullable_dtypes, + use_nullable_dtypes=use_nullable_dtypes, **toolz.merge(kwargs, part_kwargs), ) From a1f8181b9bd77a9d5a7ac45b02dbf0bbeb9aba23 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 23 Nov 2022 21:28:56 -0600 Subject: [PATCH 10/12] Support for use_nullable_dtypes + types_mapper --- dask/dataframe/io/parquet/arrow.py | 14 ++++++++++++- dask/dataframe/io/tests/test_parquet.py | 28 +++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 25ba66c2dce..5f96e78723c 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1572,7 +1572,19 @@ def _arrow_table_to_pandas( _kwargs.update({"use_threads": False, "ignore_metadata": False}) if use_nullable_dtypes: - _kwargs["types_mapper"] = PYARROW_NULLABLE_DTYPE_MAPPING.get + if "types_mapper" in _kwargs: + # User-provided entries take priority over 
PYARROW_NULLABLE_DTYPE_MAPPING + types_mapper = _kwargs["types_mapper"] + + def _types_mapper(pa_type): + return types_mapper(pa_type) or PYARROW_NULLABLE_DTYPE_MAPPING.get( + pa_type + ) + + _kwargs["types_mapper"] = _types_mapper + + else: + _kwargs["types_mapper"] = PYARROW_NULLABLE_DTYPE_MAPPING.get return arrow_table.to_pandas(categories=categories, **_kwargs) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 15ff5c09256..4ab8677775a 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -669,6 +669,34 @@ def write_partition(df, i): assert_eq(df, ddf2, check_index=False) +def test_use_nullable_dtypes_with_types_mapper(tmp_path, engine): + # Read in dataset with `use_nullable_dtypes=True` and a custom pyarrow `types_mapper`. + # Ensure `types_mapper` takes priority. + dataset_dir = tmp_path / "test.parquet" + df = pd.DataFrame( + { + "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), + "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), + "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), + "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string"), + } + ) + ddf = dd.from_pandas(df, npartitions=3) + ddf.to_parquet(dataset_dir, engine=engine) + + types_mapper = { + pa.int64(): pd.Float32Dtype(), + } + result = dd.read_parquet( + dataset_dir, + engine="pyarrow", + use_nullable_dtypes=True, + arrow_to_pandas={"types_mapper": types_mapper.get}, + ) + expected = df.astype({"a": pd.Float32Dtype()}) + assert_eq(result, expected) + + @write_read_engines() def test_categorical(tmpdir, write_engine, read_engine): tmp = str(tmpdir) From 8e6612da21c57369fe53234d2142d1be17764c4f Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 23 Nov 2022 21:49:48 -0600 Subject: [PATCH 11/12] Minor cleanup --- dask/dataframe/io/tests/test_parquet.py | 35 +++++++++---------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 4ab8677775a..6099588cf51 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -595,7 +595,7 @@ def test_roundtrip_from_pandas(tmpdir, write_engine, read_engine): @write_read_engines() -def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): +def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): """ Test round-tripping nullable extension dtypes. Parquet engines will typically add dtype metadata for this. 
@@ -603,7 +603,6 @@ def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): if read_engine == "fastparquet" or write_engine == "fastparquet": pytest.xfail("https://github.com/dask/fastparquet/issues/465") - fn = str(tmpdir.join("test.parquet")) df = pd.DataFrame( { "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), @@ -613,21 +612,17 @@ def test_roundtrip_nullable_dtypes(tmpdir, write_engine, read_engine): } ) ddf = dd.from_pandas(df, npartitions=2) - ddf.to_parquet( - fn, engine="pyarrow" if write_engine.startswith("pyarrow") else "fastparquet" - ) - ddf2 = dd.read_parquet(fn, engine=read_engine) - print(ddf2.dtypes) + ddf.to_parquet(tmp_path, engine=write_engine) + ddf2 = dd.read_parquet(tmp_path, engine=read_engine) assert_eq(df, ddf2) @PYARROW_MARK -def test_use_nullable_dtypes(tmpdir, engine): +def test_use_nullable_dtypes(tmp_path, engine): """ Test reading a parquet file without pandas metadata, but forcing use of nullable dtypes where appropriate """ - fn = str(tmpdir) df = pd.DataFrame( { "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), @@ -640,12 +635,9 @@ def test_use_nullable_dtypes(tmpdir, engine): @dask.delayed def write_partition(df, i): - "Write a parquet file without the pandas metadata" - import pyarrow as pa - import pyarrow.parquet as pq - + """Write a parquet file without the pandas metadata""" table = pa.Table.from_pandas(df).replace_schema_metadata({}) - pq.write_table(table, fn + f"/part.{i}.parquet") + pq.write_table(table, tmp_path / f"part.{i}.parquet") # Create a pandas-metadata-free partitioned parquet. By default it will # not read into nullable extension dtypes @@ -654,25 +646,24 @@ def write_partition(df, i): # Not supported by fastparquet if engine == "fastparquet": - with pytest.raises(ValueError, match="not supported"): - ddf2 = dd.read_parquet(fn, engine=engine, use_nullable_dtypes=True) + with pytest.raises(ValueError, match="`use_nullable_dtypes` is not supported"): + dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) # Works in pyarrow - elif "arrow" in engine: + else: # Doesn't round-trip by default when we aren't using nullable dtypes with pytest.raises(AssertionError): - ddf2 = dd.read_parquet(fn, engine=engine, use_nullable_dtypes=False) + ddf2 = dd.read_parquet(tmp_path, engine=engine) assert_eq(df, ddf2) # Round trip works when we use nullable dtypes - ddf2 = dd.read_parquet(fn, engine=engine, use_nullable_dtypes=True) + ddf2 = dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) assert_eq(df, ddf2, check_index=False) def test_use_nullable_dtypes_with_types_mapper(tmp_path, engine): # Read in dataset with `use_nullable_dtypes=True` and a custom pyarrow `types_mapper`. # Ensure `types_mapper` takes priority. 
- dataset_dir = tmp_path / "test.parquet" df = pd.DataFrame( { "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), @@ -682,13 +673,13 @@ def test_use_nullable_dtypes_with_types_mapper(tmp_path, engine): } ) ddf = dd.from_pandas(df, npartitions=3) - ddf.to_parquet(dataset_dir, engine=engine) + ddf.to_parquet(tmp_path, engine=engine) types_mapper = { pa.int64(): pd.Float32Dtype(), } result = dd.read_parquet( - dataset_dir, + tmp_path, engine="pyarrow", use_nullable_dtypes=True, arrow_to_pandas={"types_mapper": types_mapper.get}, From f5bf706524e4be94b9ca991635dab43cd32cf9a4 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 30 Nov 2022 16:23:41 -0600 Subject: [PATCH 12/12] xfail test_use_nullable_dtypes_with_types_mapper for pandas<1.3 --- dask/dataframe/io/tests/test_parquet.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 6099588cf51..cf0689d3706 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -661,6 +661,14 @@ def write_partition(df, i): assert_eq(df, ddf2, check_index=False) +@pytest.mark.xfail( + not PANDAS_GT_130, + reason=( + "Known bug in pandas. " + "See https://issues.apache.org/jira/browse/ARROW-13413 " + "and https://github.com/pandas-dev/pandas/pull/41052." + ), +) def test_use_nullable_dtypes_with_types_mapper(tmp_path, engine): # Read in dataset with `use_nullable_dtypes=True` and a custom pyarrow `types_mapper`. # Ensure `types_mapper` takes priority.
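
Editor's note: a minimal usage sketch of the feature introduced by this patch series, mirroring the tests above. This is not part of the patches themselves; it assumes a dask build with these changes applied and pandas >= 1.3 (the types_mapper interplay is xfailed for older pandas in PATCH 12/12), and the dataset path and column values are illustrative.

    import pandas as pd
    import pyarrow as pa
    import dask.dataframe as dd

    df = pd.DataFrame(
        {
            "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"),
            "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"),
        }
    )
    ddf = dd.from_pandas(df, npartitions=2)
    ddf.to_parquet("nullable_data/", engine="pyarrow")

    # Opt in to pandas nullable extension dtypes on read. Only the pyarrow
    # engine supports this; per PATCH 05/12 the fastparquet engine raises
    # ValueError when use_nullable_dtypes=True is passed.
    ddf2 = dd.read_parquet("nullable_data/", engine="pyarrow", use_nullable_dtypes=True)
    assert ddf2["a"].dtype == pd.Int64Dtype()

    # A user-supplied pyarrow types_mapper (via arrow_to_pandas) takes priority
    # over the built-in nullable mapping (PATCH 10/12): here int64 columns come
    # back as nullable Float32 instead of Int64.
    ddf3 = dd.read_parquet(
        "nullable_data/",
        engine="pyarrow",
        use_nullable_dtypes=True,
        arrow_to_pandas={"types_mapper": {pa.int64(): pd.Float32Dtype()}.get},
    )

For files written without pandas metadata (as in test_use_nullable_dtypes above), the default use_nullable_dtypes=False read falls back to NumPy dtypes, which is why that test expects an assertion failure when comparing the default read against the original nullable frame.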