
Fix delayed in fusing with multiple dependencies #1038

Merged
merged 4 commits into dask:main from 11067 on Apr 30, 2024

Conversation

@phofl (Collaborator) commented Apr 25, 2024

def test_from_delayed_fusion():
    df = from_delayed([_load(x) for x in range(10)], meta={"x": "int64", "y": "int64"})
    result = df.map_partitions(lambda x: None, meta={}).optimize().dask
    assert len(result) == 30
Member commented:

can you help me out...? 10 x load + 10 x lambda = 20. What am I missing?

Member commented:

if you want to test that it doesn't fuse, maybe

df.map_partitions(lambda x: None, meta={}).optimize(fuse=False).dask == df.map_partitions(lambda x: None, meta={}).optimize().dask

is a better test. or even

ddf = df.map_partitions(lambda x: None, meta={})
dsk_opt = ddf.optimize().dask
dsk_raw = ddf.lower_completely().dask

phofl (Collaborator, Author) commented:

10 x an intermediate task representing the from_delayed step, which isn't fused anymore. Previously that was fused into the map_partitions task as well. This is the tradeoff for now.
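
For illustration, a self-contained sketch of that counting (assuming from_delayed is importable from the dask_expr top level; _load here is a hypothetical stand-in for the loader defined in the test module):

import pandas as pd
from dask import delayed
from dask_expr import from_delayed

@delayed
def _load(x):
    # hypothetical stand-in for the test module's loader
    return pd.DataFrame({"x": [x], "y": [x]})

df = from_delayed([_load(x) for x in range(10)], meta={"x": "int64", "y": "int64"})
graph = df.map_partitions(lambda x: None, meta={}).optimize().dask
# 10 delayed tasks + 10 intermediate from_delayed tasks + 10 map_partitions tasks
assert len(graph) == 30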

phofl (Collaborator, Author) commented:

I like that, adjusted

@jtilly commented Apr 29, 2024

Thank you for the PR!

I just wanted to flag that this PR doesn't (yet) change the behavior for the example that I provided in dask/dask#11067:

(dask) ➜  ~/dask (main) ✗ pip install git+https://github.com/phofl/dask-expr@11067
...
(dask) ➜  ~/dask (main) ✗ python example.py                                             
Loading chunk 9
Loading chunk 8
Loading chunk 7
Loading chunk 6
Loading chunk 5
Loading chunk 4
Loading chunk 3
Loading chunk 2
Loading chunk 1
Loading chunk 0
Storing chunk 9
Storing chunk 8
Storing chunk 7
Storing chunk 6
Storing chunk 5
Storing chunk 4
Storing chunk 3
Storing chunk 2
Storing chunk 1
Storing chunk 0
(dask) ➜  ~/dask (main) ✗ DASK_DATAFRAME__QUERY_PLANNING=False python example.py                     
Loading chunk 9
Storing chunk 9
Loading chunk 8
Storing chunk 8
Loading chunk 7
Storing chunk 7
Loading chunk 6
Storing chunk 6
Loading chunk 5
Storing chunk 5
Loading chunk 4
Storing chunk 4
Loading chunk 3
Storing chunk 3
Loading chunk 2
Storing chunk 2
Loading chunk 1
Storing chunk 1
Loading chunk 0
Storing chunk 0

@phofl (Collaborator, Author) commented Apr 29, 2024

Yes, we still have some overhead. It works mostly as expected if your delayed function runs longer (e.g. if you increase the size of your DataFrame), but this will still need a follow-up on our end since the behaviour isn't as intended yet.

On a side note: you might want to use from_map if your delayed function is indeed only loading data from disk; it is better suited for that use case and doesn't suffer from this problem.
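
For reference, a minimal sketch of what that could look like with from_map (load_chunk is a hypothetical stand-in for the delayed loader in dask/dask#11067):

import pandas as pd
import dask.dataframe as dd

def load_chunk(i):
    # hypothetical stand-in for a function that reads one chunk from disk
    print(f"Loading chunk {i}")
    return pd.DataFrame({"x": [i], "y": [i]})

# one task per element of the iterable, with no intermediate from_delayed layer to fuse
ddf = dd.from_map(
    load_chunk,
    range(10),
    meta=pd.DataFrame({"x": pd.Series(dtype="int64"), "y": pd.Series(dtype="int64")}),
)

Because each partition is produced directly by load_chunk, downstream per-partition work such as storing a chunk can run as soon as that chunk is loaded.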

@phofl merged commit 4854a85 into dask:main on Apr 30, 2024
7 checks passed
@phofl deleted the 11067 branch on April 30, 2024 10:19