corrlib/corrlib/meas_io.py
Justus Kuhlmann 51ae53aa02
Some checks failed
Mypy / mypy (push) Successful in 1m11s
Pytest / pytest (3.12) (push) Successful in 1m17s
Pytest / pytest (3.13) (push) Successful in 1m12s
Pytest / pytest (3.14) (push) Successful in 1m14s
Ruff / ruff (push) Successful in 1m0s
Mypy / mypy (pull_request) Successful in 1m9s
Pytest / pytest (3.12) (pull_request) Successful in 1m19s
Pytest / pytest (3.13) (pull_request) Failing after 37s
Pytest / pytest (3.14) (pull_request) Failing after 39s
Ruff / ruff (pull_request) Failing after 38s
add empty return
2026-04-17 17:53:13 +02:00

316 lines
11 KiB
Python

from pyerrors.input import json as pj
import os
import sqlite3
from .input import sfcf,openQCD
import json
from typing import Union
from pyerrors import Obs, Corr, dump_object, load_object
from hashlib import sha256
from .tools import get_db_file, cache_enabled
from .tracker import get, save, unlock
import shutil
from typing import Any
from pathlib import Path
from .integrity import _check_db2paths
CACHE_DIR = ".cache"
def write_measurement(path: Path, ensemble: str, measurement: dict[str, dict[str, dict[str, Any]]], uuid: str, code: str, parameter_file: Union[str, None]) -> None:
    """
    Write a measurement to the backlog.

    If the file for the measurement already exists, update the measurement.

    Parameters
    ----------
    path: Path
        The path to the backlogger folder.
    ensemble: str
        The ensemble of the measurement.
    measurement: dict
        Measurements to be captured in the backlogging system,
        keyed as measurement[correlator][subkey] -> data.
    uuid: str
        The uuid of the project.
    code: str
        Name of the code that was used for the project ("sfcf" or "openQCD").
    parameter_file: str
        The parameter file used for the measurement.

    Raises
    ------
    Exception
        If code is "sfcf" and no parameter file is given.
    ValueError
        If code or the openQCD measurement type is not supported.
    """
    db_file = get_db_file(path)
    db = path / db_file
    files_to_save = []
    # Make sure we hold an unlocked, up-to-date copy of the database file.
    get(path, db_file)
    unlock(path, db_file)
    files_to_save.append(db_file)
    conn = sqlite3.connect(db)
    c = conn.cursor()
    for corr in measurement.keys():
        file_in_archive = Path('.') / 'archive' / ensemble / corr / str(uuid + '.json.gz')
        file = path / file_in_archive
        known_meas: dict[str, Any] = {}
        if not os.path.exists(path / 'archive' / ensemble / corr):
            os.makedirs(path / 'archive' / ensemble / corr)
            files_to_save.append(file_in_archive)
        else:
            if os.path.exists(file):
                if file not in files_to_save:
                    unlock(path, file_in_archive)
                    files_to_save.append(file_in_archive)
                # Merge into the measurements already on disk.
                known_meas = pj.load_json_dict(str(file), verbose=False)
        if code == "sfcf":
            if parameter_file is not None:
                parameters = sfcf.read_param(path, uuid, parameter_file)
            else:
                raise Exception("Need parameter file for this code!")
            pars = {}
            subkeys = list(measurement[corr].keys())
            for subkey in subkeys:
                pars[subkey] = sfcf.get_specs(corr + "/" + subkey, parameters)
        elif code == "openQCD":
            ms_type = list(measurement.keys())[0]
            if ms_type == 'ms1':
                if parameter_file is not None:
                    # NOTE(review): if the file matches neither suffix,
                    # 'parameters' stays unbound — confirm upstream guarantees.
                    if parameter_file.endswith(".ms1.in"):
                        parameters = openQCD.load_ms1_infile(path, uuid, parameter_file)
                    elif parameter_file.endswith(".ms1.par"):
                        parameters = openQCD.load_ms1_parfile(path, uuid, parameter_file)
                else:
                    # Temporary solution: no parameter file, use placeholders.
                    parameters = {}
                    parameters["rand"] = {}
                    parameters["rw_fcts"] = [{}]
                # Fill in defaults for missing reweighting-factor specs.
                for nrw in range(1):
                    if "nsrc" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["nsrc"] = 1
                    if "mu" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["mu"] = "None"
                    if "np" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["np"] = "None"
                    if "irp" not in parameters["rw_fcts"][nrw]:
                        parameters["rw_fcts"][nrw]["irp"] = "None"
                # One subkey per reweighting factor, identified by its specs.
                pars = {}
                subkeys = []
                for i in range(len(parameters["rw_fcts"])):
                    par_list = []
                    for k in parameters["rw_fcts"][i].keys():
                        par_list.append(str(parameters["rw_fcts"][i][k]))
                    subkey = "/".join(par_list)
                    subkeys.append(subkey)
                    pars[subkey] = json.dumps(parameters["rw_fcts"][i])
            elif ms_type in ['t0', 't1']:
                if parameter_file is not None:
                    parameters = openQCD.load_ms3_infile(path, uuid, parameter_file)
                else:
                    parameters = {}
                    for rwp in ["integrator", "eps", "ntot", "dnms"]:
                        parameters[rwp] = "Unknown"
                # A single subkey built from the integrator specification.
                pars = {}
                par_list = []
                for k in ["integrator", "eps", "ntot", "dnms"]:
                    par_list.append(str(parameters[k]))
                subkey = "/".join(par_list)
                subkeys = [subkey]
                pars[subkey] = json.dumps(parameters)
            else:
                # Fix: previously fell through and crashed later with a
                # NameError on 'pars'/'subkeys'.
                raise ValueError("Unknown openQCD measurement type: " + ms_type)
        else:
            # Fix: previously an unknown code crashed later with a NameError.
            raise ValueError("Unknown code: " + code)
        for subkey in subkeys:
            parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
            meas_path = str(file_in_archive) + "::" + parHash
            known_meas[parHash] = measurement[corr][subkey]
            # Upsert the backlog entry keyed by the measurement path.
            if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None:
                c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, ))
            else:
                c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))",
                          (corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file))
        conn.commit()
        pj.dump_dict_to_json(known_meas, str(file))
    conn.close()
    save(path, message="Add measurements to database", files=files_to_save)
    return
def load_record(path: Path, meas_path: str) -> Union[Corr, Obs]:
    """
    Load a single record by its path.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_path: str
        The path to the correlator in the backlog system.

    Returns
    -------
    co : Corr or Obs
        The correlator in question.
    """
    # Delegate to the batch loader with a one-element list.
    records = load_records(path, [meas_path])
    return records[0]
def load_records(path: Path, meas_paths: list[str], preloaded: Union[dict[str, Any], None] = None, dry_run: bool = False) -> list[Union[Corr, Obs]]:
    """
    Load a list of records by their paths.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_paths: list[str]
        A list of the paths to the correlator in the backlog system,
        each of the form "<archive file>::<key>".
    preloaded: dict[str, Any], optional
        The data that is already preloaded. Of interest if data has already
        been loaded in the same script. This dict is updated in place with
        any files read during this call, so it can be reused by the caller.
    dry_run: bool
        Do not load data, just check whether we can reach the data we are
        interested in.

    Returns
    -------
    returned_data: list
        The loaded records.
    """
    if dry_run:
        # Integrity check only; nothing is loaded.
        _check_db2paths(path, meas_paths)
        return []
    if preloaded is None:
        # Fix: the old mutable default argument ({}) was mutated below and
        # silently kept every loaded file alive across unrelated calls.
        preloaded = {}
    # Group the requested keys by archive file so each file is read at most once.
    needed_data: dict[str, list[str]] = {}
    for mpath in meas_paths:
        file = mpath.split("::")[0]
        key = mpath.split("::")[1]
        if file not in needed_data:
            needed_data[file] = []
        needed_data[file].append(key)
    returned_data: list[Any] = []
    for file, keys in needed_data.items():
        for key in keys:
            cached_file = str(cache_path(path, file, key)) + ".p"
            if os.path.exists(cached_file):
                # Fast path: pickled object already in the cache.
                returned_data.append(load_object(cached_file))
            else:
                if file not in preloaded:
                    preloaded[file] = preload(path, Path(file))
                returned_data.append(preloaded[file][key])
                if cache_enabled(path):
                    # Populate the cache for the next call.
                    if not os.path.exists(cache_dir(path, file)):
                        os.makedirs(cache_dir(path, file))
                    dump_object(preloaded[file][key], str(cache_path(path, file, key)))
    return returned_data
def cache_dir(path: Path, file: str) -> Path:
    """
    Return the directory corresponding to the cache for the given file.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.

    Returns
    -------
    cache_path: Path
        The path holding the cached data for the given file.
    """
    # Drop the leading component of the archive path and mirror the
    # remaining hierarchy underneath the cache directory.
    subdirs = file.split("/")[1:]
    return Path(path, CACHE_DIR, *subdirs)
def cache_path(path: Path, file: str, key: str) -> Path:
    """
    Return the cache location for a single measurement.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.
    key: str
        The key within the archive file.

    Returns
    -------
    cache_path: Path
        The path at which the measurement of the given file and key is cached.
    """
    return cache_dir(path, file) / key
def preload(path: Path, file: Path) -> dict[str, Any]:
    """
    Read the contents of a file into a json dictionary with the
    pyerrors.json.load_json_dict method.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: Path
        The file within the library to be loaded.

    Returns
    -------
    filedict: dict[str, Any]
        The data read from the file.
    """
    # Make sure the file is locally available before reading it.
    get(path, file)
    # Fix: removed stray debug print ("> read file") left in library code.
    filedict: dict[str, Any] = pj.load_json_dict(str(path / file))
    return filedict
def drop_record(path: Path, meas_path: str) -> None:
    """
    Drop a record by its path.

    Parameters
    ----------
    path: Path
        The path of the library.
    meas_path: str
        The measurement path as noted in the database,
        of the form "<archive file>::<key>".

    Raises
    ------
    ValueError
        If the measurement does not exist in the database, or its key is
        not present in the archive file.
    """
    file_in_archive = meas_path.split("::")[0]
    file = path / file_in_archive
    db_file = get_db_file(path)
    db = path / db_file
    get(path, db_file)
    sub_key = meas_path.split("::")[1]
    unlock(path, db_file)
    conn = sqlite3.connect(db)
    # Fix: the connection was never closed on any path (resource leak).
    try:
        c = conn.cursor()
        if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path, )).fetchone() is not None:
            c.execute("DELETE FROM backlogs WHERE path = ?", (meas_path, ))
        else:
            raise ValueError("This measurement does not exist as an entry!")
        conn.commit()
    finally:
        conn.close()
    known_meas = pj.load_json_dict(str(file))
    if sub_key in known_meas:
        del known_meas[sub_key]
        unlock(path, Path(file_in_archive))
        pj.dump_dict_to_json(known_meas, str(file))
        save(path, message="Drop measurements to database", files=[db, file])
        return
    else:
        raise ValueError("This measurement does not exist as a file!")
def drop_cache(path: Path) -> None:
    """
    Drop the cache directory of the library.

    Empties the ".cache" directory without removing the directory itself.

    Parameters
    ----------
    path: Path
        The path of the library.
    """
    cache_dir = path / ".cache"
    for entry in os.listdir(cache_dir):
        full = cache_dir / entry
        # Fix: shutil.rmtree raises NotADirectoryError on plain files,
        # so remove files and directories with the appropriate call.
        if os.path.isdir(full):
            shutil.rmtree(full)
        else:
            os.remove(full)
    return