
[DNM] P2P shuffle without PyArrow #8606

Draft · wants to merge 3 commits into base: main
Conversation

crusaderky
Collaborator

No description provided.

@crusaderky self-assigned this Apr 2, 2024
Contributor

github-actions bot commented Apr 2, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files (+1)   15 suites (+1)   4h 4m 2s ⏱️ (-3m 33s)
 4 046 tests (-2):       3 657 ✅ (-276)     113 💤 (+3)   274 ❌ (+269)   2 🔥 (+2)
25 509 runs (+1 227):   23 535 ✅ (+548)   1 298 💤 (+8)   674 ❌ (+669)   2 🔥 (+2)

For more details on these failures and errors, see this check.

Results for commit 8a732a7. ± Comparison against base commit 3f210fd.

This pull request removes 4 and adds 2 tests. Note that renamed tests count towards both.

Removed:
distributed.shuffle.tests.test_merge ‑ test_minimal_version
distributed.shuffle.tests.test_shuffle ‑ test_minimal_version
distributed.shuffle.tests.test_shuffle_plugins ‑ test_split_by_partition[False]
distributed.shuffle.tests.test_shuffle_plugins ‑ test_split_by_partition[True]

Added:
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp:..[ ‑ 1]:---no-nanny]
This pull request skips 1 test.
distributed.tests.test_metrics ‑ test_monotonic

♻️ This comment has been updated with latest results.

Comment on lines 49 to 111
def pickle_dataframe_shard(
    input_part_id: int,
    shard: pd.DataFrame,
) -> list[pickle.PickleBuffer]:
    """Optimized pickler for pandas DataFrames. Discard all unnecessary metadata
    (like the columns header).

    Parameters
    ----------
    input_part_id:
        ID of the input partition the shard was sliced from
    shard:
        Row-wise slice of a pandas DataFrame
    """
    return pickle_bytelist(
        (input_part_id, shard.index, *shard._mgr.blocks), prelude=False
    )


def unpickle_and_concat_dataframe_shards(
    b: bytes | bytearray | memoryview, meta: pd.DataFrame
) -> pd.DataFrame:
    """Optimized unpickler for pandas DataFrames.

    Parameters
    ----------
    b:
        raw buffer, containing the concatenation of the outputs of
        :func:`pickle_dataframe_shard`, in arbitrary order
    meta:
        DataFrame header

    Returns
    -------
    Reconstructed output shard, sorted by input partition ID

    **Roundtrip example**

    >>> import random
    >>> import pandas as pd
    >>> from toolz import concat

    >>> df = pd.DataFrame(...)  # Input partition
    >>> meta = df.iloc[:0].copy()
    >>> shards = df.iloc[0:10], df.iloc[10:20], ...
    >>> frames = [pickle_dataframe_shard(i, shard) for i, shard in enumerate(shards)]
    >>> random.shuffle(frames)  # Simulate the frames arriving in arbitrary order
    >>> blob = bytearray(b"".join(concat(frames)))  # Simulate disk roundtrip
    >>> df2 = unpickle_and_concat_dataframe_shards(blob, meta)
    """
    import pandas as pd
    from pandas.core.internals import BlockManager

    parts = list(unpickle_bytestream(b))
    # [(input_part_id, index, *blocks), ...]
    parts = sorted(parts, key=first)
    shards = []
    for _, idx, *blocks in parts:
        axes = [meta.columns, idx]
        df = pd.DataFrame._from_mgr(  # type: ignore[attr-defined]
            BlockManager(blocks, axes, verify_integrity=False), axes
        )
        shards.append(df)

    # Actually load memory-mapped buffers into memory and close the file
    # descriptors
    return pd.concat(shards, copy=True)
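
For illustration, a concrete, self-contained variant of the docstring's roundtrip example; the data and shard boundaries are made up, and it assumes both helpers above (plus their pickle_bytelist / unpickle_bytestream dependencies) are in scope:

# Hypothetical smoke test of the round trip, following the docstring above.
import random

import pandas as pd
from toolz import concat

df = pd.DataFrame({"x": range(30), "y": [f"row{i}" for i in range(30)]})
meta = df.iloc[:0].copy()  # empty DataFrame carrying only the column header

# Split the input partition into three row-wise shards
shards = [df.iloc[0:10], df.iloc[10:20], df.iloc[20:30]]
frames = [pickle_dataframe_shard(i, shard) for i, shard in enumerate(shards)]

random.shuffle(frames)  # shards may arrive in arbitrary order
blob = bytearray(b"".join(concat(frames)))  # simulate the disk roundtrip

df2 = unpickle_and_concat_dataframe_shards(blob, meta)
assert df.equals(df2)  # order is restored by sorting on input partition ID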
@crusaderky (Collaborator, Author) commented Apr 2, 2024
@phofl these two functions are the heart of the PR. As you can see, I've already done some optimization by removing the columns header, but there's ample room for further tweaks:

  • don't pickle the block wrappers; pickle the underlying ndarrays/pyarrow tables instead
  • don't reassemble many tiny pd.DataFrame objects and then call pd.concat on them; instead, concat the ndarrays/pyarrow tables and then create a single dataframe out of them (see the sketch after this list)
  • find and remove unnecessary buffer deep copies
  • use pa.serialize / pa.deserialize instead of pickle/unpickle for pyarrow blocks
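
A minimal sketch of the second bullet, simplified to column-wise (rather than block-wise) concatenation of NumPy-backed shards only; the helper name and the restriction to NumPy dtypes are assumptions for illustration, not part of the PR:

# Hypothetical: concatenate the underlying ndarrays first and build a single
# DataFrame at the end, instead of materialising many tiny DataFrames and
# calling pd.concat on them. NumPy-backed columns only; names are made up.
from __future__ import annotations

import numpy as np
import pandas as pd


def concat_numpy_shards(shards: list[pd.DataFrame], meta: pd.DataFrame) -> pd.DataFrame:
    # Concatenate the index and each column's ndarray across all shards
    index = np.concatenate([shard.index.to_numpy() for shard in shards])
    columns = {
        col: np.concatenate([shard[col].to_numpy() for shard in shards])
        for col in meta.columns
    }
    # One DataFrame construction instead of len(shards) constructions + concat
    return pd.DataFrame(columns, index=index)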

@phofl (Collaborator) commented Apr 2, 2024
The block stores the order in which the columns map to the arrays within the block. We'd have to keep track of this information somewhere else if we want to go lower. This mapping is global, so it should be doable, but it makes us somewhat susceptible to internal pandas changes.

I am kind of curious how big the block overhead is for arrow dtypes compared to NumPy dtypes.

The block stores the array under .values. You'll get a NumPy array for NumPy dtypes or a pandas ExtensionArray for non-NumPy dtypes. The latter probably has a bunch more overhead than the NumPy dtypes. You can access the arrow array for arrow dtypes with the dunder arrow_array; this might help when you want to get lower.

Concatenating the arrays should definitely be faster.
pandas has a method concat_compat that does this internally, but it has more complexity than we need. We basically care about …

You don't need copy=True in the final concat; it copies automatically, and the copy keyword is/will be deprecated in pandas 3.

One nit: we use pyarrow ChunkedArrays to represent arrow dtypes, not Tables.
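
A small exploratory sketch of the block layout described above; _mgr, blocks, values and mgr_locs are pandas internals and may shift between versions, so treat them as assumptions rather than a stable API:

# Peek at how a DataFrame's blocks store their arrays and their column mapping.
import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "a": np.arange(3),
        "b": np.arange(3.0),
        "c": pd.array(["x", "y", "z"], dtype="string[pyarrow]"),  # needs pyarrow
    }
)

for block in df._mgr.blocks:
    # .values is an ndarray for NumPy dtypes and an ExtensionArray otherwise;
    # mgr_locs records which DataFrame columns the block corresponds to.
    print(type(block.values).__name__, block.mgr_locs)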
