Source code for dcmri.pk_inv

import numpy as np
from scipy import integrate
from scipy.stats import rice
from scipy.optimize import minimize
from tqdm import tqdm
from scipy.optimize import curve_fit
from scipy import stats


[docs] def vfa_nonlinear(signal_intensities, flip_angles_deg, tr, bounds=None, verbose=0): """ Calculates R1 and S0 from VFA data using a NON-LINEAR fit. This function fits the data directly to the SPGR signal equation: S(a) = S0 * sin(a) * (1 - exp(-TR*R1)) / (1 - cos(a) * exp(-TR*R1)) This method can be more stable and accurate than the linear fit, especially in the presence of noise. Args: signal_intensities (list or np.ndarray): A list or array of measured signal intensities. flip_angles_deg (list or np.ndarray): A list or array of corresponding flip angles in degrees. tr (float): The repetition time (TR) of the sequence. bounds (tuple): bounds on (R1, S0) as a tuple ([lower_R1, lower_S0], [upper_R1, upper_S0]). Default is ([0, 0], [np.inf, np.inf]) verbose (int): if set to 1, warning messages are printed. Defaults to 0. Returns: tuple: A tuple containing the calculated (R1, S0). Returns initial guesses if the non-linear fit fails to converge. """ # Convert inputs to numpy arrays for vectorized operations signals = np.array(signal_intensities) # If the signal intensities are image arrays, loop over the pixels if signals.ndim > 1: signals_shape = signals.shape signals_array = signals.reshape(-1, signals_shape[-1]) R1_array = np.zeros(signals_array.shape[0]) S0_array = np.zeros(signals_array.shape[0]) for x in tqdm(range(signals_array.shape[0]), desc='Performing non-linear VFA fit'): R1_array[x], S0_array[x] = vfa_nonlinear(signals_array[x,:], flip_angles_deg, tr, bounds, verbose) R1_array = R1_array.reshape(signals_shape[:-1]) S0_array = S0_array.reshape(signals_shape[:-1]) return R1_array, S0_array # --- 0. Input Validation and Conversion --- if len(flip_angles_deg) != len(signals): raise ValueError("Input arrays for flip angles and signals must have the same length.") # Default bounds if bounds is None: bounds = ([0, 0], [np.inf, np.inf]) # Convert flip angles from degrees to radians for trigonometric functions flip_angles_rad = np.deg2rad(flip_angles_deg) # --- 1. Define the SPGR signal model for curve_fit --- # tr is passed as a fixed argument to the model function def spgr_model(alpha_rad, r1, s0): if r1 <= 0: # T1 must be positive return np.inf e1 = np.exp(-tr * r1) return s0 * np.sin(alpha_rad) * (1 - e1) / (1 - np.cos(alpha_rad) * e1) # --- 2. Provide Initial Guesses and Bounds --- # Good initial guesses are important for non-linear fitting. # Guess S0 as the maximum signal, and T1 as a typical biological value. initial_s0_guess = np.max(signals) initial_r1_guess = 1/1.2 initial_guesses = [initial_r1_guess, initial_s0_guess] # --- 3. Perform Non-Linear Fit --- try: popt, pcov = curve_fit( spgr_model, flip_angles_rad, signals, p0=initial_guesses, bounds=bounds ) calculated_r1, calculated_s0 = popt return calculated_r1, calculated_s0 except RuntimeError: print("Warning (Non-Linear Fit): Could not converge to a solution. Returning initial guesses.") return initial_r1_guess, initial_s0_guess
[docs] def vfa_linear(signal_intensities, flip_angles_deg, tr, bounds=None, verbose=0): """ Calculates R1 and S0 from variable flip angle (VFA) SPGR data. This function uses the linearized form of the steady-state spoiled gradient-echo (SPGR) signal equation to perform a linear fit and extract R1 and S0. The linearized equation is: S(a)/sin(a) = E1 * S(a)/tan(a) + S0*(1-E1) where E1 = exp(-TR * R1). This is a linear equation of the form y = m*x + c. Args: signal_intensities (list or np.ndarray): A list or array of measured signal intensities. flip_angles_deg (list or np.ndarray): A list or array of corresponding flip angles in degrees. tr (float): The repetition time (TR) of the sequence. bounds (tuple): bounds on (R1, S0) as a tuple ([lower_R1, lower_S0], [upper_R1, upper_S0]). Default is ([0, 0], [np.inf, np.inf]) verbose (int): if set to 1, warning messages are printed. Defaults to 0. Returns: tuple: A tuple containing the calculated (R1, S0). Returns (0, 0) if the calculation is not physically plausible (e.g., due to noisy data leading to a slope >= 1). """ # Convert inputs to numpy arrays for vectorized operations signals = np.array(signal_intensities) # If the signal intensities are image arrays, loop over the pixels if signals.ndim > 1: signals_shape = signals.shape signals_array = signals.reshape(-1, signals_shape[-1]) R1_array = np.zeros(signals_array.shape[0]) S0_array = np.zeros(signals_array.shape[0]) for x in tqdm(range(signals_array.shape[0]), desc='Performing linear VFA fit'): R1_array[x], S0_array[x] = vfa_linear(signals_array[x,:], flip_angles_deg, tr, bounds, verbose) R1_array = R1_array.reshape(signals_shape[:-1]) S0_array = S0_array.reshape(signals_shape[:-1]) return R1_array, S0_array # --- 1. Input Validation and Conversion --- if len(flip_angles_deg) != len(signals): raise ValueError("Input arrays for flip angles and signals must have the same length.") # Default bounds if bounds is None: bounds = ([0, 0], [np.inf, np.inf]) # Convert flip angles from degrees to radians for trigonometric functions flip_angles_rad = np.deg2rad(flip_angles_deg) # --- 2. Data Transformation for Linearization --- # Avoid division by zero for tan(90 degrees) if present # and for sin(0 degrees). We filter out these data points. valid_indices = (np.sin(flip_angles_rad) != 0) & (np.cos(flip_angles_rad) != 0) if np.sum(valid_indices) < 2: if verbose==1: print("Warning: Not enough valid data points (<2) for a linear fit. Returning lower bounds.") return bounds[0][0], bounds[0][1] y = signals[valid_indices] / np.sin(flip_angles_rad[valid_indices]) x = signals[valid_indices] / np.tan(flip_angles_rad[valid_indices]) if np.array_equal(x,y): if verbose==1: print("Warning: Equal values for x and y - cannot perform linear fit. Returning lower bounds") return bounds[0][0], bounds[0][1] # --- 3. Linear Regression --- # Use np.polyfit to find the slope (m) and intercept (c) of the line y = mx + c # The degree of the polynomial is 1 for a linear fit. #slope, intercept = np.polyfit(x, y, 1) slope, intercept, r_value, p_value, std_err = stats.linregress(x, y) # --- 4. Calculate T1 and S0 --- # The slope corresponds to E1 e1 = slope # Check for physically plausible E1 value. E1 must be > 0 and < 1. # A slope >= 1 or <= 0 would result in a non-real or negative T1. if not (0 < e1 < 1): if verbose==1: print(f"Warning: Calculated slope (E1 = {e1:.4f}) is outside the valid range (0, 1).") print("This may be due to noise or other artifacts. Returning lower bounds.") return bounds[0][0], bounds[0][1] # Calculate T1 from E1 # T1 = -TR / ln(E1) r1 = - np.log(e1) / tr # Calculate S0 from the intercept # Intercept = S0 * (1 - E1) => S0 = Intercept / (1 - E1) s0 = intercept / (1 - e1) # Apply bounds r1 = bounds[0][0] if r1 < bounds[0][0] else r1 s0 = bounds[0][1] if s0 < bounds[0][1] else s0 r1 = bounds[1][0] if r1 > bounds[1][0] else r1 s0 = bounds[1][1] if s0 > bounds[1][1] else s0 return r1, s0
def convmat(f:np.ndarray, order=2): """Return the convolution matrix The convolution product f*g can be computed by a matrix multiplication dt * M(f) # g. This function returns the matrix M(f) for a given f, which can be inverted to perform deconvolution. Args: f (numpy.ndarray): 1D array to be convolved order (int, optional): Order of the integration. Defaults to 2. Returns: numpy.ndarray: n x n square matrix """ n = len(f) mat = np.zeros((n,n)) if order==1: for i in range(0,n): for j in range(0,i+1): mat[i,j] = f[i-j] elif order==2: for i in range(1,n): mat[i,i] = 2*f[0] + f[1] for j in range(1,i): mat[i,i-j] = f[j-1] + 4*f[j] + f[j+1] mat[i,0] = f[i-1] + 2*f[i] mat = mat/6 return mat def invconvmat(f, order=2, tol=1e-15, method='TSVD'): mat = convmat(f, order) U, s, Vt = np.linalg.svd(mat, full_matrices=False) svmin = tol*np.amax(s) if method=='Tikhonov': s_inv = s/(s**2 + svmin**2) elif method=='TSVD': s_inv = np.array([1/x if x > svmin else 0 for x in s]) else: raise ValueError( f"Unknown deconvolution method {method}. Possible values " "are 'TSVD' (Truncated Singular Value Decomposition) or " "'Tikhonov'." ) return np.dot(Vt.T * s_inv, U.T)
[docs] def deconv(h:np.ndarray, g:np.ndarray, dt=1.0, order=2, method='TSVD', tol=1e-15) -> np.ndarray: """Deconvolve two uniformly sampled 1D functions. If and (h,g) are known in h = g*f, this function estimates f = deconv(h, g). Args: h (numpy array): Result of the convolution. if g has N elements, than h can be an N-element array, or a N x K - element array where N is the length of g. In this case each column is deconvolved indepdently with g. g (numpy array): One factor of the convolution (1D array). dt (float, optional): Time between samples. Defaults to 1.0. order (int, optional): Integration order of the convolution matrix. Defaults to 2. method (str, optional): Regularization method. Current options are 'TSVD' (Truncated Singular Value Decomposition) or 'Tikhonov'. Defaults to False. tol (float, optional): Tolerance for the inversion of the convolution matrix (rgularization parameter). Singular values less than a fraction 'tol' of the largest singular value are ignored. Defaults to 1e-15. Returns: numpy.ndarray: Estimate of the convolution factor f. This has the same shape as h. """ if g.ndim > 1: raise ValueError("g must be 1-dimensional.") if h.ndim > 2: raise ValueError("h must have 1 or 2 dimensions.") if h.shape[0] != len(g): raise ValueError( "The first dimension of h must have the same length as g." ) ginv = invconvmat(g, order, tol, method) return (ginv @ h) / dt
[docs] def describe(data, n0=1, rician=False): """Compute descriptive parameter maps for a signal array. Args: data (numpy.ndarray): array with signal data. Dimensions have to be at least 2, where the last dimension is time. n0 (int, optional): Number of baseline points. Defaults to 1. rician (bool, optional): Whether to correct for Rician noise in computation of baseline signal and noise (slow). Defaults to False. Raises: ValueError: if rician=True, n0 needs to be 3 or higher. Returns: dict: Dictionary with parameter maps. """ maps = {} if n0==1: maps['Sb'] = data[...,0] else: maps['Sb'] = np.mean(data[...,:n0], axis=-1) if n0 > 2: maps['Nb'] = np.std(data[...,:n0], axis=-1) maps['SEmax'] = np.max(data, axis=-1) - maps['Sb'] maps['SEauc'] = np.sum(data, axis=-1) - maps['Sb'] * data.shape[-1] with np.errstate(divide='ignore', over='ignore', invalid='ignore'): maps['RSEmax'] = np.where(maps['Sb']!=0, maps['SEmax']/maps['Sb'], 0) maps['RSEauc'] = np.where(maps['Sb']!=0, maps['SEauc']/maps['Sb'], 0) if rician: if n0 < 3: raise ValueError('Rician correction can only be applied if n0 > 2') Sb_rice = np.zeros(maps['Sb'].size) Nb_rice = np.zeros(maps['Sb'].size) data_xt = data.reshape(-1, data.shape[-1]) for x in tqdm(range(data_xt.shape[0]), desc='Computing Rician noise', total=data_xt.shape[0]): rice = mle_rice(data_xt[x,:n0]) Sb_rice[x] = rice['nu'] Nb_rice[x] = rice['sigma'] maps['Sb_rician'] = Sb_rice.reshape(data.shape[:-1]) maps['Nb_rician'] = Nb_rice.reshape(data.shape[:-1]) return maps
def mle_rice(data, fit_loc=False): """ Maximum-likelihood estimate of Rician parameters from 1D array `data`. Parameters ---------- data : array-like, shape (n,) Observations (must be >= 0 unless you fit loc). fit_loc : bool If True, estimate loc as well. If False, assume loc == 0. Returns ------- dict with keys: - 'nu' : estimated noncentrality parameter nu - 'sigma' : estimated scale sigma - 'b' : estimated shape parameter b = nu/sigma - 'loc' : estimated location (0 if fit_loc=False) - 'success', 'message' from optimizer """ data = np.asarray(data, dtype=float) if not fit_loc and (data < 0).any(): raise ValueError("data contains negative values but fit_loc=False. " "Either remove negatives or set fit_loc=True.") # initial guess using scipy's fit (fast and robust) if fit_loc: b0, loc0, sigma0 = rice.fit(data) # returns (shape, loc, scale) x0 = np.array([np.log(b0), loc0, np.log(sigma0)]) else: b0, loc0, sigma0 = rice.fit(data, floc=0) x0 = np.log([b0, sigma0]) # we optimize in log-space for positivity # Negative log-likelihood to minimize (we parametrize to enforce positivity) if fit_loc: def neglog(x): b = np.exp(x[0]) loc = x[1] sigma = np.exp(x[2]) return -np.sum(rice.logpdf(data, b, loc=loc, scale=sigma)) bounds = [(None, None), (None, None), (None, None)] else: def neglog(x): b = np.exp(x[0]) sigma = np.exp(x[1]) return -np.sum(rice.logpdf(data, b, loc=0.0, scale=sigma)) bounds = [(None, None), (None, None)] res = minimize(neglog, x0, method='L-BFGS-B', bounds=bounds, options={'ftol':1e-12, 'gtol':1e-8}) if fit_loc: b_hat = float(np.exp(res.x[0])) loc_hat = float(res.x[1]) sigma_hat = float(np.exp(res.x[2])) else: b_hat = float(np.exp(res.x[0])) sigma_hat = float(np.exp(res.x[1])) loc_hat = 0.0 nu_hat = b_hat * sigma_hat return { 'nu': nu_hat, 'sigma': sigma_hat, 'b': b_hat, 'loc': loc_hat, 'success': res.success, 'message': res.message, 'nll': float(res.fun) } # Used in iBEAt - not yet exposed in dcmri def pixel_2cfm_linfit(imgs: np.ndarray, aif: np.ndarray = None, time: np.ndarray = None, baseline: int = 1, Hct=0.45): # Reshape to 2D (x,t) shape = np.shape(imgs) imgs = imgs.reshape((-1, shape[-1])) S0 = np.mean(imgs[:, :baseline], axis=1) Sa0 = np.mean(aif[:baseline]) ca = (aif-Sa0)/(1-Hct) A = np.empty((imgs.shape[1], 4)) A[:, 2], A[:, 3] = _ddint(ca, time) fit = np.empty(imgs.shape) par = np.empty((imgs.shape[0], 4)) for x in range(imgs.shape[0]): c = imgs[x, :] - S0[x] ctii, cti = _ddint(c, time) A[:, 0] = -ctii A[:, 1] = -cti p = np.linalg.lstsq(A, c, rcond=None)[0] fit[x, :] = S0[x] + p[0]*A[:, 0] + p[1] * \ A[:, 1] + p[2]*A[:, 2] + p[3]*A[:, 3] par[x, :] = _params_2cfm(p) # Apply bounds smax = np.amax(imgs) fit[fit < 0] = 0 fit[fit > 2*smax] = 2*smax # Return in original shape fit = fit.reshape(shape) par = par.reshape(shape[:-1] + (4,)) return fit, par def _ddint(c, t): ci = integrate.cumulative_trapezoid(c, t, initial=0) ci = np.insert(ci, 0, 0) cii = integrate.cumulative_trapezoid(ci, t, initial=0) cii = np.insert(cii, 0, 0) return cii, ci def _params_2cfm(X): alpha = X[0] beta = X[1] gamma = X[2] Fp = X[3] if alpha == 0: if beta == 0: return [Fp, 0, 0, 0] else: return [Fp, 1/beta, 0, 0] nom = 2*alpha det = beta**2 - 4*alpha if det < 0: Tp = beta/nom Te = Tp else: root = np.sqrt(det) Tp = (beta - root)/nom Te = (beta + root)/nom if Te == 0: PS = 0 else: if Fp == 0: PS == 0 else: T = gamma/(alpha*Fp) PS = Fp*(T-Tp)/Te # Convert to conventional units and apply bounds Fp *= 6000 if Fp < 0: Fp = 0 if Fp > 2000: Fp = 2000 if Tp < 0: Tp = 0 if Tp > 600: Tp = 600 PS *= 6000 if PS < 0: PS = 0 if PS > 2000: PS = 2000 if Te < 0: Te = 0 if Te > 600: Te = 600 return [Fp, Tp, PS, Te]