Compare commits

...
Sign in to create a new pull request.

14 commits

11 changed files with 138 additions and 67 deletions

1
.gitignore vendored
View file

@ -2,3 +2,4 @@ pyerrors_corrlib.egg-info
__pycache__ __pycache__
*.egg-info *.egg-info
test.ipynb test.ipynb
test_ds

View file

@ -5,3 +5,12 @@ This is done in a reproducible way using `datalad`.
In principle, a dataset is created, that is automatically administered by the backlogger, in which data from different projects are held together. In principle, a dataset is created, that is automatically administered by the backlogger, in which data from different projects are held together.
Everything is catalogued by a searchable SQL database, which holds the paths to the respective measurements. Everything is catalogued by a searchable SQL database, which holds the paths to the respective measurements.
The original projects can be linked to the dataset and the data may be imported using wrapper functions around the read methods of pyerrors. The original projects can be linked to the dataset and the data may be imported using wrapper functions around the read methods of pyerrors.
We work with the following nomenclature in this project:
- Measurement
A set of Observables, including the appropriate metadata.
- Project
A series of measurements that was done by one person as part of their research.
- Record
An entry of a single Correlator in the database of the backlogger.
-

25
TODO.md
View file

@ -1,14 +1,21 @@
# TODO # TODO
## Features ## Features
- implement import of non-datalad projects - [ ] implement import of non-datalad projects
- implement a way to use another backlog repo as a project - [ ] implement a way to use another backlog repo as a project
- [ ] make cache deadlock resistant (no read while writing)
- find a way to convey the mathematical structure of what EXACTLY is the form of the correlator in a specific project - [ ] find a way to convey the mathematical structure of what EXACTLY is the form of the correlator in a specific project
- this could e.g. be done along the lines of mandatory documentation - [ ] this could e.g. be done along the lines of mandatory documentation
- keep better track of the versions of the code, that was used for a specific measurement. - [ ] keep better track of the versions of the code, that was used for a specific measurement.
- maybe let this be an input in the project file? - [ ] maybe let this be an input in the project file?
- git repo and commit hash/version tag - [ ] git repo and commit hash/version tag
- [ ] implement a code table?
- [ ] parallel processing of measurements
- [ ] extra SQL table for ensembles with UUID and aliases
## Bugfixes ## Bugfixes
- [ ] revisit the reimport function for single files - [ ] revisit the reimport function for single files
- [ ] drop record needs to look if no records are left in a json file.
## Rough Ideas
- [ ] multitable could provide a high speed implementation of an HDF5 based format
- [ ] implement also a way to include compiled binaries in the archives.

View file

@ -19,5 +19,6 @@ from .main import *
from .import input as input from .import input as input
from .initialization import * from .initialization import *
from .meas_io import * from .meas_io import *
from .cache_io import *
from .find import * from .find import *
from .version import __version__ from .version import __version__

58
corrlib/cache_io.py Normal file
View file

@ -0,0 +1,58 @@
from typing import Union, Optional
import os
import shutil
from .tools import record2name_key
from pyerrors import dump_object
import datalad.api as dl
import sqlite3
def get_version_hash(path, record):
    """Look up the current data-version hash of a record in the dataset DB.

    Parameters
    ----------
    path : str
        Root directory of the backlogger dataset.
    record : str
        Record path (``"<file>::<key>"``) used as the lookup key in the
        ``backlogs`` table.

    Returns
    -------
    str
        The ``current_version`` hash stored for the record.

    Raises
    ------
    ValueError
        If no row exists for *record*.
    """
    db = os.path.join(path, "backlogger.db")
    # Ensure the (possibly annexed) database file is present locally.
    dl.get(db, dataset=path)
    conn = sqlite3.connect(db)
    try:
        c = conn.cursor()
        # Parameterized query: never interpolate the record path into the
        # SQL string (injection / quoting bugs with odd characters).
        c.execute("SELECT current_version FROM backlogs WHERE path = ?", (record,))
        row = c.fetchone()
    finally:
        # The original leaked the connection; always close it.
        conn.close()
    if row is None:
        raise ValueError(f"No record found in backlogs for path '{record}'")
    return row[0]
def drop_cache_files(path: str, fs: Optional[list[str]] = None):
    """Delete entries from the dataset's ``.cache`` directory.

    Parameters
    ----------
    path : str
        Root directory of the backlogger dataset.
    fs : list[str], optional
        Names of entries inside ``.cache`` to delete. If ``None``, every
        entry in the cache directory is removed.
    """
    cache_dir = os.path.join(path, ".cache")
    if fs is None:
        fs = os.listdir(cache_dir)
    for f in fs:
        target = os.path.join(cache_dir, f)
        # shutil.rmtree only works on directories; a plain file in the
        # cache would make the original raise NotADirectoryError.
        if os.path.isdir(target):
            shutil.rmtree(target)
        else:
            os.remove(target)
def cache_dir(path, file):
    """Return the cache directory corresponding to an archive file.

    The first component of the archive-relative *file* path is replaced
    by the hidden ``.cache`` directory under *path*.
    """
    # Drop the leading path component and re-root under <path>/.cache.
    tail = file.split("/")[1:]
    return "/".join([path, ".cache", *tail])
def cache_path(path, file, sha_hash, key):
    """Build the cache-file path (without suffix) for a single record.

    The file name combines the record key with the data-version hash so
    that stale cache entries are distinguishable from current ones.
    """
    filename = "{}_{}".format(key, sha_hash)
    return os.path.join(cache_dir(path, file), filename)
def is_old_version(path, record):
    """Check whether the cached copy of *record* is from an outdated version.

    Parameters
    ----------
    path : str
        Root directory of the backlogger dataset.
    record : str
        Record path (``"<file>::<key>"``).

    Returns
    -------
    bool
        True if a cache file for the record's key exists whose embedded
        version hash differs from the current one; False otherwise
        (including when no cache file is present — the original implicitly
        returned None in that case).
    """
    version_hash = get_version_hash(path, record)
    file, key = record2name_key(record)
    meas_cache_path = cache_dir(path, file)
    filenames = []
    for _, _, fs in os.walk(meas_cache_path):
        filenames.extend(fs)
    for filename in filenames:
        # Cache files are named "<key>_<version_hash>.p".
        if key == filename.split("_")[0]:
            # Strip the trailing ".p" before comparing version hashes.
            return version_hash != filename.split("_")[1][:-2]
    return False
def is_in_cache(path, record):
    """Return True when a cache file for *record* exists at its current
    data version."""
    current = get_version_hash(path, record)
    name, key = record2name_key(record)
    target = cache_path(path, name, current, key) + ".p"
    return os.path.exists(target)

View file

@ -6,7 +6,7 @@ from .toml import import_tomls, update_project, reimport_project
from .find import find_record, list_projects from .find import find_record, list_projects
from .tools import str2list from .tools import str2list
from .main import update_aliases from .main import update_aliases
from .meas_io import drop_cache as mio_drop_cache from .cache_io import drop_cache_files as cio_drop_cache_files
import os import os
@ -171,7 +171,7 @@ def drop_cache(
""" """
Drop the current cache directory of the dataset. Drop the current cache directory of the dataset.
""" """
mio_drop_cache(path) cio_drop_cache_files(path)
return return

View file

@ -21,7 +21,8 @@ def _create_db(db):
parameters TEXT, parameters TEXT,
parameter_file TEXT, parameter_file TEXT,
created_at TEXT, created_at TEXT,
updated_at TEXT)''') updated_at TEXT,
current_version TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS projects c.execute('''CREATE TABLE IF NOT EXISTS projects
(id TEXT PRIMARY KEY, (id TEXT PRIMARY KEY,
aliases TEXT, aliases TEXT,

View file

@ -4,14 +4,14 @@ import datalad.api as dl
import sqlite3 import sqlite3
from .input import sfcf,openQCD from .input import sfcf,openQCD
import json import json
from typing import Union from typing import Union, Optional
from pyerrors import Obs, Corr, dump_object, load_object from pyerrors import Obs, Corr, load_object, dump_object
from hashlib import sha256 from hashlib import sha256, sha1
from .tools import cached, get_file from .tools import cached, get_file, record2name_key, name_key2record, make_version_hash
import shutil from .cache_io import is_in_cache, cache_path, cache_dir, get_version_hash
def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=None): def write_measurement(path, ensemble, measurement, uuid, code, parameter_file: Optional[str]=None):
""" """
Write a measurement to the backlog. Write a measurement to the backlog.
If the file for the measurement already exists, update the measurement. If the file for the measurement already exists, update the measurement.
@ -79,20 +79,21 @@ def write_measurement(path, ensemble, measurement, uuid, code, parameter_file=No
subkey = "/".join(par_list) subkey = "/".join(par_list)
subkeys = [subkey] subkeys = [subkey]
pars[subkey] = json.dumps(parameters) pars[subkey] = json.dumps(parameters)
for subkey in subkeys:
parHash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
meas_path = file_in_archive + "::" + parHash
known_meas[parHash] = measurement[corr][subkey] meas_paths = []
for subkey in subkeys:
if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None: par_hash = sha256(str(pars[subkey]).encode('UTF-8')).hexdigest()
c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, )) meas_path = name_key2record(file_in_archive, par_hash)
else: meas_paths.append(meas_path)
c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))", known_meas[par_hash] = measurement[corr][subkey]
data_hash = make_version_hash(path, meas_path)
if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is None:
c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'))",
(corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file)) (corr, ensemble, code, meas_path, uuid, pars[subkey], parameter_file))
conn.commit() c.execute("UPDATE backlogs SET current_version = ?, updated_at = datetime('now') WHERE path = ?", (data_hash, meas_path))
pj.dump_dict_to_json(known_meas, file) pj.dump_dict_to_json(known_meas, file)
files.append(path + '/backlogger.db') conn.commit()
files.append(db)
conn.close() conn.close()
dl.save(files, message="Add measurements to database", dataset=path) dl.save(files, message="Add measurements to database", dataset=path)
@ -116,7 +117,7 @@ def load_record(path: str, meas_path: str):
return load_records(path, [meas_path])[0] return load_records(path, [meas_path])[0]
def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union[Corr, Obs]]: def load_records(path: str, record_paths: list[str], preloaded = {}) -> list[Union[Corr, Obs]]:
""" """
Load a list of records by their paths. Load a list of records by their paths.
@ -132,41 +133,32 @@ def load_records(path: str, meas_paths: list[str], preloaded = {}) -> list[Union
List List
""" """
needed_data: dict[str, list[str]] = {} needed_data: dict[str, list[str]] = {}
for mpath in meas_paths: for rpath in record_paths:
file = mpath.split("::")[0] file, key = record2name_key(rpath)
if file not in needed_data.keys(): if file not in needed_data.keys():
needed_data[file] = [] needed_data[file] = []
key = mpath.split("::")[1]
needed_data[file].append(key) needed_data[file].append(key)
returned_data: list = [] returned_data: list = []
for file in needed_data.keys(): for file in needed_data.keys():
for key in list(needed_data[file]): for key in list(needed_data[file]):
if os.path.exists(cache_path(path, file, key) + ".p"): record = name_key2record(file, key)
returned_data.append(load_object(cache_path(path, file, key) + ".p")) current_version = get_version_hash(path, record)
if is_in_cache(path, record):
returned_data.append(load_object(cache_path(path, file, current_version, key) + ".p"))
else: else:
if file not in preloaded: if file not in preloaded:
preloaded[file] = preload(path, file) preloaded[file] = preload(path, file)
returned_data.append(preloaded[file][key]) returned_data.append(preloaded[file][key])
if cached: if cached:
if not os.path.exists(cache_dir(path, file)): if not is_in_cache(path, record):
os.makedirs(cache_dir(path, file)) file, key = record2name_key(record)
dump_object(preloaded[file][key], cache_path(path, file, key)) if not os.path.exists(cache_dir(path, file)):
os.makedirs(cache_dir(path, file))
current_version = get_version_hash(path, record)
dump_object(preloaded[file][key], cache_path(path, file, current_version, key))
return returned_data return returned_data
def cache_dir(path, file):
cache_path_list = [path]
cache_path_list.append(".cache")
cache_path_list.extend(file.split("/")[1:])
cache_path = "/".join(cache_path_list)
return cache_path
def cache_path(path, file, key):
cache_path = os.path.join(cache_dir(path, file), key)
return cache_path
def preload(path: str, file: str): def preload(path: str, file: str):
get_file(path, file) get_file(path, file)
filedict = pj.load_json_dict(os.path.join(path, file)) filedict = pj.load_json_dict(os.path.join(path, file))
@ -175,11 +167,10 @@ def preload(path: str, file: str):
def drop_record(path: str, meas_path: str): def drop_record(path: str, meas_path: str):
file_in_archive = meas_path.split("::")[0] file_in_archive, sub_key = record2name_key(meas_path)
file = os.path.join(path, file_in_archive) file = os.path.join(path, file_in_archive)
db = os.path.join(path, 'backlogger.db') db = os.path.join(path, 'backlogger.db')
get_file(path, 'backlogger.db') get_file(path, 'backlogger.db')
sub_key = meas_path.split("::")[1]
dl.unlock(db, dataset=path) dl.unlock(db, dataset=path)
conn = sqlite3.connect(db) conn = sqlite3.connect(db)
c = conn.cursor() c = conn.cursor()
@ -199,7 +190,3 @@ def drop_record(path: str, meas_path: str):
else: else:
raise ValueError("This measurement does not exist as a file!") raise ValueError("This measurement does not exist as a file!")
def drop_cache(path: str):
cache_dir = os.path.join(path, ".cache")
for f in os.listdir(cache_dir):
shutil.rmtree(os.path.join(cache_dir, f))

View file

@ -1,6 +1,6 @@
import os import os
import datalad.api as dl import datalad.api as dl
import hashlib
def str2list(string): def str2list(string):
return string.split(",") return string.split(",")
@ -19,11 +19,18 @@ def k2m(k):
return (1/(2*k))-4 return (1/(2*k))-4
def get_file(path, file): def record2name_key(record_path: str):
if file == "backlogger.db": file = record_path.split("::")[0]
print("Downloading database...") key = record_path.split("::")[1]
else: return file, key
print("Downloading data...")
dl.get(os.path.join(path, file), dataset=path)
print("> downloaded file") def name_key2record(name: str, key: str):
return name + "::" + key
def make_version_hash(path, record):
file, key = record2name_key(record)
with open(os.path.join(path, file), 'rb') as fp:
file_hash = hashlib.file_digest(fp, 'sha1').hexdigest()
return file_hash

View file

@ -10,7 +10,7 @@ setup(name='pycorrlib',
version=version['__version__'], version=version['__version__'],
author='Justus Kuhlmann', author='Justus Kuhlmann',
author_email='j_kuhl19@uni-muenster.de', author_email='j_kuhl19@uni-muenster.de',
install_requires=['pyerrors>=2.11.1', 'datalad>=1.1.0', 'typer>=0.12.5'], install_requires=['pyerrors>=2.11.1', 'datalad>=1.1.0', 'typer>=0.12.5', 'gitpython>=3.1.45'],
entry_points = { entry_points = {
'console_scripts': ['pcl=corrlib.cli:app'], 'console_scripts': ['pcl=corrlib.cli:app'],
}, },