diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index d2a3b0e..ab9122f 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -18,7 +18,7 @@ jobs:
     strategy:
       matrix:
         os: ["ubuntu-latest", "macos-latest", "windows-latest"]
-        python-version: ["3.7", "3.9"]
+        python-version: ["3.7", "3.9", "3.10"]
     defaults:
       run:
         shell: bash -l {0}
diff --git a/docs/intro.md b/docs/intro.md
index 4fd6d80..f131d20 100644
--- a/docs/intro.md
+++ b/docs/intro.md
@@ -30,6 +30,14 @@ the root directory run
 pip install -e .
 ```

+Alternatively, you can install sharrow plus all the dependencies (including
+additional optional dependencies for development and testing) in a conda environment,
+using the `envs/development.yml` environment to create a `sh-dev` environment:
+
+```shell
+conda env create -f envs/development.yml
+```
+
 ## Testing

 Sharrow includes unit tests both in the `sharrow/tests` directory and embedded
diff --git a/docs/walkthrough/sparse.ipynb b/docs/walkthrough/sparse.ipynb
index 719f511..73565bf 100644
--- a/docs/walkthrough/sparse.ipynb
+++ b/docs/walkthrough/sparse.ipynb
@@ -508,6 +508,35 @@
     "# TEST\n",
     "assert skims.redirection.blenders == {'DISTWALK': {'max_blend_distance': 1.0, 'blend_distance_name': None}}"
    ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "96a7c554",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# TEST\n",
+    "# reverse skims in sparse\n",
+    "flow3 = tree.setup_flow({\n",
+    "    'plain_distance': 'DISTWALK',\n",
+    "    'reverse_distance': 'skims.reverse(\"DISTWALK\")',\n",
+    "})\n",
+    "\n",
+    "assert flow3.load() == approx(np.array([[ 0.0111, 0.0111],\n",
+    "       [ 0.184 , 0.12 ],\n",
+    "       [ 0.12 , 0.12 ],\n",
+    "       [ 0.17 , 0.17 ],\n",
+    "       [ 0.17 , 0.17 ]], dtype=np.float32))\n",
+    "\n",
+    "z = skims.iat(\n",
+    "    omaz=[ 0, 1, 3, 101, 102],\n",
+    "    dmaz=[ 0, 0, 0, 100, 100],\n",
+    "    _names=['DIST', 'DISTWALK'], _load=True,\n",
+    ")\n",
+    "assert z['DISTWALK'].data == approx(np.array([ 0.0111, 0.12 , 0.12 , 0.17 , 0.17 ]))\n",
+    "assert z['DIST'].data == approx(np.array([ 0.12, 0.12 , 0.12 , 0.17 , 0.17 ]))"
+   ]
   }
  ],
 "metadata": {
diff --git a/envs/development.yml b/envs/development.yml
new file mode 100644
index 0000000..6f58961
--- /dev/null
+++ b/envs/development.yml
@@ -0,0 +1,31 @@
+name: sh-dev
+channels:
+  - conda-forge
+  - nodefaults
+dependencies:
+  - python=3.9
+  - pip
+  # required for testing
+  - dask
+  - filelock
+  - flake8
+  - jupyter
+  - nbmake
+  - networkx
+  - notebook
+  - numba>=0.53
+  - numexpr
+  - numpy>=1.19
+  - openmatrix
+  - pandas>=1.2
+  - pyarrow
+  - pytest
+  - pytest-cov
+  - pytest-regressions
+  - pytest-xdist
+  - sparse
+  - xarray
+  - zarr
+
+  - pip:
+    - -e ..
diff --git a/pyproject.toml b/pyproject.toml
index 56c1948..c4bee96 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,9 +1,8 @@
 [build-system]
 requires = [
-    "setuptools>=42",
+    "setuptools>=42,<64",
     "wheel",
-    "setuptools_scm[toml]>=3.4",
-    "setuptools_scm_git_archive",
+    "setuptools_scm[toml]>=7.0",
 ]
 build-backend = "setuptools.build_meta"
diff --git a/sharrow/__init__.py b/sharrow/__init__.py
index 7fdf500..040cd54 100644
--- a/sharrow/__init__.py
+++ b/sharrow/__init__.py
@@ -1,6 +1,6 @@
 from xarray import DataArray

-from . import example_data, selectors, shared_memory, sparse
+from . import dataset, example_data, selectors, shared_memory, sparse
 from ._version import version as __version__
 from .dataset import Dataset
 from .digital_encoding import array_decode, array_encode
diff --git a/sharrow/aster.py b/sharrow/aster.py
index ed6c55a..e9e5637 100755
--- a/sharrow/aster.py
+++ b/sharrow/aster.py
@@ -392,8 +392,13 @@ def _replacement(
         if self.spacevars is not None:
             if attr not in self.spacevars:
-                raise KeyError(f"{topname}..{attr}")
-                # return original_node
+                if topname == pref_topname:
+                    raise KeyError(f"{topname}..{attr}")
+                # we originally raised a KeyError here regardless, but what if we just
+                # give back the original node, and see if other spaces,
+                # possibly fallback spaces, might work? If nothing works then
+                # it will still eventually error out when compiling?
+                return original_node

         dim_slots = self.dim_slots
         if isinstance(self.spacevars, dict):
@@ -549,8 +554,8 @@ def _maybe_transpose_first_two_args(_slice):
                     ast.Name(
                         id=f"__{pref_topname}___s_{attr}__data", ctx=ast.Load()
                     ),
-                    result_arg_[0],
-                    result_arg_[1],
+                    result_arg_[0 if not transpose_lead else 1],
+                    result_arg_[1 if not transpose_lead else 0],
                     ast_Constant(blender.get("max_blend_distance")),  # blend_limit
                 ],
                 keywords=[],
@@ -694,12 +699,14 @@ def visit_Call(self, node):
         result = None

         # implement ActivitySim's "reverse" skims
-        if isinstance(node.func, ast.Attribute) and node.func.attr == "reverse":
-            if isinstance(node.func.value, ast.Name):
-                if node.func.value.id == self.spacename:
+        if (
+            isinstance(node.func, ast.Attribute) and node.func.attr == "reverse"
+        ):  # *.reverse(...)
+            if isinstance(node.func.value, ast.Name):  # somename.reverse(...)
+                if node.func.value.id == self.spacename:  # spacename.reverse(...)
                     if len(node.args) == 1 and isinstance(
                         node.args[0], ast_Constant_Type
-                    ):
+                    ):  # spacename.reverse('constant')
                         result = self._replacement(
                             ast_String_value(node.args[0]),
                             node.func.ctx,
@@ -708,7 +715,17 @@
                         )
         # handle clip as a method
         if isinstance(node.func, ast.Attribute) and node.func.attr == "clip":
-            if len(node.args) == 1 and len(node.keywords) == 0:
+            if isinstance(node.func.value, ast.Name) and node.func.value.id == "np":
+                # call to np.clip(...), change to local clip implementation
+                clip_args = []
+                for a in node.args:
+                    clip_args.append(self.visit(a))
+                result = ast.Call(
+                    func=ast.Name("clip", cts=ast.Load()),
+                    args=clip_args,
+                    keywords=[self.visit(i) for i in node.keywords],
+                )
+            elif len(node.args) == 1 and len(node.keywords) == 0:
                 # single positional arg becomes max
                 result = ast.Call(
                     func=ast.Name("max", cts=ast.Load()),
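The visit_Call changes above let flow expressions call `np.clip` directly (rewritten at compile time to sharrow's local clip helper), alongside the existing `skims.reverse` handling. A minimal sketch of what this enables, assuming the `tree` and `skims` objects from docs/walkthrough/sparse.ipynb; the variable names and cap value here are illustrative only:

```python
# Sketch only: assumes `tree` and `skims` are set up as in the sparse walkthrough.
flow_clip = tree.setup_flow(
    {
        "capped_walk_dist": "np.clip(skims.DISTWALK, 0, 3.0)",  # np.clip rewritten to local clip
        "reverse_walk_dist": 'skims.reverse("DISTWALK")',       # d->o direction lookup
    }
)
capped = flow_clip.load()  # one row per row of the tree's root, two columns
```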
diff --git a/sharrow/dataset.py b/sharrow/dataset.py
index 7102f60..6c6c6fc 100755
--- a/sharrow/dataset.py
+++ b/sharrow/dataset.py
@@ -82,7 +82,7 @@ def construct(source):
     source : pandas.DataFrame, pyarrow.Table, xarray.Dataset, or Sequence[str]
         The source from which to create a Dataset. DataFrames and Tables
         are converted to Datasets that have one dimension (the rows) and
-        seperate variables for each of the columns. A list of strings
+        separate variables for each of the columns. A list of strings
         creates a dataset with those named empty variables.

     Returns
@@ -90,8 +90,7 @@ def construct(source):
     Dataset
     """
     if isinstance(source, pd.DataFrame):
-        source = xr.Dataset.from_dataframe(source)
-        # source = cls.from_dataframe_fast(source)  # older xarray was slow
+        source = dataset_from_dataframe_fast(source)  # xarray default can be slow
     elif isinstance(source, (Table, pa.Table)):
         source = xr.Dataset.from_table(source)
     elif isinstance(source, (pa.Table)):
@@ -105,6 +104,63 @@
     return source


+def dataset_from_dataframe_fast(
+    dataframe: pd.DataFrame, sparse: bool = False
+) -> "Dataset":
+    """Convert a pandas.DataFrame into an xarray.Dataset
+
+    Each column will be converted into an independent variable in the
+    Dataset. If the dataframe's index is a MultiIndex, it will be expanded
+    into a tensor product of one-dimensional indices (filling in missing
+    values with NaN). This method will produce a Dataset very similar to
+    that on which the 'to_dataframe' method was called, except with
+    possibly redundant dimensions (since all dataset variables will have
+    the same dimensionality).
+
+    Parameters
+    ----------
+    dataframe : DataFrame
+        DataFrame from which to copy data and indices.
+    sparse : bool, default: False
+        If true, create sparse arrays instead of dense numpy arrays. This
+        can potentially save a large amount of memory if the DataFrame has
+        a MultiIndex. Requires the sparse package (sparse.pydata.org).
+
+    Returns
+    -------
+    New Dataset.
+
+    See Also
+    --------
+    xarray.DataArray.from_series
+    pandas.DataFrame.to_xarray
+    """
+
+    # this is much faster than the default xarray version when not
+    # using a MultiIndex.
+
+    if isinstance(dataframe.index, pd.MultiIndex) or sparse:
+        return Dataset.from_dataframe(dataframe, sparse)
+
+    if not dataframe.columns.is_unique:
+        raise ValueError("cannot convert DataFrame with non-unique columns")
+
+    if isinstance(dataframe.index, pd.CategoricalIndex):
+        idx = dataframe.index.remove_unused_categories()
+    else:
+        idx = dataframe.index
+
+    index_name = idx.name if idx.name is not None else "index"
+    # Cast to a NumPy array first, in case the Series is a pandas Extension
+    # array (which doesn't have a valid NumPy dtype)
+    arrays = {
+        name: ([index_name], np.asarray(dataframe[name].values))
+        for name in dataframe.columns
+        if name != index_name
+    }
+    return Dataset(arrays, coords={index_name: (index_name, dataframe.index.values)})
+
+
 def from_table(
     tbl,
     index_name="index",
@@ -527,8 +583,22 @@ def from_zarr_with_attr(*args, **kwargs):
                 and avalue.endswith("} ")
             ):
                 avalue = ast.literal_eval(avalue[1:-1])
+            if isinstance(avalue, str) and avalue == " < None > ":
+                avalue = None
             attrs[aname] = avalue
         obj[k] = obj[k].assign_attrs(attrs)
+    attrs = {}
+    for aname, avalue in obj.attrs.items():
+        if (
+            isinstance(avalue, str)
+            and avalue.startswith(" {")
+            and avalue.endswith("} ")
+        ):
+            avalue = ast.literal_eval(avalue[1:-1])
+        if isinstance(avalue, str) and avalue == " < None > ":
+            avalue = None
+        attrs[aname] = avalue
+    obj = obj.assign_attrs(attrs)
     return obj
@@ -759,8 +829,18 @@ def to_zarr_with_attr(self, *args, **kwargs):
         for aname, avalue in self[k].attrs.items():
             if isinstance(avalue, dict):
                 avalue = f" {avalue!r} "
+            if avalue is None:
+                avalue = " < None > "
             attrs[aname] = avalue
         obj[k] = self[k].assign_attrs(attrs)
+    attrs = {}
+    for aname, avalue in self.attrs.items():
+        if isinstance(avalue, dict):
+            avalue = f" {avalue!r} "
+        if avalue is None:
+            avalue = " < None > "
+        attrs[aname] = avalue
+    obj = obj.assign_attrs(attrs)
     return obj.to_zarr(*args, **kwargs)
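For context on the `construct()` change above, a small usage sketch of the faster DataFrame path; the DataFrame contents here are made up for illustration:

```python
import pandas as pd
import sharrow as sh

# With a plain (non-MultiIndex) index, construct() now goes through
# dataset_from_dataframe_fast(): each column becomes a one-dimensional
# variable and the index becomes the single shared dimension.
df = pd.DataFrame({"income": [40000, 57000], "hhsize": [2, 3]}, index=[101, 102])
ds = sh.dataset.construct(df)
assert ds["income"].dims == ("index",)  # an unnamed index falls back to "index"
```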
diff --git a/sharrow/flows.py b/sharrow/flows.py
index d3c41a3..da5d184 100644
--- a/sharrow/flows.py
+++ b/sharrow/flows.py
@@ -835,48 +835,62 @@ def init_sub_funcs(
         # write individual function files for each expression
         for n, (k, expr) in enumerate(defs.items()):
             expr = str(expr).lstrip()
-            init_expr = expr
-            for spacename, spacearrays in self.tree.subspaces.items():
-                dim_slots, digital_encodings, blenders = meta_data[spacename]
-                try:
-                    expr = expression_for_numba(
-                        expr,
-                        spacename,
-                        dim_slots,
-                        dim_slots,
-                        digital_encodings=digital_encodings,
-                        extra_vars=self.tree.extra_vars,
-                        blenders=blenders,
-                    )
-                except KeyError as key_err:
-                    if ".." in key_err.args[0]:
-                        topkey, attrkey = key_err.args[0].split("..")
-                    else:
-                        raise
-                    # check if we can resolve this name on any other subspace
-                    other_way = False
-                    for other_spacename in self.tree.subspace_fallbacks.get(topkey, []):
-                        dim_slots, digital_encodings, blenders = meta_data[
-                            other_spacename
-                        ]
-                        try:
-                            expr = expression_for_numba(
-                                expr,
-                                spacename,
-                                dim_slots,
-                                dim_slots,
-                                digital_encodings=digital_encodings,
-                                prefer_name=other_spacename,
-                                extra_vars=self.tree.extra_vars,
-                                blenders=blenders,
-                            )
-                        except KeyError:
-                            pass
+            prior_expr = init_expr = expr
+            other_way = True
+            while other_way:
+                other_way = False
+                # if other_way is triggered, there may be residual other terms
+                # that were not addressed, so this loop should be applied again.
+                for spacename, spacearrays in self.tree.subspaces.items():
+                    dim_slots, digital_encodings, blenders = meta_data[spacename]
+                    try:
+                        expr = expression_for_numba(
+                            expr,
+                            spacename,
+                            dim_slots,
+                            dim_slots,
+                            digital_encodings=digital_encodings,
+                            extra_vars=self.tree.extra_vars,
+                            blenders=blenders,
+                        )
+                    except KeyError as key_err:
+                        if ".." in key_err.args[0]:
+                            topkey, attrkey = key_err.args[0].split("..")
                         else:
-                            other_way = True
-                            break
-                    if not other_way:
-                        raise
+                            raise
+                        # check if we can resolve this name on any other subspace
+                        for other_spacename in self.tree.subspace_fallbacks.get(
+                            topkey, []
+                        ):
+                            dim_slots, digital_encodings, blenders = meta_data[
+                                other_spacename
+                            ]
+                            try:
+                                expr = expression_for_numba(
+                                    expr,
+                                    spacename,
+                                    dim_slots,
+                                    dim_slots,
+                                    digital_encodings=digital_encodings,
+                                    prefer_name=other_spacename,
+                                    extra_vars=self.tree.extra_vars,
+                                    blenders=blenders,
+                                )
+                            except KeyError as err:  # noqa: F841
+                                pass
+                            else:
+                                other_way = True
+                                # at least one variable was found in a fallback
+                                break
+                        if not other_way:
+                            raise
+                if prior_expr == expr:
+                    # nothing was changed, break out of loop
+                    break
+                else:
+                    # something was changed, run the loop again to confirm
+                    # nothing else needs to change
+                    prior_expr = expr

            # now find instances where an identifier is previously created in this flow.
            expr = expression_for_numba(
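The while-other_way loop added above keeps re-applying the rewrite until the expression stops changing, so names resolved through fallback subspaces get another pass. The general pattern, sketched with illustrative names rather than the actual sharrow internals:

```python
def rewrite_until_stable(expr, rewriters):
    # Keep applying every rewriter until a full pass leaves the expression
    # unchanged; a term that is unresolved on one pass may become resolvable
    # once an earlier fallback rewrite has fired.
    prior = None
    while prior != expr:
        prior = expr
        for rewrite in rewriters:
            try:
                expr = rewrite(expr)
            except KeyError:
                continue  # leave the term alone; another rewriter may handle it
    return expr
```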
@@ -1340,7 +1354,10 @@ def iload_raw(
             runner_ = self._idotter
         else:
             runner_ = runner
-        named_args = inspect.getfullargspec(runner_.py_func).args
+        try:
+            named_args = inspect.getfullargspec(runner_.py_func).args
+        except AttributeError:
+            named_args = inspect.getfullargspec(runner_).args
         arguments = []
         for arg in named_args:
             if arg in {
diff --git a/sharrow/relationships.py b/sharrow/relationships.py
index e93033a..74dc4b4 100644
--- a/sharrow/relationships.py
+++ b/sharrow/relationships.py
@@ -802,9 +802,10 @@ def drop_dims(self, dims, inplace=False, ignore_missing_dims=True):
         booted = set()
         for (up, dn, n), e in obj._graph.edges.items():
             if up == obj.root_node_name:
-                if e.get("analog", "") in dims:
+                _analog = e.get("analog", "")
+                if _analog in dims:
                     boot_queue.add(dn)
-                if e.get("analog", "") not in new_root_dataset:
+                if _analog != "" and _analog not in new_root_dataset:
                     boot_queue.add(dn)
                 if e.get("parent_name", "") in dims:
                     boot_queue.add(dn)
diff --git a/sharrow/shared_memory.py b/sharrow/shared_memory.py
index 50bdc69..9f92f7d 100644
--- a/sharrow/shared_memory.py
+++ b/sharrow/shared_memory.py
@@ -297,7 +297,7 @@ def emit(k, a, is_coord):
                         "dtype": a.dtype,
                         "shape": a.shape,
                         "coord": is_coord,
-                        "nbytes": a.nbytes,
+                        "nbytes": a.data.nbytes,
                         "position": position,
                         "data.nbytes": a.data.data.nbytes,
                         "indices.nbytes": a.data.indices.nbytes,
@@ -307,6 +307,7 @@
                         "indptr.dtype": a.data.indptr.dtype,
                     }
                 )
+                a_nbytes = a.data.nbytes
             else:
                 wrappers.append(
                     {
@@ -320,9 +321,11 @@
                         "position": position,
                     }
                 )
-            sizes.append(a.nbytes)
+                a_nbytes = a.nbytes
+
+            sizes.append(a_nbytes)
             names.append(k)
-            position += a.nbytes
+            position += a_nbytes

         for k, a in self._obj.coords.items():
             emit(k, a, True)
diff --git a/sharrow/sparse.py b/sharrow/sparse.py
index 676ac34..444e526 100644
--- a/sharrow/sparse.py
+++ b/sharrow/sparse.py
@@ -10,22 +10,28 @@ def _get_idx(indices, indptr, data, i, j):
     pool_lb = indptr[i]
     pool_ub = indptr[i + 1] - 1
+    if pool_ub < pool_lb:
+        # This indicates there are no values at all for row i
+        return np.nan
     idx_lo = indices[pool_lb]
     idx_hi = indices[pool_ub]
     # check top and bottom
     if j == idx_lo:
-        # printd(f"{pool_lb=} {pool_ub=} {indices[pool_lb]=} {indices[pool_ub]=} !!!lo {i=} {j=}")
+        # The lower bound on possible j values is the j value, return it
         return data[pool_lb]
     elif j == idx_hi:
-        # printd(f"{pool_lb=} {pool_ub=} {indices[pool_lb]=} {indices[pool_ub]=} !!!hi {i=} {j=}")
+        # The upper bound on possible j values is the j value, return it
         return data[pool_ub]
     # check if out of original range
     elif j < idx_lo or j > idx_hi:
-        # printd(f"{pool_lb=} {pool_ub=} {indices[pool_lb]=} {indices[pool_ub]=} :( {i=} {j=}")
+        # the j value is outside the possible range, there is no value to return
         return np.nan
+    # The j value is somewhere inside the bounds, so conduct an efficient search to
+    # see if we can find it.
     span = pool_ub - pool_lb - 1
-    # printd(f"{pool_lb=} {pool_ub=} {indices[pool_lb]=} {indices[pool_ub]=} {span=} {i=} {j=}")
+    # assume the j values are uniformly distributed between bounds, guess at the
+    # approximate location of the target j
     peek = (j - idx_lo) / (idx_hi - idx_lo)
     while span > 3:
         candidate = int(peek * span) + pool_lb
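The comments added to `_get_idx` above describe a bounds check followed by an interpolation-style guess into the row's stored columns. A pure-Python rendering of the same lookup idea, for illustration only; the real implementation is the numba-compiled function shown in the diff:

```python
import numpy as np

def get_idx_sketch(indices, indptr, data, i, j):
    # CSR-style row lookup: return the data value at (i, j), or NaN if absent.
    lo, ub = indptr[i], indptr[i + 1] - 1
    if ub < lo:
        return np.nan                      # row i stores no values at all
    if j < indices[lo] or j > indices[ub]:
        return np.nan                      # j lies outside the stored column range
    for pos in range(lo, ub + 1):          # plain scan; the numba version first
        if indices[pos] == j:              # narrows the window with an
            return data[pos]               # interpolated guess at j's position
    return np.nan
```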