Hosvd (#67)
* HOSVD: Preliminary outline of core functionality

* HOSVD: Fix numeric bug
* Was slicing incorrectly
* Update test to check convergence

* HOSVD: Finish output and test coverage

* TENSOR: Prune numbers.Real
* Real and mypy don't play nice (python/mypy#3186); see the sketch below
* This allows partial typing support of HOSVD
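As context for the bullet above, a minimal sketch (hypothetical snippet, not part of this commit) of the friction reported in python/mypy#3186: mypy does not treat float as a subtype of numbers.Real, so annotations built on Real reject ordinary floats.

    from numbers import Real

    def half(x: Real) -> Real:
        return x / 2

    # mypy reports, roughly:
    #   Argument 1 to "half" has incompatible type "float"; expected "Real"
    half(1.0)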
ntjohnson1 committed Mar 16, 2023
1 parent ec36a27 commit 171aeb0
Showing 4 changed files with 216 additions and 11 deletions.
1 change: 1 addition & 0 deletions pyttb/__init__.py
@@ -9,6 +9,7 @@
 from pyttb.cp_als import cp_als
 from pyttb.cp_apr import *
 from pyttb.export_data import export_data
+from pyttb.hosvd import hosvd
 from pyttb.import_data import import_data
 from pyttb.khatrirao import khatrirao
 from pyttb.ktensor import ktensor
138 changes: 138 additions & 0 deletions pyttb/hosvd.py
@@ -0,0 +1,138 @@
"""Higher Order SVD Implementation"""
import warnings
from typing import List, Optional

import numpy as np
import scipy

import pyttb as ttb


def hosvd(
    tensor,
    tol: float,
    verbosity: float = 1,
    dimorder: Optional[List[int]] = None,
    sequential: bool = True,
    ranks: Optional[List[int]] = None,
):
    """Compute sequentially-truncated higher-order SVD (Tucker).

    Computes a Tucker decomposition with relative error specified by tol,
    i.e., it computes a ttensor T such that ||X-T||/||X|| <= tol.

    Parameters
    ----------
    tensor: Tensor to factor
    tol: Relative error to stop at
    verbosity: Print level
    dimorder: Order to loop through dimensions
    sequential: Use sequentially-truncated version
    ranks: Specify ranks to consider rather than computing

    Example
    -------
    >>> data = np.array([[29, 39.], [63., 85.]])
    >>> tol = 1e-4
    >>> disable_printing = -1
    >>> tensorInstance = ttb.tensor().from_data(data)
    >>> result = hosvd(tensorInstance, tol, verbosity=disable_printing)
    >>> ((result.full() - tensorInstance).norm() / tensorInstance.norm()) < tol
    True
    """
    # In tucker_als this is N
    d = tensor.ndims

    if ranks is not None:
        if len(ranks) != d:
            raise ValueError(
                f"Ranks must be a list of length tensor ndims. Ndims: {d} but got "
                f"ranks: {ranks}."
            )
    else:
        ranks = [0] * d

    # Set up dimorder if not specified (copied from tucker_als)
    if not dimorder:
        dimorder = list(range(d))
    else:
        if not isinstance(dimorder, list):
            raise ValueError("Dimorder must be a list")
        elif tuple(range(d)) != tuple(sorted(dimorder)):
            raise ValueError(
                "Dimorder must be a list or permutation of range(tensor.ndims)"
            )

    # TODO should unify printing throughout. Probably easier to use python logging levels
    if verbosity > 0:
        print("Computing HOSVD...\n")

    normxsqr = (tensor**2).collapse()
    eigsumthresh = ((tol**2) * normxsqr) / d
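    # Per-mode error budget: each mode may discard eigenvalues summing to at
    # most tol^2 * ||X||^2 / d, so across all d modes the total squared error
    # stays below tol^2 * ||X||^2, giving ||X-T||/||X|| <= tol.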

    if verbosity > 2:
        print(
            f"||X||^2 = {normxsqr: g}\n"
            f"tol = {tol: g}\n"
            f"eigenvalue sum threshold = tol^2 ||X||^2 / d = {eigsumthresh: g}"
        )

    # Main Loop
    factor_matrices = [np.empty(1)] * d
    # Copy input tensor, shrinks every step for sequential
    Y = ttb.tensor.from_tensor_type(tensor)

    for k in dimorder:
        # Compute Gram matrix
        Yk = ttb.tenmat.from_tensor_type(Y, np.array([k])).double()
        Z = np.dot(Yk, Yk.transpose())
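        # Eigenvectors of the Gram matrix Z = Yk @ Yk.T are the left singular
        # vectors of the mode-k unfolding; truncating them gives the mode-k factor.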

        # Compute eigenvalue decomposition
        D, V = scipy.linalg.eigh(Z)
        pi = np.argsort(-D, kind="quicksort")
        eigvec = D[pi]

        # If rank not provided compute it.
        if ranks[k] == 0:
            eigsum = np.cumsum(eigvec[::-1])
            eigsum = eigsum[::-1]
            ranks[k] = np.where(eigsum > eigsumthresh)[0][-1]
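            # eigsum[i] is the sum of eigenvalues from index i onward; taking the
            # last index whose tail sum exceeds the threshold keeps the discarded
            # tail (indices ranks[k]+1 and up) within the per-mode budget.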

            if verbosity > 5:
                print("Reverse cumulative sum of evals of Gram matrix:")
                for i in range(len(eigsum)):
                    print_msg = f"{i: d}: {eigsum[i]: 6.4f}"
                    if i == ranks[k]:
                        print_msg += " <-- Cutoff"
                    print(print_msg)

        # Extract factor matrix by picking leading eigenvectors of V
        # NOTE: Plus 1 in pi slice for inclusive range to match MATLAB
        factor_matrices[k] = V[:, pi[0 : ranks[k] + 1]]

        # Shrink!
        if sequential:
            Y = Y.ttm(factor_matrices[k].transpose(), k)
    # Extract final core
    if sequential:
        G = Y
    else:
        G = Y.ttm(factor_matrices, transpose=True)

    result = ttb.ttensor.from_data(G, factor_matrices)

    if verbosity > 0:
        diffnormsqr = ((tensor - result.full()) ** 2).collapse()
        relnorm = np.sqrt(diffnormsqr / normxsqr)
        print(f" Size of core: {G.shape}")
        if relnorm <= tol:
            print(f"||X-T||/||X|| = {relnorm: g} <= {tol: f} (tol)")
        else:
            print(
                "Tolerance not satisfied!! "
                f"||X-T||/||X|| = {relnorm: g} >= {tol: f} (tol)"
            )
            warnings.warn("Specified tolerance was not achieved")
    return result
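For reference beyond the docstring example, a minimal usage sketch on a 3-way tensor (the shape, seed, and tolerance are illustrative assumptions, not part of this commit):

    import numpy as np
    import pyttb as ttb

    np.random.seed(0)  # illustrative seed
    X = ttb.tensor().from_data(np.random.rand(4, 5, 6))

    # Ask for 1% relative error; per-mode ranks are chosen automatically.
    T = ttb.hosvd(X, tol=1e-2)
    assert (T.full() - X).norm() / X.norm() <= 1e-2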
21 changes: 10 additions & 11 deletions pyttb/tensor.py
@@ -7,7 +7,6 @@
 import warnings
 from itertools import permutations
 from math import factorial
-from numbers import Real
 from typing import Any, Callable, List, Literal, Optional, Tuple, Union

 import numpy as np
@@ -181,9 +180,9 @@ def collapse(
         self,
         dims: Optional[np.ndarray] = None,
         fun: Union[
-            Literal["sum"], Callable[[np.ndarray], Union[Real, np.ndarray]]
+            Literal["sum"], Callable[[np.ndarray], Union[float, np.ndarray]]
         ] = "sum",
-    ) -> Union[Real, np.ndarray, tensor]:
+    ) -> Union[float, np.ndarray, tensor]:
         """
         Collapse tensor along specified dimensions.
@@ -389,7 +388,7 @@ def full(self) -> tensor:
         """
         return ttb.tensor.from_data(self.data)

-    def innerprod(self, other: Union[tensor, ttb.sptensor, ttb.ktensor]) -> Real:
+    def innerprod(self, other: Union[tensor, ttb.sptensor, ttb.ktensor]) -> float:
         """
         Efficient inner product with a tensor
@@ -542,7 +541,7 @@ def issymmetric(
             return bool((all_diffs == 0).all())
         return bool((all_diffs == 0).all()), all_diffs, all_perms

-    def logical_and(self, B: Union[Real, tensor]) -> tensor:
+    def logical_and(self, B: Union[float, tensor]) -> tensor:
         """
         Logical and for tensors
@@ -578,7 +577,7 @@ def logical_not(self) -> tensor:
         """
         return ttb.tensor.from_data(np.logical_not(self.data))

-    def logical_or(self, other: Union[Real, tensor]) -> tensor:
+    def logical_or(self, other: Union[float, tensor]) -> tensor:
         """
         Logical or for tensors
@@ -598,7 +597,7 @@ def tensor_or(x, y):

         return ttb.tt_tenfun(tensor_or, self, other)

-    def logical_xor(self, other: Union[Real, tensor]) -> tensor:
+    def logical_xor(self, other: Union[float, tensor]) -> tensor:
         """
         Logical xor for tensors
@@ -867,7 +866,7 @@ def reshape(self, shape: Tuple[int, ...]) -> tensor:

         return ttb.tensor.from_data(np.reshape(self.data, shape, order="F"), shape)

-    def squeeze(self) -> Union[tensor, np.ndarray, Real]:
+    def squeeze(self) -> Union[tensor, np.ndarray, float]:
         """
         Removes singleton dimensions from a tensor
@@ -1029,7 +1028,7 @@ def symmetrize(
     def ttm(
         self,
         matrix: Union[np.ndarray, List[np.ndarray]],
-        dims: Optional[Union[Real, np.ndarray]] = None,
+        dims: Optional[Union[float, np.ndarray]] = None,
         transpose: bool = False,
     ) -> tensor:
         """
@@ -1046,7 +1045,7 @@ def ttm(
             dims = np.arange(self.ndims)
         elif isinstance(dims, list):
             dims = np.array(dims)
-        elif isinstance(dims, Real):
+        elif isinstance(dims, (float, int, np.generic)):
             dims = np.array([dims])

         if isinstance(matrix, list):
@@ -1060,7 +1059,7 @@ def ttm(
             return Y

         if not isinstance(matrix, np.ndarray):
-            assert False, "matrix must be of type numpy.ndarray"
+            assert False, f"matrix must be of type numpy.ndarray but got:\n{matrix}"

         if not (dims.size == 1 and np.isin(dims, np.arange(self.ndims))):
             assert False, "dims must contain values in [0,self.dims)"
67 changes: 67 additions & 0 deletions tests/test_hosvd.py
@@ -0,0 +1,67 @@
import numpy as np
import pytest

import pyttb as ttb


@pytest.fixture()
def sample_tensor():
    data = np.array([[29, 39.0], [63.0, 85.0]])
    shape = (2, 2)
    params = {"data": data, "shape": shape}
    tensorInstance = ttb.tensor().from_data(data, shape)
    return params, tensorInstance


@pytest.mark.indevelopment
def test_hosvd_simple_convergence(capsys, sample_tensor):
    (data, T) = sample_tensor
    tol = 1e-4
    result = ttb.hosvd(T, tol)
    assert (result.full() - T).norm() / T.norm() < tol, "Failed to converge"

    tol = 1e-4
    result = ttb.hosvd(T, tol, sequential=False)
    assert (
        result.full() - T
    ).norm() / T.norm() < tol, "Failed to converge for non-sequential option"

    impossible_tol = 1e-20
    with pytest.warns(UserWarning):
        result = ttb.hosvd(T, impossible_tol)
    assert (
        result.full() - T
    ).norm() / T.norm() > impossible_tol, "Converged beyond provided precision"


@pytest.mark.indevelopment
def test_hosvd_default_init(capsys, sample_tensor):
    (data, T) = sample_tensor
    _ = ttb.hosvd(T, 1)


@pytest.mark.indevelopment
def test_hosvd_smoke_test_verbosity(capsys, sample_tensor):
    """For now just make sure verbosity calcs don't crash"""
    (data, T) = sample_tensor
    ttb.hosvd(T, 1, verbosity=10)


@pytest.mark.indevelopment
def test_hosvd_incorrect_ranks(capsys, sample_tensor):
    (data, T) = sample_tensor
    ranks = list(range(T.ndims - 1))
    with pytest.raises(ValueError):
        _ = ttb.hosvd(T, 1, ranks=ranks)


@pytest.mark.indevelopment
def test_hosvd_incorrect_dimorder(capsys, sample_tensor):
    (data, T) = sample_tensor
    dimorder = list(range(T.ndims - 1))
    with pytest.raises(ValueError):
        _ = ttb.hosvd(T, 1, dimorder=dimorder)

    dimorder = 1
    with pytest.raises(ValueError):
        _ = ttb.hosvd(T, 1, dimorder=dimorder)
