From 602443f70e6f67f27ec89e955401d2ddf0da8855 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 14 Oct 2021 11:08:36 +0100 Subject: [PATCH 1/3] da.store to create well-formed graphs --- dask/array/core.py | 154 +++++++++++++++++----------- dask/array/tests/test_array_core.py | 11 +- dask/dataframe/io/csv.py | 2 +- 3 files changed, 103 insertions(+), 64 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index 1368387c77a..0a4d1993d39 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import contextlib import math import operator @@ -9,7 +11,7 @@ import uuid import warnings from bisect import bisect -from collections.abc import Iterable, Iterator, Mapping +from collections.abc import Collection, Hashable, Iterable, Iterator, Mapping from functools import partial, reduce, wraps from itertools import product, zip_longest from numbers import Integral, Number @@ -910,12 +912,12 @@ def broadcast_chunks(*chunkss): def store( - sources, + sources: Array | Collection[Array], targets, - lock=True, - regions=None, - compute=True, - return_stored=False, + lock: bool | Lock = True, + regions: tuple[slice, ...] | Collection[tuple[slice, ...]] | None = None, + compute: bool = True, + return_stored: bool = False, **kwargs, ): """Store dask arrays in array-like objects, overwrite data in target @@ -931,23 +933,35 @@ def store( Parameters ---------- - sources: Array or iterable of Arrays - targets: array-like or Delayed or iterable of array-likes and/or Delayeds + sources: Array or collection of Arrays + targets: array-like or Delayed or collection of array-likes and/or Delayeds These should support setitem syntax ``target[10:20] = ...`` lock: boolean or threading.Lock, optional Whether or not to lock the data stores while storing. Pass True (lock each file individually), False (don't lock) or a particular :class:`threading.Lock` object to be shared among all writes. - regions: tuple of slices or list of tuples of slices + regions: tuple of slices or collection of tuples of slices Each ``region`` tuple in ``regions`` should be such that ``target[region].shape = source.shape`` for the corresponding source and target in sources and targets, respectively. If this is a tuple, the contents will be assumed to be slices, so do not provide a tuple of tuples. compute: boolean, optional - If true compute immediately, return :class:`dask.delayed.Delayed` otherwise + If true compute immediately; return :class:`dask.delayed.Delayed` otherwise. return_stored: boolean, optional Optionally return the stored result (default False). 
+ kwargs: + Parameters passed to compute/persist (only used if compute=True) + + Returns + ------- + + If return_stored=True + tuple[Array] + If return_stored=False and compute=True + None + If return_stored=False and compute=False + Delayed Examples -------- @@ -964,7 +978,6 @@ def store( >>> store([x, y, z], [dset1, dset2, dset3]) # doctest: +SKIP """ - if isinstance(sources, Array): sources = [sources] targets = [targets] @@ -995,7 +1008,6 @@ def store( sources_dsk = Array.__dask_optimize__( sources_dsk, list(core.flatten([e.__dask_keys__() for e in sources])) ) - sources2 = [Array(sources_dsk, e.name, e.chunks, meta=e) for e in sources] # Optimize all targets together targets2 = [] @@ -1015,17 +1027,26 @@ def store( targets_dsk = Delayed.__dask_optimize__(targets_dsk, targets_keys) load_stored = return_stored and not compute - toks = [str(uuid.uuid1()) for _ in range(len(sources))] - store_dsk = HighLevelGraph.merge( - *[ - insert_to_ooc(s, t, lock, r, return_stored, load_stored, tok) - for s, t, r, tok in zip(sources2, targets2, regions, toks) - ] - ) - store_keys = list(store_dsk.keys()) + names = ["store-" + tokenize(s, r) for s, r in zip(sources, regions)] + + store_dsk = {} + for s, t, n, r in zip(sources, targets2, names, regions): + store_dsk.update( + insert_to_ooc( + keys=s.__dask_keys__(), + chunks=s.chunks, + out=t, + name=n, + lock=lock, + region=r, + return_stored=return_stored, + load_stored=load_stored, + ) + ) - store_dsk = HighLevelGraph.merge(store_dsk, targets_dsk, sources_dsk) - store_dsk = HighLevelGraph.from_collections(id(store_dsk), dict(store_dsk)) + store_keys = list(store_dsk.keys()) + store_dsk.update(targets_dsk) + store_dsk.update(sources_dsk) if return_stored: load_store_dsk = store_dsk @@ -1033,23 +1054,26 @@ def store( store_dlyds = [Delayed(k, store_dsk) for k in store_keys] store_dlyds = persist(*store_dlyds, **kwargs) store_dsk_2 = HighLevelGraph.merge(*[e.dask for e in store_dlyds]) - load_store_dsk = retrieve_from_ooc(store_keys, store_dsk, store_dsk_2) + names = ["load-" + n for n in names] - result = tuple( - Array(load_store_dsk, "load-store-%s" % t, s.chunks, meta=s) - for s, t in zip(sources, toks) + return tuple( + Array(load_store_dsk, n, s.chunks, meta=s) for s, n in zip(sources, names) ) - return result + elif compute: + compute_as_if_collection(Array, store_dsk, store_keys, **kwargs) + return None + else: - if compute: - compute_as_if_collection(Array, store_dsk, store_keys, **kwargs) - return None - else: - name = "store-" + str(uuid.uuid1()) - dsk = HighLevelGraph.merge({name: store_keys}, store_dsk) - return Delayed(name, dsk) + # Collecting the individual chunks is not trivial, as it could lead to poor + # memory management on a distributed cluster if we were to do it in a single + # step. Perform recursive aggregation to work around the problem. 
+ from ..graph_manipulation import checkpoint + + return checkpoint( + Array(store_dsk, n, s.chunks, s.dtype) for n, s in zip(names, sources) + ) def blockdims_from_blockshape(shape, chunks): @@ -2878,7 +2902,7 @@ def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None): limit = parse_bytes(limit) if dtype is None: - raise TypeError("DType must be known for auto-chunking") + raise TypeError("dtype must be known for auto-chunking") if dtype.hasobject: raise NotImplementedError( @@ -4038,17 +4062,29 @@ def load_chunk(out, index, lock): def insert_to_ooc( - arr, out, lock=True, region=None, return_stored=False, load_stored=False, tok=None -): + keys: list, + chunks: tuple[tuple[int, ...], ...], + out, + name: str, + *, + lock: Lock | bool = True, + region: slice | None = None, + return_stored: bool = False, + load_stored: bool = False, +) -> dict: """ Creates a Dask graph for storing chunks from ``arr`` in ``out``. Parameters ---------- - arr: da.Array - A dask array + keys: list + Dask keys of the input array + chunks: tuple + Dask chunks of the input array out: array-like - Where to store results too. + Where to store results to + name: str + First element of dask keys lock: Lock-like or bool, optional Whether to lock or with what (default is ``True``, which means a :class:`threading.Lock` instance). @@ -4062,51 +4098,53 @@ def insert_to_ooc( Whether to handling loading from ``out`` at the same time. Ignored if ``return_stored`` is not ``True``. (default is ``False``, meaning defer to ``return_stored``). - tok: str, optional - Token to use when naming keys + + Returns + ------- + dask graph of store operation Examples -------- >>> import dask.array as da >>> d = da.ones((5, 6), chunks=(2, 3)) >>> a = np.empty(d.shape) - >>> insert_to_ooc(d, a) # doctest: +SKIP + >>> insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123") # doctest: +SKIP """ if lock is True: lock = Lock() - slices = slices_from_chunks(arr.chunks) + slices = slices_from_chunks(chunks) if region: slices = [fuse_slice(region, slc) for slc in slices] - name = "store-%s" % (tok or str(uuid.uuid1())) - func = store_chunk - args = () if return_stored and load_stored: - name = "load-%s" % name func = load_store_chunk - args = args + (load_stored,) + args = (load_stored,) + else: + func = store_chunk + args = () dsk = { (name,) + t[1:]: (func, t, out, slc, lock, return_stored) + args - for t, slc in zip(core.flatten(arr.__dask_keys__()), slices) + for t, slc in zip(core.flatten(keys), slices) } - return dsk -def retrieve_from_ooc(keys, dsk_pre, dsk_post=None): +def retrieve_from_ooc( + keys: Collection[Hashable], dsk_pre: Mapping, dsk_post: Mapping +) -> dict: """ Creates a Dask graph for loading stored ``keys`` from ``dsk``. 
 Parameters
     ----------
-    keys: Sequence
+    keys: Collection
         A sequence containing Dask graph keys to load
     dsk_pre: Mapping
         A Dask graph corresponding to a Dask Array before computation
-    dsk_post: Mapping, optional
+    dsk_post: Mapping
         A Dask graph corresponding to a Dask Array after computation

     Examples
@@ -4114,13 +4152,9 @@ def retrieve_from_ooc(keys, dsk_pre, dsk_post=None):
     >>> import dask.array as da
     >>> d = da.ones((5, 6), chunks=(2, 3))
     >>> a = np.empty(d.shape)
-    >>> g = insert_to_ooc(d, a)
-    >>> retrieve_from_ooc(g.keys(), g)  # doctest: +SKIP
+    >>> g = insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123")
+    >>> retrieve_from_ooc(g.keys(), g, {k: k for k in g.keys()})  # doctest: +SKIP
     """
-
-    if not dsk_post:
-        dsk_post = {k: k for k in keys}
-
     load_dsk = {
         ("load-" + k[0],) + k[1:]: (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1]
         for k in keys
diff --git a/dask/array/tests/test_array_core.py b/dask/array/tests/test_array_core.py
index db7d121bb71..3cc0bc0502c 100644
--- a/dask/array/tests/test_array_core.py
+++ b/dask/array/tests/test_array_core.py
@@ -1764,7 +1764,7 @@ def test_store_regions():
     v = store([a, b], [at, bt], regions=region, compute=False)
     assert isinstance(v, Delayed)
     assert (at == 0).all() and (bt[region] == 0).all()
-    assert all([ev is None for ev in v.compute()])
+    assert v.compute() is None
     assert (at[region] == 2).all() and (bt[region] == 3).all()
     assert not (bt == 3).all() and not (bt == 0).all()
     assert not (at == 2).all() and not (at == 0).all()
@@ -1775,7 +1775,7 @@ def test_store_regions():
     v = store([a, b], [at, bt], regions=[region, region], compute=False)
     assert isinstance(v, Delayed)
     assert (at == 0).all() and (bt[region] == 0).all()
-    assert all([ev is None for ev in v.compute()])
+    assert v.compute() is None
     assert (at[region] == 2).all() and (bt[region] == 3).all()
     assert not (bt == 3).all() and not (bt == 0).all()
     assert not (at == 2).all() and not (at == 0).all()
@@ -1852,8 +1852,13 @@ def test_store_compute_false():
     v = store([a, b], [at, bt], compute=False)
     assert isinstance(v, Delayed)
+
+    # You need a well-formed HighLevelGraph for e.g. dask.graph_manipulation.bind
+    for layer in v.__dask_layers__():
+        assert layer in v.dask.layers
+
     assert (at == 0).all() and (bt == 0).all()
-    assert all([ev is None for ev in v.compute()])
+    assert v.compute() is None
     assert (at == 2).all() and (bt == 3).all()

     at = np.zeros(shape=(4, 4))
diff --git a/dask/dataframe/io/csv.py b/dask/dataframe/io/csv.py
index 2fd09f74cc1..0592d00e7fb 100644
--- a/dask/dataframe/io/csv.py
+++ b/dask/dataframe/io/csv.py
@@ -162,7 +162,7 @@ def pandas_read_text(
     kwargs : dict
         A dictionary of keyword arguments to be passed to ``reader``
     dtypes : dict
-        DTypes to assign to columns
+        dtypes to assign to columns
     path : tuple
         A tuple containing path column name, path to file, and an ordered
         list of paths.
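The reworked ``da.store`` above has three distinct return modes, governed by
``return_stored`` and ``compute`` as documented in the new Returns section. A
minimal sketch of the three call patterns against an in-memory target (the
array and target names here are illustrative, not taken from the patch):

    import numpy as np
    import dask.array as da
    from dask.delayed import Delayed

    x = da.ones((4, 4), chunks=(2, 2))   # illustrative source array
    out = np.zeros((4, 4))               # any object supporting out[slices] = values

    # return_stored=False, compute=True (the default): write eagerly, return None
    assert da.store(x, out) is None
    assert (out == 1).all()

    # return_stored=False, compute=False: return a single Delayed aggregating
    # all of the per-chunk store tasks
    d = da.store(x, out, compute=False)
    assert isinstance(d, Delayed)
    d.compute()

    # return_stored=True: return dask arrays that read back from the target
    (y,) = da.store([x], [out], return_stored=True)
    assert y.chunks == x.chunks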
From 40294797ebf88952ef1b7925014fa2c9cb9f4079 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 14 Oct 2021 11:41:24 +0100 Subject: [PATCH 2/3] fix order regression --- dask/array/core.py | 14 +++++--------- dask/array/tests/test_array_core.py | 6 +++--- dask/tests/test_order.py | 3 +-- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index 0a4d1993d39..04db475b821 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -978,6 +978,7 @@ def store( >>> store([x, y, z], [dset1, dset2, dset3]) # doctest: +SKIP """ + if isinstance(sources, Array): sources = [sources] targets = [targets] @@ -1027,7 +1028,7 @@ def store( targets_dsk = Delayed.__dask_optimize__(targets_dsk, targets_keys) load_stored = return_stored and not compute - names = ["store-" + tokenize(s, r) for s, r in zip(sources, regions)] + names = ["store-map-" + tokenize(s, r) for s, r in zip(sources, regions)] store_dsk = {} for s, t, n, r in zip(sources, targets2, names, regions): @@ -1066,14 +1067,9 @@ def store( return None else: - # Collecting the individual chunks is not trivial, as it could lead to poor - # memory management on a distributed cluster if we were to do it in a single - # step. Perform recursive aggregation to work around the problem. - from ..graph_manipulation import checkpoint - - return checkpoint( - Array(store_dsk, n, s.chunks, s.dtype) for n, s in zip(names, sources) - ) + name = "store-" + tokenize(names) + store_dsk[name] = store_keys + return Delayed(name, HighLevelGraph({name: store_dsk}, {name: set()})) def blockdims_from_blockshape(shape, chunks): diff --git a/dask/array/tests/test_array_core.py b/dask/array/tests/test_array_core.py index 3cc0bc0502c..97393c00664 100644 --- a/dask/array/tests/test_array_core.py +++ b/dask/array/tests/test_array_core.py @@ -1764,7 +1764,7 @@ def test_store_regions(): v = store([a, b], [at, bt], regions=region, compute=False) assert isinstance(v, Delayed) assert (at == 0).all() and (bt[region] == 0).all() - assert v.compute() is None + assert all([ev is None for ev in v.compute()]) assert (at[region] == 2).all() and (bt[region] == 3).all() assert not (bt == 3).all() and not (bt == 0).all() assert not (at == 2).all() and not (at == 0).all() @@ -1775,7 +1775,7 @@ def test_store_regions(): v = store([a, b], [at, bt], regions=[region, region], compute=False) assert isinstance(v, Delayed) assert (at == 0).all() and (bt[region] == 0).all() - assert v.compute() is None + assert all([ev is None for ev in v.compute()]) assert (at[region] == 2).all() and (bt[region] == 3).all() assert not (bt == 3).all() and not (bt == 0).all() assert not (at == 2).all() and not (at == 0).all() @@ -1858,7 +1858,7 @@ def test_store_compute_false(): assert layer in v.dask.layers assert (at == 0).all() and (bt == 0).all() - assert v.compute() is None + assert all([ev is None for ev in v.compute()]) assert (at == 2).all() and (bt == 3).all() at = np.zeros(shape=(4, 4)) diff --git a/dask/tests/test_order.py b/dask/tests/test_order.py index 9e07719de5e..d9208915ca9 100644 --- a/dask/tests/test_order.py +++ b/dask/tests/test_order.py @@ -868,9 +868,8 @@ def test_array_store_final_order(tmpdir): o = order(d.dask) # Find the lowest store. Dask starts here. 
- stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-")] + stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-map-")] first_store = min(stores, key=lambda k: o[k]) - first_store connected_stores = [k for k in stores if k[-1] == first_store[-1]] disconnected_stores = [k for k in stores if k[-1] != first_store[-1]] From 87e3880ff215ba358a4587a195991c32e3128fde Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 14 Oct 2021 12:27:32 +0100 Subject: [PATCH 3/3] use hlg --- dask/array/core.py | 98 ++++++++++++++++------------- dask/array/tests/test_array_core.py | 12 ++++ 2 files changed, 67 insertions(+), 43 deletions(-) diff --git a/dask/array/core.py b/dask/array/core.py index 04db475b821..267b87e924a 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -957,7 +957,7 @@ def store( ------- If return_stored=True - tuple[Array] + tuple of Arrays If return_stored=False and compute=True None If return_stored=False and compute=False @@ -1005,71 +1005,83 @@ def store( ) # Optimize all sources together - sources_dsk = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources]) - sources_dsk = Array.__dask_optimize__( - sources_dsk, list(core.flatten([e.__dask_keys__() for e in sources])) + sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources]) + sources_layer = Array.__dask_optimize__( + sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources])) ) + sources_name = "store-sources-" + tokenize(sources) + layers = {sources_name: sources_layer} + dependencies = {sources_name: set()} # Optimize all targets together - targets2 = [] targets_keys = [] - targets_dsk = [] - for e in targets: - if isinstance(e, Delayed): - targets2.append(e.key) - targets_keys.extend(e.__dask_keys__()) - targets_dsk.append(e.__dask_graph__()) - elif is_dask_collection(e): + targets_dsks = [] + for t in targets: + if isinstance(t, Delayed): + targets_keys.append(t.key) + targets_dsks.append(t.__dask_graph__()) + elif is_dask_collection(t): raise TypeError("Targets must be either Delayed objects or array-likes") - else: - targets2.append(e) - targets_dsk = HighLevelGraph.merge(*targets_dsk) - targets_dsk = Delayed.__dask_optimize__(targets_dsk, targets_keys) + if targets_dsks: + targets_hlg = HighLevelGraph.merge(*targets_dsks) + targets_layer = Delayed.__dask_optimize__(targets_hlg, targets_keys) + targets_name = "store-targets-" + tokenize(targets_keys) + layers[targets_name] = targets_layer + dependencies[targets_name] = set() load_stored = return_stored and not compute - names = ["store-map-" + tokenize(s, r) for s, r in zip(sources, regions)] - - store_dsk = {} - for s, t, n, r in zip(sources, targets2, names, regions): - store_dsk.update( - insert_to_ooc( - keys=s.__dask_keys__(), - chunks=s.chunks, - out=t, - name=n, - lock=lock, - region=r, - return_stored=return_stored, - load_stored=load_stored, - ) - ) - store_keys = list(store_dsk.keys()) - store_dsk.update(targets_dsk) - store_dsk.update(sources_dsk) + map_names = [ + "store-map-" + tokenize(s, t if isinstance(t, Delayed) else id(t), r) + for s, t, r in zip(sources, targets, regions) + ] + map_keys = [] + + for s, t, n, r in zip(sources, targets, map_names, regions): + map_layer = insert_to_ooc( + keys=s.__dask_keys__(), + chunks=s.chunks, + out=t.key if isinstance(t, Delayed) else t, + name=n, + lock=lock, + region=r, + return_stored=return_stored, + load_stored=load_stored, + ) + layers[n] = map_layer + if isinstance(t, Delayed): + dependencies[n] = {sources_name, targets_name} + 
else: + dependencies[n] = {sources_name} + map_keys += map_layer.keys() if return_stored: + store_dsk = HighLevelGraph(layers, dependencies) load_store_dsk = store_dsk if compute: - store_dlyds = [Delayed(k, store_dsk) for k in store_keys] + store_dlyds = [Delayed(k, store_dsk) for k in map_keys] store_dlyds = persist(*store_dlyds, **kwargs) store_dsk_2 = HighLevelGraph.merge(*[e.dask for e in store_dlyds]) - load_store_dsk = retrieve_from_ooc(store_keys, store_dsk, store_dsk_2) - names = ["load-" + n for n in names] + load_store_dsk = retrieve_from_ooc(map_keys, store_dsk, store_dsk_2) + map_names = ["load-" + n for n in map_names] return tuple( - Array(load_store_dsk, n, s.chunks, meta=s) for s, n in zip(sources, names) + Array(load_store_dsk, n, s.chunks, meta=s) + for s, n in zip(sources, map_names) ) elif compute: - compute_as_if_collection(Array, store_dsk, store_keys, **kwargs) + store_dsk = HighLevelGraph(layers, dependencies) + compute_as_if_collection(Array, store_dsk, map_keys, **kwargs) return None else: - name = "store-" + tokenize(names) - store_dsk[name] = store_keys - return Delayed(name, HighLevelGraph({name: store_dsk}, {name: set()})) + key = "store-" + tokenize(map_names) + layers[key] = {key: map_keys} + dependencies[key] = set(map_names) + store_dsk = HighLevelGraph(layers, dependencies) + return Delayed(key, store_dsk) def blockdims_from_blockshape(shape, chunks): diff --git a/dask/array/tests/test_array_core.py b/dask/array/tests/test_array_core.py index 97393c00664..e473caa37af 100644 --- a/dask/array/tests/test_array_core.py +++ b/dask/array/tests/test_array_core.py @@ -2009,6 +2009,18 @@ def test_store_multiprocessing_lock(): assert st is None +@pytest.mark.parametrize("return_stored", [False, True]) +@pytest.mark.parametrize("delayed_target", [False, True]) +def test_store_deterministic_keys(return_stored, delayed_target): + a = da.ones((10, 10), chunks=(2, 2)) + at = np.zeros(shape=(10, 10)) + if delayed_target: + at = delayed(at) + st1 = a.store(at, return_stored=return_stored, compute=False) + st2 = a.store(at, return_stored=return_stored, compute=False) + assert st1.dask.keys() == st2.dask.keys() + + def test_to_hdf5(): h5py = pytest.importorskip("h5py") x = da.ones((4, 4), chunks=(2, 2))
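With the final patch applied, ``store`` builds its graph as a well-formed
``HighLevelGraph`` with deterministic ``tokenize``-based names in place of the
earlier ``uuid.uuid1()`` ones, which is what ``test_store_deterministic_keys``
above exercises. A minimal sketch of what this buys (layer names such as
``store-sources-*``, ``store-map-*`` and the final ``store-*`` key are
implementation details; the exact tokens will differ):

    import numpy as np
    import dask.array as da

    x = da.ones((10, 10), chunks=(5, 5))
    out = np.zeros((10, 10))

    d1 = da.store(x, out, compute=False)
    d2 = da.store(x, out, compute=False)

    # Keys are derived from tokenize(...) rather than uuid.uuid1(), so building
    # the same store twice yields identical graph keys.
    assert d1.dask.keys() == d2.dask.keys()

    # The graph's layers and dependencies are populated, so graph-manipulation
    # utilities such as dask.graph_manipulation.bind can consume it.
    for layer in d1.__dask_layers__():
        assert layer in d1.dask.layers

    d1.compute()
    assert (out == 1).all()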