"""Source reconstruction."""
from __future__ import annotations
import os
import numpy as np
import nibabel as nib
from scipy.spatial import KDTree
import mne
from mne.beamformer._compute_beamformer import (
Beamformer,
_reduce_leadfield_rank,
_sym_inv_sm,
)
from mne.beamformer._lcmv import _apply_lcmv
from mne.minimum_norm.inverse import (
_check_depth,
_check_reference,
_get_vertno,
_prepare_forward,
)
from mne.utils import logger as mne_logger
from osl_dynamics.utils.filenames import OSLFilenames
from osl_dynamics.utils.misc import system_call
from . import rhino
def _make_lcmv(
info: mne.Info,
fwd: mne.Forward,
data_cov: mne.Covariance,
reg: float = 0.05,
noise_cov: mne.Covariance | None = None,
label: mne.Label | None = None,
pick_ori: str | None = None,
rank: str | dict = "info",
noise_rank: str | dict = "info",
weight_norm: str | None = "unit-noise-gain-invariant",
reduce_rank: bool = False,
depth: float | dict | None = None,
inversion: str = "matrix",
verbose: bool | str | int | None = None,
) -> Beamformer:
"""Compute LCMV spatial filter.
Modified version of mne.beamformer.make_lcmv.
Parameters
----------
info : instance of mne.Info
The measurement info to specify the channels to include.
fwd : instance of mne.Forward
The fwd solution.
data_cov : instance of mne.Covariance
The data covariance object.
reg : float
The regularization for the whitened data covariance.
noise_cov : instance of mne.Covariance
The noise covariance object.
label : instance of mne.Label
Restricts the LCMV solution to a given label.
pick_ori : None | 'normal' | 'max-power' | max-power-pre-weight-norm
The source orientation to compute the beamformer in.
rank : dict
Calculate the rank only for a subset of channel types, and
explicitly specify the rank for the remaining channel types.
This can be extremely useful if you already know the rank of (part
of) your data, for instance in case you have calculated it earlier.
This parameter must be a dictionary whose keys correspond to channel
types in the data (e.g. 'meg', 'mag', 'grad', 'eeg'), and whose
values are integers representing the respective ranks. For example,
{'mag': 90, 'eeg': 45} will assume a rank of 90 and 45 for
magnetometer data and EEG data, respectively.
The ranks for all channel types present in the data, but not
specified in the dictionary will be estimated empirically. That is,
if you passed a dataset containing magnetometer, gradiometer, and
EEG data together with the dictionary from the previous example,
only the gradiometer rank would be determined, while the specified
magnetometer and EEG ranks would be taken for granted.
noise_rank : dict
This controls the rank computation that can be read from the measurement
info or estimated from the data. When a noise covariance is used for
whitening, this should reflect the rank of that covariance, otherwise
amplification of noise components can occur in whitening (e.g., often
during source localization).
weight_norm : None | 'unit-noise-gain' | 'nai'
The weight normalization scheme to use.
reduce_rank : bool
Whether to reduce the rank by one during computation of the filter.
depth : None | float | dict
How to weight (or normalize) the forward using a depth prior (see Notes).
inversion : 'matrix' | 'single'
The inversion scheme to compute the weights.
verbose : bool, str, int, or None
If not None, override default verbose level (see mne.verbose).
Returns
-------
filters : instance of mne.beamformer.Beamformer
Dictionary containing filter weights from LCMV beamformer. See MNE docs.
"""
# Check number of sensor types present in the data and ensure a noise cov
info = mne._fiff.meas_info._simplify_info(info)
noise_cov, _, allow_mismatch = mne.utils._check_one_ch_type(
"lcmv", info, fwd, data_cov, noise_cov
)
# NOTE: we need this extra picking step (can't just rely on minimum
# norm's because there can be a mismatch. Should probably add an extra
# arg to _prepare_beamformer_input at some point (later)
picks = mne.utils._check_info_inv(info, fwd, data_cov, noise_cov)
info = mne._fiff.pick.pick_info(info, picks)
data_rank = mne.rank.compute_rank(data_cov, rank=rank, info=info)
noise_rank = mne.rank.compute_rank(noise_cov, rank=noise_rank, info=info)
mne_logger.info(f"Making LCMV beamformer with data cov rank {data_rank}")
mne_logger.info(f"Making LCMV beamformer with noise cov rank {noise_rank}")
depth = _check_depth(depth, "depth_sparse")
if inversion == "single":
depth["combine_xyz"] = False
is_free_ori, info, proj, vertno, G, whitener, nn, orient_std = (
_prepare_beamformer_input(
info,
fwd,
label,
pick_ori,
noise_cov=noise_cov,
rank=noise_rank,
pca=False,
**depth,
)
)
del noise_rank
ch_names = list(info["ch_names"])
data_cov = mne._fiff.pick.pick_channels_cov(data_cov, include=ch_names)
Cm = data_cov._get_square()
if "estimator" in data_cov:
del data_cov["estimator"]
rank_int = sum(data_rank.values())
del data_rank
# Compute spatial filter
n_orient = 3 if is_free_ori else 1
W, max_power_ori = _compute_beamformer(
G,
Cm,
reg,
n_orient,
weight_norm,
pick_ori,
reduce_rank,
rank_int,
inversion=inversion,
nn=nn,
orient_std=orient_std,
whitener=whitener,
)
# Get src type to store with filters for _make_stc
src_type = mne.source_estimate._get_src_type(fwd["src"], vertno)
# Get subject to store with filters
subject_from = mne.forward._subject_from_forward(fwd)
# Is the computed beamformer a scalar or vector beamformer?
is_free_ori = is_free_ori if pick_ori in [None, "vector"] else False
is_ssp = bool(info["projs"])
filters = Beamformer(
kind="LCMV",
weights=W,
data_cov=data_cov,
noise_cov=noise_cov,
whitener=whitener,
weight_norm=weight_norm,
pick_ori=pick_ori,
ch_names=ch_names,
proj=proj,
is_ssp=is_ssp,
vertices=vertno,
is_free_ori=is_free_ori,
n_sources=fwd["nsource"],
src_type=src_type,
source_nn=fwd["source_nn"].copy(),
subject=subject_from,
rank=rank_int,
max_power_ori=max_power_ori,
inversion=inversion,
)
return filters
def _compute_beamformer(
G: np.ndarray,
Cm: np.ndarray,
reg: float,
n_orient: int,
weight_norm: str | None,
pick_ori: str | None,
reduce_rank: bool,
rank: int,
inversion: str,
nn: np.ndarray,
orient_std: np.ndarray,
whitener: np.ndarray,
) -> tuple[np.ndarray, np.ndarray | None]:
"""Compute a spatial beamformer filter (LCMV or DICS).
Modified version of mne.beamformer._compute_beamformer.
Parameters
----------
G : (n_dipoles, n_channels) numpy.ndarray
The leadfield.
Cm : (n_channels, n_channels) numpy.ndarray
The data covariance matrix.
reg : float
Regularization parameter.
n_orient : int
Number of dipole orientations defined at each source point
weight_norm : None | 'unit-noise-gain' | 'nai'
The weight normalization scheme to use.
pick_ori : None | 'normal' | 'max-power' | max-power-pre-weight-norm
The source orientation to compute the beamformer in.
reduce_rank : bool
Whether to reduce the rank by one during computation of the filter.
rank : dict | None | 'full' | 'info'
See compute_rank.
inversion : 'matrix' | 'single'
The inversion scheme to compute the weights.
nn : (n_dipoles, 3) numpy.ndarray
The source normals.
orient_std : (n_dipoles,) numpy.ndarray
The std of the orientation prior used in weighting the lead fields.
whitener : (n_channels, n_channels) numpy.ndarray
The whitener.
For more detailed information on the parameters, see the docstrings of
`make_lcmv` and `make_dics`.
Returns
-------
W : (n_dipoles, n_channels) numpy.ndarray
The beamformer filter weights.
"""
# Lines changes are marked with MWW
mne.utils._check_option(
"weight_norm",
weight_norm,
["unit-noise-gain-invariant", "unit-noise-gain", "nai", None],
)
# Whiten the data covariance
Cm = whitener @ Cm @ whitener.T.conj()
# Restore to properly Hermitian as large whitening coefs can have bad
# rounding error
Cm[:] = (Cm + Cm.T.conj()) / 2.0
assert Cm.shape == (G.shape[0],) * 2
s, _ = np.linalg.eigh(Cm)
if not (s >= -s.max() * 1e-7).all():
# This shouldn't ever happen, but just in case
warn(
"data covariance does not appear to be positive semidefinite, "
"results will likely be incorrect"
)
# Tikhonov regularization using reg parameter to control for trade-off
# between spatial resolution and noise sensitivity
# eq. 25 in Gross and Ioannides, 1999 Phys. Med. Biol. 44 2081
Cm_inv, loading_factor, rank = mne.utils._reg_pinv(Cm, reg, rank)
assert orient_std.shape == (G.shape[1],)
n_sources = G.shape[1] // n_orient
assert nn.shape == (n_sources, 3)
mne_logger.info(
f"Computing beamformer filters for {n_sources} source{mne.utils._pl(n_sources)}"
)
n_channels = G.shape[0]
assert n_orient in (3, 1)
Gk = np.reshape(G.T, (n_sources, n_orient, n_channels)).transpose(0, 2, 1)
assert Gk.shape == (n_sources, n_channels, n_orient)
sk = np.reshape(orient_std, (n_sources, n_orient))
del G, orient_std
mne.utils._check_option("reduce_rank", reduce_rank, (True, False))
# inversion of the denominator
mne.utils._check_option("inversion", inversion, ("matrix", "single"))
if (
inversion == "single"
and n_orient > 1
and pick_ori == "vector"
and weight_norm == "unit-noise-gain-invariant"
):
raise ValueError(
'Cannot use pick_ori="vector" with inversion="single" and '
'weight_norm="unit-noise-gain-invariant"'
)
if reduce_rank and inversion == "single":
raise ValueError(
'reduce_rank cannot be used with inversion="single"; '
'consider using inversion="matrix" if you have a rank-deficient '
"forward model (i.e., from a sphere model with MEG channels), "
"otherwise consider using reduce_rank=False"
)
if n_orient > 1:
_, Gk_s, _ = np.linalg.svd(Gk, full_matrices=False)
assert Gk_s.shape == (n_sources, n_orient)
if not reduce_rank and (Gk_s[:, 0] > 1e6 * Gk_s[:, 2]).any():
raise ValueError(
"Singular matrix detected when estimating spatial filters. "
"Consider reducing the rank of the forward operator by using "
"reduce_rank=True."
)
del Gk_s
# --------------------------------
# 1. Reduce rank of the lead field
# --------------------------------
if reduce_rank:
Gk = _reduce_leadfield_rank(Gk)
def _compute_bf_terms(
Gk: np.ndarray, Cm_inv: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
bf_numer = np.matmul(Gk.swapaxes(-2, -1).conj(), Cm_inv)
bf_denom = np.matmul(bf_numer, Gk)
return bf_numer, bf_denom
# ----------------------------------------------------------
# 2. Reorient lead field in direction of max power or normal
# ----------------------------------------------------------
if pick_ori == "max-power" or pick_ori == "max-power-pre-weight-norm":
assert n_orient == 3
_, bf_denom = _compute_bf_terms(Gk, Cm_inv)
if pick_ori == "max-power":
if weight_norm is None:
ori_numer = np.eye(n_orient)[np.newaxis]
ori_denom = bf_denom
else:
# Compute power, cf Sekihara & Nagarajan 2008, eq. 4.47
ori_numer = bf_denom
# Cm_inv should be Hermitian so no need for .T.conj()
ori_denom = np.matmul(
np.matmul(Gk.swapaxes(-2, -1).conj(), Cm_inv @ Cm_inv), Gk
)
ori_denom_inv = _sym_inv_sm(ori_denom, reduce_rank, inversion, sk)
ori_pick = np.matmul(ori_denom_inv, ori_numer)
else:
# MWW
#
# Compute orientation based on pre-normalised power
#
# See eq 5 in Brookes et al, Optimising experimental design for
# MEG beamformer imaging, Neuroimage 2008. This optimises the
# orientation by maximising the power BEFORE any weight
# normalisation is performed
ori_pick = _sym_inv_sm(bf_denom, reduce_rank, inversion, sk)
assert ori_pick.shape == (n_sources, n_orient, n_orient)
# Pick eigenvector that corresponds to maximum eigenvalue
eig_vals, eig_vecs = np.linalg.eig(ori_pick.real) # not Hermitian!
# Sort eigenvectors by eigenvalues for picking
order = np.argsort(np.abs(eig_vals), axis=-1)
# eig_vals = np.take_along_axis(eig_vals, order, axis=-1)
max_power_ori = eig_vecs[np.arange(len(eig_vecs)), :, order[:, -1]]
assert max_power_ori.shape == (n_sources, n_orient)
# Set the (otherwise arbitrary) sign to match the normal
signs = np.sign(np.sum(max_power_ori * nn, axis=1, keepdims=True))
signs[signs == 0] = 1.0
max_power_ori *= signs
# Compute the lead field for the optimal orientation,
# and adjust number/denom
Gk = np.matmul(Gk, max_power_ori[..., np.newaxis])
n_orient = 1
else:
max_power_ori = None
if pick_ori == "normal":
Gk = Gk[..., 2:3]
n_orient = 1
# ----------------------------------------------------------------------
# 3. Compute numerator and denominator of beamformer formula (unit-gain)
# ----------------------------------------------------------------------
bf_numer, bf_denom = _compute_bf_terms(Gk, Cm_inv)
assert bf_denom.shape == (n_sources,) + (n_orient,) * 2
assert bf_numer.shape == (n_sources, n_orient, n_channels)
del Gk # lead field has been adjusted and should not be used anymore
# -------------------------
# 4. Invert the denominator
# -------------------------
# Here W is W_ug, i.e.: G.T @ Cm_inv / (G.T @ Cm_inv @ G)
bf_denom_inv = _sym_inv_sm(bf_denom, reduce_rank, inversion, sk)
assert bf_denom_inv.shape == (n_sources, n_orient, n_orient)
W = np.matmul(bf_denom_inv, bf_numer)
assert W.shape == (n_sources, n_orient, n_channels)
del bf_denom_inv, sk
# ----------------------------------------------------------------
# 5. Re-scale filter weights according to the selected weight_norm
# ----------------------------------------------------------------
# Weight normalization is done by computing, for each source:
#
# W_ung = W_ug / sqrt(W_ug @ W_ug.T)
#
# with W_ung referring to the unit-noise-gain (weight normalized) filter
# and W_ug referring to the above-calculated unit-gain filter stored in W.
if weight_norm is not None:
# Three different ways to calculate the normalization factors here.
# Only matters when in vector mode, as otherwise n_orient == 1 and
# they are all equivalent.
#
# In MNE < 0.21, we just used the Frobenius matrix norm:
#
# noise_norm = np.linalg.norm(W, axis=(1, 2), keepdims=True)
# assert noise_norm.shape == (n_sources, 1, 1)
# W /= noise_norm
#
# Sekihara 2008 says to use sqrt(diag(W_ug @ W_ug.T)),
# which is not rotation invariant:
if weight_norm in ("unit-noise-gain", "nai"):
noise_norm = np.matmul(W, W.swapaxes(-2, -1).conj()).real
noise_norm = np.reshape(noise_norm, (n_sources, -1, 1))[
:, :: n_orient + 1
] # np.diag operation over last two axes
np.sqrt(noise_norm, out=noise_norm)
noise_norm[noise_norm == 0] = np.inf
assert noise_norm.shape == (n_sources, n_orient, 1)
W /= noise_norm
else:
assert weight_norm == "unit-noise-gain-invariant"
# Here we use sqrtm. The shortcut:
#
# use = W
#
# ... does not match the direct route (it is rotated!),
# so we'll use the direct one to match FieldTrip:
use = bf_numer
inner = np.matmul(use, use.swapaxes(-2, -1).conj())
W = np.matmul(mne.utils._sym_mat_pow(inner, -0.5), use)
noise_norm = 1.0
if weight_norm == "nai":
# Estimate noise level based on covariance matrix, taking the
# first eigenvalue that falls outside the signal subspace
# or the loading factor used during regularization, whichever
# is largest.
if rank > len(Cm):
# Covariance matrix is full rank, no noise subspace!
# Use the loading factor as noise ceiling.
if loading_factor == 0:
raise RuntimeError(
"Cannot compute noise subspace with a full-rank "
"covariance matrix and no regularization. "
"Try manually specifying the rank of the covariance "
"matrix or using regularization."
)
noise = loading_factor
else:
noise, _ = np.linalg.eigh(Cm)
noise = noise[-rank]
noise = max(noise, loading_factor)
W /= np.sqrt(noise)
W = W.reshape(n_sources * n_orient, n_channels)
mne_logger.info("Filter computation complete")
return W, max_power_ori
def _prepare_beamformer_input(
info: mne.Info,
forward: mne.Forward,
label: mne.Label | None = None,
pick_ori: str | None = None,
noise_cov: mne.Covariance | None = None,
rank: str | dict | None = None,
pca: bool = False,
loose: float | None = None,
combine_xyz: str = "fro",
exp: float | None = None,
limit: float | None = None,
allow_fixed_depth: bool = True,
limit_depth_chs: bool = False,
) -> tuple[
bool,
mne.Info,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
]:
"""Input preparation common for LCMV, DICS, and RAP-MUSIC.
Modified version of mne.beamformer._prepare_beamformer_input.
Parameters
----------
info : instance of mne.Info
Measurement info
forward : instance of mne.Forward
The forward solution.
label : instance of mne.Label | None
Restricts the forward solution to a given label.
pick_ori : None | 'normal' | 'max-power' | 'vector' | 'max-power-pre-weight-norm'
The source orientation to compute the beamformer in.
noise_cov : instance of mne.Covariance | None
The noise covariance.
rank : dict | None | 'full' | 'info'
See :py:func:`mne.compute_rank`.
pca : bool
If True, the rank of the forward is reduced to match the rank of the
noise covariance matrix.
loose : float | None
Value that weights the source variances of the dipole components
defining the tangent space of the cortical surfaces. If ``None``,
no loose orientation constraint is applied.
combine_xyz : str
How to combine the dipoles in the same locations of the forward model
when picking normals. See :py:func:`mne.forward._pick_ori`.
exp : float | None
Exponent for the depth weighting. If None, no depth weighting is performed.
limit : float | None
Limit on depth weighting factors. If None, no upper limit is applied.
allow_fixed_depth : bool
If True, fixed depth weighting is allowed.
limit_depth_chs : bool
If True, use only grad channels for depth weighting.
Returns
-------
is_free_ori : bool
Whether the forward operator is free orientation.
info : instance of mne.Info
Measurement info restricted to selected channels.
proj : array
The SSP/PCA projector.
vertno : array
The indices of the vertices corresponding to the source space.
G : array
The forward operator restricted to selected channels.
whitener : array
The whitener for the selected channels.
nn : array
The normals of the source space.
orient_std : array
The standard deviation of the orientation prior.
"""
# Lines marked MWW are where code has been changed.
# MWW
# _check_option('pick_ori', pick_ori, ('normal', 'max-power', 'vector', None))
mne.utils._check_option(
"pick_ori",
pick_ori,
("normal", "max-power", "vector", "max-power-pre-weight-norm", None),
)
# MWW
# Restrict forward solution to selected vertices
# if label is not None:
# _, src_sel = label_src_vertno_sel(label, forward["src"])
# forward = _restrict_forward_to_src_sel(forward, src_sel)
if loose is None:
loose = 0.0 if mne.forward.forward.is_fixed_orient(forward) else 1.0
# MWW
# if noise_cov is None:
# noise_cov = make_ad_hoc_cov(info, std=1.0)
forward, info_picked, gain, _, orient_prior, _, trace_GRGT, noise_cov, whitener = (
_prepare_forward(
forward,
info,
noise_cov,
"auto",
loose,
rank=rank,
pca=pca,
use_cps=True,
exp=exp,
limit_depth_chs=limit_depth_chs,
combine_xyz=combine_xyz,
limit=limit,
allow_fixed_depth=allow_fixed_depth,
)
)
is_free_ori = not mne.forward.forward.is_fixed_orient(
forward
) # could have been changed
nn = forward["source_nn"]
if is_free_ori: # take Z coordinate
nn = nn[2::3]
nn = nn.copy()
vertno = _get_vertno(forward["src"])
if forward["surf_ori"]:
nn[...] = [0, 0, 1] # align to local +Z coordinate
if pick_ori is not None and not is_free_ori:
raise ValueError(
f"Normal or max-power orientation (got {pick_ori}) can only be "
"picked when a forward operator with free orientation is used."
)
if pick_ori == "normal" and not forward["surf_ori"]:
raise ValueError(
"Normal orientation can only be picked when a forward operator "
"oriented in surface coordinates is used."
)
mne.utils._check_src_normal(pick_ori, forward["src"])
del forward, info
# Undo the scaling that MNE prefers
scale = np.sqrt((noise_cov["eig"] > 0).sum() / trace_GRGT)
gain /= scale
if orient_prior is not None:
orient_std = np.sqrt(orient_prior)
else:
orient_std = np.ones(gain.shape[1])
# Get the projector
proj, _, _ = mne._fiff.proj.make_projector(
info_picked["projs"], info_picked["ch_names"]
)
return is_free_ori, info_picked, proj, vertno, gain, whitener, nn, orient_std
def _niimask2mmpointcloud(
nii_mask: str,
volindex: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""Takes in a nii.gz mask (which equals zero for background and neq zero
for the mask) and returns the mask as a 3 x npoints point cloud in native
space in mm's.
Parameters
----------
nii_mask : string
A nii.gz mask file name or the [x,y,z] volume (with zero for background,
and !=0 for the mask).
volindex : int
Volume index, used if nii_mask is a 4D file.
Returns
-------
pc : numpy.ndarray
3 x npoints point cloud as mm in native space (using sform).
values : numpy.ndarray
npoints values.
"""
vol = nib.load(nii_mask).get_fdata()
if len(vol.shape) == 4 and volindex is not None:
vol = vol[:, :, :, volindex]
if not len(vol.shape) == 3:
Exception(
"nii_mask must be a 3D volume, or nii_mask must be a 4D volume "
"with volindex specifying a volume index"
)
pc_nativeindex = np.asarray(np.where(vol != 0))
values = np.asarray(vol[vol != 0])
pc = rhino._xform_points(rhino._get_sform(nii_mask)["trans"], pc_nativeindex)
return pc, values
def _closest_node(
node: np.ndarray,
nodes: np.ndarray,
) -> tuple[int, float]:
"""Find nearest node in nodes to the passed in node.
Returns
-------
index : int
Index to the nearest node in nodes.
distance : float
Distance.
"""
if len(nodes) == 1:
nodes = np.reshape(nodes, [-1, 1])
kdtree = KDTree(nodes)
distance, index = kdtree.query(node)
return index, distance
def _get_gridstep(coords: np.ndarray) -> int:
"""Get gridstep (i.e. spatial resolution of dipole grid) in mm.
Parameters
----------
coords : numpy.ndarray
Coordinates.
Returns
-------
gridstep: int
Spatial resolution of dipole grid in mm.
"""
store = []
for ii in range(coords.shape[0]):
store.append(np.sqrt(np.sum(np.square(coords[ii, :] - coords[0, :]))))
store = np.asarray(store)
return int(np.round(np.min(store[np.where(store > 0)]) * 1000))