[Fix] Simplify type hints

This commit is contained in:
Fabian Joswig 2025-01-03 22:43:19 +01:00
parent d45b43e6de
commit 1c6053ef61
12 changed files with 84 additions and 85 deletions

View file

@ -10,8 +10,8 @@ from .misc import dump_object, _assert_equal_properties
from .fits import least_squares, Fit_result
from .roots import find_root
from . import linalg
from numpy import float64, int64, ndarray, ufunc
from typing import Any, Callable, List, Optional, Tuple, Union
from numpy import ndarray, ufunc
from typing import Any, Callable, Optional, Union
class Corr:
@ -45,7 +45,7 @@ class Corr:
__slots__ = ["content", "N", "T", "tag", "prange"]
def __init__(self, data_input: Any, padding: List[int]=[0, 0], prange: Optional[List[int]]=None):
def __init__(self, data_input: Any, padding: list[int]=[0, 0], prange: Optional[list[int]]=None):
""" Initialize a Corr object.
Parameters
@ -122,7 +122,7 @@ class Corr:
self.T = len(self.content)
self.prange = prange
def __getitem__(self, idx: Union[slice, int]) -> Union[CObs, Obs, ndarray, List[ndarray]]:
def __getitem__(self, idx: Union[slice, int]) -> Union[CObs, Obs, ndarray, list[ndarray]]:
"""Return the content of timeslice idx"""
idx_content = self.content[idx]
if idx_content is None:
@ -155,7 +155,7 @@ class Corr:
gm = gamma_method
def projected(self, vector_l: Optional[Union[ndarray, List[Optional[ndarray]]]]=None, vector_r: Optional[Union[ndarray, List[Optional[ndarray]]]]=None, normalize: bool=False) -> "Corr":
def projected(self, vector_l: Optional[Union[ndarray, list[Optional[ndarray]]]]=None, vector_r: Optional[Union[ndarray, list[Optional[ndarray]]]]=None, normalize: bool=False) -> "Corr":
"""We need to project the Correlator with a Vector to get a single value at each timeslice.
The method can use one or two vectors.
@ -209,7 +209,7 @@ class Corr:
newcontent = [None if (item is None) else item[i, j] for item in self.content]
return Corr(newcontent)
def plottable(self) -> Union[Tuple[List[int], List[float64], List[float64]], Tuple[List[int], List[float], List[float64]]]:
def plottable(self) -> tuple[list[int], list[float]]:
"""Outputs the correlator in a plotable format.
Outputs three lists containing the timeslice index, the value on each
@ -303,7 +303,7 @@ class Corr:
transposed = [None if _check_for_none(self, G) else G.T for G in self.content]
return 0.5 * (Corr(transposed) + self)
def GEVP(self, t0: int, ts: Optional[int]=None, sort: Optional[str]="Eigenvalue", vector_obs: bool=False, **kwargs) -> Union[List[List[Optional[ndarray]]], ndarray, List[Optional[ndarray]]]:
def GEVP(self, t0: int, ts: Optional[int]=None, sort: Optional[str]="Eigenvalue", vector_obs: bool=False, **kwargs) -> Union[list[list[Optional[ndarray]]], ndarray, list[Optional[ndarray]]]:
r'''Solve the generalized eigenvalue problem on the correlator matrix and returns the corresponding eigenvectors.
The eigenvectors are sorted according to the descending eigenvalues, the zeroth eigenvector(s) correspond to the
@ -786,7 +786,7 @@ class Corr:
else:
raise ValueError('Unknown variant.')
def fit(self, function: Callable, fitrange: Optional[Union[str, List[int]]]=None, silent: bool=False, **kwargs) -> Fit_result:
def fit(self, function: Callable, fitrange: Optional[Union[str, list[int]]]=None, silent: bool=False, **kwargs) -> Fit_result:
r'''Fits function to the data
Parameters
@ -820,7 +820,7 @@ class Corr:
result = least_squares(xs, ys, function, silent=silent, **kwargs)
return result
def plateau(self, plateau_range: Optional[List[int]]=None, method: str="fit", auto_gamma: bool=False) -> Obs:
def plateau(self, plateau_range: Optional[list[int]]=None, method: str="fit", auto_gamma: bool=False) -> Obs:
""" Extract a plateau value from a Corr object
Parameters
@ -857,7 +857,7 @@ class Corr:
else:
raise ValueError("Unsupported plateau method: " + method)
def set_prange(self, prange: List[int]):
def set_prange(self, prange: list[int]):
"""Sets the attribute prange of the Corr object."""
if not len(prange) == 2:
raise ValueError("prange must be a list or array with two values")
@ -869,7 +869,7 @@ class Corr:
self.prange = prange
return
def show(self, x_range: Optional[List[int64]]=None, comp: Optional[Corr]=None, y_range: None=None, logscale: bool=False, plateau: None=None, fit_res: Optional[Fit_result]=None, fit_key: Optional[str]=None, ylabel: None=None, save: None=None, auto_gamma: bool=False, hide_sigma: None=None, references: None=None, title: None=None):
def show(self, x_range: Optional[list[int]]=None, comp: Optional[Corr]=None, y_range: None=None, logscale: bool=False, plateau: None=None, fit_res: Optional[Fit_result]=None, fit_key: Optional[str]=None, ylabel: None=None, save: None=None, auto_gamma: bool=False, hide_sigma: None=None, references: None=None, title: None=None):
"""Plots the correlator using the tag of the correlator as label if available.
Parameters
@ -1047,10 +1047,10 @@ class Corr:
else:
raise ValueError("Unknown datatype " + str(datatype))
def print(self, print_range: Optional[List[int]]=None):
def print(self, print_range: Optional[list[int]]=None):
print(self.__repr__(print_range))
def __repr__(self, print_range: Optional[List[int]]=None) -> str:
def __repr__(self, print_range: Optional[list[int]]=None) -> str:
if print_range is None:
print_range = [0, None]
@ -1415,7 +1415,7 @@ class Corr:
return Corr(newcontent)
def _sort_vectors(vec_set_in: List[Optional[ndarray]], ts: int) -> List[Optional[Union[ndarray, List[ndarray]]]]:
def _sort_vectors(vec_set_in: list[Optional[ndarray]], ts: int) -> list[Optional[Union[ndarray, list[ndarray]]]]:
"""Helper function used to find a set of Eigenvectors consistent over all timeslices"""
if isinstance(vec_set_in[ts][0][0], Obs):

View file

@ -1,12 +1,12 @@
from __future__ import annotations
import numpy as np
from numpy import float64, ndarray
from typing import Any, List, Optional, Union
from numpy import ndarray
from typing import Any, Optional, Union
class Covobs:
def __init__(self, mean: Optional[Union[float, float64, int]], cov: Any, name: str, pos: Optional[int]=None, grad: Optional[Union[ndarray, List[float]]]=None):
def __init__(self, mean: Optional[Union[float, int]], cov: Any, name: str, pos: Optional[int]=None, grad: Optional[Union[ndarray, list[float]]]=None):
""" Initialize Covobs object.
Parameters
@ -82,7 +82,7 @@ class Covobs:
if ev < 0:
raise Exception('Covariance matrix is not positive-semidefinite!')
def _set_grad(self, grad: Union[List[float], ndarray]):
def _set_grad(self, grad: Union[list[float], ndarray]):
""" Set the gradient of the covobs
Parameters

View file

@ -17,7 +17,7 @@ from numdifftools import Jacobian as num_jacobian
from numdifftools import Hessian as num_hessian
from .obs import Obs, derived_observable, covariance, cov_Obs, invert_corr_cov_cholesky
from numpy import ndarray
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Optional, Union
class Fit_result(Sequence):
@ -73,7 +73,7 @@ class Fit_result(Sequence):
return '\n'.join([key.rjust(m) + ': ' + repr(value) for key, value in sorted(self.__dict__.items())])
def least_squares(x: Any, y: Union[Dict[str, ndarray], List[Obs], ndarray, Dict[str, List[Obs]]], func: Union[Callable, Dict[str, Callable]], priors: Optional[Union[Dict[int, str], List[str], List[Obs], Dict[int, Obs]]]=None, silent: bool=False, **kwargs) -> Fit_result:
def least_squares(x: Any, y: Union[dict[str, ndarray], list[Obs], ndarray, dict[str, list[Obs]]], func: Union[Callable, dict[str, Callable]], priors: Optional[Union[dict[int, str], list[str], list[Obs], dict[int, Obs]]]=None, silent: bool=False, **kwargs) -> Fit_result:
r'''Performs a non-linear fit to y = func(x).
```
@ -506,7 +506,7 @@ def least_squares(x: Any, y: Union[Dict[str, ndarray], List[Obs], ndarray, Dict[
return output
def total_least_squares(x: List[Obs], y: List[Obs], func: Callable, silent: bool=False, **kwargs) -> Fit_result:
def total_least_squares(x: list[Obs], y: list[Obs], func: Callable, silent: bool=False, **kwargs) -> Fit_result:
r'''Performs a non-linear fit to y = func(x) and returns a list of Obs corresponding to the fit parameters.
Parameters
@ -710,7 +710,7 @@ def total_least_squares(x: List[Obs], y: List[Obs], func: Callable, silent: bool
return output
def fit_lin(x: List[Union[Obs, int, float]], y: List[Obs], **kwargs) -> List[Obs]:
def fit_lin(x: list[Union[Obs, int, float]], y: list[Obs], **kwargs) -> list[Obs]:
"""Performs a linear fit to y = n + m * x and returns two Obs n, m.
Parameters
@ -741,7 +741,7 @@ def fit_lin(x: List[Union[Obs, int, float]], y: List[Obs], **kwargs) -> List[Obs
raise TypeError('Unsupported types for x')
def qqplot(x: ndarray, o_y: List[Obs], func: Callable, p: List[Obs], title: str=""):
def qqplot(x: ndarray, o_y: list[Obs], func: Callable, p: list[Obs], title: str=""):
"""Generates a quantile-quantile plot of the fit result which can be used to
check if the residuals of the fit are gaussian distributed.
@ -771,7 +771,7 @@ def qqplot(x: ndarray, o_y: List[Obs], func: Callable, p: List[Obs], title: str=
plt.draw()
def residual_plot(x: ndarray, y: List[Obs], func: Callable, fit_res: List[Obs], title: str=""):
def residual_plot(x: ndarray, y: list[Obs], func: Callable, fit_res: list[Obs], title: str=""):
"""Generates a plot which compares the fit to the data and displays the corresponding residuals
For uncorrelated data the residuals are expected to be distributed ~N(0,1).
@ -808,7 +808,7 @@ def residual_plot(x: ndarray, y: List[Obs], func: Callable, fit_res: List[Obs],
plt.draw()
def error_band(x: List[int], func: Callable, beta: List[Obs]) -> ndarray:
def error_band(x: list[int], func: Callable, beta: list[Obs]) -> ndarray:
"""Calculate the error band for an array of sample values x, for given fit function func with optimized parameters beta.
Returns
@ -832,7 +832,7 @@ def error_band(x: List[int], func: Callable, beta: List[Obs]) -> ndarray:
return err
def ks_test(objects: Optional[List[Fit_result]]=None):
def ks_test(objects: Optional[list[Fit_result]]=None):
"""Performs a KolmogorovSmirnov test for the p-values of all fit object.
Parameters
@ -876,7 +876,7 @@ def ks_test(objects: Optional[List[Fit_result]]=None):
print(scipy.stats.kstest(p_values, 'uniform'))
def _extract_val_and_dval(string: str) -> Tuple[float, float]:
def _extract_val_and_dval(string: str) -> tuple[float, float]:
split_string = string.split('(')
if '.' in split_string[0] and '.' not in split_string[1][:-1]:
factor = 10 ** -len(split_string[0].partition('.')[2])

View file

@ -14,11 +14,11 @@ from ..covobs import Covobs
from .. import version as pyerrorsversion
from lxml.etree import _Element
from numpy import ndarray
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Optional, Union
# Based on https://stackoverflow.com/a/10076823
def _etree_to_dict(t: _Element) -> Dict[str, Union[str, Dict[str, str], Dict[str, Union[str, Dict[str, str]]]]]:
def _etree_to_dict(t: _Element) -> dict[str, Union[str, dict[str, str], dict[str, Union[str, dict[str, str]]]]]:
""" Convert the content of an XML file to a python dict"""
d = {t.tag: {} if t.attrib else None}
children = list(t)
@ -42,7 +42,7 @@ def _etree_to_dict(t: _Element) -> Dict[str, Union[str, Dict[str, str], Dict[str
return d
def _dict_to_xmlstring(d: Dict[str, Any]) -> str:
def _dict_to_xmlstring(d: dict[str, Any]) -> str:
if isinstance(d, dict):
iters = ''
for k in d:
@ -70,7 +70,7 @@ def _dict_to_xmlstring(d: Dict[str, Any]) -> str:
return iters
def _dict_to_xmlstring_spaces(d: Dict[str, Dict[str, Dict[str, Union[str, Dict[str, str], List[Dict[str, str]]]]]], space: str=' ') -> str:
def _dict_to_xmlstring_spaces(d: dict[str, dict[str, dict[str, Union[str, dict[str, str], list[dict[str, str]]]]]], space: str=' ') -> str:
s = _dict_to_xmlstring(d)
o = ''
c = 0
@ -89,7 +89,7 @@ def _dict_to_xmlstring_spaces(d: Dict[str, Dict[str, Dict[str, Union[str, Dict[s
return o
def create_pobs_string(obsl: List[Obs], name: str, spec: str='', origin: str='', symbol: Optional[List[Union[str, Any]]]=None, enstag: None=None) -> str:
def create_pobs_string(obsl: list[Obs], name: str, spec: str='', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, enstag: None=None) -> str:
"""Export a list of Obs or structures containing Obs to an xml string
according to the Zeuthen pobs format.
@ -182,7 +182,7 @@ def create_pobs_string(obsl: List[Obs], name: str, spec: str='', origin: str='',
return rs
def write_pobs(obsl: List[Obs], fname: str, name: str, spec: str='', origin: str='', symbol: Optional[List[Union[str, Any]]]=None, enstag: None=None, gz: bool=True):
def write_pobs(obsl: list[Obs], fname: str, name: str, spec: str='', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, enstag: None=None, gz: bool=True):
"""Export a list of Obs or structures containing Obs to a .xml.gz file
according to the Zeuthen pobs format.
@ -231,7 +231,7 @@ def write_pobs(obsl: List[Obs], fname: str, name: str, spec: str='', origin: str
fp.close()
def _import_data(string: str) -> List[Union[int, float]]:
def _import_data(string: str) -> list[Union[int, float]]:
return json.loads("[" + ",".join(string.replace(' +', ' ').split()) + "]")
@ -254,7 +254,7 @@ def _find_tag(dat: _Element, tag: str) -> int:
raise _NoTagInDataError(tag)
def _import_array(arr: _Element) -> Union[List[Union[str, List[int], List[ndarray]]], ndarray]:
def _import_array(arr: _Element) -> Union[list[Union[str, list[int], list[ndarray]]], ndarray]:
name = arr[_find_tag(arr, 'id')].text.strip()
index = _find_tag(arr, 'layout')
try:
@ -292,12 +292,12 @@ def _import_array(arr: _Element) -> Union[List[Union[str, List[int], List[ndarra
_check(False)
def _import_rdata(rd: _Element) -> Tuple[List[ndarray], str, List[int]]:
def _import_rdata(rd: _Element) -> tuple[list[ndarray], str, list[int]]:
name, idx, mask, deltas = _import_array(rd)
return deltas, name, idx
def _import_cdata(cd: _Element) -> Tuple[str, ndarray, ndarray]:
def _import_cdata(cd: _Element) -> tuple[str, ndarray, ndarray]:
_check(cd[0].tag == "id")
_check(cd[1][0].text.strip() == "cov")
cov = _import_array(cd[1])
@ -305,7 +305,7 @@ def _import_cdata(cd: _Element) -> Tuple[str, ndarray, ndarray]:
return cd[0].text.strip(), cov, grad
def read_pobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: None=None) -> Union[Dict[str, Union[str, Dict[str, str], List[Obs]]], List[Obs]]:
def read_pobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: None=None) -> Union[dict[str, Union[str, dict[str, str], list[Obs]]], list[Obs]]:
"""Import a list of Obs from an xml.gz file in the Zeuthen pobs format.
Tags are not written or recovered automatically.
@ -405,7 +405,7 @@ def read_pobs(fname: str, full_output: bool=False, gz: bool=True, separator_inse
# this is based on Mattia Bruno's implementation at https://github.com/mbruno46/pyobs/blob/master/pyobs/IO/xml.py
def import_dobs_string(content: bytes, full_output: bool=False, separator_insertion: bool=True) -> Union[Dict[str, Union[str, Dict[str, str], List[Obs]]], List[Obs]]:
def import_dobs_string(content: bytes, full_output: bool=False, separator_insertion: bool=True) -> Union[dict[str, Union[str, dict[str, str], list[Obs]]], list[Obs]]:
"""Import a list of Obs from a string in the Zeuthen dobs format.
Tags are not written or recovered automatically.
@ -579,7 +579,7 @@ def import_dobs_string(content: bytes, full_output: bool=False, separator_insert
return res
def read_dobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: bool=True) -> Union[Dict[str, Union[str, Dict[str, str], List[Obs]]], List[Obs]]:
def read_dobs(fname: str, full_output: bool=False, gz: bool=True, separator_insertion: bool=True) -> Union[dict[str, Union[str, dict[str, str], list[Obs]]], list[Obs]]:
"""Import a list of Obs from an xml.gz file in the Zeuthen dobs format.
Tags are not written or recovered automatically.
@ -626,7 +626,7 @@ def read_dobs(fname: str, full_output: bool=False, gz: bool=True, separator_inse
return import_dobs_string(content, full_output, separator_insertion=separator_insertion)
def _dobsdict_to_xmlstring(d: Dict[str, Any]) -> str:
def _dobsdict_to_xmlstring(d: dict[str, Any]) -> str:
if isinstance(d, dict):
iters = ''
for k in d:
@ -666,7 +666,7 @@ def _dobsdict_to_xmlstring(d: Dict[str, Any]) -> str:
return iters
def _dobsdict_to_xmlstring_spaces(d: Dict[str, Union[Dict[str, Union[Dict[str, str], Dict[str, Union[str, Dict[str, str]]], Dict[str, Union[str, Dict[str, Union[str, List[str]]], List[Dict[str, Union[str, int, List[Dict[str, str]]]]]]]]], Dict[str, Union[Dict[str, str], Dict[str, Union[str, Dict[str, str]]], Dict[str, Union[str, Dict[str, Union[str, List[str]]], List[Dict[str, Union[str, int, List[Dict[str, str]]]]], List[Dict[str, Union[str, List[Dict[str, str]]]]]]]]]]], space: str=' ') -> str:
def _dobsdict_to_xmlstring_spaces(d: dict[str, Union[dict[str, Union[dict[str, str], dict[str, Union[str, dict[str, str]]], dict[str, Union[str, dict[str, Union[str, list[str]]], list[dict[str, Union[str, int, list[dict[str, str]]]]]]]]], dict[str, Union[dict[str, str], dict[str, Union[str, dict[str, str]]], dict[str, Union[str, dict[str, Union[str, list[str]]], list[dict[str, Union[str, int, list[dict[str, str]]]]], list[dict[str, Union[str, list[dict[str, str]]]]]]]]]]], space: str=' ') -> str:
s = _dobsdict_to_xmlstring(d)
o = ''
c = 0
@ -685,7 +685,7 @@ def _dobsdict_to_xmlstring_spaces(d: Dict[str, Union[Dict[str, Union[Dict[str, s
return o
def create_dobs_string(obsl: List[Obs], name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[List[Union[str, Any]]]=None, who: None=None, enstags: Optional[Dict[Any, Any]]=None) -> str:
def create_dobs_string(obsl: list[Obs], name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, who: None=None, enstags: Optional[dict[Any, Any]]=None) -> str:
"""Generate the string for the export of a list of Obs or structures containing Obs
to a .xml.gz file according to the Zeuthen dobs format.
@ -876,7 +876,7 @@ def create_dobs_string(obsl: List[Obs], name: str, spec: str='dobs v1.0', origin
return rs
def write_dobs(obsl: List[Obs], fname: str, name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[List[Union[str, Any]]]=None, who: None=None, enstags: None=None, gz: bool=True):
def write_dobs(obsl: list[Obs], fname: str, name: str, spec: str='dobs v1.0', origin: str='', symbol: Optional[list[Union[str, Any]]]=None, who: None=None, enstags: None=None, gz: bool=True):
"""Export a list of Obs or structures containing Obs to a .xml.gz file
according to the Zeuthen dobs format.

View file

@ -13,11 +13,11 @@ from ..covobs import Covobs
from ..correlators import Corr
from ..misc import _assert_equal_properties
from .. import version as pyerrorsversion
from numpy import float32, float64, int64, ndarray
from typing import Any, Dict, List, Optional, Tuple, Union
from numpy import ndarray
from typing import Any, Optional, Union
def create_json_string(ol: Any, description: Union[str, Dict[str, Union[str, Dict[str, Union[Dict[str, Union[str, Dict[str, Union[int, str]]]], Dict[str, Optional[Union[str, List[str], float]]]]]]], Dict[str, Union[str, Dict[Optional[Union[int, bool]], str], float32]], Dict[str, Dict[str, Dict[str, str]]], Dict[str, Union[str, Dict[Optional[Union[int, bool]], str], Dict[int64, float64]]]]='', indent: int=1) -> str:
def create_json_string(ol: Any, description: Union[str, dict[str, Union[str, dict[str, Union[dict[str, Union[str, dict[str, Union[int, str]]]], dict[str, Optional[Union[str, list[str], float]]]]]]], dict[str, Union[str, dict[Optional[Union[int, bool]], str], float]], dict[str, dict[str, dict[str, str]]], dict[str, Union[str, dict[Optional[Union[int, bool]], str], dict[int, float]]]]='', indent: int=1) -> str:
"""Generate the string for the export of a list of Obs or structures containing Obs
to a .json(.gz) file
@ -219,7 +219,7 @@ def create_json_string(ol: Any, description: Union[str, Dict[str, Union[str, Dic
return json.dumps(d, indent=indent, ensure_ascii=False, default=_jsonifier, write_mode=json.WM_COMPACT)
def dump_to_json(ol: Union[Corr, List[Union[Obs, List[Obs], Corr, ndarray]], ndarray, List[Union[Obs, List[Obs], ndarray]], List[Obs]], fname: str, description: Union[str, Dict[str, Union[str, Dict[str, Union[Dict[str, Union[str, Dict[str, Union[int, str]]]], Dict[str, Optional[Union[str, List[str], float]]]]]]], Dict[str, Union[str, Dict[Optional[Union[int, bool]], str], float32]], Dict[str, Dict[str, Dict[str, str]]], Dict[str, Union[str, Dict[Optional[Union[int, bool]], str], Dict[int64, float64]]]]='', indent: int=1, gz: bool=True):
def dump_to_json(ol: Union[Corr, list[Union[Obs, list[Obs], Corr, ndarray]], ndarray, list[Union[Obs, list[Obs], ndarray]], list[Obs]], fname: str, description: Union[str, dict[str, Union[str, dict[str, Union[dict[str, Union[str, dict[str, Union[int, str]]]], dict[str, Optional[Union[str, list[str], float]]]]]]], dict[str, Union[str, dict[Optional[Union[int, bool]], str], float]], dict[str, dict[str, dict[str, str]]], dict[str, Union[str, dict[Optional[Union[int, bool]], str], dict[int, float]]]]='', indent: int=1, gz: bool=True):
"""Export a list of Obs or structures containing Obs to a .json(.gz) file.
Dict keys that are not JSON-serializable such as floats are converted to strings.
@ -261,7 +261,7 @@ def dump_to_json(ol: Union[Corr, List[Union[Obs, List[Obs], Corr, ndarray]], nda
fp.close()
def _parse_json_dict(json_dict: Dict[str, Any], verbose: bool=True, full_output: bool=False) -> Any:
def _parse_json_dict(json_dict: dict[str, Any], verbose: bool=True, full_output: bool=False) -> Any:
"""Reconstruct a list of Obs or structures containing Obs from a dict that
was built out of a json string.
@ -548,7 +548,7 @@ def load_json(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=F
return _parse_json_dict(d, verbose, full_output)
def _ol_from_dict(ind: Union[Dict[Optional[Union[int, bool]], str], Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]], str]], Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]], List[str]]], Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]]]]], reps: str='DICTOBS') -> Union[Tuple[List[Any], Dict[Optional[Union[int, bool]], str]], Tuple[List[Union[Obs, List[Obs], Corr, ndarray]], Dict[str, Union[Dict[str, Union[str, Dict[str, Union[int, str]]]], Dict[str, Optional[Union[str, List[str], float]]]]]]]:
def _ol_from_dict(ind: Union[dict[Optional[Union[int, bool]], str], dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]], str]], dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]], list[str]]], dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]]]]], reps: str='DICTOBS') -> Union[tuple[list[Any], dict[Optional[Union[int, bool]], str]], tuple[list[Union[Obs, list[Obs], Corr, ndarray]], dict[str, Union[dict[str, Union[str, dict[str, Union[int, str]]]], dict[str, Optional[Union[str, list[str], float]]]]]]]:
"""Convert a dictionary of Obs objects to a list and a dictionary that contains
placeholders instead of the Obs objects.
@ -628,7 +628,7 @@ def _ol_from_dict(ind: Union[Dict[Optional[Union[int, bool]], str], Dict[str, Un
return ol, nd
def dump_dict_to_json(od: Union[Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]], str]], Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]], List[str]]], List[Union[Obs, List[Obs], Corr, ndarray]], Dict[Optional[Union[int, bool]], str], Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]]]]], fname: str, description: Union[str, float32, Dict[int64, float64]]='', indent: int=1, reps: str='DICTOBS', gz: bool=True):
def dump_dict_to_json(od: Union[dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]], str]], dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]], list[str]]], list[Union[Obs, list[Obs], Corr, ndarray]], dict[Optional[Union[int, bool]], str], dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]]]]], fname: str, description: Union[str, float, dict[int, float]]='', indent: int=1, reps: str='DICTOBS', gz: bool=True):
"""Export a dict of Obs or structures containing Obs to a .json(.gz) file
Parameters
@ -668,7 +668,7 @@ def dump_dict_to_json(od: Union[Dict[str, Union[Dict[str, Union[Obs, List[Obs],
dump_to_json(ol, fname, description=desc_dict, indent=indent, gz=gz)
def _od_from_list_and_dict(ol: List[Union[Obs, List[Obs], Corr, ndarray]], ind: Dict[str, Dict[str, Optional[Union[str, Dict[str, Union[int, str]], List[str], float]]]], reps: str='DICTOBS') -> Dict[str, Dict[str, Any]]:
def _od_from_list_and_dict(ol: list[Union[Obs, list[Obs], Corr, ndarray]], ind: dict[str, dict[str, Optional[Union[str, dict[str, Union[int, str]], list[str], float]]]], reps: str='DICTOBS') -> dict[str, dict[str, Any]]:
"""Parse a list of Obs or structures containing Obs and an accompanying
dict, where the structures have been replaced by placeholders to a
dict that contains the structures.
@ -731,7 +731,7 @@ def _od_from_list_and_dict(ol: List[Union[Obs, List[Obs], Corr, ndarray]], ind:
return nd
def load_json_dict(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=False, reps: str='DICTOBS') -> Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]], str, Dict[str, Union[Dict[str, Union[Obs, List[Obs], Dict[str, Union[int, Obs, Corr, ndarray]]]], Dict[str, Optional[Union[str, List[str], Obs, float]]]]]]]:
def load_json_dict(fname: str, verbose: bool=True, gz: bool=True, full_output: bool=False, reps: str='DICTOBS') -> dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]], str, dict[str, Union[dict[str, Union[Obs, list[Obs], dict[str, Union[int, Obs, Corr, ndarray]]]], dict[str, Optional[Union[str, list[str], Obs, float]]]]]]]:
"""Import a dict of Obs or structures containing Obs from a .json(.gz) file.
The following structures are supported: Obs, list, numpy.ndarray, Corr

View file

@ -9,10 +9,10 @@ import matplotlib.pyplot as plt
from matplotlib import gridspec
from ..obs import Obs
from ..fits import fit_lin
from typing import Dict, Optional
from typing import Optional
def fit_t0(t2E_dict: Dict[float, Obs], fit_range: int, plot_fit: Optional[bool]=False, observable: str='t0') -> Obs:
def fit_t0(t2E_dict: dict[float, Obs], fit_range: int, plot_fit: Optional[bool]=False, observable: str='t0') -> Obs:
"""Compute the root of (flow-based) data based on a dictionary that contains
the necessary information in key-value pairs a la (flow time: observable at flow time).

View file

@ -10,10 +10,10 @@ from ..correlators import Corr
from .misc import fit_t0
from .utils import sort_names
from io import BufferedReader
from typing import Dict, List, Optional, Tuple, Union
from typing import Optional, Union
def read_rwms(path: str, prefix: str, version: str='2.0', names: Optional[List[str]]=None, **kwargs) -> List[Obs]:
def read_rwms(path: str, prefix: str, version: str='2.0', names: Optional[list[str]]=None, **kwargs) -> list[Obs]:
"""Read rwms format from given folder structure. Returns a list of length nrw
Parameters
@ -232,7 +232,7 @@ def read_rwms(path: str, prefix: str, version: str='2.0', names: Optional[List[s
return result
def _extract_flowed_energy_density(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, postfix: str='ms', **kwargs) -> Dict[float, Obs]:
def _extract_flowed_energy_density(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent: int, postfix: str='ms', **kwargs) -> dict[float, Obs]:
"""Extract a dictionary with the flowed Yang-Mills action density from given .ms.dat files.
Returns a dictionary with Obs as values and flow times as keys.
@ -580,7 +580,7 @@ def extract_w0(path: str, prefix: str, dtr_read: int, xmin: int, spatial_extent:
return np.sqrt(fit_t0(tdtt2E_dict, fit_range, plot_fit=kwargs.get('plot_fit'), observable='w0'))
def _parse_array_openQCD2(d: int, n: Tuple[int, int], size: int, wa: Union[Tuple[float, float, float, float, float, float, float, float], Tuple[float, float]], quadrupel: bool=False) -> List[List[float]]:
def _parse_array_openQCD2(d: int, n: tuple[int, int], size: int, wa: Union[tuple[float, float, float, float, float, float, float, float], tuple[float, float]], quadrupel: bool=False) -> list[list[float]]:
arr = []
if d == 2:
for i in range(n[0]):
@ -599,7 +599,7 @@ def _parse_array_openQCD2(d: int, n: Tuple[int, int], size: int, wa: Union[Tuple
return arr
def _find_files(path: str, prefix: str, postfix: str, ext: str, known_files: Union[str, List[str]]=[]) -> List[str]:
def _find_files(path: str, prefix: str, postfix: str, ext: str, known_files: Union[str, list[str]]=[]) -> list[str]:
found = []
files = []
@ -639,7 +639,7 @@ def _find_files(path: str, prefix: str, postfix: str, ext: str, known_files: Uni
return files
def _read_array_openQCD2(fp: BufferedReader) -> Dict[str, Union[int, Tuple[int, int], List[List[float]]]]:
def _read_array_openQCD2(fp: BufferedReader) -> dict[str, Union[int, tuple[int, int], list[list[float]]]]:
t = fp.read(4)
d = struct.unpack('i', t)[0]
t = fp.read(4 * d)

View file

@ -7,13 +7,13 @@ from ..obs import Obs
from .utils import sort_names, check_idl
import itertools
from numpy import ndarray
from typing import Any, Dict, List, Tuple, Union
from typing import Any, Union
sep = "/"
def read_sfcf(path: str, prefix: str, name: str, quarks: str='.*', corr_type: str="bi", noffset: int=0, wf: int=0, wf2: int=0, version: str="1.0c", cfg_separator: str="n", silent: bool=False, **kwargs) -> List[Obs]:
def read_sfcf(path: str, prefix: str, name: str, quarks: str='.*', corr_type: str="bi", noffset: int=0, wf: int=0, wf2: int=0, version: str="1.0c", cfg_separator: str="n", silent: bool=False, **kwargs) -> list[Obs]:
"""Read sfcf files from given folder structure.
Parameters
@ -78,7 +78,7 @@ def read_sfcf(path: str, prefix: str, name: str, quarks: str='.*', corr_type: st
return ret[name][quarks][str(noffset)][str(wf)][str(wf2)]
def read_sfcf_multi(path: str, prefix: str, name_list: List[str], quarks_list: List[str]=['.*'], corr_type_list: List[str]=['bi'], noffset_list: List[int]=[0], wf_list: List[int]=[0], wf2_list: List[int]=[0], version: str="1.0c", cfg_separator: str="n", silent: bool=False, keyed_out: bool=False, **kwargs) -> Dict[str, Dict[str, Dict[str, Dict[str, Dict[str, List[Obs]]]]]]:
def read_sfcf_multi(path: str, prefix: str, name_list: list[str], quarks_list: list[str]=['.*'], corr_type_list: list[str]=['bi'], noffset_list: list[int]=[0], wf_list: list[int]=[0], wf2_list: list[int]=[0], version: str="1.0c", cfg_separator: str="n", silent: bool=False, keyed_out: bool=False, **kwargs) -> dict[str, dict[str, dict[str, dict[str, dict[str, list[Obs]]]]]]:
"""Read sfcf files from given folder structure.
Parameters
@ -410,14 +410,14 @@ def read_sfcf_multi(path: str, prefix: str, name_list: List[str], quarks_list: L
return result_dict
def _lists2key(*lists) -> List[str]:
def _lists2key(*lists) -> list[str]:
keys = []
for tup in itertools.product(*lists):
keys.append(sep.join(tup))
return keys
def _key2specs(key: str) -> List[str]:
def _key2specs(key: str) -> list[str]:
return key.split(sep)
@ -425,7 +425,7 @@ def _specs2key(*specs) -> str:
return sep.join(specs)
def _read_o_file(cfg_path: str, name: str, needed_keys: List[str], intern: Dict[str, Dict[str, Union[bool, Dict[str, Dict[str, Dict[str, Dict[str, Dict[str, Union[int, str]]]]]], int]]], version: str, im: int) -> Dict[str, List[float]]:
def _read_o_file(cfg_path: str, name: str, needed_keys: list[str], intern: dict[str, dict[str, Union[bool, dict[str, dict[str, dict[str, dict[str, dict[str, Union[int, str]]]]]], int]]], version: str, im: int) -> dict[str, list[float]]:
return_vals = {}
for key in needed_keys:
file = cfg_path + '/' + name
@ -450,7 +450,7 @@ def _read_o_file(cfg_path: str, name: str, needed_keys: List[str], intern: Dict[
return return_vals
def _extract_corr_type(corr_type: str) -> Tuple[bool, bool]:
def _extract_corr_type(corr_type: str) -> tuple[bool, bool]:
if corr_type == 'bb':
b2b = True
single = True
@ -463,7 +463,7 @@ def _extract_corr_type(corr_type: str) -> Tuple[bool, bool]:
return b2b, single
def _find_files(rep_path: str, prefix: str, compact: bool, files: List[Union[range, str, Any]]=[]) -> List[str]:
def _find_files(rep_path: str, prefix: str, compact: bool, files: list[Union[range, str, Any]]=[]) -> list[str]:
sub_ls = []
if not files == []:
files.sort(key=lambda x: int(re.findall(r'\d+', x)[-1]))
@ -504,7 +504,7 @@ def _make_pattern(version: str, name: str, noffset: str, wf: str, wf2: Union[str
return pattern
def _find_correlator(file_name: str, version: str, pattern: str, b2b: bool, silent: bool=False) -> Tuple[int, int]:
def _find_correlator(file_name: str, version: str, pattern: str, b2b: bool, silent: bool=False) -> tuple[int, int]:
T = 0
with open(file_name, "r") as my_file:
@ -530,7 +530,7 @@ def _find_correlator(file_name: str, version: str, pattern: str, b2b: bool, sile
return start_read, T
def _read_compact_file(rep_path: str, cfg_file: str, intern: Dict[str, Dict[str, Union[bool, Dict[str, Dict[str, Dict[str, Dict[str, Dict[str, Union[int, str]]]]]], int]]], needed_keys: List[str], im: int) -> Dict[str, List[float]]:
def _read_compact_file(rep_path: str, cfg_file: str, intern: dict[str, dict[str, Union[bool, dict[str, dict[str, dict[str, dict[str, dict[str, Union[int, str]]]]]], int]]], needed_keys: list[str], im: int) -> dict[str, list[float]]:
return_vals = {}
with open(rep_path + cfg_file) as fp:
lines = fp.readlines()
@ -561,7 +561,7 @@ def _read_compact_file(rep_path: str, cfg_file: str, intern: Dict[str, Dict[str,
return return_vals
def _read_compact_rep(path: str, rep: str, sub_ls: List[str], intern: Dict[str, Dict[str, Union[bool, Dict[str, Dict[str, Dict[str, Dict[str, Dict[str, Union[int, str]]]]]], int]]], needed_keys: List[str], im: int) -> Dict[str, List[ndarray]]:
def _read_compact_rep(path: str, rep: str, sub_ls: list[str], intern: dict[str, dict[str, Union[bool, dict[str, dict[str, dict[str, dict[str, dict[str, Union[int, str]]]]]], int]]], needed_keys: list[str], im: int) -> dict[str, list[ndarray]]:
rep_path = path + '/' + rep + '/'
no_cfg = len(sub_ls)
@ -583,7 +583,7 @@ def _read_compact_rep(path: str, rep: str, sub_ls: List[str], intern: Dict[str,
return return_vals
def _read_chunk(chunk: List[str], gauge_line: int, cfg_sep: str, start_read: int, T: int, corr_line: int, b2b: bool, pattern: str, im: int, single: bool) -> Tuple[int, List[float]]:
def _read_chunk(chunk: list[str], gauge_line: int, cfg_sep: str, start_read: int, T: int, corr_line: int, b2b: bool, pattern: str, im: int, single: bool) -> tuple[int, list[float]]:
try:
idl = int(chunk[gauge_line].split(cfg_sep)[-1])
except Exception:
@ -600,7 +600,7 @@ def _read_chunk(chunk: List[str], gauge_line: int, cfg_sep: str, start_read: int
return idl, data
def _read_append_rep(filename: str, pattern: str, b2b: bool, cfg_separator: str, im: int, single: bool) -> Tuple[int, List[int], List[List[float]]]:
def _read_append_rep(filename: str, pattern: str, b2b: bool, cfg_separator: str, im: int, single: bool) -> tuple[int, list[int], list[list[float]]]:
with open(filename, 'r') as fp:
content = fp.readlines()
data_starts = []
@ -649,7 +649,7 @@ def _read_append_rep(filename: str, pattern: str, b2b: bool, cfg_separator: str,
return T, rep_idl, data
def _get_rep_names(ls: List[str], ens_name: None=None) -> List[str]:
def _get_rep_names(ls: list[str], ens_name: None=None) -> list[str]:
new_names = []
for entry in ls:
try:
@ -664,7 +664,7 @@ def _get_rep_names(ls: List[str], ens_name: None=None) -> List[str]:
return new_names
def _get_appended_rep_names(ls: List[str], prefix: str, name: str, ens_name: None=None) -> List[str]:
def _get_appended_rep_names(ls: list[str], prefix: str, name: str, ens_name: None=None) -> list[str]:
new_names = []
for exc in ls:
if not fnmatch.fnmatch(exc, prefix + '*.' + name):

View file

@ -4,10 +4,9 @@ from __future__ import annotations
import re
import fnmatch
import os
from typing import List
def sort_names(ll: List[str]) -> List[str]:
def sort_names(ll: list[str]) -> list[str]:
"""Sorts a list of names of replika with searches for `r` and `id` in the replikum string.
If this search fails, a fallback method is used,
where the strings are simply compared and the first diffeing numeral is used for differentiation.

View file

@ -4,10 +4,10 @@ from .obs import derived_observable, Obs
from autograd import jacobian
from scipy.integrate import quad as squad
from numpy import ndarray
from typing import Callable, Dict, List, Tuple, Union
from typing import Callable, Union
def quad(func: Callable, p: Union[List[Union[float, Obs]], List[float], ndarray], a: Union[Obs, float, int], b: Union[Obs, float, int], **kwargs) -> Union[Tuple[Obs, float], Tuple[float, float], Tuple[Obs, float, Dict[str, Union[int, ndarray]]]]:
def quad(func: Callable, p: Union[list[Union[float, Obs]], list[float], ndarray], a: Union[Obs, float, int], b: Union[Obs, float, int], **kwargs) -> Union[tuple[Obs, float], tuple[float, float], tuple[Obs, float, dict[str, Union[int, ndarray]]]]:
'''Performs a (one-dimensional) numeric integration of f(p, x) from a to b.
The integration is performed using scipy.integrate.quad().

View file

@ -3,7 +3,7 @@ import numpy as np
import autograd.numpy as anp # Thinly-wrapped numpy
from .obs import derived_observable, CObs, Obs, import_jackknife
from numpy import ndarray
from typing import Callable, Tuple, Union
from typing import Callable, Union
def matmul(*operands) -> ndarray:
@ -262,7 +262,7 @@ def _mat_mat_op(op: Callable, obs: ndarray, **kwargs) -> ndarray:
return derived_observable(lambda x, **kwargs: op(x), [obs], array_mode=True)[0]
def eigh(obs: ndarray, **kwargs) -> Tuple[ndarray, ndarray]:
def eigh(obs: ndarray, **kwargs) -> tuple[ndarray, ndarray]:
"""Computes the eigenvalues and eigenvectors of a given hermitian matrix of Obs according to np.linalg.eigh."""
w = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[0], obs)
v = derived_observable(lambda x, **kwargs: anp.linalg.eigh(x)[1], obs)
@ -286,7 +286,7 @@ def pinv(obs: ndarray, **kwargs) -> ndarray:
return derived_observable(lambda x, **kwargs: anp.linalg.pinv(x), obs)
def svd(obs: ndarray, **kwargs) -> Tuple[ndarray, ndarray, ndarray]:
def svd(obs: ndarray, **kwargs) -> tuple[ndarray, ndarray, ndarray]:
"""Computes the singular value decomposition of a matrix of Obs."""
u = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[0], obs)
s = derived_observable(lambda x, **kwargs: anp.linalg.svd(x, full_matrices=False)[1], obs)

View file

@ -3,10 +3,10 @@ import numpy as np
import scipy.optimize
from autograd import jacobian
from .obs import Obs, derived_observable
from typing import Callable, List, Union
from typing import Callable, Union
def find_root(d: Union[Obs, List[Obs]], func: Callable, guess: float=1.0, **kwargs) -> Obs:
def find_root(d: Union[Obs, list[Obs]], func: Callable, guess: float=1.0, **kwargs) -> Obs:
r'''Finds the root of the function func(x, d) where d is an `Obs`.
Parameters