This commit is contained in:
Fabian Joswig 2025-05-07 09:32:36 +02:00 committed by GitHub
commit f75637c692
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 652 additions and 556 deletions

View file

@ -21,6 +21,6 @@ jobs:
- name: flake8 Lint
uses: py-actions/flake8@v2
with:
ignore: "E501,W503"
ignore: "E501,W503,E252"
exclude: "__init__.py, input/__init__.py"
path: "pyerrors"

View file

@ -39,7 +39,7 @@ jobs:
run: |
uv pip install wheel --system
uv pip install . --system
uv pip install pytest pytest-cov pytest-benchmark hypothesis --system
uv pip install pytest pytest-cov pytest-benchmark hypothesis typing_extensions --system
uv pip freeze --system
- name: Run tests
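typing_extensions is plausibly needed for newer typing constructs such as Unpack (used for the typed **kwargs further down) on Python versions before 3.11. A minimal import-fallback sketch:

import sys

if sys.version_info >= (3, 11):
    from typing import Unpack
else:
    from typing_extensions import Unpack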

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import warnings
from itertools import permutations
import numpy as np
@ -6,9 +7,11 @@ import matplotlib.pyplot as plt
import scipy.linalg
from .obs import Obs, reweight, correlate, CObs
from .misc import dump_object, _assert_equal_properties
from .fits import least_squares
from .fits import least_squares, Fit_result
from .roots import find_root
from . import linalg
from numpy import ndarray, ufunc
from typing import Any, Callable, Optional, Union
class Corr:
@ -42,7 +45,7 @@ class Corr:
__slots__ = ["content", "N", "T", "tag", "prange"]
def __init__(self, data_input, padding=[0, 0], prange=None):
def __init__(self, data_input: list[Union[Obs, CObs]], padding: list[int]=[0, 0], prange: Optional[list[int]]=None):
""" Initialize a Corr object.
Parameters
@ -73,7 +76,7 @@ class Corr:
T = data_input[0, 0].T
N = data_input.shape[0]
input_as_list = []
input_as_list: list[Union[None, ndarray]] = []
for t in range(T):
if any([(item.content[t] is None) for item in data_input.flatten()]):
if not all([(item.content[t] is None) for item in data_input.flatten()]):
@ -97,7 +100,7 @@ class Corr:
if all([isinstance(item, (Obs, CObs)) or item is None for item in data_input]):
_assert_equal_properties([o for o in data_input if o is not None])
self.content = [np.asarray([item]) if item is not None else None for item in data_input]
self.content: list[Union[None, ndarray]] = [np.asarray([item]) if item is not None else None for item in data_input]
self.N = 1
elif all([isinstance(item, np.ndarray) or item is None for item in data_input]) and any([isinstance(item, np.ndarray) for item in data_input]):
self.content = data_input
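A usage sketch for the constructor above, with hypothetical ensemble data: a one-dimensional Corr is built from one Obs (or None) per timeslice.

import numpy as np
import pyerrors as pe

T = 8
samples = [np.random.normal(np.exp(-0.5 * t), 0.01, 500) for t in range(T)]
corr = pe.Corr([pe.Obs([s], ['ensemble1']) for s in samples])
corr.gm()  # estimate errors with the gamma method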
@ -119,14 +122,15 @@ class Corr:
self.T = len(self.content)
self.prange = prange
def __getitem__(self, idx):
def __getitem__(self, idx: Union[slice, int]) -> Union[CObs, Obs, ndarray, list[ndarray], None]:
"""Return the content of timeslice idx"""
if self.content[idx] is None:
idx_content = self.content[idx]
if idx_content is None:
return None
elif len(self.content[idx]) == 1:
return self.content[idx][0]
elif len(idx_content) == 1:
return idx_content[0]
else:
return self.content[idx]
return idx_content
@property
def reweighted(self):
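The __getitem__ rewrite above uses a pattern that recurs throughout this commit: binding self.content[idx] to a local lets a type checker narrow away the None case after the check, which it cannot do for a repeated subscript expression. A standalone sketch of the idiom:

from typing import Optional

def first_entry(content: list[Optional[list[float]]], idx: int) -> Optional[float]:
    entry = content[idx]  # bind once so the checker can narrow the type
    if entry is None:
        return None
    return entry[0]       # 'entry' is narrowed to list[float] here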
@ -151,7 +155,7 @@ class Corr:
gm = gamma_method
def projected(self, vector_l=None, vector_r=None, normalize=False):
def projected(self, vector_l: Optional[Union[ndarray, list[Optional[ndarray]]]]=None, vector_r: Optional[Union[ndarray, list[Optional[ndarray]]]]=None, normalize: bool=False) -> "Corr":
"""We need to project the Correlator with a Vector to get a single value at each timeslice.
The method can use one or two vectors.
@ -163,7 +167,7 @@ class Corr:
if vector_l is None:
vector_l, vector_r = np.asarray([1.] + (self.N - 1) * [0.]), np.asarray([1.] + (self.N - 1) * [0.])
elif (vector_r is None):
elif vector_r is None:
vector_r = vector_l
if isinstance(vector_l, list) and not isinstance(vector_r, list):
if len(vector_l) != self.T:
@ -174,7 +178,7 @@ class Corr:
raise ValueError("Length of vector list must be equal to T")
vector_l = [vector_l] * self.T
if not isinstance(vector_l, list):
if isinstance(vector_l, ndarray) and isinstance(vector_r, ndarray):
if not vector_l.shape == vector_r.shape == (self.N,):
raise ValueError("Vectors are of wrong shape!")
if normalize:
@ -190,7 +194,7 @@ class Corr:
newcontent = [None if (_check_for_none(self, self.content[t]) or vector_l[t] is None or vector_r[t] is None) else np.asarray([vector_l[t].T @ self.content[t] @ vector_r[t]]) for t in range(self.T)]
return Corr(newcontent)
def item(self, i, j):
def item(self, i: int, j: int) -> "Corr":
"""Picks the element [i,j] from every matrix and returns a correlator containing one Obs per timeslice.
Parameters
@ -205,7 +209,7 @@ class Corr:
newcontent = [None if (item is None) else item[i, j] for item in self.content]
return Corr(newcontent)
def plottable(self):
def plottable(self) -> tuple[list, list, list]:
"""Outputs the correlator in a plotable format.
Outputs three lists containing the timeslice index, the value on each
@ -219,7 +223,7 @@ class Corr:
return x_list, y_list, y_err_list
def symmetric(self):
def symmetric(self) -> "Corr":
""" Symmetrize the correlator around x0=0."""
if self.N != 1:
raise ValueError('symmetric cannot be safely applied to multi-dimensional correlators.')
@ -236,11 +240,11 @@ class Corr:
newcontent.append(None)
else:
newcontent.append(0.5 * (self.content[t] + self.content[self.T - t]))
if (all([x is None for x in newcontent])):
if all([x is None for x in newcontent]):
raise ValueError("Corr could not be symmetrized: No redundant values")
return Corr(newcontent, prange=self.prange)
def anti_symmetric(self):
def anti_symmetric(self) -> "Corr":
"""Anti-symmetrize the correlator around x0=0."""
if self.N != 1:
raise TypeError('anti_symmetric cannot be safely applied to multi-dimensional correlators.')
@ -277,11 +281,11 @@ class Corr:
return False
return True
def trace(self):
def trace(self) -> "Corr":
"""Calculates the per-timeslice trace of a correlator matrix."""
if self.N == 1:
raise ValueError("Only works for correlator matrices.")
newcontent = []
newcontent: list[Union[None, Obs, CObs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]):
newcontent.append(None)
@ -289,7 +293,7 @@ class Corr:
newcontent.append(np.trace(self.content[t]))
return Corr(newcontent)
def matrix_symmetric(self):
def matrix_symmetric(self) -> "Corr":
"""Symmetrizes the correlator matrices on every timeslice."""
if self.N == 1:
raise ValueError("Trying to symmetrize a correlator matrix, that already has N=1.")
@ -299,7 +303,7 @@ class Corr:
transposed = [None if _check_for_none(self, G) else G.T for G in self.content]
return 0.5 * (Corr(transposed) + self)
def GEVP(self, t0, ts=None, sort="Eigenvalue", vector_obs=False, **kwargs):
def GEVP(self, t0: int, ts: Optional[int]=None, sort: Optional[str]="Eigenvalue", vector_obs: bool=False, **kwargs) -> Union[list[list[Optional[ndarray]]], ndarray, list[Optional[ndarray]]]:
r'''Solve the generalized eigenvalue problem on the correlator matrix and return the corresponding eigenvectors.
The eigenvectors are sorted according to the descending eigenvalues, the zeroth eigenvector(s) correspond to the
@ -405,7 +409,7 @@ class Corr:
else:
return reordered_vecs
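A usage sketch for GEVP with a hypothetical matrix correlator: the returned vector sets are indexed by state and can be passed back to projected, which is exactly what Eigenvalue below does.

vecs = matrix_corr.GEVP(t0=3, sort="Eigenvalue")  # matrix_corr: an N x N Corr
ground_state = matrix_corr.projected(vecs[0])     # one-dimensional Corr
m_eff = ground_state.m_eff()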
def Eigenvalue(self, t0, ts=None, state=0, sort="Eigenvalue", **kwargs):
def Eigenvalue(self, t0: int, ts: Optional[int]=None, state: int=0, sort: str="Eigenvalue", **kwargs) -> "Corr":
"""Determines the eigenvalue of the GEVP by solving and projecting the correlator
Parameters
@ -418,7 +422,7 @@ class Corr:
vec = self.GEVP(t0, ts=ts, sort=sort, **kwargs)[state]
return self.projected(vec)
def Hankel(self, N, periodic=False):
def Hankel(self, N: int, periodic: bool=False) -> "Corr":
"""Constructs an NxN Hankel matrix
C(t) c(t+1) ... c(t+n-1)
@ -459,7 +463,7 @@ class Corr:
return Corr(new_content)
def roll(self, dt):
def roll(self, dt: int) -> "Corr":
"""Periodically shift the correlator by dt timeslices
Parameters
@ -469,11 +473,11 @@ class Corr:
"""
return Corr(list(np.roll(np.array(self.content, dtype=object), dt, axis=0)))
def reverse(self):
def reverse(self) -> "Corr":
"""Reverse the time ordering of the Corr"""
return Corr(self.content[:: -1])
def thin(self, spacing=2, offset=0):
def thin(self, spacing: int=2, offset: int=0) -> "Corr":
"""Thin out a correlator to suppress correlations
Parameters
@ -483,7 +487,7 @@ class Corr:
offset : int
Offset the equal spacing
"""
new_content = []
new_content: list[Union[None, list, ndarray]] = []
for t in range(self.T):
if (offset + t) % spacing != 0:
new_content.append(None)
@ -491,7 +495,7 @@ class Corr:
new_content.append(self.content[t])
return Corr(new_content)
def correlate(self, partner):
def correlate(self, partner: Union[Corr, float, Obs]) -> "Corr":
"""Correlate the correlator with another correlator or Obs
Parameters
@ -503,7 +507,7 @@ class Corr:
"""
if self.N != 1:
raise ValueError("Only one-dimensional correlators can be safely correlated.")
new_content = []
new_content: list[Union[None, ndarray]] = []
for x0, t_slice in enumerate(self.content):
if _check_for_none(self, t_slice):
new_content.append(None)
@ -520,7 +524,7 @@ class Corr:
return Corr(new_content)
def reweight(self, weight, **kwargs):
def reweight(self, weight: Obs, **kwargs) -> "Corr":
"""Reweight the correlator.
Parameters
@ -535,7 +539,7 @@ class Corr:
"""
if self.N != 1:
raise Exception("Reweighting only implemented for one-dimensional correlators.")
new_content = []
new_content: list[Union[None, ndarray]] = []
for t_slice in self.content:
if _check_for_none(self, t_slice):
new_content.append(None)
@ -543,7 +547,7 @@ class Corr:
new_content.append(np.array(reweight(weight, t_slice, **kwargs)))
return Corr(new_content)
def T_symmetry(self, partner, parity=+1):
def T_symmetry(self, partner: "Corr", parity: int=+1) -> "Corr":
"""Return the time symmetry average of the correlator and its partner
Parameters
@ -573,7 +577,7 @@ class Corr:
return (self + T_partner) / 2
def deriv(self, variant="symmetric"):
def deriv(self, variant: Optional[str]="symmetric") -> "Corr":
"""Return the first derivative of the correlator with respect to x0.
Parameters
@ -584,8 +588,8 @@ class Corr:
"""
if self.N != 1:
raise ValueError("deriv only implemented for one-dimensional correlators.")
newcontent: list[Union[None, ndarray, Obs]] = []
if variant == "symmetric":
newcontent = []
for t in range(1, self.T - 1):
if (self.content[t - 1] is None) or (self.content[t + 1] is None):
newcontent.append(None)
@ -595,7 +599,6 @@ class Corr:
raise ValueError('Derivative is undefined at all timeslices')
return Corr(newcontent, padding=[1, 1])
elif variant == "forward":
newcontent = []
for t in range(self.T - 1):
if (self.content[t] is None) or (self.content[t + 1] is None):
newcontent.append(None)
@ -605,7 +608,6 @@ class Corr:
raise ValueError("Derivative is undefined at all timeslices")
return Corr(newcontent, padding=[0, 1])
elif variant == "backward":
newcontent = []
for t in range(1, self.T):
if (self.content[t - 1] is None) or (self.content[t] is None):
newcontent.append(None)
@ -615,7 +617,6 @@ class Corr:
raise ValueError("Derivative is undefined at all timeslices")
return Corr(newcontent, padding=[1, 0])
elif variant == "improved":
newcontent = []
for t in range(2, self.T - 2):
if (self.content[t - 2] is None) or (self.content[t - 1] is None) or (self.content[t + 1] is None) or (self.content[t + 2] is None):
newcontent.append(None)
@ -625,7 +626,6 @@ class Corr:
raise ValueError('Derivative is undefined at all timeslices')
return Corr(newcontent, padding=[2, 2])
elif variant == 'log':
newcontent = []
for t in range(self.T):
if (self.content[t] is None) or (self.content[t] <= 0):
newcontent.append(None)
@ -638,7 +638,7 @@ class Corr:
else:
raise ValueError("Unknown variant.")
def second_deriv(self, variant="symmetric"):
def second_deriv(self, variant: Optional[str]="symmetric") -> "Corr":
r"""Return the second derivative of the correlator with respect to x0.
Parameters
@ -657,8 +657,8 @@ class Corr:
"""
if self.N != 1:
raise ValueError("second_deriv only implemented for one-dimensional correlators.")
newcontent: list[Union[None, ndarray, Obs]] = []
if variant == "symmetric":
newcontent = []
for t in range(1, self.T - 1):
if (self.content[t - 1] is None) or (self.content[t + 1] is None):
newcontent.append(None)
@ -668,7 +668,6 @@ class Corr:
raise ValueError("Derivative is undefined at all timeslices")
return Corr(newcontent, padding=[1, 1])
elif variant == "big_symmetric":
newcontent = []
for t in range(2, self.T - 2):
if (self.content[t - 2] is None) or (self.content[t + 2] is None):
newcontent.append(None)
@ -678,7 +677,6 @@ class Corr:
raise ValueError("Derivative is undefined at all timeslices")
return Corr(newcontent, padding=[2, 2])
elif variant == "improved":
newcontent = []
for t in range(2, self.T - 2):
if (self.content[t - 2] is None) or (self.content[t - 1] is None) or (self.content[t] is None) or (self.content[t + 1] is None) or (self.content[t + 2] is None):
newcontent.append(None)
@ -688,7 +686,6 @@ class Corr:
raise ValueError("Derivative is undefined at all timeslices")
return Corr(newcontent, padding=[2, 2])
elif variant == 'log':
newcontent = []
for t in range(self.T):
if (self.content[t] is None) or (self.content[t] <= 0):
newcontent.append(None)
@ -701,7 +698,7 @@ class Corr:
else:
raise ValueError("Unknown variant.")
def m_eff(self, variant='log', guess=1.0):
def m_eff(self, variant: str='log', guess: float=1.0) -> "Corr":
"""Returns the effective mass of the correlator as correlator object
Parameters
@ -718,8 +715,8 @@ class Corr:
"""
if self.N != 1:
raise Exception('Correlator must be projected before getting m_eff')
newcontent: list[Union[None, Obs]] = []
if variant == 'log':
newcontent = []
for t in range(self.T - 1):
if ((self.content[t] is None) or (self.content[t + 1] is None)) or (self.content[t + 1][0].value == 0):
newcontent.append(None)
@ -733,7 +730,6 @@ class Corr:
return np.log(Corr(newcontent, padding=[0, 1]))
elif variant == 'logsym':
newcontent = []
for t in range(1, self.T - 1):
if ((self.content[t - 1] is None) or (self.content[t + 1] is None)) or (self.content[t + 1][0].value == 0):
newcontent.append(None)
@ -755,7 +751,6 @@ class Corr:
def root_function(x, d):
return func(x * (t - self.T / 2)) / func(x * (t + 1 - self.T / 2)) - d
newcontent = []
for t in range(self.T - 1):
if (self.content[t] is None) or (self.content[t + 1] is None) or (self.content[t + 1][0].value == 0):
newcontent.append(None)
@ -772,7 +767,6 @@ class Corr:
return Corr(newcontent, padding=[0, 1])
elif variant == 'arccosh':
newcontent = []
for t in range(1, self.T - 1):
if (self.content[t] is None) or (self.content[t + 1] is None) or (self.content[t - 1] is None) or (self.content[t][0].value == 0):
newcontent.append(None)
@ -785,7 +779,7 @@ class Corr:
else:
raise ValueError('Unknown variant.')
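For reference, the log and logsym variants above amount to

$m_{\mathrm{eff}}(t) = \log\frac{C(t)}{C(t+1)}, \qquad m_{\mathrm{eff}}^{\mathrm{sym}}(t) = \frac{1}{2}\log\frac{C(t-1)}{C(t+1)},$

while the cosh/sinh-based variants solve the corresponding periodic ratio numerically via find_root.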
def fit(self, function, fitrange=None, silent=False, **kwargs):
def fit(self, function: Callable, fitrange: Optional[list[int]]=None, silent: bool=False, **kwargs) -> Fit_result:
r'''Fits function to the data
Parameters
@ -819,7 +813,7 @@ class Corr:
result = least_squares(xs, ys, function, silent=silent, **kwargs)
return result
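A usage sketch for fit with a hypothetical model: fit functions should be written with autograd's numpy so that derivatives can be taken automatically.

import autograd.numpy as anp

def single_exp(a, x):
    return a[0] * anp.exp(-a[1] * x)

fit_res = corr.fit(single_exp, fitrange=[5, 12])  # corr: a projected (N=1) Corr
fit_res.gm()
print(fit_res)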
def plateau(self, plateau_range=None, method="fit", auto_gamma=False):
def plateau(self, plateau_range: Optional[list[int]]=None, method: str="fit", auto_gamma: bool=False) -> Obs:
""" Extract a plateau value from a Corr object
Parameters
@ -856,7 +850,7 @@ class Corr:
else:
raise ValueError("Unsupported plateau method: " + method)
def set_prange(self, prange):
def set_prange(self, prange: list[int]):
"""Sets the attribute prange of the Corr object."""
if not len(prange) == 2:
raise ValueError("prange must be a list or array with two values")
@ -868,7 +862,7 @@ class Corr:
self.prange = prange
return
def show(self, x_range=None, comp=None, y_range=None, logscale=False, plateau=None, fit_res=None, fit_key=None, ylabel=None, save=None, auto_gamma=False, hide_sigma=None, references=None, title=None):
def show(self, x_range: Optional[list[int]]=None, comp: Optional[Corr]=None, y_range: Optional[list[Union[int, float]]]=None, logscale: bool=False, plateau: Optional[Union[Obs, float, int]]=None, fit_res: Optional[Fit_result]=None, fit_key: Optional[str]=None, ylabel: Optional[str]=None, save: Optional[str]=None, auto_gamma: bool=False, hide_sigma: Optional[Union[int, float]]=None, references: Optional[list[float]]=None, title: Optional[str]=None):
"""Plots the correlator using the tag of the correlator as label if available.
Parameters
@ -993,7 +987,7 @@ class Corr:
else:
raise TypeError("'save' has to be a string.")
def spaghetti_plot(self, logscale=True):
def spaghetti_plot(self, logscale: bool=True):
"""Produces a spaghetti plot of the correlator suited to monitor exceptional configurations.
Parameters
@ -1022,7 +1016,7 @@ class Corr:
plt.title(name)
plt.draw()
def dump(self, filename, datatype="json.gz", **kwargs):
def dump(self, filename: str, datatype: str="json.gz", **kwargs):
"""Dumps the Corr into a file of chosen type
Parameters
----------
@ -1046,10 +1040,10 @@ class Corr:
else:
raise ValueError("Unknown datatype " + str(datatype))
def print(self, print_range=None):
def print(self, print_range: Optional[list[int]]=None):
print(self.__repr__(print_range))
def __repr__(self, print_range=None):
def __repr__(self, print_range: Optional[list[int]]=None) -> str:
if print_range is None:
print_range = [0, None]
@ -1074,7 +1068,7 @@ class Corr:
content_string += '\n'
return content_string
def __str__(self):
def __str__(self) -> str:
return self.__repr__()
# We define the basic operations that can be performed with correlators.
@ -1084,18 +1078,18 @@ class Corr:
__array_priority__ = 10000
def __eq__(self, y):
def __eq__(self, y: Any) -> ndarray:
if isinstance(y, Corr):
comp = np.asarray(y.content, dtype=object)
else:
comp = np.asarray(y)
return np.asarray(self.content, dtype=object) == comp
def __add__(self, y):
def __add__(self, y: Union[Corr, Obs, CObs, int, float, complex, ndarray]) -> "Corr":
if isinstance(y, Corr):
if ((self.N != y.N) or (self.T != y.T)):
raise ValueError("Addition of Corrs with different shape")
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]) or _check_for_none(y, y.content[t]):
newcontent.append(None)
@ -1104,7 +1098,7 @@ class Corr:
return Corr(newcontent)
elif isinstance(y, (Obs, int, float, CObs, complex)):
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]):
newcontent.append(None)
@ -1119,11 +1113,11 @@ class Corr:
else:
raise TypeError("Corr + wrong type")
def __mul__(self, y):
def __mul__(self, y: Union[Corr, Obs, CObs, int, float, complex, ndarray]) -> "Corr":
if isinstance(y, Corr):
if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T):
raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T")
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]) or _check_for_none(y, y.content[t]):
newcontent.append(None)
@ -1147,13 +1141,13 @@ class Corr:
else:
raise TypeError("Corr * wrong type")
def __matmul__(self, y):
def __matmul__(self, y: Union[Corr, ndarray]) -> "Corr":
if isinstance(y, np.ndarray):
if y.ndim != 2 or y.shape[0] != y.shape[1]:
raise ValueError("Can only multiply correlators by square matrices.")
if not self.N == y.shape[0]:
raise ValueError("matmul: mismatch of matrix dimensions")
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]):
newcontent.append(None)
@ -1174,13 +1168,13 @@ class Corr:
else:
return NotImplemented
def __rmatmul__(self, y):
def __rmatmul__(self, y: ndarray) -> "Corr":
if isinstance(y, np.ndarray):
if y.ndim != 2 or y.shape[0] != y.shape[1]:
raise ValueError("Can only multiply correlators by square matrices.")
if not self.N == y.shape[0]:
raise ValueError("matmul: mismatch of matrix dimensions")
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]):
newcontent.append(None)
@ -1190,11 +1184,11 @@ class Corr:
else:
return NotImplemented
def __truediv__(self, y):
def __truediv__(self, y: Union[Corr, Obs, CObs, int, float, ndarray]) -> "Corr":
if isinstance(y, Corr):
if not ((self.N == 1 or y.N == 1 or self.N == y.N) and self.T == y.T):
raise ValueError("Multiplication of Corr object requires N=N or N=1 and T=T")
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]) or _check_for_none(y, y.content[t]):
newcontent.append(None)
@ -1229,7 +1223,7 @@ class Corr:
elif isinstance(y, (int, float)):
if y == 0:
raise ValueError('Division by zero will return undefined correlator')
newcontent = []
newcontent: list[Union[None, ndarray, Obs]] = []
for t in range(self.T):
if _check_for_none(self, self.content[t]):
newcontent.append(None)
@ -1244,37 +1238,37 @@ class Corr:
else:
raise TypeError('Corr / wrong type')
def __neg__(self):
def __neg__(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else -1. * item for item in self.content]
return Corr(newcontent, prange=self.prange)
def __sub__(self, y):
def __sub__(self, y: Union[Corr, Obs, CObs, int, float, complex, ndarray]) -> "Corr":
return self + (-y)
def __pow__(self, y):
def __pow__(self, y: Union[Obs, CObs, float, int]) -> "Corr":
if isinstance(y, (Obs, int, float, CObs)):
newcontent = [None if _check_for_none(self, item) else item**y for item in self.content]
return Corr(newcontent, prange=self.prange)
else:
raise TypeError('Type of exponent not supported')
def __abs__(self):
def __abs__(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else np.abs(item) for item in self.content]
return Corr(newcontent, prange=self.prange)
# The numpy functions:
def sqrt(self):
def sqrt(self) -> "Corr":
return self ** 0.5
def log(self):
def log(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else np.log(item) for item in self.content]
return Corr(newcontent, prange=self.prange)
def exp(self):
def exp(self) -> "Corr":
newcontent = [None if _check_for_none(self, item) else np.exp(item) for item in self.content]
return Corr(newcontent, prange=self.prange)
def _apply_func_to_corr(self, func):
def _apply_func_to_corr(self, func: Union[Callable, ufunc]) -> "Corr":
newcontent = [None if _check_for_none(self, item) else func(item) for item in self.content]
for t in range(self.T):
if _check_for_none(self, newcontent[t]):
@ -1287,57 +1281,57 @@ class Corr:
raise ValueError('Operation returns undefined correlator')
return Corr(newcontent)
def sin(self):
def sin(self) -> "Corr":
return self._apply_func_to_corr(np.sin)
def cos(self):
def cos(self) -> "Corr":
return self._apply_func_to_corr(np.cos)
def tan(self):
def tan(self) -> "Corr":
return self._apply_func_to_corr(np.tan)
def sinh(self):
def sinh(self) -> "Corr":
return self._apply_func_to_corr(np.sinh)
def cosh(self):
def cosh(self) -> "Corr":
return self._apply_func_to_corr(np.cosh)
def tanh(self):
def tanh(self) -> "Corr":
return self._apply_func_to_corr(np.tanh)
def arcsin(self):
def arcsin(self) -> "Corr":
return self._apply_func_to_corr(np.arcsin)
def arccos(self):
def arccos(self) -> "Corr":
return self._apply_func_to_corr(np.arccos)
def arctan(self):
def arctan(self) -> "Corr":
return self._apply_func_to_corr(np.arctan)
def arcsinh(self):
def arcsinh(self) -> "Corr":
return self._apply_func_to_corr(np.arcsinh)
def arccosh(self):
def arccosh(self) -> "Corr":
return self._apply_func_to_corr(np.arccosh)
def arctanh(self):
def arctanh(self) -> "Corr":
return self._apply_func_to_corr(np.arctanh)
# Right hand side operations (require tweak in main module to work)
def __radd__(self, y):
def __radd__(self, y: Union[Corr, Obs, CObs, int, float, complex, ndarray]) -> "Corr":
return self + y
def __rsub__(self, y):
def __rsub__(self, y: Union[Corr, Obs, CObs, int, float, complex, ndarray]) -> "Corr":
return -self + y
def __rmul__(self, y):
def __rmul__(self, y: Union[Corr, Obs, CObs, int, float, complex, ndarray]) -> "Corr":
return self * y
def __rtruediv__(self, y):
def __rtruediv__(self, y: Union[Corr, Obs, CObs, int, float, ndarray]) -> "Corr":
return (self / y) ** (-1)
@property
def real(self):
def real(self) -> "Corr":
def return_real(obs_OR_cobs):
if isinstance(obs_OR_cobs.flatten()[0], CObs):
return np.vectorize(lambda x: x.real)(obs_OR_cobs)
@ -1347,7 +1341,7 @@ class Corr:
return self._apply_func_to_corr(return_real)
@property
def imag(self):
def imag(self) -> "Corr":
def return_imag(obs_OR_cobs):
if isinstance(obs_OR_cobs.flatten()[0], CObs):
return np.vectorize(lambda x: x.imag)(obs_OR_cobs)
@ -1356,7 +1350,7 @@ class Corr:
return self._apply_func_to_corr(return_imag)
def prune(self, Ntrunc, tproj=3, t0proj=2, basematrix=None):
def prune(self, Ntrunc: int, tproj: int=3, t0proj: int=2, basematrix: Optional[ndarray]=None) -> "Corr":
r''' Project large correlation matrix to lowest states
This method can be used to reduce the size of an (N x N) correlation matrix
@ -1414,7 +1408,7 @@ class Corr:
return Corr(newcontent)
def _sort_vectors(vec_set_in, ts):
def _sort_vectors(vec_set_in: list[Optional[ndarray]], ts: int) -> list[Union[None, ndarray, list[ndarray]]]:
"""Helper function used to find a set of Eigenvectors consistent over all timeslices"""
if isinstance(vec_set_in[ts][0][0], Obs):
@ -1446,12 +1440,12 @@ def _sort_vectors(vec_set_in, ts):
return sorted_vec_set
def _check_for_none(corr, entry):
def _check_for_none(corr: Corr, entry: Optional[ndarray]) -> bool:
"""Checks if entry for correlator corr is None"""
return len(list(filter(None, np.asarray(entry).flatten()))) < corr.N ** 2
def _GEVP_solver(Gt, G0, method='eigh', chol_inv=None):
def _GEVP_solver(Gt: ndarray, G0: ndarray, method: str='eigh', chol_inv: Optional[ndarray]=None) -> ndarray:
r"""Helper function for solving the GEVP and sorting the eigenvectors.
Solves $G(t)v_i=\lambda_i G(t_0)v_i$ and returns the eigenvectors v_i
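A numerical sketch of the default 'eigh' route with hypothetical matrices: scipy solves the symmetric generalized problem directly.

import numpy as np
import scipy.linalg

G0 = np.array([[2.0, 0.5], [0.5, 1.0]])   # C(t_0), symmetric positive definite
Gt = np.array([[1.0, 0.3], [0.3, 0.6]])   # C(t)
evals, evecs = scipy.linalg.eigh(Gt, G0)  # solves G(t) v = lambda G(t_0) v
order = np.argsort(evals)[::-1]           # descending eigenvalues
evals, evecs = evals[order], evecs[:, order]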

View file

@ -1,9 +1,12 @@
from __future__ import annotations
import numpy as np
from numpy import ndarray
from typing import Optional, Union
class Covobs:
def __init__(self, mean, cov, name, pos=None, grad=None):
def __init__(self, mean: Union[float, int], cov: Union[list, ndarray], name: str, pos: Optional[int]=None, grad: Optional[Union[ndarray, list[float]]]=None):
""" Initialize Covobs object.
Parameters
@ -39,12 +42,12 @@ class Covobs:
self._set_grad(grad)
self.value = mean
def errsq(self):
def errsq(self) -> float:
""" Return the variance (= square of the error) of the Covobs
"""
return np.dot(np.transpose(self.grad), np.dot(self.cov, self.grad)).item()
def _set_cov(self, cov):
def _set_cov(self, cov: Union[list, ndarray]):
""" Set the covariance matrix of the covobs
Parameters
@ -79,7 +82,7 @@ class Covobs:
if ev < 0:
raise Exception('Covariance matrix is not positive-semidefinite!')
def _set_grad(self, grad):
def _set_grad(self, grad: Union[list[float], ndarray]):
""" Set the gradient of the covobs
Parameters
@ -96,9 +99,9 @@ class Covobs:
raise Exception('Invalid dimension of grad!')
@property
def cov(self):
def cov(self) -> ndarray:
return self._cov
@property
def grad(self):
def grad(self) -> ndarray:
return self._grad

View file

@ -1,4 +1,6 @@
from __future__ import annotations
import numpy as np
from numpy import ndarray
gammaX = np.array(
@ -22,7 +24,7 @@ identity = np.array(
dtype=complex)
def epsilon_tensor(i, j, k):
def epsilon_tensor(i: int, j: int, k: int) -> float:
"""Rank-3 epsilon tensor
Based on https://codegolf.stackexchange.com/a/160375
@ -39,7 +41,7 @@ def epsilon_tensor(i, j, k):
return (i - j) * (j - k) * (k - i) / 2
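A few sanity checks of the closed form above:

assert epsilon_tensor(1, 2, 3) == 1.0   # even permutation
assert epsilon_tensor(2, 1, 3) == -1.0  # odd permutation
assert epsilon_tensor(1, 1, 3) == 0.0   # repeated index vanishes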
def epsilon_tensor_rank4(i, j, k, o):
def epsilon_tensor_rank4(i: int, j: int, k: int, o: int) -> float:
"""Rank-4 epsilon tensor
Extension of https://codegolf.stackexchange.com/a/160375
@ -57,7 +59,7 @@ def epsilon_tensor_rank4(i, j, k, o):
return (i - j) * (j - k) * (k - i) * (i - o) * (j - o) * (o - k) / 12
def Grid_gamma(gamma_tag):
def Grid_gamma(gamma_tag: str) -> ndarray:
"""Returns gamma matrix in Grid labeling."""
if gamma_tag == 'Identity':
g = identity

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import gc
from collections.abc import Sequence
import warnings
@ -15,6 +16,8 @@ from autograd import elementwise_grad as egrad
from numdifftools import Jacobian as num_jacobian
from numdifftools import Hessian as num_hessian
from .obs import Obs, derived_observable, covariance, cov_Obs, invert_corr_cov_cholesky
from numpy import ndarray
from typing import Any, Callable, Optional, Union
class Fit_result(Sequence):
@ -33,13 +36,31 @@ class Fit_result(Sequence):
Hotelling t-squared p-value for correlated fits.
"""
def __init__(self):
self.fit_parameters = None
def __init__(self) -> None:
self.fit_parameters: Optional[list] = None
self.fit_function: Optional[Union[Callable, dict[str, Callable]]] = None
self.priors: Optional[Union[list[Obs], dict[int, Obs]]] = None
self.method: Optional[str] = None
self.iterations: Optional[int] = None
self.chisquare: Optional[float] = None
self.odr_chisquare: Optional[float] = None
self.dof: Optional[int] = None
self.p_value: Optional[float] = None
self.message: Optional[str] = None
self.t2_p_value: Optional[float] = None
self.chisquare_by_dof: Optional[float] = None
self.chisquare_by_expected_chisquare: Optional[float] = None
self.residual_variance: Optional[float] = None
self.xplus: Optional[float] = None
def __getitem__(self, idx):
def __getitem__(self, idx: int) -> Obs:
if self.fit_parameters is None:
raise TypeError('No fit parameters available.')
return self.fit_parameters[idx]
def __len__(self):
def __len__(self) -> int:
if self.fit_parameters is None:
raise TypeError('No fit parameters available.')
return len(self.fit_parameters)
def gamma_method(self, **kwargs):
@ -48,29 +69,31 @@ class Fit_result(Sequence):
gm = gamma_method
def __str__(self):
def __str__(self) -> str:
my_str = 'Goodness of fit:\n'
if hasattr(self, 'chisquare_by_dof'):
if self.chisquare_by_dof is not None:
my_str += '\u03C7\u00b2/d.o.f. = ' + f'{self.chisquare_by_dof:2.6f}' + '\n'
elif hasattr(self, 'residual_variance'):
elif self.residual_variance is not None:
my_str += 'residual variance = ' + f'{self.residual_variance:2.6f}' + '\n'
if hasattr(self, 'chisquare_by_expected_chisquare'):
if self.chisquare_by_expected_chisquare is not None:
my_str += '\u03C7\u00b2/\u03C7\u00b2exp = ' + f'{self.chisquare_by_expected_chisquare:2.6f}' + '\n'
if hasattr(self, 'p_value'):
if self.p_value is not None:
my_str += 'p-value = ' + f'{self.p_value:2.4f}' + '\n'
if hasattr(self, 't2_p_value'):
if self.t2_p_value is not None:
my_str += 't\u00B2p-value = ' + f'{self.t2_p_value:2.4f}' + '\n'
my_str += 'Fit parameters:\n'
if self.fit_parameters is None:
raise TypeError('No fit parameters available.')
for i_par, par in enumerate(self.fit_parameters):
my_str += str(i_par) + '\t' + ' ' * int(par >= 0) + str(par).rjust(int(par < 0.0)) + '\n'
return my_str
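The __str__ changes above are the second half of the Optional-attribute refactoring: since __init__ now sets every field to None, the hasattr checks become None checks that a type checker can narrow. A standalone sketch:

from typing import Optional

class Result:
    def __init__(self) -> None:
        self.p_value: Optional[float] = None  # always present, possibly unset

    def summary(self) -> str:
        if self.p_value is not None:          # narrows Optional[float] to float
            return f'p-value = {self.p_value:2.4f}'
        return 'no p-value available'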
def __repr__(self):
def __repr__(self) -> str:
m = max(map(len, list(self.__dict__.keys()))) + 1
return '\n'.join([key.rjust(m) + ': ' + repr(value) for key, value in sorted(self.__dict__.items())])
def least_squares(x, y, func, priors=None, silent=False, **kwargs):
def least_squares(x: Any, y: Union[dict[str, ndarray], list[Obs], ndarray, dict[str, list[Obs]]], func: Union[Callable, dict[str, Callable]], priors: Optional[Union[dict[int, str], list[str], list[Obs], dict[int, Obs]]]=None, silent: bool=False, **kwargs) -> Fit_result:
r'''Performs a non-linear fit to y = func(x).
```
@ -335,9 +358,8 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
p_f = dp_f = np.array([])
prior_mask = []
loc_priors = []
if 'initial_guess' in kwargs:
x0 = kwargs.get('initial_guess')
x0 = kwargs.get('initial_guess')
if x0 is not None:
if len(x0) != n_parms:
raise ValueError('Initial guess does not have the correct length: %d vs. %d' % (len(x0), n_parms))
else:
@ -356,8 +378,8 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
return anp.sum(general_chisqfunc_uncorr(p, y_f, p_f) ** 2)
if kwargs.get('correlated_fit') is True:
if 'inv_chol_cov_matrix' in kwargs:
chol_inv = kwargs.get('inv_chol_cov_matrix')
chol_inv = kwargs.get('inv_chol_cov_matrix')
if chol_inv is not None:
if (chol_inv[0].shape[0] != len(dy_f)):
raise TypeError('The number of columns of the inverse covariance matrix handed over needs to be equal to the number of y errors.')
if (chol_inv[0].shape[0] != chol_inv[0].shape[1]):
@ -388,17 +410,17 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
if output.method != 'Levenberg-Marquardt':
if output.method == 'migrad':
tolerance = 1e-4 # default value of 1e-1 set by iminuit can be problematic
if 'tol' in kwargs:
tolerance = kwargs.get('tol')
tolerance = kwargs.get('tol')
if tolerance is None:
tolerance = 1e-4 # default value of 1e-1 set by iminuit can be problematic
fit_result = iminuit.minimize(chisqfunc_uncorr, x0, tol=tolerance) # Stopping criterion 0.002 * tol * errordef
if kwargs.get('correlated_fit') is True:
fit_result = iminuit.minimize(chisqfunc, fit_result.x, tol=tolerance)
output.iterations = fit_result.nfev
else:
tolerance = 1e-12
if 'tol' in kwargs:
tolerance = kwargs.get('tol')
tolerance = kwargs.get('tol')
if tolerance is None:
tolerance = 1e-12
fit_result = scipy.optimize.minimize(chisqfunc_uncorr, x0, method=kwargs.get('method'), tol=tolerance)
if kwargs.get('correlated_fit') is True:
fit_result = scipy.optimize.minimize(chisqfunc, fit_result.x, method=kwargs.get('method'), tol=tolerance)
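The rewrites above replace the membership-test-plus-get pattern with a single .get followed by a None check, so the checker has one variable to narrow. A standalone sketch mirroring the tolerance logic:

from typing import Any, Optional

def resolve_tolerance(default: float, **kwargs: Any) -> float:
    tolerance: Optional[float] = kwargs.get('tol')  # None when absent
    if tolerance is None:
        tolerance = default
    return tolerance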
@ -428,8 +450,8 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
if not fit_result.success:
raise Exception('The minimization procedure did not converge.')
output.chisquare = chisquare
output.dof = y_all.shape[-1] - n_parms + len(loc_priors)
output.chisquare = float(chisquare)
output.dof = int(y_all.shape[-1] - n_parms + len(loc_priors))
output.p_value = 1 - scipy.stats.chi2.cdf(output.chisquare, output.dof)
if output.dof > 0:
output.chisquare_by_dof = output.chisquare / output.dof
@ -505,7 +527,7 @@ def least_squares(x, y, func, priors=None, silent=False, **kwargs):
return output
def total_least_squares(x, y, func, silent=False, **kwargs):
def total_least_squares(x: list[Obs], y: list[Obs], func: Callable, silent: bool=False, **kwargs) -> Fit_result:
r'''Performs a non-linear fit to y = func(x) and returns a list of Obs corresponding to the fit parameters.
Parameters
@ -602,8 +624,8 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
if np.any(np.asarray(dy_f) <= 0.0):
raise Exception('No y errors available, run the gamma method first.')
if 'initial_guess' in kwargs:
x0 = kwargs.get('initial_guess')
x0 = kwargs.get('initial_guess')
if x0 is not None:
if len(x0) != n_parms:
raise Exception('Initial guess does not have the correct length: %d vs. %d' % (len(x0), n_parms))
else:
@ -709,7 +731,7 @@ def total_least_squares(x, y, func, silent=False, **kwargs):
return output
def fit_lin(x, y, **kwargs):
def fit_lin(x: Sequence[Union[Obs, int, float]], y: Sequence[Obs], **kwargs) -> list[Obs]:
"""Performs a linear fit to y = n + m * x and returns two Obs n, m.
Parameters
@ -740,7 +762,7 @@ def fit_lin(x, y, **kwargs):
raise TypeError('Unsupported types for x')
def qqplot(x, o_y, func, p, title=""):
def qqplot(x: ndarray, o_y: list[Obs], func: Callable, p: list[Obs], title: str=""):
"""Generates a quantile-quantile plot of the fit result which can be used to
check if the residuals of the fit are gaussian distributed.
@ -770,7 +792,7 @@ def qqplot(x, o_y, func, p, title=""):
plt.draw()
def residual_plot(x, y, func, fit_res, title=""):
def residual_plot(x: ndarray, y: list[Obs], func: Callable, fit_res: list[Obs], title: str=""):
"""Generates a plot which compares the fit to the data and displays the corresponding residuals
For uncorrelated data the residuals are expected to be distributed ~N(0,1).
@ -807,9 +829,20 @@ def residual_plot(x, y, func, fit_res, title=""):
plt.draw()
def error_band(x, func, beta):
def error_band(x: list[int], func: Callable, beta: Union[Fit_result, list[Obs]]) -> ndarray:
"""Calculate the error band for an array of sample values x, for given fit function func with optimized parameters beta.
Parameters
----------
x : list[int]
A list of sample points where the error band is evaluated.
func : Callable
The function representing the fit model.
beta : Union[Fit_result, list[Obs]]
Optimized fit parameters.
Returns
-------
err : np.array(Obs)
@ -831,7 +864,7 @@ def error_band(x, func, beta):
return err
def ks_test(objects=None):
def ks_test(objects: Optional[list[Fit_result]]=None):
"""Performs a KolmogorovSmirnov test for the p-values of all fit object.
Parameters
@ -875,7 +908,7 @@ def ks_test(objects=None):
print(scipy.stats.kstest(p_values, 'uniform'))
def _extract_val_and_dval(string):
def _extract_val_and_dval(string: str) -> tuple[float, float]:
split_string = string.split('(')
if '.' in split_string[0] and '.' not in split_string[1][:-1]:
factor = 10 ** -len(split_string[0].partition('.')[2])
@ -884,11 +917,13 @@ def _extract_val_and_dval(string):
return float(split_string[0]), float(split_string[1][:-1]) * factor
def _construct_prior_obs(i_prior, i_n):
def _construct_prior_obs(i_prior: Union[Obs, str], i_n: int) -> Obs:
if isinstance(i_prior, Obs):
return i_prior
elif isinstance(i_prior, str):
loc_val, loc_dval = _extract_val_and_dval(i_prior)
return cov_Obs(loc_val, loc_dval ** 2, '#prior' + str(i_n) + f"_{np.random.randint(2147483647):010d}")
prior_obs = cov_Obs(loc_val, loc_dval ** 2, '#prior' + str(i_n) + f"_{np.random.randint(2147483647):010d}")
assert isinstance(prior_obs, Obs)
return prior_obs
else:
raise TypeError("Prior entries need to be 'Obs' or 'str'.")

View file

@ -1,3 +1,4 @@
from __future__ import annotations
from collections import defaultdict
import gzip
import lxml.etree as et
@ -11,12 +12,15 @@ from ..obs import Obs
from ..obs import _merge_idx
from ..covobs import Covobs
from .. import version as pyerrorsversion
from lxml.etree import _Element
from numpy import ndarray
from typing import Any, Optional, Union
# Based on https://stackoverflow.com/a/10076823
def _etree_to_dict(t):
def _etree_to_dict(t: _Element) -> dict:
""" Convert the content of an XML file to a python dict"""
d = {t.tag: {} if t.attrib else None}
d: dict = {t.tag: {} if t.attrib else None}
children = list(t)
if children:
dd = defaultdict(list)
@ -38,7 +42,7 @@ def _etree_to_dict(t):
return d
def _dict_to_xmlstring(d):
def _dict_to_xmlstring(d: dict[str, Any]) -> str:
if isinstance(d, dict):
iters = ''
for k in d:
@ -66,7 +70,7 @@ def _dict_to_xmlstring(d):
return iters
def _dict_to_xmlstring_spaces(d, space=' '):
def _dict_to_xmlstring_spaces(d: dict, space: str=' ') -> str:
s = _dict_to_xmlstring(d)
o = ''
c = 0
@ -85,7 +89,7 @@ def _dict_to_xmlstring_spaces(d, space=' '):
return o
def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None):
def create_pobs_string(obsl: list[Obs], name: str, spec: str='', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, enstag: Optional[str]=None) -> str:
"""Export a list of Obs or structures containing Obs to an xml string
according to the Zeuthen pobs format.
@ -113,7 +117,9 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None):
XML formatted string of the input data
"""
od = {}
if symbol is None:
symbol = []
od: dict[str, Any] = {}
ename = obsl[0].e_names[0]
names = list(obsl[0].deltas.keys())
nr = len(names)
@ -176,7 +182,7 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None):
return rs
def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz=True):
def write_pobs(obsl: list[Obs], fname: str, name: str, spec: str='', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, enstag: Optional[str]=None, gz: bool=True):
"""Export a list of Obs or structures containing Obs to a .xml.gz file
according to the Zeuthen pobs format.
@ -206,6 +212,8 @@ def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz
-------
None
"""
if symbol is None:
symbol = []
pobsstring = create_pobs_string(obsl, name, spec, origin, symbol, enstag)
if not fname.endswith('.xml') and not fname.endswith('.gz'):
@ -215,38 +223,39 @@ def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz
if not fname.endswith('.gz'):
fname += '.gz'
fp = gzip.open(fname, 'wb')
fp.write(pobsstring.encode('utf-8'))
gp = gzip.open(fname, 'wb')
gp.write(pobsstring.encode('utf-8'))
gp.close()
else:
fp = open(fname, 'w', encoding='utf-8')
fp.write(pobsstring)
fp.close()
fp.close()
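The fp/gp split above avoids reusing one variable for the differing return types of gzip.open and open; context managers are an equivalent way to sidestep the issue. A sketch with a hypothetical helper:

import gzip

def write_string(fname: str, payload: str, gz: bool = True) -> None:
    if gz:
        with gzip.open(fname, 'wb') as gp:
            gp.write(payload.encode('utf-8'))
    else:
        with open(fname, 'w', encoding='utf-8') as fp:
            fp.write(payload)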
def _import_data(string):
def _import_data(string: str) -> list[Union[int, float]]:
return json.loads("[" + ",".join(string.replace(' +', ' ').split()) + "]")
def _check(condition):
def _check(condition: bool):
if not condition:
raise Exception("XML file format not supported")
class _NoTagInDataError(Exception):
"""Raised when tag is not in data"""
def __init__(self, tag):
def __init__(self, tag: str):
self.tag = tag
super().__init__('Tag %s not in data!' % (self.tag))
def _find_tag(dat, tag):
def _find_tag(dat: _Element, tag: str) -> int:
for i in range(len(dat)):
if dat[i].tag == tag:
return i
raise _NoTagInDataError(tag)
def _import_array(arr):
def _import_array(arr: _Element) -> Union[list[Union[str, list[int], list[ndarray]]], ndarray]:
name = arr[_find_tag(arr, 'id')].text.strip()
index = _find_tag(arr, 'layout')
try:
@ -284,12 +293,12 @@ def _import_array(arr):
_check(False)
def _import_rdata(rd):
def _import_rdata(rd: _Element) -> tuple[list[ndarray], str, list[int]]:
name, idx, mask, deltas = _import_array(rd)
return deltas, name, idx
def _import_cdata(cd):
def _import_cdata(cd: _Element) -> tuple[str, ndarray, ndarray]:
_check(cd[0].tag == "id")
_check(cd[1][0].text.strip() == "cov")
cov = _import_array(cd[1])
@ -297,7 +306,7 @@ def _import_cdata(cd):
return cd[0].text.strip(), cov, grad
def read_pobs(fname, full_output=False, gz=True, separator_insertion=None):
def read_pobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: Optional[Union[str, int]]=None) -> Union[dict, list[Obs]]:
"""Import a list of Obs from an xml.gz file in the Zeuthen pobs format.
Tags are not written or recovered automatically.
@ -309,7 +318,7 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None):
full_output : bool
If True, a dict containing auxiliary information and the data is returned.
If False, only the data is returned as list.
separatior_insertion: str or int
separator_insertion: str or int
str: replace all occurrences of "separator_insertion" within the replica names
by "|%s" % (separator_insertion) when constructing the names of the replica.
int: Insert the separator "|" at the position given by separator_insertion.
@ -329,8 +338,8 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None):
if gz:
if not fname.endswith('.gz'):
fname += '.gz'
with gzip.open(fname, 'r') as fin:
content = fin.read()
with gzip.open(fname, 'r') as gin:
content = gin.read()
else:
if fname.endswith('.gz'):
warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
@ -350,7 +359,7 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None):
deltas = []
names = []
idl = []
idl: list[list[int]] = []
for i in range(5, len(pobs)):
delta, name, idx = _import_rdata(pobs[i])
deltas.append(delta)
@ -397,7 +406,7 @@ def read_pobs(fname, full_output=False, gz=True, separator_insertion=None):
# this is based on Mattia Bruno's implementation at https://github.com/mbruno46/pyobs/blob/master/pyobs/IO/xml.py
def import_dobs_string(content, full_output=False, separator_insertion=True):
def import_dobs_string(content: bytes, full_output: bool=False, separator_insertion: bool=True) -> Union[dict, list[Obs]]:
"""Import a list of Obs from a string in the Zeuthen dobs format.
Tags are not written or recovered automatically.
@ -409,7 +418,7 @@ def import_dobs_string(content, full_output=False, separator_insertion=True):
full_output : bool
If True, a dict containing auxiliary information and the data is returned.
If False, only the data is returned as list.
separatior_insertion: str, int or bool
separator_insertion: str, int or bool
str: replace all occurrences of "separator_insertion" within the replica names
by "|%s" % (separator_insertion) when constructing the names of the replica.
int: Insert the separator "|" at the position given by separator_insertion.
@ -572,7 +581,7 @@ def import_dobs_string(content, full_output=False, separator_insertion=True):
return res
def read_dobs(fname, full_output=False, gz=True, separator_insertion=True):
def read_dobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: bool=True) -> Union[dict, list[Obs]]:
"""Import a list of Obs from an xml.gz file in the Zeuthen dobs format.
Tags are not written or recovered automatically.
@ -619,7 +628,7 @@ def read_dobs(fname, full_output=False, gz=True, separator_insertion=True):
return import_dobs_string(content, full_output, separator_insertion=separator_insertion)
def _dobsdict_to_xmlstring(d):
def _dobsdict_to_xmlstring(d: dict[str, Any]) -> str:
if isinstance(d, dict):
iters = ''
for k in d:
@ -659,7 +668,7 @@ def _dobsdict_to_xmlstring(d):
return iters
def _dobsdict_to_xmlstring_spaces(d, space=' '):
def _dobsdict_to_xmlstring_spaces(d: dict, space: str=' ') -> str:
s = _dobsdict_to_xmlstring(d)
o = ''
c = 0
@ -678,7 +687,7 @@ def _dobsdict_to_xmlstring_spaces(d, space=' '):
return o
def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=None, enstags=None):
def create_dobs_string(obsl: list[Obs], name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, who: Optional[str]=None, enstags: Optional[dict]=None) -> str:
"""Generate the string for the export of a list of Obs or structures containing Obs
to a .xml.gz file according to the Zeuthen dobs format.
@ -709,9 +718,11 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
xml_str : str
XML string generated from the data
"""
if symbol is None:
symbol = []
if enstags is None:
enstags = {}
od = {}
od: dict[str, Any] = {}
r_names = []
for o in obsl:
r_names += [name for name in o.names if name.split('|')[0] in o.mc_names]
@ -811,7 +822,7 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
ed[''].append(ad)
pd['edata'].append(ed)
allcov = {}
allcov: dict[str, ndarray] = {}
for o in obsl:
for cname in o.cov_names:
if cname in allcov:
@ -867,7 +878,7 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
return rs
def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=None, enstags=None, gz=True):
def write_dobs(obsl: list[Obs], fname: str, name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, who: Optional[str]=None, enstags: Optional[dict]=None, gz: bool=True):
"""Export a list of Obs or structures containing Obs to a .xml.gz file
according to the Zeuthen dobs format.
@ -901,6 +912,8 @@ def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=No
-------
None
"""
if symbol is None:
symbol = []
if enstags is None:
enstags = {}
@ -913,9 +926,10 @@ def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=No
if not fname.endswith('.gz'):
fname += '.gz'
fp = gzip.open(fname, 'wb')
fp.write(dobsstring.encode('utf-8'))
gp = gzip.open(fname, 'wb')
gp.write(dobsstring.encode('utf-8'))
gp.close()
else:
fp = open(fname, 'w', encoding='utf-8')
fp.write(dobsstring)
fp.close()
fp.close()

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import rapidjson as json
import gzip
import getpass
@ -12,9 +13,10 @@ from ..covobs import Covobs
from ..correlators import Corr
from ..misc import _assert_equal_properties
from .. import version as pyerrorsversion
from typing import Any, Union
def create_json_string(ol, description='', indent=1):
def create_json_string(ol: list, description: Union[str, dict]='', indent: int=1) -> str:
"""Generate the string for the export of a list of Obs or structures containing Obs
to a .json(.gz) file
@ -166,7 +168,7 @@ def create_json_string(ol, description='', indent=1):
if not isinstance(ol, list):
ol = [ol]
d = {}
d: dict[str, Any] = {}
d['program'] = 'pyerrors %s' % (pyerrorsversion.__version__)
d['version'] = '1.1'
d['who'] = getpass.getuser()
@ -217,7 +219,7 @@ def create_json_string(ol, description='', indent=1):
return json.dumps(d, indent=indent, ensure_ascii=False, default=_jsonifier, write_mode=json.WM_COMPACT)
def dump_to_json(ol, fname, description='', indent=1, gz=True):
def dump_to_json(ol: list, fname: str, description: Union[str, dict]='', indent: int=1, gz: bool=True):
"""Export a list of Obs or structures containing Obs to a .json(.gz) file.
Dict keys that are not JSON-serializable such as floats are converted to strings.
@ -251,15 +253,16 @@ def dump_to_json(ol, fname, description='', indent=1, gz=True):
if not fname.endswith('.gz'):
fname += '.gz'
fp = gzip.open(fname, 'wb')
fp.write(jsonstring.encode('utf-8'))
gp = gzip.open(fname, 'wb')
gp.write(jsonstring.encode('utf-8'))
gp.close()
else:
fp = open(fname, 'w', encoding='utf-8')
fp.write(jsonstring)
fp.close()
fp.close()
def _parse_json_dict(json_dict, verbose=True, full_output=False):
def _parse_json_dict(json_dict: dict[str, Any], verbose: bool=True, full_output: bool=False) -> Any:
"""Reconstruct a list of Obs or structures containing Obs from a dict that
was built out of a json string.
@ -474,7 +477,7 @@ def _parse_json_dict(json_dict, verbose=True, full_output=False):
return ol
def import_json_string(json_string, verbose=True, full_output=False):
def import_json_string(json_string: str, verbose: bool=True, full_output: bool=False) -> Union[Obs, list[Obs], Corr]:
"""Reconstruct a list of Obs or structures containing Obs from a json string.
The following structures are supported: Obs, list, numpy.ndarray, Corr
@ -504,7 +507,7 @@ def import_json_string(json_string, verbose=True, full_output=False):
return _parse_json_dict(json.loads(json_string), verbose, full_output)
def load_json(fname, verbose=True, gz=True, full_output=False):
def load_json(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=False) -> Any:
"""Import a list of Obs or structures containing Obs from a .json(.gz) file.
The following structures are supported: Obs, list, numpy.ndarray, Corr
@ -549,7 +552,7 @@ def load_json(fname, verbose=True, gz=True, full_output=False):
return _parse_json_dict(d, verbose, full_output)
def _ol_from_dict(ind, reps='DICTOBS'):
def _ol_from_dict(ind: dict, reps: str='DICTOBS') -> tuple[list, dict]:
"""Convert a dictionary of Obs objects to a list and a dictionary that contains
placeholders instead of the Obs objects.
@ -626,7 +629,7 @@ def _ol_from_dict(ind, reps='DICTOBS'):
return ol, nd
def dump_dict_to_json(od, fname, description='', indent=1, reps='DICTOBS', gz=True):
def dump_dict_to_json(od: dict, fname: str, description: Union[str, dict]='', indent: int=1, reps: str='DICTOBS', gz: bool=True):
"""Export a dict of Obs or structures containing Obs to a .json(.gz) file
Parameters
@ -666,7 +669,7 @@ def dump_dict_to_json(od, fname, description='', indent=1, reps='DICTOBS', gz=Tr
dump_to_json(ol, fname, description=desc_dict, indent=indent, gz=gz)
def _od_from_list_and_dict(ol, ind, reps='DICTOBS'):
def _od_from_list_and_dict(ol: list, ind: dict, reps: str='DICTOBS') -> dict[str, dict[str, Any]]:
"""Parse a list of Obs or structures containing Obs and an accompanying
dict, where the structures have been replaced by placeholders to a
dict that contains the structures.
@ -727,7 +730,7 @@ def _od_from_list_and_dict(ol, ind, reps='DICTOBS'):
return nd
def load_json_dict(fname, verbose=True, gz=True, full_output=False, reps='DICTOBS'):
def load_json_dict(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=False, reps: str='DICTOBS') -> dict:
"""Import a dict of Obs or structures containing Obs from a .json(.gz) file.
The following structures are supported: Obs, list, numpy.ndarray, Corr

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import os
import fnmatch
import re
@ -8,9 +9,10 @@ import matplotlib.pyplot as plt
from matplotlib import gridspec
from ..obs import Obs
from ..fits import fit_lin
from typing import Optional
def fit_t0(t2E_dict, fit_range, plot_fit=False, observable='t0'):
def fit_t0(t2E_dict: dict[float, Obs], fit_range: int, plot_fit: Optional[bool]=False, observable: str='t0') -> Obs:
"""Compute the root of (flow-based) data based on a dictionary that contains
the necessary information in key-value pairs a la (flow time: observable at flow time).
@ -97,11 +99,15 @@ def fit_t0(t2E_dict, fit_range, plot_fit=False, observable='t0'):
return -fit_result[0] / fit_result[1]
def read_pbp(path, prefix, **kwargs):
def read_pbp(path: str, prefix: str, **kwargs):
"""Read pbp format from given folder structure.
Parameters
----------
path : str
Directory to read pbp from
prefix : str
Prefix of the files to be read
r_start : list
list which contains the first config to be read for each replicum
r_stop : list

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import os
import fnmatch
import struct
@ -8,9 +9,19 @@ from ..obs import CObs
from ..correlators import Corr
from .misc import fit_t0
from .utils import sort_names
from io import BufferedReader
from typing import Optional, Union, TypedDict
from typing_extensions import Unpack
def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
class rwms_kwargs(TypedDict, total=False):
files: list[str]
postfix: str
r_start: list[int]
r_stop: list[int]
r_step: int
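With the Unpack spelling in the signature below, a type checker validates keyword arguments at the call site. A usage sketch with hypothetical paths:

rwms = read_rwms('./data', 'run1', version='2.0', r_step=2)  # r_step checked as int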
def read_rwms(path: str, prefix: str, version: str='2.0', names: Optional[list[str]]=None, **kwargs: Unpack[rwms_kwargs]) -> list[Obs]:
"""Read rwms format from given folder structure. Returns a list of length nrw
Parameters
@ -24,7 +35,7 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
version : str
version of openQCD, default 2.0
names : list
list of names that is assigned to the data according according
list of names that is assigned to the data according
to the order in the file list. Use with care if you do not provide file names!
r_start : list
list which contains the first config to be read for each replicum
@ -50,39 +61,23 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
if version not in known_oqcd_versions:
raise Exception('Unknown openQCD version defined!')
print("Working with openQCD version " + version)
if 'postfix' in kwargs:
postfix = kwargs.get('postfix')
else:
postfix = ''
postfix: str = kwargs.get('postfix', '')
if 'files' in kwargs:
known_files = kwargs.get('files')
else:
known_files = []
known_files: list[str] = kwargs.get('files', [])
ls = _find_files(path, prefix, postfix, 'dat', known_files=known_files)
replica = len(ls)
if 'r_start' in kwargs:
r_start = kwargs.get('r_start')
if len(r_start) != replica:
raise Exception('r_start does not match number of replicas')
r_start = [o if o else None for o in r_start]
else:
r_start = [None] * replica
r_start: list[Union[int, None]] = kwargs.get('r_start', [None] * replica)
if len(r_start) != replica:
raise Exception('r_start does not match number of replicas')
if 'r_stop' in kwargs:
r_stop = kwargs.get('r_stop')
if len(r_stop) != replica:
raise Exception('r_stop does not match number of replicas')
else:
r_stop = [None] * replica
r_stop: list[Union[int, None]] = kwargs.get('r_stop', [None] * replica)
if len(r_stop) != replica:
raise Exception('r_stop does not match number of replicas')
if 'r_step' in kwargs:
r_step = kwargs.get('r_step')
else:
r_step = 1
r_step: int = kwargs.get('r_step', 1)
print('Read reweighting factors from', prefix[:-1], ',',
replica, 'replica', end='')
@ -107,14 +102,14 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
print_err = 1
print()
deltas = []
deltas: list[list[float]] = []
configlist = []
configlist: list[list[int]] = []
r_start_index = []
r_stop_index = []
for rep in range(replica):
tmp_array = []
tmp_array: list[list] = []
with open(path + '/' + ls[rep], 'rb') as fp:
t = fp.read(4) # number of reweighting factors
@ -141,7 +136,7 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
for i in range(nrw):
nfct.append(1)
nsrc = []
nsrc: list[int] = []
for i in range(nrw):
t = fp.read(4)
nsrc.append(struct.unpack('i', t)[0])
@ -158,11 +153,12 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
configlist[-1].append(config_no)
for i in range(nrw):
if (version == '2.0'):
tmpd: dict = _read_array_openQCD2(fp)
tmpd = _read_array_openQCD2(fp)
tmpd = _read_array_openQCD2(fp)
tmp_rw = tmpd['arr']
tmp_rw: list[float] = tmpd['arr']
tmp_n: list[int] = tmpd['n']
tmp_nfct = 1.0
for j in range(tmpd['n'][0]):
for j in range(tmp_n[0]):
tmp_nfct *= np.mean(np.exp(-np.asarray(tmp_rw[j])))
if print_err:
print(config_no, i, j,
@ -176,7 +172,7 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
for j in range(nfct[i]):
t = fp.read(8 * nsrc[i])
t = fp.read(8 * nsrc[i])
tmp_rw = struct.unpack('d' * nsrc[i], t)
tmp_rw: tuple[float, ...] = struct.unpack('d' * nsrc[i], t)
tmp_nfct *= np.mean(np.exp(-np.asarray(tmp_rw)))
if print_err:
print(config_no, i, j,
@ -229,7 +225,7 @@ def read_rwms(path, prefix, version='2.0', names=None, **kwargs):
return result
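For orientation, a minimal usage sketch of the annotated signature above; folder, prefix and the replica-dependent keyword arguments are made-up placeholders that have to match the rwms_kwargs TypedDict:

import pyerrors as pe

# Hypothetical data layout with two replica; r_start/r_stop need one entry per replicum.
rw = pe.input.openQCD.read_rwms("./rwms_data", "runA", version="2.0",
                                r_start=[1, 1], r_stop=[500, 500], r_step=1)  # list of Obs, one per reweighting factor
rw[0].gamma_method()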
def _extract_flowed_energy_density(path, prefix, dtr_read, xmin, spatial_extent, postfix='ms', **kwargs):
def _extract_flowed_energy_density(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, postfix: str='ms', **kwargs: rwms_kwargs) -> dict[float, Obs]:
"""Extract a dictionary with the flowed Yang-Mills action density from given .ms.dat files.
Returns a dictionary with Obs as values and flow times as keys.
@ -285,49 +281,35 @@ def _extract_flowed_energy_density(path, prefix, dtr_read, xmin, spatial_extent,
Dictionary with the flowed action density at flow times t
"""
if 'files' in kwargs:
known_files = kwargs.get('files')
else:
known_files = []
known_files = kwargs.get('files', [])
ls = _find_files(path, prefix, postfix, 'dat', known_files=known_files)
replica = len(ls)
if 'r_start' in kwargs:
r_start = kwargs.get('r_start')
if len(r_start) != replica:
raise Exception('r_start does not match number of replicas')
r_start = [o if o else None for o in r_start]
else:
r_start = [None] * replica
r_start: list[Union[int, None]] = kwargs.get('r_start', [None] * replica)
if len(r_start) != replica:
raise Exception('r_start does not match number of replicas')
if 'r_stop' in kwargs:
r_stop = kwargs.get('r_stop')
if len(r_stop) != replica:
raise Exception('r_stop does not match number of replicas')
else:
r_stop = [None] * replica
r_stop: list[Union[int, None]] = kwargs.get('r_stop', [None] * replica)
if len(r_stop) != replica:
raise Exception('r_stop does not match number of replicas')
if 'r_step' in kwargs:
r_step = kwargs.get('r_step')
else:
r_step = 1
r_step = kwargs.get('r_step', 1)
print('Extract flowed Yang-Mills action density from', prefix, ',', replica, 'replica')
if 'names' in kwargs:
rep_names = kwargs.get('names')
else:
rep_names: list[str] = kwargs.get('names', [])
if len(rep_names) == 0:
rep_names = []
for entry in ls:
truncated_entry = entry.split('.')[0]
idx = truncated_entry.index('r')
rep_names.append(truncated_entry[:idx] + '|' + truncated_entry[idx:])
Ysum = []
Ysum: list = []
configlist = []
configlist: list[list[int]] = []
r_start_index = []
r_stop_index = []
@ -410,7 +392,7 @@ def _extract_flowed_energy_density(path, prefix, dtr_read, xmin, spatial_extent,
idl = [range(configlist[rep][r_start_index[rep]], configlist[rep][r_stop_index[rep]] + 1, r_step) for rep in range(replica)]
E_dict = {}
for n in range(nn + 1):
samples = []
samples: list[list[float]] = []
for nrep, rep in enumerate(Ysum):
samples.append([])
for cnfg in rep:
@ -422,7 +404,7 @@ def _extract_flowed_energy_density(path, prefix, dtr_read, xmin, spatial_extent,
return E_dict
def extract_t0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfix='ms', c=0.3, **kwargs):
def extract_t0(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, fit_range: int=5, postfix: str='ms', c: Union[float, int]=0.3, **kwargs) -> Obs:
"""Extract t0/a^2 from given .ms.dat files. Returns t0 as Obs.
It is assumed that all boundary effects have
@ -495,7 +477,7 @@ def extract_t0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfi
return fit_t0(t2E_dict, fit_range, plot_fit=kwargs.get('plot_fit'))
def extract_w0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfix='ms', c=0.3, **kwargs):
def extract_w0(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, fit_range: int=5, postfix: str='ms', c: Union[float, int]=0.3, **kwargs) -> Obs:
"""Extract w0/a from given .ms.dat files. Returns w0 as Obs.
It is assumed that all boundary effects have
@ -577,7 +559,7 @@ def extract_w0(path, prefix, dtr_read, xmin, spatial_extent, fit_range=5, postfi
return np.sqrt(fit_t0(tdtt2E_dict, fit_range, plot_fit=kwargs.get('plot_fit'), observable='w0'))
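As a usage sketch for the two extraction routines above (all numbers are placeholders; dtr_read and xmin depend on how the .ms.dat files were produced):

import pyerrors as pe

t0 = pe.input.openQCD.extract_t0("./ms_data", "runA", dtr_read=2, xmin=6, spatial_extent=32)
w0 = pe.input.openQCD.extract_w0("./ms_data", "runA", dtr_read=2, xmin=6, spatial_extent=32)
t0.gamma_method()
w0.gamma_method()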
def _parse_array_openQCD2(d, n, size, wa, quadrupel=False):
def _parse_array_openQCD2(d: int, n: tuple[int, int], size: int, wa: Union[tuple[float, float, float, float, float, float, float, float], tuple[float, float]], quadrupel: bool=False) -> list[list[float]]:
arr = []
if d == 2:
for i in range(n[0]):
@ -596,7 +578,7 @@ def _parse_array_openQCD2(d, n, size, wa, quadrupel=False):
return arr
def _find_files(path, prefix, postfix, ext, known_files=[]):
def _find_files(path: str, prefix: str, postfix: str, ext: str, known_files: list[str]=[]) -> list[str]:
found = []
files = []
@ -636,7 +618,7 @@ def _find_files(path, prefix, postfix, ext, known_files=[]):
return files
def _read_array_openQCD2(fp):
def _read_array_openQCD2(fp: BufferedReader) -> dict[str, Union[int, tuple[int, int], list[list[float]]]]:
t = fp.read(4)
d = struct.unpack('i', t)[0]
t = fp.read(4 * d)
@ -662,7 +644,7 @@ def _read_array_openQCD2(fp):
return {'d': d, 'n': n, 'size': size, 'arr': arr}
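The binary parsing above follows one fixed idiom: read a known number of bytes, then unpack them with an explicit struct format. A minimal sketch of that idiom, mirroring the calls in _read_array_openQCD2:

import struct
from io import BufferedReader

def _read_int32(fp: BufferedReader) -> int:
    # Equivalent to the struct.unpack('i', fp.read(4))[0] pattern used above.
    return struct.unpack('i', fp.read(4))[0]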
def read_qtop(path, prefix, c, dtr_cnfg=1, version="openQCD", **kwargs):
def read_qtop(path: str, prefix: str, c: float, dtr_cnfg: int=1, version: str="openQCD", **kwargs) -> Obs:
"""Read the topologial charge based on openQCD gradient flow measurements.
Parameters
@ -715,7 +697,7 @@ def read_qtop(path, prefix, c, dtr_cnfg=1, version="openQCD", **kwargs):
return _read_flow_obs(path, prefix, c, dtr_cnfg=dtr_cnfg, version=version, obspos=0, **kwargs)
def read_gf_coupling(path, prefix, c, dtr_cnfg=1, Zeuthen_flow=True, **kwargs):
def read_gf_coupling(path: str, prefix: str, c: float, dtr_cnfg: int=1, Zeuthen_flow: bool=True, **kwargs) -> Obs:
"""Read the gradient flow coupling based on sfqcd gradient flow measurements. See 1607.06423 for details.
Note: The current implementation only works for c=0.3 and T=L. The definition of the coupling in 1607.06423 requires projection to topological charge zero which is not done within this function but has to be performed in a separate step.
@ -787,7 +769,7 @@ def read_gf_coupling(path, prefix, c, dtr_cnfg=1, Zeuthen_flow=True, **kwargs):
return t * t * (5 / 3 * plaq - 1 / 12 * C2x1) / normdict[L]
def _read_flow_obs(path, prefix, c, dtr_cnfg=1, version="openQCD", obspos=0, sum_t=True, **kwargs):
def _read_flow_obs(path: str, prefix: str, c: float, dtr_cnfg: int=1, version: str="openQCD", obspos: int=0, sum_t: bool=True, **kwargs) -> Obs:
"""Read a flow observable based on openQCD gradient flow measurements.
Parameters
@ -1059,7 +1041,7 @@ def _read_flow_obs(path, prefix, c, dtr_cnfg=1, version="openQCD", obspos=0, sum
return result
def qtop_projection(qtop, target=0):
def qtop_projection(qtop: Obs, target: int=0) -> Obs:
"""Returns the projection to the topological charge sector defined by target.
Parameters
@ -1085,7 +1067,7 @@ def qtop_projection(qtop, target=0):
return reto
def read_qtop_sector(path, prefix, c, target=0, **kwargs):
def read_qtop_sector(path: str, prefix: str, c: float, target: int=0, **kwargs) -> Obs:
"""Constructs reweighting factors to a specified topological sector.
Parameters
@ -1143,7 +1125,7 @@ def read_qtop_sector(path, prefix, c, target=0, **kwargs):
return qtop_projection(qtop, target=target)
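A usage sketch tying the two routines above together (path, prefix and the smearing parameter c are placeholders):

import pyerrors as pe

# Reweighting factors projecting to the Q=0 sector; internally reads the
# topological charge and applies qtop_projection with the given target.
rw_q0 = pe.input.openQCD.read_qtop_sector("./gf_data", "runA", c=0.3, target=0)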
def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs):
def read_ms5_xsf(path: str, prefix: str, qc: str, corr: str, sep: str="r", **kwargs) -> Union[Corr, CObs]:
"""
Read data from files in the specified directory with the specified prefix and quark combination extension, and return a `Corr` object containing the data.
@ -1185,9 +1167,7 @@ def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs):
If there is an error unpacking binary data.
"""
# found = []
files = []
names = []
# test if the input is correct
if qc not in ['dd', 'ud', 'du', 'uu']:
@ -1196,15 +1176,13 @@ def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs):
if corr not in ["gS", "gP", "gA", "gV", "gVt", "lA", "lV", "lVt", "lT", "lTt", "g1", "l1"]:
raise Exception("Unknown correlator!")
if "files" in kwargs:
known_files = kwargs.get("files")
else:
known_files = []
known_files: list[str] = kwargs.get("files", [])
expected_idl = kwargs.get('idl', [])
files = _find_files(path, prefix, "ms5_xsf_" + qc, "dat", known_files=known_files)
if "names" in kwargs:
names = kwargs.get("names")
else:
names: list[str] = kwargs.get("names", [])
if len(names) == 0:
for f in files:
if not sep == "":
se = f.split(".")[0]
@ -1213,31 +1191,30 @@ def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs):
names.append(se.split(sep)[0] + "|r" + se.split(sep)[1])
else:
names.append(prefix)
if 'idl' in kwargs:
expected_idl = kwargs.get('idl')
names = sorted(names)
files = sorted(files)
cnfgs = []
realsamples = []
imagsamples = []
cnfgs: list[list[int]] = []
realsamples: list[list[list[float]]] = []
imagsamples: list[list[list[float]]] = []
repnum = 0
for file in files:
with open(path + "/" + file, "rb") as fp:
t = fp.read(8)
kappa = struct.unpack('d', t)[0]
t = fp.read(8)
csw = struct.unpack('d', t)[0]
t = fp.read(8)
dF = struct.unpack('d', t)[0]
t = fp.read(8)
zF = struct.unpack('d', t)[0]
tmp_bytes = fp.read(8)
kappa: float = struct.unpack('d', tmp_bytes)[0]
tmp_bytes = fp.read(8)
csw: float = struct.unpack('d', tmp_bytes)[0]
tmp_bytes = fp.read(8)
dF: float = struct.unpack('d', tmp_bytes)[0]
tmp_bytes = fp.read(8)
zF: float = struct.unpack('d', tmp_bytes)[0]
t = fp.read(4)
tmax = struct.unpack('i', t)[0]
t = fp.read(4)
bnd = struct.unpack('i', t)[0]
tmp_bytes = fp.read(4)
tmax: int = struct.unpack('i', tmp_bytes)[0]
tmp_bytes = fp.read(4)
bnd: int = struct.unpack('i', tmp_bytes)[0]
placesBI = ["gS", "gP",
"gA", "gV",
@ -1249,22 +1226,22 @@ def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs):
# the chunks have the following structure:
# confignumber, 10x timedependent complex correlators as doubles, 2x timeindependent complex correlators as doubles
chunksize = 4 + (8 * 2 * tmax * 10) + (8 * 2 * 2)
packstr = '=i' + ('d' * 2 * tmax * 10) + ('d' * 2 * 2)
chunksize = struct.calcsize(packstr)
cnfgs.append([])
realsamples.append([])
imagsamples.append([])
for t in range(tmax):
for time in range(tmax):
realsamples[repnum].append([])
imagsamples[repnum].append([])
if 'idl' in kwargs:
left_idl = set(expected_idl[repnum])
while True:
cnfgt = fp.read(chunksize)
if not cnfgt:
cnfg_bytes = fp.read(chunksize)
if not cnfg_bytes:
break
asascii = struct.unpack(packstr, cnfgt)
cnfg = asascii[0]
asascii = struct.unpack(packstr, cnfg_bytes)
cnfg: int = asascii[0]
idl_wanted = True
if 'idl' in kwargs:
idl_wanted = (cnfg in expected_idl[repnum])
@ -1277,24 +1254,21 @@ def read_ms5_xsf(path, prefix, qc, corr, sep="r", **kwargs):
else:
tmpcorr = asascii[1 + 2 * tmax * len(placesBI) + 2 * placesBB.index(corr):1 + 2 * tmax * len(placesBI) + 2 * placesBB.index(corr) + 2]
corrres = [[], []]
corrres: list[list[float]] = [[], []]
for i in range(len(tmpcorr)):
corrres[i % 2].append(tmpcorr[i])
for t in range(int(len(tmpcorr) / 2)):
realsamples[repnum][t].append(corrres[0][t])
for t in range(int(len(tmpcorr) / 2)):
imagsamples[repnum][t].append(corrres[1][t])
if 'idl' in kwargs:
left_idl = list(left_idl)
if expected_idl[repnum] == left_idl:
raise ValueError("None of the idls searched for were found in replikum of file " + file)
elif len(left_idl) > 0:
warnings.warn('Could not find idls ' + str(left_idl) + ' in replikum of file ' + file, UserWarning)
for time in range(int(len(tmpcorr) / 2)):
realsamples[repnum][time].append(corrres[0][time])
for time in range(int(len(tmpcorr) / 2)):
imagsamples[repnum][time].append(corrres[1][time])
if len(expected_idl) > 0:
left_idl_list = list(left_idl)
if expected_idl[repnum] == left_idl_list:
raise ValueError("None of the idls searched for were found in replicum of file " + file)
elif len(left_idl_list) > 0:
warnings.warn('Could not find idls ' + str(left_idl) + ' in replicum of file ' + file, UserWarning)
repnum += 1
s = "Read correlator " + corr + " from " + str(repnum) + " replika with idls" + str(realsamples[0][t])
for rep in range(1, repnum):
s += ", " + str(realsamples[rep][t])
print(s)
print("Read correlator " + corr + " from " + str(repnum) + " replica with idls")
print("Asserted run parameters:\n T:", tmax, "kappa:", kappa, "csw:", csw, "dF:", dF, "zF:", zF, "bnd:", bnd)
# we have the data now... but we need to reformat the whole thing and put it into Corr objects.
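The switch from the hand-computed chunksize to struct.calcsize in this hunk can be checked directly; for any tmax the two expressions agree:

import struct

tmax = 64  # hypothetical time extent
packstr = '=i' + ('d' * 2 * tmax * 10) + ('d' * 2 * 2)
# config number (4 bytes) + 10 time-dependent + 2 time-independent complex correlators
assert struct.calcsize(packstr) == 4 + (8 * 2 * tmax * 10) + (8 * 2 * 2)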

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import warnings
import gzip
import sqlite3
@ -7,9 +8,11 @@ from ..obs import Obs
from ..correlators import Corr
from .json import create_json_string, import_json_string
import numpy as np
from pandas.core.frame import DataFrame
from pandas.core.series import Series
def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs):
def to_sql(df: DataFrame, table_name: str, db: str, if_exists: str='fail', gz: bool=True, **kwargs):
"""Write DataFrame including Obs or Corr valued columns to sqlite database.
Parameters
@ -34,7 +37,7 @@ def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs):
se_df.to_sql(table_name, con=con, if_exists=if_exists, index=False, **kwargs)
def read_sql(sql, db, auto_gamma=False, **kwargs):
def read_sql(sql: str, db: str, auto_gamma: bool=False, **kwargs) -> DataFrame:
"""Execute SQL query on sqlite database and obtain DataFrame including Obs or Corr valued columns.
Parameters
@ -57,7 +60,7 @@ def read_sql(sql, db, auto_gamma=False, **kwargs):
return _deserialize_df(extract_df, auto_gamma=auto_gamma)
def dump_df(df, fname, gz=True):
def dump_df(df: DataFrame, fname: str, gz: bool=True):
"""Exports a pandas DataFrame containing Obs valued columns to a (gzipped) csv file.
Before making use of pandas to_csv functionality Obs objects are serialized via the standardized
@ -96,7 +99,7 @@ def dump_df(df, fname, gz=True):
out.to_csv(fname, index=False)
def load_df(fname, auto_gamma=False, gz=True):
def load_df(fname: str, auto_gamma: bool=False, gz: bool=True) -> DataFrame:
"""Imports a pandas DataFrame from a csv.(gz) file in which Obs objects are serialized as json strings.
Parameters
@ -130,7 +133,7 @@ def load_df(fname, auto_gamma=False, gz=True):
return _deserialize_df(re_import, auto_gamma=auto_gamma)
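A round-trip sketch for the DataFrame helpers above (the file name is a placeholder; suffix handling is left to the helpers):

import pandas as pd
import pyerrors as pe

df = pd.DataFrame({"obs": [pe.pseudo_Obs(1.0, 0.1, "ensA")]})
pe.input.pandas.dump_df(df, "my_table")                     # gzipped csv with serialized Obs
df2 = pe.input.pandas.load_df("my_table", auto_gamma=True)  # Obs restored, gamma_method applied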
def _serialize_df(df, gz=False):
def _serialize_df(df: DataFrame, gz: bool=False) -> DataFrame:
"""Serializes all Obs or Corr valued columns into json strings according to the pyerrors json specification.
Parameters
@ -151,7 +154,7 @@ def _serialize_df(df, gz=False):
return out
def _deserialize_df(df, auto_gamma=False):
def _deserialize_df(df: DataFrame, auto_gamma: bool=False) -> DataFrame:
"""Deserializes all pyerrors json strings into Obs or Corr objects according to the pyerrors json specification.
Parameters
@ -187,7 +190,7 @@ def _deserialize_df(df, auto_gamma=False):
return df
def _need_to_serialize(col):
def _need_to_serialize(col: Series) -> bool:
serialize = False
i = 0
while i < len(col) and col[i] is None:

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import os
import fnmatch
import re
@ -5,12 +6,14 @@ import numpy as np # Thinly-wrapped numpy
from ..obs import Obs
from .utils import sort_names, check_idl
import itertools
from numpy import ndarray
from typing import Any, Union, Optional, Literal
sep = "/"
def read_sfcf(path, prefix, name, quarks='.*', corr_type="bi", noffset=0, wf=0, wf2=0, version="1.0c", cfg_separator="n", silent=False, **kwargs):
def read_sfcf(path: str, prefix: str, name: str, quarks: str='.*', corr_type: str="bi", noffset: int=0, wf: int=0, wf2: int=0, version: str="1.0c", cfg_separator: str="n", silent: bool=False, **kwargs) -> list[Obs]:
"""Read sfcf files from given folder structure.
Parameters
@ -75,7 +78,7 @@ def read_sfcf(path, prefix, name, quarks='.*', corr_type="bi", noffset=0, wf=0,
return ret[name][quarks][str(noffset)][str(wf)][str(wf2)]
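Illustrative call of the annotated reader above; folder, prefix, correlator name and quark string are placeholders:

import pyerrors as pe

f_A = pe.input.sfcf.read_sfcf("./sfcf_data", "runA", "f_A",
                              quarks="lquark lquark", version="2.0c")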
def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=['bi'], noffset_list=[0], wf_list=[0], wf2_list=[0], version="1.0c", cfg_separator="n", silent=False, keyed_out=False, **kwargs):
def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: list[str]=['.*'], corr_type_list: list[str]=['bi'], noffset_list: list[int]=[0], wf_list: list[int]=[0], wf2_list: list[int]=[0], version: str="1.0c", cfg_separator: str="n", silent: bool=False, keyed_out: bool=False, **kwargs) -> dict:
"""Read sfcf files from given folder structure.
Parameters
@ -140,14 +143,14 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
dict[name][quarks][offset][wf][wf2] = list[Obs]
"""
if kwargs.get('im'):
im: Literal[1, 0] = 0
part: str = 'real'
if kwargs.get('im', False):
im = 1
part = 'imaginary'
else:
im = 0
part = 'real'
known_versions = ["0.0", "1.0", "2.0", "1.0c", "2.0c", "1.0a", "2.0a"]
known_versions: list = ["0.0", "1.0", "2.0", "1.0c", "2.0c", "1.0a", "2.0a"]
if version not in known_versions:
raise Exception("This version is not known!")
@ -162,10 +165,9 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
else:
compact = False
appended = False
ls = []
if "replica" in kwargs:
ls = kwargs.get("replica")
else:
ls: list = kwargs.get("replica", [])
if ls == []:
for (dirpath, dirnames, filenames) in os.walk(path):
if not appended:
ls.extend(dirnames)
@ -178,7 +180,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
for exc in ls:
if not fnmatch.fnmatch(exc, prefix + '*'):
ls = list(set(ls) - set([exc]))
replica: int = 0
if not appended:
ls = sort_names(ls)
replica = len(ls)
@ -190,13 +192,12 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
if not silent:
print('Read', part, 'part of', name_list, 'from', prefix[:-1], ',', replica, 'replica')
if 'names' in kwargs:
new_names = kwargs.get('names')
new_names = kwargs.get('names')
if new_names is not None:
if len(new_names) != len(set(new_names)):
raise Exception("names are not unique!")
if len(new_names) != replica:
raise Exception('names should have the length', replica)
else:
ens_name = kwargs.get("ens_name")
if not appended:
@ -205,14 +206,14 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
new_names = _get_appended_rep_names(ls, prefix, name_list[0], ens_name, rep_sep=(kwargs.get('rep_string', 'r')))
new_names = sort_names(new_names)
idl = []
idl: list[list[int]] = []
noffset_list = [str(x) for x in noffset_list]
wf_list = [str(x) for x in wf_list]
wf2_list = [str(x) for x in wf2_list]
noffset_strings: list[str] = [str(x) for x in noffset_list]
wf_strings: list[str] = [str(x) for x in wf_list]
wf2_strings: list[str] = [str(x) for x in wf2_list]
# setup dict structures
intern = {}
intern: dict[str, Any] = {}
for name, corr_type in zip(name_list, corr_type_list):
intern[name] = {}
b2b, single = _extract_corr_type(corr_type)
@ -221,30 +222,31 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
intern[name]["spec"] = {}
for quarks in quarks_list:
intern[name]["spec"][quarks] = {}
for off in noffset_list:
for off in noffset_strings:
intern[name]["spec"][quarks][off] = {}
for w in wf_list:
for w in wf_strings:
intern[name]["spec"][quarks][off][w] = {}
if b2b:
for w2 in wf2_list:
for w2 in wf2_strings:
intern[name]["spec"][quarks][off][w][w2] = {}
intern[name]["spec"][quarks][off][w][w2]["pattern"] = _make_pattern(version, name, off, w, w2, intern[name]['b2b'], quarks)
else:
intern[name]["spec"][quarks][off][w]["0"] = {}
intern[name]["spec"][quarks][off][w]["0"]["pattern"] = _make_pattern(version, name, off, w, 0, intern[name]['b2b'], quarks)
internal_ret_dict = {}
internal_ret_dict: dict[str, list] = {}
needed_keys = []
for name, corr_type in zip(name_list, corr_type_list):
b2b, single = _extract_corr_type(corr_type)
if b2b:
needed_keys.extend(_lists2key([name], quarks_list, noffset_list, wf_list, wf2_list))
needed_keys.extend(_lists2key([name], quarks_list, noffset_strings, wf_strings, wf2_strings))
else:
needed_keys.extend(_lists2key([name], quarks_list, noffset_list, wf_list, ["0"]))
needed_keys.extend(_lists2key([name], quarks_list, noffset_strings, wf_strings, ["0"]))
for key in needed_keys:
internal_ret_dict[key] = []
rep_idl: list = []
if not appended:
for i, item in enumerate(ls):
rep_path = path + '/' + item
@ -286,9 +288,9 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
if version == "0.0" or not compact:
file = path + '/' + item + '/' + sub_ls[0] + '/' + name
if corr_type_list[name_index] == 'bi':
name_keys = _lists2key(quarks_list, noffset_list, wf_list, ["0"])
name_keys = _lists2key(quarks_list, noffset_strings, wf_strings, ["0"])
else:
name_keys = _lists2key(quarks_list, noffset_list, wf_list, wf2_list)
name_keys = _lists2key(quarks_list, noffset_strings, wf_strings, wf2_strings)
for key in name_keys:
specs = _key2specs(key)
quarks = specs[0]
@ -304,7 +306,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
intern[name]["T"] = T
# preparing the datastructure
# the correlators get parsed into...
deltas = []
deltas: list[list] = []
for j in range(intern[name]["T"]):
deltas.append([])
internal_ret_dict[sep.join([name, key])] = deltas
@ -317,7 +319,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
internal_ret_dict[key][t].append(rep_deltas[key][t])
else:
for key in needed_keys:
rep_data = []
rep_data: list = []
name = _key2specs(key)[0]
for subitem in sub_ls:
cfg_path = path + '/' + item + '/' + subitem
@ -325,8 +327,8 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
rep_data.append(file_data)
for t in range(intern[name]["T"]):
internal_ret_dict[key][t].append([])
for cfg in range(no_cfg):
internal_ret_dict[key][t][i].append(rep_data[cfg][key][t])
for cfg_number in range(no_cfg):
internal_ret_dict[key][t][i].append(rep_data[cfg_number][key][t])
else:
for key in needed_keys:
specs = _key2specs(key)
@ -335,10 +337,9 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
off = specs[2]
w = specs[3]
w2 = specs[4]
if "files" in kwargs:
if isinstance(kwargs.get("files"), list) and all(isinstance(f, str) for f in kwargs.get("files")):
name_ls = kwargs.get("files")
else:
name_ls = kwargs.get("files")
if name_ls is not None:
if not (isinstance(name_ls, list) and all(isinstance(f, str) for f in name_ls)):
raise TypeError("In append mode, files has to be of type list[str]!")
else:
name_ls = ls
@ -350,6 +351,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
deltas = []
for rep, file in enumerate(name_ls):
rep_idl = []
rep_data = []
filename = path + '/' + file
T, rep_idl, rep_data = _read_append_rep(filename, pattern, intern[name]['b2b'], cfg_separator, im, intern[name]['single'])
if rep == 0:
@ -362,12 +364,12 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
if name == name_list[0]:
idl.append(rep_idl)
if kwargs.get("check_configs") is True:
che: Union[list[list[int]], None] = kwargs.get("check_configs", None)
if che is not None:
if not silent:
print("Checking for missing configs...")
che = kwargs.get("check_configs")
if not (len(che) == len(idl)):
raise Exception("check_configs has to be the same length as replica!")
raise Exception("check_configs has to have an entry for each replicum!")
for r in range(len(idl)):
if not silent:
print("checking " + new_names[r])
@ -375,7 +377,7 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
if not silent:
print("Done")
result_dict = {}
result_dict: dict[str, Any] = {}
if keyed_out:
for key in needed_keys:
name = _key2specs(key)[0]
@ -388,12 +390,12 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
result_dict[name] = {}
for quarks in quarks_list:
result_dict[name][quarks] = {}
for off in noffset_list:
for off in noffset_strings:
result_dict[name][quarks][off] = {}
for w in wf_list:
for w in wf_strings:
result_dict[name][quarks][off][w] = {}
if corr_type != 'bi':
for w2 in wf2_list:
for w2 in wf2_strings:
key = _specs2key(name, quarks, off, w, w2)
result = []
for t in range(intern[name]["T"]):
@ -408,22 +410,22 @@ def read_sfcf_multi(path, prefix, name_list, quarks_list=['.*'], corr_type_list=
return result_dict
def _lists2key(*lists):
def _lists2key(*lists) -> list[str]:
keys = []
for tup in itertools.product(*lists):
keys.append(sep.join(tup))
return keys
def _key2specs(key):
def _key2specs(key: str) -> list[str]:
return key.split(sep)
def _specs2key(*specs):
def _specs2key(*specs) -> str:
return sep.join(specs)
def _read_o_file(cfg_path, name, needed_keys, intern, version, im):
def _read_o_file(cfg_path: str, name: str, needed_keys: list[str], intern: dict[str, dict], version: str, im: int) -> dict[str, list[float]]:
return_vals = {}
for key in needed_keys:
file = cfg_path + '/' + name
@ -448,7 +450,7 @@ def _read_o_file(cfg_path, name, needed_keys, intern, version, im):
return return_vals
def _extract_corr_type(corr_type):
def _extract_corr_type(corr_type: str) -> tuple[bool, bool]:
if corr_type == 'bb':
b2b = True
single = True
@ -461,7 +463,9 @@ def _extract_corr_type(corr_type):
return b2b, single
def _find_files(rep_path, prefix, compact, files=[]):
def _find_files(rep_path: str, prefix: str, compact: bool, files: Optional[list]=None) -> list[str]:
if files is None:
files = []
sub_ls = []
if not files == []:
files.sort(key=lambda x: int(re.findall(r'\d+', x)[-1]))
@ -488,7 +492,7 @@ def _find_files(rep_path, prefix, compact, files=[]):
return files
def _make_pattern(version, name, noffset, wf, wf2, b2b, quarks):
def _make_pattern(version: str, name: str, noffset: str, wf: str, wf2: Union[str, int], b2b: bool, quarks: str) -> str:
if version == "0.0":
pattern = "# " + name + " : offset " + str(noffset) + ", wf " + str(wf)
if b2b:
@ -502,7 +506,7 @@ def _make_pattern(version, name, noffset, wf, wf2, b2b, quarks):
return pattern
def _find_correlator(file_name, version, pattern, b2b, silent=False):
def _find_correlator(file_name: str, version: str, pattern: str, b2b: bool, silent: bool=False) -> tuple[int, int]:
T = 0
with open(file_name, "r") as my_file:
@ -516,6 +520,7 @@ def _find_correlator(file_name, version, pattern, b2b, silent=False):
else:
start_read = content.count('\n', 0, match.start()) + 5 + b2b
end_match = re.search(r'\n\s*\n', content[match.start():])
assert end_match is not None
T = content[match.start():].count('\n', 0, end_match.start()) - 4 - b2b
if not T > 0:
raise ValueError("Correlator with pattern\n" + pattern + "\nis empty!")
@ -528,7 +533,7 @@ def _find_correlator(file_name, version, pattern, b2b, silent=False):
return start_read, T
def _read_compact_file(rep_path, cfg_file, intern, needed_keys, im):
def _read_compact_file(rep_path: str, cfg_file: str, intern: dict[str, dict], needed_keys: list[str], im: int) -> dict[str, list[float]]:
return_vals = {}
with open(rep_path + cfg_file) as fp:
lines = fp.readlines()
@ -559,7 +564,7 @@ def _read_compact_file(rep_path, cfg_file, intern, needed_keys, im):
return return_vals
def _read_compact_rep(path, rep, sub_ls, intern, needed_keys, im):
def _read_compact_rep(path: str, rep: str, sub_ls: list[str], intern: dict[str, dict], needed_keys: list[str], im: int) -> dict[str, list[ndarray]]:
rep_path = path + '/' + rep + '/'
no_cfg = len(sub_ls)
@ -581,7 +586,7 @@ def _read_compact_rep(path, rep, sub_ls, intern, needed_keys, im):
return return_vals
def _read_chunk(chunk, gauge_line, cfg_sep, start_read, T, corr_line, b2b, pattern, im, single):
def _read_chunk(chunk: list[str], gauge_line: int, cfg_sep: str, start_read: int, T: int, corr_line: int, b2b: bool, pattern: str, im: int, single: bool) -> tuple[int, list[float]]:
try:
idl = int(chunk[gauge_line].split(cfg_sep)[-1])
except Exception:
@ -598,7 +603,7 @@ def _read_chunk(chunk, gauge_line, cfg_sep, start_read, T, corr_line, b2b, patte
return idl, data
def _read_append_rep(filename, pattern, b2b, cfg_separator, im, single):
def _read_append_rep(filename: str, pattern: str, b2b: bool, cfg_separator: str, im: int, single: bool) -> tuple[int, list[int], list[list[float]]]:
with open(filename, 'r') as fp:
content = fp.readlines()
data_starts = []
@ -638,16 +643,16 @@ def _read_append_rep(filename, pattern, b2b, cfg_separator, im, single):
rep_idl.append(idl)
rep_data.append(data)
data = []
final_data: list[list[float]] = []
for t in range(T):
data.append([])
final_data.append([])
for c in range(len(rep_data)):
data[t].append(rep_data[c][t])
return T, rep_idl, data
final_data[t].append(rep_data[c][t])
return T, rep_idl, final_data
def _get_rep_names(ls, ens_name=None, rep_sep='r'):
def _get_rep_names(ls: list[str], ens_name: Optional[str]=None, rep_sep: str ='r') -> list[str]:
new_names = []
for entry in ls:
try:
@ -662,7 +667,7 @@ def _get_rep_names(ls, ens_name=None, rep_sep='r'):
return new_names
def _get_appended_rep_names(ls, prefix, name, ens_name=None, rep_sep='r'):
def _get_appended_rep_names(ls: list[str], prefix: str, name: str, ens_name: Optional[str]=None, rep_sep: str ='r') -> list[str]:
new_names = []
for exc in ls:
if not fnmatch.fnmatch(exc, prefix + '*.' + name):

View file

@ -1,11 +1,12 @@
"""Utilities for the input"""
from __future__ import annotations
import re
import fnmatch
import os
def sort_names(ll):
def sort_names(ll: list[str]) -> list[str]:
"""Sorts a list of names of replika with searches for `r` and `id` in the replikum string.
If this search fails, a fallback method is used,
where the strings are simply compared and the first diffeing numeral is used for differentiation.
@ -52,7 +53,7 @@ def sort_names(ll):
return ll
def check_idl(idl, che):
def check_idl(idl: list, che: list) -> str:
"""Checks if list of configurations is contained in an idl
Parameters
@ -82,7 +83,7 @@ def check_idl(idl, che):
return miss_str
def check_params(path, param_hash, prefix, param_prefix="parameters_"):
def check_params(path: str, param_hash: str, prefix: str, param_prefix: str ="parameters_") -> dict[str, str]:
"""
Check if, for sfcf, the parameter hashes at the end of the parameter files are in fact the expected one.

View file

@ -1,10 +1,13 @@
from __future__ import annotations
import numpy as np
from .obs import derived_observable, Obs
from autograd import jacobian
from scipy.integrate import quad as squad
from numpy import ndarray
from typing import Callable, Union
def quad(func, p, a, b, **kwargs):
def quad(func: Callable, p: Union[list[Union[float, Obs]], list[float], ndarray], a: Union[Obs, float, int], b: Union[Obs, float, int], **kwargs) -> Union[tuple[Obs, float], tuple[float, float], tuple[Obs, float, dict[str, Union[int, ndarray]]]]:
'''Performs a (one-dimensional) numeric integration of f(p, x) from a to b.
The integration is performed using scipy.integrate.quad().
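A sketch of this interface, assuming the module is reachable as pe.integrate and using a hypothetical integrand f(p, x) with an Obs-valued parameter:

import pyerrors as pe

def f(p, x):
    return p[0] * x ** 2  # integrand; parameters come first, as stated above

p0 = pe.pseudo_Obs(1.0, 0.01, "ensA")
val, err = pe.integrate.quad(f, [p0], 0.0, 2.0)  # val is an Obs, err the error estimate of quad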

View file

@ -1,9 +1,12 @@
from __future__ import annotations
import numpy as np
import autograd.numpy as anp # Thinly-wrapped numpy
from .obs import derived_observable, CObs, Obs, import_jackknife
from numpy import ndarray
from typing import Callable, Union
def matmul(*operands):
def matmul(*operands) -> ndarray:
"""Matrix multiply all operands.
Parameters
@ -45,6 +48,7 @@ def matmul(*operands):
Nr = derived_observable(multi_dot_r, extended_operands, array_mode=True)
Ni = derived_observable(multi_dot_i, extended_operands, array_mode=True)
assert isinstance(Nr, ndarray) and isinstance(Ni, ndarray)
res = np.empty_like(Nr)
for (n, m), entry in np.ndenumerate(Nr):
res[n, m] = CObs(Nr[n, m], Ni[n, m])
@ -59,7 +63,7 @@ def matmul(*operands):
return derived_observable(multi_dot, operands, array_mode=True)
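For orientation, a minimal call of matmul on an Obs-valued matrix (entries are made up):

import numpy as np
import pyerrors as pe

a = pe.pseudo_Obs(1.0, 0.1, "ensA")
mat = np.array([[a, a + 1.0], [a - 1.0, a]])  # 2x2 matrix of Obs
prod = pe.linalg.matmul(mat, mat)             # ndarray of Obs with propagated errors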
def jack_matmul(*operands):
def jack_matmul(*operands) -> ndarray:
"""Matrix multiply both operands making use of the jackknife approximation.
Parameters
@ -120,7 +124,7 @@ def jack_matmul(*operands):
return _imp_from_jack(r, name, idl)
def einsum(subscripts, *operands):
def einsum(subscripts: str, *operands) -> Union[CObs, Obs, ndarray]:
"""Wrapper for numpy.einsum
Parameters
@ -194,24 +198,24 @@ def einsum(subscripts, *operands):
return result
def inv(x):
def inv(x: ndarray) -> ndarray:
"""Inverse of Obs or CObs valued matrices."""
return _mat_mat_op(anp.linalg.inv, x)
def cholesky(x):
def cholesky(x: ndarray) -> ndarray:
"""Cholesky decomposition of Obs valued matrices."""
if any(isinstance(o, CObs) for o in x.ravel()):
raise Exception("Cholesky decomposition is not implemented for CObs.")
return _mat_mat_op(anp.linalg.cholesky, x)
def det(x):
def det(x: Union[ndarray, int]) -> Obs:
"""Determinant of Obs valued matrices."""
return _scalar_mat_op(anp.linalg.det, x)
def _scalar_mat_op(op, obs, **kwargs):
def _scalar_mat_op(op: Callable, obs: Union[ndarray, int], **kwargs) -> Obs:
"""Computes the matrix to scalar operation op to a given matrix of Obs."""
def _mat(x, **kwargs):
dim = int(np.sqrt(len(x)))
@ -232,7 +236,7 @@ def _scalar_mat_op(op, obs, **kwargs):
return derived_observable(_mat, raveled_obs, **kwargs)
def _mat_mat_op(op, obs, **kwargs):
def _mat_mat_op(op: Callable, obs: ndarray, **kwargs) -> ndarray:
"""Computes the matrix to matrix operation op to a given matrix of Obs."""
# Use real representation to calculate matrix operations for complex matrices
if any(isinstance(o, CObs) for o in obs.ravel()):
@ -258,31 +262,31 @@ def _mat_mat_op(op, obs, **kwargs):
return derived_observable(lambda x, **kwargs: op(x), [obs], array_mode=True)[0]
def eigh(obs, **kwargs):
def eigh(obs: ndarray, **kwargs) -> tuple[ndarray, ndarray]:
"""Computes the eigenvalues and eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh."""
w = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[0], obs)
v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs)
return w, v
def eig(obs, **kwargs):
def eig(obs: ndarray, **kwargs) -> ndarray:
"""Computes the eigenvalues of a given matrix of Obs according to np.linalg.eig."""
w = derived_observable(lambda x, **kwargs: anp.real(anp.linalg.eig(x)[0]), obs)
return w
def eigv(obs, **kwargs):
def eigv(obs: ndarray, **kwargs) -> ndarray:
"""Computes the eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh."""
v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs)
return v
def pinv(obs, **kwargs):
def pinv(obs: ndarray, **kwargs) -> ndarray:
"""Computes the Moore-Penrose pseudoinverse of a matrix of Obs."""
return derived_observable(lambda x, **kwargs: anp.linalg.pinv(x), obs)
def svd(obs, **kwargs):
def svd(obs: ndarray, **kwargs) -> tuple[ndarray, ndarray, ndarray]:
"""Computes the singular value decomposition of a matrix of Obs."""
u = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[0], obs)
s = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[1], obs)

View file

@ -1,3 +1,4 @@
from __future__ import annotations
import platform
import numpy as np
import scipy
@ -5,8 +6,13 @@ import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
import pickle
from .obs import Obs
from .obs import Obs, CObs
from .version import __version__
from numpy import ndarray
from typing import Union, TYPE_CHECKING
if TYPE_CHECKING:
from .correlators import Corr
def print_config():
@ -23,7 +29,7 @@ def print_config():
print(f"{key: <10}\t {value}")
def errorbar(x, y, axes=plt, **kwargs):
def errorbar(x: Union[ndarray, list[Union[int, float, Obs]]], y: Union[ndarray, list[Union[int, float, Obs]]], axes=plt, **kwargs):
"""pyerrors wrapper for the errorbars method of matplotlib
Parameters
@ -54,7 +60,7 @@ def errorbar(x, y, axes=plt, **kwargs):
axes.errorbar(val["x"], val["y"], xerr=err["x"], yerr=err["y"], **kwargs)
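A usage sketch of the wrapper, assuming it is reachable as pe.misc.errorbar (x values and Obs are placeholders):

import matplotlib.pyplot as plt
import pyerrors as pe

x = [1, 2, 3]
y = [pe.pseudo_Obs(v, 0.05, "ensA") for v in (0.8, 0.9, 1.1)]
for o in y:
    o.gamma_method()  # errors have to be computed before they can be drawn
pe.misc.errorbar(x, y)
plt.show()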
def dump_object(obj, name, **kwargs):
def dump_object(obj: Corr, name: str, **kwargs):
"""Dump object into pickle file.
Parameters
@ -70,15 +76,18 @@ def dump_object(obj, name, **kwargs):
-------
None
"""
if 'path' in kwargs:
file_name = kwargs.get('path') + '/' + name + '.p'
path = kwargs.get('path')
if path is not None:
if not isinstance(path, str):
raise Exception("Path has to be a string.")
file_name = path + '/' + name + '.p'
else:
file_name = name + '.p'
with open(file_name, 'wb') as fb:
pickle.dump(obj, fb)
def load_object(path):
def load_object(path: str) -> Union[Obs, Corr]:
"""Load object from pickle file.
Parameters
@ -95,7 +104,7 @@ def load_object(path):
return pickle.load(file)
def pseudo_Obs(value, dvalue, name, samples=1000):
def pseudo_Obs(value: Union[float, int], dvalue: Union[float, int], name: str, samples: int=1000) -> Obs:
"""Generate an Obs object with given value, dvalue and name for test purposes
Parameters
@ -118,11 +127,11 @@ def pseudo_Obs(value, dvalue, name, samples=1000):
return Obs([np.zeros(samples) + value], [name])
else:
for _ in range(100):
deltas = [np.random.normal(0.0, dvalue * np.sqrt(samples), samples)]
deltas = np.array([np.random.normal(0.0, dvalue * np.sqrt(samples), samples)])
deltas -= np.mean(deltas)
deltas *= dvalue / np.sqrt((np.var(deltas) / samples)) / np.sqrt(1 + 3 / samples)
deltas += value
res = Obs(deltas, [name])
res = Obs(list(deltas), [name])
res.gamma_method(S=2, tau_exp=0)
if abs(res.dvalue - dvalue) < 1e-10 * dvalue:
break
@ -132,7 +141,7 @@ def pseudo_Obs(value, dvalue, name, samples=1000):
return res
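A quick check of the generator above; up to sampling fluctuations the requested error is reproduced:

import pyerrors as pe

o = pe.pseudo_Obs(0.50, 0.01, "ensA", samples=1000)
o.gamma_method()
print(o)  # approximately 0.500(10)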
def gen_correlated_data(means, cov, name, tau=0.5, samples=1000):
def gen_correlated_data(means: Union[ndarray, list[float]], cov: ndarray, name: str, tau: Union[float, ndarray]=0.5, samples: int=1000) -> list[Obs]:
""" Generate observables with given covariance and autocorrelation times.
Parameters
@ -174,7 +183,7 @@ def gen_correlated_data(means, cov, name, tau=0.5, samples=1000):
return [Obs([dat], [name]) for dat in corr_data.T]
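And correspondingly for correlated data (covariance matrix and autocorrelation times are made up):

import numpy as np
import pyerrors as pe

cov = np.array([[0.5, -0.2], [-0.2, 0.3]])
obs1, obs2 = pe.misc.gen_correlated_data([1.0, 2.0], cov, "ensA",
                                         tau=np.array([0.5, 1.0]), samples=1000)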
def _assert_equal_properties(ol, otype=Obs):
def _assert_equal_properties(ol: Union[list[Obs], list[CObs], ndarray]):
otype = type(ol[0])
for o in ol[1:]:
if not isinstance(o, otype):

View file

@ -1,10 +1,12 @@
from __future__ import annotations
import numpy as np
import scipy.linalg
from .obs import Obs
from .linalg import svd, eig
from typing import Optional
def matrix_pencil_method(corrs, k=1, p=None, **kwargs):
def matrix_pencil_method(corrs: list[Obs], k: int=1, p: Optional[int]=None, **kwargs) -> list[Obs]:
"""Matrix pencil method to extract k energy levels from data
Implementation of the matrix pencil method based on

View file

@ -1,3 +1,5 @@
from __future__ import annotations
import sys
import warnings
import hashlib
import pickle
@ -10,6 +12,16 @@ from scipy.stats import skew, skewtest, kurtosis, kurtosistest
import numdifftools as nd
from itertools import groupby
from .covobs import Covobs
from numpy import float64, int64, ndarray
from typing import Any, Callable, Optional, Union, Sequence, TYPE_CHECKING
if sys.version_info >= (3, 10):
from types import NotImplementedType
else:
NotImplementedType = type(NotImplemented)
if TYPE_CHECKING:
from .correlators import Corr
# Improve print output of numpy.ndarrays containing Obs objects.
np.set_printoptions(formatter={'object': lambda x: str(x)})
@ -51,13 +63,13 @@ class Obs:
'idl', 'tag', '_covobs', '__dict__']
S_global = 2.0
S_dict = {}
S_dict: dict[str, float] = {}
tau_exp_global = 0.0
tau_exp_dict = {}
tau_exp_dict: dict[str, float] = {}
N_sigma_global = 1.0
N_sigma_dict = {}
N_sigma_dict: dict[str, int] = {}
def __init__(self, samples, names, idl=None, **kwargs):
def __init__(self, samples: list[Union[ndarray, list[Any]]], names: list[str], idl: Optional[Union[list[list[int]], list[Union[list[int], range]], list[range]]]=None, **kwargs):
""" Initialize Obs object.
Parameters
@ -70,7 +82,8 @@ class Obs:
list of ranges or lists on which the samples are defined
"""
if kwargs.get("means") is None and len(samples):
means: Optional[list[float]] = kwargs.get("means")
if means is None and len(samples):
if len(samples) != len(names):
raise ValueError('Length of samples and names incompatible.')
if idl is not None:
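For context, a minimal construction matching the annotated __init__ above (ensemble name, sample data and idl range are placeholders):

import numpy as np
import pyerrors as pe

samples = np.random.normal(1.0, 0.5, 1000)
obs = pe.Obs([samples], ["ensA|r01"], idl=[range(1, 1001)])
obs.gamma_method(S=2.0)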
@ -87,18 +100,21 @@ class Obs:
else:
if not isinstance(names[0], str):
raise TypeError('All names have to be strings.')
# This check does not work because of nan hacks in the json.gz export
# if not all((isinstance(o, np.ndarray) and o.ndim == 1) for o in samples):
# raise TypeError('All samples have to be 1d numpy arrays.')
if min(len(x) for x in samples) <= 4:
raise ValueError('Samples have to have at least 5 entries.')
self.names = sorted(names)
self.names: list[str] = sorted(names)
self.shape = {}
self.r_values = {}
self.deltas = {}
self._covobs = {}
self.r_values: dict[str, float] = {}
self.deltas: dict[str, ndarray] = {}
self._covobs: dict[str, Covobs] = {}
self._value = 0
self.N = 0
self.idl = {}
self._value: float = 0.0
self.N: int = 0
self.idl: dict[str, Union[list[int], range]] = {}
if idl is not None:
for name, idx in sorted(zip(names, idl)):
if isinstance(idx, range):
@ -114,13 +130,13 @@ class Obs:
else:
self.idl[name] = list(idx)
else:
raise TypeError('incompatible type for idl[%s].' % (name))
raise TypeError('incompatible type for idl[%s].' % name)
else:
for name, sample in sorted(zip(names, samples)):
self.idl[name] = range(1, len(sample) + 1)
if kwargs.get("means") is not None:
for name, sample, mean in sorted(zip(names, samples, kwargs.get("means"))):
if means is not None:
for name, sample, mean in sorted(zip(names, samples, means)):
self.shape[name] = len(self.idl[name])
self.N += self.shape[name]
self.r_values[name] = mean
@ -132,38 +148,38 @@ class Obs:
if len(sample) != self.shape[name]:
raise ValueError('Incompatible samples and idx for %s: %d vs. %d' % (name, len(sample), self.shape[name]))
self.r_values[name] = np.mean(sample)
self.deltas[name] = sample - self.r_values[name]
self.deltas[name] = np.asarray(sample) - self.r_values[name]
self._value += self.shape[name] * self.r_values[name]
self._value /= self.N
self._dvalue = 0.0
self.ddvalue = 0.0
self._dvalue: float = 0.0
self.ddvalue: float = 0.0
self.reweighted = False
self.tag = None
@property
def value(self):
def value(self) -> Union[float, int64, float64, int]:
return self._value
@property
def dvalue(self):
def dvalue(self) -> Union[float, float64]:
return self._dvalue
@property
def e_names(self):
def e_names(self) -> list[str]:
return sorted(set([o.split('|')[0] for o in self.names]))
@property
def cov_names(self):
def cov_names(self) -> list[Union[Any, str]]:
return sorted(set([o for o in self.covobs.keys()]))
@property
def mc_names(self):
def mc_names(self) -> list[Union[Any, str]]:
return sorted(set([o.split('|')[0] for o in self.names if o not in self.cov_names]))
@property
def e_content(self):
def e_content(self) -> dict[str, list[str]]:
res = {}
for e, e_name in enumerate(self.e_names):
res[e_name] = sorted(filter(lambda x: x.startswith(e_name + '|'), self.names))
@ -172,7 +188,7 @@ class Obs:
return res
@property
def covobs(self):
def covobs(self) -> dict[str, Covobs]:
return self._covobs
def gamma_method(self, **kwargs):
@ -219,7 +235,7 @@ class Obs:
else:
fft = True
def _parse_kwarg(kwarg_name):
def _parse_kwarg(kwarg_name: str):
if kwarg_name in kwargs:
tmp = kwargs.get(kwarg_name)
if isinstance(tmp, (int, float)):
@ -343,7 +359,7 @@ class Obs:
gm = gamma_method
def _calc_gamma(self, deltas, idx, shape, w_max, fft, gapsize):
def _calc_gamma(self, deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, w_max: Union[int64, int], fft: bool, gapsize: Union[int64, int]) -> ndarray:
"""Calculate Gamma_{AA} from the deltas, which are defined on idx.
idx is assumed to be a contiguous range (possibly with a stepsize != 1)
@ -379,7 +395,7 @@ class Obs:
return gamma
def details(self, ens_content=True):
def details(self, ens_content: bool=True):
"""Output detailed properties of the Obs.
Parameters
@ -427,19 +443,21 @@ class Obs:
my_string = ' ' + "\u00B7 Ensemble '" + key + "' "
if len(value) == 1:
my_string += f': {self.shape[value[0]]} configurations'
if isinstance(self.idl[value[0]], range):
my_string += f' (from {self.idl[value[0]].start} to {self.idl[value[0]][-1]}' + int(self.idl[value[0]].step != 1) * f' in steps of {self.idl[value[0]].step}' + ')'
my_idl = self.idl[value[0]]
if isinstance(my_idl, range):
my_string += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')'
else:
my_string += f' (irregular range from {self.idl[value[0]][0]} to {self.idl[value[0]][-1]})'
my_string += f' (irregular range from {my_idl[0]} to {my_idl[-1]})'
else:
sublist = []
for v in value:
my_substring = ' ' + "\u00B7 Replicum '" + v[len(key) + 1:] + "' "
my_substring += f': {self.shape[v]} configurations'
if isinstance(self.idl[v], range):
my_substring += f' (from {self.idl[v].start} to {self.idl[v][-1]}' + int(self.idl[v].step != 1) * f' in steps of {self.idl[v].step}' + ')'
my_idl = self.idl[v]
if isinstance(my_idl, range):
my_substring += f' (from {my_idl.start} to {my_idl[-1]}' + int(my_idl.step != 1) * f' in steps of {my_idl.step}' + ')'
else:
my_substring += f' (irregular range from {self.idl[v][0]} to {self.idl[v][-1]})'
my_substring += f' (irregular range from {my_idl[0]} to {my_idl[-1]})'
sublist.append(my_substring)
my_string += '\n' + '\n'.join(sublist)
@ -448,7 +466,7 @@ class Obs:
my_string_list.append(my_string)
print('\n'.join(my_string_list))
def reweight(self, weight):
def reweight(self, weight: "Obs") -> "Obs":
"""Reweight the obs with given rewighting factors.
Parameters
@ -463,7 +481,7 @@ class Obs:
"""
return reweight(weight, [self])[0]
def is_zero_within_error(self, sigma=1):
def is_zero_within_error(self, sigma: Union[float, int]=1) -> bool:
"""Checks whether the observable is zero within 'sigma' standard errors.
Parameters
@ -475,7 +493,7 @@ class Obs:
"""
return self.is_zero() or np.abs(self.value) <= sigma * self._dvalue
def is_zero(self, atol=1e-10):
def is_zero(self, atol: float=1e-10) -> bool:
"""Checks whether the observable is zero within a given tolerance.
Parameters
@ -485,7 +503,7 @@ class Obs:
"""
return np.isclose(0.0, self.value, 1e-14, atol) and all(np.allclose(0.0, delta, 1e-14, atol) for delta in self.deltas.values()) and all(np.allclose(0.0, delta.errsq(), 1e-14, atol) for delta in self.covobs.values())
def plot_tauint(self, save=None):
def plot_tauint(self, save: Optional[str]=None):
"""Plot integrated autocorrelation time for each ensemble.
Parameters
@ -525,7 +543,7 @@ class Obs:
if save:
fig.savefig(save + "_" + str(e))
def plot_rho(self, save=None):
def plot_rho(self, save: Optional[str]=None):
"""Plot normalized autocorrelation function time for each ensemble.
Parameters
@ -578,7 +596,7 @@ class Obs:
plt.title('Replica distribution' + e_name + ' (mean=0, var=1)')
plt.draw()
def plot_history(self, expand=True):
def plot_history(self, expand: bool=True):
"""Plot derived Monte Carlo history for each ensemble
Parameters
@ -610,7 +628,7 @@ class Obs:
plt.title(e_name + f'\nskew: {skew(y_test):.3f} (p={skewtest(y_test).pvalue:.3f}), kurtosis: {kurtosis(y_test):.3f} (p={kurtosistest(y_test).pvalue:.3f})')
plt.draw()
def plot_piechart(self, save=None):
def plot_piechart(self, save: Optional[str]=None) -> dict[str, float64]:
"""Plot piechart which shows the fractional contribution of each
ensemble to the error and returns a dictionary containing the fractions.
@ -624,7 +642,7 @@ class Obs:
if np.isclose(0.0, self._dvalue, atol=1e-15):
raise ValueError('Error is 0.0')
labels = self.e_names
sizes = [self.e_dvalue[name] ** 2 for name in labels] / self._dvalue ** 2
sizes = np.array([self.e_dvalue[name] ** 2 for name in labels]) / self._dvalue ** 2
fig1, ax1 = plt.subplots()
ax1.pie(sizes, labels=labels, startangle=90, normalize=True)
ax1.axis('equal')
@ -634,7 +652,7 @@ class Obs:
return dict(zip(labels, sizes))
def dump(self, filename, datatype="json.gz", description="", **kwargs):
def dump(self, filename: str, datatype: str="json.gz", description: str="", **kwargs):
"""Dump the Obs to a file 'name' of chosen format.
Parameters
@ -649,8 +667,11 @@ class Obs:
path : str
specifies a custom path for the file (default '.')
"""
if 'path' in kwargs:
file_name = kwargs.get('path') + '/' + filename
path = kwargs.get('path')
if path is not None:
if not isinstance(path, str):
raise TypeError('path has to be a string.')
file_name = path + '/' + filename
else:
file_name = filename
@ -663,7 +684,7 @@ class Obs:
else:
raise TypeError("Unknown datatype " + str(datatype))
def export_jackknife(self):
def export_jackknife(self) -> ndarray:
"""Export jackknife samples from the Obs
Returns
@ -689,7 +710,7 @@ class Obs:
tmp_jacks[1:] = (n * mean - full_data) / (n - 1)
return tmp_jacks
def export_bootstrap(self, samples=500, random_numbers=None, save_rng=None):
def export_bootstrap(self, samples: int=500, random_numbers: Optional[ndarray]=None, save_rng: Optional[str]=None) -> ndarray:
"""Export bootstrap samples from the Obs
Parameters
@ -732,16 +753,16 @@ class Obs:
ret[1:] = proj @ (self.deltas[name] + self.r_values[name])
return ret
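A sketch of the two export routines above, starting from any single-ensemble Obs:

import pyerrors as pe

obs = pe.pseudo_Obs(1.0, 0.1, "ensA")
jacks = obs.export_jackknife()             # jacks[0] is the mean, jacks[1:] the jackknife samples
boots = obs.export_bootstrap(samples=500)  # mean first, then the bootstrap samples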
def __float__(self):
def __float__(self) -> float:
return float(self.value)
def __repr__(self):
def __repr__(self) -> str:
return 'Obs[' + str(self) + ']'
def __str__(self):
def __str__(self) -> str:
return _format_uncertainty(self.value, self._dvalue)
def __format__(self, format_type):
def __format__(self, format_type: str) -> str:
if format_type == "":
significance = 2
else:
@ -754,35 +775,36 @@ class Obs:
my_str = char + my_str
return my_str
def __hash__(self):
def __hash__(self) -> int:
hash_tuple = (np.array([self.value]).astype(np.float32).data.tobytes(),)
hash_tuple += tuple([o.astype(np.float32).data.tobytes() for o in self.deltas.values()])
hash_tuple += tuple([np.array([o.errsq()]).astype(np.float32).data.tobytes() for o in self.covobs.values()])
hash_tuple += tuple([o.encode() for o in self.names])
m = hashlib.md5()
[m.update(o) for o in hash_tuple]
for o in hash_tuple:
m.update(o)
return int(m.hexdigest(), 16) & 0xFFFFFFFF
# Overload comparisons
def __lt__(self, other):
def __lt__(self, other: Union[Obs, float, float64, int]) -> bool:
return self.value < other
def __le__(self, other):
def __le__(self, other: Union[Obs, float, float64, int]) -> bool:
return self.value <= other
def __gt__(self, other):
def __gt__(self, other: Union[Obs, float, float64, int]) -> bool:
return self.value > other
def __ge__(self, other):
def __ge__(self, other: Union[Obs, float, float64, int]) -> bool:
return self.value >= other
def __eq__(self, other):
def __eq__(self, other: Optional[Union[Obs, float, float64, int]]) -> bool:
if other is None:
return False
return (self - other).is_zero()
# Overload math operations
def __add__(self, y):
def __add__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] + x[1], [self, y], man_grad=[1, 1])
else:
@ -795,10 +817,10 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] + y, [self], man_grad=[1])
def __radd__(self, y):
def __radd__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return self + y
def __mul__(self, y):
def __mul__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] * x[1], [self, y], man_grad=[y.value, self.value])
else:
@ -811,10 +833,10 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] * y, [self], man_grad=[y])
def __rmul__(self, y):
def __rmul__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return self * y
def __sub__(self, y):
def __sub__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] - x[1], [self, y], man_grad=[1, -1])
else:
@ -825,16 +847,16 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] - y, [self], man_grad=[1])
def __rsub__(self, y):
def __rsub__(self, y: Any) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return -1 * (self - y)
def __pos__(self):
def __pos__(self) -> Obs:
return self
def __neg__(self):
def __neg__(self) -> Union[Obs, NotImplementedType, CObs, ndarray]:
return -1 * self
def __truediv__(self, y):
def __truediv__(self, y: Any) -> Union[Obs, NotImplementedType, ndarray]:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] / x[1], [self, y], man_grad=[1 / y.value, - self.value / y.value ** 2])
else:
@ -845,7 +867,7 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: x[0] / y, [self], man_grad=[1 / y])
def __rtruediv__(self, y):
def __rtruediv__(self, y: Union[float, int]) -> Obs:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] / x[1], [y, self], man_grad=[1 / self.value, - y.value / self.value ** 2])
else:
@ -856,62 +878,62 @@ class Obs:
else:
return derived_observable(lambda x, **kwargs: y / x[0], [self], man_grad=[-y / self.value ** 2])
def __pow__(self, y):
def __pow__(self, y: Union[Obs, float, int]) -> Obs:
if isinstance(y, Obs):
return derived_observable(lambda x, **kwargs: x[0] ** x[1], [self, y], man_grad=[y.value * self.value ** (y.value - 1), self.value ** y.value * np.log(self.value)])
else:
return derived_observable(lambda x, **kwargs: x[0] ** y, [self], man_grad=[y * self.value ** (y - 1)])
def __rpow__(self, y):
def __rpow__(self, y: Union[float, int]) -> Obs:
return derived_observable(lambda x, **kwargs: y ** x[0], [self], man_grad=[y ** self.value * np.log(y)])
def __abs__(self):
def __abs__(self) -> Obs:
return derived_observable(lambda x: anp.abs(x[0]), [self])
# Overload numpy functions
def sqrt(self):
def sqrt(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.sqrt(x[0]), [self], man_grad=[1 / 2 / np.sqrt(self.value)])
def log(self):
def log(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.log(x[0]), [self], man_grad=[1 / self.value])
def exp(self):
def exp(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.exp(x[0]), [self], man_grad=[np.exp(self.value)])
def sin(self):
def sin(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.sin(x[0]), [self], man_grad=[np.cos(self.value)])
def cos(self):
def cos(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.cos(x[0]), [self], man_grad=[-np.sin(self.value)])
def tan(self):
def tan(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.tan(x[0]), [self], man_grad=[1 / np.cos(self.value) ** 2])
def arcsin(self):
def arcsin(self) -> Obs:
return derived_observable(lambda x: anp.arcsin(x[0]), [self])
def arccos(self):
def arccos(self) -> Obs:
return derived_observable(lambda x: anp.arccos(x[0]), [self])
def arctan(self):
def arctan(self) -> Obs:
return derived_observable(lambda x: anp.arctan(x[0]), [self])
def sinh(self):
def sinh(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.sinh(x[0]), [self], man_grad=[np.cosh(self.value)])
def cosh(self):
def cosh(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.cosh(x[0]), [self], man_grad=[np.sinh(self.value)])
def tanh(self):
def tanh(self) -> Obs:
return derived_observable(lambda x, **kwargs: np.tanh(x[0]), [self], man_grad=[1 / np.cosh(self.value) ** 2])
def arcsinh(self):
def arcsinh(self) -> Obs:
return derived_observable(lambda x: anp.arcsinh(x[0]), [self])
def arccosh(self):
def arccosh(self) -> Obs:
return derived_observable(lambda x: anp.arccosh(x[0]), [self])
def arctanh(self):
def arctanh(self) -> Obs:
return derived_observable(lambda x: anp.arctanh(x[0]), [self])
@ -919,17 +941,17 @@ class CObs:
"""Class for a complex valued observable."""
__slots__ = ['_real', '_imag', 'tag']
def __init__(self, real, imag=0.0):
def __init__(self, real: Obs, imag: Union[Obs, float, int]=0.0):
self._real = real
self._imag = imag
self.tag = None
@property
def real(self):
def real(self) -> Obs:
return self._real
@property
def imag(self):
def imag(self) -> Union[Obs, float, int]:
return self._imag
def gamma_method(self, **kwargs):
@ -939,14 +961,16 @@ class CObs:
if isinstance(self.imag, Obs):
self.imag.gamma_method(**kwargs)
def is_zero(self):
gm = gamma_method
def is_zero(self) -> bool:
"""Checks whether both real and imaginary part are zero within machine precision."""
return self.real == 0.0 and self.imag == 0.0
def conjugate(self):
def conjugate(self) -> CObs:
return CObs(self.real, -self.imag)
def __add__(self, other):
def __add__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return other + self
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@ -955,10 +979,10 @@ class CObs:
else:
return CObs(self.real + other, self.imag)
def __radd__(self, y):
def __radd__(self, y: Union[complex, float, Obs, int]) -> "CObs":
return self + y
def __sub__(self, other):
def __sub__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return -1 * (other - self)
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@@ -966,14 +990,14 @@ class CObs:
else:
return CObs(self.real - other, self.imag)
def __rsub__(self, other):
def __rsub__(self, other: Union[complex, float, Obs, int]) -> "CObs":
return -1 * (self - other)
def __mul__(self, other):
def __mul__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return other * self
elif hasattr(other, 'real') and hasattr(other, 'imag'):
if all(isinstance(i, Obs) for i in [self.real, self.imag, other.real, other.imag]):
if isinstance(self.real, Obs) and isinstance(self.imag, Obs) and isinstance(other.real, Obs) and isinstance(other.imag, Obs):
return CObs(derived_observable(lambda x, **kwargs: x[0] * x[1] - x[2] * x[3],
[self.real, other.real, self.imag, other.imag],
man_grad=[other.real.value, self.real.value, -other.imag.value, -self.imag.value]),
@@ -988,10 +1012,10 @@ class CObs:
else:
return CObs(self.real * other, self.imag * other)
def __rmul__(self, other):
def __rmul__(self, other: Union[complex, Obs, CObs, float, int]) -> "CObs":
return self * other
def __truediv__(self, other):
def __truediv__(self, other: Any) -> Union[CObs, ndarray]:
if isinstance(other, np.ndarray):
return 1 / (other / self)
elif hasattr(other, 'real') and hasattr(other, 'imag'):
@@ -1000,32 +1024,35 @@ class CObs:
else:
return CObs(self.real / other, self.imag / other)
def __rtruediv__(self, other):
def __rtruediv__(self, other: Union[complex, float, Obs, CObs, int]) -> CObs:
r = self.real ** 2 + self.imag ** 2
if hasattr(other, 'real') and hasattr(other, 'imag'):
return CObs((self.real * other.real + self.imag * other.imag) / r, (self.real * other.imag - self.imag * other.real) / r)
else:
return CObs(self.real * other / r, -self.imag * other / r)
def __abs__(self):
def __abs__(self) -> Obs:
return np.sqrt(self.real**2 + self.imag**2)
def __pos__(self):
def __pos__(self) -> "CObs":
return self
def __neg__(self):
def __neg__(self) -> "CObs":
return -1 * self
def __eq__(self, other):
return self.real == other.real and self.imag == other.imag
def __eq__(self, other: object) -> bool:
if hasattr(other, 'real') and hasattr(other, 'imag'):
return self.real == other.real and self.imag == other.imag
else:
return False
def __str__(self):
def __str__(self) -> str:
return '(' + str(self.real) + int(self.imag >= 0.0) * '+' + str(self.imag) + 'j)'
def __repr__(self):
def __repr__(self) -> str:
return 'CObs[' + str(self) + ']'
def __format__(self, format_type):
def __format__(self, format_type: str) -> str:
if format_type == "":
significance = 2
format_type = "2"
@@ -1034,7 +1061,7 @@ class CObs:
return f"({self.real:{format_type}}{self.imag:+{significance}}j)"
def gamma_method(x, **kwargs):
def gamma_method(x: Union[Corr, Obs, CObs, ndarray, list[Union[Obs, CObs]]], **kwargs) -> ndarray:
"""Vectorized version of the gamma_method applicable to lists or arrays of Obs.
See docstring of pe.Obs.gamma_method for details.
@@ -1045,7 +1072,7 @@ def gamma_method(x, **kwargs):
gm = gamma_method
def _format_uncertainty(value, dvalue, significance=2):
def _format_uncertainty(value: Union[float, float64, int], dvalue: Union[float, float64, int], significance: int=2) -> str:
"""Creates a string of a value and its error in paranthesis notation, e.g., 13.02(45)"""
if dvalue == 0.0 or (not np.isfinite(dvalue)):
return str(value)
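A sketch of the vectorized wrapper in action (illustrative values; _format_uncertainty itself is internal):

import pyerrors as pe

obs_list = [pe.pseudo_Obs(x, 0.05, 'ens') for x in (0.7, 0.8)]
pe.gamma_method(obs_list)   # calls gamma_method on every element
# _format_uncertainty renders e.g. value 13.0235 with error 0.4534 as
# '13.02(45)', the parenthesis notation used by Obs.__str__.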
@@ -1062,7 +1089,7 @@ def _format_uncertainty(value, dvalue, significance=2):
return f"{value:.{max(0, int(significance - fexp - 1))}f}({dvalue:2.{max(0, int(significance - fexp - 1))}f})"
def _expand_deltas(deltas, idx, shape, gapsize):
def _expand_deltas(deltas: ndarray, idx: Union[range, list[int], list[int64]], shape: int, gapsize: Union[int64, int]) -> ndarray:
"""Expand deltas defined on idx to a regular range with spacing gapsize between two
configurations and where holes are filled by 0.
If idx is of type range, the deltas are not changed if the idx.step == gapsize.
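A toy call illustrating the expansion (internal helper; expected behaviour inferred from the docstring above):

import numpy as np
from pyerrors.obs import _expand_deltas  # internal, shown for illustration

out = _expand_deltas(np.array([1., 2., 3.]), [1, 3, 5], 3, 1)
# holes between configurations are filled with zeros:
# expected out: array([1., 0., 2., 0., 3.])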
@@ -1088,7 +1115,7 @@ def _expand_deltas(deltas, idx, shape, gapsize):
return ret
def _merge_idx(idl):
def _merge_idx(idl: list[Union[range, list[int]]]) -> Union[range, list[int]]:
"""Returns the union of all lists in idl as range or sorted list
Parameters
@@ -1111,7 +1138,7 @@ def _merge_idx(idl):
return idunion
def _intersection_idx(idl):
def _intersection_idx(idl: list[Union[range, list[int]]]) -> Union[range, list[int]]:
"""Returns the intersection of all lists in idl as range or sorted list
Parameters
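Toy calls for the two index helpers (internal API; expected results inferred from the docstrings):

from pyerrors.obs import _merge_idx, _intersection_idx

_merge_idx([range(1, 21, 2), range(2, 21, 2)])      # expected: range(1, 21)
_intersection_idx([range(1, 21), range(2, 21, 2)])  # expected: range(2, 21, 2)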
@@ -1137,7 +1164,7 @@ def _intersection_idx(idl):
return idinter
def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor):
def _expand_deltas_for_merge(deltas: ndarray, idx: Union[range, list[int]], shape: int, new_idx: Union[range, list[int]], scalefactor: Union[float, int]) -> ndarray:
"""Expand deltas defined on idx to the list of configs that is defined by new_idx.
New, empty entries are filled by 0. If idx and new_idx are of type range, the greatest
common divisor of the step sizes is used as new step size.
@@ -1169,7 +1196,7 @@ def _expand_deltas_for_merge(deltas, idx, shape, new_idx, scalefactor):
return np.array([ret[new_idx[i] - new_idx[0]] for i in range(len(new_idx))]) * len(new_idx) / len(idx) * scalefactor
def derived_observable(func, data, array_mode=False, **kwargs):
def derived_observable(func: Callable, data: Union[list[Obs], ndarray], array_mode: bool=False, **kwargs) -> Union[Obs, ndarray]:
"""Construct a derived Obs according to func(data, **kwargs) using automatic differentiation.
Parameters
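A minimal sketch of the documented call pattern (autograd's numpy supplies the derivatives; values are toy numbers):

import autograd.numpy as anp
import pyerrors as pe

o1 = pe.pseudo_Obs(0.8, 0.01, 'ens')
o2 = pe.pseudo_Obs(1.2, 0.02, 'ens')
log_ratio = pe.derived_observable(lambda x, **kwargs: anp.log(x[0] / x[1]), [o1, o2])
log_ratio.gamma_method()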
@@ -1207,7 +1234,7 @@ def derived_observable(func, data, array_mode=False, **kwargs):
if isinstance(raveled_data[i], (int, float)):
raveled_data[i] = cov_Obs(raveled_data[i], 0.0, "###dummy_covobs###")
allcov = {}
allcov: dict[str, ndarray] = {}
for o in raveled_data:
for name in o.cov_names:
if name in allcov:
@@ -1297,8 +1324,8 @@ def derived_observable(func, data, array_mode=False, **kwargs):
self.grad = np.zeros((N, 1))
new_covobs_lengths = dict(set([y for x in [[(n, o.covobs[n].N) for n in o.cov_names] for o in raveled_data] for y in x]))
d_extracted = {}
g_extracted = {}
d_extracted: dict[str, list] = {}
g_extracted: dict[str, list] = {}
for name in new_sample_names:
d_extracted[name] = []
ens_length = len(new_idl_d[name])
@@ -1359,7 +1386,7 @@ def derived_observable(func, data, array_mode=False, **kwargs):
return final_result
def _reduce_deltas(deltas, idx_old, idx_new):
def _reduce_deltas(deltas: Union[list[float], ndarray], idx_old: Union[range, list[int]], idx_new: Union[range, list[int], ndarray]) -> Union[list[float], ndarray]:
"""Extract deltas defined on idx_old on all configs of idx_new.
Assumes, that idx_old and idx_new are correctly defined idl, i.e., they
@@ -1388,7 +1415,7 @@ def _reduce_deltas(deltas, idx_old, idx_new):
return np.array(deltas)[indices]
def reweight(weight, obs, **kwargs):
def reweight(weight: Obs, obs: Union[ndarray, list[Obs]], **kwargs) -> list[Obs]:
"""Reweight a list of observables.
Parameters
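A usage sketch with toy observables defined on the same configurations:

import pyerrors as pe

w = pe.pseudo_Obs(1.0, 0.01, 'ens')    # toy reweighting factor
o = pe.pseudo_Obs(0.5, 0.02, 'ens')
o_rw = pe.reweight(w, [o])[0]          # returns a list, one Obs per input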
@@ -1414,8 +1441,8 @@ def reweight(weight, obs, **kwargs):
for name in obs[i].names:
if not set(obs[i].idl[name]).issubset(weight.idl[name]):
raise ValueError('obs[%d] has to be defined on a subset of the configs in weight.idl[%s]!' % (i, name))
new_samples = []
w_deltas = {}
new_samples: list = []
w_deltas: dict[str, ndarray] = {}
for name in sorted(obs[i].names):
w_deltas[name] = _reduce_deltas(weight.deltas[name], weight.idl[name], obs[i].idl[name])
new_samples.append((w_deltas[name] + weight.r_values[name]) * (obs[i].deltas[name] + obs[i].r_values[name]))
@@ -1432,7 +1459,7 @@ def reweight(weight, obs, **kwargs):
return result
def correlate(obs_a, obs_b):
def correlate(obs_a: Obs, obs_b: Obs) -> Obs:
"""Correlate two observables.
Parameters
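A sketch of the call; both inputs must share ensemble name and idl layout:

import pyerrors as pe

a = pe.pseudo_Obs(1.0, 0.1, 'ens')
b = pe.pseudo_Obs(2.0, 0.2, 'ens')
ab = pe.correlate(a, b)   # configuration-wise correlated product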
@@ -1478,7 +1505,7 @@ def correlate(obs_a, obs_b):
return o
def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs):
def covariance(obs: Union[ndarray, list[Obs]], visualize: bool=False, correlation: bool=False, smooth: Optional[int]=None, **kwargs) -> ndarray:
r'''Calculates the error covariance matrix of a set of observables.
WARNING: This function should be used with care, especially for observables with support on multiple
@@ -1548,7 +1575,7 @@ def covariance(obs, visualize=False, correlation=False, smooth=None, **kwargs):
return cov
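A usage sketch (toy data; errors must be computed before the call):

import pyerrors as pe

o1 = pe.pseudo_Obs(1.0, 0.1, 'ens')
o2 = o1 ** 2                     # strongly correlated with o1 by construction
o1.gamma_method()
o2.gamma_method()
cov = pe.covariance([o1, o2])                    # 2x2 covariance estimate
cor = pe.covariance([o1, o2], correlation=True)  # correlation matrix instead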
def invert_corr_cov_cholesky(corr, inverrdiag):
def invert_corr_cov_cholesky(corr: ndarray, inverrdiag: ndarray) -> ndarray:
"""Constructs a lower triangular matrix `chol` via the Cholesky decomposition of the correlation matrix `corr`
and then returns the inverse covariance matrix `chol_inv` as a lower triangular matrix by solving `chol * x = inverrdiag`.
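A sketch with a toy correlation matrix, assuming the function is re-exported at package level like its neighbours:

import numpy as np
import pyerrors as pe

corr = np.array([[1.0, 0.3], [0.3, 1.0]])       # toy correlation matrix
inverrdiag = np.diag(1 / np.array([0.1, 0.2]))  # inverse errors on the diagonal
chol_inv = pe.invert_corr_cov_cholesky(corr, inverrdiag)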
@@ -1571,7 +1598,7 @@ def invert_corr_cov_cholesky(corr, inverrdiag):
return chol_inv
def sort_corr(corr, kl, yd):
def sort_corr(corr: ndarray, kl: list[str], yd: dict[str, list[Obs]]) -> ndarray:
""" Reorders a correlation matrix to match the alphabetical order of its underlying y data.
The ordering of the input correlation matrix `corr` is given by the list of keys `kl`.
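A hedged sketch of the reordering (toy observables; keys deliberately given out of alphabetical order):

import pyerrors as pe

obs_a = pe.pseudo_Obs(1.0, 0.1, 'ens')
obs_b = pe.pseudo_Obs(2.0, 0.2, 'ens')
obs_a.gamma_method()
obs_b.gamma_method()
yd = {'b': [obs_b], 'a': [obs_a]}
corr = pe.covariance([obs_b, obs_a], correlation=True)  # ordered as kl below
corr_sorted = pe.sort_corr(corr, ['b', 'a'], yd)        # 'a' block moves first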
@@ -1634,7 +1661,7 @@ def sort_corr(corr, kl, yd):
return corr_sorted
def _smooth_eigenvalues(corr, E):
def _smooth_eigenvalues(corr: ndarray, E: int) -> ndarray:
"""Eigenvalue smoothing as described in hep-lat/9412087
corr : np.ndarray
@@ -1651,7 +1678,7 @@ def _smooth_eigenvalues(corr, E):
return vec @ np.diag(vals) @ vec.T
def _covariance_element(obs1, obs2):
def _covariance_element(obs1: Obs, obs2: Obs) -> Union[float, float64]:
"""Estimates the covariance of two Obs objects, neglecting autocorrelations."""
def calc_gamma(deltas1, deltas2, idx1, idx2, new_idx):
@@ -1711,7 +1738,7 @@ def _covariance_element(obs1, obs2):
return dvalue
def import_jackknife(jacks, name, idl=None):
def import_jackknife(jacks: ndarray, name: str, idl: Optional[list[Union[list[int], range]]]=None) -> Obs:
"""Imports jackknife samples and returns an Obs
Parameters
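A round-trip sketch via Obs.export_jackknife (toy observable):

import pyerrors as pe

o = pe.pseudo_Obs(1.0, 0.1, 'ens')
jacks = o.export_jackknife()     # mean first, then the N jackknife samples
rebuilt = pe.import_jackknife(jacks, 'ens')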
@@ -1731,7 +1758,7 @@ def import_jackknife(jacks, name, idl=None):
return new_obs
def import_bootstrap(boots, name, random_numbers):
def import_bootstrap(boots: ndarray, name: str, random_numbers: ndarray) -> Obs:
"""Imports bootstrap samples and returns an Obs
Parameters
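A round-trip sketch; the same random numbers must be used for export and import (shapes assumed from the docstring):

import numpy as np
import pyerrors as pe

o = pe.pseudo_Obs(1.0, 0.1, 'ens')
n_cfg = len(o.deltas['ens'])
rng = np.random.default_rng(42)
random_numbers = rng.integers(0, n_cfg, size=(500, n_cfg))
boots = o.export_bootstrap(500, random_numbers=random_numbers)
rebuilt = pe.import_bootstrap(boots, 'ens', random_numbers)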
@@ -1761,7 +1788,7 @@ def import_bootstrap(boots, name, random_numbers):
return ret
def merge_obs(list_of_obs):
def merge_obs(list_of_obs: list[Obs]) -> Obs:
"""Combine all observables in list_of_obs into one new observable.
This allows to merge Obs that have been computed on multiple replica
of the same ensemble.
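A sketch merging two toy replica of the same ensemble:

import numpy as np
import pyerrors as pe

rng = np.random.default_rng(1)
r1 = pe.Obs([rng.normal(1.0, 0.1, 500)], ['ens|r1'])   # replicum 1
r2 = pe.Obs([rng.normal(1.0, 0.1, 500)], ['ens|r2'])   # replicum 2
merged = pe.merge_obs([r1, r2])   # one Obs carrying both replica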
@@ -1783,11 +1810,11 @@ def merge_obs(list_of_obs):
if any([len(o.cov_names) for o in list_of_obs]):
raise ValueError('Not possible to merge data that contains covobs!')
new_dict = {}
idl_dict = {}
idl_dict: dict[str, Union[range, list[int]]] = {}
for o in list_of_obs:
new_dict.update({key: o.deltas.get(key, 0) + o.r_values.get(key, 0)
for key in set(o.deltas) | set(o.r_values)})
idl_dict.update({key: o.idl.get(key, 0) for key in set(o.deltas)})
idl_dict.update({key: o.idl.get(key) for key in set(o.deltas)})
names = sorted(new_dict.keys())
o = Obs([new_dict[name] for name in names], names, idl=[idl_dict[name] for name in names])
@@ -1795,7 +1822,7 @@ def merge_obs(list_of_obs):
return o
def cov_Obs(means, cov, name, grad=None):
def cov_Obs(means: Union[int, list[float], float, list[int]], cov: Any, name: str, grad: Optional[Union[list, ndarray]]=None) -> Union[Obs, list[Obs]]:
"""Create an Obs based on mean(s) and a covariance matrix
Parameters
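Two sketched call patterns (toy numbers):

import pyerrors as pe

# correlated pair from a 2x2 covariance matrix
za, zb = pe.cov_Obs([1.87, 0.75], [[0.04, 0.01], [0.01, 0.02]], 'external_input')
# scalar case: cov is the variance, i.e. the squared error
z = pe.cov_Obs(0.7, 0.005 ** 2, 'renormalization')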
@@ -1838,13 +1865,14 @@ def cov_Obs(means, cov, name, grad=None):
return ol
def _determine_gap(o, e_content, e_name):
def _determine_gap(o: Obs, e_content: dict[str, list[str]], e_name: str) -> Union[int64, int]:
gaps = []
for r_name in e_content[e_name]:
if isinstance(o.idl[r_name], range):
gaps.append(o.idl[r_name].step)
my_idl = o.idl[r_name]
if isinstance(my_idl, range):
gaps.append(my_idl.step)
else:
gaps.append(np.min(np.diff(o.idl[r_name])))
gaps.append(np.min(np.diff(my_idl)))
gap = min(gaps)
if not np.all([gi % gap == 0 for gi in gaps]):
@@ -1853,7 +1881,7 @@ def _determine_gap(o, e_content, e_name):
return gap
def _check_lists_equal(idl):
def _check_lists_equal(idl: Sequence[Union[list[int], list[Union[int64, int]], range, ndarray]]) -> bool:
'''
Use groupby to efficiently check whether all elements of idl are identical.
Returns True if all elements are equal, otherwise False.
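The groupby idiom referred to here is the standard itertools recipe:

from itertools import groupby

def all_equal(iterable):
    # groupby collapses runs of equal elements; exactly one group <=> all equal
    g = groupby(iterable)
    return next(g, True) and not next(g, False)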

View file

@@ -1,10 +1,12 @@
from __future__ import annotations
import numpy as np
import scipy.optimize
from autograd import jacobian
from .obs import derived_observable
from .obs import Obs, derived_observable
from typing import Callable, Union
def find_root(d, func, guess=1.0, **kwargs):
def find_root(d: Union[Obs, list[Obs]], func: Callable, guess: float=1.0, **kwargs) -> Obs:
r'''Finds the root of the function func(x, d) where d is an `Obs`.
Parameters
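A usage sketch solving x ** 2 - d = 0 for a toy Obs d:

import pyerrors as pe

d = pe.pseudo_Obs(2.0, 0.1, 'ens')
root = pe.find_root(d, lambda x, d: x ** 2 - d, guess=1.0)
root.gamma_method()   # error of d propagated to the root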

View file

@@ -1,3 +1,4 @@
from __future__ import annotations
import scipy
import numpy as np
from autograd.extend import primitive, defvjp

View file

@@ -4,3 +4,7 @@ build-backend = "setuptools.build_meta"
[tool.ruff.lint]
ignore = ["F403"]
[tool.mypy]
warn_unused_configs = true
ignore_missing_imports = true

View file

@@ -181,6 +181,8 @@ def test_fit_correlator():
with pytest.raises(ValueError):
my_corr.fit(f, [0, 2, 3])
fit_res = my_corr.fit(f, fitrange=[0, 1])
def test_plateau():
my_corr = pe.correlators.Corr([pe.pseudo_Obs(1.01324, 0.05, 't'), pe.pseudo_Obs(1.042345, 0.008, 't')])
@@ -226,7 +228,7 @@ def test_utility():
corr.print()
corr.print([2, 4])
corr.show()
corr.show(comp=corr)
corr.show(comp=corr, x_range=[2, 5.], y_range=[2, 3.], hide_sigma=0.5, references=[.1, .2, .6], title='TEST')
corr.dump('test_dump', datatype="pickle", path='.')
corr.dump('test_dump', datatype="pickle")

View file

@@ -410,6 +410,7 @@ def test_cobs():
obs2 = pe.pseudo_Obs(-0.2, 0.03, 't')
my_cobs = pe.CObs(obs1, obs2)
my_cobs.gm()
assert +my_cobs == my_cobs
assert -my_cobs == 0 - my_cobs
my_cobs == my_cobs