# pyerrors/pyerrors/input/hadrons.py
# 615 lines, 21 KiB, Python
import os
from collections import Counter
import h5py
from pathlib import Path
import numpy as np
from ..obs import Obs, CObs
from ..correlators import Corr
from ..dirac import epsilon_tensor_rank4
from .misc import fit_t0
def _get_files(path, filestem, idl):
ls = os.listdir(path)
# Clean up file list
files = list(filter(lambda x: x.startswith(filestem + "."), ls))
if not files:
raise Exception('No files starting with', filestem, 'in folder', path)
def get_cnfg_number(n):
return int(n.replace(".h5", "")[len(filestem) + 1:]) # From python 3.9 onward the safer 'removesuffix' method can be used.
# Sort according to configuration number
files.sort(key=get_cnfg_number)
cnfg_numbers = []
filtered_files = []
for line in files:
no = get_cnfg_number(line)
if idl:
if no in list(idl):
filtered_files.append(line)
cnfg_numbers.append(no)
else:
filtered_files.append(line)
cnfg_numbers.append(no)
if idl:
if Counter(list(idl)) != Counter(cnfg_numbers):
raise Exception("Not all configurations specified in idl found, configurations " + str(list(Counter(list(idl)) - Counter(cnfg_numbers))) + " are missing.")
# Check that configurations are evenly spaced
dc = np.unique(np.diff(cnfg_numbers))
if np.any(dc < 0):
raise Exception("Unsorted files")
if len(dc) == 1:
idx = range(cnfg_numbers[0], cnfg_numbers[-1] + dc[0], dc[0])
elif idl:
idx = idl
else:
raise Exception("Configurations are not evenly spaced. Provide an idl if you want to proceed with this set of configurations.")
return filtered_files, idx
def read_hd5(filestem, ens_id, group, attrs=None, idl=None, part="real"):
    r'''Read hadrons hdf5 file and extract entry based on attributes.

    Parameters
    -----------------
    filestem : str
        Full namestem of the files to read, including the full path.
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    group : str
        label of the group to be extracted.
    attrs : dict or int
        Dictionary containing the attributes. For example
        ```python
        attrs = {"gamma_snk": "Gamma5",
                 "gamma_src": "Gamma5"}
        ```
        Alternatively an integer can be specified to identify the sub group.
        This is discouraged as the order in the file is not guaranteed.
    idl : range
        If specified only configurations in the given range are read in.
    part: str
        string specifying whether to extract the real part ('real'),
        the imaginary part ('imag') or a complex correlator ('complex').
        Default 'real'.

    Returns
    -------
    corr : Corr
        Correlator of the source sink combination in question.
    '''
    path_obj = Path(filestem)
    path = path_obj.parent.as_posix()
    filestem = path_obj.name

    files, idx = _get_files(path, filestem, idl)

    if isinstance(attrs, dict):
        # Scan the first file for the unique sub group whose attributes
        # contain all requested key/value pairs.
        entry = None
        with h5py.File(path + '/' + files[0], "r") as h5file:
            for key in h5file[group].keys():
                decoded = {k: v[0].decode() for k, v in h5file[group][key].attrs.items()}
                if attrs.items() <= decoded.items():
                    if entry is not None:
                        raise ValueError("More than one fitting entry found. More constraint on attributes needed.")
                    entry = key
        if entry is None:
            raise ValueError(f"Entry with attributes {attrs} not found.")
    elif isinstance(attrs, int):
        entry = group + f"_{attrs}"
    else:
        raise TypeError("Invalid type for 'attrs'. Needs to be dict or int.")

    corr_data = []
    infos = []
    for fname in files:
        with h5py.File(path + '/' + fname, "r") as h5file:
            if group + '/' + entry not in h5file:
                raise Exception("Entry '" + entry + "' not contained in the files.")
            raw_data = h5file[group + '/' + entry + '/corr']
            corr_data.append(raw_data[:].view("complex"))
            if not infos:
                # Collect the attribute metadata once; it becomes the tag.
                for k, i in h5file[group + '/' + entry].attrs.items():
                    infos.append(k + ': ' + i[0].decode())
    corr_data = np.array(corr_data)

    if part == "complex":
        l_obs = [CObs(Obs([c.real], [ens_id], idl=[idx]),
                      Obs([c.imag], [ens_id], idl=[idx]))
                 for c in corr_data.T]
    else:
        component = getattr(corr_data, part)
        l_obs = [Obs([c], [ens_id], idl=[idx]) for c in component.T]

    corr = Corr(l_obs)
    corr.tag = r", ".join(infos)
    return corr
def read_meson_hd5(path, filestem, ens_id, meson='meson_0', idl=None, gammas=None):
    r'''Read hadrons meson hdf5 file and extract the meson labeled 'meson'

    Parameters
    -----------------
    path : str
        path to the files to read
    filestem : str
        namestem of the files to read
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    meson : str
        label of the meson to be extracted, standard value meson_0 which
        corresponds to the pseudoscalar pseudoscalar two-point function.
    gammas : tuple of strings
        Instead of a meson label one can also provide a tuple of two strings
        indicating the gamma matrices at sink and source (gamma_snk, gamma_src).
        ("Gamma5", "Gamma5") corresponds to the pseudoscalar pseudoscalar
        two-point function. The gammas argument dominates over meson.
    idl : range
        If specified only configurations in the given range are read in.

    Returns
    -------
    corr : Corr
        Correlator of the source sink combination in question.
    '''
    if gammas is not None:
        if len(gammas) != 2:
            raise ValueError("'gammas' needs to have exactly two entries")
        # Identify the sub group via its sink/source gamma attributes.
        attrs = {"gamma_snk": gammas[0],
                 "gamma_src": gammas[1]}
    else:
        # Fall back to the numeric suffix of the meson label.
        attrs = int(meson.rsplit('_', 1)[-1])
    return read_hd5(filestem=path + "/" + filestem, ens_id=ens_id,
                    group=meson.rsplit('_', 1)[0], attrs=attrs, idl=idl,
                    part="real")
def _extract_real_arrays(path, files, tree, keys):
    """Read the '/data' datasets below tree/key from every file as doubles.

    Returns a dict mapping each key to an array with the configuration
    index as its first axis.
    """
    collected = {key: [] for key in keys}
    for fname in files:
        with h5py.File(path + '/' + fname, "r") as h5file:
            for key in keys:
                if tree + '/' + key not in h5file:
                    raise Exception("Entry '" + key + "' not contained in the files.")
                raw_data = h5file[tree + '/' + key + '/data']
                collected[key].append(raw_data[:].astype(np.double))
    return {key: np.array(val) for key, val in collected.items()}
def extract_t0_hd5(path, filestem, ens_id, obs='Clover energy density', fit_range=5, idl=None, **kwargs):
    r'''Read hadrons FlowObservables hdf5 file and extract t0

    Parameters
    -----------------
    path : str
        path to the files to read
    filestem : str
        namestem of the files to read
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    obs : str
        label of the observable from which t0 should be extracted.
        Options: 'Clover energy density' and 'Plaquette energy density'
    fit_range : int
        Number of data points left and right of the zero
        crossing to be included in the linear fit. (Default: 5)
    idl : range
        If specified only configurations in the given range are read in.
    plot_fit : bool
        If true, the fit for the extraction of t0 is shown together with the data.
    '''
    files, idx = _get_files(path, filestem, idl)
    tree = "FlowObservables"

    # Identify the sub group whose description matches the requested observable.
    obs_key = None
    with h5py.File(path + '/' + files[0], "r") as h5file:
        for key in h5file[tree].keys():
            if h5file[tree][key].attrs["description"][0].decode() == obs:
                obs_key = key
                break
    if obs_key is None:
        raise Exception(f"Observable {obs} not found.")

    # FlowObservables_0 holds the flow times; they must agree across configs.
    corr_data = _extract_real_arrays(path, files, tree, ["FlowObservables_0", obs_key])

    if not np.allclose(corr_data["FlowObservables_0"][0], corr_data["FlowObservables_0"][:]):
        raise Exception("Not all flow times were equal.")

    # Shift t^2<E> by the reference value 0.3 so fit_t0 can look for the zero crossing.
    t2E_dict = {}
    for t2, dat in zip(corr_data["FlowObservables_0"][0], corr_data[obs_key].T):
        t2E_dict[t2] = Obs([dat], [ens_id], idl=[idx]) - 0.3

    return fit_t0(t2E_dict, fit_range, plot_fit=kwargs.get('plot_fit'))
def read_DistillationContraction_hd5(path, ens_id, diagrams=["direct"], idl=None):
    """Read hadrons DistillationContraction hdf5 files in given directory structure

    Parameters
    -----------------
    path : str
        path to the directories to read
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    diagrams : list
        List of strings of the diagrams to extract, e.g. ["direct", "box", "cross"].
    idl : range
        If specified only configurations in the given range are read in.

    Returns
    -------
    result : dict
        extracted DistillationContration data
    """

    res_dict = {}

    # One sub directory "data.<cnfg>" per configuration.
    directories, idx = _get_files(path, "data", idl)

    # Use the first configuration directory to discover the contraction stems.
    explore_path = Path(path + "/" + directories[0])

    for explore_file in explore_path.iterdir():
        if explore_file.is_file():
            # Strip two suffixes ("<stem>.<cnfg>.h5" -> "<stem>") to get the stem.
            stem = explore_file.with_suffix("").with_suffix("").as_posix().split("/")[-1]
        else:
            continue

        # Build the per-configuration file path for this stem; the directory
        # suffix (".<cnfg>") doubles as the configuration part of the file name.
        file_list = []
        for dir in directories:
            tmp_path = Path(path + "/" + dir)
            file_list.append((tmp_path / stem).as_posix() + tmp_path.suffix + ".h5")

        corr_data = {}

        for diagram in diagrams:
            corr_data[diagram] = []

        # Best effort: a stem present in the first directory may be missing in
        # others; in that case the whole stem is skipped (see except below).
        try:
            for n_file, (hd5_file, n_traj) in enumerate(zip(file_list, list(idx))):
                h5file = h5py.File(hd5_file)

                if n_file == 0:
                    # Metadata is read once from the first file.
                    if h5file["DistillationContraction/Metadata"].attrs.get("TimeSources")[0].decode() != "0...":
                        raise Exception("Routine is only implemented for files containing inversions on all timeslices.")

                    Nt = h5file["DistillationContraction/Metadata"].attrs.get("Nt")[0]

                    # Build an identifier tuple from the perambulator/mode input
                    # files; the last attribute is skipped (hence the "- 1").
                    identifier = []
                    for in_file in range(len(h5file["DistillationContraction/Metadata/DmfInputFiles"].attrs.keys()) - 1):
                        encoded_info = h5file["DistillationContraction/Metadata/DmfInputFiles"].attrs.get("DmfInputFiles_" + str(in_file))
                        full_info = encoded_info[0].decode().split("/")[-1].replace(".h5", "").split("_")
                        my_tuple = (full_info[0], full_info[1][1:], full_info[2], full_info[3])
                        identifier.append(my_tuple)
                    identifier = tuple(identifier)
                    # "DistillationContraction/Metadata/DmfSuffix" contains info about different quarks, irrelevant in the SU(3) case.

                for diagram in diagrams:

                    # Triangle diagrams without an Identity insertion are purely
                    # imaginary; all other cases use the real part.
                    if diagram == "triangle" and "Identity" not in str(identifier):
                        part = "im"
                    else:
                        part = "re"

                    # Average over all Nt source positions, shifting each
                    # correlator so the source sits at t=0.
                    real_data = np.zeros(Nt)
                    for x0 in range(Nt):
                        raw_data = h5file["DistillationContraction/Correlators/" + diagram + "/" + str(x0)][:][part].astype(np.double)
                        real_data += np.roll(raw_data, -x0)
                    real_data /= Nt

                    corr_data[diagram].append(real_data)
                h5file.close()

            res_dict[str(identifier)] = {}

            for diagram in diagrams:

                tmp_data = np.array(corr_data[diagram])

                l_obs = []
                for c in tmp_data.T:
                    l_obs.append(Obs([c], [ens_id], idl=[idx]))

                corr = Corr(l_obs)
                corr.tag = str(identifier)

                res_dict[str(identifier)][diagram] = corr
        except FileNotFoundError:
            # Stem not present on every configuration: skip it entirely.
            print("Skip", stem)

    return res_dict
class Npr_matrix(np.ndarray):
    """numpy ndarray that additionally carries the in and out momenta
    (``mom_in``/``mom_out``) of a propagator or vertex function."""

    def __new__(cls, input_array, mom_in=None, mom_out=None):
        obj = np.asarray(input_array).view(cls)
        obj.mom_in = mom_in
        obj.mom_out = mom_out
        return obj

    @property
    def g5H(self):
        """Gamma_5 hermitean conjugate

        Uses the fact that the propagator is gamma5 hermitean, so just the
        in and out momenta of the propagator are exchanged.
        """
        return Npr_matrix(self, mom_in=self.mom_out, mom_out=self.mom_in)

    def _propagate_mom(self, other, name):
        # Combine the momentum attribute `name` of both operands: they must
        # agree when both are set; otherwise the one that is set survives.
        mine = getattr(self, name, None)
        theirs = getattr(other, name, None)
        if mine is not None and theirs is not None and not np.allclose(mine, theirs):
            raise Exception(name + ' does not match.')
        return theirs if theirs is not None else mine

    def __matmul__(self, other):
        product = super().__matmul__(other)
        return Npr_matrix(product,
                          self._propagate_mom(other, 'mom_in'),
                          self._propagate_mom(other, 'mom_out'))

    def __array_finalize__(self, obj):
        # Views and slices inherit the momenta of their parent array.
        if obj is None:
            return
        self.mom_in = getattr(obj, 'mom_in', None)
        self.mom_out = getattr(obj, 'mom_out', None)
def read_ExternalLeg_hd5(path, filestem, ens_id, idl=None):
    """Read hadrons ExternalLeg hdf5 file and output an array of CObs

    Parameters
    ----------
    path : str
        path to the files to read
    filestem : str
        namestem of the files to read
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    idl : range
        If specified only configurations in the given range are read in.

    Returns
    -------
    result : Npr_matrix
        read Cobs-matrix
    """
    files, idx = _get_files(path, filestem, idl)

    mom = None
    corr_data = []
    for fname in files:
        with h5py.File(path + '/' + fname, "r") as file:
            corr_data.append(file['ExternalLeg/corr'][0][0].view('complex'))
            if mom is None:
                # Parse the momentum string, e.g. "[(0 0 0 0)]".
                mom = np.array(str(file['ExternalLeg/info'].attrs['pIn'])[3:-2].strip().split(), dtype=float)
    corr_data = np.array(corr_data)

    # Move the configuration axis to the back: (spin, spin, colour, colour, cnfg).
    rolled_array = np.rollaxis(corr_data, 0, 5)

    matrix = np.empty(rolled_array.shape[:-1], dtype=object)
    for index in np.ndindex(rolled_array.shape[:-1]):
        samples = rolled_array[index]
        matrix[index] = CObs(Obs([samples.real], [ens_id], idl=[idx]),
                             Obs([samples.imag], [ens_id], idl=[idx]))

    return Npr_matrix(matrix, mom_in=mom)
def read_Bilinear_hd5(path, filestem, ens_id, idl=None):
    """Read hadrons Bilinear hdf5 file and output an array of CObs

    Parameters
    ----------
    path : str
        path to the files to read
    filestem : str
        namestem of the files to read
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    idl : range
        If specified only configurations in the given range are read in.

    Returns
    -------
    result_dict: dict[Npr_matrix]
        extracted Bilinears
    """
    files, idx = _get_files(path, filestem, idl)

    mom_in = None
    mom_out = None

    # Collect the 16 gamma structures of every configuration, keyed by name.
    corr_data = {}
    for fname in files:
        with h5py.File(path + '/' + fname, "r") as file:
            for i in range(16):
                subgroup = 'Bilinear/Bilinear_' + str(i)
                name = file[subgroup + '/info'].attrs['gamma'][0].decode('UTF-8')
                corr_data.setdefault(name, []).append(file[subgroup + '/corr'][0][0].view('complex'))
                if mom_in is None:
                    mom_in = np.array(str(file[subgroup + '/info'].attrs['pIn'])[3:-2].strip().split(), dtype=float)
                if mom_out is None:
                    mom_out = np.array(str(file[subgroup + '/info'].attrs['pOut'])[3:-2].strip().split(), dtype=float)

    result_dict = {}
    for key, data in corr_data.items():
        # Configuration axis to the back: (spin, spin, colour, colour, cnfg).
        rolled_array = np.rollaxis(np.array(data), 0, 5)

        matrix = np.empty(rolled_array.shape[:-1], dtype=object)
        for index in np.ndindex(rolled_array.shape[:-1]):
            samples = rolled_array[index]
            matrix[index] = CObs(Obs([samples.real], [ens_id], idl=[idx]),
                                 Obs([samples.imag], [ens_id], idl=[idx]))

        result_dict[key] = Npr_matrix(matrix, mom_in=mom_in, mom_out=mom_out)

    return result_dict
def read_Fourquark_hd5(path, filestem, ens_id, idl=None, vertices=["VA", "AV"]):
    """Read hadrons FourquarkFullyConnected hdf5 file and output an array of CObs

    Parameters
    ----------
    path : str
        path to the files to read
    filestem : str
        namestem of the files to read
    ens_id : str
        name of the ensemble, required for internal bookkeeping
    idl : range
        If specified only configurations in the given range are read in.
    vertices : list
        Vertex functions to be extracted.

    Returns
    -------
    result_dict : dict
        extracted fourquark matrices
    """
    files, idx = _get_files(path, filestem, idl)

    mom_in = None
    mom_out = None

    # Translate the requested vertices into the gamma-structure pairs stored
    # in the file (e.g. "VA" -> [(GammaX, GammaXGamma5), ...]).
    vertex_names = []
    for vertex in vertices:
        vertex_names += _get_lorentz_names(vertex)

    corr_data = {}

    tree = 'FourQuarkFullyConnected/FourQuarkFullyConnected_'

    for hd5_file in files:
        file = h5py.File(path + '/' + hd5_file, "r")

        # The file contains 32 sub groups; keep only the requested pairs.
        for i in range(32):
            name = (file[tree + str(i) + '/info'].attrs['gammaA'][0].decode('UTF-8'), file[tree + str(i) + '/info'].attrs['gammaB'][0].decode('UTF-8'))
            if name in vertex_names:
                if name not in corr_data:
                    corr_data[name] = []
                raw_data = file[tree + str(i) + '/corr'][0][0].view('complex')
                corr_data[name].append(raw_data)
                if mom_in is None:
                    # Parse the momentum string, e.g. "[(0 0 0 0)]".
                    mom_in = np.array(str(file[tree + str(i) + '/info'].attrs['pIn'])[3:-2].strip().split(), dtype=float)
                if mom_out is None:
                    mom_out = np.array(str(file[tree + str(i) + '/info'].attrs['pOut'])[3:-2].strip().split(), dtype=float)

        file.close()

    intermediate_dict = {}

    # Sum the gamma-structure contributions of each vertex, with a relative
    # minus sign for the listed sigma pairs (from the epsilon-tensor
    # contraction in the TTtilde structure).
    for vertex in vertices:
        lorentz_names = _get_lorentz_names(vertex)
        for v_name in lorentz_names:
            if v_name in [('SigmaXY', 'SigmaZT'),
                          ('SigmaXT', 'SigmaYZ'),
                          ('SigmaYZ', 'SigmaXT'),
                          ('SigmaZT', 'SigmaXY')]:
                sign = -1
            else:
                sign = 1
            if vertex not in intermediate_dict:
                intermediate_dict[vertex] = sign * np.array(corr_data[v_name])
            else:
                intermediate_dict[vertex] += sign * np.array(corr_data[v_name])

    result_dict = {}

    for key, data in intermediate_dict.items():

        # Configuration axis to the back; the remaining 8 axes are the two
        # spin-colour index pairs of the four-quark operator.
        rolled_array = np.moveaxis(data, 0, 8)

        matrix = np.empty((rolled_array.shape[:-1]), dtype=object)
        for index in np.ndindex(rolled_array.shape[:-1]):
            real = Obs([rolled_array[index].real], [ens_id], idl=[idx])
            imag = Obs([rolled_array[index].imag], [ens_id], idl=[idx])
            matrix[index] = CObs(real, imag)

        result_dict[key] = Npr_matrix(matrix, mom_in=mom_in, mom_out=mom_out)

    return result_dict
def _get_lorentz_names(name):
lorentz_index = ['X', 'Y', 'Z', 'T']
res = []
if name == "TT":
for i in range(4):
for j in range(i + 1, 4):
res.append(("Sigma" + lorentz_index[i] + lorentz_index[j], "Sigma" + lorentz_index[i] + lorentz_index[j]))
return res
if name == "TTtilde":
for i in range(4):
for j in range(i + 1, 4):
for k in range(4):
for o in range(k + 1, 4):
fac = epsilon_tensor_rank4(i, j, k, o)
if not np.isclose(fac, 0.0):
res.append(("Sigma" + lorentz_index[i] + lorentz_index[j], "Sigma" + lorentz_index[k] + lorentz_index[o]))
return res
assert len(name) == 2
if 'S' in name or 'P' in name:
if not set(name) <= set(['S', 'P']):
raise Exception("'" + name + "' is not a Lorentz scalar")
g_names = {'S': 'Identity',
'P': 'Gamma5'}
res.append((g_names[name[0]], g_names[name[1]]))
else:
if not set(name) <= set(['V', 'A']):
raise Exception("'" + name + "' is not a Lorentz scalar")
for ind in lorentz_index:
res.append(('Gamma' + ind + (name[0] == 'A') * 'Gamma5',
'Gamma' + ind + (name[1] == 'A') * 'Gamma5'))
return res