From 9c077801eb298be511062ab8e860eefa69492a41 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 28 Nov 2022 12:05:14 -0800 Subject: [PATCH 1/9] expose filesystem option --- dask/dataframe/io/parquet/arrow.py | 69 +++++++++++++++++++++++++++--- dask/dataframe/io/parquet/core.py | 7 ++- dask/dataframe/io/parquet/utils.py | 42 ++++++++++++++++++ 3 files changed, 111 insertions(+), 7 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 0d7ca6eefff..12f52c1a370 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -30,7 +30,10 @@ # Check PyArrow version for feature support _pa_version = parse_version(pa.__version__) +from fsspec.core import expand_paths_if_needed, stringify_path +from fsspec.implementations.arrow import ArrowFSWrapper from pyarrow import dataset as pa_ds +from pyarrow import fs as pa_fs subset_stats_supported = _pa_version > parse_version("2.0.0") pre_buffer_supported = _pa_version >= parse_version("5.0.0") @@ -42,6 +45,10 @@ # +def _dataset_fs(fs): + return fs.fs if isinstance(fs, ArrowFSWrapper) else fs + + def _append_row_groups(metadata, md): """Append row-group metadata and include a helpful error message if an inconsistent schema is detected. @@ -314,6 +321,56 @@ class ArrowDatasetEngine(Engine): # Public Class Methods # + @classmethod + def get_filesystem( + cls, + urlpath, + filesystem=None, + storage_options=None, + open_file_options=None, + ): + if filesystem is None: + if isinstance(urlpath, (list, tuple, set)): + if not urlpath: + raise ValueError("empty urlpath sequence") + strpath = stringify_path(next(iter(urlpath))) + else: + strpath = stringify_path(urlpath) + if strpath.startswith("s3://") and not open_file_options: + filesystem = "arrow" + else: + filesystem = "fsspec" + + if isinstance(filesystem, pa_fs.FileSystem) or filesystem == "arrow": + if isinstance(urlpath, (list, tuple, set)): + if not urlpath: + raise ValueError("empty urlpath sequence") + urlpath = [stringify_path(u) for u in urlpath] + else: + urlpath = [stringify_path(urlpath)] + + if filesystem == "arrow": + filesystem = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( + **(storage_options or {}) + ) + if isinstance(filesystem, pa_fs.LocalFileSystem): + filesystem = "fsspec" + + fs = ArrowFSWrapper(filesystem) + paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None) + return ( + fs, + [fs._strip_protocol(u) for u in paths], + {"open_file_func": filesystem.open_input_file}, + ) + + return Engine.get_filesystem( + urlpath, + filesystem=filesystem, + storage_options=storage_options, + open_file_options=open_file_options, + ) + @classmethod def read_metadata( cls, @@ -534,7 +591,7 @@ def initialize_write( metadata_file_exists = False if append: # Extract metadata and get file offset if appending - ds = pa_ds.dataset(path, filesystem=fs, format="parquet") + ds = pa_ds.dataset(path, filesystem=_dataset_fs(fs), format="parquet") i_offset = len(ds.files) if i_offset > 0: try: @@ -809,7 +866,7 @@ def _collect_dataset_info( # Use _metadata file ds = pa_ds.parquet_dataset( meta_path, - filesystem=fs, + filesystem=_dataset_fs(fs), **_dataset_kwargs, ) has_metadata_file = True @@ -837,7 +894,7 @@ def _collect_dataset_info( if not ignore_metadata_file: ds = pa_ds.parquet_dataset( meta_path, - filesystem=fs, + filesystem=_dataset_fs(fs), **_dataset_kwargs, ) has_metadata_file = True @@ -851,7 +908,7 @@ def _collect_dataset_info( if ds is None: ds = pa_ds.dataset( paths, - filesystem=fs, + filesystem=_dataset_fs(fs), 
**_dataset_kwargs, ) @@ -1284,7 +1341,7 @@ def _collect_file_parts( file_frags = list( pa_ds.dataset( files_or_frags, - filesystem=fs, + filesystem=_dataset_fs(fs), **dataset_options, ).get_fragments() ) @@ -1477,7 +1534,7 @@ def _read_table( # to a single "fragment" to read ds = pa_ds.dataset( path_or_frag, - filesystem=fs, + filesystem=_dataset_fs(fs), **kwargs.get("dataset", {}), ) frags = list(ds.get_fragments()) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 527a86f924d..fb88504c685 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -180,6 +180,7 @@ def read_parquet( categories=None, index=None, storage_options=None, + filesystem=None, engine="auto", calculate_divisions=None, ignore_metadata_file=False, @@ -431,6 +432,7 @@ def read_parquet( "filters": filters, "categories": categories, "index": index, + "filesystem": filesystem, "storage_options": storage_options, "engine": engine, "calculate_divisions": calculate_divisions, @@ -460,7 +462,10 @@ def read_parquet( # Update input_kwargs input_kwargs.update({"columns": columns, "engine": engine}) - fs, _, paths = get_fs_token_paths(path, mode="rb", storage_options=storage_options) + # Get fsspec-compatible filesystem and paths + fs, paths, kwargs["open_file_options"] = engine.get_filesystem( + path, filesystem, storage_options + ) paths = sorted(paths, key=natural_sort_key) # numeric rather than glob ordering auto_index_allowed = False diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index c955c774d66..0d2593d540f 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -1,6 +1,9 @@ import re +import warnings import pandas as pd +from fsspec.core import expand_paths_if_needed, get_fs_token_paths, stringify_path +from fsspec.spec import AbstractFileSystem from dask import config from dask.dataframe.io.utils import _is_local_fs @@ -10,6 +13,45 @@ class Engine: """The API necessary to provide a new Parquet reader/writer""" + @classmethod + def get_filesystem( + cls, + urlpath, + filesystem=None, + storage_options=None, + open_file_options=None, + ): + # Use fsspec to infer a filesystem by default + filesystem = filesystem or "fsspec" + if filesystem != "fsspec": + + if not isinstance(filesystem, AbstractFileSystem): + raise ValueError( + f"Expected fsspec.AbstractFileSystem. 
Got {type(filesystem)}" + ) + + if storage_options: + warnings.warn(f"Ignoring storage_options: {storage_options}") + + if isinstance(urlpath, (list, tuple, set)): + if not urlpath: + raise ValueError("empty urlpath sequence") + urlpath = [stringify_path(u) for u in urlpath] + else: + urlpath = [stringify_path(urlpath)] + + paths = expand_paths_if_needed(urlpath, "rb", 1, filesystem, None) + return ( + filesystem, + [filesystem._strip_protocol(u) for u in paths], + open_file_options or {}, + ) + + fs, _, paths = get_fs_token_paths( + urlpath, mode="rb", storage_options=storage_options + ) + return fs, paths, open_file_options or {} + @classmethod def read_metadata( cls, From 61d0aab2a107182abf2f3ed7b72f6bf24f3e4f28 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 5 Dec 2022 12:12:38 -0800 Subject: [PATCH 2/9] move argument under dataset and add exception handling --- dask/dataframe/io/parquet/arrow.py | 198 ++++++++++++++++------- dask/dataframe/io/parquet/core.py | 30 +++- dask/dataframe/io/parquet/fastparquet.py | 9 +- dask/dataframe/io/parquet/utils.py | 73 +++++++-- dask/dataframe/io/utils.py | 18 ++- 5 files changed, 237 insertions(+), 91 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 12f52c1a370..e0b8fbf173f 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1,5 +1,6 @@ import json import textwrap +import warnings from collections import defaultdict from datetime import datetime @@ -21,7 +22,6 @@ _set_gather_statistics, _set_metadata_task_size, _sort_and_analyze_paths, - _split_user_options, ) from dask.dataframe.io.utils import _get_pyarrow_dtypes, _is_local_fs, _open_input_files from dask.dataframe.utils import clear_known_categories @@ -45,10 +45,34 @@ # -def _dataset_fs(fs): +_s3_note = ( + "Note that this version of `ArrowDatasetEngine` will attempt " + "to use `S3FileSystem` by default when reading from s3 storage. " + "If necessary, try passing in `dataset=dict(filesystem='arrow')` " + "to revert the default to `s3fs` (if available)" +) + + +def _wrapped_fs(fs): + """Return the wrapped filesystem if fs is ArrowFSWrapper""" return fs.fs if isinstance(fs, ArrowFSWrapper) else fs +def _with_wrapped_fs(func, *args, filesystem=None, **kwargs): + """Call a function with a filesystem kwarg that may be wrapped""" + fs = _wrapped_fs(filesystem) + try: + return func(*args, filesystem=fs, **kwargs) + except Exception as err: + if not (hasattr(fs, "type_name") and fs.type_name == "s3"): + raise err + raise type(err)( + f"Call to {func} failed with `filesystem={filesystem}`.\n" + f"{_s3_note}\n" + f"Original Error: {err}" + ) + + def _append_row_groups(metadata, md): """Append row-group metadata and include a helpful error message if an inconsistent schema is detected. 
@@ -231,27 +255,36 @@ def _read_table_from_path( else {} ) - with _open_input_files( - [path], - fs=fs, - precache_options=precache_options, - **open_file_options, - )[0] as fil: - if row_groups == [None]: - return pq.ParquetFile(fil, **pre_buffer).read( - columns=columns, - use_threads=False, - use_pandas_metadata=True, - **read_kwargs, - ) - else: - return pq.ParquetFile(fil, **pre_buffer).read_row_groups( - row_groups, - columns=columns, - use_threads=False, - use_pandas_metadata=True, - **read_kwargs, + try: + with _open_input_files( + [path], + fs=fs, + precache_options=precache_options, + **open_file_options, + )[0] as fil: + if row_groups == [None]: + return pq.ParquetFile(fil, **pre_buffer).read( + columns=columns, + use_threads=False, + use_pandas_metadata=True, + **read_kwargs, + ) + else: + return pq.ParquetFile(fil, **pre_buffer).read_row_groups( + row_groups, + columns=columns, + use_threads=False, + use_pandas_metadata=True, + **read_kwargs, + ) + except Exception as err: + if open_file_options.get("open_file_func", None): + raise type(err)( + f"Failed to open and read Parquet file.\n" + f"{_s3_note}\n" + f"Original Error: {err}" ) + raise err def _get_rg_statistics(row_group, col_indices): @@ -322,13 +355,16 @@ class ArrowDatasetEngine(Engine): # @classmethod - def get_filesystem( + def extract_filesystem( cls, urlpath, - filesystem=None, - storage_options=None, - open_file_options=None, + dataset_options, + open_file_options, + storage_options, ): + + # Check if fs was already specified as a dataset option + filesystem = dataset_options.pop("filesystem", None) if filesystem is None: if isinstance(urlpath, (list, tuple, set)): if not urlpath: @@ -336,7 +372,37 @@ def get_filesystem( strpath = stringify_path(next(iter(urlpath))) else: strpath = stringify_path(urlpath) - if strpath.startswith("s3://") and not open_file_options: + if ( + strpath.startswith("s3://") + and not open_file_options + and not ( + # Only use PyArrow by default when storage is in s3, + # and `storage_option` only includes simple options + # # that are "expected" by `S3FileSystem` + set(storage_options) + - { + "access_key", + "secret_key", + "session_token", + "anonymous", + "role_arn", + "session_name", + "external_id", + "load_frequency", + "region", + "request_timeout", + "connect_timeout", + "scheme", + "endpoint_override", + "background_writes", + "default_metadata", + "proxy_options", + "allow_bucket_creation", + "allow_bucket_deletion", + "retry_strategy", + } + ) + ): filesystem = "arrow" else: filesystem = "fsspec" @@ -350,25 +416,35 @@ def get_filesystem( urlpath = [stringify_path(urlpath)] if filesystem == "arrow": - filesystem = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( - **(storage_options or {}) - ) - if isinstance(filesystem, pa_fs.LocalFileSystem): + try: + filesystem = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( + **(storage_options or {}) + ) + except (TypeError, pa.lib.ArrowInvalid) as err: + # Try falling back to fsspec + warnings.warn( + f"Failed to initialize a pyarrow-based `FileSystem` object. 
" + f"Falling back to `fsspec`.\n" + f"{_s3_note}\n" + f"Original Error: {err}" + ) filesystem = "fsspec" - fs = ArrowFSWrapper(filesystem) - paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None) - return ( - fs, - [fs._strip_protocol(u) for u in paths], - {"open_file_func": filesystem.open_input_file}, - ) + if filesystem != "fsspec": + fs = ArrowFSWrapper(filesystem) + paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None) + return ( + fs, + [fs._strip_protocol(u) for u in paths], + dataset_options, + {"open_file_func": filesystem.open_input_file}, + ) - return Engine.get_filesystem( + return Engine.extract_filesystem( urlpath, - filesystem=filesystem, - storage_options=storage_options, - open_file_options=open_file_options, + dataset_options, + open_file_options, + storage_options, ) @classmethod @@ -591,7 +667,7 @@ def initialize_write( metadata_file_exists = False if append: # Extract metadata and get file offset if appending - ds = pa_ds.dataset(path, filesystem=_dataset_fs(fs), format="parquet") + ds = _with_wrapped_fs(pa_ds.dataset, path, filesystem=fs, format="parquet") i_offset = len(ds.files) if i_offset > 0: try: @@ -839,12 +915,8 @@ def _collect_dataset_info( ds = None valid_paths = None # Only used if `paths` is a list containing _metadata - # Extract "supported" key-word arguments from `kwargs` - ( - _dataset_kwargs, - read_kwargs, - user_kwargs, - ) = _split_user_options(**kwargs) + # Extract dataset-specific options + _dataset_kwargs = kwargs.pop("dataset", {}) if "partitioning" not in _dataset_kwargs: _dataset_kwargs["partitioning"] = "hive" @@ -864,9 +936,10 @@ def _collect_dataset_info( meta_path = fs.sep.join([paths, "_metadata"]) if not ignore_metadata_file and fs.exists(meta_path): # Use _metadata file - ds = pa_ds.parquet_dataset( + ds = _with_wrapped_fs( + pa_ds.parquet_dataset, meta_path, - filesystem=_dataset_fs(fs), + filesystem=fs, **_dataset_kwargs, ) has_metadata_file = True @@ -892,9 +965,10 @@ def _collect_dataset_info( # Pyarrow cannot handle "_metadata" when `paths` is a list # Use _metadata file if not ignore_metadata_file: - ds = pa_ds.parquet_dataset( + ds = _with_wrapped_fs( + pa_ds.parquet_dataset, meta_path, - filesystem=_dataset_fs(fs), + filesystem=fs, **_dataset_kwargs, ) has_metadata_file = True @@ -906,9 +980,10 @@ def _collect_dataset_info( # Final "catch-all" pyarrow.dataset call if ds is None: - ds = pa_ds.dataset( + ds = _with_wrapped_fs( + pa_ds.dataset, paths, - filesystem=_dataset_fs(fs), + filesystem=fs, **_dataset_kwargs, ) @@ -1009,8 +1084,7 @@ def _collect_dataset_info( "metadata_task_size": metadata_task_size, "kwargs": { "dataset": _dataset_kwargs, - "read": read_kwargs, - **user_kwargs, + **kwargs, }, } @@ -1339,9 +1413,10 @@ def _collect_file_parts( # Need more information - convert the path to a fragment file_frags = list( - pa_ds.dataset( + _with_wrapped_fs( + pa_ds.dataset, files_or_frags, - filesystem=_dataset_fs(fs), + filesystem=fs, **dataset_options, ).get_fragments() ) @@ -1532,9 +1607,10 @@ def _read_table( # We are filtering with "pyarrow-dataset". 
# Need to convert the path and row-group IDs # to a single "fragment" to read - ds = pa_ds.dataset( + ds = _with_wrapped_fs( + pa_ds.dataset, path_or_frag, - filesystem=_dataset_fs(fs), + filesystem=fs, **kwargs.get("dataset", {}), ) frags = list(ds.get_fragments()) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index fb88504c685..5c4dd22f4b3 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -15,7 +15,11 @@ from dask.dataframe.backends import dataframe_creation_dispatch from dask.dataframe.core import DataFrame, Scalar from dask.dataframe.io.io import from_map -from dask.dataframe.io.parquet.utils import Engine, _sort_and_analyze_paths +from dask.dataframe.io.parquet.utils import ( + Engine, + _sort_and_analyze_paths, + _split_user_options, +) from dask.dataframe.io.utils import DataFrameIOFunction, _is_local_fs from dask.dataframe.methods import concat from dask.delayed import Delayed @@ -180,7 +184,6 @@ def read_parquet( categories=None, index=None, storage_options=None, - filesystem=None, engine="auto", calculate_divisions=None, ignore_metadata_file=False, @@ -432,7 +435,6 @@ def read_parquet( "filters": filters, "categories": categories, "index": index, - "filesystem": filesystem, "storage_options": storage_options, "engine": engine, "calculate_divisions": calculate_divisions, @@ -462,10 +464,22 @@ def read_parquet( # Update input_kwargs input_kwargs.update({"columns": columns, "engine": engine}) - # Get fsspec-compatible filesystem and paths - fs, paths, kwargs["open_file_options"] = engine.get_filesystem( - path, filesystem, storage_options + # Process and split user options + ( + dataset_options, + read_options, + open_file_options, + other_options, + ) = _split_user_options(**kwargs) + + # Extract global filesystem and paths + fs, paths, dataset_options, open_file_options = engine.extract_filesystem( + path, + dataset_options, + open_file_options, + storage_options, ) + read_options["open_file_options"] = open_file_options paths = sorted(paths, key=natural_sort_key) # numeric rather than glob ordering auto_index_allowed = False @@ -488,7 +502,9 @@ def read_parquet( ignore_metadata_file=ignore_metadata_file, metadata_task_size=metadata_task_size, parquet_file_extension=parquet_file_extension, - **kwargs, + dataset=dataset_options, + read=read_options, + **other_options, ) # In the future, we may want to give the engine the diff --git a/dask/dataframe/io/parquet/fastparquet.py b/dask/dataframe/io/parquet/fastparquet.py index 7d6e5934a11..cdc06691b68 100644 --- a/dask/dataframe/io/parquet/fastparquet.py +++ b/dask/dataframe/io/parquet/fastparquet.py @@ -35,7 +35,6 @@ _set_gather_statistics, _set_metadata_task_size, _sort_and_analyze_paths, - _split_user_options, ) from dask.dataframe.io.utils import _is_local_fs, _meta_from_dtypes, _open_input_files from dask.dataframe.utils import UNKNOWN_CATEGORIES @@ -383,9 +382,8 @@ def _collect_dataset_info( # then each part will correspond to a file. Otherwise, each part will # correspond to a row group (populated later). - # Extract "supported" key-word arguments from `kwargs`. 
- # Split items into `dataset_kwargs` and `read_kwargs` - dataset_kwargs, read_kwargs, user_kwargs = _split_user_options(**kwargs) + # Extract dataset-specific options + dataset_kwargs = kwargs.pop("dataset", {}) parts = [] _metadata_exists = False @@ -512,8 +510,7 @@ def _collect_dataset_info( "metadata_task_size": metadata_task_size, "kwargs": { "dataset": dataset_kwargs, - "read": read_kwargs, - **user_kwargs, + **kwargs, }, } diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index 0d2593d540f..74ee69ddebf 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -14,21 +14,51 @@ class Engine: """The API necessary to provide a new Parquet reader/writer""" @classmethod - def get_filesystem( + def extract_filesystem( cls, urlpath, - filesystem=None, - storage_options=None, - open_file_options=None, + dataset_options, + open_file_options, + storage_options, ): + """Extract filesystem object from urlpath or user arguments + + This classmethod should only be overridden for engines that need + to handle filesystem implementations other than ``fsspec`` + (e.g. ``pyarrow.fs.S3FileSystem``). + + Parameters + ---------- + urlpath: str or List[str] + Source directory for data, or path(s) to individual parquet files. + dataset_options: dict + Engine-specific dataset options. + open_file_options: dict + Options to be used for file-opening at read time. + storage_options: dict + Options to be passed on to the file-system backend. + + Returns + ------- + fs: Any + A global filesystem object to be used for metadata + processing and file-opening by the engine. + paths: List[str] + List of data-source paths. + dataset_options: dict + Engine-specific dataset options. + open_file_options: dict + Options to be used for file-opening at read time. + """ + + # Check if fs was already specified as a dataset option + fs = dataset_options.pop("fs", "fsspec") + # Use fsspec to infer a filesystem by default - filesystem = filesystem or "fsspec" - if filesystem != "fsspec": + if fs != "fsspec": - if not isinstance(filesystem, AbstractFileSystem): - raise ValueError( - f"Expected fsspec.AbstractFileSystem. Got {type(filesystem)}" - ) + if not isinstance(fs, AbstractFileSystem): + raise ValueError(f"Expected fsspec.AbstractFileSystem. Got {type(fs)}") if storage_options: warnings.warn(f"Ignoring storage_options: {storage_options}") @@ -40,17 +70,18 @@ def get_filesystem( else: urlpath = [stringify_path(urlpath)] - paths = expand_paths_if_needed(urlpath, "rb", 1, filesystem, None) + paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None) return ( - filesystem, - [filesystem._strip_protocol(u) for u in paths], - open_file_options or {}, + fs, + [fs._strip_protocol(u) for u in paths], + dataset_options, + open_file_options, ) fs, _, paths = get_fs_token_paths( urlpath, mode="rb", storage_options=storage_options ) - return fs, paths, open_file_options or {} + return fs, paths, dataset_options, open_file_options @classmethod def read_metadata( @@ -763,15 +794,25 @@ def _split_user_options(**kwargs): # Check user-defined options. # Split into "file" and "dataset"-specific kwargs user_kwargs = kwargs.copy() + + if "file" in user_kwargs: + # Deprecation warning to move toward a single `dataset` key + warnings.warn( + "Passing user options under the 'file' key is now deprecated. 
" + "Please use 'dataset' instead.", + FutureWarning, + ) + dataset_options = { **user_kwargs.pop("file", {}).copy(), **user_kwargs.pop("dataset", {}).copy(), } read_options = user_kwargs.pop("read", {}).copy() - read_options["open_file_options"] = user_kwargs.pop("open_file_options", {}).copy() + open_file_options = user_kwargs.pop("open_file_options", {}).copy() return ( dataset_options, read_options, + open_file_options, user_kwargs, ) diff --git a/dask/dataframe/io/utils.py b/dask/dataframe/io/utils.py index 5af144b98f3..cfb2d5d7b8a 100644 --- a/dask/dataframe/io/utils.py +++ b/dask/dataframe/io/utils.py @@ -15,7 +15,23 @@ def _is_local_fs(fs): """Check if an fsspec file-system is local""" - return fs and isinstance(fs, LocalFileSystem) + return fs and ( + isinstance(fs, LocalFileSystem) + # Check wrapped pyarrow filesystem + or _is_local_fs_pyarrow(fs) + ) + + +def _is_local_fs_pyarrow(fs): + """Check if a pyarrow-based file-system is local""" + if fs: + if hasattr(fs, "fs"): + # ArrowFSWrapper will have an "fs" attribute + return _is_local_fs_pyarrow(fs.fs) + elif hasattr(fs, "type_name"): + # pa.fs.LocalFileSystem will have "type_name" attribute + return fs.type_name == "local" + return False def _get_pyarrow_dtypes(schema, categories): From 6733668f80bbce858b0ff0230e44809eefe28f74 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 5 Dec 2022 15:21:29 -0800 Subject: [PATCH 3/9] docstring update --- dask/dataframe/io/parquet/core.py | 38 ++++++++++++++++++++---------- dask/dataframe/io/parquet/utils.py | 6 ++--- dask/dataframe/io/utils.py | 3 ++- 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 5c4dd22f4b3..3f55cf570fd 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -245,6 +245,8 @@ def read_parquet( data written by dask/fastparquet, not otherwise. storage_options : dict, default None Key/value pairs to be passed on to the file-system backend, if any. + Note that the default file-system backend can be configured with the + ``dataset`` argument, described below. open_file_options : dict, default None Key/value arguments to be passed along to ``AbstractFileSystem.open`` when each parquet data file is open for reading. Experimental @@ -341,19 +343,31 @@ def read_parquet( unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). It may be necessary to change this argument if the data files in your parquet dataset do not end in ".parq", ".parquet", or ".pq". + dataset: dict, default None + Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` + or ``fastparquet.ParquetFile`` object. These options may include a + "filesystem" key (or "fs" for the "fastparquet" engine) to configure + the desired file-system backend. For the "arrow" engine, the + "filesystem" option may be set to "arrow" to specify that the backend + should correspond to ``pyarrow.fs.FileSystem``:: + + ``dd.read_parquet(..., dataset={"filesystem": "arrow"})`` + + For both engines, "filesystem" may be set to "fsspec" to specify that a + ``fsspec.spec.AbstractFileSystem`` is preferred. Note that the default + is "fsspec", unless reading from s3 storage with the "arrow" engine + (in which case ``pyarrow.fs.S3FileSystem`` is used, when possible). + read: dict, default None + Dictionary of options to pass through to ``engine.read_partitions`` + using the ``read`` key-word argument. 
+ arrow_to_pandas: dict, default None + Dictionary of options to use when converting from ``pyarrow.Table`` to + a pandas ``DataFrame`` object. Only used by the "arrow" engine. **kwargs: dict (of dicts) - Passthrough key-word arguments for read backend. - The top-level keys correspond to the appropriate operation type, and - the second level corresponds to the kwargs that will be passed on to - the underlying ``pyarrow`` or ``fastparquet`` function. - Supported top-level keys: 'dataset' (for opening a ``pyarrow`` dataset), - 'file' or 'dataset' (for opening a ``fastparquet.ParquetFile``), 'read' - (for the backend read function), 'arrow_to_pandas' (for controlling the - arguments passed to convert from a ``pyarrow.Table.to_pandas()``). - Any element of kwargs that is not defined under these top-level keys - will be passed through to the `engine.read_partitions` classmethod as a - stand-alone argument (and will be ignored by the engine implementations - defined in ``dask.dataframe``). + Options to pass through to ``engine.read_partitions`` as staand-alone + key-word arguments. Note that these options will be ignored by the + engines defined in ``dask.dataframe``, but may be used by other custom + implementations. Examples -------- diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index 74ee69ddebf..5046d414aa8 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -792,14 +792,14 @@ def _process_open_file_options( def _split_user_options(**kwargs): # Check user-defined options. - # Split into "file" and "dataset"-specific kwargs + # Split into "dataset"-specific kwargs user_kwargs = kwargs.copy() if "file" in user_kwargs: # Deprecation warning to move toward a single `dataset` key warnings.warn( - "Passing user options under the 'file' key is now deprecated. " - "Please use 'dataset' instead.", + "Passing user options with the 'file' argument is now deprecated." 
+ " Please use 'dataset' instead.", FutureWarning, ) diff --git a/dask/dataframe/io/utils.py b/dask/dataframe/io/utils.py index cfb2d5d7b8a..bf4aad4ea5b 100644 --- a/dask/dataframe/io/utils.py +++ b/dask/dataframe/io/utils.py @@ -4,6 +4,7 @@ import fsspec import pandas as pd +from fsspec.implementations.arrow import ArrowFSWrapper from fsspec.implementations.local import LocalFileSystem from packaging.version import parse as parse_version @@ -25,7 +26,7 @@ def _is_local_fs(fs): def _is_local_fs_pyarrow(fs): """Check if a pyarrow-based file-system is local""" if fs: - if hasattr(fs, "fs"): + if isinstance(fs, ArrowFSWrapper): # ArrowFSWrapper will have an "fs" attribute return _is_local_fs_pyarrow(fs.fs) elif hasattr(fs, "type_name"): From ce32dca362296f542aa73b81c6c6039aa8c13172 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 5 Dec 2022 15:32:15 -0800 Subject: [PATCH 4/9] remove problematic import --- dask/dataframe/io/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dask/dataframe/io/utils.py b/dask/dataframe/io/utils.py index bf4aad4ea5b..cfb2d5d7b8a 100644 --- a/dask/dataframe/io/utils.py +++ b/dask/dataframe/io/utils.py @@ -4,7 +4,6 @@ import fsspec import pandas as pd -from fsspec.implementations.arrow import ArrowFSWrapper from fsspec.implementations.local import LocalFileSystem from packaging.version import parse as parse_version @@ -26,7 +25,7 @@ def _is_local_fs(fs): def _is_local_fs_pyarrow(fs): """Check if a pyarrow-based file-system is local""" if fs: - if isinstance(fs, ArrowFSWrapper): + if hasattr(fs, "fs"): # ArrowFSWrapper will have an "fs" attribute return _is_local_fs_pyarrow(fs.fs) elif hasattr(fs, "type_name"): From bb3a9c7c8f2113e761422be8a83adebd7ac6c8da Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 7 Dec 2022 09:01:30 -0800 Subject: [PATCH 5/9] address code review --- dask/dataframe/io/parquet/arrow.py | 105 +++++++++++++---------------- dask/dataframe/io/parquet/core.py | 22 +++--- dask/dataframe/io/parquet/utils.py | 24 ++++--- 3 files changed, 74 insertions(+), 77 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index e0b8fbf173f..b3fcae2aac7 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -48,8 +48,8 @@ _s3_note = ( "Note that this version of `ArrowDatasetEngine` will attempt " "to use `S3FileSystem` by default when reading from s3 storage. 
" - "If necessary, try passing in `dataset=dict(filesystem='arrow')` " - "to revert the default to `s3fs` (if available)" + "If necessary, try passing in `filesystem='arrow'` to revert " + "the default to `s3fs` (if available)" ) @@ -358,56 +358,33 @@ class ArrowDatasetEngine(Engine): def extract_filesystem( cls, urlpath, + filesystem, dataset_options, open_file_options, storage_options, ): - # Check if fs was already specified as a dataset option - filesystem = dataset_options.pop("filesystem", None) - if filesystem is None: + # Check if filesystem was specified as a dataset option + fs = dataset_options.pop("filesystem", None) + if filesystem is not None: + fs = filesystem + + default_pa_s3 = False + default_pa_s3_error = None + if fs is None: if isinstance(urlpath, (list, tuple, set)): if not urlpath: raise ValueError("empty urlpath sequence") strpath = stringify_path(next(iter(urlpath))) else: strpath = stringify_path(urlpath) - if ( - strpath.startswith("s3://") - and not open_file_options - and not ( - # Only use PyArrow by default when storage is in s3, - # and `storage_option` only includes simple options - # # that are "expected" by `S3FileSystem` - set(storage_options) - - { - "access_key", - "secret_key", - "session_token", - "anonymous", - "role_arn", - "session_name", - "external_id", - "load_frequency", - "region", - "request_timeout", - "connect_timeout", - "scheme", - "endpoint_override", - "background_writes", - "default_metadata", - "proxy_options", - "allow_bucket_creation", - "allow_bucket_deletion", - "retry_strategy", - } - ) - ): - filesystem = "arrow" + if strpath.startswith("s3://") and not open_file_options: + fs = "arrow" + default_pa_s3 = True else: - filesystem = "fsspec" + fs = "fsspec" - if isinstance(filesystem, pa_fs.FileSystem) or filesystem == "arrow": + if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"): if isinstance(urlpath, (list, tuple, set)): if not urlpath: raise ValueError("empty urlpath sequence") @@ -415,37 +392,49 @@ def extract_filesystem( else: urlpath = [stringify_path(urlpath)] - if filesystem == "arrow": + if fs in ("arrow", "pyarrow"): try: - filesystem = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( + fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( **(storage_options or {}) ) except (TypeError, pa.lib.ArrowInvalid) as err: - # Try falling back to fsspec - warnings.warn( - f"Failed to initialize a pyarrow-based `FileSystem` object. 
" - f"Falling back to `fsspec`.\n" - f"{_s3_note}\n" - f"Original Error: {err}" - ) - filesystem = "fsspec" + if default_pa_s3: + # Fall back to fsspec + default_pa_s3_error = err + fs = "fsspec" + else: + raise err - if filesystem != "fsspec": - fs = ArrowFSWrapper(filesystem) + if isinstance(fs, pa_fs.FileSystem): + fs = ArrowFSWrapper(fs) paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None) return ( fs, [fs._strip_protocol(u) for u in paths], dataset_options, - {"open_file_func": filesystem.open_input_file}, + {"open_file_func": fs.fs.open_input_file}, ) - return Engine.extract_filesystem( - urlpath, - dataset_options, - open_file_options, - storage_options, - ) + # Use default file-system initialization + try: + return Engine.extract_filesystem( + urlpath, + fs, + dataset_options, + open_file_options, + storage_options, + ) + except Exception as err: + if default_pa_s3_error is None: + raise err + # Inform the user that we tried falling back to fsspec + warnings.warn( + f"Failed to initialize an fsspec-based filesystem after " + f"failing to initialize a `pyarrow.fs.S3FileSystem`." + f"\nOriginal Error: {default_pa_s3_error}" + f"\nFallback Error: {err}" + f"\n{_s3_note}" + ) @classmethod def read_metadata( diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index 3f55cf570fd..a2228120a46 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -192,6 +192,7 @@ def read_parquet( chunksize=None, aggregate_files=None, parquet_file_extension=(".parq", ".parquet", ".pq"), + filesystem=None, **kwargs, ): """ @@ -246,7 +247,7 @@ def read_parquet( storage_options : dict, default None Key/value pairs to be passed on to the file-system backend, if any. Note that the default file-system backend can be configured with the - ``dataset`` argument, described below. + ``filesystem`` argument, described below. open_file_options : dict, default None Key/value arguments to be passed along to ``AbstractFileSystem.open`` when each parquet data file is open for reading. Experimental @@ -343,20 +344,17 @@ def read_parquet( unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). It may be necessary to change this argument if the data files in your parquet dataset do not end in ".parq", ".parquet", or ".pq". + filesystem: "fsspec", "arrow", fsspec.AbstractFileSystem, or pyarrow.fs.FileSystem + Filesystem backend to use. Default is "fsspec", unless reading from s3 + storage with the "pyarrow" engine and ``open_file_options=None`` (in + which case the default is "arrow"). Note that the "fastparquet" engine + only supports "fsspec" or an explicit ``pyarrow.fs.FileSystem`` object. dataset: dict, default None Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` or ``fastparquet.ParquetFile`` object. These options may include a "filesystem" key (or "fs" for the "fastparquet" engine) to configure - the desired file-system backend. For the "arrow" engine, the - "filesystem" option may be set to "arrow" to specify that the backend - should correspond to ``pyarrow.fs.FileSystem``:: - - ``dd.read_parquet(..., dataset={"filesystem": "arrow"})`` - - For both engines, "filesystem" may be set to "fsspec" to specify that a - ``fsspec.spec.AbstractFileSystem`` is preferred. Note that the default - is "fsspec", unless reading from s3 storage with the "arrow" engine - (in which case ``pyarrow.fs.S3FileSystem`` is used, when possible). + the desired file-system backend. 
However, the top-level ``filesystem`` + argument will always take precedence. read: dict, default None Dictionary of options to pass through to ``engine.read_partitions`` using the ``read`` key-word argument. @@ -458,6 +456,7 @@ def read_parquet( "chunksize": chunksize, "aggregate_files": aggregate_files, "parquet_file_extension": parquet_file_extension, + "filesystem": filesystem, **kwargs, } @@ -489,6 +488,7 @@ def read_parquet( # Extract global filesystem and paths fs, paths, dataset_options, open_file_options = engine.extract_filesystem( path, + filesystem, dataset_options, open_file_options, storage_options, diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index 5046d414aa8..eac63b06a6e 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -17,6 +17,7 @@ class Engine: def extract_filesystem( cls, urlpath, + filesystem, dataset_options, open_file_options, storage_options, @@ -31,6 +32,8 @@ def extract_filesystem( ---------- urlpath: str or List[str] Source directory for data, or path(s) to individual parquet files. + filesystem: "fsspec" or fsspec.AbstractFileSystem + Filesystem backend to use. Default is "fsspec" dataset_options: dict Engine-specific dataset options. open_file_options: dict @@ -51,16 +54,26 @@ def extract_filesystem( Options to be used for file-opening at read time. """ - # Check if fs was already specified as a dataset option + # Check if fs was specified as a dataset option fs = dataset_options.pop("fs", "fsspec") + if filesystem is not None: + fs = filesystem - # Use fsspec to infer a filesystem by default - if fs != "fsspec": + if fs in (None, "fsspec"): + # Use fsspec to infer a filesystem by default + fs, _, paths = get_fs_token_paths( + urlpath, mode="rb", storage_options=storage_options + ) + return fs, paths, dataset_options, open_file_options + else: + # Check that an initialized filesystem object was provided if not isinstance(fs, AbstractFileSystem): raise ValueError(f"Expected fsspec.AbstractFileSystem. Got {type(fs)}") if storage_options: + # The filesystem was already specified. Can't pass in + # any storage options warnings.warn(f"Ignoring storage_options: {storage_options}") if isinstance(urlpath, (list, tuple, set)): @@ -78,11 +91,6 @@ def extract_filesystem( open_file_options, ) - fs, _, paths = get_fs_token_paths( - urlpath, mode="rb", storage_options=storage_options - ) - return fs, paths, dataset_options, open_file_options - @classmethod def read_metadata( cls, From e8fd1ae53783139da3b146fbed610952a7f2445f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 14 Dec 2022 08:50:43 -0800 Subject: [PATCH 6/9] roll back default filesystem change to arrow for now --- dask/dataframe/io/parquet/arrow.py | 156 +++++++----------------- dask/dataframe/io/parquet/core.py | 9 +- dask/dataframe/io/parquet/utils.py | 4 +- dask/dataframe/io/tests/test_parquet.py | 37 ++++++ 4 files changed, 87 insertions(+), 119 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index b3fcae2aac7..3cb3aae22b5 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -1,6 +1,5 @@ import json import textwrap -import warnings from collections import defaultdict from datetime import datetime @@ -45,34 +44,11 @@ # -_s3_note = ( - "Note that this version of `ArrowDatasetEngine` will attempt " - "to use `S3FileSystem` by default when reading from s3 storage. 
" - "If necessary, try passing in `filesystem='arrow'` to revert " - "the default to `s3fs` (if available)" -) - - def _wrapped_fs(fs): """Return the wrapped filesystem if fs is ArrowFSWrapper""" return fs.fs if isinstance(fs, ArrowFSWrapper) else fs -def _with_wrapped_fs(func, *args, filesystem=None, **kwargs): - """Call a function with a filesystem kwarg that may be wrapped""" - fs = _wrapped_fs(filesystem) - try: - return func(*args, filesystem=fs, **kwargs) - except Exception as err: - if not (hasattr(fs, "type_name") and fs.type_name == "s3"): - raise err - raise type(err)( - f"Call to {func} failed with `filesystem={filesystem}`.\n" - f"{_s3_note}\n" - f"Original Error: {err}" - ) - - def _append_row_groups(metadata, md): """Append row-group metadata and include a helpful error message if an inconsistent schema is detected. @@ -255,36 +231,27 @@ def _read_table_from_path( else {} ) - try: - with _open_input_files( - [path], - fs=fs, - precache_options=precache_options, - **open_file_options, - )[0] as fil: - if row_groups == [None]: - return pq.ParquetFile(fil, **pre_buffer).read( - columns=columns, - use_threads=False, - use_pandas_metadata=True, - **read_kwargs, - ) - else: - return pq.ParquetFile(fil, **pre_buffer).read_row_groups( - row_groups, - columns=columns, - use_threads=False, - use_pandas_metadata=True, - **read_kwargs, - ) - except Exception as err: - if open_file_options.get("open_file_func", None): - raise type(err)( - f"Failed to open and read Parquet file.\n" - f"{_s3_note}\n" - f"Original Error: {err}" + with _open_input_files( + [path], + fs=fs, + precache_options=precache_options, + **open_file_options, + )[0] as fil: + if row_groups == [None]: + return pq.ParquetFile(fil, **pre_buffer).read( + columns=columns, + use_threads=False, + use_pandas_metadata=True, + **read_kwargs, + ) + else: + return pq.ParquetFile(fil, **pre_buffer).read_row_groups( + row_groups, + columns=columns, + use_threads=False, + use_pandas_metadata=True, + **read_kwargs, ) - raise err def _get_rg_statistics(row_group, col_indices): @@ -369,21 +336,9 @@ def extract_filesystem( if filesystem is not None: fs = filesystem - default_pa_s3 = False - default_pa_s3_error = None - if fs is None: - if isinstance(urlpath, (list, tuple, set)): - if not urlpath: - raise ValueError("empty urlpath sequence") - strpath = stringify_path(next(iter(urlpath))) - else: - strpath = stringify_path(urlpath) - if strpath.startswith("s3://") and not open_file_options: - fs = "arrow" - default_pa_s3 = True - else: - fs = "fsspec" + fs = fs or "fsspec" # Default is fsspec + # Handle pyarrow-based filesystem if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"): if isinstance(urlpath, (list, tuple, set)): if not urlpath: @@ -393,17 +348,9 @@ def extract_filesystem( urlpath = [stringify_path(urlpath)] if fs in ("arrow", "pyarrow"): - try: - fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( - **(storage_options or {}) - ) - except (TypeError, pa.lib.ArrowInvalid) as err: - if default_pa_s3: - # Fall back to fsspec - default_pa_s3_error = err - fs = "fsspec" - else: - raise err + fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])( + **(storage_options or {}) + ) if isinstance(fs, pa_fs.FileSystem): fs = ArrowFSWrapper(fs) @@ -416,25 +363,13 @@ def extract_filesystem( ) # Use default file-system initialization - try: - return Engine.extract_filesystem( - urlpath, - fs, - dataset_options, - open_file_options, - storage_options, - ) - except Exception as err: - if default_pa_s3_error is None: - raise err - # 
Inform the user that we tried falling back to fsspec - warnings.warn( - f"Failed to initialize an fsspec-based filesystem after " - f"failing to initialize a `pyarrow.fs.S3FileSystem`." - f"\nOriginal Error: {default_pa_s3_error}" - f"\nFallback Error: {err}" - f"\n{_s3_note}" - ) + return Engine.extract_filesystem( + urlpath, + fs, + dataset_options, + open_file_options, + storage_options, + ) @classmethod def read_metadata( @@ -656,7 +591,7 @@ def initialize_write( metadata_file_exists = False if append: # Extract metadata and get file offset if appending - ds = _with_wrapped_fs(pa_ds.dataset, path, filesystem=fs, format="parquet") + ds = pa_ds.dataset(path, filesystem=_wrapped_fs(fs), format="parquet") i_offset = len(ds.files) if i_offset > 0: try: @@ -925,10 +860,9 @@ def _collect_dataset_info( meta_path = fs.sep.join([paths, "_metadata"]) if not ignore_metadata_file and fs.exists(meta_path): # Use _metadata file - ds = _with_wrapped_fs( - pa_ds.parquet_dataset, + ds = pa_ds.parquet_dataset( meta_path, - filesystem=fs, + filesystem=_wrapped_fs(fs), **_dataset_kwargs, ) has_metadata_file = True @@ -954,10 +888,9 @@ def _collect_dataset_info( # Pyarrow cannot handle "_metadata" when `paths` is a list # Use _metadata file if not ignore_metadata_file: - ds = _with_wrapped_fs( - pa_ds.parquet_dataset, + ds = pa_ds.parquet_dataset( meta_path, - filesystem=fs, + filesystem=_wrapped_fs(fs), **_dataset_kwargs, ) has_metadata_file = True @@ -969,10 +902,9 @@ def _collect_dataset_info( # Final "catch-all" pyarrow.dataset call if ds is None: - ds = _with_wrapped_fs( - pa_ds.dataset, + ds = pa_ds.dataset( paths, - filesystem=fs, + filesystem=_wrapped_fs(fs), **_dataset_kwargs, ) @@ -1402,10 +1334,9 @@ def _collect_file_parts( # Need more information - convert the path to a fragment file_frags = list( - _with_wrapped_fs( - pa_ds.dataset, + pa_ds.dataset( files_or_frags, - filesystem=fs, + filesystem=_wrapped_fs(fs), **dataset_options, ).get_fragments() ) @@ -1596,10 +1527,9 @@ def _read_table( # We are filtering with "pyarrow-dataset". # Need to convert the path and row-group IDs # to a single "fragment" to read - ds = _with_wrapped_fs( - pa_ds.dataset, + ds = pa_ds.dataset( path_or_frag, - filesystem=fs, + filesystem=_wrapped_fs(fs), **kwargs.get("dataset", {}), ) frags = list(ds.get_fragments()) diff --git a/dask/dataframe/io/parquet/core.py b/dask/dataframe/io/parquet/core.py index a2228120a46..2bfe94c3ea9 100644 --- a/dask/dataframe/io/parquet/core.py +++ b/dask/dataframe/io/parquet/core.py @@ -345,10 +345,9 @@ def read_parquet( It may be necessary to change this argument if the data files in your parquet dataset do not end in ".parq", ".parquet", or ".pq". filesystem: "fsspec", "arrow", fsspec.AbstractFileSystem, or pyarrow.fs.FileSystem - Filesystem backend to use. Default is "fsspec", unless reading from s3 - storage with the "pyarrow" engine and ``open_file_options=None`` (in - which case the default is "arrow"). Note that the "fastparquet" engine - only supports "fsspec" or an explicit ``pyarrow.fs.FileSystem`` object. + Filesystem backend to use. Note that the "fastparquet" engine only + supports "fsspec" or an explicit ``pyarrow.fs.FileSystem`` object. + Default is "fsspec". dataset: dict, default None Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` or ``fastparquet.ParquetFile`` object. These options may include a @@ -362,7 +361,7 @@ def read_parquet( Dictionary of options to use when converting from ``pyarrow.Table`` to a pandas ``DataFrame`` object. 
Only used by the "arrow" engine. **kwargs: dict (of dicts) - Options to pass through to ``engine.read_partitions`` as staand-alone + Options to pass through to ``engine.read_partitions`` as stand-alone key-word arguments. Note that these options will be ignored by the engines defined in ``dask.dataframe``, but may be used by other custom implementations. diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index eac63b06a6e..99256ab86bc 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -69,7 +69,9 @@ def extract_filesystem( else: # Check that an initialized filesystem object was provided if not isinstance(fs, AbstractFileSystem): - raise ValueError(f"Expected fsspec.AbstractFileSystem. Got {type(fs)}") + raise ValueError( + f"Expected fsspec.AbstractFileSystem or 'fsspec'. Got {fs}" + ) if storage_options: # The filesystem was already specified. Can't pass in diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 5eee87f2c4a..d08c18c9dce 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -4299,3 +4299,40 @@ def test_retries_on_remote_filesystem(tmpdir): layer = hlg_layer(ddf2.dask, "read-parquet") assert layer.annotations assert layer.annotations["retries"] == 2 + + +def test_filesystem_option(tmpdir, engine): + from fsspec.implementations.local import LocalFileSystem + + df = pd.DataFrame({"a": range(10)}) + dd.from_pandas(df, npartitions=2).to_parquet(tmpdir, engine=engine) + fs = LocalFileSystem() + fs._myfs = True + ddf = dd.read_parquet( + tmpdir, + engine=engine, + filesystem=fs, + ) + layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs + assert layer_fs._myfs + assert_eq(ddf, df) + + +@PYARROW_MARK +@pytest.mark.parametrize("fs", ["arrow", None]) +def test_pyarrow_filesystem_option(tmpdir, fs): + from fsspec.implementations.arrow import ArrowFSWrapper + from pyarrow.fs import LocalFileSystem + + df = pd.DataFrame({"a": range(10)}) + dd.from_pandas(df, npartitions=2).to_parquet(tmpdir) + fs = fs or LocalFileSystem() + ddf = dd.read_parquet( + tmpdir, + engine="pyarrow", + filesystem=fs, + ) + layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs + assert isinstance(layer_fs, ArrowFSWrapper) + assert isinstance(layer_fs.fs, LocalFileSystem) + assert_eq(ddf, df) From 45b5493e29e8e6455a038ffd0f0ca2e59df968f8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 14 Dec 2022 10:21:37 -0800 Subject: [PATCH 7/9] address windows issue --- dask/dataframe/io/parquet/arrow.py | 19 ++++++++++++++----- dask/dataframe/io/tests/test_parquet.py | 4 ++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index f412c3f0813..726cdc79e5a 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -371,13 +371,22 @@ def extract_filesystem( ) if isinstance(fs, pa_fs.FileSystem): - fs = ArrowFSWrapper(fs) - paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None) + fsspec_fs = ArrowFSWrapper(fs) + if urlpath[0].startswith("C:") and isinstance( + fs, pa_fs.LocalFileSystem + ): + # ArrowFSWrapper._strip_protocol not reliable on windows + from fsspec.implementations.local import LocalFileSystem + + fs_strip = LocalFileSystem() + else: + fs_strip = fsspec_fs + paths = expand_paths_if_needed(urlpath, "rb", 1, fsspec_fs, None) return ( - fs, - [fs._strip_protocol(u) for u in paths], + fsspec_fs, + 
[fs_strip._strip_protocol(u) for u in paths], dataset_options, - {"open_file_func": fs.fs.open_input_file}, + {"open_file_func": fs.open_input_file}, ) # Use default file-system initialization diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index 22b337924fa..ce8bb516903 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -4411,7 +4411,7 @@ def test_filesystem_option(tmpdir, engine): fs = LocalFileSystem() fs._myfs = True ddf = dd.read_parquet( - tmpdir, + str(tmpdir), engine=engine, filesystem=fs, ) @@ -4430,7 +4430,7 @@ def test_pyarrow_filesystem_option(tmpdir, fs): dd.from_pandas(df, npartitions=2).to_parquet(tmpdir) fs = fs or LocalFileSystem() ddf = dd.read_parquet( - tmpdir, + str(tmpdir), engine="pyarrow", filesystem=fs, ) From 1304900a2636f34096c27559b477d9ab5d582702 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Dec 2022 10:52:48 -0800 Subject: [PATCH 8/9] address code review --- dask/dataframe/io/parquet/arrow.py | 43 +++++++++++++------------ dask/dataframe/io/parquet/utils.py | 15 +++++++-- dask/dataframe/io/tests/test_parquet.py | 23 ++++++------- 3 files changed, 46 insertions(+), 35 deletions(-) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index 726cdc79e5a..a7682148906 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -350,12 +350,16 @@ def extract_filesystem( ): # Check if filesystem was specified as a dataset option - fs = dataset_options.pop("filesystem", None) - if filesystem is not None: + if filesystem is None: + fs = dataset_options.pop("filesystem", "fsspec") + else: + if "filesystem" in dataset_options: + raise ValueError( + "Cannot specify a filesystem argument if the " + "'filesystem' dataset option is also defined." 
+ ) fs = filesystem - fs = fs or "fsspec" # Default is fsspec - # Handle pyarrow-based filesystem if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"): if isinstance(urlpath, (list, tuple, set)): @@ -370,24 +374,21 @@ def extract_filesystem( **(storage_options or {}) ) - if isinstance(fs, pa_fs.FileSystem): - fsspec_fs = ArrowFSWrapper(fs) - if urlpath[0].startswith("C:") and isinstance( - fs, pa_fs.LocalFileSystem - ): - # ArrowFSWrapper._strip_protocol not reliable on windows - from fsspec.implementations.local import LocalFileSystem + fsspec_fs = ArrowFSWrapper(fs) + if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem): + # ArrowFSWrapper._strip_protocol not reliable on windows + from fsspec.implementations.local import LocalFileSystem - fs_strip = LocalFileSystem() - else: - fs_strip = fsspec_fs - paths = expand_paths_if_needed(urlpath, "rb", 1, fsspec_fs, None) - return ( - fsspec_fs, - [fs_strip._strip_protocol(u) for u in paths], - dataset_options, - {"open_file_func": fs.open_input_file}, - ) + fs_strip = LocalFileSystem() + else: + fs_strip = fsspec_fs + paths = expand_paths_if_needed(urlpath, "rb", 1, fsspec_fs, None) + return ( + fsspec_fs, + [fs_strip._strip_protocol(u) for u in paths], + dataset_options, + {"open_file_func": fs.open_input_file}, + ) # Use default file-system initialization return Engine.extract_filesystem( diff --git a/dask/dataframe/io/parquet/utils.py b/dask/dataframe/io/parquet/utils.py index 828cd367efb..359e7fa2f67 100644 --- a/dask/dataframe/io/parquet/utils.py +++ b/dask/dataframe/io/parquet/utils.py @@ -55,8 +55,14 @@ def extract_filesystem( """ # Check if fs was specified as a dataset option - fs = dataset_options.pop("fs", "fsspec") - if filesystem is not None: + if filesystem is None: + fs = dataset_options.pop("fs", "fsspec") + else: + if "fs" in dataset_options: + raise ValueError( + "Cannot specify a filesystem argument if the " + "'fs' dataset option is also defined." + ) fs = filesystem if fs in (None, "fsspec"): @@ -76,7 +82,10 @@ def extract_filesystem( if storage_options: # The filesystem was already specified. Can't pass in # any storage options - warnings.warn(f"Ignoring storage_options: {storage_options}") + raise ValueError( + f"Cannot specify storage_options when an explicit " + f"filesystem object is specified. 
Got: {storage_options}" + ) if isinstance(urlpath, (list, tuple, set)): if not urlpath: diff --git a/dask/dataframe/io/tests/test_parquet.py b/dask/dataframe/io/tests/test_parquet.py index ce8bb516903..5a1089a693d 100644 --- a/dask/dataframe/io/tests/test_parquet.py +++ b/dask/dataframe/io/tests/test_parquet.py @@ -4403,34 +4403,35 @@ def test_retries_on_remote_filesystem(tmpdir): assert layer.annotations["retries"] == 2 -def test_filesystem_option(tmpdir, engine): +@pytest.mark.parametrize("fs", ["fsspec", None]) +def test_filesystem_option(tmp_path, engine, fs): from fsspec.implementations.local import LocalFileSystem df = pd.DataFrame({"a": range(10)}) - dd.from_pandas(df, npartitions=2).to_parquet(tmpdir, engine=engine) - fs = LocalFileSystem() - fs._myfs = True + dd.from_pandas(df, npartitions=2).to_parquet(tmp_path, engine=engine) + filesystem = fs or LocalFileSystem() ddf = dd.read_parquet( - str(tmpdir), + tmp_path, engine=engine, - filesystem=fs, + filesystem=filesystem, ) - layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs - assert layer_fs._myfs + if fs is None: + layer_fs = next(iter(ddf.dask.layers.values())).io_func.fs + assert layer_fs is filesystem assert_eq(ddf, df) @PYARROW_MARK @pytest.mark.parametrize("fs", ["arrow", None]) -def test_pyarrow_filesystem_option(tmpdir, fs): +def test_pyarrow_filesystem_option(tmp_path, fs): from fsspec.implementations.arrow import ArrowFSWrapper from pyarrow.fs import LocalFileSystem df = pd.DataFrame({"a": range(10)}) - dd.from_pandas(df, npartitions=2).to_parquet(tmpdir) + dd.from_pandas(df, npartitions=2).to_parquet(tmp_path) fs = fs or LocalFileSystem() ddf = dd.read_parquet( - str(tmpdir), + tmp_path, engine="pyarrow", filesystem=fs, ) From 0b2d037252ceafd05e61ebdc3cda0d41805c7e4f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 15 Dec 2022 11:07:12 -0800 Subject: [PATCH 9/9] add comment on windows issue --- dask/dataframe/io/parquet/arrow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask/dataframe/io/parquet/arrow.py b/dask/dataframe/io/parquet/arrow.py index a7682148906..1e483977fef 100644 --- a/dask/dataframe/io/parquet/arrow.py +++ b/dask/dataframe/io/parquet/arrow.py @@ -377,6 +377,7 @@ def extract_filesystem( fsspec_fs = ArrowFSWrapper(fs) if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem): # ArrowFSWrapper._strip_protocol not reliable on windows + # See: https://github.com/fsspec/filesystem_spec/issues/1137 from fsspec.implementations.local import LocalFileSystem fs_strip = LocalFileSystem()