From 9b69961c6b408054b248f712eeef2a3911b5f357 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 5 Dec 2022 16:10:28 -0600 Subject: [PATCH 1/9] Support use_nullable_dtypes="pandas|pyarrow" --- dask/dataframe/io/parquet/arrow.py | 21 ++++++++--- dask/dataframe/io/parquet/core.py | 6 ++++ dask/dataframe/io/tests/test_parquet.py | 48 +++++++++++++++++++++---- 3 files changed, 63 insertions(+), 12 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 369de1707e1..57a9fbc252e 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1580,19 +1580,30 @@ def _arrow_table_to_pandas( _kwargs.update({"use_threads": False, "ignore_metadata": False}) if use_nullable_dtypes: + # Determine is `pandas` or `pyarrow`-backed dtypes should be used + if use_nullable_dtypes in ("pandas", True): + default_types_mapper = PYARROW_NULLABLE_DTYPE_MAPPING.get + elif use_nullable_dtypes == "pyarrow": + + def default_types_mapper(pyarrow_dtype): # type: ignore + return pd.ArrowDtype(pyarrow_dtype) + + else: + raise ValueError( + "Invalid `use_nullable_dtypes` received. Expected `True`, " + f"`'pandas'`, or `'pyarrow'` but got {use_nullable_dtypes}." + ) if "types_mapper" in _kwargs: - # User-provided entries take priority over PYARROW_NULLABLE_DTYPE_MAPPING + # User-provided entries take priority over default_types_mapper types_mapper = _kwargs["types_mapper"] def _types_mapper(pa_type): - return types_mapper(pa_type) or PYARROW_NULLABLE_DTYPE_MAPPING.get( - pa_type - ) + return types_mapper(pa_type) or default_types_mapper(pa_type) _kwargs["types_mapper"] = _types_mapper else: - _kwargs["types_mapper"] = PYARROW_NULLABLE_DTYPE_MAPPING.get + _kwargs["types_mapper"] = default_types_mapper return arrow_table.to_pandas(categories=categories, **_kwargs) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 8ed92b6a892..b50a3fcd5fb 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -366,6 +366,12 @@ def read_parquet( pyarrow.parquet.ParquetDataset """ + if use_nullable_dtypes not in (True, False, "pandas", "pyarrow"): + raise ValueError( + "Invalid value for `use_nullable_dtypes` received. Expected `True`, `False`, " + f"`'pandas'`, or `'pyarrow'` but got {use_nullable_dtypes} instead." 
+ ) + # "Pre-deprecation" warning for `chunksize` if chunksize: warnings.warn( diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index c12f956f35f..899cb0d1066 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -618,17 +618,29 @@ def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): @PYARROW_MARK -def test_use_nullable_dtypes(tmp_path, engine): +@pytest.mark.parametrize("use_nullable_dtypes", [True, "pandas", "pyarrow"]) +def test_use_nullable_dtypes(tmp_path, engine, use_nullable_dtypes): """ Test reading a parquet file without pandas metadata, but forcing use of nullable dtypes where appropriate """ + + if use_nullable_dtypes in (True, "pandas"): + nullable_backend = "" + else: + nullable_backend = "[pyarrow]" df = pd.DataFrame( { - "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"), - "b": pd.Series([True, pd.NA, False, True, False], dtype="boolean"), - "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype="Float64"), - "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype="string"), + "a": pd.Series([1, 2, pd.NA, 3, 4], dtype=f"Int64{nullable_backend}"), + "b": pd.Series( + [True, pd.NA, False, True, False], dtype=f"boolean{nullable_backend}" + ), + "c": pd.Series( + [0.1, 0.2, 0.3, pd.NA, 0.4], dtype=f"Float64{nullable_backend}" + ), + "d": pd.Series( + ["a", "b", "c", "d", pd.NA], dtype=f"string{nullable_backend}" + ), } ) ddf = dd.from_pandas(df, npartitions=2) @@ -647,7 +659,9 @@ def write_partition(df, i): # Not supported by fastparquet if engine == "fastparquet": with pytest.raises(ValueError, match="`use_nullable_dtypes` is not supported"): - dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) + dd.read_parquet( + tmp_path, engine=engine, use_nullable_dtypes=use_nullable_dtypes + ) # Works in pyarrow else: @@ -657,10 +671,30 @@ def write_partition(df, i): assert_eq(df, ddf2) # Round trip works when we use nullable dtypes - ddf2 = dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) + ddf2 = dd.read_parquet( + tmp_path, engine=engine, use_nullable_dtypes=use_nullable_dtypes + ) assert_eq(df, ddf2, check_index=False) +def test_use_nullable_dtypes_raises(tmp_path, engine): + # Raise an informative error message when `use_nullable_dtypes` is invalid + df = pd.DataFrame({"a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64")}) + ddf = dd.from_pandas(df, npartitions=3) + ddf.to_parquet(tmp_path, engine=engine) + + bad_use_nullable_dtypes = "not-a-valid-option" + with pytest.raises(ValueError) as excinfo: + dd.read_parquet( + tmp_path, + engine=engine, + use_nullable_dtypes=bad_use_nullable_dtypes, + ) + msg = str(excinfo.value) + assert "Invalid value for `use_nullable_dtypes`" in msg + assert bad_use_nullable_dtypes in msg + + @pytest.mark.xfail( not PANDAS_GT_130, reason=( From f79594eda37c8c7b266fb47ddad4008d4b4addf8 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 5 Dec 2022 16:17:33 -0600 Subject: [PATCH 2/9] Cleanup --- dask/dataframe/io/parquet/arrow.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 57a9fbc252e..f3b96c21106 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1588,11 +1588,6 @@ def _arrow_table_to_pandas( def default_types_mapper(pyarrow_dtype): # type: ignore return pd.ArrowDtype(pyarrow_dtype) - else: - raise ValueError( - "Invalid `use_nullable_dtypes` received. 
Expected `True`, " - f"`'pandas'`, or `'pyarrow'` but got {use_nullable_dtypes}." - ) if "types_mapper" in _kwargs: # User-provided entries take priority over default_types_mapper types_mapper = _kwargs["types_mapper"] From 472fbd44c9ece1a1ba6a02077b0b8a7a93cef755 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 5 Dec 2022 20:31:22 -0600 Subject: [PATCH 3/9] Test fixup --- dask/dataframe/io/parquet/arrow.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index f3b96c21106..b0f51610abc 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1586,7 +1586,12 @@ def _arrow_table_to_pandas( elif use_nullable_dtypes == "pyarrow": def default_types_mapper(pyarrow_dtype): # type: ignore - return pd.ArrowDtype(pyarrow_dtype) + # Special case pyarrow strings to use more feature complete dtype + # See https://github.com/pandas-dev/pandas/issues/50074 + if pyarrow_dtype == pa.string(): + return pd.StringDtype("pyarrow") + else: + return pd.ArrowDtype(pyarrow_dtype) if "types_mapper" in _kwargs: # User-provided entries take priority over default_types_mapper From fb3a25fe923aba07e2a207dbb1f5a1db819a10ad Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 5 Dec 2022 21:08:24 -0600 Subject: [PATCH 4/9] Skip test is pyarrow dtypes not available --- dask/dataframe/io/tests/test_parquet.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 899cb0d1066..9c7ae89cf29 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -15,7 +15,12 @@ import dask.dataframe as dd import dask.multiprocessing from dask.blockwise import Blockwise, optimize_blockwise -from dask.dataframe._compat import PANDAS_GT_110, PANDAS_GT_121, PANDAS_GT_130 +from dask.dataframe._compat import ( + PANDAS_GT_110, + PANDAS_GT_121, + PANDAS_GT_130, + PANDAS_GT_150, +) from dask.dataframe.io.parquet.core import get_engine from dask.dataframe.io.parquet.utils import _parse_pandas_metadata from dask.dataframe.optimize import optimize_dataframe_getitem @@ -618,7 +623,19 @@ def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): @PYARROW_MARK -@pytest.mark.parametrize("use_nullable_dtypes", [True, "pandas", "pyarrow"]) +@pytest.mark.parametrize( + "use_nullable_dtypes", + [ + True, + "pandas", + pytest.param( + "pyarrow", + marks=pytest.mark.skipif( + not PANDAS_GT_150, reason="Requires pyarrow-backed nullable dtypes" + ), + ), + ], +) def test_use_nullable_dtypes(tmp_path, engine, use_nullable_dtypes): """ Test reading a parquet file without pandas metadata, From 2ae283ba1f774ce241653a32334088ca58a9a3fa Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 6 Dec 2022 15:51:25 -0600 Subject: [PATCH 5/9] Docstring --- dask/dataframe/io/parquet/core.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index b50a3fcd5fb..070f12c1283 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -3,6 +3,7 @@ import contextlib import math import warnings +from typing import Literal import tlz as toolz from fsspec.core import get_fs_token_paths @@ -185,7 +186,7 @@ def read_parquet( index=None, storage_options=None, engine="auto", - use_nullable_dtypes=False, + use_nullable_dtypes: bool | 
Literal["pandas", "pyarrow"] = False, calculate_divisions=None, ignore_metadata_file=False, metadata_task_size=None, @@ -257,6 +258,17 @@ def read_parquet( engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. Defaults to 'auto', which uses ``pyarrow`` if it is installed, and falls back to ``fastparquet`` otherwise. + use_nullable_dtypes : {False, True, "pandas", "pyarrow"} + Whether to use dtypes that use ``pd.NA`` as a missing value indicator + for the resulting ``DataFrame``. ``True`` and ``"pandas"`` will use + pandas nullable dtypes (e.g. ``Int64``, ``string[python]``, etc.) while + ``"pyarrow"`` will use ``pyarrow``-backed extension dtypes (e.g. + ``int64[pyarrow]``, ``string[pyarrow]``, etc.). + + .. note:: + ``use_nullable_dtypes`` is only supported when ``engine="pyarrow"`` + and ``use_nullable_dtypes="pyarrow"`` requires ``pandas`` 1.5+. + calculate_divisions : bool, default False Whether to use min/max statistics from the footer metadata (or global ``_metadata`` file) to calculate divisions for the output DataFrame @@ -558,7 +570,7 @@ def read_parquet( if "retries" not in annotations and not _is_local_fs(fs): ctx = dask.annotate(retries=5) else: - ctx = contextlib.nullcontext() + ctx = contextlib.nullcontext() # type: ignore with ctx: # Construct the output collection with from_map From bf30884fa8183438e9eaf29c6c159a49b6b9d4ea Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 8 Dec 2022 16:39:16 -0600 Subject: [PATCH 6/9] Use config option --- dask/dask-schema.yaml | 8 +++ dask/dask.yaml | 1 + dask/dataframe/io/parquet/arrow.py | 5 +- dask/dataframe/io/parquet/core.py | 31 +++++----- dask/dataframe/io/tests/test_parquet.py | 77 +++++++++---------------- 5 files changed, 55 insertions(+), 67 deletions(-) diff --git a/dask/dask-schema.yaml b/dask/dask-schema.yaml index 8573c0e5b66..6be1a5b3bba 100644 --- a/dask/dask-schema.yaml +++ b/dask/dask-schema.yaml @@ -72,6 +72,14 @@ properties: task when reading a parquet dataset from a REMOTE file system. Specifying 0 will result in serial execution on the client. + nullable_backend: + enum: + - pandas + - pyarrow + description: | + The nullable dtype implementation to use. Must be either "pandas" or + "pyarrow". Default is "pandas". 
+ array: type: object properties: diff --git a/dask/dask.yaml b/dask/dask.yaml index 4b649c80a81..50d0aed9e24 100644 --- a/dask/dask.yaml +++ b/dask/dask.yaml @@ -12,6 +12,7 @@ dataframe: parquet: metadata-task-size-local: 512 # Number of files per local metadata-processing task metadata-task-size-remote: 16 # Number of files per remote metadata-processing task + nullable_backend: "pandas" # Nullable dtype implementation to use array: backend: "numpy" # Backend array library for input IO and data creation diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index b0f51610abc..e00cf050c8e 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1581,9 +1581,10 @@ def _arrow_table_to_pandas( if use_nullable_dtypes: # Determine is `pandas` or `pyarrow`-backed dtypes should be used - if use_nullable_dtypes in ("pandas", True): + if use_nullable_dtypes == "pandas": default_types_mapper = PYARROW_NULLABLE_DTYPE_MAPPING.get - elif use_nullable_dtypes == "pyarrow": + else: + # use_nullable_dtypes == "pyarrow" def default_types_mapper(pyarrow_dtype): # type: ignore # Special case pyarrow strings to use more feature complete dtype diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 070f12c1283..78da7e51dab 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -3,7 +3,6 @@ import contextlib import math import warnings -from typing import Literal import tlz as toolz from fsspec.core import get_fs_token_paths @@ -186,7 +185,7 @@ def read_parquet( index=None, storage_options=None, engine="auto", - use_nullable_dtypes: bool | Literal["pandas", "pyarrow"] = False, + use_nullable_dtypes: bool = False, calculate_divisions=None, ignore_metadata_file=False, metadata_task_size=None, @@ -258,16 +257,21 @@ def read_parquet( engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. Defaults to 'auto', which uses ``pyarrow`` if it is installed, and falls back to ``fastparquet`` otherwise. - use_nullable_dtypes : {False, True, "pandas", "pyarrow"} - Whether to use dtypes that use ``pd.NA`` as a missing value indicator - for the resulting ``DataFrame``. ``True`` and ``"pandas"`` will use - pandas nullable dtypes (e.g. ``Int64``, ``string[python]``, etc.) while - ``"pyarrow"`` will use ``pyarrow``-backed extension dtypes (e.g. - ``int64[pyarrow]``, ``string[pyarrow]``, etc.). + use_nullable_dtypes : {False, True} + Whether to use extension dtypes for the resulting ``DataFrame``. + ``use_nullable_dtypes=True`` is only supported when ``engine="pyarrow"``. .. note:: - ``use_nullable_dtypes`` is only supported when ``engine="pyarrow"`` - and ``use_nullable_dtypes="pyarrow"`` requires ``pandas`` 1.5+. + + Use the ``dataframe.nullable_backend`` config option to select which + dtype implementation to use. + + ``dataframe.nullable_backend="pandas"`` (the default) will use + pandas' ``numpy``-backed nullable dtypes (e.g. ``Int64``, + ``string[python]``, etc.) while ``dataframe.nullable_backend="pyarrow"`` + will use ``pyarrow``-backed extension dtypes (e.g. ``int64[pyarrow]``, + ``string[pyarrow]``, etc.). ``dataframe.nullable_backend="pyarrow"`` + requires ``pandas`` 1.5+. 
calculate_divisions : bool, default False Whether to use min/max statistics from the footer metadata (or global @@ -378,11 +382,8 @@ def read_parquet( pyarrow.parquet.ParquetDataset """ - if use_nullable_dtypes not in (True, False, "pandas", "pyarrow"): - raise ValueError( - "Invalid value for `use_nullable_dtypes` received. Expected `True`, `False`, " - f"`'pandas'`, or `'pyarrow'` but got {use_nullable_dtypes} instead." - ) + if use_nullable_dtypes: + use_nullable_dtypes = dask.config.get("dataframe.nullable_backend") # "Pre-deprecation" warning for `chunksize` if chunksize: diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 9c7ae89cf29..2ab5ae3e65f 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -624,9 +624,8 @@ def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): @PYARROW_MARK @pytest.mark.parametrize( - "use_nullable_dtypes", + "nullable_backend", [ - True, "pandas", pytest.param( "pyarrow", @@ -636,28 +635,25 @@ def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): ), ], ) -def test_use_nullable_dtypes(tmp_path, engine, use_nullable_dtypes): +def test_use_nullable_dtypes(tmp_path, engine, nullable_backend): """ Test reading a parquet file without pandas metadata, but forcing use of nullable dtypes where appropriate """ - if use_nullable_dtypes in (True, "pandas"): - nullable_backend = "" + if nullable_backend == "pandas": + dtype_extra = "" else: - nullable_backend = "[pyarrow]" + # nullable_backend == "pyarrow" + dtype_extra = "[pyarrow]" df = pd.DataFrame( { - "a": pd.Series([1, 2, pd.NA, 3, 4], dtype=f"Int64{nullable_backend}"), + "a": pd.Series([1, 2, pd.NA, 3, 4], dtype=f"Int64{dtype_extra}"), "b": pd.Series( - [True, pd.NA, False, True, False], dtype=f"boolean{nullable_backend}" - ), - "c": pd.Series( - [0.1, 0.2, 0.3, pd.NA, 0.4], dtype=f"Float64{nullable_backend}" - ), - "d": pd.Series( - ["a", "b", "c", "d", pd.NA], dtype=f"string{nullable_backend}" + [True, pd.NA, False, True, False], dtype=f"boolean{dtype_extra}" ), + "c": pd.Series([0.1, 0.2, 0.3, pd.NA, 0.4], dtype=f"Float64{dtype_extra}"), + "d": pd.Series(["a", "b", "c", "d", pd.NA], dtype=f"string{dtype_extra}"), } ) ddf = dd.from_pandas(df, npartitions=2) @@ -673,43 +669,24 @@ def write_partition(df, i): partitions = ddf.to_delayed() dask.compute([write_partition(p, i) for i, p in enumerate(partitions)]) - # Not supported by fastparquet - if engine == "fastparquet": - with pytest.raises(ValueError, match="`use_nullable_dtypes` is not supported"): - dd.read_parquet( - tmp_path, engine=engine, use_nullable_dtypes=use_nullable_dtypes - ) - - # Works in pyarrow - else: - # Doesn't round-trip by default when we aren't using nullable dtypes - with pytest.raises(AssertionError): - ddf2 = dd.read_parquet(tmp_path, engine=engine) - assert_eq(df, ddf2) - - # Round trip works when we use nullable dtypes - ddf2 = dd.read_parquet( - tmp_path, engine=engine, use_nullable_dtypes=use_nullable_dtypes - ) - assert_eq(df, ddf2, check_index=False) - - -def test_use_nullable_dtypes_raises(tmp_path, engine): - # Raise an informative error message when `use_nullable_dtypes` is invalid - df = pd.DataFrame({"a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64")}) - ddf = dd.from_pandas(df, npartitions=3) - ddf.to_parquet(tmp_path, engine=engine) + with dask.config.set({"dataframe.nullable_backend": nullable_backend}): + # Not supported by fastparquet + if engine == "fastparquet": + with pytest.raises( + 
ValueError, match="`use_nullable_dtypes` is not supported" + ): + dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) - bad_use_nullable_dtypes = "not-a-valid-option" - with pytest.raises(ValueError) as excinfo: - dd.read_parquet( - tmp_path, - engine=engine, - use_nullable_dtypes=bad_use_nullable_dtypes, - ) - msg = str(excinfo.value) - assert "Invalid value for `use_nullable_dtypes`" in msg - assert bad_use_nullable_dtypes in msg + # Works in pyarrow + else: + # Doesn't round-trip by default when we aren't using nullable dtypes + with pytest.raises(AssertionError): + ddf2 = dd.read_parquet(tmp_path, engine=engine) + assert_eq(df, ddf2) + + # Round trip works when we use nullable dtypes + ddf2 = dd.read_parquet(tmp_path, engine=engine, use_nullable_dtypes=True) + assert_eq(df, ddf2, check_index=False) @pytest.mark.xfail( From 911f36b943d4bb07561c62ae70fa80c5fb02f369 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Dec 2022 16:26:02 -0600 Subject: [PATCH 7/9] Add test that demonstrates increased spark interoperability --- dask/tests/test_spark_compat.py | 43 ++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/dask/tests/test_spark_compat.py b/dask/tests/test_spark_compat.py index 41e21d05f6e..94e4fdd4b1b 100644 --- a/dask/tests/test_spark_compat.py +++ b/dask/tests/test_spark_compat.py @@ -1,14 +1,16 @@ +import decimal import signal import sys import threading import pytest +import dask from dask.datasets import timeseries dd = pytest.importorskip("dask.dataframe") pyspark = pytest.importorskip("pyspark") -pytest.importorskip("pyarrow") +pa = pytest.importorskip("pyarrow") pytest.importorskip("fastparquet") import numpy as np @@ -149,3 +151,42 @@ def test_roundtrip_parquet_spark_to_dask_extension_dtypes(spark_session, tmpdir) [pd.api.types.is_extension_array_dtype(dtype) for dtype in ddf.dtypes] ), ddf.dtypes assert_eq(ddf, pdf, check_index=False) + + +def test_read_decimal_dtype_pyarrow(spark_session, tmpdir): + tmpdir = str(tmpdir) + npartitions = 3 + size = 6 + + decimal_data = [ + decimal.Decimal("8093.234"), + decimal.Decimal("8094.234"), + decimal.Decimal("8095.234"), + decimal.Decimal("8096.234"), + decimal.Decimal("8097.234"), + decimal.Decimal("8098.234"), + ] + pdf = pd.DataFrame( + { + "a": range(size), + "b": decimal_data, + } + ) + sdf = spark_session.createDataFrame(pdf) + sdf = sdf.withColumn("b", sdf["b"].cast(pyspark.sql.types.DecimalType(7, 3))) + # We are not overwriting any data, but spark complains if the directory + # already exists (as tmpdir does) and we don't set overwrite + sdf.repartition(npartitions).write.parquet(tmpdir, mode="overwrite") + + with dask.config.set({"dataframe.nullable_backend": "pyarrow"}): + ddf = dd.read_parquet(tmpdir, engine="pyarrow", use_nullable_dtypes=True) + assert ddf.b.dtype.pyarrow_dtype == pa.decimal128(7, 3) + assert ddf.b.compute().dtype.pyarrow_dtype == pa.decimal128(7, 3) + expected = pdf.astype( + { + "a": "int64[pyarrow]", + "b": pd.ArrowDtype(pa.decimal128(7, 3)), + } + ) + + assert_eq(ddf, expected, check_index=False) From 0498e5cf2fa95c17ae1fe90d616b802d0ec86cfc Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Dec 2022 16:59:51 -0600 Subject: [PATCH 8/9] Skip test when arrow dtypes are available --- dask/tests/test_spark_compat.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dask/tests/test_spark_compat.py b/dask/tests/test_spark_compat.py index 94e4fdd4b1b..c810a4aad58 100644 --- a/dask/tests/test_spark_compat.py +++ 
b/dask/tests/test_spark_compat.py @@ -16,6 +16,7 @@ import numpy as np import pandas as pd +from dask.dataframe._compat import PANDAS_GT_150 from dask.dataframe.utils import assert_eq pytestmark = pytest.mark.skipif( @@ -153,6 +154,7 @@ def test_roundtrip_parquet_spark_to_dask_extension_dtypes(spark_session, tmpdir) assert_eq(ddf, pdf, check_index=False) +@pytest.mark.skipif(not PANDAS_GT_150, reason="Requires pyarrow-backed nullable dtypes") def test_read_decimal_dtype_pyarrow(spark_session, tmpdir): tmpdir = str(tmpdir) npartitions = 3 From dd80bb8635d0c35ec2a3ce5ae1133335cb855e02 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 16 Dec 2022 13:18:59 -0600 Subject: [PATCH 9/9] Rename to dtype_backend --- dask/dask-schema.yaml | 2 +- dask/dask.yaml | 2 +- dask/dataframe/io/parquet/core.py | 10 +++++----- dask/dataframe/io/tests/test_parquet.py | 10 +++++----- dask/tests/test_spark_compat.py | 2 +- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/dask/dask-schema.yaml b/dask/dask-schema.yaml index 6be1a5b3bba..7a7bc84680d 100644 --- a/dask/dask-schema.yaml +++ b/dask/dask-schema.yaml @@ -72,7 +72,7 @@ properties: task when reading a parquet dataset from a REMOTE file system. Specifying 0 will result in serial execution on the client. - nullable_backend: + dtype_backend: enum: - pandas - pyarrow diff --git a/dask/dask.yaml b/dask/dask.yaml index 50d0aed9e24..2d640fc64a8 100644 --- a/dask/dask.yaml +++ b/dask/dask.yaml @@ -12,7 +12,7 @@ dataframe: parquet: metadata-task-size-local: 512 # Number of files per local metadata-processing task metadata-task-size-remote: 16 # Number of files per remote metadata-processing task - nullable_backend: "pandas" # Nullable dtype implementation to use + dtype_backend: "pandas" # Dtype implementation to use array: backend: "numpy" # Backend array library for input IO and data creation diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 78da7e51dab..b7beb88931a 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -263,14 +263,14 @@ def read_parquet( .. note:: - Use the ``dataframe.nullable_backend`` config option to select which + Use the ``dataframe.dtype_backend`` config option to select which dtype implementation to use. - ``dataframe.nullable_backend="pandas"`` (the default) will use + ``dataframe.dtype_backend="pandas"`` (the default) will use pandas' ``numpy``-backed nullable dtypes (e.g. ``Int64``, - ``string[python]``, etc.) while ``dataframe.nullable_backend="pyarrow"`` + ``string[python]``, etc.) while ``dataframe.dtype_backend="pyarrow"`` will use ``pyarrow``-backed extension dtypes (e.g. ``int64[pyarrow]``, - ``string[pyarrow]``, etc.). ``dataframe.nullable_backend="pyarrow"`` + ``string[pyarrow]``, etc.). ``dataframe.dtype_backend="pyarrow"`` requires ``pandas`` 1.5+. 
calculate_divisions : bool, default False @@ -383,7 +383,7 @@ def read_parquet( """ if use_nullable_dtypes: - use_nullable_dtypes = dask.config.get("dataframe.nullable_backend") + use_nullable_dtypes = dask.config.get("dataframe.dtype_backend") # "Pre-deprecation" warning for `chunksize` if chunksize: diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 2ab5ae3e65f..dd8450de134 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -624,7 +624,7 @@ def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): @PYARROW_MARK @pytest.mark.parametrize( - "nullable_backend", + "dtype_backend", [ "pandas", pytest.param( @@ -635,16 +635,16 @@ def test_roundtrip_nullable_dtypes(tmp_path, write_engine, read_engine): ), ], ) -def test_use_nullable_dtypes(tmp_path, engine, nullable_backend): +def test_use_nullable_dtypes(tmp_path, engine, dtype_backend): """ Test reading a parquet file without pandas metadata, but forcing use of nullable dtypes where appropriate """ - if nullable_backend == "pandas": + if dtype_backend == "pandas": dtype_extra = "" else: - # nullable_backend == "pyarrow" + # dtype_backend == "pyarrow" dtype_extra = "[pyarrow]" df = pd.DataFrame( { @@ -669,7 +669,7 @@ def write_partition(df, i): partitions = ddf.to_delayed() dask.compute([write_partition(p, i) for i, p in enumerate(partitions)]) - with dask.config.set({"dataframe.nullable_backend": nullable_backend}): + with dask.config.set({"dataframe.dtype_backend": dtype_backend}): # Not supported by fastparquet if engine == "fastparquet": with pytest.raises( diff --git a/dask/tests/test_spark_compat.py b/dask/tests/test_spark_compat.py index c810a4aad58..f26a859dfce 100644 --- a/dask/tests/test_spark_compat.py +++ b/dask/tests/test_spark_compat.py @@ -180,7 +180,7 @@ def test_read_decimal_dtype_pyarrow(spark_session, tmpdir): # already exists (as tmpdir does) and we don't set overwrite sdf.repartition(npartitions).write.parquet(tmpdir, mode="overwrite") - with dask.config.set({"dataframe.nullable_backend": "pyarrow"}): + with dask.config.set({"dataframe.dtype_backend": "pyarrow"}): ddf = dd.read_parquet(tmpdir, engine="pyarrow", use_nullable_dtypes=True) assert ddf.b.dtype.pyarrow_dtype == pa.decimal128(7, 3) assert ddf.b.compute().dtype.pyarrow_dtype == pa.decimal128(7, 3)
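
---

Usage sketch (editor's note): the snippet below illustrates how the behaviour introduced by this patch series might be exercised once all nine patches are applied. It is a minimal sketch, not part of the patches: the "example_data" path and the sample frame are invented for illustration, and the `use_nullable_dtypes=` keyword together with the `dataframe.dtype_backend` config option are assumed to exist exactly as added here (the "pyarrow" backend additionally requires the pyarrow engine and pandas 1.5+).

import pandas as pd

import dask
import dask.dataframe as dd

# Write a small dataset containing missing values (illustrative data and path).
df = pd.DataFrame(
    {
        "a": pd.Series([1, 2, pd.NA, 3, 4], dtype="Int64"),
        "b": pd.Series(["w", "x", pd.NA, "y", "z"], dtype="string"),
    }
)
dd.from_pandas(df, npartitions=2).to_parquet("example_data", engine="pyarrow")

# With the default ``dataframe.dtype_backend: pandas`` config,
# use_nullable_dtypes=True requests numpy-backed nullable dtypes
# (e.g. Int64, boolean, string) for the resulting DataFrame.
ddf = dd.read_parquet("example_data", engine="pyarrow", use_nullable_dtypes=True)
print(ddf.dtypes)

# Opting in to pyarrow-backed extension dtypes instead.
with dask.config.set({"dataframe.dtype_backend": "pyarrow"}):
    ddf_pa = dd.read_parquet(
        "example_data", engine="pyarrow", use_nullable_dtypes=True
    )
    print(ddf_pa.dtypes)  # e.g. int64[pyarrow], string[pyarrow]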