Justus Kuhlmann 2024-08-26 09:05:31 +00:00
parent b9ce012062
commit a908cc2f2c
12 changed files with 7 additions and 4 deletions

21
corrlib/__init__.py Normal file

@ -0,0 +1,21 @@
"""
The aim of this project is to extend pyerrors to be able to collect measurements from different projects and make them easily accessable to
the research group. The idea is to build a database, in which the researcher can easily search for measurements on a correlator basis,
which may be reusable.
As a standard to store the measurements, we will use the .json.gz format from pyerrors.
This allows us to organize a library by files and exported dictionaries.
Also, this is compressable, but is also human readable in uncompressed form.
The project creates a database with a table to store the measurements, and a folder to store the .json.gz files and tracks the changes to the folder automatically using datalad.
This way, we can harness the power of datalad as a backand, to reproducibly build our database.
Projects, that are also datalad datasets can be linked to the backlog of correlators as subdatasets, such that, using datalad rerun function,
it can be easily seen wehere the respective measurement came from and ho it may be reproduced.
For now, we are interested in collecting primary IObservables only, as these are the most computationally expensive and time consuming to calculate.
"""
from .main import *
from .input import *
from .initialization import *
from .meas_io import *
from .find import *
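
A minimal end-to-end sketch of the workflow this package is built around, using the functions re-exported above; the backlog path, project URL, ensemble name and uuid are hypothetical.

import corrlib

# Hypothetical paths and names throughout.
corrlib.create("/data/backlog")                                   # set up the datalad-backed backlog
uuid = corrlib.import_project("/data/backlog",
                              "https://example.org/cA_project.git")
hits = corrlib.find_record("/data/backlog", "A653", "f_A", project=uuid)
corr = corrlib.get_record("/data/backlog", hits.iloc[0]["path"])  # load one .json.gz entry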

88
corrlib/find.py Normal file

@ -0,0 +1,88 @@
import sqlite3
import datalad.api as dl
import os
import json
import pandas as pd
# this will implement the search functionality
def _project_aka_lookup(alias):
# this will lookup the project name based on the alias
conn = sqlite3.connect('backlogger.db')
c = conn.cursor()
c.execute(f"SELECT * FROM 'projects' WHERE alias = '{alias}'")
results = c.fetchall()
conn.close()
if len(results) > 1:
print("Error: multiple projects found with alias " + alias)
elif len(results) == 0:
raise Exception("Error: no project found with alias " + alias)
return results[0][0]
def _db_lookup(db, ensemble, correlator_name, project=None, code=None, parameters=None, created_before=None, created_after=None, updated_before=None, updated_after=None, revision=None):
project_str = project
search_expr = f"SELECT * FROM 'backlogs' WHERE name = '{correlator_name}' AND ensemble = '{ensemble}'"
if project:
search_expr += f" AND project = '{project_str}'"
if code:
search_expr += f" AND code = '{code}'"
if parameters:
search_expr += f" AND parameters = '{parameters}'"
if created_before:
search_expr += f" AND created_at < '{created_before}'"
if created_after:
search_expr += f" AND created_at > '{created_after}'"
if updated_before:
search_expr += f" AND updated_at < '{updated_before}'"
if updated_after:
search_expr += f" AND updated_at > '{updated_after}'"
conn = sqlite3.connect(db)
results = pd.read_sql(search_expr, conn)
conn.close()
return results
def filter_results(results, **kwargs):
drops = []
for ind in range(len(results)):
result = results.iloc[ind]
if result['code'] == 'sfcf':
param = json.loads(result['parameters'])
if 'offset' in kwargs:
if kwargs.get('offset') != param['offset']:
drops.append(ind)
continue
if 'quark_masses' in kwargs:
quark_masses = kwargs['quark_masses']
if (quark_masses[0] != param['quarks'][0]['mass'] or quark_masses[1] != param['quarks'][1]['mass']) and (quark_masses[0] != param['quarks'][1]['mass'] or quark_masses[1] != param['quarks'][0]['mass']):
drops.append(ind)
continue
if 'quark_thetas' in kwargs:
quark_thetas = kwargs['quark_thetas']
if (quark_thetas[0] != param['quarks'][0]['thetas'] or quark_thetas[1] != param['quarks'][1]['thetas']) and (quark_thetas[0] != param['quarks'][1]['thetas'] or quark_thetas[1] != param['quarks'][0]['thetas']):
drops.append(ind)
continue
if 'wf1' in kwargs:
wf1 = kwargs['wf1']
if (wf1[0] != param['wf1'][0]) or (wf1[1][0] != param['wf1'][1][0]) or (wf1[1][1] != param['wf1'][1][1]):
drops.append(ind)
continue
if 'wf2' in kwargs:
wf2 = kwargs['wf2']
if (wf2[0] != param['wf2'][0]) or (wf2[1][0] != param['wf2'][1][0]) or (wf2[1][1] != param['wf2'][1][1]):
drops.append(ind)
continue
return results.drop(drops)
def find_record(path, ensemble, correlator_name, project=None, code=None, parameters=None, created_before=None, created_after=None, updated_before=None, updated_after=None, revision=None, **kwargs):
db = path + '/backlogger.db'
if os.path.exists(db):
dl.get(db, dataset=path)
results = _db_lookup(db, ensemble, correlator_name, project, code=code, parameters=parameters, created_before=created_before, created_after=created_after, updated_before=updated_before, updated_after=updated_after, revision=revision)
results = filter_results(results, **kwargs)
print("Found " + str(len(results)) + " results")
return results.reset_index()
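
A hedged example of a lookup; the backlog path, ensemble name, date and quark masses are made up, and the keyword arguments are the ones understood by filter_results above.

from corrlib.find import find_record

# Hypothetical query: sfcf measurements of f_A on ensemble "A653",
# restricted to one (order-independent) pair of quark masses.
hits = find_record("/data/backlog", "A653", "f_A",
                   code="sfcf",
                   created_after="2024-01-01",
                   quark_masses=[0.1365, 0.1352])
print(hits[["project", "path", "updated_at"]])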

37
corrlib/git_tools.py Normal file

@ -0,0 +1,37 @@
import os
import datalad.api as dl
GITMODULES_FILE = '.gitmodules'
def move_submodule(repo_path, old_path, new_path):
"""
Move a submodule to a new location.
Parameters
----------
repo_path: str
Path to the repository.
old_path: str
The old path of the module.
new_path: str
The new path of the module.
"""
os.rename(os.path.join(repo_path, old_path), os.path.join(repo_path, new_path))
gitmodules_file_path = os.path.join(repo_path, GITMODULES_FILE)
with open(gitmodules_file_path, 'r') as file:
lines = [line.strip() for line in file]
updated_lines = []
for line in lines:
if old_path in line:
line = line.replace(old_path, new_path)
updated_lines.append(line)
with open(gitmodules_file_path, 'w') as file:
file.write("\n".join(updated_lines))
dl.save(repo_path, message=f"Move module from {old_path} to {new_path}", dataset=repo_path)
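
A short usage sketch; the repository path and the uuid-based target path are hypothetical and mirror how import_project in main.py calls this helper.

from corrlib.git_tools import move_submodule

# Hypothetical: relocate a freshly installed project to its uuid-based location.
move_submodule("/data/backlog", "projects/tmp", "projects/0000-example-uuid")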

46
corrlib/initialization.py Normal file

@ -0,0 +1,46 @@
import sqlite3
import datalad.api as dl
import os
def _create_db(db):
"""
Create the database file and the table.
"""
conn = sqlite3.connect(db)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS backlogs
(id INTEGER PRIMARY KEY,
name TEXT,
ensemble TEXT,
code TEXT,
path TEXT,
project TEXT,
parameters TEXT,
parameter_file TEXT,
created_at TEXT,
updated_at TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS projects
(id TEXT PRIMARY KEY,
aliases TEXT,
code TEXT,
created_at TEXT,
updated_at TEXT)''')
conn.commit()
conn.close()
def create(path):
"""
Create folder of backlogs.
"""
dl.create(path)
_create_db(path + '/backlogger.db')
os.chmod(path + '/backlogger.db', 0o666) # why does this not work?
os.makedirs(path + '/projects')
os.makedirs(path + '/archive')
os.makedirs(path + '/toml_imports')
os.makedirs(path + '/import_scripts/template.py')
dl.save(path, dataset=path, message="Initialize backlogger directory.")
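
A sketch of the directory layout that create() produces, under the hypothetical path used in the other examples; note that import_scripts/template.py is created as a directory by the makedirs call above.

from corrlib.initialization import create

create("/data/backlog")
# Resulting layout (hypothetical path):
#   /data/backlog/backlogger.db               sqlite database with the 'backlogs' and 'projects' tables
#   /data/backlog/projects/                   imported project datasets, one subdataset per uuid
#   /data/backlog/archive/                    exported .json.gz correlator files
#   /data/backlog/toml_imports/               copies of the TOML files used for imports
#   /data/backlog/import_scripts/template.py  (currently a directory, see the makedirs call above)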

5
corrlib/input/__init__.py Normal file

@ -0,0 +1,5 @@
"""
Import functions for different codes.
"""
from . import sfcf

265
corrlib/input/sfcf.py Normal file

@ -0,0 +1,265 @@
import pyerrors as pe
import datalad.api as dl
import json
import os
bi_corrs = ["f_P", "fP", "f_p",
"g_P", "gP", "g_p",
"fA0", "f_A", "f_a",
"gA0", "g_A", "g_a",
"k1V1", "k1_V1", "k_v11",
"l1V1", "l1_V1", "l_v11",
"k2V2", "k2_V2", "k_v22",
"l2V2", "l2_V2", "l_v22",
"k3V3", "k3_V3", "k_v33",
"l3V3", "l3_V3", "l_v33",
"kVk", "k_V", "k_v",
"lVk", "l_V", "l_v",
"k1T01", "k1_T01",
"l1T01", "l1_T01",
"k2T02", "k2_T02",
"l2T02", "l2_T02",
"k3T03", "k3_T03",
"l3T03", "l3_T03",
"kT0k", "k_T", "k_t",
"lT0k", "l_T", "l_t",
"fAk", "f_Ak", "f_ak",
"gAk", "g_Ak", "g_ak",
"kV0", "k_V0", "k_v0",
"lV0", "l_V0", "l_v0",
"k1A2", "k1_A2", "f_av21",
"l1A2", "l1_A2", "g_av21",
"k2A3", "k2_A3", "f_av32",
"l2A3", "l2_A3", "g_av32",
"k3A1", "k3_A1", "f_av13",
"l3A1", "l3_A1", "g_av13",
"k1A3", "k1_A3", "f_av31",
"l1A3", "l1_A3", "g_av31",
"k2A1", "k2_A1", "f_av12",
"l2A1", "l2_A1", "g_av12",
"k3A2", "k3_A2", "f_av23",
"l3A2", "l3_A2", "g_av23",
]
bb_corrs = [
'F1',
'F_1',
'f_1',
'F1ll',
'k_1',
'F_AA_a',
'F_AA_d',
'F_AdP_a',
'F_AdP_d',
'F_dPA_a',
'F_dPA_d',
'F_dPdP_a',
'F_dPdP_d',
'F_sPA_a',
'F_sPA_d',
'F_sPdP_a',
'F_sPdP_d',
]
corr_types = {}
for c in bi_corrs:
corr_types[c] = 'bi'
for c in bb_corrs:
corr_types[c] = 'bb'
def read_param(path, project, file_in_project):
"""
Read the parameters from the sfcf file.
Parameters
----------
file : str
The path to the sfcf file.
Returns
-------
dict
The parameters in the sfcf file.
"""
file = path + "/projects/" + project + '/' + file_in_project
dl.get(file, dataset=path)
with open(file, 'r') as f:
lines = f.readlines()
params = {}
params['wf_offsets'] = []
params['wf_basis'] = []
params['wf_coeff'] = []
params['qr'] = {}
params['mrr'] = []
params['crr'] = []
params['qs'] = {}
params['mrs'] = []
params['crs'] = []
for line in lines:
if line.startswith('#'):
continue
if line.startswith('\n'):
continue
if line.startswith('wf_offsets'):
num_wf_offsets = line.split()[1]
for i in range(int(num_wf_offsets)):
params['wf_offsets'].append([float(x) for x in lines[lines.index(line) + i + 1].split("#")[0].split()])
if line.startswith('wf_basis'):
num_wf_basis = line.split()[1]
for i in range(int(num_wf_basis)):
params['wf_basis'].append([float(x) for x in lines[lines.index(line) + i + 1].split("#")[0].split()])
if line.startswith('wf_coeff'):
num_wf_coeff = line.split()[1]
for i in range(int(num_wf_coeff)):
params['wf_coeff'].append([float(x) for x in lines[lines.index(line) + i + 1].split("#")[0].split()])
if line.startswith('qr'):
num_qr = line.split()[1]
for i in range(int(num_qr)):
dat = lines[lines.index(line) + i + 1].split("#")[0].strip().split()[:-1]
params['qr'][dat[0]] = {}
params['qr'][dat[0]]['mass'] = float(dat[1])
params['qr'][dat[0]]['thetas'] = [float(x) for x in dat[2:5]]
if line.startswith('mrr'):
num_mrr = line.split()[1]
for i in range(int(num_mrr)):
params['mrr'].append(lines[lines.index(line) + i + 1].split("#")[0].strip())
if line.startswith('crr'):
num_crr = line.split()[1]
for i in range(int(num_crr)):
params['crr'].append(lines[lines.index(line) + i + 1].split("#")[0].strip())
if line.startswith('qs'):
num_qs = line.split()[1]
for i in range(int(num_qs)):
dat = lines[lines.index(line) + i + 1].split("#")[0].strip().split()[:-1]
params['qs'][dat[0]] = {}
params['qs'][dat[0]]['mass'] = float(dat[1])
params['qs'][dat[0]]['thetas'] = [float(x) for x in dat[2:5]]
if line.startswith('mrs'):
num_mrs = line.split()[1]
for i in range(int(num_mrs)):
params['mrs'].append(lines[lines.index(line) + i + 1].split("#")[0].strip())
if line.startswith('crs'):
num_crs = line.split()[1]
for i in range(int(num_crs)):
params['crs'].append(lines[lines.index(line) + i + 1].split("#")[0].strip())
# catch standard cases
if params['wf_offsets'] == []:
params['wf_offsets'] = [[0, 0, 0]]
if params['wf_coeff'] == []:
params['wf_coeff'] = [[0, -1]]
return params
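
A hedged illustration of what read_param extracts; the parameter-file fragment and all values are invented, and the trailing column of each qr line is discarded by the parser (the [:-1] above).

# Hypothetical fragment of an sfcf parameter file:
#
#   wf_basis 1
#   0.0 0.0 0.0
#   wf_coeff 1
#   1 0
#   qr 2
#   u 0.1365 0.0 0.0 0.0 1
#   s 0.1352 0.5 0.5 0.5 1
#   mrr 1
#   u s
#   crr 1
#   f_A
#
# For this input, read_param would return roughly:
#
#   {'wf_offsets': [[0, 0, 0]],                  # default, no wf_offsets block present
#    'wf_basis': [[0.0, 0.0, 0.0]],
#    'wf_coeff': [[1.0, 0.0]],
#    'qr': {'u': {'mass': 0.1365, 'thetas': [0.0, 0.0, 0.0]},
#           's': {'mass': 0.1352, 'thetas': [0.5, 0.5, 0.5]}},
#    'mrr': ['u s'], 'crr': ['f_A'],
#    'qs': {}, 'mrs': [], 'crs': []}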
def _map_params(params, spec_list):
"""
Map the extracted parameters to the extracted data.
"""
# quarks/offset/wf/wf2
new_specs = {}
# quarks
quarks = spec_list[0].split(" ")
new_specs['quarks'] = (params['qr'][quarks[0]], params['qr'][quarks[1]])
# offset
new_specs['offset'] = (params['wf_offsets'][int(spec_list[1])])
# wf1
contribs = []
for i, coeff in enumerate(params['wf_coeff'][int(spec_list[2])]):
if not coeff == 0:
contrib = (coeff, params['wf_basis'][i])
contribs.append(contrib)
new_specs['wf1'] = contribs
if len(spec_list) == 4:
# wf2
contribs = []
for i, coeff in enumerate(params['wf_coeff'][int(spec_list[3])]):
if not coeff == 0:
contrib = (coeff, params['wf_basis'][i])
contribs.append(contrib)
new_specs['wf2'] = contribs
return new_specs
def get_specs(key, parameters, sep='/'):
key_parts = key.split(sep)
if corr_types[key_parts[0]] == 'bi':
param = _map_params(parameters, key_parts[1:-1])
else:
param = _map_params(parameters, key_parts[1:])
print(param)
s = json.dumps(param)
return s
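
A sketch of how a correlator key is mapped to the JSON string that ends up in the 'parameters' column of the database; the key and the params dict are hypothetical (same shape as a read_param result above), and for a 'bi' correlator the last key part is stripped before mapping.

from corrlib.input.sfcf import get_specs

# Hypothetical params, same structure as a read_param result.
params = {'qr': {'u': {'mass': 0.1365, 'thetas': [0.0, 0.0, 0.0]}},
          'wf_offsets': [[0, 0, 0]],
          'wf_basis': [[0.0, 0.0, 0.0]],
          'wf_coeff': [[1.0, 0.0]]}
spec = get_specs("f_A/u u/0/0/1", params)
# spec is a JSON string along the lines of
# {"quarks": [{"mass": 0.1365, "thetas": [0.0, 0.0, 0.0]},
#             {"mass": 0.1365, "thetas": [0.0, 0.0, 0.0]}],
#  "offset": [0, 0, 0],
#  "wf1": [[1.0, [0.0, 0.0, 0.0]]]}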
def read_data(path, project, dir_in_project, prefix, param, version='1.0c', cfg_seperator='n', sep='/', **kwargs):
"""
Extract the data from the sfcf file.
Returns
-------
dict
The data from the sfcf file.
"""
names = kwargs.get('names', None)
directory = os.path.join(path, "projects", project, dir_in_project)
print("Getting data, this might take a while...")
dl.get(directory, dataset=path)
print("... done downloading.")
corr_type_list = []
for corr_name in param['crr']:
if corr_name not in corr_types:
raise ValueError('Correlator type not known.')
corr_type_list.append(corr_types[corr_name])
if not param['crr'] == []:
if names is not None:
data_crr = pe.input.sfcf.read_sfcf_multi(directory, prefix, param['crr'], param['mrr'], corr_type_list, range(len(param['wf_offsets'])),
range(len(param['wf_basis'])), range(len(param['wf_basis'])), version, cfg_seperator, keyed_out=True, names=names)
else:
data_crr = pe.input.sfcf.read_sfcf_multi(directory, prefix, param['crr'], param['mrr'], corr_type_list, range(len(param['wf_offsets'])),
range(len(param['wf_basis'])), range(len(param['wf_basis'])), version, cfg_seperator, keyed_out=True)
if not param['crs'] == []:
data_crs = pe.input.sfcf.read_sfcf_multi(directory, param['crs'])
data = {}
if not param['crr'] == []:
for key in data_crr.keys():
data[key] = data_crr[key]
if not param['crs'] == []:
for key in data_crs.keys():
data[key] = data_crs[key]
# sort data by correlator
sorted_data = {}
for key in data.keys():
key_parts = key.split(sep)
corr = key_parts[0]
if corr not in sorted_data:
sorted_data[corr] = {}
sorted_data[corr][sep.join(key_parts[1:])] = data[key]
return sorted_data
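
A hedged usage sketch of read_data, reusing a read_param result; the project uuid, directories and run prefix are invented, and the returned dict is keyed by correlator name and then by the remaining key parts.

from corrlib.input import sfcf

# Hypothetical project contents.
param = sfcf.read_param("/data/backlog", "0000-example-uuid", "parameters/run1.in")
data = sfcf.read_data("/data/backlog", "0000-example-uuid", "raw_data/run1", "my_run",
                      param, version="1.0c", cfg_seperator="n")
# e.g. data["f_A"] then holds one entry per quark/offset/wavefunction combination.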

84
corrlib/main.py Normal file

@ -0,0 +1,84 @@
import sqlite3
import datalad.api as dl
import datalad.config as dlc
import os
from .git_tools import move_submodule
import shutil
def create_project(path, uuid, aliases=None, code=None):
"""
Create a new project entry in the database.
Parameters
----------
path: str
The path to the backlogger folder.
uuid: str
The uuid of the project.
aliases: str (optional)
Custom name for the project (e.g. 'cA determination on exponential clover').
code: str (optional)
The code that was used to create the measurements.
"""
conn = sqlite3.connect(path + "/backlogger.db")
c = conn.cursor()
known_projects = c.execute("SELECT * FROM projects WHERE id=?", (uuid,))
if known_projects.fetchone():
raise ValueError("Project already imported, use update_project() instead.")
dl.unlock(path + "/backlogger.db", dataset=path)
c.execute("INSERT INTO projects (id, aliases, code, created_at, updated_at) VALUES (?, ?, ?, datetime('now'), datetime('now'))", (uuid, aliases, code))
conn.commit()
conn.close()
dl.save(path + "/backlogger.db", message="Added entry for project " + uuid + " to database", dataset=path)
def import_project(path, url, aliases=None, code=None, isDataset=True):
"""
Parameters
----------
url: str
The url of the project to import. This can be any url that datalad can handle.
path: str
The path to the backlogger folder.
aliases: list[str]
Custom name of the project, alias of the project.
code: str
Code that was used to create the measurements.
Import a datalad dataset into the backlogger.
Parameters
----------
path: str
The path to the backlogger directory.
url: str
The url of the project to import. This can be any url that datalad can handle.
Also supported are non-datalad datasets, which will be converted to datalad datasets,
in order to receive a uuid and have a consistent interface.
"""
tmp_path = path + '/projects/tmp'
if not isDataset:
dl.create(path + '/projects/tmp', dataset=path)
shutil.copytree(url, path + '/projects/tmp/', dirs_exist_ok=True)
dl.save(path + '/projects/tmp', dataset=path)
else:
dl.install(path=tmp_path, source=url, dataset=path)
tmp_ds = dl.Dataset(tmp_path)
conf = dlc.ConfigManager(tmp_ds)
uuid = conf.get("datalad.dataset.id")
if not uuid:
raise ValueError("The dataset does not have a uuid!")
if not os.path.exists(path + "/projects/" + uuid):
dl.unlock(path + "/backlogger.db", dataset=path)
create_project(path, uuid, aliases, code)
move_submodule(path, 'projects/tmp', 'projects/' + uuid)
os.mkdir(path + '/import_scripts/' + uuid)
dl.save([path + "/backlogger.db", 'projects/' + uuid], message="Import project from " + url, dataset=path)
else:
print("Project is already imported.")
# make this more concrete
return uuid
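
A usage sketch for import_project; the URL, alias and backlog path are hypothetical. Note that the alias is bound directly into the 'projects' row, so a plain string is the safest thing to pass here.

from corrlib.main import import_project

uuid = import_project("/data/backlog",
                      "https://example.org/cA_project.git",
                      aliases="cA_exp_clover",
                      code="sfcf")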

88
corrlib/meas_io.py Normal file

@ -0,0 +1,88 @@
from pyerrors.input import json as pj
import os
import datalad.api as dl
import sqlite3
from .input.sfcf import get_specs
from .input.sfcf import read_param
def write_measurement(path, ensemble, measurement, uuid, code, parameter_file):
"""
Write a measurement to the backlog.
If the file for the measurement already exists, update the measurement.
Parameters
----------
path: str
The path to the backlogger folder.
ensemble: str
The ensemble of the measurement.
measurement: dict
Measurements to be captured in the backlogging system.
uuid: str
The uuid of the project.
"""
parameters = read_param(path, uuid, parameter_file)
print(parameters)
dl.unlock(path + '/backlogger.db', dataset=path)
conn = sqlite3.connect(path + '/backlogger.db')
c = conn.cursor()
files = []
for corr in measurement.keys():
file = path + "/archive/" + ensemble + "/" + corr + '/' + uuid + '.json.gz'
files.append(file)
known_meas = None
if not os.path.exists(path + "/archive/" + ensemble + "/" + corr):
os.makedirs(path + "/archive/" + ensemble + "/" + corr)
else:
if os.path.exists(file):
dl.unlock(file, dataset=path)
known_meas = pj.load_json_dict(file)
for subkey in measurement[corr].keys():
meas_path = file + "::" + subkey
if known_meas is not None:
known_meas[subkey] = measurement[corr][subkey]
# this should be only set if something changed.
else:
known_meas = measurement[corr]
if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path,)).fetchone() is not None:
c.execute("UPDATE backlogs SET updated_at = datetime('now') WHERE path = ?", (meas_path, ))
else:
c.execute("INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))", (corr, ensemble, code, meas_path, uuid, get_specs(corr + "/" + subkey, parameters), parameter_file))
conn.commit()
pj.dump_dict_to_json(measurement[corr], file)
files.append(path + '/backlogger.db')
conn.close()
dl.save(files, message="Add measurements to database", dataset=path)
def get_record(path, meas_path):
file = meas_path.split("::")[0]
sub_key = meas_path.split("::")[1]
dl.get(file, dataset=path)
return pj.load_json_dict(file)[sub_key]
def drop_record(path, meas_path):
file = meas_path.split("::")[0]
sub_key = meas_path.split("::")[1]
dl.unlock(path + '/backlogger.db', dataset=path)
conn = sqlite3.connect(path + '/backlogger.db')
c = conn.cursor()
if c.execute("SELECT * FROM backlogs WHERE path = ?", (meas_path, )).fetchone() is not None:
c.execute("DELETE FROM backlogs WHERE path = ?", (meas_path, ))
else:
raise ValueError("This measurement does not exist as an entry!")
conn.commit()
known_meas = pj.load_json_dict(file)
if sub_key in known_meas:
del known_meas[sub_key]
dl.unlock(file, dataset=path)
pj.dump_dict_to_json(known_meas, file)
dl.save([path + '/backlogger.db', file], message="Drop measurement from database", dataset=path)
return
else:
raise ValueError("This measurement does not exist as a file!")

91
corrlib/toml.py Normal file

@ -0,0 +1,91 @@
"""
TOML interface
--------------
Remember that keys with dots have to be quoted.
Apart from importing projects yourself with python scripts, this package also allows for
the import of projects via TOML.
"""
import tomllib as toml
import shutil
from .input import sfcf
from .main import import_project
from .meas_io import write_measurement
import datalad.api as dl
import os
def check_project_data(d):
print(d.keys())
if 'project' not in d.keys() or 'measurements' not in d.keys() or len(list(d.keys())) > 2:
raise ValueError('There should only be two keys on the top level, "project" and "measurements"!')
project_data = d['project']
if 'url' not in project_data.keys():
raise ValueError('project.url is missing!')
if 'code' not in project_data.keys():
raise ValueError('project.code is missing!')
if 'measurements' not in d.keys():
raise ValueError('No measurements to import!')
return
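
A hypothetical description file that passes check_project_data; the keys under each measurement are the ones import_toml reads below (ensemble, param_file, path, prefix, version, cfg_seperator, and optionally names), and all values are invented.

import tomllib

example = tomllib.loads("""
[project]
url = "https://example.org/cA_project.git"
code = "sfcf"

[measurements.run1]
ensemble = "A653"
param_file = "parameters/run1.in"
path = "raw_data/run1"
prefix = "my_run"
version = "1.0c"
cfg_seperator = "n"
""")
check_project_data(example)  # exactly two top-level keys: 'project' and 'measurements'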
def import_toml(path, file, copy_file=True):
"""
Import a project described by a .toml file.
Parameters
----------
path: str
Path to the backlog directory.
file: str
Path to the description file.
"""
print("Import project as decribed in " + file)
with open(file, 'rb') as fp:
toml_dict = toml.load(fp)
check_project_data(toml_dict)
project = toml_dict['project']
measurements = toml_dict['measurements']
uuid = import_project(path, project['url'])
print(measurements.items())
for mname, md in measurements.items():
print("Import measurement: " + mname)
ensemble = md['ensemble']
param = sfcf.read_param(path, uuid, md['param_file'])
if 'names' in md.keys():
measurement = sfcf.read_data(path, uuid, md['path'], md['prefix'], param,
version=md['version'], cfg_seperator=md['cfg_seperator'], sep='/', names=md['names'])
else:
measurement = sfcf.read_data(path, uuid, md['path'], md['prefix'], param,
version=md['version'], cfg_seperator=md['cfg_seperator'], sep='/')
write_measurement(path, ensemble, measurement, uuid,
project['code'], md['param_file'])
print(mname + " imported.")
if not os.path.exists(os.path.join(path, "toml_imports", uuid)):
os.makedirs(os.path.join(path, "toml_imports", uuid))
if copy_file:
import_file = os.path.join(path, "toml_imports", uuid, file)
shutil.copy(file, import_file)
dl.save(import_file, message="Import using " + import_file, dataset=path)
print("Imported project, file copied to " + import_file)
return
def reimport_project(path, uuid):
"""
Reimport an existing project using the files that are already available for this project.
Parameters
----------
path: str
Path to repository
uuid: str
uuid of the project that is to be reimported.
"""
config_path = "/".join([path, "import_scripts", uuid])
for p, dirnames, filenames in os.walk(config_path):
for fname in filenames:
import_toml(path, os.path.join(config_path, fname), copy_file=False)
return