"""Lower-level DEM inversion routines used by :func:`demregpy.dn2dem`."""
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
from numpy.linalg import pinv, svd
from threadpoolctl import threadpool_limits
from tqdm import tqdm
__all__ = [
'dem_inv_gsvd',
'dem_pix',
'dem_reg_map',
'dem_unwrap',
'demmap',
]
[docs]
def demmap(
dd, ed, rmatrix, logt, dlogt, glc, reg_tweak=1.0, max_iter=10,
rgt_fact=1.5, dem_norm0=None, nmu=42, warn=False, l_emd=False
):
"""
Recover DEMs for a stack of one-dimensional observations.
Each row of ``dd`` is treated as an independent observation with the same
temperature response matrix. This function is the lower-level workhorse used
by :func:`demregpy.dn2dem` after the input arrays have been reshaped.
Parameters
----------
dd : array_like
Input counts with shape ``(na, nf)``.
ed : array_like
Uncertainties on ``dd`` with the same shape.
rmatrix : array_like
Response matrix with shape ``(nt, nf)``.
logt : array_like
Temperature-bin centres in log10(T).
dlogt : array_like
Width of each temperature bin in log10(T).
glc : array_like
Length-``nf`` 0/1 mask selecting the filters used for EM loci
weighting.
reg_tweak : float, optional
Initial chisq target. Default is 1.0.
max_iter : int, optional
Maximum number of times to attempt the gsvd before giving up, returns the last attempt if max_iter reached.
Default is 10.
rgt_fact : float, optional
Scale factor for the increase in chi-sqaured target for each iteration. Default is 1.5.
dem_norm0 : array_like, optional
Provides a "guess" dem as a starting point, if none is supplied one is created. Default is None.
nmu : int, optional
Number of reg param samples to use. Default is 42.
warn : bool, optional
Print out warnings. Default is False.
l_emd : bool, optional
Remove sqrt from constraint matrix (best with EMD). Default is False.
Returns
-------
dem : ndarray
DEM values with shape ``(na, nt)``.
edem : ndarray
Vertical uncertainties on ``dem``.
elogt : ndarray
Horizontal temperature resolution estimates in log10(T).
chisq : ndarray
Reduced chi-squared values for each observation.
dn_reg : ndarray
Reconstructed counts with shape ``(na, nf)``.
"""
na = dd.shape[0]
nf = rmatrix.shape[1]
nt = logt.shape[0]
if dem_norm0 is None:
dem_norm0 = np.ones((na, nt))
elif np.isscalar(dem_norm0):
raise ValueError("dem_norm0 must be an array matching (na, nt), not a scalar")
dem = np.zeros([na, nt])
edem = np.zeros([na, nt])
elogt = np.zeros([na, nt])
chisq = np.zeros([na])
dn_reg = np.zeros([na, nf])
# do we have enough DEM's to make parallel make sense?
if (na >= 200):
n_par = 100
niter = (int(np.floor((na)/n_par)))
# Put this here to make sure running dem calc in parallel, not the underlying np/gsvd stuff (this correct/needed?)
with threadpool_limits(limits=1):
with ProcessPoolExecutor() as exe:
futures = [exe.submit(dem_unwrap, dd[i*n_par:(i+1)*n_par, :], ed[i*n_par:(i+1)*n_par, :],
rmatrix, logt, dlogt, glc, reg_tweak=reg_tweak, max_iter=max_iter,
rgt_fact=rgt_fact, dem_norm0=dem_norm0[i*n_par:(i+1)*n_par, :],
nmu=nmu, warn=warn, l_emd=l_emd) for i in np.arange(niter)]
kwargs = {
'total': len(futures),
'unit': ' x10^2 DEM',
'unit_scale': True,
'leave': True
}
for f in tqdm(as_completed(futures), **kwargs):
pass
for i, f in enumerate(futures):
# store the outputs in arrays
dem[i*n_par:(i+1)*n_par, :] = f.result()[0]
edem[i*n_par:(i+1)*n_par, :] = f.result()[1]
elogt[i*n_par:(i+1)*n_par, :] = f.result()[2]
chisq[i*n_par:(i+1)*n_par] = f.result()[3]
dn_reg[i*n_par:(i+1)*n_par, :] = f.result()[4]
# if there are any remaining dems then execute remainder in serial
if (np.mod(na, niter*n_par) != 0):
i_start = niter*n_par
for i in range(na-i_start):
result = dem_pix(dd[i_start+i, :], ed[i_start+i, :], rmatrix, logt, dlogt, glc,
reg_tweak=reg_tweak, max_iter=max_iter, rgt_fact=rgt_fact,
dem_norm0=dem_norm0[i_start+i, :],
nmu=nmu, warn=warn, l_emd=l_emd)
dem[i_start+i, :] = result[0]
edem[i_start+i, :] = result[1]
elogt[i_start+i, :] = result[2]
chisq[i_start+i] = result[3]
dn_reg[i_start+i, :] = result[4]
# else we execute in serial
else:
for i in range(na):
result = dem_pix(dd[i, :], ed[i, :], rmatrix, logt, dlogt, glc,
reg_tweak=reg_tweak, max_iter=max_iter, rgt_fact=rgt_fact,
dem_norm0=dem_norm0[i, :], nmu=nmu, warn=warn, l_emd=l_emd)
dem[i, :] = result[0]
edem[i, :] = result[1]
elogt[i, :] = result[2]
chisq[i] = result[3]
dn_reg[i, :] = result[4]
return dem, edem, elogt, chisq, dn_reg
[docs]
def dem_unwrap(
dn, ed, rmatrix, logt, dlogt, glc, reg_tweak=1.0, max_iter=10,
rgt_fact=1.5, dem_norm0=None, nmu=42, warn=False, l_emd=False
):
"""
Run :func:`dem_pix` over a stack of observations in serial.
Parameters
----------
dn : ndarray
Input counts with shape ``(ndem, nf)``.
ed : ndarray
Uncertainties on ``dn`` with the same shape.
rmatrix : ndarray
Response matrix with shape ``(nt, nf)``.
logt : array_like
Log temperature bins.
dlogt : array_like
Size of temperature bins.
glc : array_like
Length-``nf`` 0/1 mask selecting the filters used for EM loci
weighting.
reg_tweak : float, optional
Initial Chisq target, by default 1.0
max_iter : int, optional
Max number of iterations to reach target chisq before giving up, by default 10
rgt_fact : float, optional
Factor to increase chisq by each iteration, by default 1.5
dem_norm0 : array_like, optional
Initial guess at the dem shape, by default 0
nmu : int, optional
number of reg param samples to use, by default 42
warn : bool, optional
Print warnings, by default False
l_emd : bool, optional
Remove sqrt from constraint matrix, by default False
Returns
-------
dem : ndarray
DEM values with shape ``(ndem, nt)``.
edem : ndarray
Vertical uncertainties on ``dem``.
elogt : ndarray
Horizontal temperature resolution estimates in log10(T).
chisq : array_like
Reduced chi-squared values.
dn_reg : ndarray
Reconstructed counts with shape ``(ndem, nf)``.
"""
ndem = dn.shape[0]
nt = logt.shape[0]
nf = dn.shape[1]
if dem_norm0 is None:
dem_norm0 = np.ones((ndem, nt))
elif np.isscalar(dem_norm0):
raise ValueError("dem_norm0 must be an array matching (ndem, nt), not a scalar")
dem = np.zeros([ndem, nt])
edem = np.zeros([ndem, nt])
elogt = np.zeros([ndem, nt])
chisq = np.zeros([ndem])
dn_reg = np.zeros([ndem, nf])
for i in range(ndem):
result = dem_pix(
dn[i, :], ed[i, :], rmatrix, logt, dlogt, glc,
reg_tweak=reg_tweak, max_iter=max_iter, rgt_fact=rgt_fact,
dem_norm0=dem_norm0[i, :], nmu=nmu, warn=warn, l_emd=l_emd
)
dem[i, :] = result[0]
edem[i, :] = result[1]
elogt[i, :] = result[2]
chisq[i] = result[3]
dn_reg[i, :] = result[4]
return dem, edem, elogt, chisq, dn_reg
[docs]
def dem_pix(dnin, ednin, rmatrix, logt, dlogt, glc, reg_tweak=1.0, max_iter=10,
rgt_fact=1.5, dem_norm0=None, nmu=42, warn=True, l_emd=False):
"""
Recover a DEM for one observation vector.
Parameters
----------
dnin : array_like
Input counts for one observation.
ednin : array_like
Uncertainties on ``dnin``.
rmatrix : ndarray
Temperature response of each channel.
logt : array_like
Log temperature bins.
dlogt : array_like
Size of temperature bins.
glc : array_like
Length-``nf`` 0/1 mask selecting the filters used for EM loci
weighting.
reg_tweak : float, optional
Initial Chisq target, by default 1.0
max_iter : int, optional
Max number of iterations to reach target chisq before giving up, by default 10
rgt_fact : float, optional
Factor to increase chisq by each iteration, by default 1.5
dem_norm0 : array_like, optional
Initial guess at the dem shape, by default 0
nmu : int, optional
number of reg param samples to use, by default 42
warn : bool, optional
Print warnings, by default False
l_emd : bool, optional
Remove sqrt from constraint matrix, by default False
Returns
-------
dem : ndarray
Recovered DEM values.
edem : ndarray
Vertical uncertainties on ``dem``.
elogt : ndarray
Horizontal temperature resolution estimates in log10(T).
chisq : array_like
Reduced chi-squared value.
dn_reg : ndarray
Reconstructed counts for each filter.
"""
nf = rmatrix.shape[1]
nt = logt.shape[0]
if not np.all(np.isfinite(dnin)):
raise ValueError("dnin must contain only finite values")
if np.any(dnin < 0):
raise ValueError("dnin must be non-negative")
user_supplied_dem_norm0 = dem_norm0 is not None
if dem_norm0 is None:
dem_norm0 = np.ones(nt)
elif np.isscalar(dem_norm0):
raise ValueError("dem_norm0 must be an array matching (nt,), not a scalar")
if nt < 3:
raise ValueError("logt/dlogt must define at least 3 DEM bins")
ltt = np.min(logt) + 1e-8 + (np.max(logt) - np.min(logt)) * np.arange(51) / (52 - 1.0)
dem = np.zeros(nt)
edem = np.zeros(nt)
elogt = np.zeros(nt)
chisq = 0
dn_reg = np.zeros(nf)
rmatrixin = rmatrix / ednin[np.newaxis, :]
dn = dnin/ednin
edn = ednin/ednin
ndem = 1
piter = 0
rgt = reg_tweak
# If you have supplied an initial guess/constraint normalized DEM then don't
# need to calculate one (either from L=1/sqrt(dLogT) or min of EM loci)
# As the call to this now sets dem_norm to array of 1s if nothing provided by user can also test for that
# Before calling this dem_norm0 is set to array of 1s if nothing provided by user
# So we need to work out some weighting for L or is one provided as dem_norm0 (not 0 or array of 1s)?
if ((not user_supplied_dem_norm0) or np.all(dem_norm0 == 1.0) or dem_norm0[0] == 0):
# Need to work out a weighting here then, have two approaches:
# 1. Do it via the min of em loci - chooses this if gloci, glc=1 from user
if (np.sum(glc) > 0.0):
gdglc = (glc > 0).nonzero()[0]
emloci = np.zeros((nt, gdglc.shape[0]))
# for each gloci take the minimum and work out the emission measure
for ee in np.arange(gdglc.shape[0]):
emloci[:, ee] = dnin[gdglc[ee]]/(rmatrix[:, gdglc[ee]])
# for each temp we take the min of the loci curves as the estimate of the dem
dem_model = np.zeros(nt)
for ttt in np.arange(nt):
nz = np.nonzero(emloci[ttt, :])[0]
dem_model[ttt] = np.min(emloci[ttt, nz]) if nz.size > 0 else 0.0
dem_reg_lwght = dem_model
# ~~~~~~~~~~~~~~~~~
# 2. Or if nothing selected will run reg once, and use solution as weighting (self norm approach)
else:
# Calculate the initial constraint matrix
# Just a diagonal matrix scaled by dlogT
ldiag = 1.0 / np.sqrt(dlogt[:])
# run gsvd
sva, svb, U, _V, W = dem_inv_gsvd_diag(rmatrixin.T, ldiag)
# run reg map
mu, misfit_curve, err_term = _dem_reg_map_curve(sva, svb, U, dn, edn, nmu)
lamb = _dem_reg_map_select(mu, misfit_curve, err_term, rgt)
# filt, diagonal matrix
U_nf = U[:nf, :nf]
W_nf = W[:, :nf]
sva_nf = sva[:nf]
svb_nf_sq = svb[:nf]**2
filt_diag = sva_nf / (sva_nf**2 + svb_nf_sq*lamb)
kdag = W_nf @ (U_nf * filt_diag[:, None])
dr0 = (kdag@dn).squeeze()
# only take the positive with certain amount (fcofmx) of max, then make rest small positive
fcofmax = 1e-4
mask = (dr0 > 0) & (dr0 > fcofmax * np.max(dr0))
dem_reg_lwght = np.ones(nt)
dem_reg_lwght[mask] = dr0[mask]
# ~~~~~~~~~~~~~~~~~
# Just smooth these initial dem_reg_lwght and max sure no value is too small
# dem_reg_lwght=(np.convolve(dem_reg_lwght,np.ones(3)/3))[1:-1]/np.max(dem_reg_lwght[:])
dem_reg_lwght = (np.convolve(dem_reg_lwght[1:-1], np.ones(5)/5))[1:-1]/np.max(dem_reg_lwght[:])
dem_reg_lwght[dem_reg_lwght <= 1e-8] = 1e-8
else:
# Otherwise just set dem_reg to inputted weight
dem_reg_lwght = dem_norm0
# Now actually do the dem regularisation using the L weighting from above
# Faster to do this and the GSVD on R and L before the pos loop
if l_emd:
# this works better with EMD calc, instead of DEM
ldiag = 1 / abs(dem_reg_lwght)
else:
ldiag = np.sqrt(dlogt) / np.sqrt(abs(dem_reg_lwght))
sva, svb, U, _V, W = dem_inv_gsvd_diag(rmatrixin.T, ldiag)
mu, misfit_curve, err_term = _dem_reg_map_curve(sva, svb, U, dn, edn, nmu)
U_nf = U[:nf, :nf]
W_nf = W[:, :nf]
sva_nf = sva[:nf]
svb_nf_sq = svb[:nf]**2
# Now loop until positive solution or max_iter reached
while ((ndem > 0) and (piter < max_iter)):
# #make L from 1/dem reg scaled by dlogt and diagonalise
# L=np.diag(np.sqrt(dlogt)/np.sqrt(abs(dem_reg_lwght)))
# #call gsvd and reg map
# sva,svb,U,V,W = dem_inv_gsvd(rmatrixin.T,L)
lamb = _dem_reg_map_select(mu, misfit_curve, err_term, rgt)
filt_diag = sva_nf / (sva_nf**2 + svb_nf_sq*lamb)
kdag = W_nf @ (U_nf * filt_diag[:, None])
dem_reg_out = (kdag@dn).squeeze()
ndem = len(dem_reg_out[dem_reg_out < 0])
rgt = rgt_fact*rgt
piter += 1
if (warn and (piter == max_iter)):
print('Warning, positivity loop hit max iterations, so increase max_iter? Or rgt_fact too small?')
dem = dem_reg_out
# work out the theoretical dn and compare to the input dn
dn_reg = (rmatrix.T @ dem_reg_out).squeeze()
residuals = (dnin-dn_reg)/ednin
# work out the chisquared
chisq = np.sum(residuals**2)/(nf)
# do error calculations on dem
edem = np.sqrt(np.sum(kdag**2, axis=1))
kdagk = kdag@rmatrixin.T
kdagk_max = np.max(kdagk, axis=0)
elogt = np.zeros(nt)
for kk in np.arange(nt):
rr = np.interp(ltt, logt, kdagk[:, kk])
hm_mask = (rr >= kdagk_max[kk]/2.)
elogt[kk] = dlogt[kk]
if (np.sum(hm_mask) > 0):
elogt[kk] = (ltt[hm_mask][-1]-ltt[hm_mask][0])/2
return dem, edem, elogt, chisq, dn_reg
def _dem_reg_map_curve(sigmaa, sigmab, U, data, err, nmu):
"""Precompute the regularization misfit curve for a fixed GSVD."""
nf = data.shape[0]
sigs = sigmaa[:nf]/sigmab[:nf]
maxx = np.max(sigs)
minx = np.min(sigs)**2.0*1E-4
minx = max(minx, np.finfo(float).tiny)
denom = max(nmu - 1.0, 1.0)
log_min = np.log(minx)
log_max = np.log(maxx)
log_max = min(log_max, np.log(np.finfo(float).max))
step = (log_max - log_min) / denom
log_mu = log_min + np.arange(nmu) * step
log_mu = np.clip(log_mu, log_min, log_max)
mu = np.exp(log_mu)
coef = U[:nf, :] @ data
sigmab_sq = sigmab[:nf, None]**2
numerator = mu[None, :] * sigmab_sq * coef[:, None]
denom = sigmaa[:nf, None]**2 + mu[None, :] * sigmab_sq
misfit_curve = np.sum((numerator / denom)**2, axis=0)
err_term = np.sum(err**2)
return mu, misfit_curve, err_term
def _dem_reg_map_select(mu, misfit_curve, err_term, reg_tweak):
"""Select the regularization parameter for a given target misfit."""
discr = misfit_curve - err_term * reg_tweak
return mu[np.argmin(np.abs(discr))]
[docs]
def dem_reg_map(sigmaa, sigmab, U, W, data, err, reg_tweak, nmu=500):
"""
Select the regularization parameter from a GSVD solution.
Parameters
----------
sigmaa : array_like
GSVD ``alpha`` singular values.
sigmab : array_like
GSVD ``beta`` singular values.
U : ndarray
GSVD ``U`` matrix.
W : ndarray
GSVD ``W`` matrix.
Unused.
data : array_like
Data vector in the weighted space used by the inversion.
err : array_like
Uncertainty vector corresponding to ``data``.
reg_tweak : float
Target chi-squared scaling used when choosing the regularization
parameter.
nmu : int, optional
Number of candidate regularization parameters to sample.
Returns
-------
float
Selected regularization parameter.
"""
mu, misfit_curve, err_term = _dem_reg_map_curve(sigmaa, sigmab, U, data, err, nmu)
return _dem_reg_map_select(mu, misfit_curve, err_term, reg_tweak)
def _dem_inv_gsvd_from_bdiag(A, bdiag):
"""GSVD helper for the common case where B is diagonal."""
bdiag = np.asarray(bdiag)
bdiag_inv = np.divide(
1.0,
bdiag,
out=np.zeros_like(bdiag, dtype=np.result_type(A, bdiag, float)),
where=bdiag != 0,
)
# For diagonal B, A @ pinv(B) is just column scaling by pinv(diag(B)).
AB1 = A * bdiag_inv
sze = AB1.shape
p = max(sze)
# Use the rectangular SVD directly, then pad the singular spectrum and U rows
# to preserve the output shapes expected by the existing GSVD code.
u, s, v = svd(AB1, full_matrices=True, compute_uv=True)
s_pad = np.zeros(p, dtype=s.dtype)
s_pad[:s.shape[0]] = s
beta = 1.0 / np.sqrt(1 + s_pad**2)
alpha = s_pad * beta
U = np.zeros((p, sze[0]), dtype=u.dtype)
U[:u.shape[1], :] = u.T
vb = v / beta[:, None]
w2 = pinv(vb * bdiag[np.newaxis, :])
return alpha, beta, U, v.T, w2
def dem_inv_gsvd_diag(A, bdiag):
"""Perform the GSVD of A with diagonal B defined by its diagonal entries."""
return _dem_inv_gsvd_from_bdiag(A, bdiag)
[docs]
def dem_inv_gsvd(A, B):
"""
Perform the generalised singular value decomposition of ``A`` and ``B``.
Parameters
----------
A : ndarray
Response-like matrix.
B : ndarray
Regularisation matrix.
Returns
-------
alpha : array_like
Diagonal values of ``SA``.
beta : array_like
Diagonal values of ``SB``.
U : ndarray
Left GSVD factor for ``A``.
V : ndarray
Left GSVD factor for ``B``.
w2 : ndarray
Right GSVD factor.
"""
bdiag = np.diagonal(B)
if np.all(B == np.diag(bdiag)):
return _dem_inv_gsvd_from_bdiag(A, bdiag)
# calculate the matrix A*B^-1
AB1 = A@pinv(B)
sze = AB1.shape
p = max(sze)
# Keep the historical output shapes while avoiding explicit padding of the
# matrix passed into SVD.
u, s, v = svd(AB1, full_matrices=True, compute_uv=True)
# from the svd products calculate the diagonal components form the gsvd
s_pad = np.zeros(p, dtype=s.dtype)
s_pad[:s.shape[0]] = s
beta = 1./np.sqrt(1+s_pad**2)
alpha = s_pad*beta
U = np.zeros((p, sze[0]), dtype=u.dtype)
U[:u.shape[1], :] = u.T
# calculate the w matrix
w2 = pinv((v / beta[:, None]) @ B)
# return gsvd products, transposing v as we do.
return alpha, beta, U, v.T, w2