import json
import os
import shutil
import sqlite3
from hashlib import sha256
from pathlib import Path
from typing import Any, Union

from pyerrors import Corr, Obs, dump_object, load_object
from pyerrors.input import json as pj

from .input import openQCD, sfcf
from .tools import cache_enabled, get_db_file
from .tracker import get, save, unlock

# Name of the per-library cache directory (relative to the library root).
CACHE_DIR = ".cache"


def write_measurement(path: Path, ensemble: str, measurement: dict[str, dict[str, dict[str, Any]]],
                      uuid: str, code: str, parameter_file: Union[str, None]) -> None:
    """
    Write a measurement to the backlog.

    If the archive file for the measurement already exists, the measurement
    is merged into it; otherwise a new file is created. The sqlite database
    is updated accordingly (INSERT for new paths, UPDATE of the timestamp
    for known ones).

    Parameters
    ----------
    path: Path
        The path to the backlogger folder.
    ensemble: str
        The ensemble of the measurement.
    measurement: dict
        Measurements to be captured in the backlogging system.
    uuid: str
        The uuid of the project.
    code: str
        Name of the code that was used for the project
        (currently "sfcf" or "openQCD").
    parameter_file: str or None
        The parameter file used for the measurement.

    Raises
    ------
    Exception
        If a parameter file is required for the given code but none is given.
    ValueError
        If `code` (or an openQCD measurement type) is not recognized.
    """
    db_file = get_db_file(path)
    db = path / db_file
    files_to_save = []
    get(path, db_file)
    unlock(path, db_file)
    files_to_save.append(db_file)
    conn = sqlite3.connect(db)
    c = conn.cursor()
    for corr in measurement.keys():
        file_in_archive = Path('.') / 'archive' / ensemble / corr / str(uuid + '.json.gz')
        file = path / file_in_archive
        known_meas = {}
        if not os.path.exists(path / 'archive' / ensemble / corr):
            os.makedirs(path / 'archive' / ensemble / corr)
            files_to_save.append(file_in_archive)
        else:
            # NOTE(review): a *new* file inside an already-existing directory is
            # never appended to files_to_save — confirm this is intended.
            if os.path.exists(file):
                if file not in files_to_save:
                    unlock(path, file_in_archive)
                    files_to_save.append(file_in_archive)
                # Merge into the existing archive content instead of overwriting.
                known_meas = pj.load_json_dict(file, verbose=False)
        if code == "sfcf":
            if parameter_file is not None:
                parameters = sfcf.read_param(path, uuid, parameter_file)
            else:
                raise Exception("Need parameter file for this code!")
            subkeys = list(measurement[corr].keys())
            pars = {subkey: sfcf.get_specs(corr + "/" + subkey, parameters) for subkey in subkeys}
        elif code == "openQCD":
            # NOTE(review): ms_type is taken from the *first* key of `measurement`,
            # not from the current loop variable `corr` — verify this is intended.
            ms_type = list(measurement.keys())[0]
            if ms_type == 'ms1':
                if parameter_file is not None:
                    parameters = openQCD.read_ms1_param(path, uuid, parameter_file)
                else:
                    raise Exception("Need parameter file for this code!")
                pars = {}
                subkeys = []
                for rw_fct in parameters["rw_fcts"]:
                    # Subkey is the "/"-joined string form of all parameter values.
                    subkey = "/".join(str(v) for v in rw_fct.values())
                    subkeys.append(subkey)
                    pars[subkey] = json.dumps(rw_fct)
            elif ms_type in ['t0', 't1']:
                if parameter_file is not None:
                    parameters = openQCD.read_ms3_param(path, uuid, parameter_file)
                else:
                    # No parameter file: record placeholder values.
                    parameters = {k: "Unknown" for k in ["integrator", "eps", "ntot", "dnms"]}
                subkey = "/".join(str(parameters[k]) for k in ["integrator", "eps", "ntot", "dnms"])
                subkeys = [subkey]
                pars = {subkey: json.dumps(parameters)}
            else:
                # Previously fell through to a NameError on `subkeys`.
                raise ValueError("Unknown openQCD measurement type: " + str(ms_type))
        else:
            # Previously fell through to a NameError on `subkeys`.
            raise ValueError("Unknown code: " + str(code))
        for subkey in subkeys:
            # Measurements are addressed as "<archive file>::<sha256 of parameters>".
            parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
            meas_path = str(file_in_archive) + "::" + parHash
            known_meas[parHash] = measurement[corr][subkey]
            if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None:
                c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, ))
            else:
                c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))",
                          (corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file))
        conn.commit()
        pj.dump_dict_to_json(known_meas, file)
    conn.close()
    save(path, message="Add measurements to database", files=files_to_save)
    return


def load_record(path: Path, meas_path: str) -> Union[Corr, Obs]:
    """
    Load a single record by its path.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_path: str
        The path to the correlator in the backlog system.

    Returns
    -------
    co : Corr or Obs
        The correlator in question.
    """
    return load_records(path, [meas_path])[0]


def load_records(path: Path, meas_paths: list[str],
                 preloaded: Union[dict[str, Any], None] = None) -> list[Union[Corr, Obs]]:
    """
    Load a list of records by their paths.

    Parameters
    ----------
    path: Path
        Path of the correlator library.
    meas_paths: list[str]
        A list of the paths to the correlators in the backlog system
        (each of the form "<archive file>::<key>").
    preloaded: dict[str, Any], optional
        Data that is already preloaded. Of interest if data has already
        been loaded in the same script.

    Returns
    -------
    returned_data: list
        The loaded records.
    """
    # A fresh dict per call — a mutable default ({}) would be shared
    # across calls and silently accumulate state.
    if preloaded is None:
        preloaded = {}
    # Group the requested keys by archive file so each file is read once.
    needed_data: dict[str, list[str]] = {}
    for mpath in meas_paths:
        file, key = mpath.split("::")[0], mpath.split("::")[1]
        if file not in needed_data.keys():
            needed_data[file] = []
        needed_data[file].append(key)
    returned_data: list[Any] = []
    for file in needed_data.keys():
        for key in list(needed_data[file]):
            if os.path.exists(str(cache_path(path, file, key)) + ".p"):
                # Cache hit: load the pickled object directly.
                returned_data.append(load_object(str(cache_path(path, file, key)) + ".p"))
            else:
                if file not in preloaded:
                    preloaded[file] = preload(path, Path(file))
                returned_data.append(preloaded[file][key])
                if cache_enabled(path):
                    # Populate the cache for subsequent calls.
                    if not os.path.exists(cache_dir(path, file)):
                        os.makedirs(cache_dir(path, file))
                    dump_object(preloaded[file][key], cache_path(path, file, key))
    return returned_data


def cache_dir(path: Path, file: str) -> Path:
    """
    Return the directory corresponding to the cache for the given file.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.

    Returns
    -------
    cache_path: Path
        The path holding the cached data for the given file.
    """
    # Drop the first component of the archive path and mirror the rest
    # of the hierarchy below <path>/.cache.
    cache_path_list = file.split("/")[1:]
    cache_path = path / CACHE_DIR
    for directory in cache_path_list:
        cache_path /= directory
    return cache_path


def cache_path(path: Path, file: str, key: str) -> Path:
    """
    Return the cache location of one measurement.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: str
        The file in the library that we want to access the cached data of.
    key: str
        The key within the archive file.

    Returns
    -------
    cache_path: Path
        The path at which the measurement of the given file and key is cached.
    """
    return cache_dir(path, file) / key


def preload(path: Path, file: Path) -> dict[str, Any]:
    """
    Read the contents of a file into a json dictionary with the
    pyerrors.input.json.load_json_dict method.

    Parameters
    ----------
    path: Path
        The path of the library.
    file: Path
        The file within the library to be loaded.

    Returns
    -------
    filedict: dict[str, Any]
        The data read from the file.
    """
    get(path, file)
    filedict: dict[str, Any] = pj.load_json_dict(path / file)
    print("> read file")
    return filedict


def drop_record(path: Path, meas_path: str) -> None:
    """
    Drop a record by its path.

    Removes the database row and deletes the corresponding entry from the
    archive file.

    Parameters
    ----------
    path: Path
        The path of the library.
    meas_path: str
        The measurement path as noted in the database
        ("<archive file>::<key>").

    Raises
    ------
    ValueError
        If the measurement is not present in the database or in the
        archive file.
    """
    file_in_archive = meas_path.split("::")[0]
    file = path / file_in_archive
    db_file = get_db_file(path)
    db = path / db_file
    get(path, db_file)
    sub_key = meas_path.split("::")[1]
    unlock(path, db_file)
    conn = sqlite3.connect(db)
    c = conn.cursor()
    if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path, )).fetchone() is not None:
        c.execute("DELETE FROM backlogs WHERE path = ?", (meas_path, ))
    else:
        raise ValueError("This measurement does not exist as an entry!")
    conn.commit()
    known_meas = pj.load_json_dict(file)
    if sub_key in known_meas:
        del known_meas[sub_key]
        unlock(path, Path(file_in_archive))
        pj.dump_dict_to_json(known_meas, file)
        # NOTE(review): write_measurement passes paths relative to `path` to
        # save(); here absolute paths are used — confirm both are accepted.
        save(path, message="Drop measurements to database", files=[db, file])
        return
    else:
        raise ValueError("This measurement does not exist as a file!")


def drop_cache(path: Path) -> None:
    """
    Drop the cache directory of the library.

    Parameters
    ----------
    path: Path
        The path of the library.
    """
    cache = path / CACHE_DIR
    # Nothing to do if no cache has been created yet.
    if not cache.exists():
        return
    for entry in os.listdir(cache):
        target = cache / entry
        if target.is_dir():
            shutil.rmtree(target)
        else:
            # rmtree would raise NotADirectoryError on plain files.
            os.remove(target)
    return