"""Functions to perform spectral analysis."""
import logging
from typing import List, Optional, Tuple, Union
import numpy as np
from scipy import signal
from scipy.signal._spectral_py import _median_bias
from sklearn.decomposition import non_negative_factorization
from pqdm.threads import pqdm
from threadpoolctl import threadpool_limits
from tqdm.auto import trange
from osl_dynamics.utils import array_ops
from osl_dynamics.utils.sklearn_wrappers import linear_regression
from osl_dynamics.utils.misc import nextpow2
_logger = logging.getLogger("osl-dynamics")
[docs]
def wavelet(
data: np.ndarray,
sampling_frequency: float,
w: float = 5,
standardize: bool = True,
time_range: Optional[List] = None,
frequency_range: Optional[List] = None,
df: float = 0.5,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Calculate a wavelet transform.
This function uses a Morlet2 window to calculate a wavelet transform.
Parameters
----------
data : np.ndarray
1D time series data. Shape must be (n_samples,).
sampling_frequency : float
Sampling frequency in Hz.
w : float, optional
Number of oscillations in the Gaussian envelope.
standardize : bool, optional
Should we standardize the data before calculating the wavelet?
time_range : list, optional
Start time and end time to plot in seconds.
Default is the full time axis of the data.
frequency_range : list of length 2, optional
Start and end frequency to plot in Hz.
Default is :code:`[1, sampling_frequency / 2]`.
df : float, optional
Frequency resolution in Hz.
Returns
-------
t : np.ndarray
1D numpy array for the time axis.
f : np.ndarray
1D numpy array for the frequency axis.
wt : np.ndarray
2D numpy array (frequency, time) containing the wavelet transform.
"""
# Validation
if np.array(data).ndim != 1:
raise ValueError("data must be a 1D numpy array.")
if time_range is None:
time_range = [0, data.shape[0] / sampling_frequency]
if time_range[0] is None:
time_range[0] = 0
if time_range[1] is None:
time_range[1] = data.shape[0] / sampling_frequency
if frequency_range is None:
frequency_range = [1, sampling_frequency / 2]
if frequency_range[0] is None:
frequency_range[0] = 1
if frequency_range[1] is None:
frequency_range[1] = sampling_frequency / 2
# Standardize the data
if standardize:
data = (data - np.mean(data)) / np.std(data)
# Keep selected time points
start_index = int(time_range[0] * sampling_frequency)
end_index = int(time_range[1] * sampling_frequency)
data = data[start_index:end_index]
# Time axis (s)
dt = 1 / sampling_frequency
t = np.arange(time_range[0], time_range[1], dt)
# Frequency axis (Hz)
f = np.arange(frequency_range[0], frequency_range[1], df)
# Calculate the width for each Morlet window based on the frequency
widths = w * sampling_frequency / (2 * f * np.pi)
def _morlet2(M, s):
x = np.arange(0, M) - (M - 1.0) / 2
x = x / s
sinusoidal = np.exp(1j * w * x)
gaussian = np.exp(-0.5 * x**2)
normalization = np.pi ** (-0.25) * np.sqrt(1 / s)
return sinusoidal * gaussian * normalization
def _cwt(data, widths):
output = np.empty((len(widths), len(data)), dtype=np.complex64)
for i, width in enumerate(widths):
M = np.min([10 * width, len(data)])
wavelet_data = np.conj(_morlet2(M, width)[::-1])
output[i] = np.convolve(data, wavelet_data, mode="same")
return output
# Calculate wavelet transform
wt = _cwt(data, widths)
wt = abs(wt)
return t, f, wt
[docs]
def welch_spectra(
data: Union[np.ndarray, list],
sampling_frequency: float,
alpha: Optional[Union[np.ndarray, list]] = None,
window_length: Optional[int] = None,
step_size: Optional[int] = None,
frequency_range: Optional[List] = None,
standardize: bool = True,
averaging: str = "mean",
calc_cpsd: bool = False,
calc_coh: bool = True,
return_weights: bool = False,
keepdims: bool = False,
n_jobs: int = 1,
):
"""Calculates spectra for inferred states using Welch's method.
The scaling for the power spectra calculated by this function
matches SciPy (:code:`scipy.signal.welch`).
Parameters
----------
data : np.ndarray or list
Time series data. Must have shape (n_sessions, n_samples,
n_channels) or (n_samples, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
alpha : np.ndarray or list, optional
Inferred state probability time course. Must have shape
(n_sessions, n_samples, n_states) or (n_samples, n_states).
window_length : int, optional
Length of the data segment to use to calculate spectra.
If None, we use :code:`2 * sampling_frequency`.
step_size : int, optional
Step size for shifting the window.
Defaults to :code:`window_length // 2` (50% overlap).
frequency_range : list, optional
Minimum and maximum frequency to keep.
standardize : bool, optional
Should we standardize the data before calculating the spectra?
averaging : str, optional
Method used to average periodograms.
Must be :code:`'mean'` or :code:`'median'`.
calc_cpsd : bool, optional
Should we return the cross spectra for :code:`psd`?
If True, we force :code:`calc_coh` to False.
calc_coh : bool, optional
Should we also return the coherence spectra?
return_weights : bool, optional
Should we return the weights for session-specific spectra?
This is useful for calculating a group average.
keepdims : bool, optional
Should we enforce a (n_sessions, n_states, ...) array is returned
for :code:`psd` and :code:`coh`? If :code:`False`, we remove any
dimension of length 1.
n_jobs : int, optional
Number of parallel jobs.
Returns
-------
f : np.ndarray
Frequencies of the power spectra and coherences. Shape is (n_freq,).
psd : np.ndarray
Power spectra for each session and state. Shape is (n_sessions,
n_states, n_channels, n_freq) if :code:`calc_cpsd=False`, otherwise
the shape is (n_sessions, n_states, n_channels, n_channels, n_freq).
Any axis of length 1 is removed if :code:`keepdims=False`.
coh : np.ndarray
Coherences for each session and state. Shape is (n_sessions, n_states,
n_channels, n_channels, n_freq). Any axis of length 1 is removed if
:code:`keepdims=False`. Only returned is :code:`calc_coh=True`.
w : np.ndarray
Weighting for session-specific spectra. Only returned if
:code:`return_weights=True`. Shape is (n_sessions,).
"""
# Validation
if alpha is not None:
if (isinstance(data, list) != isinstance(alpha, list)) or (
isinstance(data, np.ndarray) != isinstance(alpha, np.ndarray)
):
raise TypeError(
f"data is type {type(data)} and alpha is type "
f"{type(alpha)}. They must both be lists or numpy arrays."
)
if isinstance(data, list):
# Check data and state time course for the same number of arrays
# has been passed
if len(data) != len(alpha):
raise ValueError(
"A different number of arrays has been passed for "
f"data and alpha: len(data)={len(data)}, "
f"len(alpha)={len(alpha)}."
)
# Check the number of samples in data and alpha
for i in range(len(alpha)):
if alpha[i].shape[0] != data[i].shape[0]:
raise ValueError(
"items in data and alpha must have the number of samples."
)
elif isinstance(data, list):
alpha = np.array([None] * len(data))
if isinstance(data, np.ndarray):
if alpha is not None and alpha.shape[0] != data.shape[0]:
raise ValueError("data and alpha must have the number of samples.")
if data.ndim == 2:
data = [data]
alpha = [alpha]
if window_length is None:
window_length = 2 * sampling_frequency
window_length = int(window_length)
if step_size is None:
step_size = window_length // 2
step_size = int(step_size)
if frequency_range is None:
frequency_range = [0, sampling_frequency / 2]
if calc_cpsd:
calc_coh = False
# Create keyword arguments to pass to the main function
# that calculates spectra
kwargs = []
for d, a in zip(data, alpha):
kwargs.append(
{
"data": d,
"sampling_frequency": sampling_frequency,
"alpha": a,
"window_length": window_length,
"step_size": step_size,
"frequency_range": frequency_range,
"standardize": standardize,
"averaging": averaging,
"calc_cpsd": calc_cpsd,
"calc_coh": calc_coh,
"keepdims": True,
}
)
# Main calculation of spectra
if len(data) == 1:
# We only have one array so we don't need to parallelise
# the calculation
_logger.info("Calculating spectra")
results = [_welch(**kwargs[0])]
elif n_jobs == 1:
# We have multiple arrays but we're running in serial
results = []
for i in trange(len(data), desc="Calculating spectra"):
results.append(_welch(**kwargs[i]))
else:
# Calculate spectra in parallel
_logger.info("Calculating spectra")
with threadpool_limits(limits=1):
results = pqdm(kwargs, _welch, n_jobs=n_jobs, argument_type="kwargs")
# Unpack the results
if calc_coh:
psd = []
coh = []
for result in results:
f, p, c = result
psd.append(p)
coh.append(c)
else:
psd = []
for result in results:
f, p = result
psd.append(p)
if not keepdims:
# Remove any axes that are of length 1
psd = np.squeeze(psd)
if calc_coh:
coh = np.squeeze(coh)
# Weights for calculating a group-average spectrum
n_samples = [d.shape[0] for d in data]
w = np.array(n_samples) / np.sum(n_samples)
if calc_coh:
if return_weights:
return f, psd, coh, w
else:
return f, psd, coh
else:
if return_weights:
return f, psd, w
else:
return f, psd
[docs]
def multitaper_spectra(
data: Union[np.ndarray, list],
sampling_frequency: float,
alpha: Optional[Union[np.ndarray, list]] = None,
window_length: Optional[int] = None,
step_size: Optional[int] = None,
time_half_bandwidth: float = 4,
n_tapers: int = 7,
frequency_range: Optional[List] = None,
standardize: bool = True,
averaging: str = "mean",
calc_cpsd: bool = False,
calc_coh: bool = True,
return_weights: bool = False,
keepdims: bool = False,
n_jobs: int = 1,
):
"""Calculates spectra for inferred states using a multitaper.
Parameters
----------
data : np.ndarray or list
Time series data. Must have shape (n_sessions, n_samples,
n_channels) or (n_samples, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
alpha : np.ndarray or list, optional
Inferred state probability time course. Must have shape
(n_sessions, n_samples, n_states) or (n_samples, n_states).
window_length : int, optional
Length of the data segment to use to calculate spectra.
step_size : int, optional
Step size for shifting the window.
Defaults to :code:`window_length` (no overlap).
time_half_bandwidth : float, optional
Parameter to control the resolution of the spectra.
n_tapers : int, optional
Number of tapers.
frequency_range : list, optional
Minimum and maximum frequency to keep.
standardize : bool, optional
Should we standardize the data before calculating the spectra?
averaging : str, optional
Method used to average periodograms.
Must be :code:`'mean'` or :code:`'median'`.
calc_cpsd : bool, optional
Should we return the cross spectra for :code:`psd`?
If True, we force :code:`calc_coh` to False.
calc_coh : bool, optional
Should we also return the coherence spectra?
return_weights : bool, optional
Should we return the weights for session-specific spectra?
This is useful for calculating a group average.
keepdims : bool, optional
Should we enforce a (n_sessions, n_states, ...) array is returned
for :code:`psd` and :code:`coh`? If :code:`False`, we remove any
dimension of length 1.
n_jobs : int, optional
Number of parallel jobs.
Returns
-------
f : np.ndarray
Frequencies of the power spectra and coherences. Shape is (n_freq,).
psd : np.ndarray
Power spectra for each session and state. Shape is (n_sessions,
n_states, n_channels, n_freq) if :code:`calc_cpsd=False`, otherwise
the shape is (n_sessions, n_states, n_channels, n_channels, n_freq).
Any axis of length 1 is removed if :code:`keepdims=False`.
coh : np.ndarray
Coherences for each session and state. Shape is (n_sessions, n_states,
n_channels, n_channels, n_freq). Any axis of length 1 is removed if
:code:`keepdims=False`. Only returned is :code:`calc_coh=True`.
w : np.ndarray
Weighting for session-specific spectra. Only returned if
:code:`return_weights=True`. Shape is (n_sessions,).
"""
# Validation
if alpha is not None:
if (isinstance(data, list) != isinstance(alpha, list)) or (
isinstance(data, np.ndarray) != isinstance(alpha, np.ndarray)
):
raise TypeError(
f"data is type {type(data)} and alpha is type "
f"{type(alpha)}. They must both be lists or numpy arrays."
)
if isinstance(data, list):
# Check data and state time course for the same number of arrays
# has been passed
if len(data) != len(alpha):
raise ValueError(
"A different number of arrays has been passed for "
f"data and alpha: len(data)={len(data)}, "
f"len(alpha)={len(alpha)}."
)
# Check the number of samples in data and alpha
for i in range(len(alpha)):
if alpha[i].shape[0] != data[i].shape[0]:
raise ValueError(
"items in data and alpha must have the same number of samples."
)
elif isinstance(data, list):
alpha = np.array([None] * len(data))
if isinstance(data, np.ndarray):
if alpha is not None and alpha.shape[0] != data.shape[0]:
raise ValueError("data and alpha must have the number of samples.")
if data.ndim == 2:
data = [data]
alpha = [alpha]
if window_length is None:
window_length = 2 * sampling_frequency
window_length = int(window_length)
if step_size is None:
step_size = window_length
step_size = int(step_size)
if frequency_range is None:
frequency_range = [0, sampling_frequency / 2]
if calc_cpsd:
calc_coh = False
# Create keyword arguments to pass to the main function
# that calculates spectra
kwargs = []
for d, a in zip(data, alpha):
kwargs.append(
{
"data": d,
"sampling_frequency": sampling_frequency,
"alpha": a,
"window_length": window_length,
"step_size": step_size,
"time_half_bandwidth": time_half_bandwidth,
"n_tapers": n_tapers,
"frequency_range": frequency_range,
"standardize": standardize,
"averaging": averaging,
"calc_cpsd": calc_cpsd,
"calc_coh": calc_coh,
"keepdims": True,
}
)
# Main calculation of spectra
if len(data) == 1:
# We only have one array so we don't need to parallelise
# the calculation
_logger.info("Calculating spectra")
results = [_multitaper(**kwargs[0])]
elif n_jobs == 1:
# We have multiple arrays but we're running in serial
results = []
for i in trange(len(data), desc="Calculating spectra"):
results.append(_multitaper(**kwargs[i]))
else:
# Calculate spectra in parallel
_logger.info("Calculating spectra")
with threadpool_limits(limits=1):
results = pqdm(
kwargs,
_multitaper,
n_jobs=n_jobs,
argument_type="kwargs",
)
# Unpack the results
if calc_coh:
psd = []
coh = []
for result in results:
f, p, c = result
psd.append(p)
coh.append(c)
else:
psd = []
for result in results:
f, p = result
psd.append(p)
if not keepdims:
# Remove any axes that are of length 1
psd = np.squeeze(psd)
if calc_coh:
coh = np.squeeze(coh)
# Weights for calculating a group-average spectrum
n_samples = [d.shape[0] for d in data]
w = np.array(n_samples) / np.sum(n_samples)
if calc_coh:
if return_weights:
return f, psd, coh, w
else:
return f, psd, coh
else:
if return_weights:
return f, psd, w
else:
return f, psd
[docs]
def regression_spectra(
data: Union[np.ndarray, list],
alpha: Union[np.ndarray, list],
sampling_frequency: float,
window_length: int,
step_size: int,
n_sub_windows: int = 1,
frequency_range: Optional[List] = None,
standardize: bool = True,
calc_coh: bool = True,
return_weights: bool = False,
return_coef_int: bool = False,
keepdims: bool = False,
n_jobs: int = 1,
):
"""Calculates mode-specific spectra by regressing a spectrogram with alpha.
We use :func:`osl_dynamics.analysis.spectral._welch_spectrogram`
to calculate the spectrogram.
Parameters
----------
data : np.ndarray or list
Data to calculate a spectrogram for. Shape must be
(n_sessions, n_samples, n_channels) or (n_samples, n_channels).
alpha : np.ndarray or list
Inferred mode mixing factors. Shape must be
(n_sessions, n_samples, n_modes) or (n_samples, n_modes).
sampling_frequency : float
Sampling_frequency in Hz.
window_length : int
Number samples to use in the window to calculate a spectrum.
step_size : int
Step size for shifting the window.
n_sub_windows : int, optional
We split the window into a number of sub-windows and average the
spectra for each sub-window. window_length must be divisible by
:code:`n_sub_windows`.
frequency_range : list, optional
Minimum and maximum frequency to keep.
standardize : bool, optional
Should we standardize the data before calculating the spectrogram?
calc_coh : bool, optional
Should we also return the coherence spectra?
return_weights : bool, optional
Should we return the weights for session-specific spectra?
Useful for calculating the group average spectra.
return_coef_int : bool, optional
Should we return the regression coefficients and intercept
separately for the mode PSDs?
keepdims : bool, optional
Should we enforce a (n_sessions, n_states, ...) array is returned for
:code:`psd` and :code:`coh`? If :code:`False`, we remove any dimension
of length 1.
n_jobs : int, optional
Number of parallel jobs.
Returns
-------
f : np.ndarray
Frequency axis. Shape is (n_freq).
psd : np.ndarray
Mode PSDs. A numpy array with shape (n_sessions, 2, n_modes, n_channels,
n_freq) where :code:`axis=1` is the coefficients/intercept if
:code:`return_coef_int=True`, otherwise shape is (n_sessions, n_modes,
n_channels, n_freq). Any axis of length 1 is removed if
:code:`keepdims=False`.
coh : np.ndarray
Mode coherences. Shape is (n_sessions, n_modes, n_channels, n_channels,
n_freq). Any axis of length 1 is removed if :code:`keepdims=False`.
w : np.ndarray
Weight for each session-specific PSD. Shape is (n_sessions,).
Only returned if :code:`return_weights=True`.
"""
# Validation
if (isinstance(data, list) != isinstance(alpha, list)) or (
isinstance(data, np.ndarray) != isinstance(alpha, np.ndarray)
):
raise TypeError(
f"data is type {type(data)} and alpha is type "
f"{type(alpha)}. They must both be lists or numpy arrays."
)
if isinstance(data, list):
# Check data and state time course for the same number of arrays
# has been passed
if len(data) != len(alpha):
raise ValueError(
"A different number of arrays has been passed for "
f"data and alpha: len(data)={len(data)}, "
f"len(alpha)={len(alpha)}."
)
# Check the number of samples in data and alpha
for i in range(len(alpha)):
if alpha[i].shape[0] != data[i].shape[0]:
raise ValueError(
"items in data and alpha must have the number of samples."
)
if isinstance(data, np.ndarray):
if alpha.shape[0] != data.shape[0]:
raise ValueError("data and alpha must have the number of samples.")
if data.ndim == 2:
data = [data]
alpha = [alpha]
# Ensure data and alpha are float32
data = [d.astype(np.float32) for d in data]
alpha = [a.astype(np.float32) for a in alpha]
if frequency_range is None:
frequency_range = [0, sampling_frequency / 2]
if window_length % n_sub_windows != 0:
raise ValueError("window_length must be divisible by n_sub_windows.")
# Standardise before calculating the spectrogram
if standardize:
data = [(d - np.mean(d, axis=0)) / np.std(d, axis=0) for d in data]
# Create keyword arguments to pass to the main function
# that calculates spectra
kwargs = []
for d, a in zip(data, alpha):
kwargs.append(
{
"data": d,
"alpha": a,
"sampling_frequency": sampling_frequency,
"window_length": window_length,
"step_size": step_size,
"frequency_range": frequency_range,
"calc_cpsd": calc_coh,
"n_sub_windows": n_sub_windows,
}
)
# Main calculation of spectra
if len(data) == 1:
# We only have one array so we don't need to parallelise the
# calculation
_logger.info("Calculating spectra")
results = [_regress_welch_spectrogram(**kwargs[0])]
elif n_jobs == 1:
# We have multiple arrays but we're running in serial
results = []
for i, kws in enumerate(kwargs):
_logger.info(f"Calculating spectra {i}")
results.append(_regress_welch_spectrogram(**kws))
else:
# Calculate a time-varying PSD and regress to get the mode PSDs
_logger.info("Calculating spectra")
with threadpool_limits(limits=1):
results = pqdm(
kwargs,
_regress_welch_spectrogram,
n_jobs=n_jobs,
argument_type="kwargs",
)
# Unpack results
Pj = []
for result in results:
f, coefs, intercept = result
Pj.append([coefs, [intercept] * coefs.shape[0]])
Pj = np.array(Pj)
# Weights for calculating a group-average spectrum
n_samples = [d.shape[0] for d in data]
w = np.array(n_samples) / np.sum(n_samples)
if not calc_coh:
if not return_coef_int:
# Sum coefficients and intercept
Pj = np.sum(Pj, axis=1)
if return_weights:
return f, np.squeeze(Pj), w
else:
return f, np.squeeze(Pj)
# Number of channels and frequency bins
n_channels = data[0].shape[1]
n_modes = alpha[0].shape[1]
n_freq = Pj.shape[-1]
# Indices of the upper triangle of an n_channels by n_channels array
m, n = np.triu_indices(n_channels)
# Number of arrays
n_sessions = len(data)
# Create a n_channels by n_channels array
P = np.empty(
[n_sessions, 2, n_modes, n_channels, n_channels, n_freq],
dtype=np.complex64,
)
for i in range(n_sessions):
for j in range(2): # j=0 is the coefficients and j=1 is the intercepts
P[i, j][:, m, n] = Pj[i, j]
P[i, j][:, n, m] = np.conj(Pj[i, j])
# PSDs and coherences for each mode
psd = np.empty(
[n_sessions, 2, n_modes, n_channels, n_freq],
dtype=np.float32,
)
coh = np.empty(
[n_sessions, n_modes, n_channels, n_channels, n_freq],
dtype=np.float32,
)
for i in range(n_sessions):
# PSDs
p = P[i, :, :, range(n_channels), range(n_channels)].real
psd[i] = np.rollaxis(p, axis=0, start=3)
# Calculate coherence
for k in range(n_channels):
for l in range(n_channels):
Pxy = P[i, 0, :, k, l] # regression coefficients
Pxx = P[i, 1, :, k, k].real # regression intercept
Pyy = P[i, 1, :, l, l].real # regression intercept
coh[i, :, k, l] = np.abs(Pxy) / np.sqrt(Pxx * Pyy)
if not return_coef_int:
# Sum coefficients and intercept
psd = np.sum(psd, axis=1)
if not keepdims:
# Remove any axes that are of length 1
psd = np.squeeze(psd)
coh = np.squeeze(coh)
if return_weights:
return f, psd, coh, w
else:
return f, psd, coh
[docs]
def rescale_regression_coefs(
psd: np.ndarray,
alpha: Union[np.ndarray, list],
window_length: int,
step_size: int,
n_sub_windows: int = 1,
) -> np.ndarray:
"""Rescales regression coefficients with maximum regressor values.
Parameters
----------
psd : np.ndarray
Mode PSDs. Shape must be (n_sessions, 2, n_modes, n_channels, n_freq).
alpha : np.ndarray or list
Inferred mode mixing coefficients. Shape must be (n_sessions, n_samples,
n_modes) or (n_samples, n_modes).
window_length : int
Number samples used as the window to calculate a PSD.
step_size : int
Step size used for shifting the window.
n_sub_windows : int, optional
Number of sub-windows used to average the spectra. :code:`window_length`
must be divisible by :code:`n_sub_windows`.
Returns
-------
psd : np.ndarray
Mode PSDs rescaled with maximum regressor values.
Shape is identical to the input.
"""
# Validation
if psd.ndim == 4:
psd = psd[np.newaxis, ...]
if psd.ndim != 5:
raise ValueError(
"psd must have shape (n_sessions, 2, n_modes, n_channels, n_freqs)."
)
if isinstance(alpha, np.ndarray):
alpha = [alpha]
if len(psd) != len(alpha):
raise ValueError(
"A different number of arrays has been passed for "
f"psd and alpha: len(psd)={len(psd)}, "
f"len(alpha)={len(alpha)}."
)
for i in range(len(alpha)):
if alpha[i].shape[1] != psd[i].shape[1]:
raise ValueError("psd and alpha must have same number of modes.")
# Prevent in-place operation
psd = psd.copy()
# Number of arrays
n_sessions = len(alpha)
# Window alphas to match the windowing of STFT
for i, a in enumerate(alpha):
alpha[i] = _window_mean(
a,
"hann",
window_length,
step_size=step_size,
n_sub_windows=n_sub_windows,
)
# Normalize the alphas
alpha = [(a - np.mean(a, axis=0)) / np.std(a, axis=0) for a in alpha]
# Get maximum regressor values
max_alpha = [np.max(a, axis=0) for a in alpha]
# Rescale the regression coefficients
for n in range(n_sessions):
psd[n, 0] *= np.expand_dims(max_alpha[n], axis=(1, 2))
return psd
[docs]
def coherence_spectra(cpsd: np.ndarray, keepdims: bool = False) -> np.ndarray:
r"""Calculate coherence from cross power spectral densities.
We calculate the coherence as:
.. math::
C_{xy}(f) = \frac{|P_{xy}(f)|}{\sqrt{P_{xx}(f) P_{yy}(f)}}
where :math:`x,y` are different channels.
Parameters
----------
cpsd : np.ndarray
Cross power spectra.
Shape is (..., n_channels, n_channels, n_freq).
keepdims: bool, optional
Should we squeeze any axis of length 1?
Returns
-------
coh : np.ndarray
Coherence spectra.
Shape is (..., n_channels, n_channels, n_freq).
"""
n_channels = cpsd.shape[-2]
# Calculate coherency spectrum
coh = np.empty_like(cpsd, dtype=np.float32)
for j in range(n_channels):
for k in range(n_channels):
Pxy = cpsd[..., j, k, :]
Pxx = cpsd[..., j, j, :].real
Pyy = cpsd[..., k, k, :].real
coh[..., j, k, :] = abs(Pxy) / np.sqrt(Pxx * Pyy)
# Zero nan values
coh = np.nan_to_num(coh)
if not keepdims:
coh = np.squeeze(coh)
return coh
[docs]
def partial_coherence_spectra(cpsd: np.ndarray, keepdims: bool = False) -> np.ndarray:
r"""Calculate partial coherence from cross power spectral densities.
We calculate the partial coherence as:
.. math::
PC_{xy}(f) = \
\frac{|P^{-1}_{xy}(f)|}{\sqrt{P^{-1}_{xx}(f) P^{-1}_{yy}(f)}}
where :math:`x,y` are different channels and :math:`P^{-1}(f)`
is the inverse of the cross spectral density matrix.
Parameters
----------
cpsd : np.ndarray
Cross power spectra.
Shape is (..., n_channels, n_channels, n_freq).
keepdims: bool, optional
Should we squeeze any axis of length 1?
Returns
-------
pcoh : np.ndarray
Partial coherence spectra.
Shape is (..., n_channels, n_channels, n_freq).
"""
# Calculate inverse cross spectra
# (Need to move axis for compatibility with np.linalg.pinv)
cpsd = np.moveaxis(cpsd, -1, 0)
icpsd = np.linalg.inv(cpsd)
icpsd = np.moveaxis(icpsd, 0, -1)
# The partial coherence calculation is the same as the
# coherence calculation except we input the inverse cross
# spectral densities
return coherence_spectra(icpsd, keepdims=keepdims)
[docs]
def partial_directed_coherence_spectra(
f: np.ndarray,
cpsd: np.ndarray,
sampling_frequency: float,
n_iterations: int = 50,
tol: float = 1e-7,
keepdims: bool = False,
) -> np.ndarray:
r"""Calculate partial directed coherence from cross power spectra.
We calculate the partial directed coherence as:
.. math::
PDC_{ij}(f) = \frac{|A_{ij}(f)|}{\sqrt{\sum_m |A_{mj}(f)|^2}}
where :math:`i,j` are different channels and :math:`A(f) = H^{-1}(f)`
is the inverse of the transfer function, which is calculated by
factorizing the cross spectral density matrix.
By default this function will use all cores available.
Parameters
----------
f : np.ndarray
Frequency axis. Shape is (n_freq,).
cpsd : np.ndarray
Cross power spectra.
Shape is (..., n_channels, n_channels, n_freq).
sampling_frequency : float
Sampling frequency (Hz).
n_iterations : int, optional
Number of iterations in the Wilson factorization of the cross
spectra.
tol : float, optional
Tolerance for the Wilson factorization of the cross spectra.
keepdims: bool, optional
Should we squeeze any axis of length 1?
Returns
-------
pdc : np.ndarray
Partial directed coherence spectra.
Shape is (..., n_channels, n_channels, n_freq).
"""
def _plus_operator(g):
# Helper function for _wilson_factorization
N = g.shape[0] // 2 + 1
gam = np.fft.ifft(g, axis=0)
beta0 = 0.5 * gam[0]
gamp = gam.copy()
gamp[0] = np.triu(beta0)
gamp[N:] = 0
return np.fft.fft(gamp, axis=0)
def _wilson_factorization(
S, freq=f, fs=sampling_frequency, Niterations=n_iterations, tol=tol
):
# Algorithm for the Wilson Factorization of the spectral matrix. From:
# https://github.com/ViniciusLima94/pyGC/blob/master/pygc/non_parametric.py
#
# Args:
# - Cross spectra: S (frequencies, channels, channels)
#
# Returns:
# - Transfer function: H (frequencies, channels, channels)
m = S.shape[-1]
N = freq.shape[0] - 1
Sarr = np.zeros([2 * N, m, m], dtype=complex)
Sarr[:N] = S[:N]
Sarr[N:] = S[1:].transpose(0, 2, 1)[::-1]
gam = np.fft.ifft(Sarr, axis=0).real
h = np.linalg.cholesky(gam[0]).T
psi = np.tile(h[np.newaxis, ...], (2 * N, 1, 1)).astype(complex)
I = np.eye(m)[np.newaxis, ...]
for _ in range(Niterations):
inv_psi = np.linalg.inv(psi)
g = inv_psi @ Sarr @ np.conj(inv_psi).transpose(0, 2, 1) + I
gp = _plus_operator(g)
psiold = psi.copy()
psi = psi @ gp
psierr = np.mean(np.linalg.norm(psi - psiold, 1, axis=1)) / (2 * N)
if psierr < tol:
break
A = np.fft.ifft(psi, axis=0).real
inv_A0 = np.linalg.inv(A[0])
return psi[: N + 1] @ inv_A0
def _calc_pdc(cpsd):
# Calculate partial directed coherence from a single cross spectrum.
#
# Args:
# - cpsd (n_channels, n_channels, n_freq) array
#
# Returns:
# - pdc (n_channels, n_channels, n_freq) array
S = np.moveaxis(cpsd, -1, 0)
H = _wilson_factorization(S)
A = np.linalg.inv(H)
pdc = np.abs(A) / np.sqrt(np.sum(np.abs(A) ** 2, axis=1, keepdims=True))
return np.moveaxis(pdc, 0, -1).astype(np.float32)
# _calc_pdc works on 3D data, we flatten the input here
original_shape = cpsd.shape
flattened_cpsd = cpsd.reshape((-1, *original_shape[-3:]))
# Create keyword arguments to pass to _calc_pdc
kwargs = [{"cpsd": S} for S in flattened_cpsd]
# Main calculation
if len(kwargs) == 1:
_logger.info("Calculating PDC")
results = [_calc_pdc(**kwargs[0])]
else:
results = []
for i in trange(len(kwargs), desc="Calculating PDC"):
results.append(_calc_pdc(**kwargs[i]))
# Put the results back into the original shape
pdc = np.array(results).reshape(original_shape)
if not keepdims:
# Remove any axes that are of length 1
pdc = np.squeeze(pdc)
return pdc
[docs]
def decompose_spectra(
coherences: np.ndarray,
n_components: int,
max_iter: int = 50000,
random_state: Optional[int] = None,
verbose: int = 0,
) -> np.ndarray:
"""Uses non-negative matrix factorization to decompose spectra.
Parameters
----------
coherences : np.ndarray
Coherences spectra.
Shape must be (..., n_channels, n_channels, n_freq,).
n_components : int
Number of spectral components to fit.
max_iter : int, optional
Maximum number of iterations in
`sklearn.decomposition.non_negative_factorization
<https://scikit-learn.org/stable/modules/generated/sklearn\
.decomposition.non_negative_factorization.html>`_.
random_state : int, optional
Seed for the random number generator.
verbose : int, optional
Show verbose? (:code:`1`) yes, (:code:`0`) no.
Returns
-------
components : np.ndarray
Spectral components. Shape is (n_components, n_freq).
"""
_logger.info("Performing spectral decomposition")
# Validation
error_message = (
"coherences must be a numpy array with shape "
"(n_channels, n_channels, n_freq), "
"(n_modes, n_channels, n_channels, n_freq) or "
"(n_sessions, n_modes, n_channels, n_channels, n_freq)."
)
coherences = array_ops.validate(
coherences,
correct_dimensionality=5,
allow_dimensions=[3, 4],
error_message=error_message,
)
# Number of arrays, modes, channels and frequency bins
n_sessions, n_modes, n_channels, n_channels, n_freq = coherences.shape
# Indices of the upper triangle of the
# [n_channels, n_channels, n_freq] sub-array
i, j = np.triu_indices(n_channels, 1)
# Concatenate coherences for each array and mode
# and only keep the upper triangle
coherences = coherences[:, :, i, j].reshape(-1, n_freq)
# Perform non-negative matrix factorisation
_, components, _ = non_negative_factorization(
coherences,
n_components=n_components,
init=None,
max_iter=max_iter,
random_state=random_state,
verbose=verbose,
)
# Order the weights and components in ascending frequency
order = np.argsort(components.argmax(axis=1))
components = components[order]
return components
[docs]
def mar_spectra(
coeffs: np.ndarray, covs: np.ndarray, sampling_frequency: float, n_freq: int = 512
) -> Tuple[np.ndarray, np.ndarray]:
"""Calculates cross power spectral densities from MAR model parameters.
Parameters
----------
coeffs : np.ndarray
MAR coefficients. Shape must be (n_modes, n_channels, n_channels) or
(n_channels, n_channels) or (n_channels,).
covs : np.ndarray
MAR covariances. Shape must be (n_modes, n_channels, n_channels) or
(n_channels, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
n_freq : int, optional
Number of frequency bins in the cross power spectral density to
calculate.
Returns
-------
f : np.ndarray
Frequency axis. Shape is (n_freq,).
P : np.ndarray
Cross power spectral densities. Shape is (n_freq, n_modes, n_channels,
n_channels) or (n_freq, n_channels, n_channels).
"""
# Validation
if covs.shape[-1] != covs.shape[-2]:
if covs.ndim == 2:
covs = [np.diag(c) for c in covs]
else:
covs = np.diag(covs)
error_message = (
"covs must be a numpy array with shape "
"(n_modes, n_channels, n_channels), "
"(n_channels, n_channels) or (n_channels,)."
)
covs = array_ops.validate(
covs,
correct_dimensionality=3,
allow_dimensions=[2],
error_message=error_message,
)
error_message = (
"coeffs must be a numpy array with shape "
"(n_modes, n_lags, n_channels, n_channels), "
"(n_lags, n_channels, n_channels)."
)
coeffs = array_ops.validate(
coeffs,
correct_dimensionality=4,
allow_dimensions=[3],
error_message=error_message,
)
n_modes = coeffs.shape[0]
n_lags = coeffs.shape[1]
n_channels = coeffs.shape[-1]
# Frequencies to evaluate the PSD at
f = np.arange(0, sampling_frequency / 2, sampling_frequency / (2 * n_freq))
# z-score of the coefficients
A = np.zeros([n_freq, n_modes, n_channels, n_channels], dtype=np.complex64)
for i in range(n_freq):
for l in range(n_lags):
z = np.exp(-1j * (l + 1) * 2 * np.pi * f[i] / sampling_frequency)
A[i] += coeffs[:, l] * z
# Transfer function
I = np.identity(n_channels)[np.newaxis, np.newaxis, ...]
e = 1e-6 * I
H = np.linalg.inv(I - A + e)
# Cross PSDs
P = H @ covs[np.newaxis, ...] @ np.transpose(np.conj(H), axes=[0, 1, 3, 2])
return f, np.squeeze(P)
[docs]
def autocorr_to_spectra(
autocorr_func: np.ndarray,
sampling_frequency: float,
nfft: int = 64,
frequency_range: Optional[List] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Calculates spectra from the autocorrelation function.
The power spectrum of each mode is calculated as the Fourier transform of
the auto-correlation function. Coherences are calculated from the power
spectra.
Parameters
----------
autocorr_func : np.ndarray
Autocorrelation functions. Shape must be (n_channels, n_channels,
n_lags) or (n_modes, n_channels, n_channels, n_lags).
sampling_frequency : float
Sampling frequency in Hz.
nfft : int, optional
Number of data points in the FFT. The auto-correlation function will
only have :code:`2 * (n_embeddings + 2) - 1` data points. We pad the
auto-correlation function with zeros to have :code:`nfft` data points
if the number of data points in the auto-correlation function is less
than :code:`nfft`.
frequency_range : list, optional
Minimum and maximum frequency to keep (Hz).
Returns
-------
f : np.ndarray
Frequencies of the power spectra and coherences. Shape is (n_freq,).
psd : np.ndarray
Power (or cross) spectra calculated for each mode. Shape is
(n_channels, n_channels, n_freq) or (n_modes, n_channels, n_channels,
n_freq).
coh : np.ndarray
Coherences calculated for each mode. Shape is (n_channels, n_channels,
n_freq) or (n_modes, n_channels, n_channels, n_freq).
"""
# Validation
error_message = (
"autocorrelation_functions must be of shape (n_channels, n_channels, "
"n_lags) or (n_modes, n_channels, n_channels, n_lags)."
)
autocorr_func = array_ops.validate(
autocorr_func,
correct_dimensionality=4,
allow_dimensions=[3],
error_message=error_message,
)
if frequency_range is None:
frequency_range = [0, sampling_frequency / 2]
_logger.info("Calculating power spectra")
# Number of data points in the autocorrelation function and FFT
n_lags = autocorr_func.shape[-1]
nfft = max(nfft, 2 ** nextpow2(n_lags))
# Calculate the arguments to keep for the given frequency range
f = np.arange(nfft // 2 + 1) * sampling_frequency / nfft
[min_arg, max_arg] = get_frequency_args_range(f, frequency_range)
f = f[min_arg : max_arg + 1]
# Calculate cross power spectra as the Fourier transform of the
# auto/cross-correlation function
psd = np.fft.fft(autocorr_func, nfft)
psd = psd[..., min_arg : max_arg + 1]
psd = abs(psd)
# Normalise the power spectra
psd /= nfft**2
# Coherences for each mode
coh = coherence_spectra(psd, keepdims=True)
return f, np.squeeze(psd), np.squeeze(coh)
[docs]
def get_frequency_args_range(
frequencies: np.ndarray, frequency_range: List
) -> List[int]:
"""Get min/max indices of a range in a frequency axis.
Parameters
----------
frequencies : np.ndarray
1D numpy array containing a Frequency axis.
frequency_range : list of len 2
Minimum and maximum frequency.
Returns
-------
args_range : list of len 2
Indices for minimum and maximum frequency.
"""
f_min_arg = np.argwhere(frequencies >= frequency_range[0])[0, 0]
f_max_arg = np.argwhere(frequencies <= frequency_range[1])[-1, 0]
if f_max_arg <= f_min_arg:
raise ValueError("Cannot select requested frequency range.")
return [f_min_arg, f_max_arg]
def _welch_spectrogram(
data: np.ndarray,
sampling_frequency: float,
window_length: Optional[int] = None,
step_size: Optional[int] = None,
frequency_range: Optional[List] = None,
calc_cpsd: bool = True,
n_sub_windows: int = 1,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Calculates a spectogram (time-varying power spectral density)
using Welch's method.
Steps:
1. Segment data into overlapping windows.
2. Multiply each window by a Hann tapering function.
3. Calculate a periodogram for each window.
We use the same scaling for the power spectra as SciPy (i.e. when
you use :code:`scipy.signal.periodogram(..., scaling="density")`).
Parameters
----------
data : np.ndarray
Data to calculate the spectrogram for.
Shape must be (n_samples, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
window_length : int, optional
Number of data points to use when calculating the periodogram.
Defaults to :code:`2 * sampling_frequency`.
step_size : int, optional
Step size for shifting the window.
Defaults to :code:`window_length // 2` (50% overlap).
frequency_range : list, optional
Minimum and maximum frequency to keep.
calc_cpsd : bool, optional
Should we calculate cross spectra?
n_sub_windows : int, optional
We split the window into a number of sub-windows and average the
spectra for each sub-window. window_length must be divisible by
:code:`n_sub_windows`.
Returns
-------
t : np.ndarray
Time axis.
f : np.ndarray
Frequency axis.
P : np.ndarray
Spectrogram. 3D numpy array with shape (time, channels *
(channels + 1) / 2, freq) if :code:`calc_cpsd=True`, otherwise
it is (time, channels, freq).
"""
n_samples, n_channels = data.shape
# Validation
if window_length is None:
window_length = 2 * sampling_frequency
window_length = int(window_length)
if step_size is None:
step_size = window_length // 2
step_size = int(step_size)
# First pad the data so we have enough data points to estimate the
# periodogram for time points at the start/end of the data
data = np.pad(data, window_length // 2)[
:, window_length // 2 : window_length // 2 + n_channels
]
# Window to apply to the data before calculating the Fourier transform
window = signal.get_window("hann", (window_length // n_sub_windows))
# Number of data points in the FFT
nfft = max(256, window_length // n_sub_windows)
# Time and frequency axis
t = np.arange(n_samples) / sampling_frequency
f = np.arange(nfft // 2 + 1) * sampling_frequency / nfft
# Only keep a particular frequency range
[min_arg, max_arg] = get_frequency_args_range(f, frequency_range)
f = f[min_arg : max_arg + 1]
# Number of frequency bins
n_freq = max_arg - min_arg + 1
# Indices of an upper triangle of an n_channels by n_channels array
m, n = np.triu_indices(n_channels)
# Indices of time points to calculate a periodogram for
time_indices = range(0, n_samples, step_size)
n_windows = n_samples // step_size
if calc_cpsd:
# Calculate cross periodograms for each segment of the data
P = np.empty(
[n_windows, n_channels * (n_channels + 1) // 2, n_freq],
dtype=np.complex64,
)
XY_sub_window = np.empty(
[n_sub_windows, n_channels * (n_channels + 1) // 2, n_freq],
dtype=np.complex64,
)
for i in range(n_windows):
# Data in the window
j = time_indices[i]
x_window = data[j : j + window_length].T
for k in range(n_sub_windows):
# Data in the sub-window with the windowing function applied
x_sub_window = (
x_window[
:,
k
* window_length
// n_sub_windows : (k + 1)
* window_length
// n_sub_windows,
]
* window[np.newaxis, ...]
)
# Calculate cross spectra for the sub-window
X = np.fft.fft(x_sub_window, nfft)
X = X[..., min_arg : max_arg + 1]
XY = np.conj(X)[np.newaxis, :, :] * X[:, np.newaxis, :]
XY_sub_window[k] = XY[m, n]
# Average the cross spectra for each sub-window
P[i] = np.mean(XY_sub_window, axis=0)
else:
# Calculate the periodogram for each segment of the data
P = np.empty([n_windows, n_channels, n_freq], dtype=np.float32)
XX_sub_window = np.empty(
[n_sub_windows, n_channels, n_freq],
dtype=np.float32,
)
for i in range(n_windows):
# Data in the window
j = time_indices[i]
x_window = data[j : j + window_length].T
for k in range(n_sub_windows):
# Data in the sub-window with the windowing function applied
x_sub_window = (
x_window[
:,
k
* window_length
// n_sub_windows : (k + 1)
* window_length
// n_sub_windows,
]
* window[np.newaxis, ...]
)
# Calculate PSD for the sub-window
X = np.fft.fft(x_sub_window, nfft)
X = X[..., min_arg : max_arg + 1]
XX_sub_window[k] = np.real(np.conj(X) * X)
# Average the cross spectra for each sub-window
P[i] = np.mean(XX_sub_window, axis=0)
# Scaling for the periodograms (we use the same scaling as SciPy)
P *= 2 / (sampling_frequency * np.sum(window**2))
return t, f, P
def _welch(
data: np.ndarray,
sampling_frequency: float,
alpha: Optional[np.ndarray] = None,
window_length: Optional[int] = None,
step_size: Optional[int] = None,
frequency_range: Optional[List] = None,
standardize: bool = True,
averaging: str = "mean",
calc_cpsd: bool = False,
calc_coh: bool = False,
keepdims: bool = False,
):
"""Calculate a (cross) power spectrum using Welch's method.
This function first calculates a spectrogram using
:func:`osl_dynamics.analysis.spectral._welch_spectrogram`
then takes the time average.
The scaling for the power spectra calculated by this function
matches SciPy (:code:`scipy.signal.welch`).
Parameters
----------
data : np.ndarray
Time series data. Must have shape (n_samples, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
alpha : np.ndarray, optional
Inferred state probability time course.
Must have shape (n_samples, n_states).
window_length : int, optional
Length of the data segment to use to calculate spectra.
If None, we use :code:`2 * sampling_frequency`.
step_size : int, optional
Step size for shifting the window.
Defaults to :code:`window_length // 2` (50% overlap).
frequency_range : list, optional
Minimum and maximum frequency to keep.
standardize : bool, optional
Should we standardize the data before calculating the spectra?
averaging : str, optional
Method used to average periodograms.
Must be :code:`'mean'` or :code:`'median'`.
calc_cpsd : bool, optional
Should we return the cross spectra for :code:`psd`?
calc_coh : bool, optional
Should we calculate the coherence between channels?
keepdims : bool, optional
Should we squeeze any axes of length 1?
Returns
-------
f : np.ndarray
Frequencies of the power spectra and coherences.
Shape is (n_freq,).
psd : np.ndarray
Power spectra. Shape is (..., n_channels, n_freq) if
:code:`calc_cpsd=False`, otherwise the shape is
(..., n_channels, n_channels, n_freq).
coh : np.ndarray
Coherences spectra.
Shape is (..., n_channels, n_channels, n_freq).
Only returned is :code:`calc_coh=True`.
"""
if averaging not in ["mean", "median"]:
raise ValueError(f"averaging must be 'mean' or 'median', got '{averaging}'.")
n_samples, n_channels = data.shape
if alpha is None:
alpha = np.ones([n_samples, 1], dtype=np.float32)
n_states = alpha.shape[-1]
# Indices for the upper triangle of a channels by channels matrix
m, n = np.triu_indices(n_channels)
# Standardise before calculating spectra
if standardize:
data = (data - np.mean(data, axis=0)) / np.std(data, axis=0)
# Rescaling for the PSD to account for how much time each
# state was active. Note, we using the same scaling as the
# the multitaper calculation in the HMM-MAR toolbox
sum_alpha2 = np.sum(alpha**2, axis=0)
scaling = n_samples / sum_alpha2
psd = []
coh = []
for i in range(n_states):
# Calculate spectrogram for this state's data
x = data * alpha[..., i][..., np.newaxis]
_, f, p = _welch_spectrogram(
data=x,
sampling_frequency=sampling_frequency,
window_length=window_length,
step_size=step_size,
frequency_range=frequency_range,
calc_cpsd=calc_cpsd or calc_coh,
)
# Average over the time dimension and rescale
if averaging == "median":
bias = _median_bias(p.shape[0])
if np.iscomplexobj(p):
p = np.median(p.real, axis=0) + 1j * np.median(p.imag, axis=0)
else:
p = np.median(p, axis=0)
p /= bias
else:
p = np.mean(p, axis=0)
p *= scaling[i]
n_freq = p.shape[-1]
if calc_coh:
# Create a channels by channels matrix for cross PSDs
cpsd = np.empty(
[n_channels, n_channels, n_freq],
dtype=np.complex64,
)
cpsd[m, n] = p
cpsd[n, m] = np.conj(p)
# Unpack PSDs
psd.append(cpsd[range(n_channels), range(n_channels)].real)
# Calculate coherence
coh.append(coherence_spectra(cpsd))
elif calc_cpsd:
cpsd = np.empty(
[n_channels, n_channels, n_freq],
dtype=np.complex64,
)
cpsd[m, n] = p
cpsd[n, m] = np.conj(p)
psd.append(cpsd)
else:
psd.append(p)
# Check for nans
if np.isnan(psd).any():
_logger.warn("PSD contains NaN values.")
psd = np.nan_to_num(psd) # zero out nan values
if not keepdims:
# Squeeze any axes of length 1
psd = np.squeeze(psd)
if calc_coh:
coh = np.squeeze(coh)
if calc_coh:
return f, psd, coh
else:
return f, psd
def _multitaper_spectrogram(
data: np.ndarray,
sampling_frequency: float,
window_length: Optional[int] = None,
step_size: Optional[int] = None,
frequency_range: Optional[List] = None,
calc_cpsd: bool = True,
n_sub_windows: int = 1,
time_half_bandwidth: float = 4,
n_tapers: int = 7,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Calculates a spectogram (time-varying power spectral density) using a multitaper.
Steps:
1. Segment data into overlapping windows.
2. Multiply each window by a number of tapering functions.
3. Calculate a periodogram for each tapered window.
4. Average over tapers.
Parameters
----------
data : np.ndarray
Data to calculate the spectrogram for.
Shape must be (n_samples, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
window_length : int, optional
Number of data points to use when calculating the periodogram.
Defaults to :code:`2 * sampling_frequency`.
step_size : int, optional
Step size for shifting the window.
Defaults to :code:`window_length` (no overlap).
frequency_range : list, optional
Minimum and maximum frequency to keep.
calc_cpsd : bool, optional
Should we calculate cross spectra?
n_sub_windows : int, optional
We split the window into a number of sub-windows and average the
spectra for each sub-window. window_length must be divisible by
:code:`n_sub_windows`.
time_half_bandwidth : float, optional
Parameter to control the resolution of the spectra.
n_tapers : int, optional
Number of tapers.
Returns
-------
t : np.ndarray
Time axis.
f : np.ndarray
Frequency axis.
P : np.ndarray
Spectrogram. 3D numpy array with shape (time, channels *
(channels + 1) / 2, freq) if :code:`calc_cpsd=True`, otherwise
it is (time, channels, freq).
"""
n_samples, n_channels = data.shape
# Validation
if window_length is None:
window_length = 2 * sampling_frequency
window_length = int(window_length)
if step_size is None:
step_size = window_length
step_size = int(step_size)
# First pad the data so we have enough data points to estimate the
# periodogram for time points at the start/end of the data
data = np.pad(data, window_length // 2)[
:, window_length // 2 : window_length // 2 + n_channels
]
# Tapering windows to apply to the data before calculating the
# Fourier transform
tapers = signal.windows.dpss(
window_length,
NW=time_half_bandwidth,
Kmax=n_tapers,
)
# Number of data points in the FFT
nfft = window_length // n_sub_windows
# Time and frequency axis
t = np.arange(n_samples) / sampling_frequency
f = np.arange(nfft // 2 + 1) * sampling_frequency / nfft
# Only keep a particular frequency range
[min_arg, max_arg] = get_frequency_args_range(f, frequency_range)
f = f[min_arg : max_arg + 1]
# Number of frequency bins
n_freq = max_arg - min_arg + 1
# Indices of an upper triangle of an n_channels by n_channels array
m, n = np.triu_indices(n_channels)
# Indices of time points to calculate a periodogram for
time_indices = range(0, n_samples, step_size)
n_windows = n_samples // step_size
if calc_cpsd:
# Calculate cross PSDs for each segment of the data
P = np.empty(
[n_windows, n_channels * (n_channels + 1) // 2, n_freq],
dtype=np.complex64,
)
XY_sub_window = np.empty(
[n_sub_windows, n_channels * (n_channels + 1) // 2, n_freq],
dtype=np.complex64,
)
for i in range(n_windows):
# Data in the window
j = time_indices[i]
x_window = data[j : j + window_length].T
for k in range(n_sub_windows):
# Data in the sub-window
x_sub_window = x_window[
:,
k
* window_length
// n_sub_windows : (k + 1)
* window_length
// n_sub_windows,
]
# Apply tapers
x_sub_window = x_sub_window[np.newaxis, :, :] * tapers[:, np.newaxis, :]
# Fourier transform
X = np.fft.fft(x_sub_window, nfft)
X = X[..., min_arg : max_arg + 1]
# Calculate cross spectra for the sub-window
XY = np.conj(X)[:, :, np.newaxis, :] * X[:, np.newaxis, :, :]
XY_sub_window[k] = np.mean(XY[:, m, n], axis=0)
# Average the cross spectra for each sub-window
P[i] = np.mean(XY_sub_window, axis=0)
else:
# Calculate PSDs for each segment of the data
P = np.empty([n_windows, n_channels, n_freq], dtype=np.float32)
XX_sub_window = np.empty(
[n_sub_windows, n_channels, n_freq],
dtype=np.float32,
)
for i in range(n_windows):
# Data in the window
j = time_indices[i]
x_window = data[j : j + window_length].T
for k in range(n_sub_windows):
# Data in the sub-window
x_sub_window = x_window[
:,
k
* window_length
// n_sub_windows : (k + 1)
* window_length
// n_sub_windows,
]
# Apply tapers
x_sub_window = x_sub_window[np.newaxis, :, :] * tapers[:, np.newaxis, :]
# Fourier transform
X = np.fft.fft(x_sub_window, nfft)
X = X[..., min_arg : max_arg + 1]
# Calculate spectra for the sub-window
XX = np.real(np.conj(X) * X)
XX_sub_window[k] = np.mean(XX, axis=0)
# Average the spectra for each sub-window
P[i] = np.mean(XX_sub_window, axis=0)
# Scaling for the multitapers
P *= 2 / sampling_frequency
return t, f, P
def _multitaper(
data: np.ndarray,
sampling_frequency: float,
alpha: Optional[np.ndarray] = None,
window_length: Optional[int] = None,
step_size: Optional[int] = None,
time_half_bandwidth: float = 4,
n_tapers: int = 7,
frequency_range: Optional[List] = None,
standardize: bool = True,
averaging: str = "mean",
calc_cpsd: bool = False,
calc_coh: bool = True,
keepdims: bool = False,
):
"""Calculate a multitaper.
This function first calculates a spectrogram using
:func:`osl_dynamics.analysis.spectral._multitaper_spectrogram`
then takes the time average.
Parameters
----------
data : np.ndarray
Time series data. Must have shape (n_samples, n_channels).
sampling_frequency : float
Sampling frequency in Hz.
alpha : np.ndarray, optional
Inferred state probability time course.
Must have shape (n_samples, n_states).
window_length : int, optional
Length of the data segment to use to calculate spectra.
If None, we use :code:`2 * sampling_frequency`.
step_size : int, optional
Step size for shifting the window.
Defaults to :code:`window_length` (no overlap).
time_half_bandwidth : float, optional
Parameter to control the resolution of the spectra.
n_tapers : int, optional
Number of tapers.
frequency_range : list, optional
Minimum and maximum frequency to keep.
standardize : bool, optional
Should we standardize the data before calculating the spectra?
averaging : str, optional
Method used to average periodograms.
Must be :code:`'mean'` or :code:`'median'`.
calc_cpsd : bool, optional
Should we return the cross spectra for :code:`psd`?
calc_coh : bool, optional
Should we calculate the coherence between channels?
keepdims : bool, optional
Should we squeeze any axes of length 1?
Returns
-------
f : np.ndarray
Frequencies of the power spectra and coherences.
Shape is (n_freq,).
psd : np.ndarray
Power spectra. Shape is (..., n_channels, n_freq) if
:code:`calc_cpsd=False`, otherwise the shape is
(..., n_channels, n_channels, n_freq).
coh : np.ndarray
Coherences spectra.
Shape is (..., n_channels, n_channels, n_freq).
Only returned is :code:`calc_coh=True`.
"""
if averaging not in ["mean", "median"]:
raise ValueError(f"averaging must be 'mean' or 'median', got '{averaging}'.")
n_samples, n_channels = data.shape
if alpha is None:
alpha = np.ones([n_samples, 1], dtype=np.float32)
n_states = alpha.shape[-1]
# Indices for the upper triangle of a channels by channels matrix
m, n = np.triu_indices(n_channels)
# Standardise before calculating spectra
if standardize:
data = (data - np.mean(data, axis=0)) / np.std(data, axis=0)
# Rescaling for the PSD to account for how much time each
# state was active. Note, we using the same scaling as the
# the multitaper calculation in the HMM-MAR toolbox
sum_alpha2 = np.sum(alpha**2, axis=0)
scaling = n_samples / sum_alpha2
psd = []
coh = []
for i in range(n_states):
# Calculate spectrogram for this state's data
x = data * alpha[:, i][..., np.newaxis]
_, f, p = _multitaper_spectrogram(
data=x,
sampling_frequency=sampling_frequency,
window_length=window_length,
step_size=step_size,
frequency_range=frequency_range,
calc_cpsd=calc_cpsd or calc_coh,
time_half_bandwidth=time_half_bandwidth,
n_tapers=n_tapers,
)
# Average over the time dimension and rescale
if averaging == "median":
bias = _median_bias(p.shape[0])
if np.iscomplexobj(p):
p = np.median(p.real, axis=0) + 1j * np.median(p.imag, axis=0)
else:
p = np.median(p, axis=0)
p /= bias
else:
p = np.mean(p, axis=0)
p *= scaling[i]
n_freq = p.shape[-1]
if calc_coh:
# Create a channels by channels matrix for cross PSDs
cpsd = np.empty(
[n_channels, n_channels, n_freq],
dtype=np.complex64,
)
cpsd[m, n] = p
cpsd[n, m] = np.conj(p)
# Unpack PSDs
psd.append(cpsd[range(n_channels), range(n_channels)].real)
# Calculate coherence
coh.append(coherence_spectra(cpsd))
elif calc_cpsd:
cpsd = np.empty(
[n_channels, n_channels, n_freq],
dtype=np.complex64,
)
cpsd[m, n] = p
cpsd[n, m] = np.conj(p)
psd.append(cpsd)
else:
psd.append(p)
# Check for nans
if np.isnan(psd).any():
_logger.warn("PSD contains NaN values.")
psd = np.nan_to_num(psd) # zero out nan values
if not keepdims:
# Squeeze any axes of length 1
psd = np.squeeze(psd)
if calc_coh:
coh = np.squeeze(coh)
if calc_coh:
return f, psd, coh
else:
return f, psd
def _regress_welch_spectrogram(
data: np.ndarray,
alpha: np.ndarray,
sampling_frequency: float,
window_length: int,
step_size: int,
n_sub_windows: int,
frequency_range: List,
calc_cpsd: bool,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Calculate regression spectra for a single array.
Parameters
----------
data : np.ndarray
Data to calculate a spectrogram for.
Shape must be (n_samples, n_channels).
alpha : np.ndarray
Inferred mode mixing factors. Shape must be (n_samples, n_modes).
sampling_frequency : float
Sampling_frequency in Hz.
window_length : int
Number samples to use in the window to calculate a PSD.
step_size : int
Step size for shifting the window.
n_sub_windows : int, optional
We split the window into a number of sub-windows and average the
spectra for each sub-window. window_length must be divisible by
:code:`n_sub_windows`.
frequency_range : list, optional
Minimum and maximum frequency to keep.
calc_cpsd : bool, optional
Should we calculate the cross spectra?
Returns
-------
f : np.ndarray
Frequency axis. Shape is (n_freq,).
coefs : np.ndarray
Regression coefficients. Shape is
(n_modes, n_channels * (n_channels + 1) / 2, n_freq) if
:code:`calc_cpsd=True`, otherwise it is (n_modes, n_channels, n_freq).
intercept : np.ndarray
Intercept. Shape is (n_channels * (n_channels + 1) / 2, n_freq)
if :code:`calc_cpsd=True`, otherwise it is (n_channels, n_freq).
"""
_, f, p = _welch_spectrogram(
data=data,
sampling_frequency=sampling_frequency,
window_length=window_length,
step_size=step_size,
frequency_range=frequency_range,
calc_cpsd=calc_cpsd,
n_sub_windows=n_sub_windows,
)
a = _window_mean(alpha, "hann", window_length, step_size, n_sub_windows)
coefs, intercept = linear_regression(
a,
p,
fit_intercept=True,
normalize=True,
log_message=False,
n_jobs=1,
)
return f, coefs, intercept
def _window_mean(
alpha: np.ndarray,
window: str,
window_length: int,
step_size: int,
n_sub_windows: int,
) -> np.ndarray:
"""Applies a windowing function to a time series and takes the mean.
Parameters
----------
alpha : np.ndarray
Time series data. Shape is (n_samples, n_modes).
window : str
Name of the windowing function.
window_length : int
Number of data points in a window.
step_size : int
Step size for shifting the window.
n_sub_windows : int
We split the window into a number of sub-windows and average
across all sub-windows. This is the number of sub-windows.
Returns
-------
mean_window_alpha : np.ndarray
Mean for each window.
"""
n_samples, n_modes = alpha.shape
# Pad alphas
alpha = np.pad(alpha, window_length // 2)[
:, window_length // 2 : window_length // 2 + n_modes
]
# Window to apply to alpha
window = signal.get_window(window, window_length // n_sub_windows)
# Indices of time points to calculate a periodogram for
time_indices = range(0, n_samples, step_size)
n_windows = n_samples // step_size
# Array to hold mean of alpha multiplied by the windowing function
mean_window_alpha = np.empty([n_windows, n_modes], dtype=np.float32)
for i in range(n_windows):
# Alpha in the window
j = time_indices[i]
a_window = alpha[j : j + window_length]
# Calculate alpha for the sub-window by taking the mean
# over time after applying the windowing function
a_sub_window = np.empty([n_sub_windows, n_modes], dtype=np.float32)
for k in range(n_sub_windows):
a_sub_window[k] = np.mean(
a_window[
k
* window_length
// n_sub_windows : (k + 1)
* window_length
// n_sub_windows
]
* window[..., np.newaxis],
axis=0,
)
# Average alpha for each sub-window
mean_window_alpha[i] = np.mean(a_sub_window, axis=0)
return mean_window_alpha