from __future__ import annotations
import contextlib
import math
import warnings
import tlz as toolz
from fsspec.core import get_fs_token_paths
from fsspec.utils import stringify_path
from packaging.version import parse as parse_version
import dask
from dask.base import tokenize
from dask.blockwise import BlockIndex
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import DataFrame, Scalar
from dask.dataframe.io.io import from_map
from dask.dataframe.io.parquet.utils import Engine, _sort_and_analyze_paths
from dask.dataframe.io.utils import DataFrameIOFunction, _is_local_fs
from dask.dataframe.methods import concat
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph
from dask.layers import DataFrameIOLayer
from dask.utils import apply, import_required, natural_sort_key, parse_bytes
__all__ = ("read_parquet", "to_parquet")
NONE_LABEL = "__null_dask_index__"
# ----------------------------------------------------------------------
# User API
class ParquetFunctionWrapper(DataFrameIOFunction):
"""
Parquet Function-Wrapper Class
Reads parquet data from disk to produce a partition
(given a `part` argument).
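Instances are created by ``read_parquet`` and passed to ``from_map``:
calling ``wrapper(part)`` returns the pandas DataFrame for that part, and
``wrapper.project_columns(cols)`` returns a new wrapper restricted to
``cols`` (used for column-projection optimizations).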
"""
def __init__(
self,
engine,
fs,
meta,
columns,
index,
use_nullable_dtypes,
kwargs,
common_kwargs,
):
self.engine = engine
self.fs = fs
self.meta = meta
self._columns = columns
self.index = index
self.use_nullable_dtypes = use_nullable_dtypes
# `kwargs` = user-defined kwargs to be passed
# identically for all partitions.
#
# `common_kwargs` = kwargs set by engine to be
# passed identically for all
# partitions.
self.common_kwargs = toolz.merge(common_kwargs, kwargs or {})
@property
def columns(self):
return self._columns
def project_columns(self, columns):
"""Return a new ParquetFunctionWrapper object
with a sub-column projection.
"""
if columns == self.columns:
return self
return ParquetFunctionWrapper(
self.engine,
self.fs,
self.meta,
columns,
self.index,
self.use_nullable_dtypes,
None, # Already merged into common_kwargs
self.common_kwargs,
)
def __call__(self, part):
if not isinstance(part, list):
part = [part]
return read_parquet_part(
self.fs,
self.engine,
self.meta,
[
# Temporary workaround for HLG serialization bug
# (see: https://github.com/dask/dask/issues/8581)
(p.data["piece"], p.data.get("kwargs", {}))
if hasattr(p, "data")
else (p["piece"], p.get("kwargs", {}))
for p in part
],
self.columns,
self.index,
self.use_nullable_dtypes,
self.common_kwargs,
)
class ToParquetFunctionWrapper:
"""
Parquet Function-Wrapper Class
Writes a DataFrame partition into a distinct parquet
file. When called, the function also requires the
current block index (via ``blockwise.BlockIndex``).
"""
def __init__(
self,
engine,
path,
fs,
partition_on,
write_metadata_file,
i_offset,
name_function,
kwargs_pass,
):
self.engine = engine
self.path = path
self.fs = fs
self.partition_on = partition_on
self.write_metadata_file = write_metadata_file
self.i_offset = i_offset
self.name_function = name_function
self.kwargs_pass = kwargs_pass
# NOTE: __name__ must be "to-parquet" so that the
# name of the resulting `Blockwise` layer begins
# with "to-parquet"
self.__name__ = "to-parquet"
def __dask_tokenize__(self):
return (
self.engine,
self.path,
self.fs,
self.partition_on,
self.write_metadata_file,
self.i_offset,
self.name_function,
self.kwargs_pass,
)
def __call__(self, df, block_index: tuple[int]):
# Get partition index from block index tuple
part_i = block_index[0]
filename = (
f"part.{part_i + self.i_offset}.parquet"
if self.name_function is None
else self.name_function(part_i + self.i_offset)
)
# Write out data
return self.engine.write_partition(
df,
self.path,
self.fs,
filename,
self.partition_on,
self.write_metadata_file,
**(dict(self.kwargs_pass, head=True) if part_i == 0 else self.kwargs_pass),
)
@dataframe_creation_dispatch.register_inplace("pandas")
def read_parquet(
path,
columns=None,
filters=None,
categories=None,
index=None,
storage_options=None,
engine="auto",
use_nullable_dtypes: bool = False,
calculate_divisions=None,
ignore_metadata_file=False,
metadata_task_size=None,
split_row_groups=False,
chunksize=None,
aggregate_files=None,
parquet_file_extension=(".parq", ".parquet", ".pq"),
**kwargs,
):
"""
Read a Parquet file into a Dask DataFrame
This reads a directory of Parquet data into a Dask.dataframe, one file per
partition. It selects the index among the sorted columns if any exist.
Parameters
----------
path : str or list
Source directory for data, or path(s) to individual parquet files.
Prefix with a protocol like ``s3://`` to read from alternative
filesystems. To read from multiple files you can pass a globstring or a
list of paths, with the caveat that they must all have the same
protocol.
columns : str or list, default None
Field name(s) to read in as columns in the output. By default all
non-index fields will be read (as determined by the pandas parquet
metadata, if present). Provide a single field name instead of a list to
read in the data as a Series.
filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None
List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``.
Using this argument will NOT result in row-wise filtering of the final
partitions unless ``engine="pyarrow"`` is also specified. For
other engines, filtering is only performed at the partition level, that is,
to prevent the loading of some row-groups and/or files.
For the "pyarrow" engine, predicates can be expressed in disjunctive
normal form (DNF). This means that the inner-most tuple describes a single
column predicate. These inner predicates are combined with an AND
conjunction into a larger predicate. The outer-most list then combines all
of the combined filters with an OR disjunction.
Predicates can also be expressed as a ``List[Tuple]``. These are evaluated
as an AND conjunction. To express OR in predicates, one must use the
(preferred for "pyarrow") ``List[List[Tuple]]`` notation.
Note that the "fastparquet" engine does not currently support DNF for
the filtering of partitioned columns (``List[Tuple]`` is required).
index : str, list or False, default None
Field name(s) to use as the output frame index. By default will be
inferred from the pandas parquet file metadata, if present. Use ``False``
to read all fields as columns.
categories : list or dict, default None
For any fields listed here, if the parquet encoding is Dictionary,
the column will be created with dtype category. Use only if it is
guaranteed that the column is encoded as dictionary in all row-groups.
If a list, assumes up to 2**16-1 labels; if a dict, specify the number
of labels expected; if None, will load categories automatically for
data written by dask/fastparquet, not otherwise.
storage_options : dict, default None
Key/value pairs to be passed on to the file-system backend, if any.
open_file_options : dict, default None
Key/value arguments to be passed along to ``AbstractFileSystem.open``
when each parquet data file is open for reading. Experimental
(optimized) "precaching" for remote file systems (e.g. S3, GCS) can
be enabled by adding ``{"method": "parquet"}`` under the
``"precache_options"`` key. Also, a custom file-open function can be
used (instead of ``AbstractFileSystem.open``), by specifying the
desired function under the ``"open_file_func"`` key.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. Defaults to 'auto', which uses ``pyarrow`` if
it is installed, and falls back to ``fastparquet`` otherwise.
use_nullable_dtypes : {False, True}
Whether to use extension dtypes for the resulting ``DataFrame``.
``use_nullable_dtypes=True`` is only supported when ``engine="pyarrow"``.
.. note::
Use the ``dataframe.dtype_backend`` config option to select which
dtype implementation to use.
``dataframe.dtype_backend="pandas"`` (the default) will use
pandas' ``numpy``-backed nullable dtypes (e.g. ``Int64``,
``string[python]``, etc.) while ``dataframe.dtype_backend="pyarrow"``
will use ``pyarrow``-backed extension dtypes (e.g. ``int64[pyarrow]``,
``string[pyarrow]``, etc.). ``dataframe.dtype_backend="pyarrow"``
requires ``pandas`` 1.5+.
calculate_divisions : bool, default False
Whether to use min/max statistics from the footer metadata (or global
``_metadata`` file) to calculate divisions for the output DataFrame
collection. Divisions will not be calculated if statistics are missing.
This option will be ignored if ``index`` is not specified and there is
no physical index column specified in the custom "pandas" Parquet
metadata. Note that ``calculate_divisions=True`` may be extremely slow
when no global ``_metadata`` file is present, especially when reading
from remote storage. Set this to ``True`` only when known divisions
are needed for your workload (see :ref:`dataframe-design-partitions`).
ignore_metadata_file : bool, default False
Whether to ignore the global ``_metadata`` file (when one is present).
If ``True``, or if the global ``_metadata`` file is missing, the parquet
metadata may be gathered and processed in parallel. Parallel metadata
processing is currently supported for ``ArrowDatasetEngine`` only.
metadata_task_size : int, default configurable
If parquet metadata is processed in parallel (see ``ignore_metadata_file``
description above), this argument can be used to specify the number of
dataset files to be processed by each task in the Dask graph. If this
argument is set to ``0``, parallel metadata processing will be disabled.
The default values for local and remote filesystems can be specified
with the "metadata-task-size-local" and "metadata-task-size-remote"
config fields, respectively (see "dataframe.parquet").
split_row_groups : bool or int, default False
If True, then each output dataframe partition will correspond to a single
parquet-file row-group. If False, each partition will correspond to a
complete file. If a positive integer value is given, each dataframe
partition will correspond to that number of parquet row-groups (or fewer).
chunksize : int or str, default None
WARNING: The ``chunksize`` argument will be deprecated in the future.
Please use ``split_row_groups`` to specify how many row-groups should be
mapped to each output partition. If you strongly oppose the deprecation of
``chunksize``, please comment at https://github.com/dask/dask/issues/9043.
The desired size of each output ``DataFrame`` partition in terms of total
(uncompressed) parquet storage space. If specified, adjacent row-groups
and/or files will be aggregated into the same output partition until the
cumulative ``total_byte_size`` parquet-metadata statistic reaches this
value. Use `aggregate_files` to enable/disable inter-file aggregation.
aggregate_files : bool or str, default None
WARNING: The ``aggregate_files`` argument will be deprecated in the future.
Please consider using ``from_map`` to create a DataFrame collection with a
custom file-to-partition mapping. If you strongly oppose the deprecation of
``aggregate_files``, please comment at https://github.com/dask/dask/issues/9051.
Whether distinct file paths may be aggregated into the same output
partition. This parameter is only used when `chunksize` is specified
or when `split_row_groups` is an integer >1. A setting of True means
that any two file paths may be aggregated into the same output partition,
while False means that inter-file aggregation is prohibited.
For "hive-partitioned" datasets, a "partition"-column name can also be
specified. In this case, we allow the aggregation of any two files
sharing a file path up to, and including, the corresponding directory name.
For example, if ``aggregate_files`` is set to ``"section"`` for the
directory structure below, ``03.parquet`` and ``04.parquet`` may be
aggregated together, but ``01.parquet`` and ``02.parquet`` cannot be.
If, however, ``aggregate_files`` is set to ``"region"``, ``01.parquet``
may be aggregated with ``02.parquet``, and ``03.parquet`` may be aggregated
with ``04.parquet``::
dataset-path/
├── region=1/
│   ├── section=a/
│   │   └── 01.parquet
│   └── section=b/
│       └── 02.parquet
└── region=2/
    └── section=a/
        ├── 03.parquet
        └── 04.parquet
Note that the default behavior of ``aggregate_files`` is ``False``.
parquet_file_extension : str, tuple[str], or None, default (".parq", ".parquet", ".pq")
A file extension or an iterable of extensions to use when discovering
parquet files in a directory. Files that don't match these extensions
will be ignored. This argument only applies when ``paths`` corresponds
to a directory and no ``_metadata`` file is present (or
``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None``
will treat all files in the directory as parquet files.
The purpose of this argument is to ensure that the engine will ignore
unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files).
It may be necessary to change this argument if the data files in your
parquet dataset do not end in ".parq", ".parquet", or ".pq".
**kwargs: dict (of dicts)
Passthrough key-word arguments for read backend.
The top-level keys correspond to the appropriate operation type, and
the second level corresponds to the kwargs that will be passed on to
the underlying ``pyarrow`` or ``fastparquet`` function.
Supported top-level keys: 'dataset' (for opening a ``pyarrow`` dataset),
'file' or 'dataset' (for opening a ``fastparquet.ParquetFile``), 'read'
(for the backend read function), 'arrow_to_pandas' (for controlling the
arguments passed to convert from a ``pyarrow.Table.to_pandas()``).
Any element of kwargs that is not defined under these top-level keys
will be passed through to the `engine.read_partitions` classmethod as a
stand-alone argument (and will be ignored by the engine implementations
defined in ``dask.dataframe``).
Examples
--------
>>> df = dd.read_parquet('s3://bucket/my-parquet-data') # doctest: +SKIP
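The following are illustrative sketches only (bucket, path, and column
names here are hypothetical). A single column name returns a Series,
``filters`` prunes row-groups/files, ``calculate_divisions=True`` uses
footer statistics to set known divisions, and the ``dataframe.dtype_backend``
config controls which nullable dtypes are used:
>>> s = dd.read_parquet('s3://bucket/my-parquet-data', columns='x')  # doctest: +SKIP
>>> df = dd.read_parquet(
...     's3://bucket/my-parquet-data',
...     filters=[[('x', '>', 0), ('y', '==', 'a')]],
...     calculate_divisions=True,
... )  # doctest: +SKIP
>>> with dask.config.set({"dataframe.dtype_backend": "pyarrow"}):  # doctest: +SKIP
...     df = dd.read_parquet('s3://bucket/my-parquet-data', use_nullable_dtypes=True)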
See Also
--------
to_parquet
pyarrow.parquet.ParquetDataset
"""
if use_nullable_dtypes:
use_nullable_dtypes = dask.config.get("dataframe.dtype_backend")
# "Pre-deprecation" warning for `chunksize`
if chunksize:
warnings.warn(
"The `chunksize` argument will be deprecated in the future. "
"Please use `split_row_groups` to specify how many row-groups "
"should be mapped to each output partition.\n\n"
"If you strongly oppose the deprecation of `chunksize`, please "
"comment at https://github.com/dask/dask/issues/9043",
FutureWarning,
)
# "Pre-deprecation" warning for `aggregate_files`
if aggregate_files:
warnings.warn(
"The `aggregate_files` argument will be deprecated in the future. "
"Please consider using `from_map` to create a DataFrame collection "
"with a custom file-to-partition mapping.\n\n"
"If you strongly oppose the deprecation of `aggregate_files`, "
"please comment at https://github.com/dask/dask/issues/9051",
FutureWarning,
)
if "read_from_paths" in kwargs:
kwargs.pop("read_from_paths")
warnings.warn(
"`read_from_paths` is no longer supported and will be ignored.",
FutureWarning,
)
# Handle gather_statistics deprecation
if "gather_statistics" in kwargs:
if calculate_divisions is None:
calculate_divisions = kwargs.pop("gather_statistics")
warnings.warn(
"``gather_statistics`` is deprecated and will be removed in a "
"future release. Please use ``calculate_divisions`` instead.",
FutureWarning,
)
else:
warnings.warn(
f"``gather_statistics`` is deprecated. Ignoring this option "
f"in favor of ``calculate_divisions={calculate_divisions}``",
FutureWarning,
)
calculate_divisions = bool(calculate_divisions)
# We support a top-level `parquet_file_extension` kwarg, but
# must check if the deprecated `require_extension` option is
# being passed to the engine. If `parquet_file_extension` is
# set to the default value, and `require_extension` was also
# specified, we will use `require_extension` but warn the user.
if (
"dataset" in kwargs
and "require_extension" in kwargs["dataset"]
and parquet_file_extension == (".parq", ".parquet", ".pq")
):
parquet_file_extension = kwargs["dataset"].pop("require_extension")
warnings.warn(
"require_extension is deprecated, and will be removed from "
"read_parquet in a future release. Please use the top-level "
"parquet_file_extension argument instead.",
FutureWarning,
)
# Store initial function arguments
input_kwargs = {
"columns": columns,
"filters": filters,
"categories": categories,
"index": index,
"storage_options": storage_options,
"engine": engine,
"use_nullable_dtypes": use_nullable_dtypes,
"calculate_divisions": calculate_divisions,
"ignore_metadata_file": ignore_metadata_file,
"metadata_task_size": metadata_task_size,
"split_row_groups": split_row_groups,
"chunksize": chunksize,
"aggregate_files": aggregate_files,
"parquet_file_extension": parquet_file_extension,
**kwargs,
}
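# A single (string) column name means the caller wants a Series back:
# recurse with a one-element column list, then select that column.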
if isinstance(columns, str):
input_kwargs["columns"] = [columns]
df = read_parquet(path, **input_kwargs)
return df[columns]
if columns is not None:
columns = list(columns)
if isinstance(engine, str):
engine = get_engine(engine)
if hasattr(path, "name"):
path = stringify_path(path)
# Update input_kwargs
input_kwargs.update({"columns": columns, "engine": engine})
fs, _, paths = get_fs_token_paths(path, mode="rb", storage_options=storage_options)
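# natural_sort_key orders embedded integers numerically, so e.g.
# "part.2.parquet" sorts before "part.10.parquet" (matching the order in
# which to_parquet writes partitions), whereas glob/lexicographic ordering
# would not.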
paths = sorted(paths, key=natural_sort_key) # numeric rather than glob ordering
auto_index_allowed = False
if index is None:
# User is allowing auto-detected index
auto_index_allowed = True
if index and isinstance(index, str):
index = [index]
read_metadata_result = engine.read_metadata(
fs,
paths,
categories=categories,
index=index,
use_nullable_dtypes=use_nullable_dtypes,
gather_statistics=calculate_divisions,
filters=filters,
split_row_groups=split_row_groups,
chunksize=chunksize,
aggregate_files=aggregate_files,
ignore_metadata_file=ignore_metadata_file,
metadata_task_size=metadata_task_size,
parquet_file_extension=parquet_file_extension,
**kwargs,
)
# In the future, we may want to give the engine the
# option to return a dedicated element for `common_kwargs`.
# However, to avoid breaking the API, we just embed this
# data in the first element of `parts` for now.
# The logic below is intended to handle backward and forward
# compatibility with a user-defined engine.
meta, statistics, parts, index = read_metadata_result[:4]
common_kwargs = {}
aggregation_depth = False
if len(parts):
# For now, `common_kwargs` and `aggregation_depth`
# may be stored in the first element of `parts`
common_kwargs = parts[0].pop("common_kwargs", {})
aggregation_depth = parts[0].pop("aggregation_depth", aggregation_depth)
# Parse dataset statistics from metadata (if available)
parts, divisions, index = process_statistics(
parts,
statistics,
filters,
index,
chunksize,
split_row_groups,
fs,
aggregation_depth,
)
# Account for index and columns arguments.
# Modify `meta` dataframe accordingly
meta, index, columns = set_index_columns(meta, index, columns, auto_index_allowed)
if meta.index.name == NONE_LABEL:
meta.index.name = None
if len(divisions) < 2:
# empty dataframe - just use meta
divisions = (None, None)
io_func = lambda x: x
parts = [meta]
else:
# Use IO function wrapper
io_func = ParquetFunctionWrapper(
engine,
fs,
meta,
columns,
index,
use_nullable_dtypes,
{}, # All kwargs should now be in `common_kwargs`
common_kwargs,
)
# If we are using a remote filesystem and retries is not set, bump it
# to be more fault tolerant, as transient transport errors can occur.
# The specific number 5 isn't hugely motivated: it's less than ten and more
# than two.
annotations = dask.config.get("annotations", {})
if "retries" not in annotations and not _is_local_fs(fs):
ctx = dask.annotate(retries=5)
else:
ctx = contextlib.nullcontext() # type: ignore
with ctx:
# Construct the output collection with from_map
return from_map(
io_func,
parts,
meta=meta,
divisions=divisions,
label="read-parquet",
token=tokenize(path, **input_kwargs),
enforce_metadata=False,
creation_info={
"func": read_parquet,
"args": (path,),
"kwargs": input_kwargs,
},
)
def check_multi_support(engine):
# Helper function to check that the engine
# supports a multi-partition read
return hasattr(engine, "multi_support") and engine.multi_support()
def read_parquet_part(
fs, engine, meta, part, columns, index, use_nullable_dtypes, kwargs
):
"""Read a part of a parquet dataset
This function is used by `read_parquet`."""
if isinstance(part, list):
if len(part) == 1 or part[0][1] or not check_multi_support(engine):
# Part kwargs expected
func = engine.read_partition
dfs = [
func(
fs,
rg,
columns.copy(),
index,
use_nullable_dtypes=use_nullable_dtypes,
**toolz.merge(kwargs, kw),
)
for (rg, kw) in part
]
df = concat(dfs, axis=0) if len(dfs) > 1 else dfs[0]
else:
# No part specific kwargs, let engine read
# list of parts at once
df = engine.read_partition(
fs,
[p[0] for p in part],
columns.copy(),
index,
use_nullable_dtypes=use_nullable_dtypes,
**kwargs,
)
else:
# NOTE: `kwargs` are the same for all parts, while `part_kwargs` may
# be different for each part.
rg, part_kwargs = part
df = engine.read_partition(
fs,
rg,
columns,
index,
use_nullable_dtypes=use_nullable_dtypes,
**toolz.merge(kwargs, part_kwargs),
)
if meta.columns.name:
df.columns.name = meta.columns.name
columns = columns or []
index = index or []
df = df[[c for c in columns if c not in index]]
if index == [NONE_LABEL]:
df.index.name = None
return df
def to_parquet(
df,
path,
engine="auto",
compression="snappy",
write_index=True,
append=False,
overwrite=False,
ignore_divisions=False,
partition_on=None,
storage_options=None,
custom_metadata=None,
write_metadata_file=None,
compute=True,
compute_kwargs=None,
schema="infer",
name_function=None,
**kwargs,
):
"""Store Dask.dataframe to Parquet files
Notes
-----
Each partition will be written to a separate file.
Parameters
----------
df : dask.dataframe.DataFrame
path : string or pathlib.Path
Destination directory for data. Prepend with protocol like ``s3://``
or ``hdfs://`` for remote data.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. Defaults to 'auto', which uses ``pyarrow`` if
it is installed, and falls back to ``fastparquet`` otherwise.
compression : string or dict, default 'snappy'
Either a string like ``"snappy"`` or a dictionary mapping column names
to compressors like ``{"name": "gzip", "values": "snappy"}``. Defaults
to ``"snappy"``.
write_index : boolean, default True
Whether or not to write the index. Defaults to True.
append : bool, default False
If False (default), construct data-set from scratch. If True, add new
row-group(s) to an existing data-set. In the latter case, the data-set
must exist, and the schema must match the input data.
overwrite : bool, default False
Whether or not to remove the contents of `path` before writing the dataset.
The default is False. If True, the specified path must correspond to
a directory (but not the current working directory). This option cannot
be set to True if `append=True`.
NOTE: `overwrite=True` will remove the original data even if the current
write operation fails. Use at your own risk.
ignore_divisions : bool, default False
If False (default) raises error when previous divisions overlap with
the new appended divisions. Ignored if append=False.
partition_on : list, default None
Construct directory-based partitioning by splitting on these fields'
values. Each dask partition will result in one or more datafiles,
there will be no global groupby.
storage_options : dict, default None
Key/value pairs to be passed on to the file-system backend, if any.
custom_metadata : dict, default None
Custom key/value metadata to include in all footer metadata (and
in the global "_metadata" file, if applicable). Note that the custom
metadata may not contain the reserved b"pandas" key.
write_metadata_file : bool or None, default None
Whether to write the special ``_metadata`` file. If ``None`` (the
default), a ``_metadata`` file will only be written if ``append=True``
and the dataset already has a ``_metadata`` file.
compute : bool, default True
If ``True`` (default) then the result is computed immediately. If
``False`` then a ``dask.dataframe.Scalar`` object is returned for
future computation.
compute_kwargs : dict, default None
Options to be passed in to the compute method
schema : pyarrow.Schema, dict, "infer", or None, default "infer"
Global schema to use for the output dataset. Defaults to "infer", which
will infer the schema from the dask dataframe metadata. This is usually
sufficient for common schemas, but notably will fail for ``object``
dtype columns that contain things other than strings. These columns
will require an explicit schema be specified. The schema for a subset
of columns can be overridden by passing in a dict of column names to
pyarrow types (for example ``schema={"field": pa.string()}``); columns
not present in this dict will still be automatically inferred.
Alternatively, a full ``pyarrow.Schema`` may be passed, in which case
no schema inference will be done. Passing in ``schema=None`` will
disable the use of a global file schema - each written file may use a
different schema dependent on the dtypes of the corresponding
partition. Note that this argument is ignored by the "fastparquet"
engine.
name_function : callable, default None
Function to generate the filename for each output partition.
The function should accept an integer (partition index) as input and
return a string which will be used as the filename for the corresponding
partition. Should preserve the lexicographic order of partitions.
If not specified, files will be created using the convention
``part.0.parquet``, ``part.1.parquet``, ``part.2.parquet``, ...
and so on for each partition in the DataFrame.
**kwargs :
Extra options to be passed on to the specific backend.
Examples
--------
>>> df = dd.read_csv(...) # doctest: +SKIP
>>> df.to_parquet('/path/to/output/', ...) # doctest: +SKIP
By default, files will be created in the specified output directory using the
convention ``part.0.parquet``, ``part.1.parquet``, ``part.2.parquet``, ... and so on for
each partition in the DataFrame. To customize the names of each file, you can use the
``name_function=`` keyword argument. The function passed to ``name_function`` will be
used to generate the filename for each partition and should expect a partition's index
integer as input and return a string which will be used as the filename for the corresponding
partition. Strings produced by ``name_function`` must preserve the order of their respective
partition indices.
For example:
>>> name_function = lambda x: f"data-{x}.parquet"
>>> df.to_parquet('/path/to/output/', name_function=name_function) # doctest: +SKIP
will result in the following files being created::
/path/to/output/
├── data-0.parquet
├── data-1.parquet
├── data-2.parquet
└── ...
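As further illustrative sketches (paths and the ``country`` column are
hypothetical), ``partition_on`` writes one hive-style directory per value
of the listed column(s), and ``compute=False`` defers the write until the
returned object is computed:
>>> df.to_parquet('/path/to/output/', partition_on=['country'])  # doctest: +SKIP
>>> out = df.to_parquet('/path/to/output/', compute=False)  # doctest: +SKIP
>>> out.compute()  # doctest: +SKIP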
See Also
--------
read_parquet: Read parquet data to dask.dataframe
"""
compute_kwargs = compute_kwargs or {}
partition_on = partition_on or []
if isinstance(partition_on, str):
partition_on = [partition_on]
if set(partition_on) - set(df.columns):
raise ValueError(
"Partitioning on non-existent column. "
"partition_on=%s ."
"columns=%s" % (str(partition_on), str(list(df.columns)))
)
if df.columns.inferred_type not in {"string", "empty"}:
raise ValueError("parquet doesn't support non-string column names")
if isinstance(engine, str):
engine = get_engine(engine)
if hasattr(path, "name"):
path = stringify_path(path)
fs, _, _ = get_fs_token_paths(path, mode="wb", storage_options=storage_options)
# Trim any protocol information from the path before forwarding
path = fs._strip_protocol(path)
if overwrite:
if append:
raise ValueError("Cannot use both `overwrite=True` and `append=True`!")
if fs.exists(path) and fs.isdir(path):
# Check for any previous parquet layers reading from a file in the
# output directory, since deleting those files now would result in
# errors or incorrect results.
for layer_name, layer in df.dask.layers.items():
if layer_name.startswith("read-parquet-") and isinstance(
layer, DataFrameIOLayer
):
path_with_slash = path.rstrip("/") + "/" # ensure trailing slash
for input in layer.inputs:
if input["piece"][0].startswith(path_with_slash):
raise ValueError(
"Reading and writing to the same parquet file within the "
"same task graph is not supported."
)
# Don't remove the directory if it's the current working directory
if _is_local_fs(fs):
working_dir = fs.expand_path(".")[0]
if path.rstrip("/") == working_dir.rstrip("/"):
raise ValueError(
"Cannot clear the contents of the current working directory!"
)
# It's safe to clear the output directory
fs.rm(path, recursive=True)
# Always skip divisions checks if divisions are unknown
if not df.known_divisions:
ignore_divisions = True
# Save divisions and corresponding index name. This is necessary,
# because we may be resetting the index to write the file
division_info = {"divisions": df.divisions, "name": df.index.name}
if division_info["name"] is None:
# As of 0.24.2, pandas will rename an index with name=None
# when df.reset_index() is called. The default name is "index",
# but dask will always change the name to the NONE_LABEL constant
if NONE_LABEL not in df.columns:
division_info["name"] = NONE_LABEL
elif write_index:
raise ValueError(
"Index must have a name if __null_dask_index__ is a column."
)
else:
warnings.warn(
"If read back by Dask, column named __null_dask_index__ "
"will be set to the index (and renamed to None)."
)
# There are some "reserved" names that may be used as the default column
# name after resetting the index. However, we don't want to treat a name
# as "special" if the string is already used as a "real" column name.
reserved_names = []
for name in ["index", "level_0"]:
if name not in df.columns:
reserved_names.append(name)
# If write_index==True (default), reset the index and record the
# name of the original index in `index_cols` (we will set the name
# to the NONE_LABEL constant if it is originally `None`).
# `fastparquet` will use `index_cols` to specify the index column(s)
# in the metadata. `pyarrow` will revert the `reset_index` call
# below if `index_cols` is populated (because pyarrow will want to handle
# index preservation itself). For both engines, the column index
# will be written to "pandas metadata" if write_index=True
index_cols = []
if write_index:
real_cols = set(df.columns)
none_index = list(df._meta.index.names) == [None]
df = df.reset_index()
if none_index:
df.columns = [
c if c not in reserved_names else NONE_LABEL for c in df.columns
]
index_cols = [c for c in set(df.columns) - real_cols]
else:
# Not writing index - might as well drop it
df = df.reset_index(drop=True)
if custom_metadata and b"pandas" in custom_metadata.keys():
raise ValueError(
"User-defined key/value metadata (custom_metadata) can not "
"contain a b'pandas' key. This key is reserved by Pandas, "
"and overwriting the corresponding value can render the "
"entire dataset unreadable."
)
# Engine-specific initialization steps to write the dataset.
# Possibly create parquet metadata, and load existing stuff if appending
i_offset, fmd, metadata_file_exists, extra_write_kwargs = engine.initialize_write(
df,
fs,
path,
append=append,
ignore_divisions=ignore_divisions,
partition_on=partition_on,
division_info=division_info,
index_cols=index_cols,
schema=schema,
custom_metadata=custom_metadata,
**kwargs,
)
# By default we only write a metadata file when appending if one already
# exists
if append and write_metadata_file is None:
write_metadata_file = metadata_file_exists
# Check that custom name_function is valid,
# and that it will produce unique names
if name_function is not None:
if not callable(name_function):
raise ValueError("``name_function`` must be a callable with one argument.")
filenames = [name_function(i + i_offset) for i in range(df.npartitions)]
if len(set(filenames)) < len(filenames):
raise ValueError("``name_function`` must produce unique filenames.")
# If we are using a remote filesystem and retries is not set, bump it
# to be more fault tolerant, as transient transport errors can occur.
# The specific number 5 isn't hugely motivated: it's less than ten and more
# than two.
annotations = dask.config.get("annotations", {})
if "retries" not in annotations and not _is_local_fs(fs):
ctx = dask.annotate(retries=5)
else:
ctx = contextlib.nullcontext()
# Create Blockwise layer for parquet-data write
with ctx:
data_write = df.map_partitions(
ToParquetFunctionWrapper(
engine,
path,
fs,
partition_on,
write_metadata_file,
i_offset,
name_function,
toolz.merge(
kwargs,
{"compression": compression, "custom_metadata": custom_metadata},
extra_write_kwargs,
),
),
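# BlockIndex injects each partition's block-index tuple as the second
# argument; the wrapper uses it to derive the output filename.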
BlockIndex((df.npartitions,)),
# Pass in the original metadata to avoid
# metadata emulation in `map_partitions`.
# This is necessary, because we are not
# expecting a dataframe-like output.
meta=df._meta,
enforce_metadata=False,
transform_divisions=False,
align_dataframes=False,
)
# Collect metadata and write _metadata.
# TODO: Use tree-reduction layer (when available)
if write_metadata_file:
final_name = "metadata-" + data_write._name
dsk = {
(final_name, 0): (
apply,
engine.write_metadata,
[
data_write.__dask_keys__(),
fmd,
fs,
path,
],
{"append": append, "compression": compression},
)
}
else:
# NOTE: We still define a single task to tie everything together
# when we are not writing a _metadata file. We do not want to
# return `data_write` (or a `data_write.to_bag()`), because calling
# `compute()` on a multi-partition collection requires the overhead
# of trying to concatenate results on the client.
final_name = "store-" + data_write._name
dsk = {(final_name, 0): (lambda x: None, data_write.__dask_keys__())}
# Convert data_write + dsk to computable collection
graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(data_write,))
out = Scalar(graph, final_name, "")