"""Observation models for simulations."""
from typing import Optional, Tuple, Union
import numpy as np
from osl_dynamics.utils import array_ops
[docs]
class MVN:
"""Class that generates data from a multivariate normal distribution.
Parameters
----------
means : np.ndarray or str
Mean vector for each mode, shape should be (n_modes, n_channels).
Either a numpy array or :code:`'zero'` or :code:`'random'`.
covariances : np.ndarray or str
Covariance matrix for each mode, shape should be (n_modes, n_channels,
n_channels). Either a numpy array or :code:`'random'`.
n_modes : int, optional
Number of modes.
n_channels : int, optional
Number of channels.
n_covariances_act : int, optional
Number of iterations to add activations to covariance matrices.
observation_error : float, optional
Standard deviation of the error added to the generated data.
"""
def __init__(
self,
means: Union[np.ndarray, str],
covariances: Union[np.ndarray, str],
n_modes: Optional[int] = None,
n_channels: Optional[int] = None,
n_covariances_act: int = 1,
observation_error: float = 0.0,
) -> None:
[docs]
self.n_covariances_act = n_covariances_act
[docs]
self.observation_error = observation_error
# Both the means and covariances were passed as numpy arrays
if isinstance(means, np.ndarray) and isinstance(covariances, np.ndarray):
if means.shape[0] != covariances.shape[0]:
raise ValueError(
"means and covariances have a different number of modes."
)
if means.shape[1] != covariances.shape[1]:
raise ValueError(
"means and covariances have a different number of channels."
)
self.n_modes = means.shape[0]
self.n_channels = means.shape[1]
self.means = means
self.covariances = covariances
# Only the means were passed as a numpy array
elif isinstance(means, np.ndarray) and not isinstance(covariances, np.ndarray):
self.n_modes = means.shape[0]
self.n_channels = means.shape[1]
self.means = means
self.covariances = self.create_covariances(covariances)
# Only the covariances were passed a numpy array
elif not isinstance(means, np.ndarray) and isinstance(covariances, np.ndarray):
self.n_modes = covariances.shape[0]
self.n_channels = covariances.shape[1]
self.means = self.create_means(means)
self.covariances = covariances
# Neither means or covariances were passed as numpy arrays
elif not isinstance(means, np.ndarray) and not isinstance(
covariances, np.ndarray
):
if n_modes is None or n_channels is None:
raise ValueError(
"If we are generating and means and covariances, "
"n_modes and n_channels must be passed."
)
self.n_modes = n_modes
self.n_channels = n_channels
self.means = self.create_means(means)
self.covariances = self.create_covariances(covariances)
else:
raise ValueError("means and covariance arguments not passed correctly.")
[docs]
def create_means(
self, option: str, mu: float = 0, sigma: float = 0.2
) -> np.ndarray:
if option == "zero":
means = np.zeros([self.n_modes, self.n_channels])
elif option == "random":
means = np.random.normal(
mu,
sigma,
size=[self.n_modes, self.n_channels],
)
else:
raise ValueError("means must be a np.array or 'zero' or 'random'.")
return means
[docs]
def create_covariances(
self, option: str, activation_strength: float = 1, eps: float = 1e-6
) -> np.ndarray:
if option == "random":
# Randomly sample the elements of W from a normal distribution
W = np.random.normal(
0, 0.1, size=[self.n_modes, self.n_channels, self.n_channels]
)
# Add a large activation to a small number of the channels at random
activation_strength_multipliers = np.linspace(1, 5, self.n_covariances_act)
for j in range(self.n_covariances_act):
n_active_channels = max(1, 2 * self.n_channels // self.n_modes)
for i in range(self.n_modes):
active_channels = np.unique(
np.random.randint(
0,
self.n_channels,
size=n_active_channels,
)
)
W[i, active_channels] += (
activation_strength_multipliers[j]
* activation_strength
/ self.n_channels
)
# A small value to add to the diagonal to ensure the covariances are
# invertible
eps = np.tile(np.eye(self.n_channels), [self.n_modes, 1, 1]) * eps
# Calculate the covariances
covariances = W @ W.transpose([0, 2, 1]) + eps
else:
raise ValueError("covariances must be a np.ndarray or 'random'.")
return covariances
[docs]
def simulate_data(self, state_time_course: np.ndarray) -> np.ndarray:
"""Simulate time series data.
Parameters
----------
state_time_course : np.ndarray
2D array containing state activations.
Shape must be (n_samples, n_states).
Returns
-------
data : np.ndarray
Time series data. Shape is (n_samples, n_channels).
"""
n_samples = state_time_course.shape[0]
# Initialise array to hold data
data = np.zeros((n_samples, self.n_channels))
# Loop through all unique combinations of modes
for alpha in np.unique(state_time_course, axis=0):
# Mean and covariance for this combination of modes
mu = np.sum(self.means * alpha[:, np.newaxis], axis=0)
sigma = np.sum(
self.covariances * alpha[:, np.newaxis, np.newaxis],
axis=0,
)
# Generate data for the time points that this combination of
# modes is active
data[np.all(state_time_course == alpha, axis=1)] = (
np.random.multivariate_normal(
mu,
sigma,
size=np.count_nonzero(np.all(state_time_course == alpha, axis=1)),
)
)
# Add an error to the data at all time points
data += np.random.normal(scale=self.observation_error, size=data.shape)
return data.astype(np.float32)
[docs]
def get_instantaneous_covariances(
self, state_time_course: np.ndarray
) -> np.ndarray:
"""Get the ground truth covariance at each time point.
Parameters
----------
state_time_course : np.ndarray
2D array containing state activations.
Shape must be (n_samples, n_states).
Returns
-------
inst_covs : np.ndarray
Instantaneous covariances.
Shape is (n_samples, n_channels, n_channels).
"""
# Initialise an array to hold the data
n_samples = state_time_course.shape[0]
inst_covs = np.empty([n_samples, self.n_channels, self.n_channels])
# Loop through all unique combinations of modes
for alpha in np.unique(state_time_course, axis=0):
# Covariance for this combination of modes
sigma = np.sum(
self.covariances * alpha[:, np.newaxis, np.newaxis],
axis=0,
)
inst_covs[np.all(state_time_course == alpha, axis=1)] = sigma
return inst_covs.astype(np.float32)
[docs]
class MDyn_MVN(MVN):
"""Class that generates data from a multivariate normal distribution.
Multi-time-scale version of MVN.
Parameters
----------
means : np.ndarray or str
Mean vector for each mode, shape should be (n_modes, n_channels).
Either a numpy array or :code:`'zero'` or :code:`'random'`.
covariances : np.ndarray or str
Covariance matrix for each mode, shape should be (n_modes, n_channels,
n_channels). Either a numpy array or :code:`'random'`.
n_modes : int, optional
Number of modes.
n_channels : int, optional
Number of channels.
n_covariances_act : int, optional
Number of iterations to add activations to covariance matrices.
observation_error : float, optional
Standard deviation of the error added to the generated data.
"""
def __init__(
self,
means: Union[np.ndarray, str],
covariances: Union[np.ndarray, str],
n_modes: Optional[int] = None,
n_channels: Optional[int] = None,
n_covariances_act: int = 1,
observation_error: float = 0.0,
) -> None:
super().__init__(
means=means,
covariances=covariances,
n_modes=n_modes,
n_channels=n_channels,
n_covariances_act=n_covariances_act,
observation_error=observation_error,
)
# Get the stds and corrs from self.covariance
[docs]
self.stds = array_ops.cov2std(self.covariances)
[docs]
self.corrs = array_ops.cov2corr(self.covariances)
[docs]
def simulate_data(self, state_time_courses: np.ndarray) -> np.ndarray:
"""Simulates data.
Parameters
----------
state_time_courses : np.ndarray
Should contain two time courses: one for the mean and standard
deviations and another for functional connectiivty. Shape is
(2, n_samples, n_modes).
Returns
-------
data : np.ndarray
Simulated data. Shape is (n_samples, n_channels).
"""
# Reshape state_time_courses so that the multi-time-scale dimension
# is last
state_time_courses = np.rollaxis(state_time_courses, 0, 3)
# Number of samples to simulate
n_samples = state_time_courses.shape[0]
# Initialise array to hold data
data = np.zeros([n_samples, self.n_channels])
# Loop through all unique combinations of states
for time_courses in np.unique(state_time_courses, axis=0):
# Extract the different time courses
alpha = time_courses[:, 0]
beta = time_courses[:, 1]
# Mean, standard deviation, corr for this combination of time courses
mu = np.sum(self.means * alpha[:, np.newaxis], axis=0)
G = np.diag(np.sum(self.stds * alpha[:, np.newaxis], axis=0))
F = np.sum(self.corrs * beta[:, np.newaxis, np.newaxis], axis=0)
# Calculate covariance matrix from the standard deviation and corr
sigma = G @ F @ G
# Generate data for the time points that this combination of states
# is active
data[np.all(state_time_courses == time_courses, axis=(1, 2))] = (
np.random.multivariate_normal(
mu,
sigma,
size=np.count_nonzero(
np.all(state_time_courses == time_courses, axis=(1, 2))
),
)
)
# Add an error to the data at all time points
data += np.random.normal(scale=self.observation_error, size=data.shape)
return data.astype(np.float32)
[docs]
def get_instantaneous_covariances(
self, state_time_courses: np.ndarray
) -> np.ndarray:
"""Get the ground truth covariance at each time point.
Parameters
----------
state_time_courses : np.ndarray
Should contain two time courses: one for the mean and standard
deviations and another for functional connectiivty. Shape is
(2, n_samples, n_modes).
Returns
-------
inst_covs : np.ndarray
Instantaneous covariances.
Shape is (n_samples, n_channels, n_channels).
"""
# Reshape state_time_courses so that the multi-time-scale dimension
# is last
state_time_courses = np.rollaxis(state_time_courses, 0, 3)
# Number of samples to simulate
n_samples = state_time_courses.shape[0]
# Initialise an array to hold data
inst_covs = np.empty([n_samples, self.n_channels, self.n_channels])
# Loop through all unique combinations of states
for time_courses in np.unique(state_time_courses, axis=0):
# Extract the different time courses
alpha = time_courses[:, 0]
beta = time_courses[:, 1]
# Mean, standard deviation, corr for this combination of time courses
G = np.diag(np.sum(self.stds * alpha[:, np.newaxis], axis=0))
F = np.sum(self.corrs * beta[:, np.newaxis, np.newaxis], axis=0)
# Calculate covariance matrix from the standard deviation and corr
sigma = G @ F @ G
inst_covs[np.all(state_time_courses == time_courses, axis=(1, 2))] = sigma
return inst_covs.astype(np.float32)
[docs]
class MSess_MVN(MVN):
"""Class that generates data from a multivariate normal distribution for multiple sessions.
Parameters
----------
session_means : np.ndarray or str
Mean vector for each mode for each session, shape should be
(n_sessions, n_modes, n_channels). Either a numpy array or
:code:`'zero'` or :code:`'random'`.
session_covariances : np.ndarray or str
Covariance matrix for each mode for each session, shape should
be (n_sessions, n_modes, n_channels, n_channels). Either a numpy
array or :code:`'random'`.
n_modes : int, optional
Number of modes.
n_channels : int, optional
Number of channels.
n_covariances_act : int, optional
Number of iterations to add activations to covariance matrices.
embedding_vectors : np.ndarray, optional
Embedding vectors for each session, shape should be
(n_sessions, embeddings_dim).
n_sessions : int, optional
Number of sessions.
embeddings_dim : int, optional
Dimension of embeddings.
spatial_embeddings_dim : int, optional
Dimension of spatial embeddings.
embeddings_scale : float, optional
Standard deviation when generating embeddings with a normal
distribution.
n_groups : int, optional
Number of groups when generating embeddings.
between_group_scale : float, optional
Standard deviation when generating centroids of groups of
embeddings.
observation_error : float, optional
Standard deviation of the error added to the generated data.
"""
def __init__(
self,
session_means: Union[np.ndarray, str],
session_covariances: Union[np.ndarray, str],
n_modes: Optional[int] = None,
n_channels: Optional[int] = None,
n_covariances_act: int = 1,
embedding_vectors: Optional[np.ndarray] = None,
n_sessions: Optional[int] = None,
embeddings_dim: Optional[int] = None,
spatial_embeddings_dim: Optional[int] = None,
embeddings_scale: Optional[float] = None,
n_groups: Optional[int] = None,
between_group_scale: Optional[float] = None,
observation_error: float = 0.0,
) -> None:
[docs]
self.n_covariances_act = n_covariances_act
[docs]
self.observation_error = observation_error
[docs]
self.embeddings_dim = embeddings_dim
[docs]
self.spatial_embeddings_dim = spatial_embeddings_dim
[docs]
self.embeddings_scale = embeddings_scale
[docs]
self.n_groups = n_groups
[docs]
self.between_group_scale = between_group_scale
if embedding_vectors is not None:
n_sessions = embedding_vectors.shape[0]
self.n_sessions = n_sessions
embeddings_dim = embedding_vectors.shape[1]
self.embeddings_dim = embeddings_dim
# Both the session means and covariances were passed as numpy arrays
if isinstance(session_means, np.ndarray) and isinstance(
session_covariances, np.ndarray
):
if session_means.ndim != 3:
raise ValueError(
"session_means must have shape (n_sessions, n_modes, n_channels)."
)
if session_covariances.ndim != 4:
raise ValueError(
"session_covariances must have shape "
"(n_sessions, n_modes, n_channels, n_channels)."
)
if session_means.shape[0] != session_covariances.shape[0]:
raise ValueError(
"session_means and session_covariances have a different "
"number of arrays."
)
if session_means.shape[1] != session_covariances.shape[1]:
raise ValueError(
"session_means and session_covariances have a different "
"number of modes."
)
if session_means.shape[2] != session_covariances.shape[2]:
raise ValueError(
"session_means and session_covariances have a different "
"number of channels."
)
self.n_sessions = session_means.shape[0]
self.n_modes = session_means.shape[1]
self.n_channels = session_means.shape[2]
self.n_groups = None
self.group_centroids = None
self.between_group_scale = None
self.embeddings_dim = None
self.spatial_embeddings_dim = None
self.embeddings_scale = None
self.group_means = None
self.session_means = session_means
self.group_covariances = None
self.session_covariances = session_covariances
# Only the session means were passed as a numpy array
elif isinstance(session_means, np.ndarray) and not isinstance(
session_covariances, np.ndarray
):
self.n_sessions = session_means.shape[0]
self.n_modes = session_means.shape[1]
self.n_channels = session_means.shape[2]
self.validate_embedding_parameters(embedding_vectors)
self.create_embeddings(embedding_vectors)
self.group_means = None
self.session_means = session_means
self.group_covariances = super().create_covariances(session_covariances)
self.session_covariances = self.create_session_covariances()
# Only the session covariances were passed as a numpy array
elif not isinstance(session_means, np.ndarray) and isinstance(
session_covariances, np.ndarray
):
self.n_sessions = session_covariances.shape[0]
self.n_modes = session_covariances.shape[1]
self.n_channels = session_covariances.shape[2]
if not session_means == "zero":
self.validate_embedding_parameters(embedding_vectors)
self.create_embeddings(embedding_vectors)
self.group_means = super().create_means(session_means)
self.session_means = self.create_session_means(session_means)
self.group_covariances = None
self.session_covariances = session_covariances
# Neither session means or nor covariances were passed as numpy arrays
elif not isinstance(session_means, np.ndarray) and not isinstance(
session_covariances, np.ndarray
):
if n_sessions is None or n_modes is None or n_channels is None:
raise ValueError(
"If we are generating array means and covariances, "
"n_sessions, n_modes, n_channels must be passed."
)
self.n_sessions = n_sessions
self.n_modes = n_modes
self.n_channels = n_channels
self.validate_embedding_parameters(embedding_vectors)
self.create_embeddings(embedding_vectors)
self.group_means = super().create_means(session_means)
self.session_means = self.create_session_means(session_means)
self.group_covariances = super().create_covariances(session_covariances)
self.session_covariances = self.create_session_covariances()
[docs]
def validate_embedding_parameters(
self, embedding_vectors: Optional[np.ndarray]
) -> None:
if embedding_vectors is None:
if self.embeddings_dim is None:
raise ValueError(
"Session means or covariances not passed, please pass "
"'embeddings_dim'."
)
if self.spatial_embeddings_dim is None:
raise ValueError(
"Session means or covariances not passed, please pass "
"'spatial_embeddings_dim'."
)
if self.embeddings_scale is None:
raise ValueError(
"Session means or covariances not passed, please pass "
"'embeddings_scale'."
)
if self.n_groups is None:
raise ValueError(
"Session means or covariances not passed, please pass 'n_groups'."
)
if self.between_group_scale is None:
raise ValueError(
"Session means or covariances not passed, please pass "
"'between_group_scale'."
)
[docs]
def create_embeddings(self, embedding_vectors: Optional[np.ndarray]) -> None:
if embedding_vectors is None:
# Assign groups to sessions
assigned_groups = np.random.choice(self.n_groups, self.n_sessions)
self.group_centroids = np.random.normal(
scale=self.between_group_scale,
size=[self.n_groups, self.embeddings_dim],
)
embeddings = np.zeros([self.n_sessions, self.embeddings_dim])
for i in range(self.n_groups):
group_mask = assigned_groups == i
embeddings[group_mask] = np.random.multivariate_normal(
mean=self.group_centroids[i],
cov=self.embeddings_scale * np.eye(self.embeddings_dim),
size=[np.sum(group_mask)],
)
self.assigned_groups = assigned_groups
self.embeddings = embeddings
else:
self.embeddings = embedding_vectors
[docs]
def create_session_means_deviations(self) -> None:
means_spatial_embeddings_lienar_transform = self.create_linear_transform(
self.n_channels, self.spatial_embeddings_dim
)
self.means_spatial_embeddings = (
means_spatial_embeddings_lienar_transform @ self.group_means.T
).T
# Match the shapes for concatenation
concat_array_embeddings = np.broadcast_to(
self.embeddings[:, None, :],
(
self.n_sessions,
self.n_modes,
self.embeddings_dim,
),
)
concat_means_spatial_embeddings = np.broadcast_to(
self.means_spatial_embeddings[None, :, :],
(
self.n_sessions,
self.n_modes,
self.spatial_embeddings_dim,
),
)
self.means_concat_embeddings = np.concatenate(
[concat_array_embeddings, concat_means_spatial_embeddings], axis=-1
)
means_linear_transform = self.create_linear_transform(
self.embeddings_dim + self.spatial_embeddings_dim,
self.n_channels,
)
self.means_deviations = np.squeeze(
means_linear_transform[None, None, ...]
@ self.means_concat_embeddings[..., None]
)
[docs]
def create_session_covariances_deviations(self) -> None:
covariances_spatial_embeddings_linear_transform = self.create_linear_transform(
self.n_channels * (self.n_channels + 1) // 2, self.spatial_embeddings_dim
)
group_cholesky_covariances = np.linalg.cholesky(self.group_covariances)
m, n = np.tril_indices(self.n_channels)
flattened_group_cholesky_covariances = group_cholesky_covariances[:, m, n]
self.covariances_spatial_embeddings = (
covariances_spatial_embeddings_linear_transform
@ flattened_group_cholesky_covariances.T
).T
# Match the shapes for concatenation
concat_array_embeddings = np.broadcast_to(
self.embeddings[:, None, :],
(
self.n_sessions,
self.n_modes,
self.embeddings_dim,
),
)
concat_covarainces_spatial_embeddings = np.broadcast_to(
self.covariances_spatial_embeddings[None, :, :],
(
self.n_sessions,
self.n_modes,
self.spatial_embeddings_dim,
),
)
self.covariances_concat_embeddings = np.concatenate(
[concat_array_embeddings, concat_covarainces_spatial_embeddings],
axis=-1,
)
covariances_linear_transform = self.create_linear_transform(
self.embeddings_dim + self.spatial_embeddings_dim,
self.n_channels * (self.n_channels + 1) // 2,
)
self.flattened_covariances_cholesky_deviations = np.squeeze(
covariances_linear_transform[None, None, ...]
@ self.covariances_concat_embeddings[..., None]
)
[docs]
def create_session_means(self, option: str) -> np.ndarray:
if option == "zero":
session_means = np.zeros([self.n_sessions, self.n_modes, self.n_channels])
else:
self.create_session_means_deviations()
session_means = self.group_means[None, ...] + self.means_deviations
return session_means
[docs]
def create_session_covariances(self, eps: float = 1e-6) -> np.ndarray:
self.create_session_covariances_deviations()
group_cholesky_covariances = np.linalg.cholesky(self.group_covariances)
m, n = np.tril_indices(self.n_channels)
flattened_group_cholesky_covariances = group_cholesky_covariances[:, m, n]
flattened_session_cholesky_covariances = (
flattened_group_cholesky_covariances[None, ...]
+ self.flattened_covariances_cholesky_deviations
)
session_cholesky_covariances = np.zeros(
[self.n_sessions, self.n_modes, self.n_channels, self.n_channels]
)
for i in range(self.n_sessions):
for j in range(self.n_modes):
session_cholesky_covariances[i, j, m, n] = (
flattened_session_cholesky_covariances[i, j]
)
session_covariances = session_cholesky_covariances @ np.transpose(
session_cholesky_covariances, (0, 1, 3, 2)
)
# A small value to add to the diagonal to ensure the covariances
# are invertible
session_covariances += eps * np.eye(self.n_channels)
return session_covariances
[docs]
def simulate_session_data(
self, session: int, mode_time_course: np.ndarray
) -> np.ndarray:
"""Simulate single session data.
Parameters
----------
session : int
Session number.
mode_time_course : np.ndarray
Mode time course. Shape is (n_samples, n_modes).
Returns
-------
data : np.ndarray
Simulated data. Shape is (n_samples, n_channels).
"""
# Initialise array to hold data
n_samples = mode_time_course.shape[0]
data = np.zeros((n_samples, self.n_channels))
# Loop through all unique combinations of modes
for alpha in np.unique(mode_time_course, axis=0):
# Mean and covariance for this combination of modes
mu = np.sum(self.session_means[session] * alpha[:, None], axis=0)
sigma = np.sum(
self.session_covariances[session] * alpha[:, None, None], axis=0
)
# Generate data for the time points that this combination of
# modes is active
data[np.all(mode_time_course == alpha, axis=1)] = (
np.random.multivariate_normal(
mu,
sigma,
size=np.count_nonzero(np.all(mode_time_course == alpha, axis=1)),
)
)
# Add an error to the data at all time points
data += np.random.normal(scale=self.observation_error, size=data.shape)
return data.astype(np.float32)
[docs]
def get_session_instantaneous_covariances(
self, session: int, mode_time_course: np.ndarray
) -> np.ndarray:
"""Get ground truth covariances at each time point for a particular session.
Parameters
----------
session : int
Session number.
mode_time_course : np.ndarray
Mode time course. Shape is (n_samples, n_modes).
Returns
-------
inst_covs : np.ndarray
Instantaneous covariances for an session.
Shape is (n_samples, n_channels, n_channels).
"""
# Initialise array to hold data
n_samples = mode_time_course.shape[0]
inst_covs = np.zeros([n_samples, self.n_channels, self.n_channels])
# Loop through all unique combinations of modes
for alpha in np.unique(mode_time_course, axis=0):
# Covariance for this combination of modes
sigma = np.sum(
self.session_covariances[session] * alpha[:, None, None], axis=0
)
inst_covs[np.all(mode_time_course == alpha, axis=1)] = sigma
return inst_covs.astype(np.float32)
[docs]
def get_instantaneous_covariances(
self, mode_time_courses: np.ndarray
) -> np.ndarray:
"""Get ground truth covariance at each time point for each session.
Parameters
----------
mode_time_courses : np.ndarray
Mode time courses.
Shape is (n_sessions, n_samples, n_modes).
Returns
-------
inst_covs : np.ndarray
Instantaneous covariances.
Shape is (n_sessions, n_samples, n_channels, n_channels).
"""
inst_covs = []
for session in range(self.n_sessions):
inst_covs.append(
self.get_session_instantaneous_covariances(
session, mode_time_courses[session]
)
)
return np.array(inst_covs)
[docs]
def simulate_multi_session_data(self, mode_time_courses: np.ndarray) -> np.ndarray:
"""Simulates data.
Parameters
----------
mode_time_courses : np.ndarray
It contains n_sessions time courses.
Shape is (n_sessions, n_samples, n_modes).
Returns
-------
data : np.ndarray
Simulated data for sessions.
Shape is (n_sessions, n_samples, n_channels).
"""
data = []
for session in range(self.n_sessions):
data.append(self.simulate_session_data(session, mode_time_courses[session]))
return np.array(data)
[docs]
class MAR:
r"""Class that generates data from a multivariate autoregressive (MAR) model.
A :math:`p`-order MAR model can be written as
.. math::
x_t = A_1 x_{t-1} + ... + A_p x_{t-p} + \epsilon_t
where :math:`\epsilon_t \sim N(0, \Sigma)`. The MAR model is therefore
parameterized by the MAR coefficients (:math:`A`) and covariance
(:math:`\Sigma`).
Parameters
----------
coeffs : np.ndarray
Array of MAR coefficients, :math:`A`. Shape must be
(n_states, n_lags, n_channels, n_channels).
covs : np.ndarray
Covariance of the error :math:`\epsilon_t`. Shape must be
(n_states, n_channels) or (n_states, n_channels, n_channels).
Note
----
This model is also known as VAR or MVAR.
"""
def __init__(self, coeffs: np.ndarray, covs: np.ndarray) -> None:
# Validation
if coeffs.ndim != 4:
raise ValueError(
"coeffs must be a (n_states, n_lags, n_channels, n_channels) array."
)
if covs.ndim == 2:
covs = np.array([np.diag(c) for c in covs])
if coeffs.shape[0] != covs.shape[0]:
raise ValueError("Different number of states in coeffs and covs passed.")
if coeffs.shape[-1] != covs.shape[-1]:
raise ValueError("Different number of channels in coeffs and covs passed.")
# Model parameters
[docs]
self.order = coeffs.shape[1]
# Number of states and channels
[docs]
self.n_states = coeffs.shape[0]
[docs]
self.n_channels = coeffs.shape[2]
[docs]
def simulate_data(self, state_time_course: np.ndarray) -> np.ndarray:
"""Simulate time series data.
We simulate MAR data based on the hidden state at each time point.
Parameters
----------
state_time_course : np.ndarray
State time course. Shape must be (n_samples, n_states).
States must be mutually exclusive.
Returns
-------
data : np.ndarray
Simulated data. Shape is (n_samples, n_channels).
"""
n_samples = state_time_course.shape[0]
data = np.empty([n_samples, self.n_channels])
# Generate the noise term first
for i in range(self.n_states):
time_points_active = state_time_course[:, i] == 1
n_time_points_active = np.count_nonzero(time_points_active)
data[time_points_active] = np.random.multivariate_normal(
np.zeros(self.n_channels),
self.covs[i],
size=n_time_points_active,
)
# Generate the MAR process
for t in range(n_samples):
state = state_time_course[t].argmax()
for lag in range(min(t, self.order)):
data[t] += np.dot(self.coeffs[state, lag], data[t - lag - 1])
return data.astype(np.float32)
[docs]
class OscillatoryBursts:
"""Oscillatory burst observation model.
Generates sinusoidal oscillatory bursts at specified frequencies. Each mode
has an associated frequency and a set of active channels defined by a
channel activity matrix. During active periods (determined by the state
time course), channels produce sinusoidal signals at the mode's frequency
with a slowly varying phase offset.
Parameters
----------
n_modes : int
Number of frequency modes.
n_channels : int
Number of channels.
true_freqs : np.ndarray
Frequencies for each mode in Hz. Shape: (n_modes,).
channel_activity : np.ndarray
Binary matrix indicating which channels are active for each mode.
Shape: (n_modes, n_channels).
sampling_frequency : float, optional
Sampling frequency in Hz. Default: 100.
snr : float, optional
Signal-to-noise ratio. Default: 4.
"""
def __init__(
self,
n_modes: int,
n_channels: int,
true_freqs: np.ndarray,
channel_activity: np.ndarray,
sampling_frequency: float = 100,
snr: float = 4,
):
[docs]
self.n_channels = n_channels
[docs]
self.true_freqs = np.asarray(true_freqs)
[docs]
self.channel_activity = np.asarray(channel_activity)
[docs]
self.sampling_frequency = sampling_frequency
[docs]
def simulate_data(
self,
state_time_course: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
"""Simulate oscillatory burst data for a single subject.
Parameters
----------
state_time_course : np.ndarray
One-hot encoded state time course. Shape: (n_samples, n_states).
The first n_modes columns correspond to the oscillatory modes.
Any additional columns (e.g. a "background" state) are ignored.
Returns
-------
data : np.ndarray
Simulated data with noise. Shape: (n_samples, n_channels).
true_signal : np.ndarray
Clean signal without noise. Shape: (n_samples, n_channels).
"""
n_samples = state_time_course.shape[0]
timestamps = np.arange(n_samples) / self.sampling_frequency
# Generate sinusoidal activity for each mode and channel
phase_diff = np.linspace(0, 0.25, self.n_channels)
true_signal = np.zeros((n_samples, self.n_channels))
for i in range(self.n_modes):
active = state_time_course[:, i] == 1
for j in range(self.n_channels):
if self.channel_activity[i, j] == 1:
phase = (
(0.5 * np.sin(2 * np.pi * 0.005 * timestamps) + phase_diff[j])
* 2
* np.pi
)
true_signal[active, j] += np.sin(
2 * np.pi * self.true_freqs[i] * timestamps[active]
+ phase[active]
)
# Add noise
noise_std = 1 / self.snr
data = true_signal + np.random.normal(0, noise_std, true_signal.shape)
return data, true_signal
[docs]
class Poisson:
"""Class that generates Poisson time series data.
The time series for each channel is a single Poisson observation. The rate
of the poisson observation can be different for different states and
channels.
Parameters
----------
rates : np.ndarray or str
Rate vector for each mode, shape should be (n_states, n_channels).
Either a numpy array or 'random'.
n_channels : int
Number of channels.
n_modes : int
Number of modes.
"""
def __init__(
self,
rates: Union[np.ndarray, str],
n_states: Optional[int] = None,
n_channels: Optional[int] = None,
) -> None:
if isinstance(rates, np.ndarray):
self.n_states = rates.shape[0]
self.n_channels = rates.shape[1]
self.rates = rates
elif not isinstance(rates, np.ndarray):
if n_states is None or n_channels is None:
raise ValueError(
"If we are generating rates, "
"n_states and n_channels must be passed."
)
self.n_states = n_states
self.n_channels = n_channels
self.rates = self.create_rates(rates)
[docs]
def create_rates(self, option: str, eps: float = 1e-2) -> np.ndarray:
if option == "random":
# Randomly sample the rates from a gamma distribution
rates = np.random.gamma(
shape=1.0, scale=1.1, size=(self.n_states, self.n_channels)
)
# Add a large rate to a small number of the channels at random
n_active_channels = max(1, self.n_channels // self.n_states)
for i in range(self.n_states):
active_channels = np.unique(
np.random.randint(0, self.n_channels, size=n_active_channels)
)
rates[i, active_channels] += 1
else:
raise NotImplementedError("Please use rates='random'.")
return rates + eps
[docs]
def simulate_data(self, state_time_course: np.ndarray) -> np.ndarray:
n_samples = state_time_course.shape[0]
data = np.empty([n_samples, self.n_channels])
# Generate data
for i in range(n_samples):
state = np.argmax(state_time_course[i])
data[i] = np.random.poisson(self.rates[state])
return data
[docs]
class TDECovs:
"""Time-delay embedded covariance observation model.
Generates time series data from TDE covariance matrices using conditional
multivariate normal sampling. Each mode is defined by a ``CE x CE``
covariance matrix (where ``C`` is the number of channels and ``E`` is the
number of embeddings). At each time point, the current sample is drawn
conditioned on the previous ``E-1`` samples.
Parameters
----------
true_tde_covs : list of np.ndarray
List of ``n_modes`` TDE covariance matrices, each of shape
``(n_channels * n_embeddings, n_channels * n_embeddings)``.
The row/column ordering is assumed to be blocks of ``E x E``
matrices (i.e. channel-major ordering).
n_embeddings : int, optional
Number of time-delay embeddings.
rho : float, optional
Regularisation parameter for inverting the covariance.
"""
def __init__(
self,
true_tde_covs: list,
n_embeddings: int = 1,
rho: float = 0.1,
):
[docs]
self.true_tde_covs = [np.asarray(c) for c in true_tde_covs]
[docs]
self.n_embeddings = n_embeddings
[docs]
self.n_channels = self.true_tde_covs[0].shape[0] // n_embeddings
[docs]
self.n_modes = len(self.true_tde_covs)
def _gen_data_from_tde_cov(
self,
tde_cov: np.ndarray,
n_samples: int,
) -> np.ndarray:
"""Generate a time series from a single TDE covariance matrix.
Uses conditional multivariate normal sampling: at each time step,
the current sample ``x_t`` is drawn from
``N(mu_cond, Sigma_cond)`` conditioned on the previous
``n_embeddings - 1`` samples.
Parameters
----------
tde_cov : np.ndarray
TDE covariance matrix. Shape: (CE, CE).
n_samples : int
Number of time points to generate.
Returns
-------
data : np.ndarray
Generated time series. Shape: (n_samples, n_channels).
"""
n_embeddings = self.n_embeddings
n_channels = self.n_channels
rho = self.rho
# Reorder from channel-major (blocks of ExE) to embedding-major
tde_cov = tde_cov.reshape(n_channels, n_embeddings, n_channels, n_embeddings)
tde_cov = np.transpose(tde_cov, [1, 0, 3, 2])
tde_cov = tde_cov.reshape(n_embeddings * n_channels, n_embeddings * n_channels)
# Partition covariance for conditional distribution
# See "Conditional Distributions":
# https://en.wikipedia.org/wiki/Multivariate_normal_distribution
Sig22 = tde_cov[:-n_channels, :-n_channels]
Sig11 = tde_cov[-n_channels:, -n_channels:]
Sig12 = tde_cov[-n_channels:, :-n_channels]
# Initial condition
x_2 = np.random.multivariate_normal(np.zeros(tde_cov.shape[0]), tde_cov, size=1)
x_2 = x_2[:, :-n_channels].T # (C*(E-1), 1)
# Precompute projection and conditional covariance
invSig22 = np.linalg.pinv(Sig22 + np.eye(Sig22.shape[0]) * rho)
Sig_cond = (Sig11 - Sig12 @ invSig22 @ Sig12.T) + np.eye(n_channels) * 0.001
proj = Sig12 @ invSig22
# Generate data autoregressively
data = np.zeros((n_samples, n_channels))
for t in range(n_samples):
mu = proj @ x_2
x_1 = np.random.multivariate_normal(mu.flatten(), Sig_cond)
data[t] = x_1
x_2 = np.concatenate([x_2[n_channels:], x_1[:, np.newaxis]], axis=0)
# Standardise
data = (data - np.mean(data, axis=0)) / np.std(data, axis=0)
return data
[docs]
def simulate_data(
self,
state_time_course: np.ndarray,
) -> np.ndarray:
"""Simulate time series data.
For each mode, generates a full-length time series from the mode's
TDE covariance, then masks it by the state time course.
Parameters
----------
state_time_course : np.ndarray
One-hot encoded state time course.
Shape: (n_samples, n_modes).
Returns
-------
data : np.ndarray
Simulated data. Shape: (n_samples, n_channels).
"""
n_samples = state_time_course.shape[0]
data = np.zeros((n_samples, self.n_channels))
for i in range(self.n_modes):
activity = self._gen_data_from_tde_cov(self.true_tde_covs[i], n_samples)
data += state_time_course[:, i : i + 1] * activity
return data