Speed up normalize_chunks #10648

Open · wants to merge 14 commits into base: main
242 changes: 158 additions & 84 deletions dask/array/core.py
@@ -24,7 +24,7 @@
from numbers import Integral, Number
from operator import add, mul
from threading import Lock
from typing import Any, TypeVar, Union, cast
from typing import Any, Literal, TypeVar, Union, cast

import numpy as np
from numpy.typing import ArrayLike
@@ -86,6 +86,11 @@
from dask.widgets import get_template

T_IntOrNaN = Union[int, float] # Should be Union[int, Literal[np.nan]]
T_AutoOrBytes = Union[Literal["auto"], str]
T_ChunkVal = Union[T_IntOrNaN, T_AutoOrBytes]
T_ChunkVals = tuple[T_ChunkVal, ...]
T_ChunkValsDict = dict[int, T_ChunkVal]
T_ChunksNormalized = tuple[tuple[T_IntOrNaN, ...], ...]

DEFAULT_GET = named_schedulers.get("threads", named_schedulers["sync"])

@@ -2970,7 +2975,13 @@ def ensure_int(f):
return i


def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):
def normalize_chunks(
chunks: T_ChunkVal | T_ChunkVals | T_ChunkValsDict | T_ChunksNormalized,
shape: tuple[T_IntOrNaN, ...] | None = None,
limit: int | None = None,
dtype: np.typing.DTypeLike | None = None,
previous_chunks: T_ChunksNormalized | None = None,
) -> T_ChunksNormalized:
"""Normalize chunks to tuple of tuples

This takes in a variety of input types and information and produces a full
@@ -2994,17 +3005,22 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):

Examples
--------
Specify uniform chunk sizes
Fully explicit tuple-of-tuples

>>> from dask.array.core import normalize_chunks
>>> normalize_chunks((2, 2), shape=(5, 6))
>>> normalize_chunks(((2, 2, 1), (2, 2, 2)), shape=(5, 6))
((2, 2, 1), (2, 2, 2))

Also passes through fully explicit tuple-of-tuples
Specify uniform chunk sizes

>>> normalize_chunks(((2, 2, 1), (2, 2, 2)), shape=(5, 6))
>>> normalize_chunks((2, 2), shape=(5, 6))
((2, 2, 1), (2, 2, 2))

Cleans up missing outer tuple

>>> normalize_chunks((3, 2), (5,))
((3, 2),)

Cleans up lists to tuples

>>> normalize_chunks([[2, 2], [3, 3]])
@@ -3024,6 +3040,8 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):

>>> normalize_chunks((5, -1), shape=(10, 10))
((5, 5), (10,))
>>> normalize_chunks((5, None), shape=(10, 10))
((5, 5), (10,))

Use the value "auto" to automatically determine chunk sizes along certain
dimensions. This uses the ``limit=`` and ``dtype=`` keywords to
@@ -3033,6 +3051,8 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):

>>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype='uint8')
((5, 5, 5, 5),)
>>> normalize_chunks("auto", (2, 3), dtype=np.int32)
((2,), (3,))

You can also use byte sizes (see :func:`dask.utils.parse_bytes`) in place of
"auto" to ask for a particular size
@@ -3042,101 +3062,155 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks=None):

Respects null dimensions

>>> normalize_chunks(())
()
>>> normalize_chunks((), ())
()
>>> normalize_chunks((1,), ())
()
>>> normalize_chunks((), shape=(0, 0))
((0,), (0,))

Handles NaNs

>>> normalize_chunks((1, (np.nan,)), (1, np.nan))
((1,), (nan,))
"""
if dtype and not isinstance(dtype, np.dtype):
dtype = np.dtype(dtype)
if chunks is None:
raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
if isinstance(chunks, list):
chunks = tuple(chunks)
if isinstance(chunks, (Number, str)):
chunks = (chunks,) * len(shape)
if isinstance(chunks, dict):
chunks = tuple(chunks.get(i, None) for i in range(len(shape)))
if isinstance(chunks, np.ndarray):
chunks = chunks.tolist()
if not chunks and shape and all(s == 0 for s in shape):
chunks = ((0,),) * len(shape)
if shape is not None:
shape_: tuple[T_IntOrNaN, ...] = shape
shape_len: int = len(shape)
else:
shape_ = ()
shape_len = -1

# Normalize chunks' outer tuple:
chunks_tuple: T_ChunkVals | T_ChunksNormalized
if isinstance(chunks, tuple):
chunks_tuple = chunks
elif isinstance(chunks, (int, float, str)):
chunks_tuple = (chunks,) * shape_len
elif isinstance(chunks, dict):
chunks_tuple = tuple(chunks.get(i, -1) for i in range(shape_len))
else:
if isinstance(chunks, list): # type: ignore[unreachable]
# TODO: Not sure this should be encouraged typing-wise; it's an easy fix.
chunks_tuple = tuple(chunks)
elif isinstance(chunks, np.ndarray):
# TODO: Not sure this should be encouraged typing-wise; it's an easy fix.
chunks_tuple = tuple(chunks.tolist())
elif chunks is None:
raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
else:
raise ValueError(
f"{chunks=} is of type {type(chunks)} which is not supported."
)

if (
shape
and len(shape) == 1
and len(chunks) > 1
and all(isinstance(c, (Number, str)) for c in chunks)
):
chunks = (chunks,)
if shape_len > 0:
# chunks=(3,2), shape=(6,)
if (
shape_len == 1
and len(chunks_tuple) > 1
and all(isinstance(c, (int, float, str)) for c in chunks_tuple)
):
# TODO: Missing outer tuple. How to narrow down the typing?
chunks_tuple = (chunks_tuple,) # type: ignore[assignment]

if shape and len(chunks) != len(shape):
raise ValueError(
"Chunks and shape must be of the same length/dimension. "
"Got chunks=%s, shape=%s" % (chunks, shape)
)
if -1 in chunks or None in chunks:
chunks = tuple(s if c == -1 or c is None else c for c, s in zip(chunks, shape))

# If specifying chunk size in bytes, use that value to set the limit.
# Verify there is only one consistent value of limit or chunk-bytes used.
for c in chunks:
if isinstance(c, str) and c != "auto":
parsed = parse_bytes(c)
if limit is None:
limit = parsed
elif parsed != limit:
# Null dimensions:
if len(chunks_tuple) == 0 and all(s == 0 for s in shape_):
chunks_tuple = ((0,),) * shape_len

# Check chunks and shape have the same length:
if len(chunks_tuple) != shape_len:
raise ValueError(
"Chunks and shape must be of the same length/dimension. "
f"Got chunks={chunks_tuple}, shape={shape_}"
)
shape_or_falses: tuple[T_IntOrNaN, ...] = shape_
elif shape_len == 0:
shape_or_falses = ()
else:
# shape is None. Just broadcast a tuple with dummy values to the same
# size as chunks:
shape_or_falses = (False,) * len(chunks_tuple)

chunks_list_fin: list[tuple[T_IntOrNaN, ...] | Literal["auto"]] = []
any_auto = False
for c, s in zip(chunks_tuple, shape_or_falses):
if isinstance(c, (tuple, list)):
if len(c) == 0:
raise ValueError(
"Only one consistent value of limit or chunk is allowed."
"Used %s != %s" % (parsed, limit)
"Empty tuples are not allowed in chunks. Express "
"zero length dimensions with 0(s) in chunks"
)
# Substitute byte limits with 'auto' now that limit is set.
chunks = tuple("auto" if isinstance(c, str) and c != "auto" else c for c in chunks)
if s is not False and (
sum(c) != s and not (math.isnan(s) or any(math.isnan(x) for x in c))
):
raise ValueError(
"Chunks do not add up to shape. "
f"Got chunks={chunks_tuple}, shape={shape_}"
)
chunks_list_fin.append(
tuple(int(x) if not math.isnan(x) else np.nan for x in c)
)

if any(c == "auto" for c in chunks):
chunks = auto_chunks(chunks, shape, limit, dtype, previous_chunks)
elif c == -1 or c is None:
if s is not False:
chunks_list_fin.append((s,))
else:
raise ValueError(
"Using -1 or None in chunks without shape is not allowed."
)

if shape is not None:
chunks = tuple(c if c not in {None, -1} else s for c, s in zip(chunks, shape))
elif isinstance(c, (int, float)):
if s is not False:
sb = blockdims_from_blockshape((s,), (c,))
chunks_list_fin.append(sb[0])
else:
chunks_list_fin.append((int(c) if not math.isnan(c) else np.nan,))

elif isinstance(c, str):
if c == "auto":
chunks_list_fin.append("auto")
any_auto = True
else:
# If specifying chunk size in bytes, use that value to set the limit.
# Verify there is only one consistent value of limit or chunk-bytes used.
parsed = parse_bytes(c)
if limit is None:
limit = parsed
elif parsed != limit:
raise ValueError(
"Only one consistent value of limit or chunk is allowed."
f"Used {parsed} != {limit}"
)
# Substitute byte limits with 'auto' now that limit is set.
chunks_list_fin.append("auto")
any_auto = True
else:
raise ValueError(
"Chunk element is not supported. "
f"Got {c} of type {type(c)} from chunks={chunks_tuple}"
)

if any_auto:
if dtype is not None:
dtype = np.dtype(dtype)

allints = None
if chunks and shape is not None:
# allints: did we start with chunks as a simple tuple of ints?
allints = all(isinstance(c, int) for c in chunks)
chunks = sum(
chunks_auto = auto_chunks(
chunks_list_fin, shape_, limit, dtype, previous_chunks
)
return sum(
(
blockdims_from_blockshape((s,), (c,))
if not isinstance(c, (tuple, list))
else (c,)
for s, c in zip(shape, chunks)
for s, c in zip(shape_, chunks_auto)
),
(),
)
for c in chunks:
if not c:
raise ValueError(
"Empty tuples are not allowed in chunks. Express "
"zero length dimensions with 0(s) in chunks"
)

if shape is not None:
if len(chunks) != len(shape):
raise ValueError(
"Input array has %d dimensions but the supplied "
"chunks has only %d dimensions" % (len(shape), len(chunks))
)
if not all(
c == s or (math.isnan(c) or math.isnan(s))
for c, s in zip(map(sum, chunks), shape)
):
raise ValueError(
"Chunks do not add up to shape. "
"Got chunks=%s, shape=%s" % (chunks, shape)
)
if allints or isinstance(sum(sum(_) for _ in chunks), int):
# Fastpath for when we already know chunks contains only integers
return tuple(tuple(ch) for ch in chunks)
return tuple(
tuple(int(x) if not math.isnan(x) else np.nan for x in c) for c in chunks
)
else:
# TODO: the chunks are ints or nans only. How to narrow this down?
return tuple(chunks_list_fin) # type: ignore[arg-type]


def _compute_multiplier(limit: int, dtype, largest_block: int, result):
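
Taken together, the rewrite keeps the documented behavior: plain ints are expanded against the shape, -1 and None take a dimension's full extent, byte strings set ``limit`` and then behave like "auto", and explicit tuples pass through. A quick sanity sketch, restricted to cases doctested above:

    >>> import numpy as np
    >>> from dask.array.core import normalize_chunks
    >>> normalize_chunks((5, -1), shape=(10, 10))
    ((5, 5), (10,))
    >>> normalize_chunks("auto", (2, 3), dtype=np.int32)
    ((2,), (3,))
    >>> normalize_chunks((1, (np.nan,)), (1, np.nan))
    ((1,), (nan,))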
4 changes: 2 additions & 2 deletions dask/array/tests/test_array_core.py
@@ -4727,7 +4727,7 @@ def test_zarr_regions():
a.to_zarr(d, region=(slice(2), slice(2)))


def test_tiledb_roundtrip():
def test_tiledb_roundtrip() -> None:
tiledb = pytest.importorskip("tiledb")
# 1) load with default chunking
# 2) load from existing tiledb.DenseArray
@@ -4761,7 +4761,7 @@ def test_tiledb_roundtrip():
assert a.chunks == tdb.chunks


def test_tiledb_multiattr():
def test_tiledb_multiattr() -> None:
tiledb = pytest.importorskip("tiledb")
dom = tiledb.Domain(
tiledb.Dim("x", (0, 1000), tile=100), tiledb.Dim("y", (0, 1000), tile=100)
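
In the same spirit as the return-type annotations above, a hypothetical companion test (not part of this PR) could pin the doctested cases down in the test suite:

    def test_normalize_chunks_doctested_cases() -> None:
        from dask.array.core import normalize_chunks
        # All expected values are taken verbatim from the docstring examples.
        assert normalize_chunks((2, 2), shape=(5, 6)) == ((2, 2, 1), (2, 2, 2))
        assert normalize_chunks((5, None), shape=(10, 10)) == ((5, 5), (10,))
        assert normalize_chunks((3, 2), (5,)) == ((3, 2),)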
4 changes: 2 additions & 2 deletions dask/array/tiledb_io.py
@@ -3,9 +3,9 @@
from dask.array import core


def _tiledb_to_chunks(tiledb_array):
def _tiledb_to_chunks(tiledb_array) -> tuple[int, ...]:
schema = tiledb_array.schema
return list(schema.domain.dim(i).tile for i in range(schema.ndim))
return tuple(schema.domain.dim(i).tile.item() for i in range(schema.ndim))


def from_tiledb(uri, attribute=None, chunks=None, storage_options=None, **kwargs):
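
The ``_tiledb_to_chunks`` change is more than cosmetic: the added ``.item()`` call suggests TileDB tile extents come back as NumPy scalars, which ``.item()`` converts to plain Python ints, and returning a tuple matches the new ``tuple[int, ...]`` annotation. A quick sketch of the scalar conversion (``np.uint64`` is an assumed example dtype):

    >>> import numpy as np
    >>> tile = np.uint64(100)
    >>> type(tile)
    <class 'numpy.uint64'>
    >>> type(tile.item())
    <class 'int'>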