da.store loses dependency information #8380

Open
djhoese opened this issue Nov 15, 2021 · 12 comments · May be fixed by #9732
Labels: array, bug (Something is broken)

Comments

@djhoese (Contributor) commented Nov 15, 2021

What happened:

I'm trying to take the result from a map_blocks function and store one slice of the resulting array in one zarr array and another slice in another zarr array. I set compute=False on the da.store calls so that I can compute them together later and avoid computing the array multiple times. However, it seems the map_blocks function is called for each store operation. From what I can tell, the dependency information is getting lost.

What you expected to happen:

Two da.store calls with shared tasks in their dask graphs should share computations when computed at the same time.

Minimal Complete Verifiable Example:

import dask
import dask.array as da
import numpy as np

TOTAL_CALLS = 0


def shared_task2(arr1):
    global TOTAL_CALLS
    TOTAL_CALLS += 1
    return np.stack([arr1 + 1, arr1 + 2])


if __name__ == "__main__":
    start = da.zeros((2, 2), chunks=1)
    src = da.map_blocks(shared_task2, start, dtype=start.dtype,
                        meta=np.array((), dtype=start.dtype),
                        new_axis=[0],
                        chunks=(2,) + start.chunks)
    target1 = np.zeros((2, 2))
    target2 = np.zeros((2, 2))

    with dask.config.set(scheduler='single-threaded'):
        store_res1 = da.store(src[0], target1, compute=False)
        store_res2 = da.store(src[1], target2, compute=False)
        da.compute(store_res1, store_res2)
    # one call per chunk
    assert TOTAL_CALLS == start.blocks.size

Anything else we need to know?:

As mentioned above, I'm actually trying to use to_zarr to save some dask arrays. I want to write them to zarr and then get the resulting loaded zarr arrays back as results, but I couldn't find a way to do that with the combination of return_stored and compute=False: you need to compute in order to write to the zarr arrays, but that then returns the resulting numpy arrays instead.
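For concreteness, the combination I was trying looks roughly like this (a sketch with illustrative paths rather than my real code):

import dask.array as da

arr = da.zeros((4, 4), chunks=2)

# compute=False defers the write, so nothing has been stored yet and there
# is no zarr-backed result to hand back
delayed_write = da.to_zarr(arr, "out_a.zarr", compute=False)

# computing is what actually writes to the zarr array, but then the call
# returns the computed numpy result instead of a lazy zarr-backed array
written = da.to_zarr(arr, "out_b.zarr", compute=True, return_stored=True)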

Also note that my map_blocks function returns the np.stack result because it is actually producing two results.

Environment:

  • Dask version: 2021.11.1
  • Python version: 3.9
  • Operating System: Ubuntu/PopOS
  • Install method (conda, pip, source): conda-forge
@djhoese (Contributor, Author) commented Nov 15, 2021

Also note this does not happen if the map_blocks function returns a single result, or if I don't do src[0] and instead store the entire 3D array twice. Something about the __getitem__ task seems to be the problem.

@djhoese (Contributor, Author) commented Nov 15, 2021

OK, new find: if I call da.store with a list of sources and targets, then it works as expected. So I guess the new question is: how can I do to_zarr for multiple sources and targets?
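For reference, against the MCVE above the working form is roughly this (a sketch of replacing the bottom portion of the script):

    with dask.config.set(scheduler='single-threaded'):
        # a single store call with lists of sources and targets keeps the
        # shared map_blocks tasks in one graph, so they run once per chunk
        store_res = da.store([src[0], src[1]], [target1, target2], compute=False)
        da.compute(store_res)
    # one call per chunk
    assert TOTAL_CALLS == start.blocks.size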

@djhoese (Contributor, Author) commented Nov 15, 2021

Found an ugly workaround: change the bottom portion of the script to turn fuse.active off:

    with dask.config.set(scheduler='single-threaded'):
        with dask.config.set({"optimization.fuse.active": False}):
            store_res1 = da.store(src[0], target1, compute=False)
            store_res2 = da.store(src[1], target2, compute=False)
        da.compute(store_res1, store_res2)
    # one call per chunk
    assert TOTAL_CALLS == start.blocks.size

This stops da.store from fusing the tasks, which was causing the later da.compute call to not realize that the two sources come from the same dependencies.

@jsignell (Member) commented:

It seems to me that optimization should not happen until right before computation starts, i.e. in the dask.compute call. @jrbourbeau and @crusaderky, it seems like this might be related to #8261 if you have any insight.

@crusaderky (Collaborator) commented:

Correct, store automatically optimizes the graph. My recent PR didn't change that.
We should disable optimization when compute=False.

@djhoese (Contributor, Author) commented Nov 16, 2021

Would that just mean putting an if statement around the __dask_optimize__ call? How should sources_layer be constructed if we're not optimizing?

dask/dask/array/core.py (lines 1017 to 1019 at f588189):

sources_layer = Array.__dask_optimize__(
    sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources]))
)
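For illustration, the if statement I have in mind would look roughly like this (just a sketch of the idea, not actual dask code):

if compute:
    # current behavior: collapse the sources into a single optimized layer
    sources_layer = Array.__dask_optimize__(
        sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources]))
    )
else:
    # leave the source graphs unoptimized so a later compute call can still
    # see the dependencies shared between multiple store calls
    ...  # how to build the layers in this branch is the open question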

@crusaderky (Collaborator) commented:

Without optimization, you need to merge sources_hlg into the final graph; see HighLevelGraph.merge.

@djhoese (Contributor, Author) commented Nov 17, 2021

I'm not familiar with the structural differences between the various dictionaries and graphs used here, but the line just above the one quoted earlier already uses HighLevelGraph.merge:

dask/dask/array/core.py (lines 1016 to 1022 at f588189):

sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
sources_layer = Array.__dask_optimize__(
    sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources]))
)
sources_name = "store-sources-" + tokenize(sources)
layers = {sources_name: sources_layer}
dependencies = {sources_name: set()}

So theoretically, if we're not optimizing, then we can make layers = sources_hlg.layers, right? And I guess dependencies would stay the same as above.

@crusaderky (Collaborator) commented:

Shallow copy, but yes. You'll also need to find the topmost layers using e.__dask_layers__() to build the dependencies later on in the method.
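Putting the two suggestions together, the non-optimizing path might look roughly like this (an untested sketch; source_top_layers is a hypothetical name, the other variables follow the core.py snippet quoted above):

sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
# shallow-copy the existing layers and dependencies instead of collapsing
# everything into a single optimized "store-sources-*" layer
layers = dict(sources_hlg.layers)
dependencies = dict(sources_hlg.dependencies)
# topmost layer name(s) of each source, used to wire up the dependencies of
# the store layers later in the method
source_top_layers = [name for e in sources for name in e.__dask_layers__()]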

ian-r-rose self-assigned this Nov 24, 2021
github-actions bot added the "needs attention" label Dec 27, 2021
ian-r-rose removed their assignment Mar 9, 2022
ian-r-rose added the "array" and "bug" labels and removed the "needs attention" label Mar 9, 2022
@djhoese (Contributor, Author) commented Dec 8, 2022

@crusaderky I'm finally making time to fix this, but I'm still having trouble with some of the HLG graph interfaces. The main question I'm coming up with now is: is there any reason to optimize the graphs at all in da.store? I see that both the source and the target graphs are optimized (__dask_optimize__ being called), but if they are handed to a Delayed object, computed immediately, or computed by the caller, then they'll get optimized at that point anyway, right?

djhoese linked a pull request (#9732) Dec 8, 2022 that will close this issue
@crusaderky (Collaborator) commented:

@djhoese No, I don't think the graph should be optimized.
If the user doesn't want it to be optimized, they can still pass that preference through kwargs when compute=True.

@djhoese (Contributor, Author) commented Dec 9, 2022

@crusaderky I made #9732, so let me know what you think. What I was trying to say is that there is no need to optimize the graph explicitly, regardless of the compute=False/True keyword argument passed to da.store. Eventually the resulting graph is passed to da.compute, which will do the optimization anyway.
