Source code for dcmri.utils

import math

import numpy as np
from scipy.special import gamma
from scipy.interpolate import CubicSpline
from scipy.integrate import trapezoid


[docs] def interp(y, x, pos=False, floor=False) -> np.ndarray: """Interpolate uniformly sampled data. This function is a convenience wrapper for standard interpolation, used in dcmri for instance to parametrize non-stationat models. Args: y (array-like): List of values to interpolate. These are assumed to be uniformly distributed over x. x (array-like): Interpolate y at these locations. pos (bool, optional): return only positive values. Defaults to False. floor (bool, optional): Return only results higher than the lowest value in y. Defaults to False. Returns: np.ndarray: array of the same length as x, containing the values of y interpolated on x. Notes: The function uses linear interpolate when y has length 2, quadratic when y has length 3, and cubic spline interpolation otherwise. Example: >>> import dcmri as dc >>> dc.interp([1,3,2], np.arange(10)) array([1. , 1.68888889, 2.23333333, 2.63333333, 2.88888889, 3. , 2.96666667, 2.78888889, 2.46666667, 2. ]) """ # Interpolate y on x, assuming y-values are uniformly distributed over the x-range if np.isscalar(y): yi = y*np.ones(len(x)) elif np.size(y) == 1: yi = y[0]*np.ones(len(x)) elif np.size(y) == 2: yi = _lin(x, y) elif np.size(y) == 3: yi = _quad(x, y) else: x_y = np.linspace(np.amin(x), np.amax(x), len(y)) yi = CubicSpline(x_y, y)(x) if pos: yi[yi < 0] = 0 if floor: y0 = np.amin(y) yi[yi < y0] = y0 return yi
def _quad(t, K): # Helper nt = len(t) mid = math.floor(nt/2) return _quadratic(t, t[0], t[mid], t[-1], K[0], K[1], K[2]) def _lin(t, K): # Helper return _linear(t, t[0], t[-1], K[0], K[1]) def _linear(x, x1, x2, y1, y2): # Helper # returns a linear function of x # that goes through the two points (xi, yi) L1 = (x2-x)/(x2-x1) L2 = (x-x1)/(x2-x1) return y1*L1 + y2*L2 def _quadratic(x, x0, x1, x2, y0, y1, y2): # Helper # returns a quadratic function of x # that goes through the three points (xi, yi) L0 = (x-x1)*(x-x2)/((x0-x1)*(x0-x2)) L1 = (x-x0)*(x-x2)/((x1-x0)*(x1-x2)) L2 = (x-x0)*(x-x1)/((x2-x0)*(x2-x1)) return y0*L0 + y1*L1 + y2*L2 def tarray(n, t=None, dt=1.0): # Helper function - generate time array. if t is None: t = dt*np.arange(n) else: if not isinstance(t, np.ndarray): t = np.array(t) if len(t) != n: raise ValueError('Time array must have same length as the input.') return t def trapz(f, t=None, dt=1.0): # Helper function - perform trapezoidal integration. # Replace by scipy.integrate.trapezoid f = np.array(f) n = len(f) t = tarray(n, t=t, dt=dt) g = np.empty(n) g[0] = 0 for i in range(n-1): g[i+1] = g[i] + (t[i+1]-t[i]) * (f[i+1]+f[i]) / 2 return g def ddelta(T, t): # Helper function - discrete delta if not isinstance(t, np.ndarray): t = np.array(t) n = len(t) h = np.zeros(n) if T < t[0]: return h if T > t[-1]: return h if T == t[0]: h[0] = 2/(t[1]-t[0]) return h if T == t[-1]: h[-1] = 2/(t[-1]-t[-2]) return h i = np.where(T >= t)[0][-1] u = (T-t[i])/(t[i+1]-t[i]) if i == 0: h[i] = (1-u)*2/(t[i+1]-t[i]) else: h[i] = (1-u)*2/(t[i+1]-t[i-1]) if i == n-2: h[i+1] = u*2/(t[i+1]-t[i]) else: h[i+1] = u*2/(t[i+2]-t[i]) return h def dstep(T0, T1, t): # Helper function - discrete step if not isinstance(t, np.ndarray): t = np.array(t) n = len(t) i = np.where((t > T0)*(t < T1))[0] if len(i) == 0: return ddelta((T0+T1)/2, t) i0, i1 = i[0], i[-1] t0, t1 = t[i0], t[i1] hi = 0 if i0 > 0: u0 = (t0-T0)/(t0-t[i0-1]) hi += 0.5*(1+u0)*(t0-t[i0-1]) if i0 > 1: hi += 0.5*u0*(t[i0-1]-t[i0-2]) hi += t1-t0 if i1 < n-1: u1 = (T1-t1)/(t[i1+1]-t1) hi += 0.5*(1+u1)*(t[i1+1]-t1) if i1 < n-2: hi += 0.5*u1*(t[i1+2]-t[i1+1]) h = np.zeros(n) h[i] = 1/hi if i0 > 0: h[i0-1] = u0/hi if i1 < n-1: h[i1+1] = u1/hi return h def ddist(H, T, t): # discrete distribution - T is an array of times with the boundaries of the histogram bins h = np.zeros(len(t)) for k in range(len(T)-1): h += H[k]*dstep(T[k], T[k+1], t) return h def intprod(f, h, t=None, dt=1.0): # Helper function # Integrate the product of two piecewise linear functions # by analytical integration over each interval. # Derivation: # If f and h are linear between x and y, then we can define slopes: # dx = y-x # sf(x) = (f(y)-f(x))/dx # sh(x) = (h(y)-h(x))/dx # With this the integral over the interval becomes: # \int_x^y du f(u)h(u) # = \int_0^dx du [f(x)+usf(x)] [h(x)+ush(x)] # = f(x)*h(x)*dx # + [f(x)*sh(x)+sf(x)*h(x)]dx**2/2 # + sf(x)*sh(x)*dx**3/3 g = 0 for l in range(len(f)-1): if t is not None: dt = t[l+1]-t[l] sf = (f[l+1]-f[l])/dt sh = (h[l+1]-h[l])/dt g += h[l]*f[l]*dt g += (sh*f[l]+sf*h[l])*dt**2/2 g += sh*sf*dt**3/3 return g def _intstep(f, h, t=None, dt=1.0): g = f*h if t is None: return 0.5*np.sum(g[1:]+g[:-1])*dt else: return 0.5*np.sum((g[1:]+g[:-1])*(t[1:]-t[:-1])) def uconv(f, h, dt=1.0, solver='trap'): # Helper function: convolution over uniformly sampled grid. n = len(f) g = np.zeros(n) h = np.flip(h) for k in range(1, n): if solver == 'trap': g[k] = intprod(f[:k+1], h[-(k+1):], dt=dt) elif solver == 'step': g[k] = _intstep(f[:k+1], h[-(k+1):], dt=dt) return g
[docs] def conv(f, h, t=None, dt=1.0, solver='step'): """Convolve two 1D-arrays. This function returns the convolution :math:`f(t)\\otimes h(t)`, using piecewise linear integration to approximate the integrals in the convolution product. Args: f (array_like): the first 1D array to be convolved. h (array_like): the second 1D array to be convolved. t (array_like, optional): the time points where the values of f are defined. If t=None, the time points are assumed to be uniformly spaced with spacing dt. Defaults to None. dt (float, optional): spacing between time points for uniformly spaced time points. This parameter is ignored if t is explicity provided. Defaults to 1.0. Raises: ValueError: if f and h have a different length. Returns: numpy.ndarray: a 1D numpy array of the same length as f and h. See Also: `expconv`, `biexpconv`, `nexpconv`, `stepconv` Notes: The convolution product :math:`f(t)\\otimes h(t)` implemented by `conv` is explicitly defined as: .. math:: g(t) = \\int_0^t du\\, f(u) h(t-u) `conv` returns an approximation to this continuous convolution product, calculated by piecewise linear integration. This is not to be confused with other convolution functions, such as `numpy.convolve` which performs discrete convolution. Tracer-kinetic theory is defined by continuous equations and therefore should be performed with `conv` for maximal accuracy, though the difference may be small at high temporal resolution. `conv` is generally applicable to any f(t) and h(t), but more accurate formulae for some special cases exists and should be used if available. An example is `expconv`, to be used when either f(t) or h(t) is an exponential. Example: Import package and create vectors f and h: >>> import dcmri as dc >>> f = [5,4,3,6] >>> h = [2,9,1,3] Calculate :math:`g(t) = f(t) \\otimes h(t)` over a uniformly sampled grid of time points with spacing dt=1: >>> dc.conv(f, h) array([ 0. , 25.33333333, 41.66666667, 49. ]) Calculate the same convolution over a grid of time points with spacing dt=2: >>> dc.conv(f, h, dt=2) array([ 0. , 50.66666667, 83.33333333, 98. ]) Calculate the same convolution over a non-uniform grid of time points: >>> t = [0,1,3,7] >>> dc.conv(f, h, t) array([ 0. , 25.33333333, 57.41666667, 108.27083333]) """ if len(f) != len(h): raise ValueError('f and h must have the same length.') if t is None: return uconv(f, h, dt, solver=solver) n = len(t) g = np.zeros(n) tf = np.flip(t) f = np.flip(f) for k in range(1, n): tkf = t[k]-tf[-(k+1):] tk = np.unique(np.concatenate((t[:k+1], tkf))) fk = np.interp(tk, tkf, f[-(k+1):], left=0, right=0) hk = np.interp(tk, t[:k+1], h[:k+1], left=0, right=0) if solver == 'trap': g[k] = intprod(fk, hk, tk) elif solver == 'step': g[k] = _intstep(fk, hk, tk) return g
def inttrap(f, t, t0, t1): # Helper function: integrate f from t0 to t1 ti = t[(t0 < t)*(t < t1)] ti = np.concatenate(([t0], ti, [t1])) fi = np.interp(ti, t, f, left=0, right=0) return np.trapezoid(fi, ti)
[docs] def stepconv(f, T, D, t=None, dt=1.0): """Convolve a 1D-array with a normalised step function. Args: f (array_like): the 1D array to be convolved. T (float): the central time point of the step function. D (float): half-width of the step function, as a fraction of T. D must be less or equal to 1. t (array_like, optional): the time points where the values of f are defined, in the same units as T. If t=None, the time points are assumed to be uniformly spaced with spacing dt. Defaults to None. dt (float, optional): spacing between time points for uniformly spaced time points. This parameter is ignored if t is explicity provided. Defaults to 1.0. Raises: ValueError: if D > 1. Returns: numpy.ndarray: a 1D numpy array of the same length as f. See Also: `conv`, `expconv`, `biexpconv`, `nexpconv` Notes: `stepconv` implements the same convolution product as `conv`, but is more accurate and faster in the special case where one of the factors is known to be a step function. Example: Import package, create a vector f and an array of time points: >>> import dcmri as dc >>> f = [5,4,3,6] >>> t = [0,2,4,7] Convolve f with a step function that is centered on t=3 with a half width of 1.5 = 0.5*3: >>> dc.stepconv(f, 3, 0.5, t) array([0. , 0.8125 , 3.64583333, 3.5625 ]) """ if D > 1: raise ValueError('The dispersion factor D must be <= 1') TW = D*T # Half width of step T0 = T-TW # Initial time point of step T1 = T+TW n = len(f) t = tarray(n, t=t, dt=dt) g = np.zeros(n) k = len(t[t < T0]) ti = t[(T0 <= t)*(t <= T1)] for tk in ti: g[k] = inttrap(f, t, 0, tk-T0) k += 1 ti = t[T1 < t] for tk in ti: g[k] = inttrap(f, t, tk-T1, tk-T0) k += 1 return g/(2*TW)
[docs] def expconv(f, T, t=None, dt=1.0): """Convolve a 1D-array with a normalised exponential. This function returns the convolution :math:`f(t)\\otimes\\exp(-t/T)/T` using an efficient and accurate numerical formula, as detailed in the appendix of `Flouri et al (2016) <https://onlinelibrary.wiley.com/doi/full/10.1002/mrm.25991>`_ Args: f (array_like): the 1D array to be convolved. T (float): the characteristic time of the normalized exponential function. t (array_like, optional): the time points where the values of f are defined, in the same units as T. If t=None, the time points are assumed to be uniformly spaced with spacing dt. Defaults to None. dt (float, optional): spacing between time points for uniformly spaced time points. This parameter is ignored if t is explicity provided. Defaults to 1.0. Returns: numpy.ndarray: a 1D numpy array of the same length as f. See Also: `conv`, `biexpconv`, `nexpconv`, `stepconv` Notes: `expconv` implements the same convolution product as `conv`, but is more accurate and faster in the special case where one of the factors is known to be an exponential: .. math:: g(t) = \\frac{e^{-t/T}}{T} \\otimes f(t) In code this translates as: .. code-block:: python g = expconv(f, T, t) `expconv` should be used instead of `conv` whenever this applies. Since the transit time distribution of a compartment is exponential, this is an important use case. `expconv` can calculate a convolution between two exponential factors, but in that case an analytical formula can be used which is faster and more accurate. It is implemented in the function `biexpconv`. Example: Import package and create a vector f: >>> import dcmri as dc >>> f = [5,4,3,6] Calculate :math:`g(t) = f(t) \\otimes \\exp(-t/3)/3` over a uniformly sampled grid of time points with spacing dt=1: >>> dc.expconv(f, 3) array([0. , 1.26774952, 1.89266305, 2.6553402 ]) Calculate the same convolution over a grid of time points with spacing dt=2: >>> dc.expconv(f, 3, dt=2) array([0. , 2.16278873, 2.7866186 , 3.70082337]) Calculate the same convolution over a non-uniform grid of time points: >>> t = [0,1,3,7] >>> dc.expconv(f, 3, t) array([0. , 1.26774952, 2.32709015, 4.16571645]) """ if T == 0: return f f = np.array(f) n = len(f) t = tarray(n, t=t, dt=dt) x = (t[1:n] - t[0:n-1])/T df = (f[1:n] - f[0:n-1])/x E = np.exp(-x) E0 = 1-E E1 = x-E0 add = f[0:n-1]*E0 + df*E1 g = np.zeros(n, dtype=E.dtype) for i in range(0, n-1): g[i+1] = E[i]*g[i] + add[i] return g
[docs] def biexpconv(T1, T2, t): """Convolve two normalised exponentials analytically. Args: T1 (float): the characteristic time of the first exponential function. T2 (float): the characteristic time of the second exponential function, in the same units as T1. t (array_like, optional): the time points where the values of f are defined, in the same units as T. Returns: numpy.ndarray: The result of the convolution as a 1D array. See Also: `conv`, `expconv`, `nexpconv`, `stepconv` Notes: `biexpconv` returns the exact analytical result of the following convolution: .. math:: g(t) = \\frac{e^{-t/A}}{A} \\otimes \\frac{e^{-t/B}}{B} The formula is a biexponential with unit area: .. math:: g(t) = \\frac{Ae^{-t/A}-Be^{-t/B}}{A-B} In code this translates as: .. code-block:: python g = biexpconv(A, B, t) Example: Import package and create a vector of uniformly sampled time points t with spacing 5.0s: >>> import dcmri as dc >>> t = 5.0*np.arange(4) Calculate the convolution of two normalised exponentials with time constants 10s and 15s: >>> g = dc.biexpconv(10, 15, t) array([-0. , 0.02200013, 0.02910754, 0.02894986]) """ if T1 == T2: return (t/T1) * np.exp(-t/T1)/T1 else: return (np.exp(-t/T1)-np.exp(-t/T2))/(T1-T2)
[docs] def nexpconv(n, T, t): """Convolve n identical normalised exponentials analytically Args: n (float): number of exponentials. Since an analytical formula is used this can also be non-integer. T (float): the characteristic time of the exponential. t (array_like, optional): the time points where the values of f are defined, in the same units as T. Returns: numpy.ndarray: The result of the convolution as a 1D array. Raises: ValueError: if n<1 and if T<0 See Also: `conv`, `expconv`, `biexpconv`, `stepconv` Notes: `nexpconv` returns the exact analytical result of the following n convolutions: .. math:: g(t) = \\frac{e^{-t/T}}{T} \\otimes \\ldots \\otimes \\frac{e^{-t/T}}{T} The result is a gamma variate function with unit area: .. math:: g(t) = \\frac{1}{\\Gamma(n)}\\left(\\frac{t}{T}\\right)^{n-1} \\frac{e^{-t/T}}{T} Note the gamma-variate function becomes unstable due to numerical overflow at large n and/or short T. This function handles that situation by moving to a numerical solution, but this slows down the computations significantly. Callers should if possible avoid this situation by placing suitable bounds on the parameters. Example: Import package and create a vector of uniformly sampled time points t with spacing 5.0s: >>> import dcmri as dc >>> t = 5.0*np.arange(4) Calculate the convolution of 4 normalised exponentials with time constants 5s: >>> g = dc.nexpconv(4, 5, t) array([0. , 0.01226265, 0.03608941, 0.04480836]) """ if T < 0: raise ValueError('T must be non-negative') if n < 1: raise ValueError('n cannot be smaller than 1') if not isinstance(t, np.ndarray): t = np.array(t) u = t/T # Calculate gamma variate, silencing the warnings # as invalid cases are handled properly in the next line. with np.errstate(divide='ignore', invalid='ignore', over='ignore', under='ignore'): g = u**(n-1) * np.exp(-u)/T/gamma(n) # Handle overflow/invalid values if False in np.isfinite(g): # At large n the analytical formula runs into overflow # use numerical calculation in this case (slower and less accurate) # The caller is responsible for avoiding this situation by setting suitable parameter bounds g = np.exp(-t/T)/T n0 = int(np.floor(n)) for _ in range(n0-1): g = expconv(g, T, t) if n != n0: # Interpolate between n0 and n0+1 g1 = expconv(g, T, t) u = n-n0 g = g*u + g1*(1-u) return g
[docs] def sample(t, tp, Sp, dt=None) -> np.ndarray: """Sample a signal at given time points. Args: t (array-like): The time points at which to evaluate the signal. tp (array-like): the time points of the signal to be sampled. Sp (array-like): the values of the signal to be sampled. Values that are outside of the range are set to zero. dt (float, optional): sampling duration. If this is not provided, linear interpolation between the data points is used. Defaults to None. Returns: np.ndarray: Signals sampled at times t. """ if dt is None: return np.interp(t, tp, Sp, left=0, right=0) if dt == 0: return np.interp(t, tp, Sp, left=0, right=0) Ss = np.zeros(len(t)) for k, tk in enumerate(t): # data = Sp[(tp >= tk) & (tp < tk+dt)] # data = Sp[(tp >= tk-dt/2) & (tp < tk+dt/2)] # if data.size > 0: # Ss[k] = np.mean(data) # NEW (trapezoidal integration - more accurate) tb = [tk-dt/2, tk+dt/2] Sb = np.interp(tb, tp, Sp) i = (tp > tb[0]) & (tp < tb[1]) ti = np.concatenate(([tb[0]], tp[i], [tb[1]])) Si = np.concatenate(([Sb[0]], Sp[i], [Sb[1]])) Ss[k] = trapezoid(Si, ti)/dt return Ss
[docs] def add_noise(signal, sdev: float) -> np.ndarray: """Add noise to an MRI magnitude signal. Args: signal (array-like): Signal values. sdev (float): Standard deviation of the noise. Returns: np.ndarray: signal with noise added. """ noise_x = np.random.normal(0, sdev, np.size(signal)) noise_y = np.random.normal(0, sdev, np.size(signal)) signal = np.sqrt((signal+noise_x)**2 + noise_y**2) return signal