[Fix] Fixed most type annotations in obs.py

This commit is contained in:
Fabian Joswig 2025-01-03 18:19:17 +01:00
parent a9e082c333
commit 01982568f0

View file

@ -13,7 +13,7 @@ import numdifftools as nd
from itertools import groupby
from .covobs import Covobs
from numpy import bool, float64, int64, ndarray
from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING
from typing import Any, Callable, Optional, Union, Sequence, TYPE_CHECKING
if sys.version_info >= (3, 10):
from types import NotImplementedType
@ -69,7 +69,7 @@ class Obs:
N_sigma_global = 1.0
N_sigma_dict: dict[str, int] = {}
def __init__(self, samples: Union[List[List[int]], List[ndarray], ndarray, List[List[float64]], List[List[float]]], names: List[str], idl: Optional[list[Union[list[int], range]]]=None, **kwargs):
def __init__(self, samples: list[Union[ndarray, list[Any]]], names: list[str], idl: Optional[list[Union[list[int], range]]]=None, **kwargs):
""" Initialize Obs object.
Parameters
@ -98,13 +98,16 @@ class Obs:
else:
if not isinstance(names[0], str):
raise TypeError('All names have to be strings.')
# This check does not work because of nan hacks in the json.gz export
# if not all((isinstance(o, np.ndarray) and o.ndim == 1) for o in samples):
# raise TypeError('All samples have to be 1d numpy arrays.')
if min(len(x) for x in samples) <= 4:
raise ValueError('Samples have to have at least 5 entries.')
self.names: list[str] = sorted(names)
self.shape = {}
self.r_values = {}
self.deltas = {}
self.r_values: dict[str, float] = {}
self.deltas: dict[str, ndarray] = {}
self._covobs: dict[str, Covobs] = {}
self._value: float = 0.0
@ -143,12 +146,12 @@ class Obs:
if len(sample) != self.shape[name]:
raise ValueError('Incompatible samples and idx for %s: %d vs. %d' % (name, len(sample), self.shape[name]))
self.r_values[name] = np.mean(sample)
self.deltas[name] = sample - self.r_values[name]
self.deltas[name] = np.asarray(sample) - self.r_values[name]
self._value += self.shape[name] * self.r_values[name]
self._value /= self.N
self._dvalue = 0.0
self.ddvalue = 0.0
self._dvalue: float = 0.0
self.ddvalue: float = 0.0
self.reweighted = False
self.tag = None
@ -162,19 +165,19 @@ class Obs:
return self._dvalue
@property
def e_names(self) -> List[str]:
def e_names(self) -> list[str]:
return sorted(set([o.split('|')[0] for o in self.names]))
@property
def cov_names(self) -> List[Union[Any, str]]:
def cov_names(self) -> list[Union[Any, str]]:
return sorted(set([o for o in self.covobs.keys()]))
@property
def mc_names(self) -> List[Union[Any, str]]:
def mc_names(self) -> list[Union[Any, str]]:
return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names]))
@property
def e_content(self) -> Dict[str, List[str]]:
def e_content(self) -> dict[str, list[str]]:
res = {}
for e, e_name in enumerate(self.e_names):
res[e_name] = sorted(filter(lambda x: x.startswith(e_name + '|'), self.names))
@ -183,7 +186,7 @@ class Obs:
return res
@property
def covobs(self) -> Dict[str, Covobs]:
def covobs(self) -> dict[str, Covobs]:
return self._covobs
def gamma_method(self, **kwargs):
@ -354,7 +357,7 @@ class Obs:
gm = gamma_method
def _calc_gamma(self, deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray:
def _calc_gamma(self, deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray:
"""Calculate Gamma_{AA} from the deltas, which are defined on idx.
idx is assumed to be a contiguous range (possibly with a stepsize != 1)
@ -438,19 +441,21 @@ class Obs:
my_string = ' ' + "\u00B7 Ensemble '" + key + "' "
if len(value) == 1:
my_string += f': {self.shape[value[0]]} configurations'
if isinstance(self.idl[value[0]], range):
my_string += f' (from {self.idl[value[0]].start} to {self.idl[value[0]][-1]}' + int(self.idl[value[0]].step != 1) * f' in steps of {self.idl[value[0]].step}' + ')'
my_idl = self.idl[value[0]]
if isinstance(my_idl, range):
my_string += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')'
else:
my_string += f' (irregular range from {self.idl[value[0]][0]} to {self.idl[value[0]][-1]})'
my_string += f' (irregular range from {my_idl[0]} to {my_idl[-1]})'
else:
sublist = []
for v in value:
my_substring = ' ' + "\u00B7 Replicum '" + v[len(key) + 1:] + "' "
my_substring += f': {self.shape[v]} configurations'
if isinstance(self.idl[v], range):
my_substring += f' (from {self.idl[v].start} to {self.idl[v][-1]}' + int(self.idl[v].step != 1) * f' in steps of {self.idl[v].step}' + ')'
my_idl = self.idl[v]
if isinstance(my_idl, range):
my_substring += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')'
else:
my_substring += f' (irregular range from {self.idl[v][0]} to {self.idl[v][-1]})'
my_substring += f' (irregular range from {my_idl[0]} to {my_idl[-1]})'
sublist.append(my_substring)
my_string += '\n' + '\n'.join(sublist)
@ -621,7 +626,7 @@ class Obs:
plt.title(e_name + f'\nskew: {skew(y_test):.3f} (p={skewtest(y_test).pvalue:.3f}), kurtosis: {kurtosis(y_test):.3f} (p={kurtosistest(y_test).pvalue:.3f})')
plt.draw()
def plot_piechart(self, save: None=None) -> Dict[str, float64]:
def plot_piechart(self, save: None=None) -> dict[str, float64]:
"""Plot piechart which shows the fractional contribution of each
ensemble to the error and returns a dictionary containing the fractions.
@ -635,7 +640,7 @@ class Obs:
if np.isclose(0.0, self._dvalue, atol=1e-15):
raise ValueError('Error is 0.0')
labels = self.e_names
sizes = [self.e_dvalue[name] ** 2 for name in labels] / self._dvalue ** 2
sizes = np.array([self.e_dvalue[name] ** 2 for name in labels]) / self._dvalue ** 2
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, startangle=90, normalize=True)
ax1.axis('equal')
@ -660,8 +665,11 @@ class Obs:
path : str
specifies a custom path for the file (default '.')
"""
if 'path' in kwargs:
file_name = kwargs.get('path') + '/' + filename
path = kwargs.get('path')
if path is not None:
if not isinstance(path, str):
raise TypeError('path has to be a string.')
file_name = path + '/' + filename
else:
file_name = filename
@ -771,7 +779,8 @@ class Obs:
hash_tuple += tuple([np.array([o.errsq()]).astype(np.float32).data.tobytes() for o in self.covobs.values()])
hash_tuple += tuple([o.encode() for o in self.names])
m = hashlib.md5()
[m.update(o) for o in hash_tuple]
for o in hash_tuple:
m.update(o)
return int(m.hexdigest(), 16) & 0xFFFFFFFF
# Overload comparisons
@ -806,7 +815,7 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] + y, [self], man_grad=[1])
def __radd__(self, y: Union[float, int]) -> Obs:
def __radd__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return self + y
def __mul__(self, y: Any) -> Union[Obs, ndarray, CObs, NotImplementedType]:
@ -822,7 +831,7 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] * y, [self], man_grad=[y])
def __rmul__(self, y: Union[float, int]) -> Obs:
def __rmul__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return self * y
def __sub__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]:
@ -836,13 +845,13 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] - y, [self], man_grad=[1])
def __rsub__(self, y: Union[float, int]) -> Obs:
def __rsub__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return -1 * (self - y)
def __pos__(self) -> Obs:
return self
def __neg__(self) -> Obs:
def __neg__(self) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return -1 * self
def __truediv__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]:
@ -984,7 +993,7 @@ class CObs:
if isinstance(other, np.ndarray):
return other * self
elif hasattr(other, 'real') and hasattr(other, 'imag'):
if all(isinstance(i, Obs) for i in [self.real, self.imag, other.real, other.imag]):
if isinstance(self.real, Obs) and isinstance(self.imag, Obs) and isinstance(other.real, Obs) and isinstance(other.imag, Obs):
return CObs(derived_observable(lambda x, **kwargs: x[0] * x[1] - x[2] * x[3],
[self.real, other.real, self.imag, other.imag],
man_grad=[other.real.value, self.real.value, -other.imag.value, -self.imag.value]),
@ -1027,8 +1036,11 @@ class CObs:
def __neg__(self) -> "CObs":
return -1 * self
def __eq__(self, other: Union[CObs, int]) -> bool:
def __eq__(self, other: object) -> bool:
if hasattr(other, 'real') and hasattr(other, 'imag'):
return self.real == other.real and self.imag == other.imag
else:
return False
def __str__(self) -> str:
return '(' + str(self.real) + int(self.imag >= 0.0) * '+' + str(self.imag) + 'j)'
@ -1045,7 +1057,7 @@ class CObs:
return f"({self.real:{format_type}}{self.imag:+{significance}}j)"
def gamma_method(x: Union[Corr, Obs, ndarray, List[Obs]], **kwargs) -> ndarray:
def gamma_method(x: Union[Corr, Obs, ndarray, list[Obs]], **kwargs) -> ndarray:
"""Vectorized version of the gamma_method applicable to lists or arrays of Obs.
See docstring of pe.Obs.gamma_method for details.
@ -1073,7 +1085,7 @@ def _format_uncertainty(value: Union[float, float64, int], dvalue: Union[float,
return f"{value:.{max(0, int(significance - fexp - 1))}f}({dvalue:2.{max(0, int(significance - fexp - 1))}f})"
def _expand_deltas(deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray:
def _expand_deltas(deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray:
"""Expand deltas defined on idx to a regular range with spacing gapsize between two
configurations and where holes are filled by 0.
If idx is of type range, the deltas are not changed if the idx.step == gapsize.
@ -1099,7 +1111,7 @@ def _expand_deltas(deltas: ndarray, idx: Union[range, List[int], List[int64]], s
return ret
def _merge_idx(idl: List[Union[List[Union[int64, int]], range, List[int]]]) -> Union[List[Union[int64, int]], range, List[int]]:
def _merge_idx(idl: list[Union[list[Union[int, int]], range, list[int]]]) -> Union[list[Union[int, int]], range, list[int]]:
"""Returns the union of all lists in idl as range or sorted list
Parameters
@ -1122,7 +1134,7 @@ def _merge_idx(idl: List[Union[List[Union[int64, int]], range, List[int]]]) -> U
return idunion
def _intersection_idx(idl: List[Union[range, List[int]]]) -> Union[range, List[int]]:
def _intersection_idx(idl: list[Union[range, list[int]]]) -> Union[range, list[int]]:
"""Returns the intersection of all lists in idl as range or sorted list
Parameters
@ -1148,7 +1160,7 @@ def _intersection_idx(idl: List[Union[range, List[int]]]) -> Union[range, List[i
return idinter
def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, List[int]], shape: int, new_idx: Union[range, List[int]], scalefactor: Union[float, int]) -> ndarray:
def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, list[int]], shape: int, new_idx: Union[range, list[int]], scalefactor: Union[float, int]) -> ndarray:
"""Expand deltas defined on idx to the list of configs that is defined by new_idx.
New, empty entries are filled by 0. If idx and new_idx are of type range, the smallest
common divisor of the step sizes is used as new step size.
@ -1218,7 +1230,7 @@ def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwar
if isinstance(raveled_data[i], (int, float)):
raveled_data[i] = cov_Obs(raveled_data[i], 0.0, "###dummy_covobs###")
allcov = {}
allcov: dict[str, ndarray] = {}
for o in raveled_data:
for name in o.cov_names:
if name in allcov:
@ -1308,8 +1320,8 @@ def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwar
self.grad = np.zeros((N, 1))
new_covobs_lengths = dict(set([y for x in [[(n, o.covobs[n].N) for n in o.cov_names] for o in raveled_data] for y in x]))
d_extracted = {}
g_extracted = {}
d_extracted: dict[str, list] = {}
g_extracted: dict[str, list] = {}
for name in new_sample_names:
d_extracted[name] = []
ens_length = len(new_idl_d[name])
@ -1370,7 +1382,7 @@ def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwar
return final_result
def _reduce_deltas(deltas: Union[List[float], ndarray], idx_old: Union[range, List[int]], idx_new: Union[range, List[int], ndarray]) -> Union[List[float], ndarray]:
def _reduce_deltas(deltas: Union[list[float], ndarray], idx_old: Union[range, list[int]], idx_new: Union[range, list[int], ndarray]) -> Union[list[float], ndarray]:
"""Extract deltas defined on idx_old on all configs of idx_new.
Assumes, that idx_old and idx_new are correctly defined idl, i.e., they
@ -1399,7 +1411,7 @@ def _reduce_deltas(deltas: Union[List[float], ndarray], idx_old: Union[range, Li
return np.array(deltas)[indices]
def reweight(weight: Obs, obs: Union[ndarray, List[Obs]], **kwargs) -> List[Obs]:
def reweight(weight: Obs, obs: Union[ndarray, list[Obs]], **kwargs) -> list[Obs]:
"""Reweight a list of observables.
Parameters
@ -1484,7 +1496,7 @@ def correlate(obs_a: Obs, obs_b: Obs) -> Obs:
return o
def covariance(obs: Union[ndarray, List[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray:
def covariance(obs: Union[ndarray, list[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray:
r'''Calculates the error covariance matrix of a set of observables.
WARNING: This function should be used with care, especially for observables with support on multiple
@ -1577,7 +1589,7 @@ def invert_corr_cov_cholesky(corr: ndarray, inverrdiag: ndarray) -> ndarray:
return chol_inv
def sort_corr(corr: ndarray, kl: List[str], yd: Dict[str, List[Obs]]) -> ndarray:
def sort_corr(corr: ndarray, kl: list[str], yd: dict[str, list[Obs]]) -> ndarray:
""" Reorders a correlation matrix to match the alphabetical order of its underlying y data.
The ordering of the input correlation matrix `corr` is given by the list of keys `kl`.
@ -1717,7 +1729,7 @@ def _covariance_element(obs1: Obs, obs2: Obs) -> Union[float, float64]:
return dvalue
def import_jackknife(jacks: ndarray, name: str, idl: Optional[List[range]]=None) -> Obs:
def import_jackknife(jacks: ndarray, name: str, idl: Optional[list[Union[list[int], range]]]=None) -> Obs:
"""Imports jackknife samples and returns an Obs
Parameters
@ -1767,7 +1779,7 @@ def import_bootstrap(boots: ndarray, name: str, random_numbers: ndarray) -> Obs:
return ret
def merge_obs(list_of_obs: List[Obs]) -> Obs:
def merge_obs(list_of_obs: list[Obs]) -> Obs:
"""Combine all observables in list_of_obs into one new observable
Parameters
@ -1785,11 +1797,11 @@ def merge_obs(list_of_obs: List[Obs]) -> Obs:
if any([len(o.cov_names) for o in list_of_obs]):
raise ValueError('Not possible to merge data that contains covobs!')
new_dict = {}
idl_dict = {}
idl_dict: dict[str, Union[range, list[int]]] = {}
for o in list_of_obs:
new_dict.update({key: o.deltas.get(key, 0) + o.r_values.get(key, 0)
for key in set(o.deltas) | set(o.r_values)})
idl_dict.update({key: o.idl.get(key, 0) for key in set(o.deltas)})
idl_dict.update({key: o.idl.get(key) for key in set(o.deltas)})
names = sorted(new_dict.keys())
o = Obs([new_dict[name] for name in names], names, idl=[idl_dict[name] for name in names])
@ -1797,7 +1809,7 @@ def merge_obs(list_of_obs: List[Obs]) -> Obs:
return o
def cov_Obs(means: Union[float64, int, List[float], float, List[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, List[Obs]]:
def cov_Obs(means: Union[int, list[float], float, list[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, list[Obs]]:
"""Create an Obs based on mean(s) and a covariance matrix
Parameters
@ -1840,13 +1852,14 @@ def cov_Obs(means: Union[float64, int, List[float], float, List[int]], cov: Any,
return ol
def _determine_gap(o: Obs, e_content: Dict[str, List[str]], e_name: str) -> Union[int64, int]:
def _determine_gap(o: Obs, e_content: dict[str, list[str]], e_name: str) -> Union[int64, int]:
gaps = []
for r_name in e_content[e_name]:
if isinstance(o.idl[r_name], range):
gaps.append(o.idl[r_name].step)
my_idl =o.idl[r_name]
if isinstance(my_idl, range):
gaps.append(my_idl.step)
else:
gaps.append(np.min(np.diff(o.idl[r_name])))
gaps.append(np.min(np.diff(my_idl)))
gap = min(gaps)
if not np.all([gi % gap == 0 for gi in gaps]):
@ -1855,7 +1868,7 @@ def _determine_gap(o: Obs, e_content: Dict[str, List[str]], e_name: str) -> Unio
return gap
def _check_lists_equal(idl: List[Union[List[int], List[Union[int64, int]], range, ndarray]]):
def _check_lists_equal(idl: Sequence[Union[list[int], list[Union[int64, int]], range, ndarray]]):
'''
Use groupby to efficiently check whether all elements of idl are identical.
Returns True if all elements are equal, otherwise False.