Speed up normalize_chunks #10648

Open · wants to merge 14 commits into base: main
243 changes: 160 additions & 83 deletions dask/array/core.py
@@ -24,7 +24,7 @@
from numbers import Integral, Number
from operator import add, mul
from threading import Lock
from typing import Any, Literal, TypeVar, Union, cast

import numpy as np
from numpy.typing import ArrayLike
@@ -86,6 +86,12 @@
from dask.widgets import get_template

T_IntOrNaN = Union[int, float] # Should be Union[int, Literal[np.nan]]
T_AutoOrBytes = Union[Literal["auto"], str]
T_ChunkVal = Union[T_IntOrNaN, T_AutoOrBytes]
T_ChunkVals = tuple[T_ChunkVal, ...]
T_ChunksVals = tuple[tuple[T_IntOrNaN, ...], ...]
T_ChunkValsDict = dict[int, T_ChunkVal]
T_ChunksNormalized = tuple[tuple[T_IntOrNaN, ...], ...]

DEFAULT_GET = named_schedulers.get("threads", named_schedulers["sync"])

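The aliases above spell out every accepted chunk specification. For orientation only, here is a minimal sketch of values each alias is meant to cover; the variable names below are illustrative and not part of the diff, and the aliases are restated so the sketch is self-contained:

from typing import Literal, Union

# Restated from the diff so this snippet runs on its own:
T_IntOrNaN = Union[int, float]
T_ChunkVal = Union[T_IntOrNaN, Literal["auto"], str]

single: T_ChunkVal = "128 MiB"      # one spec: an int, float("nan"), "auto", or a byte size
per_dim = (2, -1, "auto")           # T_ChunkVals: one spec per dimension
explicit = ((2, 2, 1), (2, 2, 2))   # T_ChunksVals / T_ChunksNormalized: explicit sizes per dimension
by_axis = {0: 2, 1: "auto"}         # T_ChunkValsDict: dimension index -> spec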
@@ -2970,7 +2976,17 @@ def ensure_int(f):
return i


def normalize_chunks(
chunks: T_ChunkVal
| T_ChunkVals
| T_ChunksVals
| T_ChunkValsDict
| T_ChunksNormalized,
shape: tuple[T_IntOrNaN, ...] | None = None,
limit: int | None = None,
dtype: np.typing.DTypeLike | None = None,
previous_chunks: T_ChunksNormalized | None = None,
) -> T_ChunksNormalized:
"""Normalize chunks to tuple of tuples

This takes in a variety of input types and information and produces a full
@@ -2994,16 +3010,21 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks

Examples
--------
Fully explicit tuple-of-tuples

>>> from dask.array.core import normalize_chunks
>>> normalize_chunks(((2, 2, 1), (2, 2, 2)), shape=(5, 6))
((2, 2, 1), (2, 2, 2))

Specify uniform chunk sizes

>>> normalize_chunks((2, 2), shape=(5, 6))
((2, 2, 1), (2, 2, 2))

Cleans up missing outer tuple

>>> normalize_chunks((3, 2), (5,))
((3, 2),)

Cleans up lists to tuples

@@ -3024,6 +3045,8 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks

>>> normalize_chunks((5, -1), shape=(10, 10))
((5, 5), (10,))
>>> normalize_chunks((5, None), shape=(10, 10))
((5, 5), (10,))

Use the value "auto" to automatically determine chunk sizes along certain
dimensions. This uses the ``limit=`` and ``dtype=`` keywords to
@@ -3033,6 +3056,8 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks

>>> normalize_chunks(("auto",), shape=(20,), limit=5, dtype='uint8')
((5, 5, 5, 5),)
>>> normalize_chunks("auto", (2, 3), dtype=np.int32)
((2,), (3,))

You can also use byte sizes (see :func:`dask.utils.parse_bytes`) in place of
"auto" to ask for a particular size
@@ -3042,101 +3067,153 @@ def normalize_chunks(chunks, shape=None, limit=None, dtype=None, previous_chunks

Respects null dimensions

>>> normalize_chunks(())
()
>>> normalize_chunks((), ())
()
>>> normalize_chunks((), shape=(0, 0))
((0,), (0,))

Handles NaNs

>>> normalize_chunks((1, (np.nan,)), (1, np.nan))
((1,), (nan,))
"""
if shape is not None:
    shape_: tuple[T_IntOrNaN, ...] = shape
    shape_len: int = len(shape)
    shape_is_not_none: bool = True
else:
    shape_ = ()
    shape_len = -1
    shape_is_not_none = False

# Normalize chunks' outer tuple:
chunks_tuple: T_ChunkVals | T_ChunksVals
if isinstance(chunks, tuple):
    chunks_tuple = chunks
elif isinstance(chunks, (int, float, str)):
    chunks_tuple = (chunks,) * shape_len
elif isinstance(chunks, dict):
    chunks_tuple = tuple(chunks.get(i, -1) for i in range(shape_len))
elif isinstance(chunks, list):
    chunks_tuple = tuple(chunks)
elif isinstance(chunks, np.ndarray):
    chunks_tuple = tuple(chunks.tolist())
elif chunks is None:
    raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
else:
    raise ValueError("Type of chunk is not supported.")

if shape_is_not_none:
    # chunks=(3, 2), shape=(6,) means a single dimension chunked as (3, 2):
    if (
        shape_len == 1
        and len(chunks_tuple) > 1
        and all(isinstance(c, (int, float, str)) for c in chunks_tuple)
    ):
        chunks_tuple = (chunks_tuple,)

    # Null dimensions:
    if len(chunks_tuple) == 0 and all(s == 0 for s in shape_):
        chunks_tuple = ((0,),) * shape_len

    # Check chunks and shape have the same length:
    if len(chunks_tuple) != shape_len:
        raise ValueError(
            "Chunks and shape must be of the same length/dimension. "
            f"Got chunks={chunks_tuple}, shape={shape_}"
        )

chunks_list = list(chunks_tuple)
chunks_list_fin: list[tuple[T_IntOrNaN, ...] | Literal["auto"]] = []
any_auto = False
for i, c in enumerate(chunks_list):
    if isinstance(c, (tuple, list)):
        if len(c) == 0:
            raise ValueError(
                "Empty tuples are not allowed in chunks. Express "
                "zero length dimensions with 0(s) in chunks"
            )
        if shape_is_not_none and (
            sum(c) != shape_[i]
            and not (math.isnan(shape_[i]) or any(math.isnan(x) for x in c))
        ):
            raise ValueError(
                "Chunks do not add up to shape. "
                f"Got chunks={tuple(chunks_list)}, shape={shape_}"
            )
        chunks_list_fin.append(
            tuple(int(x) if not math.isnan(x) else np.nan for x in c)
        )

    elif c == -1 or c is None:
        if shape_is_not_none:
            # -1 or None means a single chunk spanning the whole dimension:
            chunks_list_fin.append((shape_[i],))
        else:
            raise ValueError(
                "Using -1 or None in chunks without shape is not allowed."
            )

    elif isinstance(c, (int, float)):
        if shape_is_not_none:
            sb = blockdims_from_blockshape((shape_[i],), (c,))
            chunks_list_fin.append(sb[0])
        else:
            chunks_list_fin.append((int(c) if not math.isnan(c) else np.nan,))

    elif isinstance(c, str):
        if c == "auto":
            chunks_list_fin.append("auto")
            any_auto = True
        else:
            # If specifying chunk size in bytes, use that value to set the limit.
            # Verify there is only one consistent value of limit or chunk-bytes used.
            parsed = parse_bytes(c)
            if limit is None:
                limit = parsed
            elif parsed != limit:
                raise ValueError(
                    "Only one consistent value of limit or chunk is allowed. "
                    f"Used {parsed} != {limit}"
                )
            # Substitute byte limits with 'auto' now that limit is set.
            chunks_list_fin.append("auto")
            any_auto = True
    else:
        raise ValueError(f"Chunk element is not supported. Got {chunks}")

if any_auto:
    if dtype is not None:
        dtype = np.dtype(dtype)

    chunks_auto = auto_chunks(
        chunks_list_fin, shape_, limit, dtype, previous_chunks
    )
    return sum(
        (
            blockdims_from_blockshape((s,), (c,))
            if not isinstance(c, (tuple, list))
            else (c,)
            for s, c in zip(shape_, chunks_auto)
        ),
        (),
    )

return tuple(chunks_list_fin)
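
Taken together, a quick sanity sketch of the normalized behavior, using only calls already documented in the docstring above (assumes a checkout of this branch is installed):

from dask.array.core import normalize_chunks

# Uniform chunk sizes expand against the shape:
assert normalize_chunks((2, 2), shape=(5, 6)) == ((2, 2, 1), (2, 2, 2))

# -1 and None both mean one chunk spanning the whole dimension:
assert normalize_chunks((5, -1), shape=(10, 10)) == ((5, 5), (10,))
assert normalize_chunks((5, None), shape=(10, 10)) == ((5, 5), (10,))

# "auto" derives sizes from the limit= and dtype= keywords:
assert normalize_chunks(("auto",), shape=(20,), limit=5, dtype="uint8") == ((5, 5, 5, 5),)

# Null dimensions pass through:
assert normalize_chunks((), shape=(0, 0)) == ((0,), (0,))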


def _compute_multiplier(limit: int, dtype, largest_block: int, result):