diff --git a/dask/array/core.py b/dask/array/core.py
index 1368387c77a..267b87e924a 100644
--- a/dask/array/core.py
+++ b/dask/array/core.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import contextlib
 import math
 import operator
@@ -9,7 +11,7 @@
 import uuid
 import warnings
 from bisect import bisect
-from collections.abc import Iterable, Iterator, Mapping
+from collections.abc import Collection, Hashable, Iterable, Iterator, Mapping
 from functools import partial, reduce, wraps
 from itertools import product, zip_longest
 from numbers import Integral, Number
@@ -910,12 +912,12 @@ def broadcast_chunks(*chunkss):
 
 
 def store(
-    sources,
+    sources: Array | Collection[Array],
     targets,
-    lock=True,
-    regions=None,
-    compute=True,
-    return_stored=False,
+    lock: bool | Lock = True,
+    regions: tuple[slice, ...] | Collection[tuple[slice, ...]] | None = None,
+    compute: bool = True,
+    return_stored: bool = False,
     **kwargs,
 ):
     """Store dask arrays in array-like objects, overwrite data in target
@@ -931,23 +933,35 @@
 
     Parameters
     ----------
-    sources: Array or iterable of Arrays
-    targets: array-like or Delayed or iterable of array-likes and/or Delayeds
+    sources: Array or collection of Arrays
+    targets: array-like or Delayed or collection of array-likes and/or Delayeds
         These should support setitem syntax ``target[10:20] = ...``
     lock: boolean or threading.Lock, optional
         Whether or not to lock the data stores while storing.
         Pass True (lock each file individually), False (don't lock) or a
         particular :class:`threading.Lock` object to be shared among all writes.
-    regions: tuple of slices or list of tuples of slices
+    regions: tuple of slices or collection of tuples of slices
         Each ``region`` tuple in ``regions`` should be such that
         ``target[region].shape = source.shape``
         for the corresponding source and target in sources and targets,
         respectively. If this is a tuple, the contents will be assumed to be
         slices, so do not provide a tuple of tuples.
     compute: boolean, optional
-        If true compute immediately, return :class:`dask.delayed.Delayed` otherwise
+        If true compute immediately; return :class:`dask.delayed.Delayed` otherwise.
     return_stored: boolean, optional
         Optionally return the stored result (default False).
+    kwargs:
+        Parameters passed to compute/persist (only used if compute=True)
+
+    Returns
+    -------
+
+    If return_stored=True
+        tuple of Arrays
+    If return_stored=False and compute=True
+        None
+    If return_stored=False and compute=False
+        Delayed
 
     Examples
     --------
@@ -991,65 +1005,83 @@ def store(
         )
 
     # Optimize all sources together
-    sources_dsk = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
-    sources_dsk = Array.__dask_optimize__(
-        sources_dsk, list(core.flatten([e.__dask_keys__() for e in sources]))
+    sources_hlg = HighLevelGraph.merge(*[e.__dask_graph__() for e in sources])
+    sources_layer = Array.__dask_optimize__(
+        sources_hlg, list(core.flatten([e.__dask_keys__() for e in sources]))
     )
-    sources2 = [Array(sources_dsk, e.name, e.chunks, meta=e) for e in sources]
+    sources_name = "store-sources-" + tokenize(sources)
+    layers = {sources_name: sources_layer}
+    dependencies = {sources_name: set()}
 
     # Optimize all targets together
-    targets2 = []
     targets_keys = []
-    targets_dsk = []
-    for e in targets:
-        if isinstance(e, Delayed):
-            targets2.append(e.key)
-            targets_keys.extend(e.__dask_keys__())
-            targets_dsk.append(e.__dask_graph__())
-        elif is_dask_collection(e):
+    targets_dsks = []
+    for t in targets:
+        if isinstance(t, Delayed):
+            targets_keys.append(t.key)
+            targets_dsks.append(t.__dask_graph__())
+        elif is_dask_collection(t):
             raise TypeError("Targets must be either Delayed objects or array-likes")
-        else:
-            targets2.append(e)
-    targets_dsk = HighLevelGraph.merge(*targets_dsk)
-    targets_dsk = Delayed.__dask_optimize__(targets_dsk, targets_keys)
+    if targets_dsks:
+        targets_hlg = HighLevelGraph.merge(*targets_dsks)
+        targets_layer = Delayed.__dask_optimize__(targets_hlg, targets_keys)
+        targets_name = "store-targets-" + tokenize(targets_keys)
+        layers[targets_name] = targets_layer
+        dependencies[targets_name] = set()
 
     load_stored = return_stored and not compute
-    toks = [str(uuid.uuid1()) for _ in range(len(sources))]
-    store_dsk = HighLevelGraph.merge(
-        *[
-            insert_to_ooc(s, t, lock, r, return_stored, load_stored, tok)
-            for s, t, r, tok in zip(sources2, targets2, regions, toks)
-        ]
-    )
-    store_keys = list(store_dsk.keys())
-    store_dsk = HighLevelGraph.merge(store_dsk, targets_dsk, sources_dsk)
-    store_dsk = HighLevelGraph.from_collections(id(store_dsk), dict(store_dsk))
+    map_names = [
+        "store-map-" + tokenize(s, t if isinstance(t, Delayed) else id(t), r)
+        for s, t, r in zip(sources, targets, regions)
+    ]
+    map_keys = []
+
+    for s, t, n, r in zip(sources, targets, map_names, regions):
+        map_layer = insert_to_ooc(
+            keys=s.__dask_keys__(),
+            chunks=s.chunks,
+            out=t.key if isinstance(t, Delayed) else t,
+            name=n,
+            lock=lock,
+            region=r,
+            return_stored=return_stored,
+            load_stored=load_stored,
+        )
+        layers[n] = map_layer
+        if isinstance(t, Delayed):
+            dependencies[n] = {sources_name, targets_name}
+        else:
+            dependencies[n] = {sources_name}
+        map_keys += map_layer.keys()
 
     if return_stored:
+        store_dsk = HighLevelGraph(layers, dependencies)
         load_store_dsk = store_dsk
 
         if compute:
-            store_dlyds = [Delayed(k, store_dsk) for k in store_keys]
+            store_dlyds = [Delayed(k, store_dsk) for k in map_keys]
             store_dlyds = persist(*store_dlyds, **kwargs)
             store_dsk_2 = HighLevelGraph.merge(*[e.dask for e in store_dlyds])
+            load_store_dsk = retrieve_from_ooc(map_keys, store_dsk, store_dsk_2)
+            map_names = ["load-" + n for n in map_names]
 
-            load_store_dsk = retrieve_from_ooc(store_keys, store_dsk, store_dsk_2)
-
-        result = tuple(
-            Array(load_store_dsk, "load-store-%s" % t, s.chunks, meta=s)
-            for s, t in zip(sources, toks)
+        return tuple(
+            Array(load_store_dsk, n, s.chunks, meta=s)
+            for s, n in zip(sources, map_names)
         )
-        return result
+
+    elif compute:
+        store_dsk = HighLevelGraph(layers, dependencies)
+        compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
+        return None
+
     else:
-        if compute:
-            compute_as_if_collection(Array, store_dsk, store_keys, **kwargs)
-            return None
-        else:
-            name = "store-" + str(uuid.uuid1())
-            dsk = HighLevelGraph.merge({name: store_keys}, store_dsk)
-            return Delayed(name, dsk)
+        key = "store-" + tokenize(map_names)
+        layers[key] = {key: map_keys}
+        dependencies[key] = set(map_names)
+        store_dsk = HighLevelGraph(layers, dependencies)
+        return Delayed(key, store_dsk)
 
 
 def blockdims_from_blockshape(shape, chunks):
@@ -2878,7 +2910,7 @@ def auto_chunks(chunks, shape, limit, dtype, previous_chunks=None):
         limit = parse_bytes(limit)
 
     if dtype is None:
-        raise TypeError("DType must be known for auto-chunking")
+        raise TypeError("dtype must be known for auto-chunking")
 
     if dtype.hasobject:
         raise NotImplementedError(
@@ -4038,17 +4070,29 @@ def load_chunk(out, index, lock):
 
 
 def insert_to_ooc(
-    arr, out, lock=True, region=None, return_stored=False, load_stored=False, tok=None
-):
+    keys: list,
+    chunks: tuple[tuple[int, ...], ...],
+    out,
+    name: str,
+    *,
+    lock: Lock | bool = True,
+    region: slice | None = None,
+    return_stored: bool = False,
+    load_stored: bool = False,
+) -> dict:
     """
     Creates a Dask graph for storing chunks from ``arr`` in ``out``.
 
     Parameters
     ----------
-    arr: da.Array
-        A dask array
+    keys: list
+        Dask keys of the input array
+    chunks: tuple
+        Dask chunks of the input array
     out: array-like
-        Where to store results too.
+        Where to store results to
+    name: str
+        First element of dask keys
     lock: Lock-like or bool, optional
         Whether to lock or with what (default is ``True``, which means a
         :class:`threading.Lock` instance).
@@ -4062,51 +4106,53 @@ def insert_to_ooc(
         Whether to handling loading from ``out`` at the same time.
         Ignored if ``return_stored`` is not ``True``.
         (default is ``False``, meaning defer to ``return_stored``).
-    tok: str, optional
-        Token to use when naming keys
+
+    Returns
+    -------
+    dask graph of store operation
 
     Examples
     --------
     >>> import dask.array as da
     >>> d = da.ones((5, 6), chunks=(2, 3))
     >>> a = np.empty(d.shape)
-    >>> insert_to_ooc(d, a)  # doctest: +SKIP
+    >>> insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123")  # doctest: +SKIP
     """
 
     if lock is True:
         lock = Lock()
 
-    slices = slices_from_chunks(arr.chunks)
+    slices = slices_from_chunks(chunks)
     if region:
         slices = [fuse_slice(region, slc) for slc in slices]
 
-    name = "store-%s" % (tok or str(uuid.uuid1()))
-    func = store_chunk
-    args = ()
     if return_stored and load_stored:
-        name = "load-%s" % name
         func = load_store_chunk
-        args = args + (load_stored,)
+        args = (load_stored,)
+    else:
+        func = store_chunk
+        args = ()
 
     dsk = {
         (name,) + t[1:]: (func, t, out, slc, lock, return_stored) + args
-        for t, slc in zip(core.flatten(arr.__dask_keys__()), slices)
+        for t, slc in zip(core.flatten(keys), slices)
     }
-
     return dsk
 
 
-def retrieve_from_ooc(keys, dsk_pre, dsk_post=None):
+def retrieve_from_ooc(
+    keys: Collection[Hashable], dsk_pre: Mapping, dsk_post: Mapping
+) -> dict:
    """
    Creates a Dask graph for loading stored ``keys`` from ``dsk``.
 
    Parameters
    ----------
-    keys: Sequence
+    keys: Collection
        A sequence containing Dask graph keys to load
    dsk_pre: Mapping
        A Dask graph corresponding to a Dask Array before computation
-    dsk_post: Mapping, optional
+    dsk_post: Mapping
        A Dask graph corresponding to a Dask Array after computation
 
    Examples
@@ -4114,13 +4160,9 @@
    >>> import dask.array as da
    >>> d = da.ones((5, 6), chunks=(2, 3))
    >>> a = np.empty(d.shape)
-    >>> g = insert_to_ooc(d, a)
-    >>> retrieve_from_ooc(g.keys(), g)  # doctest: +SKIP
+    >>> g = insert_to_ooc(d.__dask_keys__(), d.chunks, a, "store-123")
+    >>> retrieve_from_ooc(g.keys(), g, {k: k for k in g.keys()})  # doctest: +SKIP
    """
-
-    if not dsk_post:
-        dsk_post = {k: k for k in keys}
-
    load_dsk = {
        ("load-" + k[0],) + k[1:]: (load_chunk, dsk_post[k]) + dsk_pre[k][3:-1]
        for k in keys
diff --git a/dask/array/tests/test_array_core.py b/dask/array/tests/test_array_core.py
index db7d121bb71..e473caa37af 100644
--- a/dask/array/tests/test_array_core.py
+++ b/dask/array/tests/test_array_core.py
@@ -1852,6 +1852,11 @@ def test_store_compute_false():
 
    v = store([a, b], [at, bt], compute=False)
    assert isinstance(v, Delayed)
+
+    # You need a well-formed HighLevelgraph for e.g. dask.graph_manipulation.bind
+    for layer in v.__dask_layers__():
+        assert layer in v.dask.layers
+
    assert (at == 0).all() and (bt == 0).all()
    assert all([ev is None for ev in v.compute()])
    assert (at == 2).all() and (bt == 3).all()
@@ -2004,6 +2009,18 @@ def test_store_multiprocessing_lock():
    assert st is None
 
 
+@pytest.mark.parametrize("return_stored", [False, True])
+@pytest.mark.parametrize("delayed_target", [False, True])
+def test_store_deterministic_keys(return_stored, delayed_target):
+    a = da.ones((10, 10), chunks=(2, 2))
+    at = np.zeros(shape=(10, 10))
+    if delayed_target:
+        at = delayed(at)
+    st1 = a.store(at, return_stored=return_stored, compute=False)
+    st2 = a.store(at, return_stored=return_stored, compute=False)
+    assert st1.dask.keys() == st2.dask.keys()
+
+
 def test_to_hdf5():
    h5py = pytest.importorskip("h5py")
    x = da.ones((4, 4), chunks=(2, 2))
diff --git a/dask/dataframe/io/csv.py b/dask/dataframe/io/csv.py
index 2fd09f74cc1..0592d00e7fb 100644
--- a/dask/dataframe/io/csv.py
+++ b/dask/dataframe/io/csv.py
@@ -162,7 +162,7 @@ def pandas_read_text(
    kwargs : dict
        A dictionary of keyword arguments to be passed to ``reader``
    dtypes : dict
-        DTypes to assign to columns
+        dtypes to assign to columns
    path : tuple
        A tuple containing path column name, path to file, and an ordered list of paths.
 
diff --git a/dask/tests/test_order.py b/dask/tests/test_order.py
index 9e07719de5e..d9208915ca9 100644
--- a/dask/tests/test_order.py
+++ b/dask/tests/test_order.py
@@ -868,9 +868,8 @@ def test_array_store_final_order(tmpdir):
    o = order(d.dask)
 
    # Find the lowest store. Dask starts here.
-    stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-")]
+    stores = [k for k in o if isinstance(k, tuple) and k[0].startswith("store-map-")]
    first_store = min(stores, key=lambda k: o[k])
-    first_store
    connected_stores = [k for k in stores if k[-1] == first_store[-1]]
    disconnected_stores = [k for k in stores if k[-1] != first_store[-1]]
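
Usage sketch (not part of the patch): the snippet below illustrates the behaviour the new test_store_deterministic_keys test relies on, namely that repeated store(..., compute=False) calls against the same target now produce identical, tokenize-based graph keys, and that the returned Delayed carries a well-formed HighLevelGraph. The array shapes, chunk sizes, and variable names here are arbitrary.

import numpy as np
import dask.array as da

a = da.ones((10, 10), chunks=(2, 2))
at = np.zeros((10, 10))

# Build the store graph twice without computing it.
d1 = da.store(a, at, compute=False)
d2 = da.store(a, at, compute=False)

# Keys are now derived from tokenize() rather than uuid1(), so the two
# graphs describe the same keys when source and target are the same.
assert d1.dask.keys() == d2.dask.keys()

# The Delayed is backed by a well-formed HighLevelGraph: every layer it
# declares is actually present in the graph.
for layer in d1.__dask_layers__():
    assert layer in d1.dask.layers

# Computing the Delayed performs the writes into `at`.
d1.compute()
assert (at == 1).all()

Note that the "store-map-*" names tokenize id(target) for plain array-like targets, so keys are only reproducible while the same target object is reused, which is exactly what the test exercises.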