# corrlib/meas_io.py
#
# Provenance (from repository listing): commit e3be65beec by Justus Kuhlmann,
# 2026-04-09 — "TEMPFIX: allow ms1 to not have an in or par file, part 4".
# CI at that commit: mypy/ruff and pytest (3.13, 3.14) failing; pytest (3.12) passing.
from pyerrors.input import json as pj
import os
import sqlite3
from .input import sfcf,openQCD
import json
from typing import Union
from pyerrors import Obs, Corr, dump_object, load_object
from hashlib import sha256
from .tools import get_db_file, cache_enabled
from .tracker import get, save, unlock
import shutil
from typing import Any
from pathlib import Path
CACHE_DIR = ".cache"
def write_measurement(path: Path, ensemble: str, measurement: dict[str, dict[str, dict[str, Any]]], uuid: str, code: str, parameter_file: Union[str, None]) -> None:
    """
    Write a measurement to the backlog.

    If the file for the measurement already exists, update the measurement.

    Parameters
    ----------
    path: Path
        The path to the backlogger folder.
    ensemble: str
        The ensemble of the measurement.
    measurement: dict
        Measurements to be captured in the backlogging system,
        keyed as measurement[corr][subkey] -> data.
    uuid: str
        The uuid of the project.
    code: str
        Name of the code that was used for the project ("sfcf" or "openQCD").
    parameter_file: str or None
        The parameter file used for the measurement.

    Raises
    ------
    Exception
        If code is "sfcf" and no parameter file is given.
    ValueError
        If the code or the openQCD measurement type is not supported.
    """
    db_file = get_db_file(path)
    db = path / db_file
    files_to_save = []
    # Sync and unlock the database file via the tracker before writing.
    get(path, db_file)
    unlock(path, db_file)
    files_to_save.append(db_file)
    conn = sqlite3.connect(db)
    c = conn.cursor()
    for corr in measurement.keys():
        file_in_archive = Path('.') / 'archive' / ensemble / corr / str(uuid + '.json.gz')
        file = path / file_in_archive
        known_meas = {}
        if not os.path.exists(path / 'archive' / ensemble / corr):
            os.makedirs(path / 'archive' / ensemble / corr)
            files_to_save.append(file_in_archive)
        else:
            if os.path.exists(file):
                # Fix: compare the relative archive path, not the absolute one
                # (the absolute path could never match files_to_save entries,
                # so the guard was previously a no-op).
                if file_in_archive not in files_to_save:
                    unlock(path, file_in_archive)
                    files_to_save.append(file_in_archive)
                known_meas = pj.load_json_dict(str(file), verbose=False)
        if code == "sfcf":
            if parameter_file is not None:
                parameters = sfcf.read_param(path, uuid, parameter_file)
            else:
                raise Exception("Need parameter file for this code!")
            pars = {}
            subkeys = list(measurement[corr].keys())
            for subkey in subkeys:
                pars[subkey] = sfcf.get_specs(corr + "/" + subkey, parameters)
        elif code == "openQCD":
            # NOTE(review): this inspects the FIRST key of `measurement`,
            # not the current `corr`; equivalent only while `measurement`
            # contains a single observable type — confirm against callers.
            ms_type = list(measurement.keys())[0]
            if ms_type == 'ms1':
                parameters = {}
                if parameter_file is not None:
                    if parameter_file.endswith(".ms1.in"):
                        parameters = openQCD.load_ms1_infile(path, uuid, parameter_file)
                    elif parameter_file.endswith(".ms1.par"):
                        parameters = openQCD.load_ms1_parfile(path, uuid, parameter_file)
                if not parameters:
                    # TEMPFIX: allow ms1 runs without a usable .in or .par file.
                    parameters = {"rand": {}, "rw_fcts": [{}]}
                # Fill in defaults for any reweighting-factor specs the
                # parameter file (or the TEMPFIX stub) did not provide.
                # Fix: cover every rw_fct, not only index 0 (was range(1)).
                for nrw in range(len(parameters["rw_fcts"])):
                    if "nsrc" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["nsrc"] = 1
                    if "mu" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["mu"] = "None"
                    if "np" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["np"] = "None"
                    if "irp" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["irp"] = "None"
                pars = {}
                subkeys = []
                for i in range(len(parameters["rw_fcts"])):
                    par_list = []
                    for k in parameters["rw_fcts"][i].keys():
                        par_list.append(str(parameters["rw_fcts"][i][k]))
                    subkey = "/".join(par_list)
                    subkeys.append(subkey)
                    pars[subkey] = json.dumps(parameters["rw_fcts"][i])
            elif ms_type in ['t0', 't1']:
                if parameter_file is not None:
                    parameters = openQCD.load_ms3_infile(path, uuid, parameter_file)
                else:
                    parameters = {}
                    for rwp in ["integrator", "eps", "ntot", "dnms"]:
                        parameters[rwp] = "Unknown"
                par_list = []
                for k in ["integrator", "eps", "ntot", "dnms"]:
                    par_list.append(str(parameters[k]))
                subkey = "/".join(par_list)
                subkeys = [subkey]
                pars = {subkey: json.dumps(parameters)}
            else:
                # Fix: previously fell through and crashed with NameError on
                # `subkeys`; fail with an explicit error instead.
                raise ValueError("Unknown openQCD measurement type '" + ms_type + "'!")
        else:
            # Fix: same NameError guard for unsupported codes.
            raise ValueError("Unknown code '" + code + "'!")
        for subkey in subkeys:
            # Measurements are keyed in the archive file by a hash of their
            # parameter specification.
            parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
            meas_path = str(file_in_archive) + "::" + parHash
            known_meas[parHash] = measurement[corr][subkey]
            if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None:
                c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, ))
            else:
                c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))",
                          (corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file))
        conn.commit()
        pj.dump_dict_to_json(known_meas, str(file))
    conn.close()
    save(path, message="Add measurements to database", files=files_to_save)
    return
def load_record(path: Path, meas_path: str) -> Union[Corr, Obs]:
    """
    Load a single record by its path.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_path: str
        The path to the correlator in the backlog system.

    Returns
    -------
    co : Corr or Obs
        The correlator in question.
    """
    # Delegate to the batch loader with a one-element list.
    records = load_records(path, [meas_path])
    return records[0]
def load_records(path: Path, meas_paths: list[str], preloaded: Union[dict[str, Any], None] = None) -> list[Union[Corr, Obs]]:
    """
    Load a list of records by their paths.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_paths: list[str]
        A list of the paths to the correlators in the backlog system,
        each of the form "<file>::<key>".
    preloaded: dict[str, Any], optional
        Data that is already preloaded. Of interest if data has already
        been loaded in the same script. Mutated in place with newly
        loaded files so callers can reuse it.

    Returns
    -------
    returned_data: list
        The loaded records, in the order of meas_paths.
    """
    # Fix: the default used to be a mutable `{}` shared across all calls,
    # which silently accumulated every file ever loaded.
    if preloaded is None:
        preloaded = {}
    needed_data: dict[str, list[str]] = {}
    for mpath in meas_paths:
        file = mpath.split("::")[0]
        if file not in needed_data.keys():
            needed_data[file] = []
        key = mpath.split("::")[1]
        needed_data[file].append(key)
    returned_data: list[Any] = []
    for file in needed_data.keys():
        for key in list(needed_data[file]):
            cached_file = str(cache_path(path, file, key)) + ".p"
            if os.path.exists(cached_file):
                # Fast path: record was cached by a previous call.
                returned_data.append(load_object(cached_file))
            else:
                if file not in preloaded:
                    preloaded[file] = preload(path, Path(file))
                returned_data.append(preloaded[file][key])
                if cache_enabled(path):
                    if not os.path.exists(cache_dir(path, file)):
                        os.makedirs(cache_dir(path, file))
                    dump_object(preloaded[file][key], str(cache_path(path, file, key)))
    return returned_data
def cache_dir(path: Path, file: str) -> Path:
    """
    Return the directory corresponding to the cache for the given file.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.

    Returns
    -------
    cache_path: Path
        The path holding the cached data for the given file.
    """
    # Drop the first path component of `file` and mirror the rest
    # underneath the library's cache directory.
    subdirs = file.split("/")[1:]
    return Path(path).joinpath(CACHE_DIR, *subdirs)
def cache_path(path: Path, file: str, key: str) -> Path:
    """
    Return the cache location of one measurement.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.
    key: str
        The key within the archive file.

    Returns
    -------
    cache_path: Path
        The path at which the measurement of the given file and key is cached.
    """
    return cache_dir(path, file) / key
def preload(path: Path, file: Path) -> dict[str, Any]:
    """
    Read the contents of a file into a dictionary via pyerrors' load_json_dict.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: Path
        The file within the library to be loaded.

    Returns
    -------
    filedict: dict[str, Any]
        The data read from the file.
    """
    # Make sure the file is present locally before reading it.
    get(path, file)
    data: dict[str, Any] = pj.load_json_dict(str(path / file))
    print("> read file")
    return data
def drop_record(path: Path, meas_path: str) -> None:
    """
    Drop a record by its path.

    Removes the database entry and the corresponding key from the archive
    file, then saves both through the tracker.

    Parameters
    ----------
    path: Path
        The path of the library.
    meas_path: str
        The measurement path as noted in the database ("<file>::<key>").

    Raises
    ------
    ValueError
        If the measurement is missing from the database or the archive file.
    """
    file_in_archive = meas_path.split("::")[0]
    file = path / file_in_archive
    db_file = get_db_file(path)
    db = path / db_file
    get(path, db_file)
    sub_key = meas_path.split("::")[1]
    unlock(path, db_file)
    conn = sqlite3.connect(db)
    # Fix: the connection was never closed; guarantee cleanup on all paths.
    try:
        c = conn.cursor()
        if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path, )).fetchone() is not None:
            c.execute("DELETE FROM backlogs WHERE path = ?", (meas_path, ))
        else:
            raise ValueError("This measurement does not exist as an entry!")
        conn.commit()
    finally:
        conn.close()
    known_meas = pj.load_json_dict(str(file))
    if sub_key in known_meas:
        del known_meas[sub_key]
        unlock(path, Path(file_in_archive))
        pj.dump_dict_to_json(known_meas, str(file))
        # NOTE(review): other save() calls pass library-relative paths
        # (db_file, file_in_archive), but here absolute paths are passed —
        # confirm which form tracker.save expects.
        save(path, message="Drop measurements to database", files=[db, file])
        return
    else:
        raise ValueError("This measurement does not exist as a file!")
def drop_cache(path: Path) -> None:
    """
    Drop the cache directory of the library.

    Parameters
    ----------
    path: Path
        The path of the library.
    """
    cache_base = path / ".cache"
    # Fix: a library without a cache directory used to raise
    # FileNotFoundError here; dropping an empty cache is a no-op.
    if not os.path.exists(cache_base):
        return
    for entry in os.listdir(cache_base):
        full = cache_base / entry
        # Fix: shutil.rmtree fails on plain files, so dispatch on type.
        if os.path.isdir(full):
            shutil.rmtree(full)
        else:
            os.remove(full)
    return