diff --git a/.gitignore b/.gitignore index e7385f6..dd21dea 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ pyerrors_corrlib.egg-info __pycache__ *.egg-info test.ipynb +test_ds diff --git a/README.md b/README.md index 976ae57..0f6c9a3 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,12 @@ This is done in a reproducible way using `datalad`. In principle, a dataset is created, that is automatically administered by the backlogger, in which data from differnt projects are held together. Everything is catalogued by a searchable SQL database, which holds the paths to the respective measurements. The original projects can be linked to the dataset and the data may be imported using wrapper functions around the read methonds of pyerrors. + +We work with the following nomenclature in this project: +- Measurement + A set of Observables, including the appropriate metadata. +- Project + A series of measurements that was done by one person as part of their research. +- Record + An entry of a single Correlator in the database of the backlogger. +- \ No newline at end of file diff --git a/TODO.md b/TODO.md index 4153fc3..ba32ec9 100644 --- a/TODO.md +++ b/TODO.md @@ -1,14 +1,21 @@ # TODO ## Features -- implement import of non-datalad projects -- implement a way to use another backlog repo as a project - -- find a way to convey the mathematical structure of what EXACTLY is the form of the correlator in a specific project - - this could e.g. be done along the lines of mandatory documentation -- keep better track of the versions of the code, that was used for a specific measurement. - - maybe let this be an input in the project file? 
- - git repo and commit hash/version tag - +- [ ] implement import of non-datalad projects +- [ ] implement a way to use another backlog repo as a project +- [ ] make cache deadlock resistant (no read while writing) +- [ ] find a way to convey the mathematical structure of what EXACTLY is the form of the correlator in a specific project + - [ ] this could e.g. be done along the lines of mandatory documentation +- [ ] keep better track of the versions of the code, that was used for a specific measurement. + - [ ] maybe let this be an input in the project file? + - [ ] git repo and commit hash/version tag + - [ ] implement a code table? +- [ ] parallel processing of measurements +- [ ] extra SQL table for ensembles with UUID and aliases ## Bugfixes - [ ] revisit the reimport function for single files +- [ ] drop record needs to look if no records are left in a json file. + +## Rough Ideas +- [ ] multitable could provide a high speed implementation of an HDF5 based format +- [ ] implement also a way to include compiled binaries in the archives. 
diff --git a/corrlib/__init__.py b/corrlib/__init__.py index 91b07f4..41d8691 100644 --- a/corrlib/__init__.py +++ b/corrlib/__init__.py @@ -19,5 +19,6 @@ from .main import * from .import input as input from .initialization import * from .meas_io import * +from .cache_io import * from .find import * from .version import __version__ diff --git a/corrlib/cache_io.py b/corrlib/cache_io.py new file mode 100644 index 0000000..c890164 --- /dev/null +++ b/corrlib/cache_io.py @@ -0,0 +1,58 @@ +from typing import Union, Optional +import os +import shutil +from .tools import record2name_key +from pyerrors import dump_object +import datalad.api as dl +import sqlite3 + + +def get_version_hash(path, record): + db = os.path.join(path, "backlogger.db") + dl.get(db, dataset=path) + conn = sqlite3.connect(db) + c = conn.cursor() + c.execute(f"SELECT current_version FROM 'backlogs' WHERE path = '{record}'") + return c.fetchall()[0][0] + + +def drop_cache_files(path: str, fs: Optional[list[str]]=None): + cache_dir = os.path.join(path, ".cache") + if fs is None: + fs = os.listdir(cache_dir) + for f in fs: + shutil.rmtree(os.path.join(cache_dir, f)) + + +def cache_dir(path, file): + cache_path_list = [path] + cache_path_list.append(".cache") + cache_path_list.extend(file.split("/")[1:]) + cache_path = "/".join(cache_path_list) + return cache_path + + +def cache_path(path, file, sha_hash, key): + cache_path = os.path.join(cache_dir(path, file), key + "_" + sha_hash) + return cache_path + + +def is_old_version(path, record): + version_hash = get_version_hash(path, record) + file, key = record2name_key(record) + meas_cache_path = os.path.join(cache_dir(path, file)) + ls = [] + for p, ds, fs in os.walk(meas_cache_path): + ls.extend(fs) + for filename in ls: + if key == filename.split("_")[0]: + if not version_hash == filename.split("_")[1][:-2]: + return True + else: + return False + + +def is_in_cache(path, record): + version_hash = get_version_hash(path, record) + file, key = 
record2name_key(record) + return os.path.exists(cache_path(path, file, version_hash, key) + ".p") diff --git a/corrlib/cli.py b/corrlib/cli.py index b808c13..44ede1b 100644 --- a/corrlib/cli.py +++ b/corrlib/cli.py @@ -6,7 +6,7 @@ from .toml import import_tomls, update_project, reimport_project from .find import find_record, list_projects from .tools import str2list from .main import update_aliases -from .meas_io import drop_cache as mio_drop_cache +from .cache_io import drop_cache_files as cio_drop_cache_files import os @@ -171,7 +171,7 @@ def drop_cache( """ Drop the currect cache directory of the dataset. """ - mio_drop_cache(path) + cio_drop_cache_files(path) return diff --git a/corrlib/initialization.py b/corrlib/initialization.py index f6ef5aa..e5c0ede 100644 --- a/corrlib/initialization.py +++ b/corrlib/initialization.py @@ -21,7 +21,8 @@ def _create_db(db): parameters TEXT, parameter_file TEXT, created_at TEXT, - updated_at TEXT)''') + updated_at TEXT, + current_version TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS projects (id TEXT PRIMARY KEY, aliases TEXT, diff --git a/corrlib/meas_io.py b/corrlib/meas_io.py index 7122ca0..80925e1 100644 --- a/corrlib/meas_io.py +++ b/corrlib/meas_io.py @@ -4,14 +4,14 @@ import datalad.api as dl import sqlite3 from .input import sfcf,openQCD import json -from typing import Union -from pyerrors import Obs, Corr, dump_object, load_object -from hashlib import sha256 -from .tools import cached, get_file -import shutil +from typing import Union, Optional +from pyerrors import Obs, Corr, load_object, dump_object +from hashlib import sha256, sha1 +from .tools import cached, get_file, record2name_key, name_key2record, make_version_hash +from .cache_io import is_in_cache, cache_path, cache_dir, get_version_hash -def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=None): +def write_measurement(path, ensemble, measurement, uuid, code, parameter_file: Optional[str]=None): """ Write a measurement to the 
backlog. If the file for the measurement already exists, update the measurement. @@ -79,20 +79,21 @@ def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=No subkey = "/".join(par_list) subkeys = [subkey] pars[subkey] = json.dumps(parameters) - for subkey in subkeys: - parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest() - meas_path = file_in_archive + "::" + parHash - known_meas[parHash] = measurement[corr][subkey] - - if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None: - c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, )) - else: - c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))", + meas_paths = [] + for subkey in subkeys: + par_hash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest() + meas_path = name_key2record(file_in_archive, par_hash) + meas_paths.append(meas_path) + known_meas[par_hash] = measurement[corr][subkey] + data_hash = make_version_hash(path, meas_path) + if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is None: + c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))", (corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file)) - conn.commit() + c.execute("UPDATE backlogs SET current_version = ?, updated_at = datetime('now') WHERE path = ?", (data_hash, meas_path)) pj.dump_dict_to_json(known_meas, file) - files.append(path + '/backlogger.db') + conn.commit() + files.append(db) conn.close() dl.save(files, message="Add measurements to database", dataset=path) @@ -116,7 +117,7 @@ def load_record(path: str, meas_path: str): return load_records(path, [meas_path])[0] -def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union[Corr, Obs]]: 
+def load_records(path: str, record_paths: list[str], preloaded = {}) -> list[Union[Corr, Obs]]: """ Load a list of records by their paths. @@ -132,41 +133,32 @@ def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union List """ needed_data: dict[str, list[str]] = {} - for mpath in meas_paths: - file = mpath.split("::")[0] + for rpath in record_paths: + file, key = record2name_key(rpath) if file not in needed_data.keys(): needed_data[file] = [] - key = mpath.split("::")[1] needed_data[file].append(key) returned_data: list = [] for file in needed_data.keys(): for key in list(needed_data[file]): - if os.path.exists(cache_path(path, file, key) + ".p"): - returned_data.append(load_object(cache_path(path, file, key) + ".p")) + record = name_key2record(file, key) + current_version = get_version_hash(path, record) + if is_in_cache(path, record): + returned_data.append(load_object(cache_path(path, file, current_version, key) + ".p")) else: if file not in preloaded: preloaded[file] = preload(path, file) returned_data.append(preloaded[file][key]) if cached: - if not os.path.exists(cache_dir(path, file)): - os.makedirs(cache_dir(path, file)) - dump_object(preloaded[file][key], cache_path(path, file, key)) + if not is_in_cache(path, record): + file, key = record2name_key(record) + if not os.path.exists(cache_dir(path, file)): + os.makedirs(cache_dir(path, file)) + current_version = get_version_hash(path, record) + dump_object(preloaded[file][key], cache_path(path, file, current_version, key)) return returned_data -def cache_dir(path, file): - cache_path_list = [path] - cache_path_list.append(".cache") - cache_path_list.extend(file.split("/")[1:]) - cache_path = "/".join(cache_path_list) - return cache_path - - -def cache_path(path, file, key): - cache_path = os.path.join(cache_dir(path, file), key) - return cache_path - - def preload(path: str, file: str): get_file(path, file) filedict = pj.load_json_dict(os.path.join(path, file)) @@ -175,11 +167,10 @@ 
def preload(path: str, file: str): def drop_record(path: str, meas_path: str): - file_in_archive = meas_path.split("::")[0] + file_in_archive, sub_key = record2name_key(meas_path) file = os.path.join(path, file_in_archive) db = os.path.join(path, 'backlogger.db') get_file(path, 'backlogger.db') - sub_key = meas_path.split("::")[1] dl.unlock(db, dataset=path) conn = sqlite3.connect(db) c = conn.cursor() @@ -199,7 +190,3 @@ def drop_record(path: str, meas_path: str): else: raise ValueError("This measurement does not exist as a file!") -def drop_cache(path: str): - cache_dir = os.path.join(path, ".cache") - for f in os.listdir(cache_dir): - shutil.rmtree(os.path.join(cache_dir, f)) diff --git a/corrlib/tools.py b/corrlib/tools.py index 14bfc05..e8a9c18 100644 --- a/corrlib/tools.py +++ b/corrlib/tools.py @@ -1,6 +1,6 @@ import os import datalad.api as dl - +import hashlib def str2list(string): return string.split(",") @@ -19,11 +19,18 @@ def k2m(k): return (1/(2*k))-4 -def get_file(path, file): - if file == "backlogger.db": - print("Downloading database...") - else: - print("Downloading data...") - dl.get(os.path.join(path, file), dataset=path) - print("> downloaded file") - \ No newline at end of file +def record2name_key(record_path: str): + file = record_path.split("::")[0] + key = record_path.split("::")[1] + return file, key + + +def name_key2record(name: str, key: str): + return name + "::" + key + + +def make_version_hash(path, record): + file, key = record2name_key(record) + with open(os.path.join(path, file), 'rb') as fp: + file_hash = hashlib.file_digest(fp, 'sha1').hexdigest() + return file_hash diff --git a/setup.py b/setup.py index add6910..6b8794e 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ setup(name='pycorrlib', version=version['__version__'], author='Justus Kuhlmann', author_email='j_kuhl19@uni-muenster.de', - install_requires=['pyerrors>=2.11.1', 'datalad>=1.1.0', 'typer>=0.12.5'], + install_requires=['pyerrors>=2.11.1', 'datalad>=1.1.0', 
'typer>=0.12.5', 'gitpython>=3.1.45'], entry_points = { 'console_scripts': ['pcl=corrlib.cli:app'], }, diff --git a/tests/test_import_project.py b/tests/test_import_project.py deleted file mode 100644 index e69de29..0000000