[Feat] Add type hints to pyerrors modules

This commit is contained in:
Fabian Joswig 2024-12-25 11:09:58 +01:00
parent 997d360db3
commit 3db8eb2989
11 changed files with 236 additions and 207 deletions

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import warnings
from itertools import permutations
import numpy as np
@ -9,6 +10,8 @@ from .misc import dump_object, _assert_equal_properties
from .fits import least_squares
from .roots import find_root
from . import linalg
from numpy import float64, int64, ndarray, ufunc
from typing import Any, Callable, List, Optional, Tuple, Union
class Corr:
@ -42,7 +45,7 @@ class Corr:
__slots__ = ["content", "N", "T", "tag", "prange"]
def __init__(self, data_input, padding=[0, 0], prange=None):
def __init__(self, data_input: Any, padding: List[int]=[0, 0], prange: Optional[List[int]]=None):
""" Initialize a Corr object.
Parameters
@ -119,7 +122,7 @@ class Corr:
self.T = len(self.content)
self.prange = prange
def __getitem__(self, idx):
def __getitem__(self, idx: Union[slice, int]) -> Union[CObs, Obs, ndarray, List[ndarray]]:
"""Return the content of timeslice idx"""
if self.content[idx] is None:
return None
@ -151,7 +154,7 @@ class Corr:
gm = gamma_method
def projected(self, vector_l=None, vector_r=None, normalize=False):
def projected(self, vector_l: Optional[Union[ndarray, List[Optional[ndarray]]]]=None, vector_r: None=None, normalize: bool=False) -> "Corr":
"""We need to project the Correlator with a Vector to get a single value at each timeslice.
The method can use one or two vectors.
@ -190,7 +193,7 @@ class Corr:
newcontent = [None if (_check_for_none(self, self.content[t]) or vector_l[t] is None or vector_r[t] is None) else np.asarray([vector_l[t].T @ self.content[t] @ vector_r[t]]) for t in range(self.T)]
return Corr(newcontent)
def item(self, i, j):
def item(self, i: int, j: int) -> "Corr":
"""Picks the element [i,j] from every matrix and returns a correlator containing one Obs per timeslice.
Parameters
@ -205,7 +208,7 @@ class Corr:
newcontent = [None if (item is None) else item[i, j] for item in self.content]
return Corr(newcontent)
def plottable(self):
def plottable(self) -> Union[Tuple[List[int], List[float64], List[float64]], Tuple[List[int], List[float], List[float64]]]:
"""Outputs the correlator in a plotable format.
Outputs three lists containing the timeslice index, the value on each
@ -219,7 +222,7 @@ class Corr:
return x_list, y_list, y_err_list
def symmetric(self):
def symmetric(self) -> "Corr":
""" Symmetrize the correlator around x0=0."""
if self.N != 1:
raise ValueError('symmetric cannot be safely applied to multi-dimensional correlators.')
@ -240,7 +243,7 @@ class Corr:
raise ValueError("Corr could not be symmetrized: No redundant values")
return Corr(newcontent, prange=self.prange)
def anti_symmetric(self):
def anti_symmetric(self) -> "Corr":
"""Anti-symmetrize the correlator around x0=0."""
if self.N != 1:
raise TypeError('anti_symmetric cannot be safely applied to multi-dimensional correlators.')
@ -277,7 +280,7 @@ class Corr:
return False
return True
def trace(self):
def trace(self) -> "Corr":
"""Calculates the per-timeslice trace of a correlator matrix."""
if self.N == 1:
raise ValueError("Only works for correlator matrices.")
@ -289,7 +292,7 @@ class Corr:
newcontent.append(np.trace(self.content[t]))
return Corr(newcontent)
def matrix_symmetric(self):
def matrix_symmetric(self) -> "Corr":
"""Symmetrizes the correlator matrices on every timeslice."""
if self.N == 1:
raise ValueError("Trying to symmetrize a correlator matrix, that already has N=1.")
@ -299,7 +302,7 @@ class Corr:
transposed = [None if _check_for_none(self, G) else G.T for G in self.content]
return 0.5 * (Corr(transposed) + self)
def GEVP(self, t0, ts=None, sort="Eigenvalue", vector_obs=False, **kwargs):
def GEVP(self, t0: int, ts: Optional[int]=None, sort: Optional[str]="Eigenvalue", vector_obs: bool=False, **kwargs) -> Union[List[List[Optional[ndarray]]], ndarray, List[Optional[ndarray]]]:
r'''Solve the generalized eigenvalue problem on the correlator matrix and returns the corresponding eigenvectors.
The eigenvectors are sorted according to the descending eigenvalues, the zeroth eigenvector(s) correspond to the
@ -405,7 +408,7 @@ class Corr:
else:
return reordered_vecs
def Eigenvalue(self, t0, ts=None, state=0, sort="Eigenvalue", **kwargs):
def Eigenvalue(self, t0: int, ts: None=None, state: int=0, sort: str="Eigenvalue", **kwargs) -> "Corr":
"""Determines the eigenvalue of the GEVP by solving and projecting the correlator
Parameters
@ -418,7 +421,7 @@ class Corr:
vec = self.GEVP(t0, ts=ts, sort=sort, **kwargs)[state]
return self.projected(vec)
def Hankel(self, N, periodic=False):
def Hankel(self, N: int, periodic: bool=False) -> "Corr":
"""Constructs an NxN Hankel matrix
C(t) c(t+1) ... c(t+n-1)
@ -459,7 +462,7 @@ class Corr:
return Corr(new_content)
def roll(self, dt):
def roll(self, dt: int) -> "Corr":
"""Periodically shift the correlator by dt timeslices
Parameters
@ -469,11 +472,11 @@ class Corr:
"""
return Corr(list(np.roll(np.array(self.content, dtype=object), dt, axis=0)))
def reverse(self):
def reverse(self) -> "Corr":
"""Reverse the time ordering of the Corr"""
return Corr(self.content[:: -1])
def thin(self, spacing=2, offset=0):
def thin(self, spacing: int=2, offset: int=0) -> "Corr":
"""Thin out a correlator to suppress correlations
Parameters
@ -491,7 +494,7 @@ class Corr:
new_content.append(self.content[t])
return Corr(new_content)
def correlate(self, partner):
def correlate(self, partner: Union[Corr, float, Obs]) -> "Corr":
"""Correlate the correlator with another correlator or Obs
Parameters
@ -520,7 +523,7 @@ class Corr:
return Corr(new_content)
def reweight(self, weight, **kwargs):
def reweight(self, weight: Obs, **kwargs) -> "Corr":
"""Reweight the correlator.
Parameters
@ -543,7 +546,7 @@ class Corr:
new_content.append(np.array(reweight(weight, t_slice, **kwargs)))
return Corr(new_content)
def T_symmetry(self, partner, parity=+1):
def T_symmetry(self, partner: "Corr", parity: int=+1) -> "Corr":
"""Return the time symmetry average of the correlator and its partner
Parameters
@ -573,7 +576,7 @@ class Corr:
return (self + T_partner) / 2
def deriv(self, variant="symmetric"):
def deriv(self, variant: Optional[str]="symmetric") -> "Corr":
"""Return the first derivative of the correlator with respect to x0.
Parameters
@ -638,7 +641,7 @@ class Corr:
else:
raise ValueError("Unknown variant.")
def second_deriv(self, variant="symmetric"):
def second_deriv(self, variant: Optional[str]="symmetric") -> "Corr":
r"""Return the second derivative of the correlator with respect to x0.
Parameters
@ -701,7 +704,7 @@ class Corr:
else:
raise ValueError("Unknown variant.")
def m_eff(self, variant='log', guess=1.0):
def m_eff(self, variant: str='log', guess: float=1.0) -> "Corr":
"""Returns the effective mass of the correlator as correlator object
Parameters
@ -785,7 +788,7 @@ class Corr:
else:
raise ValueError('Unknown variant.')
def fit(self, function, fitrange=None, silent=False, **kwargs):
def fit(self, function: Callable, fitrange: Optional[Union[str, List[int]]]=None, silent: bool=False, **kwargs) -> Fit_result:
r'''Fits function to the data
Parameters
@ -819,7 +822,7 @@ class Corr:
result = least_squares(xs, ys, function, silent=silent, **kwargs)
return result
def plateau(self, plateau_range=None, method="fit", auto_gamma=False):
def plateau(self, plateau_range: Optional[List[int]]=None, method: str="fit", auto_gamma: bool=False) -> Obs:
""" Extract a plateau value from a Corr object
Parameters
@ -856,7 +859,7 @@ class Corr:
else:
raise ValueError("Unsupported plateau method: " + method)
def set_prange(self, prange):
def set_prange(self, prange: List[Union[int, float]]):
"""Sets the attribute prange of the Corr object."""
if not len(prange) == 2:
raise ValueError("prange must be a list or array with two values")
@ -868,7 +871,7 @@ class Corr:
self.prange = prange
return
def show(self, x_range=None, comp=None, y_range=None, logscale=False, plateau=None, fit_res=None, fit_key=None, ylabel=None, save=None, auto_gamma=False, hide_sigma=None, references=None, title=None):
def show(self, x_range: Optional[List[int64]]=None, comp: Optional[Corr]=None, y_range: None=None, logscale: bool=False, plateau: None=None, fit_res: Optional[Fit_result]=None, fit_key: Optional[str]=None, ylabel: None=None, save: None=None, auto_gamma: bool=False, hide_sigma: None=None, references: None=None, title: None=None):
"""Plots the correlator using the tag of the correlator as label if available.
Parameters
@ -993,7 +996,7 @@ class Corr:
else:
raise TypeError("'save' has to be a string.")
def spaghetti_plot(self, logscale=True):
def spaghetti_plot(self, logscale: bool=True):
"""Produces a spaghetti plot of the correlator suited to monitor exceptional configurations.
Parameters
@ -1022,7 +1025,7 @@ class Corr:
plt.title(name)
plt.draw()
def dump(self, filename, datatype="json.gz", **kwargs):
def dump(self, filename: str, datatype: str="json.gz", **kwargs):
"""Dumps the Corr into a file of chosen type
Parameters
----------
@ -1046,10 +1049,10 @@ class Corr:
else:
raise ValueError("Unknown datatype " + str(datatype))
def print(self, print_range=None):
def print(self, print_range: Optional[List[int]]=None):
print(self.__repr__(print_range))
def __repr__(self, print_range=None):
def __repr__(self, print_range: Optional[List[int]]=None) -> str:
if print_range is None:
print_range = [0, None]
@ -1074,7 +1077,7 @@ class Corr:
content_string += '\n'
return content_string
def __str__(self):
def __str__(self) -> str:
return self.__repr__()
# We define the basic operations, that can be performed with correlators.
@ -1084,14 +1087,14 @@ class Corr:
__array_priority__ = 10000
def __eq__(self, y):
def __eq__(self, y: Union[Corr, Obs, int]) -> ndarray:
if isinstance(y, Corr):
comp = np.asarray(y.content, dtype=object)
else:
comp = np.asarray(y)
return np.asarray(self.content, dtype=object) == comp
def __add__(self, y):
def __add__(self, y: Any) -> "Corr":
if isinstance(y, Corr):
if ((self.N != y.N) or (self.T != y.T)):
raise ValueError("Addition of Corrs with different shape")
@ -1119,7 +1122,7 @@ class Corr:
else:
raise TypeError("Corr + wrong type")
def __mul__(self, y):
def __mul__(self, y: Any) -> "Corr":
if isinstance(y, Corr):
if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T):
raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T")
@ -1147,7 +1150,7 @@ class Corr:
else:
raise TypeError("Corr * wrong type")
def __matmul__(self, y):
def __matmul__(self, y: Union[Corr, ndarray]) -> "Corr":
if isinstance(y, np.ndarray):
if y.ndim != 2 or y.shape[0] != y.shape[1]:
raise ValueError("Can only multiply correlators by square matrices.")
@ -1174,7 +1177,7 @@ class Corr:
else:
return NotImplemented
def __rmatmul__(self, y):
def __rmatmul__(self, y: ndarray) -> "Corr":
if isinstance(y, np.ndarray):
if y.ndim != 2 or y.shape[0] != y.shape[1]:
raise ValueError("Can only multiply correlators by square matrices.")
@ -1190,7 +1193,7 @@ class Corr:
else:
return NotImplemented
def __truediv__(self, y):
def __truediv__(self, y: Union[Corr, float, ndarray, int]) -> "Corr":
if isinstance(y, Corr):
if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T):
raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T")
@ -1244,37 +1247,37 @@ class Corr:
else:
raise TypeError('Corr / wrong type')
def __neg__(self):
def __neg__(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else -1. * item for item in self.content]
return Corr(newcontent, prange=self.prange)
def __sub__(self, y):
def __sub__(self, y: Union[Corr, float, ndarray, int]) -> "Corr":
return self + (-y)
def __pow__(self, y):
def __pow__(self, y: Union[float, int]) -> "Corr":
if isinstance(y, (Obs, int, float, CObs)):
newcontent = [None if _check_for_none(self, item) else item**y for item in self.content]
return Corr(newcontent, prange=self.prange)
else:
raise TypeError('Type of exponent not supported')
def __abs__(self):
def __abs__(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else np.abs(item) for item in self.content]
return Corr(newcontent, prange=self.prange)
# The numpy functions:
def sqrt(self):
def sqrt(self) -> "Corr":
return self ** 0.5
def log(self):
def log(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else np.log(item) for item in self.content]
return Corr(newcontent, prange=self.prange)
def exp(self):
def exp(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else np.exp(item) for item in self.content]
return Corr(newcontent, prange=self.prange)
def _apply_func_to_corr(self, func):
def _apply_func_to_corr(self, func: Union[Callable, ufunc]) -> "Corr":
newcontent = [None if _check_for_none(self, item) else func(item) for item in self.content]
for t in range(self.T):
if _check_for_none(self, newcontent[t]):
@ -1287,57 +1290,57 @@ class Corr:
raise ValueError('Operation returns undefined correlator')
return Corr(newcontent)
def sin(self):
def sin(self) -> "Corr":
return self._apply_func_to_corr(np.sin)
def cos(self):
def cos(self) -> "Corr":
return self._apply_func_to_corr(np.cos)
def tan(self):
def tan(self) -> "Corr":
return self._apply_func_to_corr(np.tan)
def sinh(self):
def sinh(self) -> "Corr":
return self._apply_func_to_corr(np.sinh)
def cosh(self):
def cosh(self) -> "Corr":
return self._apply_func_to_corr(np.cosh)
def tanh(self):
def tanh(self) -> "Corr":
return self._apply_func_to_corr(np.tanh)
def arcsin(self):
def arcsin(self) -> "Corr":
return self._apply_func_to_corr(np.arcsin)
def arccos(self):
def arccos(self) -> "Corr":
return self._apply_func_to_corr(np.arccos)
def arctan(self):
def arctan(self) -> "Corr":
return self._apply_func_to_corr(np.arctan)
def arcsinh(self):
def arcsinh(self) -> "Corr":
return self._apply_func_to_corr(np.arcsinh)
def arccosh(self):
def arccosh(self) -> "Corr":
return self._apply_func_to_corr(np.arccosh)
def arctanh(self):
def arctanh(self) -> "Corr":
return self._apply_func_to_corr(np.arctanh)
# Right hand side operations (require tweak in main module to work)
def __radd__(self, y):
return self + y
def __rsub__(self, y):
def __rsub__(self, y: int) -> "Corr":
return -self + y
def __rmul__(self, y):
def __rmul__(self, y: Union[float, int]) -> "Corr":
return self * y
def __rtruediv__(self, y):
def __rtruediv__(self, y: int) -> "Corr":
return (self / y) ** (-1)
@property
def real(self):
def real(self) -> "Corr":
def return_real(obs_OR_cobs):
if isinstance(obs_OR_cobs.flatten()[0], CObs):
return np.vectorize(lambda x: x.real)(obs_OR_cobs)
@ -1347,7 +1350,7 @@ class Corr:
return self._apply_func_to_corr(return_real)
@property
def imag(self):
def imag(self) -> "Corr":
def return_imag(obs_OR_cobs):
if isinstance(obs_OR_cobs.flatten()[0], CObs):
return np.vectorize(lambda x: x.imag)(obs_OR_cobs)
@ -1356,7 +1359,7 @@ class Corr:
return self._apply_func_to_corr(return_imag)
def prune(self, Ntrunc, tproj=3, t0proj=2, basematrix=None):
def prune(self, Ntrunc: int, tproj: int=3, t0proj: int=2, basematrix: None=None) -> "Corr":
r''' Project large correlation matrix to lowest states
This method can be used to reduce the size of an (N x N) correlation matrix
@ -1414,7 +1417,7 @@ class Corr:
return Corr(newcontent)
def _sort_vectors(vec_set_in, ts):
def _sort_vectors(vec_set_in: List[Optional[ndarray]], ts: int) -> List[Optional[Union[ndarray, List[ndarray]]]]:
"""Helper function used to find a set of Eigenvectors consistent over all timeslices"""
if isinstance(vec_set_in[ts][0][0], Obs):
@ -1446,12 +1449,12 @@ def _sort_vectors(vec_set_in, ts):
return sorted_vec_set
def _check_for_none(corr, entry):
def _check_for_none(corr: Corr, entry: Optional[ndarray]) -> bool:
"""Checks if entry for correlator corr is None"""
return len(list(filter(None, np.asarray(entry).flatten()))) < corr.N ** 2
def _GEVP_solver(Gt, G0, method='eigh', chol_inv=None):
def _GEVP_solver(Gt: Optional[ndarray], G0: ndarray, method: str='eigh', chol_inv: Optional[ndarray]=None) -> ndarray:
r"""Helper function for solving the GEVP and sorting the eigenvectors.
Solves $G(t)v_i=\lambda_i G(t_0)v_i$ and returns the eigenvectors v_i

View file

@ -1,9 +1,12 @@
from __future__ import annotations
import numpy as np
from numpy import float64, ndarray
from typing import Any, List, Optional, Union
class Covobs:
def __init__(self, mean, cov, name, pos=None, grad=None):
def __init__(self, mean: Optional[Union[float, float64, int]], cov: Any, name: str, pos: Optional[int]=None, grad: Optional[Union[ndarray, List[float]]]=None):
""" Initialize Covobs object.
Parameters
@ -39,12 +42,12 @@ class Covobs:
self._set_grad(grad)
self.value = mean
def errsq(self):
def errsq(self) -> float:
""" Return the variance (= square of the error) of the Covobs
"""
return np.dot(np.transpose(self.grad), np.dot(self.cov, self.grad)).item()
def _set_cov(self, cov):
def _set_cov(self, cov: Any):
""" Set the covariance matrix of the covobs
Parameters
@ -79,7 +82,7 @@ class Covobs:
if ev < 0:
raise Exception('Covariance matrix is not positive-semidefinite!')
def _set_grad(self, grad):
def _set_grad(self, grad: Union[List[float], ndarray]):
""" Set the gradient of the covobs
Parameters
@ -96,9 +99,9 @@ class Covobs:
raise Exception('Invalid dimension of grad!')
@property
def cov(self):
def cov(self) -> ndarray:
return self._cov
@property
def grad(self):
def grad(self) -> ndarray:
return self._grad

View file

@ -1,4 +1,6 @@
from __future__ import annotations
import numpy as np
from numpy import ndarray
gammaX = np.array(
@ -22,7 +24,7 @@ identity = np.array(
dtype=complex)
def epsilon_tensor(i, j, k):
def epsilon_tensor(i: int, j: int, k: int) -> float:
"""Rank-3 epsilon tensor
Based on https://codegolf.stackexchange.com/a/160375
@ -39,7 +41,7 @@ def epsilon_tensor(i, j, k):
return (i - j) * (j - k) * (k - i) / 2
def epsilon_tensor_rank4(i, j, k, o):
def epsilon_tensor_rank4(i: int, j: int, k: int, o: int) -> float:
"""Rank-4 epsilon tensor
Extension of https://codegolf.stackexchange.com/a/160375
@ -57,7 +59,7 @@ def epsilon_tensor_rank4(i, j, k, o):
return (i - j) * (j - k) * (k - i) * (i - o) * (j - o) * (o - k) / 12
def Grid_gamma(gamma_tag):
def Grid_gamma(gamma_tag: str) -> ndarray:
"""Returns gamma matrix in Grid labeling."""
if gamma_tag == 'Identity':
g = identity

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import gc
from collections.abc import Sequence
import warnings
@ -15,6 +16,8 @@ from autograd import elementwise_grad as egrad
from numdifftools import Jacobian as num_jacobian
from numdifftools import Hessian as num_hessian
from .obs import Obs, derived_observable, covariance, cov_Obs, invert_corr_cov_cholesky
from numpy import ndarray
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
class Fit_result(Sequence):
@ -36,10 +39,10 @@ class Fit_result(Sequence):
def __init__(self):
self.fit_parameters = None
def __getitem__(self, idx):
def __getitem__(self, idx: int) -> Obs:
return self.fit_parameters[idx]
def __len__(self):
def __len__(self) -> int:
return len(self.fit_parameters)
def gamma_method(self, **kwargs):
@ -48,7 +51,7 @@ class Fit_result(Sequence):
gm = gamma_method
def __str__(self):
def __str__(self) -> str:
my_str = 'Goodness of fit:\n'
if hasattr(self, 'chisquare_by_dof'):
my_str += '\u03C7\u00b2/d.o.f. = ' + f'{self.chisquare_by_dof:2.6f}' + '\n'
@ -65,12 +68,12 @@ class Fit_result(Sequence):
my_str += str(i_par) + '\t' + ' ' * int(par >= 0) + str(par).rjust(int(par < 0.0)) + '\n'
return my_str
def __repr__(self):
def __repr__(self) -> str:
m = max(map(len, list(self.__dict__.keys()))) + 1
return '\n'.join([key.rjust(m) + ': ' + repr(value) for key, value in sorted(self.__dict__.items())])
def least_squares(x, y, func, priors=None, silent=False, **kwargs):
def least_squares(x: Any, y: Union[Dict[str, ndarray], List[Obs], ndarray, Dict[str, List[Obs]]], func: Union[Callable, Dict[str, Callable]], priors: Optional[Union[Dict[int, str], List[str], List[Obs], Dict[int, Obs]]]=None, silent: bool=False, **kwargs) -> Fit_result:
r'''Performs a non-linear fit to y = func(x).
```
@ -503,7 +506,7 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
return output
def total_least_squares(x, y, func, silent=False, **kwargs):
def total_least_squares(x: List[Obs], y: List[Obs], func: Callable, silent: bool=False, **kwargs) -> Fit_result:
r'''Performs a non-linear fit to y = func(x) and returns a list of Obs corresponding to the fit parameters.
Parameters
@ -707,7 +710,7 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
return output
def fit_lin(x, y, **kwargs):
def fit_lin(x: List[Union[Obs, int, float]], y: List[Obs], **kwargs) -> List[Obs]:
"""Performs a linear fit to y = n + m * x and returns two Obs n, m.
Parameters
@ -738,7 +741,7 @@ def fit_lin(x, y, **kwargs):
raise TypeError('Unsupported types for x')
def qqplot(x, o_y, func, p, title=""):
def qqplot(x: ndarray, o_y: List[Obs], func: Callable, p: List[Obs], title: str=""):
"""Generates a quantile-quantile plot of the fit result which can be used to
check if the residuals of the fit are gaussian distributed.
@ -768,7 +771,7 @@ def qqplot(x, o_y, func, p, title=""):
plt.draw()
def residual_plot(x, y, func, fit_res, title=""):
def residual_plot(x: ndarray, y: List[Obs], func: Callable, fit_res: List[Obs], title: str=""):
"""Generates a plot which compares the fit to the data and displays the corresponding residuals
For uncorrelated data the residuals are expected to be distributed ~N(0,1).
@ -805,7 +808,7 @@ def residual_plot(x, y, func, fit_res, title=""):
plt.draw()
def error_band(x, func, beta):
def error_band(x: List[int], func: Callable, beta: List[Obs]) -> ndarray:
"""Calculate the error band for an array of sample values x, for given fit function func with optimized parameters beta.
Returns
@ -829,7 +832,7 @@ def error_band(x, func, beta):
return err
def ks_test(objects=None):
def ks_test(objects: Optional[List[Fit_result]]=None):
"""Performs a KolmogorovSmirnov test for the p-values of all fit object.
Parameters
@ -873,7 +876,7 @@ def ks_test(objects=None):
print(scipy.stats.kstest(p_values, 'uniform'))
def _extract_val_and_dval(string):
def _extract_val_and_dval(string: str) -> Tuple[float, float]:
split_string = string.split('(')
if '.' in split_string[0] and '.' not in split_string[1][:-1]:
factor = 10 ** -len(split_string[0].partition('.')[2])
@ -882,7 +885,7 @@ def _extract_val_and_dval(string):
return float(split_string[0]), float(split_string[1][:-1]) * factor
def _construct_prior_obs(i_prior, i_n):
def _construct_prior_obs(i_prior: Union[Obs, str], i_n: int) -> Obs:
if isinstance(i_prior, Obs):
return i_prior
elif isinstance(i_prior, str):

View file

@ -1,10 +1,13 @@
from __future__ import annotations
import numpy as np
from .obs import derived_observable, Obs
from autograd import jacobian
from scipy.integrate import quad as squad
from numpy import ndarray
from typing import Callable, Dict, List, Tuple, Union
def quad(func, p, a, b, **kwargs):
def quad(func: Callable, p: Union[List[Union[float, Obs]], List[float], ndarray], a: Union[Obs, float, int], b: Union[Obs, float, int], **kwargs) -> Union[Tuple[Obs, float], Tuple[float, float], Tuple[Obs, float, Dict[str, Union[int, ndarray]]]]:
'''Performs a (one-dimensional) numeric integration of f(p, x) from a to b.
The integration is performed using scipy.integrate.quad().

View file

@ -1,9 +1,12 @@
from __future__ import annotations
import numpy as np
import autograd.numpy as anp # Thinly-wrapped numpy
from .obs import derived_observable, CObs, Obs, import_jackknife
from numpy import ndarray
from typing import Callable, Tuple, Union
def matmul(*operands):
def matmul(*operands) -> ndarray:
"""Matrix multiply all operands.
Parameters
@ -59,7 +62,7 @@ def matmul(*operands):
return derived_observable(multi_dot, operands, array_mode=True)
def jack_matmul(*operands):
def jack_matmul(*operands) -> ndarray:
"""Matrix multiply both operands making use of the jackknife approximation.
Parameters
@ -120,7 +123,7 @@ def jack_matmul(*operands):
return _imp_from_jack(r, name, idl)
def einsum(subscripts, *operands):
def einsum(subscripts: str, *operands) -> Union[CObs, Obs, ndarray]:
"""Wrapper for numpy.einsum
Parameters
@ -194,24 +197,24 @@ def einsum(subscripts, *operands):
return result
def inv(x):
def inv(x: ndarray) -> ndarray:
"""Inverse of Obs or CObs valued matrices."""
return _mat_mat_op(anp.linalg.inv, x)
def cholesky(x):
def cholesky(x: ndarray) -> ndarray:
"""Cholesky decomposition of Obs valued matrices."""
if any(isinstance(o, CObs) for o in x.ravel()):
raise Exception("Cholesky decomposition is not implemented for CObs.")
return _mat_mat_op(anp.linalg.cholesky, x)
def det(x):
def det(x: Union[ndarray, int]) -> Obs:
"""Determinant of Obs valued matrices."""
return _scalar_mat_op(anp.linalg.det, x)
def _scalar_mat_op(op, obs, **kwargs):
def _scalar_mat_op(op: Callable, obs: Union[ndarray, int], **kwargs) -> Obs:
"""Computes the matrix to scalar operation op to a given matrix of Obs."""
def _mat(x, **kwargs):
dim = int(np.sqrt(len(x)))
@ -232,7 +235,7 @@ def _scalar_mat_op(op, obs, **kwargs):
return derived_observable(_mat, raveled_obs, **kwargs)
def _mat_mat_op(op, obs, **kwargs):
def _mat_mat_op(op: Callable, obs: ndarray, **kwargs) -> ndarray:
"""Computes the matrix to matrix operation op to a given matrix of Obs."""
# Use real representation to calculate matrix operations for complex matrices
if any(isinstance(o, CObs) for o in obs.ravel()):
@ -258,31 +261,31 @@ def _mat_mat_op(op, obs, **kwargs):
return derived_observable(lambda x, **kwargs: op(x), [obs], array_mode=True)[0]
def eigh(obs, **kwargs):
def eigh(obs: ndarray, **kwargs) -> Tuple[ndarray, ndarray]:
"""Computes the eigenvalues and eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh."""
w = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[0], obs)
v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs)
return w, v
def eig(obs, **kwargs):
def eig(obs: ndarray, **kwargs) -> ndarray:
"""Computes the eigenvalues of a given matrix of Obs according to np.linalg.eig."""
w = derived_observable(lambda x, **kwargs: anp.real(anp.linalg.eig(x)[0]), obs)
return w
def eigv(obs, **kwargs):
def eigv(obs: ndarray, **kwargs) -> ndarray:
"""Computes the eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh."""
v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs)
return v
def pinv(obs, **kwargs):
def pinv(obs: ndarray, **kwargs) -> ndarray:
"""Computes the Moore-Penrose pseudoinverse of a matrix of Obs."""
return derived_observable(lambda x, **kwargs: anp.linalg.pinv(x), obs)
def svd(obs, **kwargs):
def svd(obs: ndarray, **kwargs) -> Tuple[ndarray, ndarray, ndarray]:
"""Computes the singular value decomposition of a matrix of Obs."""
u = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[0], obs)
s = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[1], obs)

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import platform
import numpy as np
import scipy
@ -7,6 +8,8 @@ import pandas as pd
import pickle
from .obs import Obs
from .version import __version__
from numpy import float64, int64, ndarray
from typing import List, Type, Union
def print_config():
@ -54,7 +57,7 @@ def errorbar(x, y, axes=plt, **kwargs):
axes.errorbar(val["x"], val["y"], xerr=err["x"], yerr=err["y"], **kwargs)
def dump_object(obj, name, **kwargs):
def dump_object(obj: Corr, name: str, **kwargs):
"""Dump object into pickle file.
Parameters
@ -78,7 +81,7 @@ def dump_object(obj, name, **kwargs):
pickle.dump(obj, fb)
def load_object(path):
def load_object(path: str) -> Union[Obs, Corr]:
"""Load object from pickle file.
Parameters
@ -95,7 +98,7 @@ def load_object(path):
return pickle.load(file)
def pseudo_Obs(value, dvalue, name, samples=1000):
def pseudo_Obs(value: Union[float, int64, float64, int], dvalue: Union[float, float64, int], name: str, samples: int=1000) -> Obs:
"""Generate an Obs object with given value, dvalue and name for test purposes
Parameters
@ -132,7 +135,7 @@ def pseudo_Obs(value, dvalue, name, samples=1000):
return res
def gen_correlated_data(means, cov, name, tau=0.5, samples=1000):
def gen_correlated_data(means: Union[ndarray, List[float]], cov: ndarray, name: str, tau: Union[float, ndarray]=0.5, samples: int=1000) -> List[Obs]:
""" Generate observables with given covariance and autocorrelation times.
Parameters
@ -174,7 +177,7 @@ def gen_correlated_data(means, cov, name, tau=0.5, samples=1000):
return [Obs([dat], [name]) for dat in corr_data.T]
def _assert_equal_properties(ol, otype=Obs):
def _assert_equal_properties(ol: Union[List[Obs], List[CObs], ndarray], otype: Type[Obs]=Obs):
otype = type(ol[0])
for o in ol[1:]:
if not isinstance(o, otype):

View file

@ -1,10 +1,13 @@
from __future__ import annotations
import numpy as np
import scipy.linalg
from .obs import Obs
from .linalg import svd, eig
from pyerrors.obs import Obs
from typing import List
def matrix_pencil_method(corrs, k=1, p=None, **kwargs):
def matrix_pencil_method(corrs: List[Obs], k: int=1, p: None=None, **kwargs) -> List[Obs]:
"""Matrix pencil method to extract k energy levels from data
Implementation of the matrix pencil method based on

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import warnings
import hashlib
import pickle
@ -10,6 +11,8 @@ from scipy.stats import skew, skewtest, kurtosis, kurtosistest
import numdifftools as nd
from itertools import groupby
from .covobs import Covobs
from numpy import bool, float64, int64, ndarray
from typing import Any, Callable, Dict, List, Optional, Union
# Improve print output of numpy.ndarrays containing Obs objects.
np.set_printoptions(formatter={'object': lambda x: str(x)})
@ -57,7 +60,7 @@ class Obs:
N_sigma_global = 1.0
N_sigma_dict = {}
def __init__(self, samples, names, idl=None, **kwargs):
def __init__(self, samples: Union[List[List[int]], List[ndarray], ndarray, List[List[float64]], List[List[float]]], names: List[Union[int, Any, str]], idl: Optional[Any]=None, **kwargs):
""" Initialize Obs object.
Parameters
@ -141,27 +144,27 @@ class Obs:
self.tag = None
@property
def value(self):
def value(self) -> Union[float, int64, float64, int]:
return self._value
@property
def dvalue(self):
def dvalue(self) -> Union[float, float64]:
return self._dvalue
@property
def e_names(self):
def e_names(self) -> List[str]:
return sorted(set([o.split('|')[0] for o in self.names]))
@property
def cov_names(self):
def cov_names(self) -> List[Union[Any, str]]:
return sorted(set([o for o in self.covobs.keys()]))
@property
def mc_names(self):
def mc_names(self) -> List[Union[Any, str]]:
return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names]))
@property
def e_content(self):
def e_content(self) -> Dict[str, List[str]]:
res = {}
for e, e_name in enumerate(self.e_names):
res[e_name] = sorted(filter(lambda x: x.startswith(e_name + '|'), self.names))
@ -170,7 +173,7 @@ class Obs:
return res
@property
def covobs(self):
def covobs(self) -> Dict[str, Covobs]:
return self._covobs
def gamma_method(self, **kwargs):
@ -341,7 +344,7 @@ class Obs:
gm = gamma_method
def _calc_gamma(self, deltas, idx, shape, w_max, fft, gapsize):
def _calc_gamma(self, deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray:
"""Calculate Gamma_{AA} from the deltas, which are defined on idx.
idx is assumed to be a contiguous range (possibly with a stepsize != 1)
@ -377,7 +380,7 @@ class Obs:
return gamma
def details(self, ens_content=True):
def details(self, ens_content: bool=True):
"""Output detailed properties of the Obs.
Parameters
@ -446,7 +449,7 @@ class Obs:
my_string_list.append(my_string)
print('\n'.join(my_string_list))
def reweight(self, weight):
def reweight(self, weight: "Obs") -> "Obs":
"""Reweight the obs with given rewighting factors.
Parameters
@ -461,7 +464,7 @@ class Obs:
"""
return reweight(weight, [self])[0]
def is_zero_within_error(self, sigma=1):
def is_zero_within_error(self, sigma: Union[float, int]=1) -> Union[bool, bool]:
"""Checks whether the observable is zero within 'sigma' standard errors.
Parameters
@ -473,7 +476,7 @@ class Obs:
"""
return self.is_zero() or np.abs(self.value) <= sigma * self._dvalue
def is_zero(self, atol=1e-10):
def is_zero(self, atol: float=1e-10) -> Union[bool, bool]:
"""Checks whether the observable is zero within a given tolerance.
Parameters
@ -483,7 +486,7 @@ class Obs:
"""
return np.isclose(0.0, self.value, 1e-14, atol) and all(np.allclose(0.0, delta, 1e-14, atol) for delta in self.deltas.values()) and all(np.allclose(0.0, delta.errsq(), 1e-14, atol) for delta in self.covobs.values())
def plot_tauint(self, save=None):
def plot_tauint(self, save: None=None):
"""Plot integrated autocorrelation time for each ensemble.
Parameters
@ -523,7 +526,7 @@ class Obs:
if save:
fig.savefig(save + "_" + str(e))
def plot_rho(self, save=None):
def plot_rho(self, save: None=None):
"""Plot normalized autocorrelation function time for each ensemble.
Parameters
@ -576,7 +579,7 @@ class Obs:
plt.title('Replica distribution' + e_name + ' (mean=0, var=1)')
plt.draw()
def plot_history(self, expand=True):
def plot_history(self, expand: bool=True):
"""Plot derived Monte Carlo history for each ensemble
Parameters
@ -608,7 +611,7 @@ class Obs:
plt.title(e_name + f'\nskew: {skew(y_test):.3f} (p={skewtest(y_test).pvalue:.3f}), kurtosis: {kurtosis(y_test):.3f} (p={kurtosistest(y_test).pvalue:.3f})')
plt.draw()
def plot_piechart(self, save=None):
def plot_piechart(self, save: None=None) -> Dict[str, float64]:
"""Plot piechart which shows the fractional contribution of each
ensemble to the error and returns a dictionary containing the fractions.
@ -632,7 +635,7 @@ class Obs:
return dict(zip(labels, sizes))
def dump(self, filename, datatype="json.gz", description="", **kwargs):
def dump(self, filename: str, datatype: str="json.gz", description: str="", **kwargs):
"""Dump the Obs to a file 'name' of chosen format.
Parameters
@ -661,7 +664,7 @@ class Obs:
else:
raise TypeError("Unknown datatype " + str(datatype))
def export_jackknife(self):
def export_jackknife(self) -> ndarray:
"""Export jackknife samples from the Obs
Returns
@ -687,7 +690,7 @@ class Obs:
tmp_jacks[1:] = (n * mean - full_data) / (n - 1)
return tmp_jacks
def export_bootstrap(self, samples=500, random_numbers=None, save_rng=None):
def export_bootstrap(self, samples: int=500, random_numbers: Optional[ndarray]=None, save_rng: None=None) -> ndarray:
"""Export bootstrap samples from the Obs
Parameters
@ -730,16 +733,16 @@ class Obs:
ret[1:] = proj @ (self.deltas[name] + self.r_values[name])
return ret
def __float__(self):
def __float__(self) -> float:
return float(self.value)
def __repr__(self):
def __repr__(self) -> str:
return 'Obs[' + str(self) + ']'
def __str__(self):
def __str__(self) -> str:
return _format_uncertainty(self.value, self._dvalue)
def __format__(self, format_type):
def __format__(self, format_type: str) -> str:
if format_type == "":
significance = 2
else:
@ -752,7 +755,7 @@ class Obs:
my_str = char + my_str
return my_str
def __hash__(self):
def __hash__(self) -> int:
hash_tuple = (np.array([self.value]).astype(np.float32).data.tobytes(),)
hash_tuple += tuple([o.astype(np.float32).data.tobytes() for o in self.deltas.values()])
hash_tuple += tuple([np.array([o.errsq()]).astype(np.float32).data.tobytes() for o in self.covobs.values()])
@ -762,25 +765,25 @@ class Obs:
return int(m.hexdigest(), 16) & 0xFFFFFFFF
# Overload comparisons
def __lt__(self, other):
def __lt__(self, other: Union[Obs, float, float64]) -> Union[bool, bool]:
return self.value < other
def __le__(self, other):
def __le__(self, other: Union[Obs, float, int]) -> bool:
return self.value <= other
def __gt__(self, other):
def __gt__(self, other: Union[Obs, float]) -> Union[bool, bool]:
return self.value > other
def __ge__(self, other):
def __ge__(self, other: Union[Obs, float, int]) -> Union[bool, bool]:
return self.value >= other
def __eq__(self, other):
def __eq__(self, other: Optional[Union[Obs, float64, int, float]]) -> Union[bool, bool]:
if other is None:
return False
return (self - other).is_zero()
# Overload math operations
def __add__(self, y):
def __add__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] + x[1], [self, y], man_grad=[1, 1])
else:
@ -793,10 +796,10 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] + y, [self], man_grad=[1])
def __radd__(self, y):
def __radd__(self, y: Union[float, int]) -> "Obs":
return self + y
def __mul__(self, y):
def __mul__(self, y: Any) -> Union[Obs, ndarray, CObs, NotImplementedType]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] * x[1], [self, y], man_grad=[y.value, self.value])
else:
@ -809,10 +812,10 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] * y, [self], man_grad=[y])
def __rmul__(self, y):
def __rmul__(self, y: Union[float, int]) -> "Obs":
return self * y
def __sub__(self, y):
def __sub__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] - x[1], [self, y], man_grad=[1, -1])
else:
@ -823,16 +826,16 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] - y, [self], man_grad=[1])
def __rsub__(self, y):
def __rsub__(self, y: Union[float, int]) -> "Obs":
return -1 * (self - y)
def __pos__(self):
def __pos__(self) -> "Obs":
return self
def __neg__(self):
def __neg__(self) -> "Obs":
return -1 * self
def __truediv__(self, y):
def __truediv__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] / x[1], [self, y], man_grad=[1 / y.value, - self.value / y.value ** 2])
else:
@ -843,7 +846,7 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] / y, [self], man_grad=[1 / y])
def __rtruediv__(self, y):
def __rtruediv__(self, y: Union[float, int]) -> "Obs":
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] / x[1], [y, self], man_grad=[1 / self.value, - y.value / self.value ** 2])
else:
@ -854,62 +857,62 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: y / x[0], [self], man_grad=[-y / self.value ** 2])
def __pow__(self, y):
def __pow__(self, y: Union[Obs, float, int]) -> "Obs":
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] ** x[1], [self, y], man_grad=[y.value * self.value ** (y.value - 1), self.value ** y.value * np.log(self.value)])
else:
return derived_observable(lambda x, **kwargs: x[0] ** y, [self], man_grad=[y * self.value ** (y - 1)])
def __rpow__(self, y):
def __rpow__(self, y: Union[float, int]) -> "Obs":
return derived_observable(lambda x, **kwargs: y ** x[0], [self], man_grad=[y ** self.value * np.log(y)])
def __abs__(self):
def __abs__(self) -> "Obs":
return derived_observable(lambda x: anp.abs(x[0]), [self])
# Overload numpy functions
def sqrt(self):
def sqrt(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.sqrt(x[0]), [self], man_grad=[1 / 2 / np.sqrt(self.value)])
def log(self):
def log(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.log(x[0]), [self], man_grad=[1 / self.value])
def exp(self):
def exp(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.exp(x[0]), [self], man_grad=[np.exp(self.value)])
def sin(self):
def sin(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.sin(x[0]), [self], man_grad=[np.cos(self.value)])
def cos(self):
def cos(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.cos(x[0]), [self], man_grad=[-np.sin(self.value)])
def tan(self):
def tan(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.tan(x[0]), [self], man_grad=[1 / np.cos(self.value) ** 2])
def arcsin(self):
def arcsin(self) -> "Obs":
return derived_observable(lambda x: anp.arcsin(x[0]), [self])
def arccos(self):
def arccos(self) -> "Obs":
return derived_observable(lambda x: anp.arccos(x[0]), [self])
def arctan(self):
def arctan(self) -> "Obs":
return derived_observable(lambda x: anp.arctan(x[0]), [self])
def sinh(self):
def sinh(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.sinh(x[0]), [self], man_grad=[np.cosh(self.value)])
def cosh(self):
def cosh(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.cosh(x[0]), [self], man_grad=[np.sinh(self.value)])
def tanh(self):
def tanh(self) -> "Obs":
return derived_observable(lambda x, **kwargs: np.tanh(x[0]), [self], man_grad=[1 / np.cosh(self.value) ** 2])
def arcsinh(self):
def arcsinh(self) -> "Obs":
return derived_observable(lambda x: anp.arcsinh(x[0]), [self])
def arccosh(self):
def arccosh(self) -> "Obs":
return derived_observable(lambda x: anp.arccosh(x[0]), [self])
def arctanh(self):
def arctanh(self) -> "Obs":
return derived_observable(lambda x: anp.arctanh(x[0]), [self])
@ -917,17 +920,17 @@ class CObs:
"""Class for a complex valued observable."""
__slots__ = ['_real', '_imag', 'tag']
def __init__(self, real, imag=0.0):
def __init__(self, real: Obs, imag: Union[Obs, float, int]=0.0):
self._real = real
self._imag = imag
self.tag = None
@property
def real(self):
def real(self) -> Obs:
return self._real
@property
def imag(self):
def imag(self) -> Union[Obs, float, int]:
return self._imag
def gamma_method(self, **kwargs):
@ -937,14 +940,14 @@ class CObs:
if isinstance(self.imag, Obs):
self.imag.gamma_method(**kwargs)
def is_zero(self):
def is_zero(self) -> bool:
"""Checks whether both real and imaginary part are zero within machine precision."""
return self.real == 0.0 and self.imag == 0.0
def conjugate(self):
def conjugate(self) -> "CObs":
return CObs(self.real, -self.imag)
def __add__(self, other):
def __add__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return other + self
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@ -953,10 +956,10 @@ class CObs:
else:
return CObs(self.real + other, self.imag)
def __radd__(self, y):
def __radd__(self, y: Union[complex, float, Obs, int]) -> "CObs":
return self + y
def __sub__(self, other):
def __sub__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return -1 * (other - self)
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@ -964,10 +967,10 @@ class CObs:
else:
return CObs(self.real - other, self.imag)
def __rsub__(self, other):
def __rsub__(self, other: Union[complex, float, Obs, int]) -> "CObs":
return -1 * (self - other)
def __mul__(self, other):
def __mul__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return other * self
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@ -986,10 +989,10 @@ class CObs:
else:
return CObs(self.real * other, self.imag * other)
def __rmul__(self, other):
def __rmul__(self, other: Union[complex, Obs, float, int]) -> "CObs":
return self * other
def __truediv__(self, other):
def __truediv__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return 1 / (other / self)
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@ -998,32 +1001,32 @@ class CObs:
else:
return CObs(self.real / other, self.imag / other)
def __rtruediv__(self, other):
def __rtruediv__(self, other: Union[complex, float, Obs, int]) -> "CObs":
r = self.real ** 2 + self.imag ** 2
if hasattr(other, 'real') and hasattr(other, 'imag'):
return CObs((self.real * other.real + self.imag * other.imag) / r, (self.real * other.imag - self.imag * other.real) / r)
else:
return CObs(self.real * other / r, -self.imag * other / r)
def __abs__(self):
def __abs__(self) -> Obs:
return np.sqrt(self.real**2 + self.imag**2)
def __pos__(self):
def __pos__(self) -> "CObs":
return self
def __neg__(self):
def __neg__(self) -> "CObs":
return -1 * self
def __eq__(self, other):
def __eq__(self, other: Union[CObs, int]) -> bool:
return self.real == other.real and self.imag == other.imag
def __str__(self):
def __str__(self) -> str:
return '(' + str(self.real) + int(self.imag >= 0.0) * '+' + str(self.imag) + 'j)'
def __repr__(self):
def __repr__(self) -> str:
return 'CObs[' + str(self) + ']'
def __format__(self, format_type):
def __format__(self, format_type: str) -> str:
if format_type == "":
significance = 2
format_type = "2"
@ -1032,7 +1035,7 @@ class CObs:
return f"({self.real:{format_type}}{self.imag:+{significance}}j)"
def gamma_method(x, **kwargs):
def gamma_method(x: Union[Corr, Obs, ndarray, List[Obs]], **kwargs) -> ndarray:
"""Vectorized version of the gamma_method applicable to lists or arrays of Obs.
See docstring of pe.Obs.gamma_method for details.
@ -1043,7 +1046,7 @@ def gamma_method(x, **kwargs):
gm = gamma_method
def _format_uncertainty(value, dvalue, significance=2):
def _format_uncertainty(value: Union[float, float64, int], dvalue: Union[float, float64, int], significance: int=2) -> str:
"""Creates a string of a value and its error in paranthesis notation, e.g., 13.02(45)"""
if dvalue == 0.0 or (not np.isfinite(dvalue)):
return str(value)
@ -1060,7 +1063,7 @@ def _format_uncertainty(value, dvalue, significance=2):
return f"{value:.{max(0, int(significance - fexp - 1))}f}({dvalue:2.{max(0, int(significance - fexp - 1))}f})"
def _expand_deltas(deltas, idx, shape, gapsize):
def _expand_deltas(deltas: ndarray, idx: Union[range, List[int], List[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray:
"""Expand deltas defined on idx to a regular range with spacing gapsize between two
configurations and where holes are filled by 0.
If idx is of type range, the deltas are not changed if the idx.step == gapsize.
@ -1086,7 +1089,7 @@ def _expand_deltas(deltas, idx, shape, gapsize):
return ret
def _merge_idx(idl):
def _merge_idx(idl: List[Union[List[Union[int64, int]], range, List[int]]]) -> Union[List[Union[int64, int]], range, List[int]]:
"""Returns the union of all lists in idl as range or sorted list
Parameters
@ -1109,7 +1112,7 @@ def _merge_idx(idl):
return idunion
def _intersection_idx(idl):
def _intersection_idx(idl: List[Union[range, List[int]]]) -> Union[range, List[int]]:
"""Returns the intersection of all lists in idl as range or sorted list
Parameters
@ -1135,7 +1138,7 @@ def _intersection_idx(idl):
return idinter
def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor):
def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, List[int]], shape: int, new_idx: Union[range, List[int]], scalefactor: Union[float, int]) -> ndarray:
"""Expand deltas defined on idx to the list of configs that is defined by new_idx.
New, empty entries are filled by 0. If idx and new_idx are of type range, the smallest
common divisor of the step sizes is used as new step size.
@ -1167,7 +1170,7 @@ def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor):
return np.array([ret[new_idx[i] - new_idx[0]] for i in range(len(new_idx))]) * len(new_idx) / len(idx) * scalefactor
def derived_observable(func, data, array_mode=False, **kwargs):
def derived_observable(func: Callable, data: Any, array_mode: bool=False, **kwargs) -> Union[Obs, ndarray]:
"""Construct a derived Obs according to func(data, **kwargs) using automatic differentiation.
Parameters
@ -1357,7 +1360,7 @@ def derived_observable(func, data, array_mode=False, **kwargs):
return final_result
def _reduce_deltas(deltas, idx_old, idx_new):
def _reduce_deltas(deltas: Union[List[float], ndarray], idx_old: Union[range, List[int]], idx_new: Union[range, List[int], ndarray]) -> Union[List[float], ndarray]:
"""Extract deltas defined on idx_old on all configs of idx_new.
Assumes, that idx_old and idx_new are correctly defined idl, i.e., they
@ -1386,7 +1389,7 @@ def _reduce_deltas(deltas, idx_old, idx_new):
return np.array(deltas)[indices]
def reweight(weight, obs, **kwargs):
def reweight(weight: Obs, obs: Union[ndarray, List[Obs]], **kwargs) -> List[Obs]:
"""Reweight a list of observables.
Parameters
@ -1428,7 +1431,7 @@ def reweight(weight, obs, **kwargs):
return result
def correlate(obs_a, obs_b):
def correlate(obs_a: Obs, obs_b: Obs) -> Obs:
"""Correlate two observables.
Parameters
@ -1471,7 +1474,7 @@ def correlate(obs_a, obs_b):
return o
def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs):
def covariance(obs: Union[ndarray, List[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray:
r'''Calculates the error covariance matrix of a set of observables.
WARNING: This function should be used with care, especially for observables with support on multiple
@ -1541,7 +1544,7 @@ def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs):
return cov
def invert_corr_cov_cholesky(corr, inverrdiag):
def invert_corr_cov_cholesky(corr: ndarray, inverrdiag: ndarray) -> ndarray:
"""Constructs a lower triangular matrix `chol` via the Cholesky decomposition of the correlation matrix `corr`
and then returns the inverse covariance matrix `chol_inv` as a lower triangular matrix by solving `chol * x = inverrdiag`.
@ -1564,7 +1567,7 @@ def invert_corr_cov_cholesky(corr, inverrdiag):
return chol_inv
def sort_corr(corr, kl, yd):
def sort_corr(corr: ndarray, kl: List[str], yd: Dict[str, List[Obs]]) -> ndarray:
""" Reorders a correlation matrix to match the alphabetical order of its underlying y data.
The ordering of the input correlation matrix `corr` is given by the list of keys `kl`.
@ -1627,7 +1630,7 @@ def sort_corr(corr, kl, yd):
return corr_sorted
def _smooth_eigenvalues(corr, E):
def _smooth_eigenvalues(corr: ndarray, E: int) -> ndarray:
"""Eigenvalue smoothing as described in hep-lat/9412087
corr : np.ndarray
@ -1644,7 +1647,7 @@ def _smooth_eigenvalues(corr, E):
return vec @ np.diag(vals) @ vec.T
def _covariance_element(obs1, obs2):
def _covariance_element(obs1: Obs, obs2: Obs) -> Union[float, float64]:
"""Estimates the covariance of two Obs objects, neglecting autocorrelations."""
def calc_gamma(deltas1, deltas2, idx1, idx2, new_idx):
@ -1704,7 +1707,7 @@ def _covariance_element(obs1, obs2):
return dvalue
def import_jackknife(jacks, name, idl=None):
def import_jackknife(jacks: ndarray, name: str, idl: Optional[List[range]]=None) -> Obs:
"""Imports jackknife samples and returns an Obs
Parameters
@ -1724,7 +1727,7 @@ def import_jackknife(jacks, name, idl=None):
return new_obs
def import_bootstrap(boots, name, random_numbers):
def import_bootstrap(boots: ndarray, name: str, random_numbers: ndarray) -> Obs:
"""Imports bootstrap samples and returns an Obs
Parameters
@ -1754,7 +1757,7 @@ def import_bootstrap(boots, name, random_numbers):
return ret
def merge_obs(list_of_obs):
def merge_obs(list_of_obs: List[Obs]) -> Obs:
"""Combine all observables in list_of_obs into one new observable
Parameters
@ -1784,7 +1787,7 @@ def merge_obs(list_of_obs):
return o
def cov_Obs(means, cov, name, grad=None):
def cov_Obs(means: Union[float64, int, List[float], float, List[int]], cov: Any, name: str, grad: None=None) -> Union[Obs, List[Obs]]:
"""Create an Obs based on mean(s) and a covariance matrix
Parameters
@ -1827,7 +1830,7 @@ def cov_Obs(means, cov, name, grad=None):
return ol
def _determine_gap(o, e_content, e_name):
def _determine_gap(o: Obs, e_content: Dict[str, List[str]], e_name: str) -> Union[int64, int]:
gaps = []
for r_name in e_content[e_name]:
if isinstance(o.idl[r_name], range):
@ -1842,7 +1845,7 @@ def _determine_gap(o, e_content, e_name):
return gap
def _check_lists_equal(idl):
def _check_lists_equal(idl: List[Union[List[int], List[Union[int64, int]], range, ndarray]]):
'''
Use groupby to efficiently check whether all elements of idl are identical.
Returns True if all elements are equal, otherwise False.

View file

@ -1,10 +1,12 @@
from __future__ import annotations
import numpy as np
import scipy.optimize
from autograd import jacobian
from .obs import derived_observable
from typing import Callable, List, Union
def find_root(d, func, guess=1.0, **kwargs):
def find_root(d: Union[Obs, List[Obs]], func: Callable, guess: float=1.0, **kwargs) -> Obs:
r'''Finds the root of the function func(x, d) where d is an `Obs`.
Parameters

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import scipy
import numpy as np
from autograd.extend import primitive, defvjp