From 3db8eb2989db390cfd91c8d9708acb70e864e68a Mon Sep 17 00:00:00 2001 From: Fabian Joswig Date: Wed, 25 Dec 2024 11:09:58 +0100 Subject: [PATCH] [Feat] Add type hints to pyerrors modules --- pyerrors/correlators.py | 133 +++++++++++++------------- pyerrors/covobs.py | 15 +-- pyerrors/dirac.py | 8 +- pyerrors/fits.py | 29 +++--- pyerrors/integrate.py | 5 +- pyerrors/linalg.py | 29 +++--- pyerrors/misc.py | 13 ++- pyerrors/mpm.py | 5 +- pyerrors/obs.py | 201 ++++++++++++++++++++-------------------- pyerrors/roots.py | 4 +- pyerrors/special.py | 1 + 11 files changed, 236 insertions(+), 207 deletions(-) diff --git a/pyerrors/correlators.py b/pyerrors/correlators.py index 0375155f..74db429d 100644 --- a/pyerrors/correlators.py +++ b/pyerrors/correlators.py @@ -1,3 +1,4 @@ +from __future__ import annotations import warnings from itertools import permutations import numpy as np @@ -9,6 +10,8 @@ from .misc import dump_object, _assert_equal_properties from .fits import least_squares from .roots import find_root from . import linalg +from numpy import float64, int64, ndarray, ufunc +from typing import Any, Callable, List, Optional, Tuple, Union class Corr: @@ -42,7 +45,7 @@ class Corr: __slots__ = ["content", "N", "T", "tag", "prange"] - def __init__(self, data_input, padding=[0, 0], prange=None): + def __init__(self, data_input: Any, padding: List[int]=[0, 0], prange: Optional[List[int]]=None): """ Initialize a Corr object. Parameters @@ -119,7 +122,7 @@ class Corr: self.T = len(self.content) self.prange = prange - def __getitem__(self, idx): + def __getitem__(self, idx: Union[slice, int]) -> Union[CObs, Obs, ndarray, List[ndarray]]: """Return the content of timeslice idx""" if self.content[idx] is None: return None @@ -151,7 +154,7 @@ class Corr: gm = gamma_method - def projected(self, vector_l=None, vector_r=None, normalize=False): + def projected(self, vector_l: Optional[Union[ndarray, List[Optional[ndarray]]]]=None, vector_r: None=None, normalize: bool=False) -> "Corr": """We need to project the Correlator with a Vector to get a single value at each timeslice. The method can use one or two vectors. @@ -190,7 +193,7 @@ class Corr: newcontent = [None if (_check_for_none(self, self.content[t]) or vector_l[t] is None or vector_r[t] is None) else np.asarray([vector_l[t].T @ self.content[t] @ vector_r[t]]) for t in range(self.T)] return Corr(newcontent) - def item(self, i, j): + def item(self, i: int, j: int) -> "Corr": """Picks the element [i,j] from every matrix and returns a correlator containing one Obs per timeslice. Parameters @@ -205,7 +208,7 @@ class Corr: newcontent = [None if (item is None) else item[i, j] for item in self.content] return Corr(newcontent) - def plottable(self): + def plottable(self) -> Union[Tuple[List[int], List[float64], List[float64]], Tuple[List[int], List[float], List[float64]]]: """Outputs the correlator in a plotable format. Outputs three lists containing the timeslice index, the value on each @@ -219,7 +222,7 @@ class Corr: return x_list, y_list, y_err_list - def symmetric(self): + def symmetric(self) -> "Corr": """ Symmetrize the correlator around x0=0.""" if self.N != 1: raise ValueError('symmetric cannot be safely applied to multi-dimensional correlators.') @@ -240,7 +243,7 @@ class Corr: raise ValueError("Corr could not be symmetrized: No redundant values") return Corr(newcontent, prange=self.prange) - def anti_symmetric(self): + def anti_symmetric(self) -> "Corr": """Anti-symmetrize the correlator around x0=0.""" if self.N != 1: raise TypeError('anti_symmetric cannot be safely applied to multi-dimensional correlators.') @@ -277,7 +280,7 @@ class Corr: return False return True - def trace(self): + def trace(self) -> "Corr": """Calculates the per-timeslice trace of a correlator matrix.""" if self.N == 1: raise ValueError("Only works for correlator matrices.") @@ -289,7 +292,7 @@ class Corr: newcontent.append(np.trace(self.content[t])) return Corr(newcontent) - def matrix_symmetric(self): + def matrix_symmetric(self) -> "Corr": """Symmetrizes the correlator matrices on every timeslice.""" if self.N == 1: raise ValueError("Trying to symmetrize a correlator matrix, that already has N=1.") @@ -299,7 +302,7 @@ class Corr: transposed = [None if _check_for_none(self, G) else G.T for G in self.content] return 0.5 * (Corr(transposed) + self) - def GEVP(self, t0, ts=None, sort="Eigenvalue", vector_obs=False, **kwargs): + def GEVP(self, t0: int, ts: Optional[int]=None, sort: Optional[str]="Eigenvalue", vector_obs: bool=False, **kwargs) -> Union[List[List[Optional[ndarray]]], ndarray, List[Optional[ndarray]]]: r'''Solve the generalized eigenvalue problem on the correlator matrix and returns the corresponding eigenvectors. The eigenvectors are sorted according to the descending eigenvalues, the zeroth eigenvector(s) correspond to the @@ -405,7 +408,7 @@ class Corr: else: return reordered_vecs - def Eigenvalue(self, t0, ts=None, state=0, sort="Eigenvalue", **kwargs): + def Eigenvalue(self, t0: int, ts: None=None, state: int=0, sort: str="Eigenvalue", **kwargs) -> "Corr": """Determines the eigenvalue of the GEVP by solving and projecting the correlator Parameters @@ -418,7 +421,7 @@ class Corr: vec = self.GEVP(t0, ts=ts, sort=sort, **kwargs)[state] return self.projected(vec) - def Hankel(self, N, periodic=False): + def Hankel(self, N: int, periodic: bool=False) -> "Corr": """Constructs an NxN Hankel matrix C(t) c(t+1) ... c(t+n-1) @@ -459,7 +462,7 @@ class Corr: return Corr(new_content) - def roll(self, dt): + def roll(self, dt: int) -> "Corr": """Periodically shift the correlator by dt timeslices Parameters @@ -469,11 +472,11 @@ class Corr: """ return Corr(list(np.roll(np.array(self.content, dtype=object), dt, axis=0))) - def reverse(self): + def reverse(self) -> "Corr": """Reverse the time ordering of the Corr""" return Corr(self.content[:: -1]) - def thin(self, spacing=2, offset=0): + def thin(self, spacing: int=2, offset: int=0) -> "Corr": """Thin out a correlator to suppress correlations Parameters @@ -491,7 +494,7 @@ class Corr: new_content.append(self.content[t]) return Corr(new_content) - def correlate(self, partner): + def correlate(self, partner: Union[Corr, float, Obs]) -> "Corr": """Correlate the correlator with another correlator or Obs Parameters @@ -520,7 +523,7 @@ class Corr: return Corr(new_content) - def reweight(self, weight, **kwargs): + def reweight(self, weight: Obs, **kwargs) -> "Corr": """Reweight the correlator. Parameters @@ -543,7 +546,7 @@ class Corr: new_content.append(np.array(reweight(weight, t_slice, **kwargs))) return Corr(new_content) - def T_symmetry(self, partner, parity=+1): + def T_symmetry(self, partner: "Corr", parity: int=+1) -> "Corr": """Return the time symmetry average of the correlator and its partner Parameters @@ -573,7 +576,7 @@ class Corr: return (self + T_partner) / 2 - def deriv(self, variant="symmetric"): + def deriv(self, variant: Optional[str]="symmetric") -> "Corr": """Return the first derivative of the correlator with respect to x0. Parameters @@ -638,7 +641,7 @@ class Corr: else: raise ValueError("Unknown variant.") - def second_deriv(self, variant="symmetric"): + def second_deriv(self, variant: Optional[str]="symmetric") -> "Corr": r"""Return the second derivative of the correlator with respect to x0. Parameters @@ -701,7 +704,7 @@ class Corr: else: raise ValueError("Unknown variant.") - def m_eff(self, variant='log', guess=1.0): + def m_eff(self, variant: str='log', guess: float=1.0) -> "Corr": """Returns the effective mass of the correlator as correlator object Parameters @@ -785,7 +788,7 @@ class Corr: else: raise ValueError('Unknown variant.') - def fit(self, function, fitrange=None, silent=False, **kwargs): + def fit(self, function: Callable, fitrange: Optional[Union[str, List[int]]]=None, silent: bool=False, **kwargs) -> Fit_result: r'''Fits function to the data Parameters @@ -819,7 +822,7 @@ class Corr: result = least_squares(xs, ys, function, silent=silent, **kwargs) return result - def plateau(self, plateau_range=None, method="fit", auto_gamma=False): + def plateau(self, plateau_range: Optional[List[int]]=None, method: str="fit", auto_gamma: bool=False) -> Obs: """ Extract a plateau value from a Corr object Parameters @@ -856,7 +859,7 @@ class Corr: else: raise ValueError("Unsupported plateau method: " + method) - def set_prange(self, prange): + def set_prange(self, prange: List[Union[int, float]]): """Sets the attribute prange of the Corr object.""" if not len(prange) == 2: raise ValueError("prange must be a list or array with two values") @@ -868,7 +871,7 @@ class Corr: self.prange = prange return - def show(self, x_range=None, comp=None, y_range=None, logscale=False, plateau=None, fit_res=None, fit_key=None, ylabel=None, save=None, auto_gamma=False, hide_sigma=None, references=None, title=None): + def show(self, x_range: Optional[List[int64]]=None, comp: Optional[Corr]=None, y_range: None=None, logscale: bool=False, plateau: None=None, fit_res: Optional[Fit_result]=None, fit_key: Optional[str]=None, ylabel: None=None, save: None=None, auto_gamma: bool=False, hide_sigma: None=None, references: None=None, title: None=None): """Plots the correlator using the tag of the correlator as label if available. Parameters @@ -993,7 +996,7 @@ class Corr: else: raise TypeError("'save' has to be a string.") - def spaghetti_plot(self, logscale=True): + def spaghetti_plot(self, logscale: bool=True): """Produces a spaghetti plot of the correlator suited to monitor exceptional configurations. Parameters @@ -1022,7 +1025,7 @@ class Corr: plt.title(name) plt.draw() - def dump(self, filename, datatype="json.gz", **kwargs): + def dump(self, filename: str, datatype: str="json.gz", **kwargs): """Dumps the Corr into a file of chosen type Parameters ---------- @@ -1046,10 +1049,10 @@ class Corr: else: raise ValueError("Unknown datatype " + str(datatype)) - def print(self, print_range=None): + def print(self, print_range: Optional[List[int]]=None): print(self.__repr__(print_range)) - def __repr__(self, print_range=None): + def __repr__(self, print_range: Optional[List[int]]=None) -> str: if print_range is None: print_range = [0, None] @@ -1074,7 +1077,7 @@ class Corr: content_string += '\n' return content_string - def __str__(self): + def __str__(self) -> str: return self.__repr__() # We define the basic operations, that can be performed with correlators. @@ -1084,14 +1087,14 @@ class Corr: __array_priority__ = 10000 - def __eq__(self, y): + def __eq__(self, y: Union[Corr, Obs, int]) -> ndarray: if isinstance(y, Corr): comp = np.asarray(y.content, dtype=object) else: comp = np.asarray(y) return np.asarray(self.content, dtype=object) == comp - def __add__(self, y): + def __add__(self, y: Any) -> "Corr": if isinstance(y, Corr): if ((self.N != y.N) or (self.T != y.T)): raise ValueError("Addition of Corrs with different shape") @@ -1119,7 +1122,7 @@ class Corr: else: raise TypeError("Corr + wrong type") - def __mul__(self, y): + def __mul__(self, y: Any) -> "Corr": if isinstance(y, Corr): if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T): raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T") @@ -1147,7 +1150,7 @@ class Corr: else: raise TypeError("Corr * wrong type") - def __matmul__(self, y): + def __matmul__(self, y: Union[Corr, ndarray]) -> "Corr": if isinstance(y, np.ndarray): if y.ndim != 2 or y.shape[0] != y.shape[1]: raise ValueError("Can only multiply correlators by square matrices.") @@ -1174,7 +1177,7 @@ class Corr: else: return NotImplemented - def __rmatmul__(self, y): + def __rmatmul__(self, y: ndarray) -> "Corr": if isinstance(y, np.ndarray): if y.ndim != 2 or y.shape[0] != y.shape[1]: raise ValueError("Can only multiply correlators by square matrices.") @@ -1190,7 +1193,7 @@ class Corr: else: return NotImplemented - def __truediv__(self, y): + def __truediv__(self, y: Union[Corr, float, ndarray, int]) -> "Corr": if isinstance(y, Corr): if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T): raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T") @@ -1244,37 +1247,37 @@ class Corr: else: raise TypeError('Corr / wrong type') - def __neg__(self): + def __neg__(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else -1. * item for item in self.content] return Corr(newcontent, prange=self.prange) - def __sub__(self, y): + def __sub__(self, y: Union[Corr, float, ndarray, int]) -> "Corr": return self + (-y) - def __pow__(self, y): + def __pow__(self, y: Union[float, int]) -> "Corr": if isinstance(y, (Obs, int, float, CObs)): newcontent = [None if _check_for_none(self, item) else item**y for item in self.content] return Corr(newcontent, prange=self.prange) else: raise TypeError('Type of exponent not supported') - def __abs__(self): + def __abs__(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else np.abs(item) for item in self.content] return Corr(newcontent, prange=self.prange) # The numpy functions: - def sqrt(self): + def sqrt(self) -> "Corr": return self ** 0.5 - def log(self): + def log(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else np.log(item) for item in self.content] return Corr(newcontent, prange=self.prange) - def exp(self): + def exp(self) -> "Corr": newcontent = [None if _check_for_none(self, item) else np.exp(item) for item in self.content] return Corr(newcontent, prange=self.prange) - def _apply_func_to_corr(self, func): + def _apply_func_to_corr(self, func: Union[Callable, ufunc]) -> "Corr": newcontent = [None if _check_for_none(self, item) else func(item) for item in self.content] for t in range(self.T): if _check_for_none(self, newcontent[t]): @@ -1287,57 +1290,57 @@ class Corr: raise ValueError('Operation returns undefined correlator') return Corr(newcontent) - def sin(self): + def sin(self) -> "Corr": return self._apply_func_to_corr(np.sin) - def cos(self): + def cos(self) -> "Corr": return self._apply_func_to_corr(np.cos) - def tan(self): + def tan(self) -> "Corr": return self._apply_func_to_corr(np.tan) - def sinh(self): + def sinh(self) -> "Corr": return self._apply_func_to_corr(np.sinh) - def cosh(self): + def cosh(self) -> "Corr": return self._apply_func_to_corr(np.cosh) - def tanh(self): + def tanh(self) -> "Corr": return self._apply_func_to_corr(np.tanh) - def arcsin(self): + def arcsin(self) -> "Corr": return self._apply_func_to_corr(np.arcsin) - def arccos(self): + def arccos(self) -> "Corr": return self._apply_func_to_corr(np.arccos) - def arctan(self): + def arctan(self) -> "Corr": return self._apply_func_to_corr(np.arctan) - def arcsinh(self): + def arcsinh(self) -> "Corr": return self._apply_func_to_corr(np.arcsinh) - def arccosh(self): + def arccosh(self) -> "Corr": return self._apply_func_to_corr(np.arccosh) - def arctanh(self): + def arctanh(self) -> "Corr": return self._apply_func_to_corr(np.arctanh) # Right hand side operations (require tweak in main module to work) def __radd__(self, y): return self + y - def __rsub__(self, y): + def __rsub__(self, y: int) -> "Corr": return -self + y - def __rmul__(self, y): + def __rmul__(self, y: Union[float, int]) -> "Corr": return self * y - def __rtruediv__(self, y): + def __rtruediv__(self, y: int) -> "Corr": return (self / y) ** (-1) @property - def real(self): + def real(self) -> "Corr": def return_real(obs_OR_cobs): if isinstance(obs_OR_cobs.flatten()[0], CObs): return np.vectorize(lambda x: x.real)(obs_OR_cobs) @@ -1347,7 +1350,7 @@ class Corr: return self._apply_func_to_corr(return_real) @property - def imag(self): + def imag(self) -> "Corr": def return_imag(obs_OR_cobs): if isinstance(obs_OR_cobs.flatten()[0], CObs): return np.vectorize(lambda x: x.imag)(obs_OR_cobs) @@ -1356,7 +1359,7 @@ class Corr: return self._apply_func_to_corr(return_imag) - def prune(self, Ntrunc, tproj=3, t0proj=2, basematrix=None): + def prune(self, Ntrunc: int, tproj: int=3, t0proj: int=2, basematrix: None=None) -> "Corr": r''' Project large correlation matrix to lowest states This method can be used to reduce the size of an (N x N) correlation matrix @@ -1414,7 +1417,7 @@ class Corr: return Corr(newcontent) -def _sort_vectors(vec_set_in, ts): +def _sort_vectors(vec_set_in: List[Optional[ndarray]], ts: int) -> List[Optional[Union[ndarray, List[ndarray]]]]: """Helper function used to find a set of Eigenvectors consistent over all timeslices""" if isinstance(vec_set_in[ts][0][0], Obs): @@ -1446,12 +1449,12 @@ def _sort_vectors(vec_set_in, ts): return sorted_vec_set -def _check_for_none(corr, entry): +def _check_for_none(corr: Corr, entry: Optional[ndarray]) -> bool: """Checks if entry for correlator corr is None""" return len(list(filter(None, np.asarray(entry).flatten()))) < corr.N ** 2 -def _GEVP_solver(Gt, G0, method='eigh', chol_inv=None): +def _GEVP_solver(Gt: Optional[ndarray], G0: ndarray, method: str='eigh', chol_inv: Optional[ndarray]=None) -> ndarray: r"""Helper function for solving the GEVP and sorting the eigenvectors. Solves $G(t)v_i=\lambda_i G(t_0)v_i$ and returns the eigenvectors v_i diff --git a/pyerrors/covobs.py b/pyerrors/covobs.py index 64d9d6ec..e3056f04 100644 --- a/pyerrors/covobs.py +++ b/pyerrors/covobs.py @@ -1,9 +1,12 @@ +from __future__ import annotations import numpy as np +from numpy import float64, ndarray +from typing import Any, List, Optional, Union class Covobs: - def __init__(self, mean, cov, name, pos=None, grad=None): + def __init__(self, mean: Optional[Union[float, float64, int]], cov: Any, name: str, pos: Optional[int]=None, grad: Optional[Union[ndarray, List[float]]]=None): """ Initialize Covobs object. Parameters @@ -39,12 +42,12 @@ class Covobs: self._set_grad(grad) self.value = mean - def errsq(self): + def errsq(self) -> float: """ Return the variance (= square of the error) of the Covobs """ return np.dot(np.transpose(self.grad), np.dot(self.cov, self.grad)).item() - def _set_cov(self, cov): + def _set_cov(self, cov: Any): """ Set the covariance matrix of the covobs Parameters @@ -79,7 +82,7 @@ class Covobs: if ev < 0: raise Exception('Covariance matrix is not positive-semidefinite!') - def _set_grad(self, grad): + def _set_grad(self, grad: Union[List[float], ndarray]): """ Set the gradient of the covobs Parameters @@ -96,9 +99,9 @@ class Covobs: raise Exception('Invalid dimension of grad!') @property - def cov(self): + def cov(self) -> ndarray: return self._cov @property - def grad(self): + def grad(self) -> ndarray: return self._grad diff --git a/pyerrors/dirac.py b/pyerrors/dirac.py index 016e4722..9d4244aa 100644 --- a/pyerrors/dirac.py +++ b/pyerrors/dirac.py @@ -1,4 +1,6 @@ +from __future__ import annotations import numpy as np +from numpy import ndarray gammaX = np.array( @@ -22,7 +24,7 @@ identity = np.array( dtype=complex) -def epsilon_tensor(i, j, k): +def epsilon_tensor(i: int, j: int, k: int) -> float: """Rank-3 epsilon tensor Based on https://codegolf.stackexchange.com/a/160375 @@ -39,7 +41,7 @@ def epsilon_tensor(i, j, k): return (i - j) * (j - k) * (k - i) / 2 -def epsilon_tensor_rank4(i, j, k, o): +def epsilon_tensor_rank4(i: int, j: int, k: int, o: int) -> float: """Rank-4 epsilon tensor Extension of https://codegolf.stackexchange.com/a/160375 @@ -57,7 +59,7 @@ def epsilon_tensor_rank4(i, j, k, o): return (i - j) * (j - k) * (k - i) * (i - o) * (j - o) * (o - k) / 12 -def Grid_gamma(gamma_tag): +def Grid_gamma(gamma_tag: str) -> ndarray: """Returns gamma matrix in Grid labeling.""" if gamma_tag == 'Identity': g = identity diff --git a/pyerrors/fits.py b/pyerrors/fits.py index 8ed540c5..8a27c1b1 100644 --- a/pyerrors/fits.py +++ b/pyerrors/fits.py @@ -1,3 +1,4 @@ +from __future__ import annotations import gc from collections.abc import Sequence import warnings @@ -15,6 +16,8 @@ from autograd import elementwise_grad as egrad from numdifftools import Jacobian as num_jacobian from numdifftools import Hessian as num_hessian from .obs import Obs, derived_observable, covariance, cov_Obs, invert_corr_cov_cholesky +from numpy import ndarray +from typing import Any, Callable, Dict, List, Optional, Tuple, Union class Fit_result(Sequence): @@ -36,10 +39,10 @@ class Fit_result(Sequence): def __init__(self): self.fit_parameters = None - def __getitem__(self, idx): + def __getitem__(self, idx: int) -> Obs: return self.fit_parameters[idx] - def __len__(self): + def __len__(self) -> int: return len(self.fit_parameters) def gamma_method(self, **kwargs): @@ -48,7 +51,7 @@ class Fit_result(Sequence): gm = gamma_method - def __str__(self): + def __str__(self) -> str: my_str = 'Goodness of fit:\n' if hasattr(self, 'chisquare_by_dof'): my_str += '\u03C7\u00b2/d.o.f. = ' + f'{self.chisquare_by_dof:2.6f}' + '\n' @@ -65,12 +68,12 @@ class Fit_result(Sequence): my_str += str(i_par) + '\t' + ' ' * int(par >= 0) + str(par).rjust(int(par < 0.0)) + '\n' return my_str - def __repr__(self): + def __repr__(self) -> str: m = max(map(len, list(self.__dict__.keys()))) + 1 return '\n'.join([key.rjust(m) + ': ' + repr(value) for key, value in sorted(self.__dict__.items())]) -def least_squares(x, y, func, priors=None, silent=False, **kwargs): +def least_squares(x: Any, y: Union[Dict[str, ndarray], List[Obs], ndarray, Dict[str, List[Obs]]], func: Union[Callable, Dict[str, Callable]], priors: Optional[Union[Dict[int, str], List[str], List[Obs], Dict[int, Obs]]]=None, silent: bool=False, **kwargs) -> Fit_result: r'''Performs a non-linear fit to y = func(x). ``` @@ -503,7 +506,7 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs): return output -def total_least_squares(x, y, func, silent=False, **kwargs): +def total_least_squares(x: List[Obs], y: List[Obs], func: Callable, silent: bool=False, **kwargs) -> Fit_result: r'''Performs a non-linear fit to y = func(x) and returns a list of Obs corresponding to the fit parameters. Parameters @@ -707,7 +710,7 @@ def total_least_squares(x, y, func, silent=False, **kwargs): return output -def fit_lin(x, y, **kwargs): +def fit_lin(x: List[Union[Obs, int, float]], y: List[Obs], **kwargs) -> List[Obs]: """Performs a linear fit to y = n + m * x and returns two Obs n, m. Parameters @@ -738,7 +741,7 @@ def fit_lin(x, y, **kwargs): raise TypeError('Unsupported types for x') -def qqplot(x, o_y, func, p, title=""): +def qqplot(x: ndarray, o_y: List[Obs], func: Callable, p: List[Obs], title: str=""): """Generates a quantile-quantile plot of the fit result which can be used to check if the residuals of the fit are gaussian distributed. @@ -768,7 +771,7 @@ def qqplot(x, o_y, func, p, title=""): plt.draw() -def residual_plot(x, y, func, fit_res, title=""): +def residual_plot(x: ndarray, y: List[Obs], func: Callable, fit_res: List[Obs], title: str=""): """Generates a plot which compares the fit to the data and displays the corresponding residuals For uncorrelated data the residuals are expected to be distributed ~N(0,1). @@ -805,7 +808,7 @@ def residual_plot(x, y, func, fit_res, title=""): plt.draw() -def error_band(x, func, beta): +def error_band(x: List[int], func: Callable, beta: List[Obs]) -> ndarray: """Calculate the error band for an array of sample values x, for given fit function func with optimized parameters beta. Returns @@ -829,7 +832,7 @@ def error_band(x, func, beta): return err -def ks_test(objects=None): +def ks_test(objects: Optional[List[Fit_result]]=None): """Performs a Kolmogorov–Smirnov test for the p-values of all fit object. Parameters @@ -873,7 +876,7 @@ def ks_test(objects=None): print(scipy.stats.kstest(p_values, 'uniform')) -def _extract_val_and_dval(string): +def _extract_val_and_dval(string: str) -> Tuple[float, float]: split_string = string.split('(') if '.' in split_string[0] and '.' not in split_string[1][:-1]: factor = 10 ** -len(split_string[0].partition('.')[2]) @@ -882,7 +885,7 @@ def _extract_val_and_dval(string): return float(split_string[0]), float(split_string[1][:-1]) * factor -def _construct_prior_obs(i_prior, i_n): +def _construct_prior_obs(i_prior: Union[Obs, str], i_n: int) -> Obs: if isinstance(i_prior, Obs): return i_prior elif isinstance(i_prior, str): diff --git a/pyerrors/integrate.py b/pyerrors/integrate.py index 3b48f3fb..74e0e4a5 100644 --- a/pyerrors/integrate.py +++ b/pyerrors/integrate.py @@ -1,10 +1,13 @@ +from __future__ import annotations import numpy as np from .obs import derived_observable, Obs from autograd import jacobian from scipy.integrate import quad as squad +from numpy import ndarray +from typing import Callable, Dict, List, Tuple, Union -def quad(func, p, a, b, **kwargs): +def quad(func: Callable, p: Union[List[Union[float, Obs]], List[float], ndarray], a: Union[Obs, float, int], b: Union[Obs, float, int], **kwargs) -> Union[Tuple[Obs, float], Tuple[float, float], Tuple[Obs, float, Dict[str, Union[int, ndarray]]]]: '''Performs a (one-dimensional) numeric integration of f(p, x) from a to b. The integration is performed using scipy.integrate.quad(). diff --git a/pyerrors/linalg.py b/pyerrors/linalg.py index 5a489e26..f850a830 100644 --- a/pyerrors/linalg.py +++ b/pyerrors/linalg.py @@ -1,9 +1,12 @@ +from __future__ import annotations import numpy as np import autograd.numpy as anp # Thinly-wrapped numpy from .obs import derived_observable, CObs, Obs, import_jackknife +from numpy import ndarray +from typing import Callable, Tuple, Union -def matmul(*operands): +def matmul(*operands) -> ndarray: """Matrix multiply all operands. Parameters @@ -59,7 +62,7 @@ def matmul(*operands): return derived_observable(multi_dot, operands, array_mode=True) -def jack_matmul(*operands): +def jack_matmul(*operands) -> ndarray: """Matrix multiply both operands making use of the jackknife approximation. Parameters @@ -120,7 +123,7 @@ def jack_matmul(*operands): return _imp_from_jack(r, name, idl) -def einsum(subscripts, *operands): +def einsum(subscripts: str, *operands) -> Union[CObs, Obs, ndarray]: """Wrapper for numpy.einsum Parameters @@ -194,24 +197,24 @@ def einsum(subscripts, *operands): return result -def inv(x): +def inv(x: ndarray) -> ndarray: """Inverse of Obs or CObs valued matrices.""" return _mat_mat_op(anp.linalg.inv, x) -def cholesky(x): +def cholesky(x: ndarray) -> ndarray: """Cholesky decomposition of Obs valued matrices.""" if any(isinstance(o, CObs) for o in x.ravel()): raise Exception("Cholesky decomposition is not implemented for CObs.") return _mat_mat_op(anp.linalg.cholesky, x) -def det(x): +def det(x: Union[ndarray, int]) -> Obs: """Determinant of Obs valued matrices.""" return _scalar_mat_op(anp.linalg.det, x) -def _scalar_mat_op(op, obs, **kwargs): +def _scalar_mat_op(op: Callable, obs: Union[ndarray, int], **kwargs) -> Obs: """Computes the matrix to scalar operation op to a given matrix of Obs.""" def _mat(x, **kwargs): dim = int(np.sqrt(len(x))) @@ -232,7 +235,7 @@ def _scalar_mat_op(op, obs, **kwargs): return derived_observable(_mat, raveled_obs, **kwargs) -def _mat_mat_op(op, obs, **kwargs): +def _mat_mat_op(op: Callable, obs: ndarray, **kwargs) -> ndarray: """Computes the matrix to matrix operation op to a given matrix of Obs.""" # Use real representation to calculate matrix operations for complex matrices if any(isinstance(o, CObs) for o in obs.ravel()): @@ -258,31 +261,31 @@ def _mat_mat_op(op, obs, **kwargs): return derived_observable(lambda x, **kwargs: op(x), [obs], array_mode=True)[0] -def eigh(obs, **kwargs): +def eigh(obs: ndarray, **kwargs) -> Tuple[ndarray, ndarray]: """Computes the eigenvalues and eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh.""" w = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[0], obs) v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs) return w, v -def eig(obs, **kwargs): +def eig(obs: ndarray, **kwargs) -> ndarray: """Computes the eigenvalues of a given matrix of Obs according to np.linalg.eig.""" w = derived_observable(lambda x, **kwargs: anp.real(anp.linalg.eig(x)[0]), obs) return w -def eigv(obs, **kwargs): +def eigv(obs: ndarray, **kwargs) -> ndarray: """Computes the eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh.""" v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs) return v -def pinv(obs, **kwargs): +def pinv(obs: ndarray, **kwargs) -> ndarray: """Computes the Moore-Penrose pseudoinverse of a matrix of Obs.""" return derived_observable(lambda x, **kwargs: anp.linalg.pinv(x), obs) -def svd(obs, **kwargs): +def svd(obs: ndarray, **kwargs) -> Tuple[ndarray, ndarray, ndarray]: """Computes the singular value decomposition of a matrix of Obs.""" u = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[0], obs) s = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[1], obs) diff --git a/pyerrors/misc.py b/pyerrors/misc.py index 94a7d4c2..8832e0ef 100644 --- a/pyerrors/misc.py +++ b/pyerrors/misc.py @@ -1,3 +1,4 @@ +from __future__ import annotations import platform import numpy as np import scipy @@ -7,6 +8,8 @@ import pandas as pd import pickle from .obs import Obs from .version import __version__ +from numpy import float64, int64, ndarray +from typing import List, Type, Union def print_config(): @@ -54,7 +57,7 @@ def errorbar(x, y, axes=plt, **kwargs): axes.errorbar(val["x"], val["y"], xerr=err["x"], yerr=err["y"], **kwargs) -def dump_object(obj, name, **kwargs): +def dump_object(obj: Corr, name: str, **kwargs): """Dump object into pickle file. Parameters @@ -78,7 +81,7 @@ def dump_object(obj, name, **kwargs): pickle.dump(obj, fb) -def load_object(path): +def load_object(path: str) -> Union[Obs, Corr]: """Load object from pickle file. Parameters @@ -95,7 +98,7 @@ def load_object(path): return pickle.load(file) -def pseudo_Obs(value, dvalue, name, samples=1000): +def pseudo_Obs(value: Union[float, int64, float64, int], dvalue: Union[float, float64, int], name: str, samples: int=1000) -> Obs: """Generate an Obs object with given value, dvalue and name for test purposes Parameters @@ -132,7 +135,7 @@ def pseudo_Obs(value, dvalue, name, samples=1000): return res -def gen_correlated_data(means, cov, name, tau=0.5, samples=1000): +def gen_correlated_data(means: Union[ndarray, List[float]], cov: ndarray, name: str, tau: Union[float, ndarray]=0.5, samples: int=1000) -> List[Obs]: """ Generate observables with given covariance and autocorrelation times. Parameters @@ -174,7 +177,7 @@ def gen_correlated_data(means, cov, name, tau=0.5, samples=1000): return [Obs([dat], [name]) for dat in corr_data.T] -def _assert_equal_properties(ol, otype=Obs): +def _assert_equal_properties(ol: Union[List[Obs], List[CObs], ndarray], otype: Type[Obs]=Obs): otype = type(ol[0]) for o in ol[1:]: if not isinstance(o, otype): diff --git a/pyerrors/mpm.py b/pyerrors/mpm.py index bf782af6..b539797e 100644 --- a/pyerrors/mpm.py +++ b/pyerrors/mpm.py @@ -1,10 +1,13 @@ +from __future__ import annotations import numpy as np import scipy.linalg from .obs import Obs from .linalg import svd, eig +from pyerrors.obs import Obs +from typing import List -def matrix_pencil_method(corrs, k=1, p=None, **kwargs): +def matrix_pencil_method(corrs: List[Obs], k: int=1, p: None=None, **kwargs) -> List[Obs]: """Matrix pencil method to extract k energy levels from data Implementation of the matrix pencil method based on diff --git a/pyerrors/obs.py b/pyerrors/obs.py index 623c37fd..661d3c72 100644 --- a/pyerrors/obs.py +++ b/pyerrors/obs.py @@ -1,3 +1,4 @@ +from __future__ import annotations import warnings import hashlib import pickle @@ -10,6 +11,8 @@ from scipy.stats import skew, skewtest, kurtosis, kurtosistest import numdifftools as nd from itertools import groupby from .covobs import Covobs +from numpy import bool, float64, int64, ndarray +from typing import Any, Callable, Dict, List, Optional, Union # Improve print output of numpy.ndarrays containing Obs objects. np.set_printoptions(formatter={'object': lambda x: str(x)}) @@ -57,7 +60,7 @@ class Obs: N_sigma_global = 1.0 N_sigma_dict = {} - def __init__(self, samples, names, idl=None, **kwargs): + def __init__(self, samples: Union[List[List[int]], List[ndarray], ndarray, List[List[float64]], List[List[float]]], names: List[Union[int, Any, str]], idl: Optional[Any]=None, **kwargs): """ Initialize Obs object. Parameters @@ -141,27 +144,27 @@ class Obs: self.tag = None @property - def value(self): + def value(self) -> Union[float, int64, float64, int]: return self._value @property - def dvalue(self): + def dvalue(self) -> Union[float, float64]: return self._dvalue @property - def e_names(self): + def e_names(self) -> List[str]: return sorted(set([o.split('|')[0] for o in self.names])) @property - def cov_names(self): + def cov_names(self) -> List[Union[Any, str]]: return sorted(set([o for o in self.covobs.keys()])) @property - def mc_names(self): + def mc_names(self) -> List[Union[Any, str]]: return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names])) @property - def e_content(self): + def e_content(self) -> Dict[str, List[str]]: res = {} for e, e_name in enumerate(self.e_names): res[e_name] = sorted(filter(lambda x: x.startswith(e_name + '|'), self.names)) @@ -170,7 +173,7 @@ class Obs: return res @property - def covobs(self): + def covobs(self) -> Dict[str, Covobs]: return self._covobs def gamma_method(self, **kwargs): @@ -341,7 +344,7 @@ class Obs: gm = gamma_method - def _calc_gamma(self, deltas, idx, shape, w_max, fft, gapsize): + def _calc_gamma(self, deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray: """Calculate Gamma_{AA} from the deltas, which are defined on idx. idx is assumed to be a contiguous range (possibly with a stepsize != 1) @@ -377,7 +380,7 @@ class Obs: return gamma - def details(self, ens_content=True): + def details(self, ens_content: bool=True): """Output detailed properties of the Obs. Parameters @@ -446,7 +449,7 @@ class Obs: my_string_list.append(my_string) print('\n'.join(my_string_list)) - def reweight(self, weight): + def reweight(self, weight: "Obs") -> "Obs": """Reweight the obs with given rewighting factors. Parameters @@ -461,7 +464,7 @@ class Obs: """ return reweight(weight, [self])[0] - def is_zero_within_error(self, sigma=1): + def is_zero_within_error(self, sigma: Union[float, int]=1) -> Union[bool, bool]: """Checks whether the observable is zero within 'sigma' standard errors. Parameters @@ -473,7 +476,7 @@ class Obs: """ return self.is_zero() or np.abs(self.value) <= sigma * self._dvalue - def is_zero(self, atol=1e-10): + def is_zero(self, atol: float=1e-10) -> Union[bool, bool]: """Checks whether the observable is zero within a given tolerance. Parameters @@ -483,7 +486,7 @@ class Obs: """ return np.isclose(0.0, self.value, 1e-14, atol) and all(np.allclose(0.0, delta, 1e-14, atol) for delta in self.deltas.values()) and all(np.allclose(0.0, delta.errsq(), 1e-14, atol) for delta in self.covobs.values()) - def plot_tauint(self, save=None): + def plot_tauint(self, save: None=None): """Plot integrated autocorrelation time for each ensemble. Parameters @@ -523,7 +526,7 @@ class Obs: if save: fig.savefig(save + "_" + str(e)) - def plot_rho(self, save=None): + def plot_rho(self, save: None=None): """Plot normalized autocorrelation function time for each ensemble. Parameters @@ -576,7 +579,7 @@ class Obs: plt.title('Replica distribution' + e_name + ' (mean=0, var=1)') plt.draw() - def plot_history(self, expand=True): + def plot_history(self, expand: bool=True): """Plot derived Monte Carlo history for each ensemble Parameters @@ -608,7 +611,7 @@ class Obs: plt.title(e_name + f'\nskew: {skew(y_test):.3f} (p={skewtest(y_test).pvalue:.3f}), kurtosis: {kurtosis(y_test):.3f} (p={kurtosistest(y_test).pvalue:.3f})') plt.draw() - def plot_piechart(self, save=None): + def plot_piechart(self, save: None=None) -> Dict[str, float64]: """Plot piechart which shows the fractional contribution of each ensemble to the error and returns a dictionary containing the fractions. @@ -632,7 +635,7 @@ class Obs: return dict(zip(labels, sizes)) - def dump(self, filename, datatype="json.gz", description="", **kwargs): + def dump(self, filename: str, datatype: str="json.gz", description: str="", **kwargs): """Dump the Obs to a file 'name' of chosen format. Parameters @@ -661,7 +664,7 @@ class Obs: else: raise TypeError("Unknown datatype " + str(datatype)) - def export_jackknife(self): + def export_jackknife(self) -> ndarray: """Export jackknife samples from the Obs Returns @@ -687,7 +690,7 @@ class Obs: tmp_jacks[1:] = (n * mean - full_data) / (n - 1) return tmp_jacks - def export_bootstrap(self, samples=500, random_numbers=None, save_rng=None): + def export_bootstrap(self, samples: int=500, random_numbers: Optional[ndarray]=None, save_rng: None=None) -> ndarray: """Export bootstrap samples from the Obs Parameters @@ -730,16 +733,16 @@ class Obs: ret[1:] = proj @ (self.deltas[name] + self.r_values[name]) return ret - def __float__(self): + def __float__(self) -> float: return float(self.value) - def __repr__(self): + def __repr__(self) -> str: return 'Obs[' + str(self) + ']' - def __str__(self): + def __str__(self) -> str: return _format_uncertainty(self.value, self._dvalue) - def __format__(self, format_type): + def __format__(self, format_type: str) -> str: if format_type == "": significance = 2 else: @@ -752,7 +755,7 @@ class Obs: my_str = char + my_str return my_str - def __hash__(self): + def __hash__(self) -> int: hash_tuple = (np.array([self.value]).astype(np.float32).data.tobytes(),) hash_tuple += tuple([o.astype(np.float32).data.tobytes() for o in self.deltas.values()]) hash_tuple += tuple([np.array([o.errsq()]).astype(np.float32).data.tobytes() for o in self.covobs.values()]) @@ -762,25 +765,25 @@ class Obs: return int(m.hexdigest(), 16) & 0xFFFFFFFF # Overload comparisons - def __lt__(self, other): + def __lt__(self, other: Union[Obs, float, float64]) -> Union[bool, bool]: return self.value < other - def __le__(self, other): + def __le__(self, other: Union[Obs, float, int]) -> bool: return self.value <= other - def __gt__(self, other): + def __gt__(self, other: Union[Obs, float]) -> Union[bool, bool]: return self.value > other - def __ge__(self, other): + def __ge__(self, other: Union[Obs, float, int]) -> Union[bool, bool]: return self.value >= other - def __eq__(self, other): + def __eq__(self, other: Optional[Union[Obs, float64, int, float]]) -> Union[bool, bool]: if other is None: return False return (self - other).is_zero() # Overload math operations - def __add__(self, y): + def __add__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] + x[1], [self, y], man_grad=[1, 1]) else: @@ -793,10 +796,10 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] + y, [self], man_grad=[1]) - def __radd__(self, y): + def __radd__(self, y: Union[float, int]) -> "Obs": return self + y - def __mul__(self, y): + def __mul__(self, y: Any) -> Union[Obs, ndarray, CObs, NotImplementedType]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] * x[1], [self, y], man_grad=[y.value, self.value]) else: @@ -809,10 +812,10 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] * y, [self], man_grad=[y]) - def __rmul__(self, y): + def __rmul__(self, y: Union[float, int]) -> "Obs": return self * y - def __sub__(self, y): + def __sub__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] - x[1], [self, y], man_grad=[1, -1]) else: @@ -823,16 +826,16 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] - y, [self], man_grad=[1]) - def __rsub__(self, y): + def __rsub__(self, y: Union[float, int]) -> "Obs": return -1 * (self - y) - def __pos__(self): + def __pos__(self) -> "Obs": return self - def __neg__(self): + def __neg__(self) -> "Obs": return -1 * self - def __truediv__(self, y): + def __truediv__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]: if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] / x[1], [self, y], man_grad=[1 / y.value, - self.value / y.value ** 2]) else: @@ -843,7 +846,7 @@ class Obs: else: return derived_observable(lambda x, **kwargs: x[0] / y, [self], man_grad=[1 / y]) - def __rtruediv__(self, y): + def __rtruediv__(self, y: Union[float, int]) -> "Obs": if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] / x[1], [y, self], man_grad=[1 / self.value, - y.value / self.value ** 2]) else: @@ -854,62 +857,62 @@ class Obs: else: return derived_observable(lambda x, **kwargs: y / x[0], [self], man_grad=[-y / self.value ** 2]) - def __pow__(self, y): + def __pow__(self, y: Union[Obs, float, int]) -> "Obs": if isinstance(y, Obs): return derived_observable(lambda x, **kwargs: x[0] ** x[1], [self, y], man_grad=[y.value * self.value ** (y.value - 1), self.value ** y.value * np.log(self.value)]) else: return derived_observable(lambda x, **kwargs: x[0] ** y, [self], man_grad=[y * self.value ** (y - 1)]) - def __rpow__(self, y): + def __rpow__(self, y: Union[float, int]) -> "Obs": return derived_observable(lambda x, **kwargs: y ** x[0], [self], man_grad=[y ** self.value * np.log(y)]) - def __abs__(self): + def __abs__(self) -> "Obs": return derived_observable(lambda x: anp.abs(x[0]), [self]) # Overload numpy functions - def sqrt(self): + def sqrt(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.sqrt(x[0]), [self], man_grad=[1 / 2 / np.sqrt(self.value)]) - def log(self): + def log(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.log(x[0]), [self], man_grad=[1 / self.value]) - def exp(self): + def exp(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.exp(x[0]), [self], man_grad=[np.exp(self.value)]) - def sin(self): + def sin(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.sin(x[0]), [self], man_grad=[np.cos(self.value)]) - def cos(self): + def cos(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.cos(x[0]), [self], man_grad=[-np.sin(self.value)]) - def tan(self): + def tan(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.tan(x[0]), [self], man_grad=[1 / np.cos(self.value) ** 2]) - def arcsin(self): + def arcsin(self) -> "Obs": return derived_observable(lambda x: anp.arcsin(x[0]), [self]) - def arccos(self): + def arccos(self) -> "Obs": return derived_observable(lambda x: anp.arccos(x[0]), [self]) - def arctan(self): + def arctan(self) -> "Obs": return derived_observable(lambda x: anp.arctan(x[0]), [self]) - def sinh(self): + def sinh(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.sinh(x[0]), [self], man_grad=[np.cosh(self.value)]) - def cosh(self): + def cosh(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.cosh(x[0]), [self], man_grad=[np.sinh(self.value)]) - def tanh(self): + def tanh(self) -> "Obs": return derived_observable(lambda x, **kwargs: np.tanh(x[0]), [self], man_grad=[1 / np.cosh(self.value) ** 2]) - def arcsinh(self): + def arcsinh(self) -> "Obs": return derived_observable(lambda x: anp.arcsinh(x[0]), [self]) - def arccosh(self): + def arccosh(self) -> "Obs": return derived_observable(lambda x: anp.arccosh(x[0]), [self]) - def arctanh(self): + def arctanh(self) -> "Obs": return derived_observable(lambda x: anp.arctanh(x[0]), [self]) @@ -917,17 +920,17 @@ class CObs: """Class for a complex valued observable.""" __slots__ = ['_real', '_imag', 'tag'] - def __init__(self, real, imag=0.0): + def __init__(self, real: Obs, imag: Union[Obs, float, int]=0.0): self._real = real self._imag = imag self.tag = None @property - def real(self): + def real(self) -> Obs: return self._real @property - def imag(self): + def imag(self) -> Union[Obs, float, int]: return self._imag def gamma_method(self, **kwargs): @@ -937,14 +940,14 @@ class CObs: if isinstance(self.imag, Obs): self.imag.gamma_method(**kwargs) - def is_zero(self): + def is_zero(self) -> bool: """Checks whether both real and imaginary part are zero within machine precision.""" return self.real == 0.0 and self.imag == 0.0 - def conjugate(self): + def conjugate(self) -> "CObs": return CObs(self.real, -self.imag) - def __add__(self, other): + def __add__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return other + self elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -953,10 +956,10 @@ class CObs: else: return CObs(self.real + other, self.imag) - def __radd__(self, y): + def __radd__(self, y: Union[complex, float, Obs, int]) -> "CObs": return self + y - def __sub__(self, other): + def __sub__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return -1 * (other - self) elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -964,10 +967,10 @@ class CObs: else: return CObs(self.real - other, self.imag) - def __rsub__(self, other): + def __rsub__(self, other: Union[complex, float, Obs, int]) -> "CObs": return -1 * (self - other) - def __mul__(self, other): + def __mul__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return other * self elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -986,10 +989,10 @@ class CObs: else: return CObs(self.real * other, self.imag * other) - def __rmul__(self, other): + def __rmul__(self, other: Union[complex, Obs, float, int]) -> "CObs": return self * other - def __truediv__(self, other): + def __truediv__(self, other: Any) -> Union[CObs, ndarray]: if isinstance(other, np.ndarray): return 1 / (other / self) elif hasattr(other, 'real') and hasattr(other, 'imag'): @@ -998,32 +1001,32 @@ class CObs: else: return CObs(self.real / other, self.imag / other) - def __rtruediv__(self, other): + def __rtruediv__(self, other: Union[complex, float, Obs, int]) -> "CObs": r = self.real ** 2 + self.imag ** 2 if hasattr(other, 'real') and hasattr(other, 'imag'): return CObs((self.real * other.real + self.imag * other.imag) / r, (self.real * other.imag - self.imag * other.real) / r) else: return CObs(self.real * other / r, -self.imag * other / r) - def __abs__(self): + def __abs__(self) -> Obs: return np.sqrt(self.real**2 + self.imag**2) - def __pos__(self): + def __pos__(self) -> "CObs": return self - def __neg__(self): + def __neg__(self) -> "CObs": return -1 * self - def __eq__(self, other): + def __eq__(self, other: Union[CObs, int]) -> bool: return self.real == other.real and self.imag == other.imag - def __str__(self): + def __str__(self) -> str: return '(' + str(self.real) + int(self.imag >= 0.0) * '+' + str(self.imag) + 'j)' - def __repr__(self): + def __repr__(self) -> str: return 'CObs[' + str(self) + ']' - def __format__(self, format_type): + def __format__(self, format_type: str) -> str: if format_type == "": significance = 2 format_type = "2" @@ -1032,7 +1035,7 @@ class CObs: return f"({self.real:{format_type}}{self.imag:+{significance}}j)" -def gamma_method(x, **kwargs): +def gamma_method(x: Union[Corr, Obs, ndarray, List[Obs]], **kwargs) -> ndarray: """Vectorized version of the gamma_method applicable to lists or arrays of Obs. See docstring of pe.Obs.gamma_method for details. @@ -1043,7 +1046,7 @@ def gamma_method(x, **kwargs): gm = gamma_method -def _format_uncertainty(value, dvalue, significance=2): +def _format_uncertainty(value: Union[float, float64, int], dvalue: Union[float, float64, int], significance: int=2) -> str: """Creates a string of a value and its error in paranthesis notation, e.g., 13.02(45)""" if dvalue == 0.0 or (not np.isfinite(dvalue)): return str(value) @@ -1060,7 +1063,7 @@ def _format_uncertainty(value, dvalue, significance=2): return f"{value:.{max(0, int(significance - fexp - 1))}f}({dvalue:2.{max(0, int(significance - fexp - 1))}f})" -def _expand_deltas(deltas, idx, shape, gapsize): +def _expand_deltas(deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray: """Expand deltas defined on idx to a regular range with spacing gapsize between two configurations and where holes are filled by 0. If idx is of type range, the deltas are not changed if the idx.step == gapsize. @@ -1086,7 +1089,7 @@ def _expand_deltas(deltas, idx, shape, gapsize): return ret -def _merge_idx(idl): +def _merge_idx(idl: List[Union[List[Union[int64, int]], range, List[int]]]) -> Union[List[Union[int64, int]], range, List[int]]: """Returns the union of all lists in idl as range or sorted list Parameters @@ -1109,7 +1112,7 @@ def _merge_idx(idl): return idunion -def _intersection_idx(idl): +def _intersection_idx(idl: List[Union[range, List[int]]]) -> Union[range, List[int]]: """Returns the intersection of all lists in idl as range or sorted list Parameters @@ -1135,7 +1138,7 @@ def _intersection_idx(idl): return idinter -def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor): +def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, List[int]], shape: int, new_idx: Union[range, List[int]], scalefactor: Union[float, int]) -> ndarray: """Expand deltas defined on idx to the list of configs that is defined by new_idx. New, empty entries are filled by 0. If idx and new_idx are of type range, the smallest common divisor of the step sizes is used as new step size. @@ -1167,7 +1170,7 @@ def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor): return np.array([ret[new_idx[i] - new_idx[0]] for i in range(len(new_idx))]) * len(new_idx) / len(idx) * scalefactor -def derived_observable(func, data, array_mode=False, **kwargs): +def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwargs) -> Union[Obs, ndarray]: """Construct a derived Obs according to func(data, **kwargs) using automatic differentiation. Parameters @@ -1357,7 +1360,7 @@ def derived_observable(func, data, array_mode=False, **kwargs): return final_result -def _reduce_deltas(deltas, idx_old, idx_new): +def _reduce_deltas(deltas: Union[List[float], ndarray], idx_old: Union[range, List[int]], idx_new: Union[range, List[int], ndarray]) -> Union[List[float], ndarray]: """Extract deltas defined on idx_old on all configs of idx_new. Assumes, that idx_old and idx_new are correctly defined idl, i.e., they @@ -1386,7 +1389,7 @@ def _reduce_deltas(deltas, idx_old, idx_new): return np.array(deltas)[indices] -def reweight(weight, obs, **kwargs): +def reweight(weight: Obs, obs: Union[ndarray, List[Obs]], **kwargs) -> List[Obs]: """Reweight a list of observables. Parameters @@ -1428,7 +1431,7 @@ def reweight(weight, obs, **kwargs): return result -def correlate(obs_a, obs_b): +def correlate(obs_a: Obs, obs_b: Obs) -> Obs: """Correlate two observables. Parameters @@ -1471,7 +1474,7 @@ def correlate(obs_a, obs_b): return o -def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs): +def covariance(obs: Union[ndarray, List[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray: r'''Calculates the error covariance matrix of a set of observables. WARNING: This function should be used with care, especially for observables with support on multiple @@ -1541,7 +1544,7 @@ def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs): return cov -def invert_corr_cov_cholesky(corr, inverrdiag): +def invert_corr_cov_cholesky(corr: ndarray, inverrdiag: ndarray) -> ndarray: """Constructs a lower triangular matrix `chol` via the Cholesky decomposition of the correlation matrix `corr` and then returns the inverse covariance matrix `chol_inv` as a lower triangular matrix by solving `chol * x = inverrdiag`. @@ -1564,7 +1567,7 @@ def invert_corr_cov_cholesky(corr, inverrdiag): return chol_inv -def sort_corr(corr, kl, yd): +def sort_corr(corr: ndarray, kl: List[str], yd: Dict[str, List[Obs]]) -> ndarray: """ Reorders a correlation matrix to match the alphabetical order of its underlying y data. The ordering of the input correlation matrix `corr` is given by the list of keys `kl`. @@ -1627,7 +1630,7 @@ def sort_corr(corr, kl, yd): return corr_sorted -def _smooth_eigenvalues(corr, E): +def _smooth_eigenvalues(corr: ndarray, E: int) -> ndarray: """Eigenvalue smoothing as described in hep-lat/9412087 corr : np.ndarray @@ -1644,7 +1647,7 @@ def _smooth_eigenvalues(corr, E): return vec @ np.diag(vals) @ vec.T -def _covariance_element(obs1, obs2): +def _covariance_element(obs1: Obs, obs2: Obs) -> Union[float, float64]: """Estimates the covariance of two Obs objects, neglecting autocorrelations.""" def calc_gamma(deltas1, deltas2, idx1, idx2, new_idx): @@ -1704,7 +1707,7 @@ def _covariance_element(obs1, obs2): return dvalue -def import_jackknife(jacks, name, idl=None): +def import_jackknife(jacks: ndarray, name: str, idl: Optional[List[range]]=None) -> Obs: """Imports jackknife samples and returns an Obs Parameters @@ -1724,7 +1727,7 @@ def import_jackknife(jacks, name, idl=None): return new_obs -def import_bootstrap(boots, name, random_numbers): +def import_bootstrap(boots: ndarray, name: str, random_numbers: ndarray) -> Obs: """Imports bootstrap samples and returns an Obs Parameters @@ -1754,7 +1757,7 @@ def import_bootstrap(boots, name, random_numbers): return ret -def merge_obs(list_of_obs): +def merge_obs(list_of_obs: List[Obs]) -> Obs: """Combine all observables in list_of_obs into one new observable Parameters @@ -1784,7 +1787,7 @@ def merge_obs(list_of_obs): return o -def cov_Obs(means, cov, name, grad=None): +def cov_Obs(means: Union[float64, int, List[float], float, List[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, List[Obs]]: """Create an Obs based on mean(s) and a covariance matrix Parameters @@ -1827,7 +1830,7 @@ def cov_Obs(means, cov, name, grad=None): return ol -def _determine_gap(o, e_content, e_name): +def _determine_gap(o: Obs, e_content: Dict[str, List[str]], e_name: str) -> Union[int64, int]: gaps = [] for r_name in e_content[e_name]: if isinstance(o.idl[r_name], range): @@ -1842,7 +1845,7 @@ def _determine_gap(o, e_content, e_name): return gap -def _check_lists_equal(idl): +def _check_lists_equal(idl: List[Union[List[int], List[Union[int64, int]], range, ndarray]]): ''' Use groupby to efficiently check whether all elements of idl are identical. Returns True if all elements are equal, otherwise False. diff --git a/pyerrors/roots.py b/pyerrors/roots.py index acc5614f..c3a9f7da 100644 --- a/pyerrors/roots.py +++ b/pyerrors/roots.py @@ -1,10 +1,12 @@ +from __future__ import annotations import numpy as np import scipy.optimize from autograd import jacobian from .obs import derived_observable +from typing import Callable, List, Union -def find_root(d, func, guess=1.0, **kwargs): +def find_root(d: Union[Obs, List[Obs]], func: Callable, guess: float=1.0, **kwargs) -> Obs: r'''Finds the root of the function func(x, d) where d is an `Obs`. Parameters diff --git a/pyerrors/special.py b/pyerrors/special.py index 8b36e056..bb9e82b9 100644 --- a/pyerrors/special.py +++ b/pyerrors/special.py @@ -1,3 +1,4 @@ +from __future__ import annotations import scipy import numpy as np from autograd.extend import primitive, defvjp