-
-
Notifications
You must be signed in to change notification settings - Fork 710
/
shuffle.py
111 lines (89 loc) · 3.14 KB
/
shuffle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from __future__ import annotations
from typing import TYPE_CHECKING
from dask.base import tokenize
from dask.dataframe import DataFrame
from dask.delayed import Delayed, delayed
from dask.highlevelgraph import HighLevelGraph
from .shuffle_extension import NewShuffleMetadata, ShuffleId, ShuffleWorkerExtension
if TYPE_CHECKING:
import pandas as pd
def get_ext() -> ShuffleWorkerExtension:
    """Return the ``ShuffleWorkerExtension`` of the worker running this task.

    Raises
    ------
    RuntimeError
        If this code is not executing on a distributed ``Worker``, or if the
        worker has no shuffle extension registered (presumably because pandas
        is not installed on it — TODO confirm).
    """
    from distributed import get_worker

    try:
        current_worker = get_worker()
    except ValueError as exc:
        # get_worker() raises ValueError outside a worker thread; re-raise
        # with an actionable message for the user.
        raise RuntimeError(
            "`shuffle='p2p'` requires Dask's distributed scheduler. This task is not running on a Worker; "
            "please confirm that you've created a distributed Client and are submitting this computation through it."
        ) from exc

    ext: ShuffleWorkerExtension | None = current_worker.extensions.get("shuffle")
    if not ext:
        raise RuntimeError(
            f"The worker {current_worker.address} does not have a ShuffleExtension. "
            "Is pandas installed on the worker?"
        )
    return ext
def shuffle_setup(metadata: NewShuffleMetadata) -> None:
    """Register a new shuffle on this worker's shuffle extension."""
    ext = get_ext()
    ext.create_shuffle(metadata)
def shuffle_transfer(input: pd.DataFrame, id: ShuffleId, setup=None) -> None:
    """Hand one input partition to the shuffle extension for redistribution.

    ``setup`` is unused; it exists only to create a task-graph dependency on
    the setup task so transfers run after the shuffle is registered.
    """
    ext = get_ext()
    ext.add_partition(input, id)
def shuffle_unpack(id: ShuffleId, output_partition: int, barrier=None) -> pd.DataFrame:
    """Fetch one finished output partition from the shuffle extension.

    ``barrier`` is unused; it only forces a dependency on the barrier task so
    unpacking starts after all transfers completed.
    """
    ext = get_ext()
    return ext.get_output_partition(id, output_partition)
def shuffle_barrier(id: ShuffleId, transfers: list[None]) -> None:
    """Signal that every transfer task has finished for shuffle ``id``.

    ``transfers`` is unused; it exists so this task depends on all transfers.
    """
    ext = get_ext()
    ext.barrier(id)
def rearrange_by_column_p2p(
    df: DataFrame,
    column: str,
    npartitions: int | None = None,
):
    """Shuffle ``df`` by ``column`` using the peer-to-peer worker extension.

    Builds a three-stage graph: a setup task registering the shuffle, one
    transfer task per input partition, a single barrier task, and one unpack
    task per output partition. Returns a new ``DataFrame`` collection with
    ``npartitions`` partitions (defaults to ``df.npartitions``) and unknown
    divisions.
    """
    if not npartitions:
        npartitions = df.npartitions
    token = tokenize(df, column, npartitions)

    # Stage 1: register the shuffle on the cluster before any data moves.
    setup = delayed(shuffle_setup, pure=True)(
        NewShuffleMetadata(
            ShuffleId(token),
            df._meta,
            column,
            npartitions,
        )
    )

    # Stage 2: every input partition is handed to the extension. The result
    # partitions are meaningless placeholders; metadata checks are disabled.
    transfers = df.map_partitions(
        shuffle_transfer,
        token,
        setup,
        meta=df,
        enforce_metadata=False,
        transform_divisions=False,
    )

    # Stage 3: a single barrier task depending on every transfer.
    barrier_name = f"shuffle-barrier-{token}"
    barrier_layer = {
        barrier_name: (shuffle_barrier, token, transfers.__dask_keys__())
    }
    barrier = Delayed(
        barrier_name,
        HighLevelGraph.from_collections(
            barrier_name, barrier_layer, dependencies=[transfers]
        ),
    )

    # Stage 4: one unpack task per output partition, each gated on the barrier.
    unpack_name = f"shuffle-unpack-{token}"
    unpack_layer = {
        (unpack_name, part): (shuffle_unpack, token, part, barrier_name)
        for part in range(npartitions)
    }
    # TODO use this blockwise (https://github.com/coiled/oss-engineering/issues/49)
    # Changes task names, so breaks setting worker restrictions at the moment.
    # Also maybe would be nice if the `DataFrameIOLayer` interface supported this?
    # dsk = blockwise(
    #     shuffle_unpack, unpack_name, "i",
    #     token, None,
    #     BlockwiseDepDict({(i,): i for i in range(npartitions)}), "i",
    #     barrier_name, None,
    #     numblocks={},
    # )

    return DataFrame(
        HighLevelGraph.from_collections(unpack_name, unpack_layer, [barrier]),
        unpack_name,
        df._meta,
        [None] * (npartitions + 1),  # divisions are unknown after a shuffle
    )