Added the possibility to use constrained fit parameters. Added correlated least squares.

Simon Kuberski 2021-11-15 16:39:50 +01:00
parent 4bf95da346
commit dbe1c26362
3 changed files with 183 additions and 34 deletions


@@ -109,6 +109,12 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
corrected by effects caused by correlated input data.
This can take a while as the full correlation matrix
has to be calculated (default False).
correlated_fit : bool
If True, use the full correlation matrix in the definition of the chisquare
(only works for prior==None and when no method is given, at the moment).
const_par : list, optional
List of N Obs that are used to constrain the last N fit parameters of func,
taking the correlations of the constrained parameters into account.
'''
if priors is not None:
return _prior_fit(x, y, func, priors, silent=silent, **kwargs)
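A minimal usage sketch of the two new keyword arguments. The data, the ensemble names and the fit function below are made up for illustration; the fit function is written with autograd's numpy so that it stays differentiable:

import autograd.numpy as anp
import numpy as np
import pyerrors as pe

def fitf(p, x):
    return p[0] * anp.exp(-p[1] * x) + p[2]

xdata = np.arange(10)
ydata = [pe.pseudo_Obs(2.0 * np.exp(-0.3 * xi) + 0.5, 0.05, 'toy_ens') for xi in xdata]
third_par = pe.pseudo_Obs(0.5, 0.05, 'toy_ens2')  # Obs constraining the last fit parameter

fit_free = pe.least_squares(xdata, ydata, fitf)                       # three free parameters
fit_con = pe.least_squares(xdata, ydata, fitf, const_par=third_par)   # p[2] fixed to third_par
fit_corr = pe.least_squares(xdata, ydata, fitf, correlated_fit=True)  # chisquare with the full correlation matrix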
@@ -154,6 +160,9 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
corrected by effects caused by correlated input data.
This can take a while as the full correlation matrix
has to be calculated (default False).
const_par : list, optional
List of N Obs that are used to constrain the last N fit parameters of func,
taking the correlations of the constrained parameters into account.
Based on the orthogonal distance regression module of scipy
'''
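The constrained parameters work the same way here; a short sketch with placeholder names (ox and oy are lists of Obs for abscissa and ordinate, slope is an Obs fixing the last parameter), mirroring the test added below:

out = pe.total_least_squares(ox, oy, fitf, const_par=[slope])
# out.fit_parameters holds the free parameters followed by the constrained ones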
@@ -169,6 +178,17 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
if not callable(func):
raise TypeError('func has to be a function.')
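# Keep a reference to the user-supplied function: if constrained parameters
# are given, the minimizer only varies the free parameters and the wrapper
# below appends the constrained ones at their central values.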
func_aug = func
if 'const_par' in kwargs:
const_par = kwargs['const_par']
if isinstance(const_par, Obs):
const_par = [const_par]
def func(p, x):
return func_aug(np.concatenate((p, [o.value for o in const_par])), x)
else:
const_par = []
for i in range(25):
try:
func(np.arange(i), x.T[0])
@@ -180,6 +200,8 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
n_parms = i
if not silent:
print('Fit with', n_parms, 'parameters')
if len(const_par) > 0:
    print('\t and %d constrained parameter%s' % (len(const_par), 's' if len(const_par) > 1 else ''), const_par)
x_f = np.vectorize(lambda o: o.value)(x)
dx_f = np.vectorize(lambda o: o.dvalue)(x)
@@ -195,7 +217,7 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
if 'initial_guess' in kwargs:
x0 = kwargs.get('initial_guess')
if len(x0) != n_parms:
raise Exception('Initial guess does not have the correct length.')
raise Exception('Initial guess does not have the correct length: %d vs. %d' % (len(x0), n_parms))
else:
x0 = [1] * n_parms
@@ -222,12 +244,18 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
raise Exception('The minimization procedure did not converge.')
m = x_f.size
n_parms_aug = n_parms + len(const_par)
def odr_chisquare(p):
model = func(p[:n_parms], p[n_parms:].reshape(x_shape))
chisq = anp.sum(((y_f - model) / dy_f) ** 2) + anp.sum(((x_f - p[n_parms:].reshape(x_shape)) / dx_f) ** 2)
return chisq
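# The augmented chisquare treats the constrained parameters as variables again;
# its Hessian is needed further down to propagate the uncertainties of the
# constrained Obs into the fit results.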
def odr_chisquare_aug(p):
    model = func_aug(p[:n_parms_aug], p[n_parms_aug:].reshape(x_shape))
    chisq = anp.sum(((y_f - model) / dy_f) ** 2) + anp.sum(((x_f - p[n_parms_aug:].reshape(x_shape)) / dx_f) ** 2)
    return chisq
if kwargs.get('expected_chisquare') is True:
W = np.diag(1 / np.asarray(np.concatenate((dy_f.ravel(), dx_f.ravel()))))
@@ -254,31 +282,32 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
print('chisquare/expected_chisquare:',
output.chisquare_by_expected_chisquare)
hess_inv = np.linalg.pinv(jacobian(jacobian(odr_chisquare))(np.concatenate((out.beta, out.xplus.ravel()))))
fitp = np.concatenate((out.beta, [o.value for o in const_par]))
hess_inv = np.linalg.pinv(jacobian(jacobian(odr_chisquare_aug))(np.concatenate((fitp, out.xplus.ravel()))))
def odr_chisquare_compact_x(d):
model = func(d[:n_parms], d[n_parms:n_parms + m].reshape(x_shape))
chisq = anp.sum(((y_f - model) / dy_f) ** 2) + anp.sum(((d[n_parms + m:].reshape(x_shape) - d[n_parms:n_parms + m].reshape(x_shape)) / dx_f) ** 2)
model = func_aug(d[:n_parms_aug], d[n_parms_aug:n_parms_aug + m].reshape(x_shape))
chisq = anp.sum(((y_f - model) / dy_f) ** 2) + anp.sum(((d[n_parms_aug + m:].reshape(x_shape) - d[n_parms_aug:n_parms_aug + m].reshape(x_shape)) / dx_f) ** 2)
return chisq
jac_jac_x = jacobian(jacobian(odr_chisquare_compact_x))(np.concatenate((out.beta, out.xplus.ravel(), x_f.ravel())))
jac_jac_x = jacobian(jacobian(odr_chisquare_compact_x))(np.concatenate((fitp, out.xplus.ravel(), x_f.ravel())))
deriv_x = -hess_inv @ jac_jac_x[:n_parms + m, n_parms + m:]
deriv_x = -hess_inv @ jac_jac_x[:n_parms_aug + m, n_parms_aug + m:]
def odr_chisquare_compact_y(d):
model = func(d[:n_parms], d[n_parms:n_parms + m].reshape(x_shape))
chisq = anp.sum(((d[n_parms + m:] - model) / dy_f) ** 2) + anp.sum(((x_f - d[n_parms:n_parms + m].reshape(x_shape)) / dx_f) ** 2)
model = func_aug(d[:n_parms_aug], d[n_parms_aug:n_parms_aug + m].reshape(x_shape))
chisq = anp.sum(((d[n_parms_aug + m:] - model) / dy_f) ** 2) + anp.sum(((x_f - d[n_parms_aug:n_parms_aug + m].reshape(x_shape)) / dx_f) ** 2)
return chisq
jac_jac_y = jacobian(jacobian(odr_chisquare_compact_y))(np.concatenate((out.beta, out.xplus.ravel(), y_f)))
jac_jac_y = jacobian(jacobian(odr_chisquare_compact_y))(np.concatenate((fitp, out.xplus.ravel(), y_f)))
deriv_y = -hess_inv @ jac_jac_y[:n_parms + m, n_parms + m:]
deriv_y = -hess_inv @ jac_jac_y[:n_parms_aug + m, n_parms_aug + m:]
result = []
for i in range(n_parms):
result.append(derived_observable(lambda x, **kwargs: x[0], [pseudo_Obs(out.beta[i], 0.0, y[0].names[0], y[0].shape[y[0].names[0]])] + list(x.ravel()) + list(y), man_grad=[0] + list(deriv_x[i]) + list(deriv_y[i])))
output.fit_parameters = result
output.fit_parameters = result + const_par
output.odr_chisquare = odr_chisquare(np.concatenate((out.beta, out.xplus.ravel())))
output.dof = x.shape[-1] - n_parms
@@ -432,6 +461,17 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
if not callable(func):
raise TypeError('func has to be a function.')
func_aug = func
if 'const_par' in kwargs:
const_par = kwargs['const_par']
if isinstance(const_par, Obs):
const_par = [const_par]
def func(p, x):
return func_aug(np.concatenate((p, [o.value for o in const_par])), x)
else:
const_par = []
for i in range(25):
try:
func(np.arange(i), x.T[0])
@@ -444,6 +484,8 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
if not silent:
print('Fit with', n_parms, 'parameters')
if len(const_par) > 0:
    print('\t and %d constrained parameter%s' % (len(const_par), 's' if len(const_par) > 1 else ''), const_par)
y_f = [o.value for o in y]
dy_f = [o.dvalue for o in y]
@@ -454,14 +496,44 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
if 'initial_guess' in kwargs:
x0 = kwargs.get('initial_guess')
if len(x0) != n_parms:
raise Exception('Initial guess does not have the correct length.')
raise Exception('Initial guess does not have the correct length: %d vs. %d' % (len(x0), n_parms))
else:
x0 = [0.1] * n_parms
def chisqfunc(p):
model = func(p, x)
chisq = anp.sum(((y_f - model) / dy_f) ** 2)
return chisq
if kwargs.get('correlated_fit') is True:
cov = covariance_matrix(y)
covdiag = np.diag(1. / np.sqrt(np.diag(cov)))
corr = covdiag @ cov @ covdiag
condn = np.linalg.cond(corr)
if condn > 1e4:
warnings.warn("Correlation matrix may be ill-conditioned! condition number: %1.2e" % (condn), RuntimeWarning)
chol = np.linalg.cholesky(corr)
chol_inv = np.linalg.inv(chol)
chol_inv = np.dot(chol_inv, covdiag)
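# chol_inv is now L^{-1} D^{-1/2}, where cov = D^{1/2} corr D^{1/2} and corr = L L^T,
# so that anp.sum(anp.dot(chol_inv, r) ** 2) equals r^T cov^{-1} r for a residual r.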
def chisqfunc(p):
model = func(p, x)
chisq = anp.sum(anp.dot(chol_inv, (y_f - model)) ** 2)
return chisq
def chisqfunc_aug(p):
    model = func_aug(p, x)
    chisq = anp.sum(anp.dot(chol_inv, (y_f - model)) ** 2)
    return chisq
else:
def chisqfunc(p):
model = func(p, x)
chisq = anp.sum(((y_f - model) / dy_f) ** 2)
return chisq
def chisqfunc_aug(p):
    model = func_aug(p, x)
    chisq = anp.sum(((y_f - model) / dy_f) ** 2)
    return chisq
if 'method' in kwargs:
output.method = kwargs.get('method')
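For intuition, a standalone toy check (made-up numbers) that the Cholesky construction above reproduces the textbook correlated chisquare r^T cov^{-1} r:

import numpy as np

cov = np.array([[4.0, 1.2], [1.2, 1.0]])   # toy covariance matrix
r = np.array([0.3, -0.5])                  # toy residual y_f - model
covdiag = np.diag(1. / np.sqrt(np.diag(cov)))
corr = covdiag @ cov @ covdiag
chol_inv = np.linalg.inv(np.linalg.cholesky(corr)) @ covdiag
assert np.isclose(np.sum(np.dot(chol_inv, r) ** 2), r @ np.linalg.inv(cov) @ r)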
@@ -482,10 +554,17 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
if not silent:
print('Method: Levenberg-Marquardt')
def chisqfunc_residuals(p):
model = func(p, x)
chisq = ((y_f - model) / dy_f)
return chisq
if kwargs.get('correlated_fit') is True:
def chisqfunc_residuals(p):
model = func(p, x)
chisq = anp.dot(chol_inv, (y_f - model))
return chisq
else:
def chisqfunc_residuals(p):
model = func(p, x)
chisq = ((y_f - model) / dy_f)
return chisq
fit_result = scipy.optimize.least_squares(chisqfunc_residuals, x0, method='lm', ftol=1e-15, gtol=1e-15, xtol=1e-15)
@@ -507,32 +586,44 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
print('chisquare/d.o.f.:', output.chisquare_by_dof)
if kwargs.get('expected_chisquare') is True:
W = np.diag(1 / np.asarray(dy_f))
cov = covariance_matrix(y)
A = W @ jacobian(func)(fit_result.x, x)
P_phi = A @ np.linalg.inv(A.T @ A) @ A.T
expected_chisquare = np.trace((np.identity(x.shape[-1]) - P_phi) @ W @ cov @ W)
output.chisquare_by_expected_chisquare = chisquare / expected_chisquare
if kwargs.get('correlated_fit') is True:
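# With the exact inverse covariance in the chisquare, the expected chisquare
# equals the number of degrees of freedom, so the ratio reduces to chisquare/dof.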
output.chisquare_by_expected_chisquare = output.chisquare_by_dof
else:
W = np.diag(1 / np.asarray(dy_f))
cov = covariance_matrix(y)
A = W @ jacobian(func)(fit_result.x, x)
P_phi = A @ np.linalg.inv(A.T @ A) @ A.T
expected_chisquare = np.trace((np.identity(x.shape[-1]) - P_phi) @ W @ cov @ W)
output.chisquare_by_expected_chisquare = chisquare / expected_chisquare
if not silent:
print('chisquare/expected_chisquare:',
output.chisquare_by_expected_chisquare)
hess_inv = np.linalg.pinv(jacobian(jacobian(chisqfunc))(fit_result.x))
fitp = np.concatenate((fit_result.x, [o.value for o in const_par]))
hess_inv = np.linalg.pinv(jacobian(jacobian(chisqfunc_aug))(fitp))
def chisqfunc_compact(d):
model = func(d[:n_parms], x)
chisq = anp.sum(((d[n_parms:] - model) / dy_f) ** 2)
return chisq
n_parms_aug = n_parms + len(const_par)
if kwargs.get('correlated_fit') is True:
def chisqfunc_compact(d):
model = func_aug(d[:n_parms_aug], x)
chisq = anp.sum(anp.dot(chol_inv, (d[n_parms_aug:] - model)) ** 2)
return chisq
else:
def chisqfunc_compact(d):
model = func_aug(d[:n_parms_aug], x)
chisq = anp.sum(((d[n_parms_aug:] - model) / dy_f) ** 2)
return chisq
jac_jac = jacobian(jacobian(chisqfunc_compact))(np.concatenate((fit_result.x, y_f)))
jac_jac = jacobian(jacobian(chisqfunc_compact))(np.concatenate((fitp, y_f)))
deriv = -hess_inv @ jac_jac[:n_parms, n_parms:]
deriv = -hess_inv @ jac_jac[:n_parms_aug, n_parms_aug:]
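# Implicit function theorem: the minimum shifts as dp/dy = -H^{-1} d2chi2/(dp dy);
# these rows enter derived_observable below as manual gradients.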
result = []
for i in range(n_parms):
result.append(derived_observable(lambda x, **kwargs: x[0], [pseudo_Obs(fit_result.x[i], 0.0, y[0].names[0], y[0].shape[y[0].names[0]])] + list(y), man_grad=[0] + list(deriv[i])))
output.fit_parameters = result
output.fit_parameters = result + const_par
output.chisquare = chisqfunc(fit_result.x)
output.dof = x.shape[-1] - n_parms


@@ -1196,6 +1196,8 @@ def covariance(obs1, obs2, correlation=False, **kwargs):
if True, the correlation instead of the covariance is
returned (default False)
"""
if set(obs1.names).isdisjoint(set(obs2.names)):
return 0.
for name in sorted(set(obs1.names + obs2.names)):
if (obs1.shape.get(name) != obs2.shape.get(name)) and (obs1.shape.get(name) is not None) and (obs2.shape.get(name) is not None):
@@ -1287,6 +1289,9 @@ def covariance2(obs1, obs2, correlation=False, **kwargs):
return gamma
if set(obs1.names).isdisjoint(set(obs2.names)):
return 0.
if not hasattr(obs1, 'e_names') or not hasattr(obs2, 'e_names'):
raise Exception('The gamma method has to be applied to both Obs first.')


@@ -2,6 +2,8 @@ import autograd.numpy as np
import math
import scipy.optimize
from scipy.odr import ODR, Model, RealData
from scipy.linalg import cholesky
from scipy.stats import norm
import pyerrors as pe
import pytest
@@ -41,6 +43,53 @@ def test_least_squares():
chi2_scipy = np.sum(((f(x, *popt) - y) / yerr) ** 2) / (len(x) - 2)
assert math.isclose(chi2_pyerrors, chi2_scipy, abs_tol=1e-10)
out = pe.least_squares(x, oy, func, const_par=[beta[1]])
assert (out.fit_parameters[0] - beta[0]).is_zero()
assert (out.fit_parameters[1] - beta[1]).is_zero()
num_samples = 400
N = 10
x = norm.rvs(size=(N, num_samples))
r = np.zeros((N, N))
for i in range(N):
for j in range(N):
r[i, j] = np.exp(-0.1 * np.fabs(i - j))
errl = np.sqrt([3.4, 2.5, 3.6, 2.8, 4.2, 4.7, 4.9, 5.1, 3.2, 4.2])
errl *= 4
for i in range(N):
for j in range(N):
r[i, j] *= errl[i] * errl[j]
c = cholesky(r, lower=True)
y = np.dot(c, x)
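# y = c @ x with c @ c.T == r, so each column of y is a draw from an
# N-dimensional Gaussian with covariance matrix r.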
x = np.arange(N)
for linear in [True, False]:
data = []
for i in range(N):
if linear:
data.append(pe.Obs([[i + 1 + o for o in y[i]]], ['ens']))
else:
data.append(pe.Obs([[np.exp(-(i + 1)) + np.exp(-(i + 1)) * o for o in y[i]]], ['ens']))
[o.gamma_method() for o in data]
if linear:
def fitf(p, x):
return p[1] + p[0] * x
else:
def fitf(p, x):
return p[1] * np.exp(-p[0] * x)
fitp = pe.least_squares(x, data, fitf, expected_chisquare=True)
fitpc = pe.least_squares(x, data, fitf, correlated_fit=True)
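# Both fits run on identical data, so their central values should agree
# within the uncertainty of the difference.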
for i in range(2):
    diff = fitp[i] - fitpc[i]
    diff.gamma_method()
    assert diff.is_zero_within_error(sigma=5)
def test_total_least_squares():
dim = 10 + int(30 * np.random.rand())
@@ -79,6 +128,10 @@ def test_total_least_squares():
assert math.isclose(beta[i].value, output.beta[i], rel_tol=1e-5)
assert math.isclose(output.cov_beta[i, i], beta[i].dvalue ** 2, rel_tol=2.5e-1), str(output.cov_beta[i, i]) + ' ' + str(beta[i].dvalue ** 2)
assert math.isclose(pe.covariance(beta[0], beta[1]), output.cov_beta[0, 1], rel_tol=2.5e-1)
out = pe.total_least_squares(ox, oy, func, const_par=[beta[1]])
assert (out.fit_parameters[0] - beta[0]).is_zero()
assert (out.fit_parameters[1] - beta[1]).is_zero()
pe.Obs.e_tag_global = 0