feat: implemented a positive semi-definite estimator for the covariance.

fits.covariance_matrix is deprecated; covariance can now handle lists of
observables.
This commit is contained in:
Fabian Joswig 2022-03-01 09:45:25 +00:00
parent 8e3e34bbea
commit 82419b7a88
5 changed files with 65 additions and 79 deletions

View file

@ -239,7 +239,7 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
if kwargs.get('covariance') is not None:
cov = kwargs.get('covariance')
else:
cov = covariance_matrix(np.concatenate((y, x.ravel())))
cov = covariance(np.concatenate((y, x.ravel())))
number_of_x_parameters = int(m / x_f.shape[-1])
@ -455,7 +455,7 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
x0 = [0.1] * n_parms
if kwargs.get('correlated_fit') is True:
cov = covariance_matrix(y)
cov = covariance(y)
covdiag = np.diag(1. / np.sqrt(np.diag(cov)))
corr = np.copy(cov)
for i in range(len(y)):
@ -527,7 +527,7 @@ def _standard_fit(x, y, func, silent=False, **kwargs):
if kwargs.get('expected_chisquare') is True:
if kwargs.get('correlated_fit') is not True:
W = np.diag(1 / np.asarray(dy_f))
cov = covariance_matrix(y)
cov = covariance(y)
A = W @ jacobian(func)(fit_result.x, x)
P_phi = A @ np.linalg.inv(A.T @ A) @ A.T
expected_chisquare = np.trace((np.identity(x.shape[-1]) - P_phi) @ W @ cov @ W)
@ -651,33 +651,9 @@ def residual_plot(x, y, func, fit_res):
plt.draw()
def covariance_matrix(y):
    """Assemble the symmetric covariance matrix of a sequence of Obs.

    Diagonal entries are the squared ``dvalue`` of each element; every
    off-diagonal entry is obtained from ``covariance`` for the
    corresponding pair and mirrored across the diagonal.

    Parameters
    ----------
    y : list or numpy.ndarray
        List or one dimensional array of Obs

    Returns
    -------
    numpy.ndarray
        Square array with ``len(y)`` rows and columns.

    Notes
    -----
    If any eigenvalue of the result fails the ``>= 0`` test, a
    ``RuntimeWarning`` is issued and the eigenvalues are printed.
    """
    n_obs = len(y)
    lower = np.zeros((n_obs, n_obs))

    # Fill the lower triangle row by row: pairwise terms first, then the
    # diagonal term, which only needs the observable's own error.
    for row, obs_row in enumerate(y):
        for col, obs_col in enumerate(y[:row]):
            lower[row, col] = covariance(obs_row, obs_col)
        lower[row, row] = obs_row.dvalue ** 2

    # Mirror the lower triangle; the diagonal is counted twice by the sum,
    # so it is subtracted once.
    diagonal_part = np.diag(np.diag(lower))
    full = lower + lower.T - diagonal_part

    spectrum = np.linalg.eigh(full)[0]
    is_positive_semi_definite = np.all(spectrum >= 0)
    if not is_positive_semi_definite:
        warnings.warn("Covariance matrix is not positive semi-definite", RuntimeWarning)
        print("Eigenvalues of the covariance matrix:", spectrum)

    return full
def error_band(x, func, beta):
"""Returns the error band for an array of sample values x, for given fit function func with optimized parameters beta."""
cov = covariance_matrix(beta)
cov = covariance(beta)
if np.any(np.abs(cov - cov.T) > 1000 * np.finfo(np.float64).eps):
warnings.warn("Covariance matrix is not symmetric within floating point precision", RuntimeWarning)

View file

@ -1332,20 +1332,50 @@ def correlate(obs_a, obs_b):
return o
def covariance(obs1, obs2, correlation=False, **kwargs):
"""Calculates the covariance of two observables.
def covariance(obs, window=min, correlation=False, **kwargs):
"""Calculates the covariance matrix of a set of observables.
covariance(obs, obs) is equal to obs.dvalue ** 2
covariance([obs, obs])[0,1] is equal to obs.dvalue ** 2
The gamma method has to be applied first to both observables.
If abs(covariance(obs1, obs2)) > obs1.dvalue * obs2.dvalue, the covariance
is constrained to the maximum value.
Parameters
----------
obs : list or numpy.ndarray
List or one dimensional array of Obs
window: function or dict
Function which selects the window for each ensemble, examples 'min', 'max', 'np.mean', 'np.median'
Alternatively a dictionary with an entry for every ensemble can be manually specified.
correlation : bool
if true the correlation instead of the covariance is returned (default False)
"""
if isinstance(window, dict):
window_dict = window
else:
window_dict = {}
names = sorted(set([item for sublist in [o.mc_names for o in obs] for item in sublist]))
for name in names:
window_list = []
for ob in obs:
if ob.e_windowsize.get(name) is not None:
window_list.append(ob.e_windowsize[name])
window_dict[name] = int(window(window_list))
length = len(obs)
cov = np.zeros((length, length))
for i, item in enumerate(obs):
for j, jtem in enumerate(obs[:i + 1]):
cov[i, j] = _covariance_element(item, jtem, window_dict)
cov = cov + cov.T - np.diag(np.diag(cov))
eigenvalues = np.linalg.eigh(cov)[0]
if not np.all(eigenvalues >= 0):
warnings.warn("Covariance matrix is not positive semi-definite", RuntimeWarning)
print("Eigenvalues of the covariance matrix:", eigenvalues)
return cov
def _covariance_element(obs1, obs2, window_dict, correlation=False, **kwargs):
"""TODO
"""
def expand_deltas(deltas, idx, shape, new_idx):
"""Expand deltas defined on idx to a contiguous range [new_idx[0], new_idx[-1]].
@ -1398,21 +1428,16 @@ def covariance(obs1, obs2, correlation=False, **kwargs):
if e_name not in obs2.mc_names:
continue
window = window_dict[e_name]
idl_d = {}
r_length = []
for r_name in obs1.e_content[e_name]:
if r_name not in obs2.e_content[e_name]:
continue
idl_d[r_name] = _merge_idx([obs1.idl[r_name], obs2.idl[r_name]])
if isinstance(idl_d[r_name], range):
r_length.append(len(idl_d[r_name]))
else:
r_length.append((idl_d[r_name][-1] - idl_d[r_name][0] + 1))
# TODO: Is a check needed if the length of an ensemble is zero?
if not r_length:
return 0.
w_max = max(r_length) // 2
w_max = window + 1
e_gamma[e_name] = np.zeros(w_max)
for r_name in obs1.e_content[e_name]:
@ -1438,11 +1463,10 @@ def covariance(obs1, obs2, correlation=False, **kwargs):
e_rho[e_name] = e_gamma[e_name][:w_max] / e_gamma[e_name][0]
e_n_tauint[e_name] = np.cumsum(np.concatenate(([0.5], e_rho[e_name][1:])))
# Make sure no entry of tauint is smaller than 0.5
e_n_tauint[e_name][e_n_tauint[e_name] < 0.5] = 0.500000000001
e_n_tauint[e_name][e_n_tauint[e_name] < 0.5] = 0.5 + np.finfo(np.float64).eps
window = min(obs1.e_windowsize[e_name], obs2.e_windowsize[e_name])
# Bias correction hep-lat/0306017 eq. (49)
e_dvalue[e_name] = 2 * (e_n_tauint[e_name][window] + obs1.tau_exp[e_name] * np.abs(e_rho[e_name][window + 1])) * (1 + (2 * window + 1) / e_N) * e_gamma[e_name][0] / e_N
e_dvalue[e_name] = 2 * (e_n_tauint[e_name][window]) * (1 + (2 * window + 1) / e_N) * e_gamma[e_name][0] / e_N
dvalue += e_dvalue[e_name]
@ -1453,8 +1477,9 @@ def covariance(obs1, obs2, correlation=False, **kwargs):
dvalue += float(np.dot(np.transpose(obs1.covobs[e_name].grad), np.dot(obs1.covobs[e_name].cov, obs2.covobs[e_name].grad)))
if np.abs(dvalue / obs1.dvalue / obs2.dvalue) > 1.0:
dvalue = np.sign(dvalue) * obs1.dvalue * obs2.dvalue
# TODO: Check if this is needed.
# if np.abs(dvalue / obs1.dvalue / obs2.dvalue) > 1.0:
# dvalue = np.sign(dvalue) * obs1.dvalue * obs2.dvalue
if correlation:
dvalue = dvalue / obs1.dvalue / obs2.dvalue