Source code for osl_dynamics.utils.array_ops

"""Helper functions for manipulating `NumPy \
<https://numpy.org/doc/stable/user/index.html>`_ arrays.

"""

from typing import Callable, List, Optional, Tuple, Union

import numpy as np


[docs] def get_one_hot(values: np.ndarray, n_states: Optional[int] = None) -> np.ndarray: """Expand a categorical variable to a series of boolean columns (one-hot encoding). +----------------------+ | Categorical Variable | +======================+ | A | +----------------------+ | C | +----------------------+ | D | +----------------------+ | B | +----------------------+ becomes +---+---+---+---+ | A | B | C | D | +===+===+===+===+ | 1 | 0 | 0 | 0 | +---+---+---+---+ | 0 | 0 | 1 | 0 | +---+---+---+---+ | 0 | 0 | 0 | 1 | +---+---+---+---+ | 0 | 1 | 0 | 0 | +---+---+---+---+ Parameters ---------- values : np.ndarray 1D array of categorical values with shape (n_samples,). The values should be integers (0, 1, 2, 3, ... , :code:`n_states` - 1). Or 2D array of shape (n_samples, n_states) to be binarized. n_states : int, optional Total number of states in :code:`values`. Must be at least the number of states present in :code:`values`. Default is the number of unique values in :code:`values`. Returns ------- one_hot : np.ndarray A 2D array containing the one-hot encoded form of :code:`values`. Shape is (n_samples, n_states). """ if values.ndim == 2: values = values.argmax(axis=1) if n_states is None: n_states = values.max() + 1 res = np.eye(n_states)[np.array(values).reshape(-1)] return res.reshape([*list(values.shape), n_states]).astype(int)
[docs] def cov2corr(cov: np.ndarray) -> np.ndarray: """Covariance to correlation. Converts batches of covariance matrices into batches of correlation matrices. Parameters ---------- cov : np.ndarray Covariance matrices. Shape must be (..., N, N). Returns ------- corr : np.ndarray Correlation matrices. Shape is (..., N, N). """ # Validation cov = np.array(cov) if cov.ndim < 2: raise ValueError("input covariances must have more than 1 dimension.") # Extract batches of standard deviations std = np.sqrt(np.diagonal(cov, axis1=-2, axis2=-1)) normalisation = np.expand_dims(std, -1) @ np.expand_dims(std, -2) return cov / normalisation
[docs] def cov2std(cov: np.ndarray) -> np.ndarray: """Get the standard deviation of batches of covariance matrices. Parameters ---------- cov : np.ndarray Covariance matrix. Shape must be (..., N, N). Returns ------- std : np.ndarray Standard deviations. Shape is (..., N). """ cov = np.array(cov) if cov.ndim < 2: raise ValueError("input covariances must have more than 1 dimension.") return np.sqrt(np.diagonal(cov, axis1=-2, axis2=-1))
[docs] def cov2partialcorr(cov: np.ndarray) -> np.ndarray: """Covariance to partial correlation. Converts batches of covariance matrices into batches of partial correlation matrices. Parameters ---------- cov : np.ndarray Covariance matrices. Shape must be (..., N, N). Returns ------- partial_corr : np.ndarray Partial correlation matrices. Shape is (..., N, N). """ cov = np.array(cov) if cov.ndim < 2: raise ValueError("input covariances must have more than 1 dimension.") N = cov.shape[-1] precision = np.linalg.inv(cov) diag = np.diagonal(precision, axis1=-2, axis2=-1) outer_diag = diag[..., :, np.newaxis] * diag[..., np.newaxis, :] partial_corr = -precision / np.sqrt(outer_diag) partial_corr[..., range(N), range(N)] = 1.0 / diag return partial_corr
[docs] def cov2partialcov(cov: np.ndarray, use_pinv: bool = False) -> np.ndarray: """Covariance to partial covariance. Converts batches of covariance matrices into batches of partial covariance matrices. Parameters ---------- cov : np.ndarray Covariance matrix or batch of covariance matrices. Shape (..., N, N). use_pinv : bool, optional If True, use np.linalg.pinv to calculate inverse of the covariance. If False, we use np.linalg.inv. Returns ------- partial_cov : np.ndarray Partial covariance matrices. Shape (..., N, N). Off-diagonals: -Omega_ij / (Omega_ii * Omega_jj - Omega_ij^2) Diagonals: 1 / Omega_ii (conditional variances) """ cov = np.asarray(cov) if cov.ndim < 2: raise ValueError( "input covariances must have at least 2 dimensions (..., N, N)." ) if cov.shape[-1] != cov.shape[-2]: raise ValueError("last two dimensions must be square (N, N).") if use_pinv: precision = np.linalg.pinv(cov) else: precision = np.linalg.inv(cov) diag = np.diagonal(precision, axis1=-2, axis2=-1) # (..., N) outer_diag = diag[..., :, np.newaxis] * diag[..., np.newaxis, :] denom = outer_diag - precision * precision # (..., N, N) denom_safe = denom.copy() eps = np.finfo(cov.dtype if np.issubdtype(cov.dtype, np.floating) else float).eps small_mask = np.abs(denom_safe) <= eps denom_safe[small_mask] = 1.0 partial = -precision / denom_safe diag_safe = diag.copy() tiny_diag_mask = np.abs(diag_safe) <= eps diag_safe[tiny_diag_mask] = np.nan # will produce nan for truly degenerate cases diag_inv = 1.0 / diag_safe N = cov.shape[-1] idx = np.arange(N) partial[..., idx, idx] = diag_inv return partial
[docs] def ensure_pos_def(cov: np.ndarray, eps: float = 1e-5) -> np.ndarray: """Ensure a covariance (or batch of covariances) is positive definite. Parameters ---------- cov : np.ndarray Covariances of shape (..., N, N). eps : float, optional Small value added to the diagonal. Returns ------- cov : np.ndarray Covariances of shape (..., N, N), symmetrized and regularized. """ cov = np.asarray(cov) cov = 0.5 * (cov + np.swapaxes(cov, -1, -2)) return cov + eps * np.eye(cov.shape[-1])
[docs] def sliding_window_view( x: np.ndarray, window_shape: Union[int, Tuple[int, ...]], axis: Optional[Union[int, Tuple[int, ...]]] = None, *, subok: bool = False, writeable: bool = False, ) -> np.ndarray: """Create a sliding window over an array in arbitrary dimensions. Unceremoniously ripped from numpy 1.20, `np.lib.stride_tricks.sliding_window_view \ <https://numpy.org/doc/1.20/reference/generated/\ numpy.lib.stride_tricks.sliding_window_view.html>`_. """ if np.iterable(window_shape): window_shape = tuple(window_shape) else: window_shape = (window_shape,) # First convert input to array, possibly keeping subclass x = np.array(x, copy=False, subok=subok) window_shape_array = np.array(window_shape) if np.any(window_shape_array < 0): raise ValueError("`window_shape` cannot contain negative values") if axis is None: axis = tuple(range(x.ndim)) if len(window_shape) != len(axis): raise ValueError( f"Since axis is `None`, must provide " f"window_shape for all dimensions of `x`; " f"got {len(window_shape)} window_shape elements " f"and `x.ndim` is {x.ndim}." ) else: axis = np.core.numeric.normalize_axis_tuple( axis, x.ndim, allow_duplicate=True, ) if len(window_shape) != len(axis): raise ValueError( f"Must provide matching length window_shape and " f"axis; got {len(window_shape)} window_shape " f"elements and {len(axis)} axes elements." ) out_strides = x.strides + tuple(x.strides[ax] for ax in axis) # note: same axis can be windowed repeatedly x_shape_trimmed = list(x.shape) for ax, dim in zip(axis, window_shape): if x_shape_trimmed[ax] < dim: msg = "window shape cannot be larger than input array shape" raise ValueError(msg) x_shape_trimmed[ax] -= dim - 1 out_shape = tuple(x_shape_trimmed) + window_shape return np.lib.stride_tricks.as_strided( x, strides=out_strides, shape=out_shape, subok=subok, writeable=writeable, )
[docs] def validate( array: np.ndarray, correct_dimensionality: int, allow_dimensions: int, error_message: str, ) -> np.ndarray: """Checks if the dimensionality of an array is correct. Parameters ---------- array : np.ndarray Array to be checked. correct_dimensionality : int The desired number of dimensions in the array. allow_dimensions : int The number of dimensions that is acceptable for the passed array to have. error_message : str Message to print if the array is not valid. Returns ------- array : np.ndarray Array with the correct dimensionality. """ array = np.array(array) # Add dimensions to ensure array has the correct dimensionality for dimensionality in allow_dimensions: if array.ndim == dimensionality: for i in range(correct_dimensionality - dimensionality): array = array[np.newaxis, ...] # Check no other dimensionality has been passed if array.ndim != correct_dimensionality: raise ValueError(error_message) return array
[docs] def check_symmetry( mat: Union[np.ndarray, List[np.ndarray]], precision: float = 1e-6 ) -> np.ndarray: """Checks if one or more matrices are symmetric. Parameters ---------- mat : np.ndarray or list of np.ndarray Matrices to be checked. Shape of a matrix should be (..., N, N). precision : float, optional Precision for comparing values. Corresponds to an absolute tolerance parameter. Default is :code:`1e-6`. Returns ------- symmetry : np.ndarray of bool Array indicating whether matrices are symmetric. """ mat = np.array(mat) if mat.ndim < 2: msg = "Input matrix must be an array with shape (..., N, N)." raise ValueError(msg) transpose_axes = np.concatenate((np.arange(mat.ndim - 2), [-1, -2])) symmetry = np.all( np.isclose( mat, np.transpose(mat, axes=transpose_axes), rtol=0, atol=precision, equal_nan=True, ), axis=(-1, -2), ) return symmetry
[docs] def ezclump(binary_array: np.ndarray) -> List[slice]: """Find the clumps (groups of data with the same values) for a 1D bool array. Taken wholesale from :code:`numpy.ma.extras.ezclump`. """ if binary_array.ndim > 1: binary_array = binary_array.ravel() idx = (binary_array[1:] ^ binary_array[:-1]).nonzero() idx = idx[0] + 1 if binary_array[0]: if len(idx) == 0: return [slice(0, binary_array.size)] r = [slice(0, idx[0])] r.extend((slice(left, right) for left, right in zip(idx[1:-1:2], idx[2::2]))) else: if len(idx) == 0: return [] r = [slice(left, right) for left, right in zip(idx[:-1:2], idx[1::2])] if binary_array[-1]: r.append(slice(idx[-1], binary_array.size)) return r
[docs] def slice_length(slice_: slice) -> int: """Return the length of a slice. Parameters ---------- slice_ : slice Slice. Returns ------- length : int Length. """ return slice_.stop - slice_.start
[docs] def apply_to_lists( list_of_lists: List[list], func: Callable, check_empty: bool = True ) -> np.ndarray: """Apply a function to each list in a list of lists. Parameters ---------- list_of_lists : list of list List of lists. func : callable Function to apply to each list. check_empty : bool, optional Return :code:`0` for empty lists if set as :code:`True`. If :code:`False`, the function will be applied to an empty list. Returns ------- result : np.ndarray Numpy array with the function applied to each list. """ if check_empty: return np.array( [ [ func(inner_list) if np.any(inner_list) else 0 for inner_list in array_list ] for array_list in list_of_lists ], ) return np.array( [ [func(inner_list) for inner_list in array_list] for array_list in list_of_lists ], )
[docs] def list_means(list_of_lists: List[list]) -> np.ndarray: """Calculate the mean of each list in a list of lists. Parameters ---------- list_of_lists : list of list List of lists. Returns ------- result : np.ndarray Numpy array with the mean of each list. """ return apply_to_lists(list_of_lists, func=np.mean)
[docs] def list_stds(list_of_lists: List[list]) -> np.ndarray: """Calculate the standard deviation of each list in a list of lists. Parameters ---------- list_of_lists : list of list List of lists. Returns ------- result : np.ndarray Numpy array with the standard deviation of each list. """ return apply_to_lists(list_of_lists, func=np.std)