diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py
index 369de1707e1..1e483977fef 100644
--- a/dask/dataframe/io/parquet/arrow.py
+++ b/dask/dataframe/io/parquet/arrow.py
@@ -22,7 +22,6 @@
     _set_gather_statistics,
     _set_metadata_task_size,
     _sort_and_analyze_paths,
-    _split_user_options,
 )
 from dask.dataframe.io.utils import _get_pyarrow_dtypes, _is_local_fs, _open_input_files
 from dask.dataframe.utils import clear_known_categories
@@ -31,7 +30,10 @@

 # Check PyArrow version for feature support
 _pa_version = parse_version(pa.__version__)
+from fsspec.core import expand_paths_if_needed, stringify_path
+from fsspec.implementations.arrow import ArrowFSWrapper
 from pyarrow import dataset as pa_ds
+from pyarrow import fs as pa_fs

 subset_stats_supported = _pa_version > parse_version("2.0.0")
 pre_buffer_supported = _pa_version >= parse_version("5.0.0")
@@ -60,6 +62,11 @@
 #


+def _wrapped_fs(fs):
+    """Return the wrapped filesystem if fs is ArrowFSWrapper"""
+    return fs.fs if isinstance(fs, ArrowFSWrapper) else fs
+
+
 def _append_row_groups(metadata, md):
     """Append row-group metadata and include a helpful
     error message if an inconsistent schema is detected.
@@ -332,6 +339,67 @@ class ArrowDatasetEngine(Engine):
     # Public Class Methods
     #

+    @classmethod
+    def extract_filesystem(
+        cls,
+        urlpath,
+        filesystem,
+        dataset_options,
+        open_file_options,
+        storage_options,
+    ):
+
+        # Check if filesystem was specified as a dataset option
+        if filesystem is None:
+            fs = dataset_options.pop("filesystem", "fsspec")
+        else:
+            if "filesystem" in dataset_options:
+                raise ValueError(
+                    "Cannot specify a filesystem argument if the "
+                    "'filesystem' dataset option is also defined."
+                )
+            fs = filesystem
+
+        # Handle pyarrow-based filesystem
+        if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
+            if isinstance(urlpath, (list, tuple, set)):
+                if not urlpath:
+                    raise ValueError("empty urlpath sequence")
+                urlpath = [stringify_path(u) for u in urlpath]
+            else:
+                urlpath = [stringify_path(urlpath)]
+
+            if fs in ("arrow", "pyarrow"):
+                fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
+                    **(storage_options or {})
+                )
+
+            fsspec_fs = ArrowFSWrapper(fs)
+            if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
+                # ArrowFSWrapper._strip_protocol not reliable on windows
+                # See: https://github.com/fsspec/filesystem_spec/issues/1137
+                from fsspec.implementations.local import LocalFileSystem
+
+                fs_strip = LocalFileSystem()
+            else:
+                fs_strip = fsspec_fs
+            paths = expand_paths_if_needed(urlpath, "rb", 1, fsspec_fs, None)
+            return (
+                fsspec_fs,
+                [fs_strip._strip_protocol(u) for u in paths],
+                dataset_options,
+                {"open_file_func": fs.open_input_file},
+            )
+
+        # Use default file-system initialization
+        return Engine.extract_filesystem(
+            urlpath,
+            fs,
+            dataset_options,
+            open_file_options,
+            storage_options,
+        )
+
     @classmethod
     def read_metadata(
         cls,
@@ -556,7 +624,7 @@ def initialize_write(
         metadata_file_exists = False
         if append:
             # Extract metadata and get file offset if appending
-            ds = pa_ds.dataset(path, filesystem=fs, format="parquet")
+            ds = pa_ds.dataset(path, filesystem=_wrapped_fs(fs), format="parquet")
             i_offset = len(ds.files)
             if i_offset > 0:
                 try:
@@ -804,12 +872,8 @@ def _collect_dataset_info(
         ds = None
         valid_paths = None  # Only used if `paths` is a list containing _metadata

-        # Extract "supported" key-word arguments from `kwargs`
-        (
-            _dataset_kwargs,
-            read_kwargs,
-            user_kwargs,
-        ) = _split_user_options(**kwargs)
+        # Extract dataset-specific options
+        _dataset_kwargs = kwargs.pop("dataset", {})

         if "partitioning" not in _dataset_kwargs:
             _dataset_kwargs["partitioning"] = "hive"
@@ -831,7 +895,7 @@ def _collect_dataset_info(
                     # Use _metadata file
                     ds = pa_ds.parquet_dataset(
                         meta_path,
-                        filesystem=fs,
+                        filesystem=_wrapped_fs(fs),
                         **_dataset_kwargs,
                     )
                     has_metadata_file = True
@@ -859,7 +923,7 @@
                 if not ignore_metadata_file:
                     ds = pa_ds.parquet_dataset(
                         meta_path,
-                        filesystem=fs,
+                        filesystem=_wrapped_fs(fs),
                         **_dataset_kwargs,
                     )
                     has_metadata_file = True
@@ -873,7 +937,7 @@
         if ds is None:
             ds = pa_ds.dataset(
                 paths,
-                filesystem=fs,
+                filesystem=_wrapped_fs(fs),
                 **_dataset_kwargs,
             )

@@ -974,8 +1038,7 @@
             "metadata_task_size": metadata_task_size,
             "kwargs": {
                 "dataset": _dataset_kwargs,
-                "read": read_kwargs,
-                **user_kwargs,
+                **kwargs,
             },
         }

@@ -1315,7 +1378,7 @@ def _collect_file_parts(
             file_frags = list(
                 pa_ds.dataset(
                     files_or_frags,
-                    filesystem=fs,
+                    filesystem=_wrapped_fs(fs),
                     **dataset_options,
                 ).get_fragments()
             )
@@ -1508,7 +1571,7 @@ def _read_table(
             # to a single "fragment" to read
             ds = pa_ds.dataset(
                 path_or_frag,
-                filesystem=fs,
+                filesystem=_wrapped_fs(fs),
                 **kwargs.get("dataset", {}),
             )
             frags = list(ds.get_fragments())
diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py
index 8ed92b6a892..ed3e27fd516 100644
--- a/dask/dataframe/io/parquet/core.py
+++ b/dask/dataframe/io/parquet/core.py
@@ -15,7 +15,11 @@
 from dask.dataframe.backends import dataframe_creation_dispatch
 from dask.dataframe.core import DataFrame, Scalar
 from dask.dataframe.io.io import from_map
-from dask.dataframe.io.parquet.utils import Engine, _sort_and_analyze_paths
+from dask.dataframe.io.parquet.utils import (
+    Engine,
+    _sort_and_analyze_paths,
+    _split_user_options,
+)
 from dask.dataframe.io.utils import DataFrameIOFunction, _is_local_fs
 from dask.dataframe.methods import concat
 from dask.delayed import Delayed
@@ -193,6 +197,7 @@ def read_parquet(
     chunksize=None,
     aggregate_files=None,
     parquet_file_extension=(".parq", ".parquet", ".pq"),
+    filesystem=None,
     **kwargs,
 ):
     """
@@ -246,6 +251,8 @@ def read_parquet(
         data written by dask/fastparquet, not otherwise.
     storage_options : dict, default None
         Key/value pairs to be passed on to the file-system backend, if any.
+        Note that the default file-system backend can be configured with the
+        ``filesystem`` argument, described below.
     open_file_options : dict, default None
         Key/value arguments to be passed along to ``AbstractFileSystem.open``
         when each parquet data file is open for reading. Experimental
@@ -342,19 +349,27 @@
         unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files).
         It may be necessary to change this argument if the data files in your
         parquet dataset do not end in ".parq", ".parquet", or ".pq".
+    filesystem: "fsspec", "arrow", fsspec.AbstractFileSystem, or pyarrow.fs.FileSystem
+        Filesystem backend to use. Note that the "fastparquet" engine only
+        supports "fsspec" or an explicit ``fsspec.AbstractFileSystem`` object.
+        Default is "fsspec".
+    dataset: dict, default None
+        Dictionary of options to use when creating a ``pyarrow.dataset.Dataset``
+        or ``fastparquet.ParquetFile`` object. These options may include a
+        "filesystem" key (or "fs" for the "fastparquet" engine) to configure
+        the desired file-system backend. However, the top-level ``filesystem``
+        argument will always take precedence.
+    read: dict, default None
+        Dictionary of options to pass through to ``engine.read_partitions``
+        using the ``read`` key-word argument.
+    arrow_to_pandas: dict, default None
+        Dictionary of options to use when converting from ``pyarrow.Table`` to
+        a pandas ``DataFrame`` object. Only used by the "arrow" engine.
     **kwargs: dict (of dicts)
-        Passthrough key-word arguments for read backend.
-        The top-level keys correspond to the appropriate operation type, and
-        the second level corresponds to the kwargs that will be passed on to
-        the underlying ``pyarrow`` or ``fastparquet`` function.
-        Supported top-level keys: 'dataset' (for opening a ``pyarrow`` dataset),
-        'file' or 'dataset' (for opening a ``fastparquet.ParquetFile``), 'read'
-        (for the backend read function), 'arrow_to_pandas' (for controlling the
-        arguments passed to convert from a ``pyarrow.Table.to_pandas()``).
-        Any element of kwargs that is not defined under these top-level keys
-        will be passed through to the `engine.read_partitions` classmethod as a
-        stand-alone argument (and will be ignored by the engine implementations
-        defined in ``dask.dataframe``).
+        Options to pass through to ``engine.read_partitions`` as stand-alone
+        key-word arguments. Note that these options will be ignored by the
+        engines defined in ``dask.dataframe``, but may be used by other custom
+        implementations.

     Examples
     --------
@@ -446,6 +461,7 @@
         "chunksize": chunksize,
         "aggregate_files": aggregate_files,
         "parquet_file_extension": parquet_file_extension,
+        "filesystem": filesystem,
         **kwargs,
     }

@@ -466,7 +482,23 @@
     # Update input_kwargs
     input_kwargs.update({"columns": columns, "engine": engine})

-    fs, _, paths = get_fs_token_paths(path, mode="rb", storage_options=storage_options)
+    # Process and split user options
+    (
+        dataset_options,
+        read_options,
+        open_file_options,
+        other_options,
+    ) = _split_user_options(**kwargs)
+
+    # Extract global filesystem and paths
+    fs, paths, dataset_options, open_file_options = engine.extract_filesystem(
+        path,
+        filesystem,
+        dataset_options,
+        open_file_options,
+        storage_options,
+    )
+    read_options["open_file_options"] = open_file_options
     paths = sorted(paths, key=natural_sort_key)  # numeric rather than glob ordering

     auto_index_allowed = False
@@ -490,7 +522,9 @@
         ignore_metadata_file=ignore_metadata_file,
         metadata_task_size=metadata_task_size,
         parquet_file_extension=parquet_file_extension,
-        **kwargs,
+        dataset=dataset_options,
+        read=read_options,
+        **other_options,
     )

     # In the future, we may want to give the engine the
diff --git a/dask/dataframe/io/parquet/fastparquet.py b/dask/dataframe/io/parquet/fastparquet.py
index 6f6523bd3ea..80512e80033 100644
--- a/dask/dataframe/io/parquet/fastparquet.py
+++ b/dask/dataframe/io/parquet/fastparquet.py
@@ -35,7 +35,6 @@
     _set_gather_statistics,
     _set_metadata_task_size,
     _sort_and_analyze_paths,
-    _split_user_options,
 )
 from dask.dataframe.io.utils import _is_local_fs, _meta_from_dtypes, _open_input_files
 from dask.dataframe.utils import UNKNOWN_CATEGORIES
@@ -383,9 +382,8 @@ def _collect_dataset_info(
         # then each part will correspond to a file. Otherwise, each part will
         # correspond to a row group (populated later).

-        # Extract "supported" key-word arguments from `kwargs`.
-        # Split items into `dataset_kwargs` and `read_kwargs`
-        dataset_kwargs, read_kwargs, user_kwargs = _split_user_options(**kwargs)
+        # Extract dataset-specific options
+        dataset_kwargs = kwargs.pop("dataset", {})

         parts = []
         _metadata_exists = False
@@ -512,8 +510,7 @@
             "metadata_task_size": metadata_task_size,
             "kwargs": {
                 "dataset": dataset_kwargs,
-                "read": read_kwargs,
-                **user_kwargs,
+                **kwargs,
             },
         }

diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py
index 1a55bbd84cc..359e7fa2f67 100644
--- a/dask/dataframe/io/parquet/utils.py
+++ b/dask/dataframe/io/parquet/utils.py
@@ -1,6 +1,9 @@
 import re
+import warnings

 import pandas as pd
+from fsspec.core import expand_paths_if_needed, get_fs_token_paths, stringify_path
+from fsspec.spec import AbstractFileSystem

 from dask import config
 from dask.dataframe.io.utils import _is_local_fs
@@ -10,6 +13,95 @@
 class Engine:
     """The API necessary to provide a new Parquet reader/writer"""

+    @classmethod
+    def extract_filesystem(
+        cls,
+        urlpath,
+        filesystem,
+        dataset_options,
+        open_file_options,
+        storage_options,
+    ):
+        """Extract filesystem object from urlpath or user arguments
+
+        This classmethod should only be overridden for engines that need
+        to handle filesystem implementations other than ``fsspec``
+        (e.g. ``pyarrow.fs.S3FileSystem``).
+
+        Parameters
+        ----------
+        urlpath: str or List[str]
+            Source directory for data, or path(s) to individual parquet files.
+        filesystem: "fsspec" or fsspec.AbstractFileSystem
+            Filesystem backend to use. Default is "fsspec".
+        dataset_options: dict
+            Engine-specific dataset options.
+        open_file_options: dict
+            Options to be used for file-opening at read time.
+        storage_options: dict
+            Options to be passed on to the file-system backend.
+
+        Returns
+        -------
+        fs: Any
+            A global filesystem object to be used for metadata
+            processing and file-opening by the engine.
+        paths: List[str]
+            List of data-source paths.
+        dataset_options: dict
+            Engine-specific dataset options.
+        open_file_options: dict
+            Options to be used for file-opening at read time.
+        """
+
+        # Check if fs was specified as a dataset option
+        if filesystem is None:
+            fs = dataset_options.pop("fs", "fsspec")
+        else:
+            if "fs" in dataset_options:
+                raise ValueError(
+                    "Cannot specify a filesystem argument if the "
+                    "'fs' dataset option is also defined."
+                )
+            fs = filesystem
+
+        if fs in (None, "fsspec"):
+            # Use fsspec to infer a filesystem by default
+            fs, _, paths = get_fs_token_paths(
+                urlpath, mode="rb", storage_options=storage_options
+            )
+            return fs, paths, dataset_options, open_file_options
+
+        else:
+            # Check that an initialized filesystem object was provided
+            if not isinstance(fs, AbstractFileSystem):
+                raise ValueError(
+                    f"Expected fsspec.AbstractFileSystem or 'fsspec'. Got {fs}"
+                )
+
+            if storage_options:
+                # The filesystem was already specified. Can't pass in
+                # any storage options
+                raise ValueError(
+                    f"Cannot specify storage_options when an explicit "
+                    f"filesystem object is specified. Got: {storage_options}"
+                )
+
+            if isinstance(urlpath, (list, tuple, set)):
+                if not urlpath:
+                    raise ValueError("empty urlpath sequence")
+                urlpath = [stringify_path(u) for u in urlpath]
+            else:
+                urlpath = [stringify_path(urlpath)]
+
+            paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None)
+            return (
+                fs,
+                [fs._strip_protocol(u) for u in paths],
+                dataset_options,
+                open_file_options,
+            )
+
     @classmethod
     def read_metadata(
         cls,
@@ -728,17 +820,27 @@ def _process_open_file_options(


 def _split_user_options(**kwargs):
     # Check user-defined options.
-    # Split into "file" and "dataset"-specific kwargs
+    # Split into "dataset"-specific kwargs
     user_kwargs = kwargs.copy()
+
+    if "file" in user_kwargs:
+        # Deprecation warning to move toward a single `dataset` key
+        warnings.warn(
+            "Passing user options with the 'file' argument is now deprecated."
+            " Please use 'dataset' instead.",
+            FutureWarning,
+        )
+
     dataset_options = {
         **user_kwargs.pop("file", {}).copy(),
         **user_kwargs.pop("dataset", {}).copy(),
     }
     read_options = user_kwargs.pop("read", {}).copy()
-    read_options["open_file_options"] = user_kwargs.pop("open_file_options", {}).copy()
+    open_file_options = user_kwargs.pop("open_file_options", {}).copy()
     return (
         dataset_options,
         read_options,
+        open_file_options,
         user_kwargs,
     )
diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py
index c12f956f35f..5a1089a693d 100644
--- a/dask/dataframe/io/tests/test_parquet.py
+++ b/dask/dataframe/io/tests/test_parquet.py
@@ -4403,6 +4403,44 @@ def test_retries_on_remote_filesystem(tmpdir):
     assert layer.annotations["retries"] == 2


+@pytest.mark.parametrize("fs", ["fsspec", None])
+def test_filesystem_option(tmp_path, engine, fs):
+    from fsspec.implementations.local import LocalFileSystem
+
+    df = pd.DataFrame({"a": range(10)})
+    dd.from_pandas(df, npartitions=2).to_parquet(tmp_path, engine=engine)
+    filesystem = fs or LocalFileSystem()
+    ddf = dd.read_parquet(
+        tmp_path,
+        engine=engine,
+        filesystem=filesystem,
+    )
+    if fs is None:
+        layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs
+        assert layer_fs is filesystem
+    assert_eq(ddf, df)
+
+
+@PYARROW_MARK
+@pytest.mark.parametrize("fs", ["arrow", None])
+def test_pyarrow_filesystem_option(tmp_path, fs):
+    from fsspec.implementations.arrow import ArrowFSWrapper
+    from pyarrow.fs import LocalFileSystem
+
+    df = pd.DataFrame({"a": range(10)})
+    dd.from_pandas(df, npartitions=2).to_parquet(tmp_path)
+    fs = fs or LocalFileSystem()
+    ddf = dd.read_parquet(
+        tmp_path,
+        engine="pyarrow",
+        filesystem=fs,
+    )
+    layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs
+    assert isinstance(layer_fs, ArrowFSWrapper)
+    assert isinstance(layer_fs.fs, LocalFileSystem)
+    assert_eq(ddf, df)
+
+
 def test_select_filtered_column(tmp_path, engine):
     df = pd.DataFrame({"a": range(10), "b": ["cat"] * 10})
diff --git a/dask/dataframe/io/utils.py b/dask/dataframe/io/utils.py
index 7271d1062bd..3ee2710868a 100644
--- a/dask/dataframe/io/utils.py
+++ b/dask/dataframe/io/utils.py
@@ -15,7 +15,23 @@

 def _is_local_fs(fs):
     """Check if an fsspec file-system is local"""
-    return fs and isinstance(fs, LocalFileSystem)
+    return fs and (
+        isinstance(fs, LocalFileSystem)
+        # Check wrapped pyarrow filesystem
+        or _is_local_fs_pyarrow(fs)
+    )
+
+
+def _is_local_fs_pyarrow(fs):
+    """Check if a pyarrow-based file-system is local"""
+    if fs:
+        if hasattr(fs, "fs"):
+            # ArrowFSWrapper will have an "fs" attribute
+            return _is_local_fs_pyarrow(fs.fs)
+        elif hasattr(fs, "type_name"):
+            # pa.fs.LocalFileSystem will have "type_name" attribute
+            return fs.type_name == "local"
+    return False


 def _get_pyarrow_dtypes(schema, categories, use_nullable_dtypes=False):
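
Usage sketch (not part of the diff above): a minimal example of the new ``filesystem`` argument to ``dd.read_parquet``, modeled on the tests added in ``test_parquet.py``. The local path is hypothetical; per the updated docstring, ``filesystem`` accepts "fsspec", "arrow", an ``fsspec.AbstractFileSystem`` instance, or (pyarrow engine only) a ``pyarrow.fs.FileSystem`` instance.

    import pandas as pd
    import dask.dataframe as dd
    from pyarrow import fs as pa_fs

    # Write a small example dataset to a hypothetical local directory
    df = pd.DataFrame({"a": range(10)})
    dd.from_pandas(df, npartitions=2).to_parquet("/tmp/parquet-data")

    # Pass an explicit pyarrow filesystem; ArrowDatasetEngine.extract_filesystem
    # wraps it in an fsspec-compatible ArrowFSWrapper for path handling
    ddf = dd.read_parquet(
        "/tmp/parquet-data",
        engine="pyarrow",
        filesystem=pa_fs.LocalFileSystem(),
    )

    # Or let dask infer a pyarrow filesystem from the path
    ddf = dd.read_parquet("/tmp/parquet-data", engine="pyarrow", filesystem="arrow")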