Merge pull request #12 from s-kuberski/feature/covobs

Feature/covobs
This commit is contained in:
Fabian Joswig 2021-12-01 14:49:07 +00:00 committed by GitHub
commit 9e3dbea78b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 356 additions and 91 deletions

54
pyerrors/covobs.py Normal file
View file

@ -0,0 +1,54 @@
import numpy as np
class Covobs:
def __init__(self, mean, cov, name, pos=None, grad=None):
""" Initialize Covobs object.
Parameters
----------
mean : float
Mean value of the new Obs
cov : list or array
2d Covariance matrix or 1d diagonal entries
name : str
identifier for the covariance matrix
pos : int
Position of the variance belonging to mean in cov.
Is taken to be 1 if cov is 0-dimensional
grad : list or array
Gradient of the Covobs wrt. the means belonging to cov.
"""
self.cov = np.array(cov)
if self.cov.ndim == 0:
self.N = 1
elif self.cov.ndim == 1:
self.N = len(self.cov)
self.cov = np.diag(self.cov)
elif self.cov.ndim == 2:
self.N = self.cov.shape[0]
if self.cov.shape[1] != self.N:
raise Exception('Covariance matrix has to be a square matrix!')
else:
raise Exception('Covariance matrix has to be a 2 dimensional square matrix!')
self.name = name
if grad is None:
if pos is None:
if self.N == 1:
pos = 0
else:
raise Exception('Have to specify position of cov-element belonging to mean!')
else:
if pos > self.N:
raise Exception('pos %d too large for covariance matrix with dimension %dx%d!' % (pos, self.N, self.N))
self.grad = np.zeros((self.N, 1))
self.grad[pos] = 1.
else:
self.grad = np.array(grad)
self.value = mean
def errsq(self):
""" Return the variance (= square of the error) of the Covobs
"""
return float(np.dot(np.transpose(self.grad), np.dot(self.cov, self.grad)))

View file

@ -6,6 +6,7 @@ from autograd import jacobian
import matplotlib.pyplot as plt
import numdifftools as nd
from itertools import groupby
from .covobs import Covobs
class Obs:
@ -41,7 +42,7 @@ class Obs:
'ddvalue', 'reweighted', 'S', 'tau_exp', 'N_sigma',
'e_dvalue', 'e_ddvalue', 'e_tauint', 'e_dtauint',
'e_windowsize', 'e_rho', 'e_drho', 'e_n_tauint', 'e_n_dtauint',
'idl', 'is_merged', 'tag', '__dict__']
'idl', 'is_merged', 'tag', 'covobs', '__dict__']
S_global = 2.0
S_dict = {}
@ -51,7 +52,7 @@ class Obs:
N_sigma_dict = {}
filter_eps = 1e-10
def __init__(self, samples, names, idl=None, means=None, **kwargs):
def __init__(self, samples, names, idl=None, means=None, covobs=None, **kwargs):
""" Initialize Obs object.
Parameters
@ -67,7 +68,7 @@ class Obs:
already subtracted from the samples
"""
if means is None:
if means is None and samples is not None:
if len(samples) != len(names):
raise Exception('Length of samples and names incompatible.')
if idl is not None:
@ -80,52 +81,65 @@ class Obs:
if min(len(x) for x in samples) <= 4:
raise Exception('Samples have to have at least 5 entries.')
self.names = sorted(names)
if names:
self.names = sorted(names)
else:
self.names = []
self.shape = {}
self.r_values = {}
self.deltas = {}
if covobs is None:
self.covobs = {}
else:
self.covobs = covobs
self.idl = {}
if idl is not None:
for name, idx in sorted(zip(names, idl)):
if isinstance(idx, range):
self.idl[name] = idx
elif isinstance(idx, (list, np.ndarray)):
dc = np.unique(np.diff(idx))
if np.any(dc < 0):
raise Exception("Unsorted idx for idl[%s]" % (name))
if len(dc) == 1:
self.idl[name] = range(idx[0], idx[-1] + dc[0], dc[0])
if samples is not None:
if idl is not None:
for name, idx in sorted(zip(names, idl)):
if isinstance(idx, range):
self.idl[name] = idx
elif isinstance(idx, (list, np.ndarray)):
dc = np.unique(np.diff(idx))
if np.any(dc < 0):
raise Exception("Unsorted idx for idl[%s]" % (name))
if len(dc) == 1:
self.idl[name] = range(idx[0], idx[-1] + dc[0], dc[0])
else:
self.idl[name] = list(idx)
else:
self.idl[name] = list(idx)
else:
raise Exception('incompatible type for idl[%s].' % (name))
else:
for name, sample in sorted(zip(names, samples)):
self.idl[name] = range(1, len(sample) + 1)
raise Exception('incompatible type for idl[%s].' % (name))
else:
for name, sample in sorted(zip(names, samples)):
self.idl[name] = range(1, len(sample) + 1)
if means is not None:
for name, sample, mean in sorted(zip(names, samples, means)):
self.shape[name] = len(self.idl[name])
if len(sample) != self.shape[name]:
raise Exception('Incompatible samples and idx for %s: %d vs. %d' % (name, len(sample), self.shape[name]))
self.r_values[name] = mean
self.deltas[name] = sample
else:
for name, sample in sorted(zip(names, samples)):
self.shape[name] = len(self.idl[name])
if len(sample) != self.shape[name]:
raise Exception('Incompatible samples and idx for %s: %d vs. %d' % (name, len(sample), self.shape[name]))
self.r_values[name] = np.mean(sample)
self.deltas[name] = sample - self.r_values[name]
self.is_merged = {}
self.N = sum(list(self.shape.values()))
if means is not None:
for name, sample, mean in sorted(zip(names, samples, means)):
self.shape[name] = len(self.idl[name])
if len(sample) != self.shape[name]:
raise Exception('Incompatible samples and idx for %s: %d vs. %d' % (name, len(sample), self.shape[name]))
self.r_values[name] = mean
self.deltas[name] = sample
else:
for name, sample in sorted(zip(names, samples)):
self.shape[name] = len(self.idl[name])
if len(sample) != self.shape[name]:
raise Exception('Incompatible samples and idx for %s: %d vs. %d' % (name, len(sample), self.shape[name]))
self.r_values[name] = np.mean(sample)
self.deltas[name] = sample - self.r_values[name]
self.is_merged = {}
self.N = sum(list(self.shape.values()))
self._value = 0
if means is None:
for name in self.names:
self._value += self.shape[name] * self.r_values[name]
self._value /= self.N
self._value = 0
if means is None:
for name in self.names:
self._value += self.shape[name] * self.r_values[name]
self._value /= self.N
else:
self._value = 0
self.is_merged = {}
self.N = 0
self._dvalue = 0.0
self.ddvalue = 0.0
@ -145,6 +159,14 @@ class Obs:
def e_names(self):
return sorted(set([o.split('|')[0] for o in self.names]))
@property
def cov_names(self):
return sorted(set([o for o in self.covobs.keys()]))
@property
def mc_names(self):
return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names]))
@property
def e_content(self):
res = {}
@ -219,8 +241,7 @@ class Obs:
_parse_kwarg('tau_exp')
_parse_kwarg('N_sigma')
for e, e_name in enumerate(self.e_names):
for e, e_name in enumerate(self.mc_names):
r_length = []
for r_name in e_content[e_name]:
if isinstance(self.idl[r_name], range):
@ -306,11 +327,16 @@ class Obs:
self._dvalue += self.e_dvalue[e_name] ** 2
self.ddvalue += (self.e_dvalue[e_name] * self.e_ddvalue[e_name]) ** 2
self._dvalue = np.sqrt(self.dvalue)
for e_name in self.cov_names:
self.e_dvalue[e_name] = np.sqrt(self.covobs[e_name].errsq())
self.e_ddvalue[e_name] = 0
self._dvalue += self.e_dvalue[e_name]**2
self._dvalue = np.sqrt(self._dvalue)
if self._dvalue == 0.0:
self.ddvalue = 0.0
else:
self.ddvalue = np.sqrt(self.ddvalue) / self.dvalue
self.ddvalue = np.sqrt(self.ddvalue) / self._dvalue
return
def _calc_gamma(self, deltas, idx, shape, w_max, fft):
@ -362,17 +388,19 @@ class Obs:
if self.value == 0.0:
percentage = np.nan
else:
percentage = np.abs(self.dvalue / self.value) * 100
print('Result\t %3.8e +/- %3.8e +/- %3.8e (%3.3f%%)' % (self.value, self.dvalue, self.ddvalue, percentage))
percentage = np.abs(self._dvalue / self.value) * 100
print('Result\t %3.8e +/- %3.8e +/- %3.8e (%3.3f%%)' % (self.value, self._dvalue, self.ddvalue, percentage))
if len(self.e_names) > 1:
print(' Ensemble errors:')
for e_name in self.e_names:
for e_name in self.mc_names:
if len(self.e_names) > 1:
print('', e_name, '\t %3.8e +/- %3.8e' % (self.e_dvalue[e_name], self.e_ddvalue[e_name]))
if self.tau_exp[e_name] > 0:
print(' t_int\t %3.8e +/- %3.8e tau_exp = %3.2f, N_sigma = %1.0i' % (self.e_tauint[e_name], self.e_dtauint[e_name], self.tau_exp[e_name], self.N_sigma[e_name]))
else:
print(' t_int\t %3.8e +/- %3.8e S = %3.2f' % (self.e_tauint[e_name], self.e_dtauint[e_name], self.S[e_name]))
for e_name in self.cov_names:
print('', e_name, '\t %3.8e' % (self.e_dvalue[e_name]))
if ens_content is True:
if len(self.e_names) == 1:
print(self.N, 'samples in', len(self.e_names), 'ensemble:')
@ -380,25 +408,28 @@ class Obs:
print(self.N, 'samples in', len(self.e_names), 'ensembles:')
my_string_list = []
for key, value in sorted(self.e_content.items()):
my_string = ' ' + "\u00B7 Ensemble '" + key + "' "
if len(value) == 1:
my_string += f': {self.shape[value[0]]} configurations'
if isinstance(self.idl[value[0]], range):
my_string += f' (from {self.idl[value[0]].start} to {self.idl[value[0]][-1]}' + int(self.idl[value[0]].step != 1) * f' in steps of {self.idl[value[0]].step}' + ')'
else:
my_string += ' (irregular range)'
else:
sublist = []
for v in value:
my_substring = ' ' + "\u00B7 Replicum '" + v[len(key) + 1:] + "' "
my_substring += f': {self.shape[v]} configurations'
if isinstance(self.idl[v], range):
my_substring += f' (from {self.idl[v].start} to {self.idl[v][-1]}' + int(self.idl[v].step != 1) * f' in steps of {self.idl[v].step}' + ')'
if key not in self.covobs:
my_string = ' ' + "\u00B7 Ensemble '" + key + "' "
if len(value) == 1:
my_string += f': {self.shape[value[0]]} configurations'
if isinstance(self.idl[value[0]], range):
my_string += f' (from {self.idl[value[0]].start} to {self.idl[value[0]][-1]}' + int(self.idl[value[0]].step != 1) * f' in steps of {self.idl[value[0]].step}' + ')'
else:
my_substring += ' (irregular range)'
sublist.append(my_substring)
my_string += ' (irregular range)'
else:
sublist = []
for v in value:
my_substring = ' ' + "\u00B7 Replicum '" + v[len(key) + 1:] + "' "
my_substring += f': {self.shape[v]} configurations'
if isinstance(self.idl[v], range):
my_substring += f' (from {self.idl[v].start} to {self.idl[v][-1]}' + int(self.idl[v].step != 1) * f' in steps of {self.idl[v].step}' + ')'
else:
my_substring += ' (irregular range)'
sublist.append(my_substring)
my_string += '\n' + '\n'.join(sublist)
my_string += '\n' + '\n'.join(sublist)
else:
my_string = ' ' + "\u00B7 Covobs '" + key + "' "
my_string_list.append(my_string)
print('\n'.join(my_string_list))
@ -416,7 +447,7 @@ class Obs:
Works only properly when the gamma method was run.
"""
return self.is_zero() or np.abs(self.value) <= sigma * self.dvalue
return self.is_zero() or np.abs(self.value) <= sigma * self._dvalue
def is_zero(self, rtol=1.e-5, atol=1.e-8):
"""Checks whether the observable is zero within a given tolerance.
@ -428,7 +459,7 @@ class Obs:
atol : float
Absolute tolerance (for details see numpy documentation).
"""
return np.isclose(0.0, self.value, rtol, atol) and all(np.allclose(0.0, delta, rtol, atol) for delta in self.deltas.values())
return (np.isclose(0.0, self.value, rtol, atol) and all(np.allclose(0.0, delta, rtol, atol) for delta in self.deltas.values()) and all(np.allclose(0.0, delta.grad, rtol, atol) for delta in self.covobs.values()))
def plot_tauint(self, save=None):
"""Plot integrated autocorrelation time for each ensemble.
@ -442,7 +473,7 @@ class Obs:
raise Exception('Run the gamma method first.')
fig = plt.figure()
for e, e_name in enumerate(self.e_names):
for e, e_name in enumerate(self.mc_names):
plt.xlabel(r'$W$')
plt.ylabel(r'$\tau_\mathrm{int}$')
length = int(len(self.e_n_tauint[e_name]))
@ -473,7 +504,7 @@ class Obs:
"""Plot normalized autocorrelation function time for each ensemble."""
if not hasattr(self, 'e_dvalue'):
raise Exception('Run the gamma method first.')
for e, e_name in enumerate(self.e_names):
for e, e_name in enumerate(self.mc_names):
plt.xlabel('W')
plt.ylabel('rho')
length = int(len(self.e_drho[e_name]))
@ -495,7 +526,7 @@ class Obs:
"""Plot replica distribution for each ensemble with more than one replicum."""
if not hasattr(self, 'e_dvalue'):
raise Exception('Run the gamma method first.')
for e, e_name in enumerate(self.e_names):
for e, e_name in enumerate(self.mc_names):
if len(self.e_content[e_name]) == 1:
print('No replica distribution for a single replicum (', e_name, ')')
continue
@ -521,7 +552,7 @@ class Obs:
expand : bool
show expanded history for irregular Monte Carlo chains (default: True).
"""
for e, e_name in enumerate(self.e_names):
for e, e_name in enumerate(self.mc_names):
plt.figure()
r_length = []
tmp = []
@ -544,10 +575,10 @@ class Obs:
ensemble to the error and returns a dictionary containing the fractions."""
if not hasattr(self, 'e_dvalue'):
raise Exception('Run the gamma method first.')
if self.dvalue == 0.0:
if self._dvalue == 0.0:
raise Exception('Error is 0.0')
labels = self.e_names
sizes = [i ** 2 for i in list(self.e_dvalue.values())] / self.dvalue ** 2
sizes = [self.e_dvalue[name] ** 2 for name in labels] / self._dvalue ** 2
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, startangle=90, normalize=True)
ax1.axis('equal')
@ -605,15 +636,15 @@ class Obs:
return 'Obs[' + str(self) + ']'
def __str__(self):
if self.dvalue == 0.0:
if self._dvalue == 0.0:
return str(self.value)
fexp = np.floor(np.log10(self.dvalue))
fexp = np.floor(np.log10(self._dvalue))
if fexp < 0.0:
return '{:{form}}({:2.0f})'.format(self.value, self.dvalue * 10 ** (-fexp + 1), form='.' + str(-int(fexp) + 1) + 'f')
return '{:{form}}({:2.0f})'.format(self.value, self._dvalue * 10 ** (-fexp + 1), form='.' + str(-int(fexp) + 1) + 'f')
elif fexp == 0.0:
return '{:.1f}({:1.1f})'.format(self.value, self.dvalue)
return '{:.1f}({:1.1f})'.format(self.value, self._dvalue)
else:
return '{:.0f}({:2.0f})'.format(self.value, self.dvalue)
return '{:.0f}({:2.0f})'.format(self.value, self._dvalue)
# Overload comparisons
def __lt__(self, other):
@ -1028,6 +1059,15 @@ def derived_observable(func, data, **kwargs):
if isinstance(raveled_data[i], (int, float)):
raveled_data[i] = Obs([raveled_data[i] + np.zeros(first_shape)], [first_name], idl=[first_idl])
allcov = {}
for o in raveled_data:
for name in o.cov_names:
if name in allcov:
if not np.array_equal(allcov[name], o.covobs[name].cov):
raise Exception('Inconsistent covariance matrices for %s!' % (name))
else:
allcov[name] = o.covobs[name].cov
n_obs = len(raveled_data)
new_names = sorted(set([y for x in [o.names for o in raveled_data] for y in x]))
@ -1100,24 +1140,41 @@ def derived_observable(func, data, **kwargs):
for i_val, new_val in np.ndenumerate(new_values):
new_deltas = {}
new_grad = {}
for j_obs, obs in np.ndenumerate(data):
for name in obs.names:
new_deltas[name] = new_deltas.get(name, 0) + deriv[i_val + j_obs] * _expand_deltas_for_merge(obs.deltas[name], obs.idl[name], obs.shape[name], new_idl_d[name])
if name in obs.cov_names:
if name in new_grad:
new_grad[name] += deriv[i_val + j_obs] * obs.covobs[name].grad
else:
new_grad[name] = deriv[i_val + j_obs] * obs.covobs[name].grad
else:
new_deltas[name] = new_deltas.get(name, 0) + deriv[i_val + j_obs] * _expand_deltas_for_merge(obs.deltas[name], obs.idl[name], obs.shape[name], new_idl_d[name])
new_covobs = {name: Covobs(0, allcov[name], name, grad=new_grad[name]) for name in new_grad}
new_samples = []
new_means = []
new_idl = []
new_names_obs = []
for name in new_names:
if is_merged[name]:
filtered_deltas, filtered_idl_d = _filter_zeroes(new_deltas[name], new_idl_d[name])
else:
filtered_deltas = new_deltas[name]
filtered_idl_d = new_idl_d[name]
if name not in new_covobs:
if is_merged[name]:
filtered_deltas, filtered_idl_d = _filter_zeroes(new_deltas[name], new_idl_d[name])
else:
filtered_deltas = new_deltas[name]
filtered_idl_d = new_idl_d[name]
new_samples.append(filtered_deltas)
new_idl.append(filtered_idl_d)
new_means.append(new_r_values[name][i_val])
final_result[i_val] = Obs(new_samples, new_names, means=new_means, idl=new_idl)
new_samples.append(filtered_deltas)
new_idl.append(filtered_idl_d)
new_means.append(new_r_values[name][i_val])
new_names_obs.append(name)
final_result[i_val] = Obs(new_samples, new_names_obs, means=new_means, idl=new_idl)
for name in new_covobs:
final_result[i_val].names.append(name)
final_result[i_val].shape[name] = 1
final_result[i_val].idl[name] = []
final_result[i_val].covobs = new_covobs
final_result[i_val]._value = new_val
final_result[i_val].is_merged = is_merged
final_result[i_val].reweighted = reweighted
@ -1180,6 +1237,8 @@ def reweight(weight, obs, **kwargs):
"""
result = []
for i in range(len(obs)):
if len(obs[i].cov_names):
raise Exception('Error: Not possible to reweight an Obs that contains covobs!')
if sorted(weight.names) != sorted(obs[i].names):
raise Exception('Error: Ensembles do not fit')
for name in weight.names:
@ -1221,6 +1280,8 @@ def correlate(obs_a, obs_b):
if sorted(obs_a.names) != sorted(obs_b.names):
raise Exception('Ensembles do not fit')
if len(obs_a.cov_names) or len(obs_b.cov_names):
raise Exception('Error: Not possible to correlate Obs that contain covobs!')
for name in obs_a.names:
if obs_a.shape[name] != obs_b.shape[name]:
raise Exception('Shapes of ensemble', name, 'do not fit')
@ -1278,7 +1339,7 @@ def covariance(obs1, obs2, correlation=False, **kwargs):
dvalue = 0
for e_name in obs1.e_names:
for e_name in obs1.mc_names:
if e_name not in obs2.e_names:
continue
@ -1298,6 +1359,13 @@ def covariance(obs1, obs2, correlation=False, **kwargs):
tau_combined = (obs1.e_tauint[e_name] + obs2.e_tauint[e_name]) / 2
dvalue += gamma / e_N * (1 + 1 / e_N) / e_N * 2 * tau_combined
for e_name in obs1.cov_names:
if e_name not in obs2.cov_names:
continue
dvalue += float(np.dot(np.transpose(obs1.covobs[e_name].grad), np.dot(obs1.covobs[e_name].cov, obs2.covobs[e_name].grad)))
if np.abs(dvalue / obs1.dvalue / obs2.dvalue) > 1.0:
dvalue = np.sign(dvalue) * obs1.dvalue * obs2.dvalue
@ -1369,9 +1437,9 @@ def covariance2(obs1, obs2, correlation=False, **kwargs):
e_n_tauint = {}
e_rho = {}
for e_name in obs1.e_names:
for e_name in obs1.mc_names:
if e_name not in obs2.e_names:
if e_name not in obs2.mc_names:
continue
idl_d = {}
@ -1422,6 +1490,13 @@ def covariance2(obs1, obs2, correlation=False, **kwargs):
dvalue += e_dvalue[e_name]
for e_name in obs1.cov_names:
if e_name not in obs2.cov_names:
continue
dvalue += float(np.dot(np.transpose(obs1.covobs[e_name].grad), np.dot(obs1.covobs[e_name].cov, obs2.covobs[e_name].grad)))
if np.abs(dvalue / obs1.dvalue / obs2.dvalue) > 1.0:
dvalue = np.sign(dvalue) * obs1.dvalue * obs2.dvalue
@ -1559,6 +1634,8 @@ def merge_obs(list_of_obs):
replist = [item for obs in list_of_obs for item in obs.names]
if (len(replist) == len(set(replist))) is False:
raise Exception('list_of_obs contains duplicate replica: %s' % (str(replist)))
if any([len(o.cov_names) for o in list_of_obs]):
raise Exception('Not possible to merge data that contains covobs!')
new_dict = {}
idl_dict = {}
for o in list_of_obs:
@ -1571,3 +1648,48 @@ def merge_obs(list_of_obs):
o.is_merged = {name: np.any([oi.is_merged.get(name, False) for oi in list_of_obs]) for name in o.names}
o.reweighted = np.max([oi.reweighted for oi in list_of_obs])
return o
def cov_Obs(means, cov, name, grad=None):
"""Create an Obs based on mean(s) and a covariance matrix
Parameters
----------
mean : list of floats or float
N mean value(s) of the new Obs
cov : list or array
2d (NxN) Covariance matrix, 1d diagonal entries or 0d covariance
name : str
identifier for the covariance matrix
grad : list or array
Gradient of the Covobs wrt. the means belonging to cov.
"""
def covobs_to_obs(co):
"""Make an Obs out of a Covobs
Parameters
----------
co : Covobs
Covobs to be embedded into the Obs
"""
o = Obs(None, None, empty=True)
o._value = co.value
o.names.append(co.name)
o.covobs[co.name] = co
o._dvalue = np.sqrt(co.errsq())
o.shape[co.name] = 1
o.idl[co.name] = []
return o
ol = []
if isinstance(means, (float, int)):
means = [means]
for i in range(len(means)):
ol.append(covobs_to_obs(Covobs(means[i], cov, name, pos=i, grad=grad)))
if ol[0].covobs[name].N != len(means):
raise Exception('You have to provide %d mean values!' % (ol[0].N))
if len(ol) == 1:
return ol[0]
return ol

View file

@ -46,6 +46,19 @@ def test_least_squares():
assert((out.fit_parameters[0] - beta[0]).is_zero())
assert((out.fit_parameters[1] - beta[1]).is_zero())
oyc = []
for i, item in enumerate(x):
oyc.append(pe.cov_Obs(y[i], yerr[i]**2, 'cov' + str(i)))
outc = pe.least_squares(x, oyc, func)
betac = outc.fit_parameters
for i in range(2):
betac[i].gamma_method(S=1.0)
assert math.isclose(betac[i].value, popt[i], abs_tol=1e-5)
assert math.isclose(pcov[i, i], betac[i].dvalue ** 2, abs_tol=1e-3)
assert math.isclose(pe.covariance(betac[0], betac[1]), pcov[0, 1], abs_tol=1e-3)
num_samples = 400
N = 10
@ -135,6 +148,44 @@ def test_total_least_squares():
assert(diff / beta[0] < 1e-3 * beta[0].dvalue)
assert((out.fit_parameters[1] - beta[1]).is_zero())
oxc = []
for i, item in enumerate(x):
oxc.append(pe.cov_Obs(x[i], xerr[i]**2, 'covx' + str(i)))
oyc = []
for i, item in enumerate(x):
oyc.append(pe.cov_Obs(y[i], yerr[i]**2, 'covy' + str(i)))
outc = pe.total_least_squares(oxc, oyc, func)
betac = outc.fit_parameters
for i in range(2):
betac[i].gamma_method(S=1.0)
assert math.isclose(betac[i].value, output.beta[i], rel_tol=1e-3)
assert math.isclose(output.cov_beta[i, i], betac[i].dvalue ** 2, rel_tol=2.5e-1), str(output.cov_beta[i, i]) + ' ' + str(betac[i].dvalue ** 2)
assert math.isclose(pe.covariance(betac[0], betac[1]), output.cov_beta[0, 1], rel_tol=2.5e-1)
outc = pe.total_least_squares(oxc, oyc, func, const_par=[betac[1]])
diffc = outc.fit_parameters[0] - betac[0]
assert(diffc / betac[0] < 1e-3 * betac[0].dvalue)
assert((outc.fit_parameters[1] - betac[1]).is_zero())
outc = pe.total_least_squares(oxc, oy, func)
betac = outc.fit_parameters
for i in range(2):
betac[i].gamma_method(S=1.0)
assert math.isclose(betac[i].value, output.beta[i], rel_tol=1e-3)
assert math.isclose(output.cov_beta[i, i], betac[i].dvalue ** 2, rel_tol=2.5e-1), str(output.cov_beta[i, i]) + ' ' + str(betac[i].dvalue ** 2)
assert math.isclose(pe.covariance(betac[0], betac[1]), output.cov_beta[0, 1], rel_tol=2.5e-1)
outc = pe.total_least_squares(oxc, oy, func, const_par=[betac[1]])
diffc = outc.fit_parameters[0] - betac[0]
assert(diffc / betac[0] < 1e-3 * betac[0].dvalue)
assert((outc.fit_parameters[1] - betac[1]).is_zero())
def test_odr_derivatives():
x = []

View file

@ -543,3 +543,41 @@ def test_import_jackknife():
reconstructed_obs = pe.import_jackknife(my_jacks, 'test')
assert my_obs == reconstructed_obs
def test_covobs():
val = 1.123124
cov = .243423
name = 'Covariance'
co = pe.cov_Obs(val, cov, name)
co.gamma_method()
assert (co.dvalue == np.sqrt(cov))
assert (co.value == val)
do = 2 * co
assert (do.covobs[name].grad[0] == 2)
do = co * co
assert (do.covobs[name].grad[0] == 2 * val)
assert np.array_equal(do.covobs[name].cov, co.covobs[name].cov)
pi = [16.7457, -19.0475]
cov = [[3.49591, -6.07560], [-6.07560, 10.5834]]
cl = pe.cov_Obs(pi, cov, 'rAP')
pl = pe.misc.gen_correlated_data(pi, np.asarray(cov), 'rAPpseudo')
def rAP(p, g0sq):
return -0.0010666 * g0sq * (1 + np.exp(p[0] + p[1] / g0sq))
for g0sq in [1, 1.5, 1.8]:
oc = rAP(cl, g0sq)
oc.gamma_method()
op = rAP(pl, g0sq)
op.gamma_method()
assert(np.isclose(oc.value, op.value, rtol=1e-14, atol=1e-14))
assert(pe.covariance(cl[0], cl[1]) == cov[0][1])
assert(pe.covariance2(cl[0], cl[1]) == cov[1][0])
do = cl[0] * cl[1]
assert(np.array_equal(do.covobs['rAP'].grad, np.transpose([pi[1], pi[0]]).reshape(2, 1)))