da.store to create well-formed HighLevelGraph (#8261)
crusaderky committed Oct 21, 2021
1 parent 9f0f002 commit ac01ddc
Showing 4 changed files with 137 additions and 79 deletions.
194 changes: 118 additions & 76 deletions dask/array/core.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import contextlib
import math
import operator
@@ -9,7 +11,7 @@
import uuid
import warnings
from bisect import bisect
from collections.abc import Iterable, Iterator, Mapping
from collections.abc import Collection, Hashable, Iterable, Iterator, Mapping
from functools import partial, reduce, wraps
from itertools import product, zip_longest
from numbers import Integral, Number
@@ -910,12 +912,12 @@ def broadcast_chunks(*chunkss):


def store(
sources,
sources: Array | Collection[Array],
targets,
lock=True,
regions=None,
compute=True,
return_stored=False,
lock: bool | Lock = True,
regions: tuple[slice, ...] | Collection[tuple[slice, ...]] | None = None,
compute: bool = True,
return_stored: bool = False,
**kwargs,
):
"""Store dask arrays in array-like objects, overwrite data in target
@@ -931,23 +933,35 @@ def store(
Parameters
----------
sources: Array or iterable of Arrays
targets: array-like or Delayed or iterable of array-likes and/or Delayeds
sources: Array or collection of Arrays
targets: array-like or Delayed or collection of array-likes and/or Delayeds
These should support setitem syntax ``target[10:20] = ...``
lock: boolean or threading.Lock, optional
Whether or not to lock the data stores while storing.
Pass True (lock each file individually), False (don't lock) or a
particular :class:`threading.Lock` object to be shared among all writes.
regions: tuple of slices or list of tuples of slices
regions: tuple of slices or collection of tuples of slices
Each ``region`` tuple in ``regions`` should be such that
``target[region].shape = source.shape``
for the corresponding source and target in sources and targets,
respectively. If this is a tuple, the contents will be assumed to be
slices, so do not provide a tuple of tuples.
compute: boolean, optional
If true compute immediately, return :class:`dask.delayed.Delayed` otherwise
If true compute immediately; return :class:`dask.delayed.Delayed` otherwise.
return_stored: boolean, optional
Optionally return the stored result (default False).
kwargs:
Parameters passed to compute/persist (only used if compute=True)
Returns
-------
If return_stored=True
tuple of Arrays
If return_stored=False and compute=True
None
If return_stored=False and compute=False
Delayed
Examples
--------
@@ -991,65 +1005,83 @@ def store(
)

# Optimize all sources together
sources_dsk = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
sources_dsk = Array.__dask_optimize__(
sources_dsk, list(core.flatten([e.__dask_keys__() for e in sources]))
sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
sources_layer = Array.__dask_optimize__(
sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources]))
)
sources2 = [Array(sources_dsk, e.name, e.chunks, meta=e) for e in sources]
sources_name = "store-sources-" + tokenize(sources)
layers = {sources_name: sources_layer}
dependencies = {sources_name: set()}

# Optimize all targets together
targets2 = []
targets_keys = []
targets_dsk = []
for e in targets:
if isinstance(e, Delayed):
targets2.append(e.key)
targets_keys.extend(e.__dask_keys__())
targets_dsk.append(e.__dask_graph__())
elif is_dask_collection(e):
targets_dsks = []
for t in targets:
if isinstance(t, Delayed):
targets_keys.append(t.key)
targets_dsks.append(t.__dask_graph__())
elif is_dask_collection(t):
raise TypeError("Targets must be either Delayed objects or array-likes")
else:
targets2.append(e)

targets_dsk = HighLevelGraph.merge(*targets_dsk)
targets_dsk = Delayed.__dask_optimize__(targets_dsk, targets_keys)
if targets_dsks:
targets_hlg = HighLevelGraph.merge(*targets_dsks)
targets_layer = Delayed.__dask_optimize__(targets_hlg, targets_keys)
targets_name = "store-targets-" + tokenize(targets_keys)
layers[targets_name] = targets_layer
dependencies[targets_name] = set()

load_stored = return_stored and not compute
toks = [str(uuid.uuid1()) for _ in range(len(sources))]
store_dsk = HighLevelGraph.merge(
*[
insert_to_ooc(s, t, lock, r, return_stored, load_stored, tok)
for s, t, r, tok in zip(sources2, targets2, regions, toks)
]
)
store_keys = list(store_dsk.keys())

store_dsk = HighLevelGraph.merge(store_dsk, targets_dsk, sources_dsk)
store_dsk = HighLevelGraph.from_collections(id(store_dsk), dict(store_dsk))
map_names = [
"store-map-" + tokenize(s, t if isinstance(t, Delayed) else id(t), r)
for s, t, r in zip(sources, targets, regions)
]
map_keys = []

for s, t, n, r in zip(sources, targets, map_names, regions):
map_layer = insert_to_ooc(
keys=s.__dask_keys__(),
chunks=s.chunks,
out=t.key if isinstance(t, Delayed) else t,
name=n,
lock=lock,
region=r,
return_stored=return_stored,
load_stored=load_stored,
)
layers[n] = map_layer
if isinstance(t, Delayed):
dependencies[n] = {sources_name, targets_name}
else:
dependencies[n] = {sources_name}
map_keys += map_layer.keys()

if return_stored:
store_dsk = HighLevelGraph(layers, dependencies)
load_store_dsk = store_dsk
if compute:
store_dlyds = [Delayed(k, store_dsk) for k in store_keys]
store_dlyds = [Delayed(k, store_dsk) for k in map_keys]
store_dlyds = persist(*store_dlyds, **kwargs)
store_dsk_2 = HighLevelGraph.merge(*[e.dask for e in store_dlyds])
load_store_dsk = retrieve_from_ooc(map_keys, store_dsk, store_dsk_2)
map_names = ["load-" + n for n in map_names]

load_store_dsk = retrieve_from_ooc(store_keys, store_dsk, store_dsk_2)

result = tuple(
Array(load_store_dsk, "load-store-%s" % t, s.chunks, meta=s)
for s, t in zip(sources, toks)
return tuple(
Array(load_store_dsk, n, s.chunks, meta=s)
for s, n in zip(sources, map_names)
)

return result
elif compute:
store_dsk = HighLevelGraph(layers, dependencies)
compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
return None

else:
if compute:
compute_as_if_collection(Array, store_dsk, store_keys, **kwargs)
return None
else:
name = "store-" + str(uuid.uuid1())
dsk = HighLevelGraph.merge({name: store_keys}, store_dsk)
return Delayed(name, dsk)
key = "store-" + tokenize(map_names)
layers[key] = {key: map_keys}
dependencies[key] = set(map_names)
store_dsk = HighLevelGraph(layers, dependencies)
return Delayed(key, store_dsk)


def blockdims_from_blockshape(shape, chunks):
@@ -2878,7 +2910,7 @@ def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None):
limit = parse_bytes(limit)

if dtype is None:
raise TypeError("DType must be known for auto-chunking")
raise TypeError("dtype must be known for auto-chunking")

if dtype.hasobject:
raise NotImplementedError(
@@ -4040,17 +4072,29 @@ def load_chunk(out, index, lock):


def insert_to_ooc(
arr, out, lock=True, region=None, return_stored=False, load_stored=False, tok=None
):
keys: list,
chunks: tuple[tuple[int, ...], ...],
out,
name: str,
*,
lock: Lock | bool = True,
region: slice | None = None,
return_stored: bool = False,
load_stored: bool = False,
) -> dict:
"""
Creates a Dask graph for storing chunks from ``arr`` in ``out``.
Parameters
----------
arr: da.Array
A dask array
keys: list
Dask keys of the input array
chunks: tuple
Dask chunks of the input array
out: array-like
Where to store results too.
Where to store results to
name: str
First element of dask keys
lock: Lock-like or bool, optional
Whether to lock or with what (default is ``True``,
which means a :class:`threading.Lock` instance).
@@ -4064,65 +4108,63 @@
Whether to handle loading from ``out`` at the same time.
Ignored if ``return_stored`` is not ``True``.
(default is ``False``, meaning defer to ``return_stored``).
tok: str, optional
Token to use when naming keys
Returns
-------
dask graph of store operation
Examples
--------
>>> import dask.array as da
>>> d = da.ones((5, 6), chunks=(2, 3))
>>> a = np.empty(d.shape)
>>> insert_to_ooc(d, a) # doctest: +SKIP
>>> insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123") # doctest: +SKIP
"""

if lock is True:
lock = Lock()

slices = slices_from_chunks(arr.chunks)
slices = slices_from_chunks(chunks)
if region:
slices = [fuse_slice(region, slc) for slc in slices]

name = "store-%s" % (tok or str(uuid.uuid1()))
func = store_chunk
args = ()
if return_stored and load_stored:
name = "load-%s" % name
func = load_store_chunk
args = args + (load_stored,)
args = (load_stored,)
else:
func = store_chunk
args = ()

dsk = {
(name,) + t[1:]: (func, t, out, slc, lock, return_stored) + args
for t, slc in zip(core.flatten(arr.__dask_keys__()), slices)
for t, slc in zip(core.flatten(keys), slices)
}

return dsk


def retrieve_from_ooc(keys, dsk_pre, dsk_post=None):
def retrieve_from_ooc(
keys: Collection[Hashable], dsk_pre: Mapping, dsk_post: Mapping
) -> dict:
"""
Creates a Dask graph for loading stored ``keys`` from ``dsk``.
Parameters
----------
keys: Sequence
keys: Collection
A sequence containing Dask graph keys to load
dsk_pre: Mapping
A Dask graph corresponding to a Dask Array before computation
dsk_post: Mapping, optional
dsk_post: Mapping
A Dask graph corresponding to a Dask Array after computation
Examples
--------
>>> import dask.array as da
>>> d = da.ones((5, 6), chunks=(2, 3))
>>> a = np.empty(d.shape)
>>> g = insert_to_ooc(d, a)
>>> retrieve_from_ooc(g.keys(), g) # doctest: +SKIP
>>> g = insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123")
>>> retrieve_from_ooc(g.keys(), g, {k: k for k in g.keys()}) # doctest: +SKIP
"""

if not dsk_post:
dsk_post = {k: k for k in keys}

load_dsk = {
("load-" + k[0],) + k[1:]: (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1]
for k in keys
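A minimal usage sketch (not part of the diff) of the reworked ``da.store``: with ``compute=False`` it now returns a Delayed backed by a well-formed HighLevelGraph, i.e. every layer named by ``__dask_layers__()`` is actually present in ``.dask.layers``. Shapes, chunks, and the NumPy target below are illustrative.

    import numpy as np
    import dask.array as da

    source = da.ones((10, 10), chunks=(2, 2))
    target = np.zeros((10, 10))          # any array-like supporting __setitem__

    stored = da.store(source, target, compute=False)
    # Well-formed graph: every advertised layer exists in the HighLevelGraph.
    assert all(layer in stored.dask.layers for layer in stored.__dask_layers__())

    stored.compute()                     # executes the deferred chunk writes
    assert (target == 1).all()

The returned Delayed's key is also deterministic ("store-" plus a token of the map layer names), which is what the new ``test_store_deterministic_keys`` test below exercises.
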
17 changes: 17 additions & 0 deletions dask/array/tests/test_array_core.py
@@ -1853,6 +1853,11 @@ def test_store_compute_false():

v = store([a, b], [at, bt], compute=False)
assert isinstance(v, Delayed)

# You need a well-formed HighLevelGraph for e.g. dask.graph_manipulation.bind
for layer in v.__dask_layers__():
assert layer in v.dask.layers

assert (at == 0).all() and (bt == 0).all()
assert all([ev is None for ev in v.compute()])
assert (at == 2).all() and (bt == 3).all()
@@ -2005,6 +2010,18 @@ def test_store_multiprocessing_lock():
assert st is None


@pytest.mark.parametrize("return_stored", [False, True])
@pytest.mark.parametrize("delayed_target", [False, True])
def test_store_deterministic_keys(return_stored, delayed_target):
a = da.ones((10, 10), chunks=(2, 2))
at = np.zeros(shape=(10, 10))
if delayed_target:
at = delayed(at)
st1 = a.store(at, return_stored=return_stored, compute=False)
st2 = a.store(at, return_stored=return_stored, compute=False)
assert st1.dask.keys() == st2.dask.keys()


def test_to_hdf5():
h5py = pytest.importorskip("h5py")
x = da.ones((4, 4), chunks=(2, 2))
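The comment in ``test_store_compute_false`` points at the motivation: utilities such as ``dask.graph_manipulation.bind`` walk the HighLevelGraph layer by layer and fail if ``__dask_layers__()`` names a layer the graph does not contain. A hedged sketch of that use case (not part of the diff), assuming a dask version that ships ``dask.graph_manipulation``; shapes and fill values are illustrative.

    import numpy as np
    import dask.array as da
    from dask.graph_manipulation import bind

    x = da.ones((10, 10), chunks=(2, 2))
    out = np.zeros((10, 10))
    stored = x.store(out, compute=False)   # Delayed with a well-formed HighLevelGraph

    # Make another collection depend on the (not yet executed) store:
    y = da.full((10, 10), 2, chunks=(2, 2))
    y_after_store = bind(y, stored)        # y's chunks run only after the store completes
    y_after_store.compute()                # runs the deferred writes as a dependency, then y

The new assertion in ``test_store_compute_false`` checks exactly the property this relies on: each layer reported by ``v.__dask_layers__()`` must be present in ``v.dask.layers``.
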
2 changes: 1 addition & 1 deletion dask/dataframe/io/csv.py
@@ -162,7 +162,7 @@ def pandas_read_text(
kwargs : dict
A dictionary of keyword arguments to be passed to ``reader``
dtypes : dict
DTypes to assign to columns
dtypes to assign to columns
path : tuple
A tuple containing path column name, path to file, and an ordered list of paths.
3 changes: 1 addition & 2 deletions dask/tests/test_order.py
@@ -868,9 +868,8 @@ def test_array_store_final_order(tmpdir):
o = order(d.dask)

# Find the lowest store. Dask starts here.
stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-")]
stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-map-")]
first_store = min(stores, key=lambda k: o[k])
first_store
connected_stores = [k for k in stores if k[-1] == first_store[-1]]
disconnected_stores = [k for k in stores if k[-1] != first_store[-1]]

