# (stripped pasted viewer metadata: "295 lines, 9.6 KiB, Python" — not source)
from pyerrors.input import json as pj
|
|
import os
|
|
import sqlite3
|
|
from .input import sfcf,openQCD
|
|
import json
|
|
from typing import Union
|
|
from pyerrors import Obs, Corr, dump_object, load_object
|
|
from hashlib import sha256
|
|
from .tools import get_db_file, cache_enabled
|
|
from .tracker import get, save, unlock
|
|
import shutil
|
|
from typing import Any
|
|
from pathlib import Path
|
|
|
|
|
|
# Name of the directory (relative to the library root) that holds cached,
# pickled measurement objects (used by cache_dir / cache_path below).
# NOTE(review): drop_cache hard-codes ".cache" instead of using this
# constant — keep the two in sync.
CACHE_DIR = ".cache"
|
|
|
|
|
|
def write_measurement(path: Path, ensemble: str, measurement: dict[str, dict[str, dict[str, Any]]], uuid: str, code: str, parameter_file: Union[str, None]) -> None:
    """
    Write a measurement to the backlog.

    If the file for the measurement already exists, update the measurement.

    Parameters
    ----------
    path: Path
        The path to the backlogger folder.
    ensemble: str
        The ensemble of the measurement.
    measurement: dict
        Measurements to be captured in the backlogging system, keyed as
        measurement[correlator][subkey] -> data.
    uuid: str
        The uuid of the project.
    code: str
        Name of the code that was used for the project ("sfcf" or "openQCD").
    parameter_file: str or None
        The parameter file used for the measurement.

    Raises
    ------
    Exception
        If ``code`` requires a parameter file and none was given.
    """
    db_file = get_db_file(path)
    db = path / db_file

    # Relative paths of every file touched, handed to save() at the end
    # so the tracker commits them in one go.
    files_to_save = []

    # Fetch the database from the tracker and make it writable.
    get(path, db_file)
    unlock(path, db_file)
    files_to_save.append(db_file)

    conn = sqlite3.connect(db)
    c = conn.cursor()
    for corr in measurement.keys():
        # One archive file per (ensemble, correlator, project uuid).
        file_in_archive = Path('.') / 'archive' / ensemble / corr / str(uuid + '.json.gz')
        file = path / file_in_archive
        known_meas = {}
        if not os.path.exists(path / 'archive' / ensemble / corr):
            os.makedirs(path / 'archive' / ensemble / corr)
            files_to_save.append(file_in_archive)
        else:
            if os.path.exists(file):
                # Existing archive file: unlock once, then merge the new
                # measurements into its already-known contents.
                if file not in files_to_save:
                    unlock(path, file_in_archive)
                    files_to_save.append(file_in_archive)
                known_meas = pj.load_json_dict(file, verbose=False)
        # NOTE(review): if code is neither "sfcf" nor "openQCD" (or the
        # openQCD ms_type is unrecognized), pars/subkeys below are never
        # assigned and the later loop raises NameError — consider an
        # explicit error for unsupported codes.
        if code == "sfcf":
            if parameter_file is not None:
                parameters = sfcf.read_param(path, uuid, parameter_file)
            else:
                raise Exception("Need parameter file for this code!")
            # One parameter-spec string per subkey of this correlator.
            pars = {}
            subkeys = list(measurement[corr].keys())
            for subkey in subkeys:
                pars[subkey] = sfcf.get_specs(corr + "/" + subkey, parameters)

        elif code == "openQCD":
            # NOTE(review): ms_type is taken from measurement.keys(), not
            # from the current corr — presumably all keys share one
            # measurement type; confirm against callers.
            ms_type = list(measurement.keys())[0]
            if ms_type == 'ms1':
                if parameter_file is not None:
                    parameters = openQCD.read_ms1_param(path, uuid, parameter_file)
                else:
                    raise Exception("Need parameter file for this code!")
                pars = {}
                subkeys = []
                # Subkey = "/"-joined values of each reweighting factor's
                # parameters; pars stores the factor's JSON serialization.
                for i in range(len(parameters["rw_fcts"])):
                    par_list = []
                    for k in parameters["rw_fcts"][i].keys():
                        par_list.append(str(parameters["rw_fcts"][i][k]))
                    subkey = "/".join(par_list)
                    subkeys.append(subkey)
                    pars[subkey] = json.dumps(parameters["rw_fcts"][i])
            elif ms_type in ['t0', 't1']:
                if parameter_file is not None:
                    parameters = openQCD.read_ms3_param(path, uuid, parameter_file)
                else:
                    # No parameter file: record placeholder values so the
                    # subkey/hash can still be formed deterministically.
                    parameters = {}
                    for rwp in ["integrator", "eps", "ntot", "dnms"]:
                        parameters[rwp] = "Unknown"
                pars = {}
                subkeys = []
                par_list= []
                for k in ["integrator", "eps", "ntot", "dnms"]:
                    par_list.append(str(parameters[k]))
                subkey = "/".join(par_list)
                # A t0/t1 measurement has exactly one subkey.
                subkeys = [subkey]
                pars[subkey] = json.dumps(parameters)
        for subkey in subkeys:
            # A measurement is addressed as "<archive file>::<sha256 of its
            # parameter spec>" both in the DB and inside the JSON file.
            parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
            meas_path = str(file_in_archive) + "::" + parHash

            known_meas[parHash] = measurement[corr][subkey]

            # Upsert the backlog row: refresh updated_at if present,
            # otherwise insert a new entry.
            if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None:
                c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, ))
            else:
                c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))",
                          (corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file))
        conn.commit()
        # Persist the merged (old + new) measurements for this correlator.
        pj.dump_dict_to_json(known_meas, file)
    conn.close()
    save(path, message="Add measurements to database", files=files_to_save)
    return
|
|
|
|
|
|
def load_record(path: Path, meas_path: str) -> Union[Corr, Obs]:
    """
    Load a single record by its path.

    Convenience wrapper around ``load_records`` for one measurement.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_path: str
        The path to the correlator in the backlog system
        (of the form "<archive file>::<parameter hash>").

    Returns
    -------
    co : Corr or Obs
        The correlator in question.
    """
    records = load_records(path, [meas_path])
    return records[0]
|
|
|
|
|
|
def load_records(path: Path, meas_paths: list[str], preloaded: Union[dict[str, Any], None] = None) -> list[Union[Corr, Obs]]:
    """
    Load a list of records by their paths.

    Cached pickles (under the library's cache directory) are used when
    present; otherwise the archive file is preloaded once per file and,
    if caching is enabled, freshly loaded records are written to the cache.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_paths: list[str]
        A list of paths to the correlators in the backlog system, each of
        the form "<archive file>::<parameter hash>".
    preloaded: dict[str, Any], optional
        Data that is already preloaded, keyed by archive file. Of interest
        if data has already been loaded in the same script. A dict passed
        in by the caller is updated in place with newly loaded files.

    Returns
    -------
    returned_data: list
        The loaded records, grouped by archive file.
    """
    # Fix: the previous default was a mutable `{}`, which is shared across
    # calls and was mutated below — replaced with the None-sentinel idiom.
    if preloaded is None:
        preloaded = {}
    # Group the requested keys by the archive file that contains them, so
    # each file is read at most once.
    needed_data: dict[str, list[str]] = {}
    for mpath in meas_paths:
        file, key = mpath.split("::")[0], mpath.split("::")[1]
        needed_data.setdefault(file, []).append(key)
    returned_data: list[Any] = []
    for file, keys in needed_data.items():
        for key in keys:
            cached_file = str(cache_path(path, file, key)) + ".p"
            if os.path.exists(cached_file):
                # Cache hit: load the pickled object directly.
                returned_data.append(load_object(cached_file))
            else:
                if file not in preloaded:
                    preloaded[file] = preload(path, Path(file))
                returned_data.append(preloaded[file][key])
                if cache_enabled(path):
                    if not os.path.exists(cache_dir(path, file)):
                        os.makedirs(cache_dir(path, file))
                    dump_object(preloaded[file][key], cache_path(path, file, key))
    return returned_data
|
|
|
|
|
|
def cache_dir(path: Path, file: str) -> Path:
    """
    Return the directory corresponding to the cache for the given file.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library (a "/"-separated relative path) whose
        cached data we want to access.

    Returns
    -------
    Path
        The path holding the cached data for the given file: the file's
        relative path, minus its first component, mirrored below
        ``path / CACHE_DIR``.
    """
    # Drop the leading component of the file's path and rebuild the rest
    # below the cache directory.
    tail = file.split("/")[1:]
    return (path / CACHE_DIR).joinpath(*tail)
|
|
|
|
|
|
def cache_path(path: Path, file: str, key: str) -> Path:
    """
    Return the cache location of one measurement inside an archive file.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.
    key: str
        The key within the archive file.

    Returns
    -------
    Path
        The path at which the measurement of the given file and key is
        cached (the key appended to the file's cache directory).
    """
    return cache_dir(path, file) / key
|
|
|
|
|
|
def preload(path: Path, file: Path) -> dict[str, Any]:
    """
    Read the contents of an archive file into a dictionary using
    pyerrors' ``load_json_dict``.

    The file is first fetched from the tracker via ``get``.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: Path
        The file within the library to be loaded.

    Returns
    -------
    filedict: dict[str, Any]
        The data read from the file.
    """
    get(path, file)
    contents: dict[str, Any] = pj.load_json_dict(path / file)
    print("> read file")
    return contents
|
|
|
|
|
|
def drop_record(path: Path, meas_path: str) -> None:
    """
    Drop a record by its path.

    Removes the record's row from the backlog database and its entry from
    the corresponding archive file, then commits both via the tracker.

    Parameters
    ----------
    path: Path
        The path of the library.
    meas_path: str
        The measurement path as noted in the database
        ("<archive file>::<parameter hash>").

    Raises
    ------
    ValueError
        If the measurement is missing from the database, or from the
        archive file.
    """
    file_in_archive = meas_path.split("::")[0]
    file = path / file_in_archive
    db_file = get_db_file(path)
    db = path / db_file
    # Fetch the database from the tracker and make it writable.
    get(path, db_file)
    sub_key = meas_path.split("::")[1]
    unlock(path, db_file)
    conn = sqlite3.connect(db)
    c = conn.cursor()
    if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path, )).fetchone() is not None:
        c.execute("DELETE FROM backlogs WHERE path = ?", (meas_path, ))
    else:
        raise ValueError("This measurement does not exist as an entry!")

    # NOTE(review): the DB deletion is committed before the archive file is
    # checked below — if the subsequent ValueError fires, the DB and the
    # archive file are left inconsistent. Also, conn is never closed on any
    # path through this function.
    conn.commit()
    known_meas = pj.load_json_dict(file)
    if sub_key in known_meas:
        del known_meas[sub_key]
        unlock(path, Path(file_in_archive))
        pj.dump_dict_to_json(known_meas, file)
        # NOTE(review): write_measurement passes tracker-relative paths to
        # save(), while absolute paths (db, file) are passed here — confirm
        # which form save() expects.
        save(path, message="Drop measurements to database", files=[db, file])
        return
    else:
        raise ValueError("This measurement does not exist as a file!")
|
|
|
|
|
|
def drop_cache(path: Path) -> None:
    """
    Drop the contents of the cache directory of the library.

    The cache directory itself is kept; everything inside it (per-file
    cache subdirectories and any stray files) is removed. A missing cache
    directory is treated as already empty.

    Parameters
    ----------
    path: Path
        The path of the library.
    """
    # NOTE(review): ".cache" duplicates the module-level CACHE_DIR constant.
    # The local name is `cache_root` (not `cache_dir`) to avoid shadowing
    # the module-level cache_dir() function.
    cache_root = path / ".cache"
    # Fix: previously os.listdir raised FileNotFoundError when no cache
    # directory existed yet.
    if not cache_root.is_dir():
        return
    for entry in os.listdir(cache_root):
        victim = cache_root / entry
        if victim.is_dir():
            shutil.rmtree(victim)
        else:
            # Fix: shutil.rmtree raises NotADirectoryError on plain files
            # (e.g. pickles cached directly under the root); unlink them.
            victim.unlink()
    return
|