Merge branch 'develop' into refactor/data_backend
Some checks failed
Mypy / mypy (push) Failing after 34s
Pytest / pytest (3.12) (push) Failing after 47s
Pytest / pytest (3.13) (push) Failing after 44s
Pytest / pytest (3.14) (push) Failing after 46s
Ruff / ruff (push) Failing after 33s

This commit is contained in:
Justus Kuhlmann 2025-12-04 11:16:23 +01:00
commit 641c612a59
Signed by: jkuhl
GPG key ID: 00ED992DD79B85A6
26 changed files with 3012 additions and 110 deletions

View file

@ -10,9 +10,10 @@ from hashlib import sha256
from .tools import cached
from .tracker import get
import shutil
from typing import Any
def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=None):
def write_measurement(path: str, ensemble: str, measurement: dict[str, dict[str, dict[str, Any]]], uuid: str, code: str, parameter_file: str) -> None:
"""
Write a measurement to the backlog.
If the file for the measurement already exists, update the measurement.
@ -59,7 +60,7 @@ def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=No
pars = {}
subkeys = []
for i in range(len(parameters["rw_fcts"])):
par_list = []
par_list = []
for k in parameters["rw_fcts"][i].keys():
par_list.append(str(parameters["rw_fcts"][i][k]))
subkey = "/".join(par_list)
@ -80,12 +81,12 @@ def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=No
subkey = "/".join(par_list)
subkeys = [subkey]
pars[subkey] = json.dumps(parameters)
for subkey in subkeys:
for subkey in subkeys:
parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
meas_path = file_in_archive + "::" + parHash
known_meas[parHash] = measurement[corr][subkey]
if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None:
c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, ))
else:
@ -98,7 +99,7 @@ def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=No
dl.save(files, message="Add measurements to database", dataset=path)
def load_record(path: str, meas_path: str) -> Union[Corr, Obs]:
    """
    Load a single record by its path.

    Parameters
    ----------
    path: str
        Path of the correlator library.
    meas_path: str
        The path to the correlator in the backlog system.

    Returns
    -------
    co : Corr or Obs
        The record stored under ``meas_path``.
    """
    # Thin convenience wrapper: delegate to the batched loader with a
    # one-element list and unwrap the single result.
    return load_records(path, [meas_path])[0]
def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union[Corr, Obs]]:
def load_records(path: str, meas_paths: list[str], preloaded: dict[str, Any] = {}) -> list[Union[Corr, Obs]]:
"""
Load a list of records by their paths.
@ -127,7 +128,7 @@ def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union
Path of the correlator library.
meas_paths: list[str]
A list of the paths to the correlator in the backlog system.
Returns
-------
List
@ -139,7 +140,7 @@ def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union
needed_data[file] = []
key = mpath.split("::")[1]
needed_data[file].append(key)
returned_data: list = []
returned_data: list[Any] = []
for file in needed_data.keys():
for key in list(needed_data[file]):
if os.path.exists(cache_path(path, file, key) + ".p"):
@ -155,7 +156,7 @@ def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union
return returned_data
def cache_dir(path, file):
def cache_dir(path: str, file: str) -> str:
cache_path_list = [path]
cache_path_list.append(".cache")
cache_path_list.extend(file.split("/")[1:])
@ -163,19 +164,19 @@ def cache_dir(path, file):
return cache_path
def cache_path(path: str, file: str, key: str) -> str:
    """Return the on-disk cache location for *key* of *file* inside *path*'s cache tree."""
    # The parameter hash `key` names the cache entry inside the per-file cache dir.
    return os.path.join(cache_dir(path, file), key)
def preload(path: str, file: str) -> dict[str, Any]:
    """
    Fetch *file* into the dataset at *path* and parse it as a JSON dict.

    Parameters
    ----------
    path: str
        Path of the correlator library.
    file: str
        File (relative to *path*) to retrieve and parse.

    Returns
    -------
    dict
        The parsed JSON content of the file.
    """
    # Make sure the annexed file content is actually present locally.
    get(path, file)
    loaded: dict[str, Any] = pj.load_json_dict(os.path.join(path, file))
    print("> read file")
    return loaded
def drop_record(path: str, meas_path: str):
def drop_record(path: str, meas_path: str) -> None:
file_in_archive = meas_path.split("::")[0]
file = os.path.join(path, file_in_archive)
db = os.path.join(path, 'backlogger.db')
@ -200,7 +201,9 @@ def drop_record(path: str, meas_path: str):
else:
raise ValueError("This measurement does not exist as a file!")
def drop_cache(path: str) -> None:
    """
    Remove every entry from the ``.cache`` directory of a correlator library.

    Parameters
    ----------
    path: str
        Path of the correlator library whose cache should be purged.
    """
    # NOTE: do not name this local `cache_dir` — that would shadow the
    # module-level cache_dir() helper defined in this file.
    cache_base = os.path.join(path, ".cache")
    for entry in os.listdir(cache_base):
        full = os.path.join(cache_base, entry)
        if os.path.isdir(full):
            shutil.rmtree(full)
        else:
            # shutil.rmtree raises NotADirectoryError on plain files;
            # remove any stray file directly instead.
            os.remove(full)