From 01982568f0c5eb0c29820f27d4ab1788cb957608 Mon Sep 17 00:00:00 2001 From: Fabian Joswig Date: Fri, 3 Jan 2025 18:19:17 +0100 Subject: [PATCH] [Fix] Fixed most type annotations in obs.py --- pyerrors/obs.py | 119 +++++++++++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 53 deletions(-) diff --git a/pyerrors/obs.py b/pyerrors/obs.py index c009d765..83b69549 100644 --- a/pyerrors/obs.py +++ b/pyerrors/obs.py @@ -13,7 +13,7 @@ import numdifftools as nd from itertools import groupby from .covobs import Covobs from numpy import bool, float64, int64, ndarray -from typing import Any, Callable, Dict, List, Optional, Union, TYPE_CHECKING +from typing import Any, Callable, Optional, Union, Sequence, TYPE_CHECKING if sys.version_info >= (3, 10): from types import NotImplementedType @@ -69,7 +69,7 @@ class Obs: N_sigma_global = 1.0 N_sigma_dict: dict[str, int] = {} - def __init__(self, samples: Union[List[List[int]], List[ndarray], ndarray, List[List[float64]], List[List[float]]], names: List[str], idl: Optional[list[Union[list[int], range]]]=None, **kwargs): + def __init__(self, samples: list[Union[ndarray, list[Any]]], names: list[str], idl: Optional[list[Union[list[int], range]]]=None, **kwargs): """ Initialize Obs object. Parameters @@ -98,13 +98,16 @@ class Obs: else: if not isinstance(names[0], str): raise TypeError('All names have to be strings.') + # This check does not work because of nan hacks in the json.gz export + # if not all((isinstance(o, np.ndarray) and o.ndim == 1) for o in samples): + # raise TypeError('All samples have to be 1d numpy arrays.') if min(len(x) for x in samples) <= 4: raise ValueError('Samples have to have at least 5 entries.') self.names: list[str] = sorted(names) self.shape = {} - self.r_values = {} - self.deltas = {} + self.r_values: dict[str, float] = {} + self.deltas: dict[str, ndarray] = {} self._covobs: dict[str, Covobs] = {} self._value: float = 0.0 @@ -143,12 +146,12 @@ class Obs: if len(sample) != self.shape[name]: raise ValueError('Incompatible samples and idx for %s: %d vs. 
%d' % (name, len(sample), self.shape[name])) self.r_values[name] = np.mean(sample) - self.deltas[name] = sample - self.r_values[name] + self.deltas[name] = np.asarray(sample) - self.r_values[name] self._value += self.shape[name] * self.r_values[name] self._value /= self.N - self._dvalue = 0.0 - self.ddvalue = 0.0 + self._dvalue: float = 0.0 + self.ddvalue: float = 0.0 self.reweighted = False self.tag = None @@ -162,19 +165,19 @@ class Obs: return self._dvalue @property - def e_names(self) -> List[str]: + def e_names(self) -> list[str]: return sorted(set([o.split('|')[0] for o in self.names])) @property - def cov_names(self) -> List[Union[Any, str]]: + def cov_names(self) -> list[Union[Any, str]]: return sorted(set([o for o in self.covobs.keys()])) @property - def mc_names(self) -> List[Union[Any, str]]: + def mc_names(self) -> list[Union[Any, str]]: return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names])) @property - def e_content(self) -> Dict[str, List[str]]: + def e_content(self) -> dict[str, list[str]]: res = {} for e, e_name in enumerate(self.e_names): res[e_name] = sorted(filter(lambda x: x.startswith(e_name + '|'), self.names)) @@ -183,7 +186,7 @@ class Obs: return res @property - def covobs(self) -> Dict[str, Covobs]: + def covobs(self) -> dict[str, Covobs]: return self._covobs def gamma_method(self, **kwargs): @@ -354,7 +357,7 @@ class Obs: gm = gamma_method - def _calc_gamma(self, deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray: + def _calc_gamma(self, deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray: """Calculate Gamma_{AA} from the deltas, which are defined on idx. 
idx is assumed to be a contiguous range (possibly with a stepsize != 1) @@ -438,19 +441,21 @@ class Obs: my_string = ' ' + "\u00B7 Ensemble '" + key + "' " if len(value) == 1: my_string += f': {self.shape[value[0]]} configurations' - if isinstance(self.idl[value[0]], range): - my_string += f' (from {self.idl[value[0]].start} to {self.idl[value[0]][-1]}' + int(self.idl[value[0]].step != 1) * f' in steps of {self.idl[value[0]].step}' + ')' + my_idl = self.idl[value[0]] + if isinstance(my_idl, range): + my_string += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')' else: - my_string += f' (irregular range from {self.idl[value[0]][0]} to {self.idl[value[0]][-1]})' + my_string += f' (irregular range from {my_idl[0]} to {my_idl[-1]})' else: sublist = [] for v in value: my_substring = ' ' + "\u00B7 Replicum '" + v[len(key) + 1:] + "' " my_substring += f': {self.shape[v]} configurations' - if isinstance(self.idl[v], range): - my_substring += f' (from {self.idl[v].start} to {self.idl[v][-1]}' + int(self.idl[v].step != 1) * f' in steps of {self.idl[v].step}' + ')' + my_idl = self.idl[v] + if isinstance(my_idl, range): + my_substring += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')' else: - my_substring += f' (irregular range from {self.idl[v][0]} to {self.idl[v][-1]})' + my_substring += f' (irregular range from {my_idl[0]} to {my_idl[-1]})' sublist.append(my_substring) my_string += '\n' + '\n'.join(sublist) @@ -621,7 +626,7 @@ class Obs: plt.title(e_name + f'\nskew: {skew(y_test):.3f} (p={skewtest(y_test).pvalue:.3f}), kurtosis: {kurtosis(y_test):.3f} (p={kurtosistest(y_test).pvalue:.3f})') plt.draw() - def plot_piechart(self, save: None=None) -> Dict[str, float64]: + def plot_piechart(self, save: None=None) -> dict[str, float64]: """Plot piechart which shows the fractional contribution of each ensemble to the error and returns a dictionary containing the fractions. 
@@ -635,7 +640,7 @@ class Obs: if np.isclose(0.0, self._dvalue, atol=1e-15): raise ValueError('Error is 0.0') labels = self.e_names - sizes = [self.e_dvalue[name] ** 2 for name in labels] / self._dvalue ** 2 + sizes = np.array([self.e_dvalue[name] ** 2 for name in labels]) / self._dvalue ** 2 fig1, ax1 = plt.subplots() ax1.pie(sizes, labels=labels, startangle=90, normalize=True) ax1.axis('equal') @@ -660,8 +665,11 @@ class Obs: path : str specifies a custom path for the file (default '.') """ - if 'path' in kwargs: - file_name = kwargs.get('path') + '/' + filename + path = kwargs.get('path') + if path is not None: + if not isinstance(path, str): + raise TypeError('path has to be a string.') + file_name = path + '/' + filename else: file_name = filename @@ -771,7 +779,8 @@ class Obs: hash_tuple += tuple([np.array([o.errsq()]).astype(np.float32).data.tobytes() for o in self.covobs.values()]) hash_tuple += tuple([o.encode() for o in self.names]) m = hashlib.md5() - [m.update(o) for o in hash_tuple] + for o in hash_tuple: + m.update(o) return int(m.hexdigest(), 16) & 0xFFFFFFFF # Overload comparisons @@ -806,7 +815,7 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] + y, [self], man_grad=[1]) - def __radd__(self, y: Union[float, int]) -> Obs: + def __radd__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]: return self + y def __mul__(self, y: Any) -> Union[Obs, ndarray, CObs, NotImplementedType]: @@ -822,7 +831,7 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] * y, [self], man_grad=[y]) - def __rmul__(self, y: Union[float, int]) -> Obs: + def __rmul__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]: return self * y def __sub__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]: @@ -836,13 +845,13 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] - y, [self], man_grad=[1]) - def __rsub__(self, y: Union[float, int]) -> Obs: + def __rsub__(self, y: Union[float, int]) -> Union[Obs, NotImplementedType, CObs, ndarray]: return -1 * (self - y) def __pos__(self) -> Obs: return self - def __neg__(self) -> Obs: + def __neg__(self) -> Union[Obs, NotImplementedType, CObs, ndarray]: return -1 * self def __truediv__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]: @@ -984,7 +993,7 @@ class CObs: if isinstance(other, np.ndarray): return other * self elif hasattr(other, 'real') and hasattr(other, 'imag'): - if all(isinstance(i, Obs) for i in [self.real, self.imag, other.real, other.imag]): + if isinstance(self.real, Obs) and isinstance(self.imag, Obs) and isinstance(other.real, Obs) and isinstance(other.imag, Obs): return CObs(derived_observable(lambda x, **kwargs: x[0] * x[1] - x[2] * x[3], [self.real, other.real, self.imag, other.imag], man_grad=[other.real.value, self.real.value, -other.imag.value, -self.imag.value]), @@ -1027,8 +1036,11 @@ class CObs: def __neg__(self) -> "CObs": return -1 * self - def __eq__(self, other: Union[CObs, int]) -> bool: - return self.real == other.real and self.imag == other.imag + def __eq__(self, other: object) -> bool: + if hasattr(other, 'real') and hasattr(other, 'imag'): + return self.real == other.real and self.imag == other.imag + else: + return False def __str__(self) -> str: return '(' + str(self.real) + int(self.imag >= 0.0) * '+' + str(self.imag) + 'j)' @@ -1045,7 +1057,7 @@ class CObs: return f"({self.real:{format_type}}{self.imag:+{significance}}j)" -def gamma_method(x: Union[Corr, Obs, ndarray, List[Obs]], **kwargs) -> ndarray: 
+def gamma_method(x: Union[Corr, Obs, ndarray, list[Obs]], **kwargs) -> ndarray: """Vectorized version of the gamma_method applicable to lists or arrays of Obs. See docstring of pe.Obs.gamma_method for details.
@@ -1073,7 +1085,7 @@ def _format_uncertainty(value: Union[float, float64, int], dvalue: Union[float, return f"{value:.{max(0, int(significance - fexp - 1))}f}({dvalue:2.{max(0, int(significance - fexp - 1))}f})" -def _expand_deltas(deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray: +def _expand_deltas(deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray: """Expand deltas defined on idx to a regular range with spacing gapsize between two configurations and where holes are filled by 0. If idx is of type range, the deltas are not changed if the idx.step == gapsize.
@@ -1099,7 +1111,7 @@ def _expand_deltas(deltas: ndarray, idx: Union[range, List[int], List[int64]], s return ret -def _merge_idx(idl: List[Union[List[Union[int64, int]], range, List[int]]]) -> Union[List[Union[int64, int]], range, List[int]]: +def _merge_idx(idl: list[Union[list[int], range]]) -> Union[list[int], range]: """Returns the union of all lists in idl as range or sorted list Parameters
@@ -1122,7 +1134,7 @@ def _merge_idx(idl: List[Union[List[Union[int64, int]], range, List[int]]]) -> U return idunion -def _intersection_idx(idl: List[Union[range, List[int]]]) -> Union[range, List[int]]: +def _intersection_idx(idl: list[Union[range, list[int]]]) -> Union[range, list[int]]: """Returns the intersection of all lists in idl as range or sorted list Parameters
@@ -1148,7 +1160,7 @@ def _intersection_idx(idl: List[Union[range, List[i return idinter -def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, List[int]], shape: int, new_idx: Union[range, List[int]], scalefactor: Union[float, int]) -> ndarray: +def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, list[int]], shape: int, new_idx: Union[range, list[int]], scalefactor: Union[float, int]) -> ndarray: """Expand deltas defined on idx to the list of configs that is defined by new_idx. New, empty entries are filled by 0. If idx and new_idx are of type range, the smallest common divisor of the step sizes is used as new step size. 
@@ -1218,7 +1230,7 @@ def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwar if isinstance(raveled_data[i], (int, float)): raveled_data[i] = cov_Obs(raveled_data[i], 0.0, "###dummy_covobs###") - allcov = {} + allcov: dict[str, ndarray] = {} for o in raveled_data: for name in o.cov_names: if name in allcov: @@ -1308,8 +1320,8 @@ def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwar self.grad = np.zeros((N, 1)) new_covobs_lengths = dict(set([y for x in [[(n, o.covobs[n].N) for n in o.cov_names] for o in raveled_data] for y in x])) - d_extracted = {} - g_extracted = {} + d_extracted: dict[str, list] = {} + g_extracted: dict[str, list] = {} for name in new_sample_names: d_extracted[name] = [] ens_length = len(new_idl_d[name]) @@ -1370,7 +1382,7 @@ def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwar return final_result -def _reduce_deltas(deltas: Union[List[float], ndarray], idx_old: Union[range, List[int]], idx_new: Union[range, List[int], ndarray]) -> Union[List[float], ndarray]: +def _reduce_deltas(deltas: Union[list[float], ndarray], idx_old: Union[range, list[int]], idx_new: Union[range, list[int], ndarray]) -> Union[list[float], ndarray]: """Extract deltas defined on idx_old on all configs of idx_new. Assumes, that idx_old and idx_new are correctly defined idl, i.e., they @@ -1399,7 +1411,7 @@ def _reduce_deltas(deltas: Union[List[float], ndarray], idx_old: Union[range, Li return np.array(deltas)[indices] -def reweight(weight: Obs, obs: Union[ndarray, List[Obs]], **kwargs) -> List[Obs]: +def reweight(weight: Obs, obs: Union[ndarray, list[Obs]], **kwargs) -> list[Obs]: """Reweight a list of observables. Parameters @@ -1484,7 +1496,7 @@ def correlate(obs_a: Obs, obs_b: Obs) -> Obs: return o -def covariance(obs: Union[ndarray, List[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray: +def covariance(obs: Union[ndarray, list[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray: r'''Calculates the error covariance matrix of a set of observables. WARNING: This function should be used with care, especially for observables with support on multiple @@ -1577,7 +1589,7 @@ def invert_corr_cov_cholesky(corr: ndarray, inverrdiag: ndarray) -> ndarray: return chol_inv -def sort_corr(corr: ndarray, kl: List[str], yd: Dict[str, List[Obs]]) -> ndarray: +def sort_corr(corr: ndarray, kl: list[str], yd: dict[str, list[Obs]]) -> ndarray: """ Reorders a correlation matrix to match the alphabetical order of its underlying y data. The ordering of the input correlation matrix `corr` is given by the list of keys `kl`. 
@@ -1717,7 +1729,7 @@ def _covariance_element(obs1: Obs, obs2: Obs) -> Union[float, float64]: return dvalue -def import_jackknife(jacks: ndarray, name: str, idl: Optional[List[range]]=None) -> Obs: +def import_jackknife(jacks: ndarray, name: str, idl: Optional[list[Union[list[int], range]]]=None) -> Obs: """Imports jackknife samples and returns an Obs Parameters
@@ -1767,7 +1779,7 @@ def import_bootstrap(boots: ndarray, name: str, random_numbers: ndarray) -> Obs: return ret -def merge_obs(list_of_obs: List[Obs]) -> Obs: +def merge_obs(list_of_obs: list[Obs]) -> Obs: """Combine all observables in list_of_obs into one new observable Parameters
@@ -1785,11 +1797,11 @@ def merge_obs(list_of_obs: List[Obs]) -> Obs: if any([len(o.cov_names) for o in list_of_obs]): raise ValueError('Not possible to merge data that contains covobs!') new_dict = {} - idl_dict = {} + idl_dict: dict[str, Union[range, list[int]]] = {} for o in list_of_obs: new_dict.update({key: o.deltas.get(key, 0) + o.r_values.get(key, 0) for key in set(o.deltas) | set(o.r_values)}) - idl_dict.update({key: o.idl.get(key, 0) for key in set(o.deltas)}) + idl_dict.update({key: o.idl[key] for key in set(o.deltas)}) names = sorted(new_dict.keys()) o = Obs([new_dict[name] for name in names], names, idl=[idl_dict[name] for name in names])
@@ -1797,7 +1809,7 @@ def merge_obs(list_of_obs: List[Obs]) -> Obs: return o -def cov_Obs(means: Union[float64, int, List[float], float, List[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, List[Obs]]: +def cov_Obs(means: Union[int, list[float], float, list[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, list[Obs]]: """Create an Obs based on mean(s) and a covariance matrix Parameters
@@ -1840,13 +1852,14 @@ def cov_Obs(means: Union[float64, int, List[float], float, List[int]], cov: Any, return ol -def _determine_gap(o: Obs, e_content: Dict[str, List[str]], e_name: str) -> Union[int64, int]: +def _determine_gap(o: Obs, e_content: dict[str, list[str]], e_name: str) -> Union[int64, int]: gaps = [] for r_name in e_content[e_name]: - if isinstance(o.idl[r_name], range): - gaps.append(o.idl[r_name].step) + my_idl = o.idl[r_name] + if isinstance(my_idl, range): + gaps.append(my_idl.step) else: - gaps.append(np.min(np.diff(o.idl[r_name]))) + gaps.append(np.min(np.diff(my_idl))) gap = min(gaps) if not np.all([gi % gap == 0 for gi in gaps]):
@@ -1855,7 +1868,7 @@ def _determine_gap(o: Obs, e_content: Dict[str, List[str]], e_name: str) -> Unio return gap -def _check_lists_equal(idl: List[Union[List[int], List[Union[int64, int]], range, ndarray]]): +def _check_lists_equal(idl: Sequence[Union[list[int], list[Union[int64, int]], range, ndarray]]): ''' Use groupby to efficiently check whether all elements of idl are identical. Returns True if all elements are equal, otherwise False.
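
Usage note (illustrative only, not part of the diff): a minimal sketch of how the newly annotated constructor is typically exercised, assuming only the public pyerrors API; the ensemble name and the synthetic data are made up for demonstration.

    import numpy as np
    import pyerrors as pe

    # one replicum with 500 synthetic measurements; a 1d ndarray (or plain list) per name,
    # matching samples: list[Union[ndarray, list[Any]]] and names: list[str]
    samples = np.random.normal(1.0, 0.1, 500)
    obs = pe.Obs([samples], ['ensemble1'])

    obs.gamma_method()  # estimate the statistical error before inspecting the result
    print(obs)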