Make filesystem-backend configurable in read_parquet #9699

Merged
merged 10 commits on Dec 15, 2022
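
The headline change: ``dd.read_parquet`` gains a top-level ``filesystem`` argument for choosing between the default fsspec backend and a pyarrow-native one. A minimal usage sketch (the bucket path and credentials below are placeholders, not taken from this PR):

import dask.dataframe as dd
import pyarrow.fs as pa_fs

# Default behaviour (unchanged): paths are resolved with an fsspec filesystem.
df = dd.read_parquet("s3://my-bucket/data/", storage_options={"anon": True})

# New in this PR: request a pyarrow-native filesystem backend by name...
df = dd.read_parquet("s3://my-bucket/data/", filesystem="arrow")

# ...or pass an explicit pyarrow.fs.FileSystem instance.
df = dd.read_parquet("s3://my-bucket/data/", filesystem=pa_fs.S3FileSystem(anonymous=True))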
93 changes: 78 additions & 15 deletions dask/dataframe/io/parquet/arrow.py
@@ -22,7 +22,6 @@
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import _get_pyarrow_dtypes, _is_local_fs, _open_input_files
from dask.dataframe.utils import clear_known_categories
@@ -31,7 +30,10 @@

# Check PyArrow version for feature support
_pa_version = parse_version(pa.__version__)
from fsspec.core import expand_paths_if_needed, stringify_path
from fsspec.implementations.arrow import ArrowFSWrapper
from pyarrow import dataset as pa_ds
from pyarrow import fs as pa_fs

subset_stats_supported = _pa_version > parse_version("2.0.0")
pre_buffer_supported = _pa_version >= parse_version("5.0.0")
@@ -60,6 +62,11 @@
#


def _wrapped_fs(fs):
"""Return the wrapped filesystem if fs is ArrowFSWrapper"""
return fs.fs if isinstance(fs, ArrowFSWrapper) else fs


def _append_row_groups(metadata, md):
"""Append row-group metadata and include a helpful
error message if an inconsistent schema is detected.
@@ -332,6 +339,67 @@ class ArrowDatasetEngine(Engine):
# Public Class Methods
#

@classmethod
def extract_filesystem(
cls,
urlpath,
filesystem,
dataset_options,
open_file_options,
storage_options,
):

# Check if filesystem was specified as a dataset option
if filesystem is None:
fs = dataset_options.pop("filesystem", "fsspec")
else:
if "filesystem" in dataset_options:
raise ValueError(
"Cannot specify a filesystem argument if the "
"'filesystem' dataset option is also defined."
)
fs = filesystem

# Handle pyarrow-based filesystem
if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
if isinstance(urlpath, (list, tuple, set)):
if not urlpath:
raise ValueError("empty urlpath sequence")
urlpath = [stringify_path(u) for u in urlpath]
else:
urlpath = [stringify_path(urlpath)]

if fs in ("arrow", "pyarrow"):
fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
**(storage_options or {})
)

fsspec_fs = ArrowFSWrapper(fs)
if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
# ArrowFSWrapper._strip_protocol not reliable on windows
# See: https://github.com/fsspec/filesystem_spec/issues/1137
from fsspec.implementations.local import LocalFileSystem

fs_strip = LocalFileSystem()
else:
fs_strip = fsspec_fs
paths = expand_paths_if_needed(urlpath, "rb", 1, fsspec_fs, None)
return (
fsspec_fs,
[fs_strip._strip_protocol(u) for u in paths],
dataset_options,
{"open_file_func": fs.open_input_file},
)

# Use default file-system initialization
return Engine.extract_filesystem(
urlpath,
fs,
dataset_options,
open_file_options,
storage_options,
)

@classmethod
def read_metadata(
cls,
@@ -556,7 +624,7 @@ def initialize_write(
metadata_file_exists = False
if append:
# Extract metadata and get file offset if appending
ds = pa_ds.dataset(path, filesystem=fs, format="parquet")
ds = pa_ds.dataset(path, filesystem=_wrapped_fs(fs), format="parquet")
i_offset = len(ds.files)
if i_offset > 0:
try:
@@ -804,12 +872,8 @@ def _collect_dataset_info(
ds = None
valid_paths = None # Only used if `paths` is a list containing _metadata

# Extract "supported" key-word arguments from `kwargs`
(
_dataset_kwargs,
read_kwargs,
user_kwargs,
) = _split_user_options(**kwargs)
# Extract dataset-specific options
_dataset_kwargs = kwargs.pop("dataset", {})

if "partitioning" not in _dataset_kwargs:
_dataset_kwargs["partitioning"] = "hive"
@@ -831,7 +895,7 @@
# Use _metadata file
ds = pa_ds.parquet_dataset(
meta_path,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**_dataset_kwargs,
)
has_metadata_file = True
@@ -859,7 +923,7 @@
if not ignore_metadata_file:
ds = pa_ds.parquet_dataset(
meta_path,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**_dataset_kwargs,
)
has_metadata_file = True
@@ -873,7 +937,7 @@
if ds is None:
ds = pa_ds.dataset(
paths,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**_dataset_kwargs,
)

@@ -974,8 +1038,7 @@ def _collect_dataset_info(
"metadata_task_size": metadata_task_size,
"kwargs": {
"dataset": _dataset_kwargs,
"read": read_kwargs,
**user_kwargs,
**kwargs,
},
}

@@ -1315,7 +1378,7 @@ def _collect_file_parts(
file_frags = list(
pa_ds.dataset(
files_or_frags,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**dataset_options,
).get_fragments()
)
@@ -1508,7 +1571,7 @@ def _read_table(
# to a single "fragment" to read
ds = pa_ds.dataset(
path_or_frag,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**kwargs.get("dataset", {}),
)
frags = list(ds.get_fragments())
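For context on the new ``_wrapped_fs`` helper above: ``ArrowFSWrapper`` exposes a pyarrow filesystem through the fsspec interface and keeps the original object on its ``.fs`` attribute, so the helper can hand the native filesystem back to ``pyarrow.dataset``. A small sketch (not part of the diff):

from fsspec.implementations.arrow import ArrowFSWrapper
from pyarrow import fs as pa_fs

arrow_fs = pa_fs.LocalFileSystem()     # native pyarrow filesystem
fsspec_fs = ArrowFSWrapper(arrow_fs)   # fsspec-compatible view used by the rest of dask IO

assert fsspec_fs.fs is arrow_fs        # the wrapper keeps the original filesystem
# _wrapped_fs(fsspec_fs) therefore returns arrow_fs, which pa_ds.dataset(...) accepts,
# while plain fsspec filesystems pass through _wrapped_fs unchanged.
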
64 changes: 49 additions & 15 deletions dask/dataframe/io/parquet/core.py
@@ -15,7 +15,11 @@
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import DataFrame, Scalar
from dask.dataframe.io.io import from_map
from dask.dataframe.io.parquet.utils import Engine, _sort_and_analyze_paths
from dask.dataframe.io.parquet.utils import (
Engine,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import DataFrameIOFunction, _is_local_fs
from dask.dataframe.methods import concat
from dask.delayed import Delayed
@@ -193,6 +197,7 @@ def read_parquet(
chunksize=None,
aggregate_files=None,
parquet_file_extension=(".parq", ".parquet", ".pq"),
filesystem=None,
**kwargs,
):
"""
@@ -246,6 +251,8 @@
data written by dask/fastparquet, not otherwise.
storage_options : dict, default None
Key/value pairs to be passed on to the file-system backend, if any.
Note that the default file-system backend can be configured with the
``filesystem`` argument, described below.
open_file_options : dict, default None
Key/value arguments to be passed along to ``AbstractFileSystem.open``
when each parquet data file is open for reading. Experimental
@@ -342,19 +349,27 @@
unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files).
It may be necessary to change this argument if the data files in your
parquet dataset do not end in ".parq", ".parquet", or ".pq".
filesystem: "fsspec", "arrow", fsspec.AbstractFileSystem, or pyarrow.fs.FileSystem
Filesystem backend to use. Note that the "fastparquet" engine only
supports "fsspec" or an explicit ``pyarrow.fs.FileSystem`` object.
Default is "fsspec".
dataset: dict, default None
Dictionary of options to use when creating a ``pyarrow.dataset.Dataset``
or ``fastparquet.ParquetFile`` object. These options may include a
"filesystem" key (or "fs" for the "fastparquet" engine) to configure
the desired file-system backend. However, the top-level ``filesystem``
argument will always take precedence.
read: dict, default None
Dictionary of options to pass through to ``engine.read_partitions``
using the ``read`` key-word argument.
arrow_to_pandas: dict, default None
Dictionary of options to use when converting from ``pyarrow.Table`` to
a pandas ``DataFrame`` object. Only used by the "arrow" engine.
**kwargs: dict (of dicts)
Passthrough key-word arguments for read backend.
The top-level keys correspond to the appropriate operation type, and
the second level corresponds to the kwargs that will be passed on to
the underlying ``pyarrow`` or ``fastparquet`` function.
Supported top-level keys: 'dataset' (for opening a ``pyarrow`` dataset),
'file' or 'dataset' (for opening a ``fastparquet.ParquetFile``), 'read'
(for the backend read function), 'arrow_to_pandas' (for controlling the
arguments passed to convert from a ``pyarrow.Table.to_pandas()``).
Any element of kwargs that is not defined under these top-level keys
will be passed through to the `engine.read_partitions` classmethod as a
stand-alone argument (and will be ignored by the engine implementations
defined in ``dask.dataframe``).
Options to pass through to ``engine.read_partitions`` as stand-alone
key-word arguments. Note that these options will be ignored by the
engines defined in ``dask.dataframe``, but may be used by other custom
implementations.

Examples
--------
@@ -446,6 +461,7 @@ def read_parquet(
"chunksize": chunksize,
"aggregate_files": aggregate_files,
"parquet_file_extension": parquet_file_extension,
"filesystem": filesystem,
**kwargs,
}

@@ -466,7 +482,23 @@
# Update input_kwargs
input_kwargs.update({"columns": columns, "engine": engine})

fs, _, paths = get_fs_token_paths(path, mode="rb", storage_options=storage_options)
# Process and split user options
(
dataset_options,
read_options,
open_file_options,
other_options,
) = _split_user_options(**kwargs)

# Extract global filesystem and paths
fs, paths, dataset_options, open_file_options = engine.extract_filesystem(
path,
filesystem,
dataset_options,
open_file_options,
storage_options,
)
read_options["open_file_options"] = open_file_options
paths = sorted(paths, key=natural_sort_key) # numeric rather than glob ordering

auto_index_allowed = False
@@ -490,7 +522,9 @@
ignore_metadata_file=ignore_metadata_file,
metadata_task_size=metadata_task_size,
parquet_file_extension=parquet_file_extension,
**kwargs,
dataset=dataset_options,
read=read_options,
**other_options,
)

# In the future, we may want to give the engine the
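As an aside on the new call site above: ``_split_user_options`` now yields four dictionaries that are routed explicitly instead of being rediscovered downstream. A hedged illustration of that routing (the helper body below is a sketch under that assumption, not the dask implementation):

def _split_user_options_sketch(**kwargs):
    # Illustration only: pull the per-purpose dictionaries out of the top-level
    # kwargs and leave everything else for custom engine implementations.
    dataset_options = dict(kwargs.pop("dataset", None) or {})
    read_options = dict(kwargs.pop("read", None) or {})
    open_file_options = dict(kwargs.pop("open_file_options", None) or {})
    return dataset_options, read_options, open_file_options, kwargs

dataset_options, read_options, open_file_options, other_options = _split_user_options_sketch(
    dataset={"partitioning": "hive"},
    read={"some_read_option": True},   # hypothetical read option
    my_custom_flag=True,               # hypothetical pass-through option
)
# dataset_options feed engine.extract_filesystem and pa_ds.dataset(...),
# read_options (with open_file_options attached) reach the engine's read path,
# and other_options are forwarded untouched for custom engine implementations.
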
9 changes: 3 additions & 6 deletions dask/dataframe/io/parquet/fastparquet.py
@@ -35,7 +35,6 @@
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import _is_local_fs, _meta_from_dtypes, _open_input_files
from dask.dataframe.utils import UNKNOWN_CATEGORIES
@@ -383,9 +382,8 @@ def _collect_dataset_info(
# then each part will correspond to a file. Otherwise, each part will
# correspond to a row group (populated later).

# Extract "supported" key-word arguments from `kwargs`.
# Split items into `dataset_kwargs` and `read_kwargs`
dataset_kwargs, read_kwargs, user_kwargs = _split_user_options(**kwargs)
# Extract dataset-specific options
dataset_kwargs = kwargs.pop("dataset", {})

parts = []
_metadata_exists = False
@@ -512,8 +510,7 @@
"metadata_task_size": metadata_task_size,
"kwargs": {
"dataset": dataset_kwargs,
"read": read_kwargs,
**user_kwargs,
**kwargs,
},
}

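Finally, a usage sketch tying the fastparquet change back to the updated docstring (the local path is a placeholder; the ``"fs"`` key follows the documented ``dataset`` option for the fastparquet engine):

import dask.dataframe as dd
from fsspec.implementations.local import LocalFileSystem

# The fastparquet engine takes its filesystem (and other ParquetFile options)
# from the "dataset" kwarg, which _collect_dataset_info now pops directly.
df = dd.read_parquet(
    "/data/table.parquet",             # hypothetical local dataset
    engine="fastparquet",
    dataset={"fs": LocalFileSystem()},
)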