corrlib/tests/find_test.py
Justus Kuhlmann d6de8e6387
All checks were successful
Mypy / mypy (push) Successful in 1m11s
Pytest / pytest (3.12) (push) Successful in 1m19s
Pytest / pytest (3.13) (push) Successful in 1m14s
Pytest / pytest (3.14) (push) Successful in 1m16s
Ruff / ruff (push) Successful in 1m3s
Mypy / mypy (pull_request) Successful in 1m13s
Pytest / pytest (3.12) (pull_request) Successful in 1m18s
Pytest / pytest (3.13) (pull_request) Successful in 1m12s
Pytest / pytest (3.14) (pull_request) Successful in 1m15s
Ruff / ruff (pull_request) Successful in 1m0s
Introduce thin wrapper for SQL calls
2026-04-21 10:22:46 +02:00

439 lines
19 KiB
Python

import corrlib.find as find
import sqlite3
from pathlib import Path
import corrlib.initialization as cinit
import pytest
import pandas as pd
import datalad.api as dl
import datetime as dt
def make_sql(path: Path) -> Path:
    """Create ``backlogger.db`` inside ``path`` and return the file's location.

    The target file is handed to ``cinit._create_db``.
    """
    database = path.joinpath("backlogger.db")
    cinit._create_db(database)
    return database
def make_config(path: Path) -> None:
    """Write a config for ``path``.

    The config is built by ``cinit._create_config`` with the arguments
    ``"datalad"`` and ``False`` and then passed to ``cinit._write_config``.
    """
    config = cinit._create_config(path, "datalad", False)
    cinit._write_config(path, config)
def test_find_lookup_by_one_alias(tmp_path: Path) -> None:
    """Check alias lookup for a unique alias and for an alias used twice.

    With a single project carrying the alias, the lookup must return that
    project's id. After a second project with the same alias is inserted, the
    block guarded by ``pytest.raises`` has to raise.
    """
    make_config(tmp_path)
    db = make_sql(tmp_path)
    insert_project = (
        "INSERT INTO projects (id, aliases, customTags, owner, code, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))"
    )
    alias_str = "fun_project"
    tag_str = "tt"
    owner = "tester"
    code = "test_code"

    conn = sqlite3.connect(db)
    # Close the connection even when an assertion fails, so the database file
    # is not left open.
    try:
        c = conn.cursor()
        uuid = "test_uuid"
        c.execute(insert_project, (uuid, alias_str, tag_str, owner, code))
        conn.commit()
        assert uuid == find._project_lookup_by_alias(tmp_path, alias_str)

        uuid = "test_uuid2"
        c.execute(insert_project, (uuid, alias_str, tag_str, owner, code))
        conn.commit()
        # Use the same first argument (tmp_path) as the successful lookup
        # above. Passing the database file instead could let the expected
        # exception come from the wrong path argument rather than from the
        # duplicated alias.
        # NOTE(review): pytest.raises(Exception) is also satisfied by the
        # AssertionError of the comparison below - confirm whether the lookup
        # itself is meant to raise and, if so, narrow the exception type.
        with pytest.raises(Exception):
            assert uuid == find._project_lookup_by_alias(tmp_path, alias_str)
    finally:
        conn.close()
def test_find_lookup_by_id(tmp_path: Path) -> None:
    """Insert one project and check that the id lookup returns its values.

    The first five entries of the first returned row are compared with the
    inserted id, alias, tag, owner and code, in that order.
    """
    make_config(tmp_path)
    db_file = make_sql(tmp_path)
    # Values in column order: id, aliases, customTags, owner, code.
    project = ("test_uuid", "fun_project", "tt", "tester", "test_code")

    connection = sqlite3.connect(db_file)
    connection.cursor().execute(
        "INSERT INTO projects (id, aliases, customTags, owner, code, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))",
        project,
    )
    connection.commit()
    connection.close()

    row = find._project_lookup_by_id(tmp_path, project[0])[0]
    for position, expected in enumerate(project):
        assert expected == row[position]
def test_time_filter() -> None:
    """Exercise ``find._time_filter`` with creation and update time bounds.

    Five records are filtered with each of the four keywords
    (``created_before``, ``created_after``, ``updated_before``,
    ``updated_after``) against the same five timestamps. Two further frames,
    one holding a record updated before its creation and one holding a record
    updated in the future, are expected to raise ``ValueError``.
    """
    columns = [
        "name",
        "ensemble",
        "code",
        "path",
        "project",
        "parameters",
        "parameter_file",
        "created_at",
        "updated_at",
    ]

    def record(suffix: str, created: str, updated: str) -> list[str]:
        # All records share everything but the path suffix and the two timestamps.
        return [
            "f_A",
            "ensA",
            "sfcf",
            "archive/SF_A/f_A/Project_A.json.gz::asdfasdfasdf" + suffix,
            "SF_A",
            '{"par_A": 5.0, "par_B": 5.0}',
            "projects/SF_A/input.in",
            created,
            updated,
        ]

    # only created
    record_A = record("0", '2025-03-26 12:55:18.229966', '2025-03-26 12:55:18.229966')
    # created and updated
    record_B = record("1", '2025-03-26 12:55:18.229966', '2025-04-26 12:55:18.229966')
    # created and updated later
    record_C = record("2", '2026-03-26 12:55:18.229966', '2026-04-14 12:55:18.229966')
    record_D = record("3", '2026-03-26 12:55:18.229966', '2026-03-27 12:55:18.229966')
    # only created, earlier
    record_E = record("4", '2024-03-26 12:55:18.229966', '2024-03-26 12:55:18.229966')
    # invalid: updated before it was created
    record_F = record("5", '2026-03-26 12:55:18.229966', '2024-03-26 12:55:18.229966')
    # update timestamp lies a bit more than two days in the future
    record_G = record(
        "2",
        '2026-03-26 12:55:18.229966',
        str(dt.datetime.now() + dt.timedelta(days=2, hours=3, minutes=5, seconds=30)),
    )

    def frame(last: list[str]) -> pd.DataFrame:
        # Records A-D are common to every frame; only the fifth record varies.
        return pd.DataFrame([record_A, record_B, record_C, record_D, last], columns=columns)

    def check(selected: pd.DataFrame, expected: int) -> None:
        if expected == 0:
            assert selected.empty
        else:
            assert len(selected) == expected

    stamps = [
        '2023-03-26 12:55:18.229966',
        '2027-03-26 12:55:18.229966',
        '2026-03-25 12:55:18.229966',
        '2026-03-26 12:55:18.229965',
        '2025-03-04 12:55:18.229965',
    ]
    # Expected number of surviving records per timestamp in ``stamps``.
    before_counts = [0, 5, 3, 3, 1]
    after_counts = [5, 0, 2, 2, 4]

    df = frame(record_E)
    for stamp, expected in zip(stamps, before_counts):
        check(find._time_filter(df, created_before=stamp), expected)
    for stamp, expected in zip(stamps, after_counts):
        check(find._time_filter(df, created_after=stamp), expected)
    for stamp, expected in zip(stamps, before_counts):
        check(find._time_filter(df, updated_before=stamp), expected)
    for stamp, expected in zip(stamps, after_counts):
        check(find._time_filter(df, updated_after=stamp), expected)

    df = frame(record_F)
    with pytest.raises(ValueError):
        find._time_filter(df, created_before='2023-03-26 12:55:18.229966')

    df = frame(record_G)
    with pytest.raises(ValueError):
        find._time_filter(df, created_before='2023-03-26 12:55:18.229966')
def test_db_lookup(tmp_path: Path) -> None:
    """Check ``find._db_lookup`` against a backlog table with one, then two rows.

    First only an ``f_A`` entry exists; afterwards a ``g_A`` entry with
    different parameters is added. In both stages the lookup is queried with
    matching and non-matching ensemble, correlator, code, project and
    parameter values.
    """
    db = make_sql(tmp_path)
    conn = sqlite3.connect(db)
    cur = conn.cursor()

    insert_backlog = (
        "INSERT INTO backlogs (name, ensemble, code, path, project, parameters, parameter_file, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))"
    )
    ensemble = "SF_A"
    code = "openQCD"
    meas_path = "archive/SF_A/f_A/Project_A.json.gz::asdfasdfasdf"
    project_id = "Project_A"
    parameter_file = "projects/Project_A/myinput.in"
    pars_f = "{par_A: 3.0, par_B: 5.0}"
    pars_g = '{"par_A": 3.0, "par_B": 4.0}'

    # Stage 1: the table holds a single f_A entry.
    cur.execute(insert_backlog, ("f_A", ensemble, code, meas_path, project_id, pars_f, parameter_file))
    conn.commit()

    assert len(find._db_lookup(db, ensemble, "f_A", code)) == 1
    assert find._db_lookup(db, "SF_B", "f_A", code).empty
    assert find._db_lookup(db, ensemble, "g_A", code).empty
    assert find._db_lookup(db, ensemble, "f_A", "sfcf").empty
    assert len(find._db_lookup(db, ensemble, "f_A", code, project="Project_A")) == 1
    assert find._db_lookup(db, ensemble, "f_A", code, project="Project_B").empty
    assert len(find._db_lookup(db, ensemble, "f_A", code, parameters=pars_f)) == 1
    assert find._db_lookup(db, ensemble, "f_A", code, parameters='{"par_A": 3.0, "par_B": 4.0}').empty

    # Stage 2: a g_A entry with its own parameter string is added.
    cur.execute(insert_backlog, ("g_A", ensemble, code, meas_path, project_id, pars_g, parameter_file))
    conn.commit()

    assert len(find._db_lookup(db, ensemble, "f_A", code)) == 1
    assert find._db_lookup(db, "SF_B", "f_A", code).empty
    assert len(find._db_lookup(db, ensemble, "g_A", code)) == 1
    assert find._db_lookup(db, ensemble, "f_A", "sfcf").empty
    assert len(find._db_lookup(db, ensemble, "f_A", code, project="Project_A")) == 1
    assert len(find._db_lookup(db, ensemble, "g_A", code, project="Project_A")) == 1
    assert find._db_lookup(db, ensemble, "f_A", code, project="Project_B").empty
    assert find._db_lookup(db, ensemble, "g_A", code, project="Project_B").empty
    # The g_A parameters must not match the f_A entry ...
    assert find._db_lookup(db, ensemble, "f_A", code, parameters=pars_g).empty
    # ... but they do match the g_A entry.
    assert len(find._db_lookup(db, ensemble, "g_A", code, parameters='{"par_A": 3.0, "par_B": 4.0}')) == 1

    conn.close()
def test_sfcf_drop() -> None:
    """Check which filter arguments make ``find._sfcf_drop`` return a truthy value.

    One fixed parameter dictionary is tested against offset, kappa and mass
    criteria. Three-element lists for ``qk1``/``qk2``/``qm1``/``qm2`` are
    expected to raise ``ValueError``.
    """
    drop = find._sfcf_drop
    params = {
        'offset': [0, 0, 0],
        'quarks': [
            {'mass': 1, 'thetas': [0, 0, 0]},
            {'mass': 2, 'thetas': [0, 0, 1]},
        ],  # m0s = -3.5, -3.75
        'wf1': [[1, [0, 0]], [0.5, [1, 0]], [.75, [.5, .5]]],
        'wf2': [[1, [2, 1]], [2, [0.5, -0.5]], [.5, [.75, .72]]],
    }

    # offset
    assert not drop(params, offset=[0, 0, 0])
    assert drop(params, offset=[1, 0, 0])

    # both quarks at once, by kappa and by mass
    assert not drop(params, quark_kappas=[1, 2])
    assert drop(params, quark_kappas=[-3.1, -3.72])
    assert not drop(params, quark_masses=[-3.5, -3.75])
    assert drop(params, quark_masses=[-3.1, -3.72])

    # single-quark kappa, scalar values
    assert not drop(params, qk1=1)
    assert not drop(params, qk2=2)
    assert drop(params, qk1=2)
    assert drop(params, qk2=1)

    # single-quark kappa, two-element lists
    assert not drop(params, qk1=[0.5, 1.5])
    assert not drop(params, qk2=[1.5, 2.5])
    assert drop(params, qk1=2)
    assert drop(params, qk2=1)

    # three-element lists are rejected
    with pytest.raises(ValueError):
        drop(params, qk1=[0.5, 1, 5])
    with pytest.raises(ValueError):
        drop(params, qk2=[1, 5, 2.5])

    # single-quark mass, scalar values
    assert drop(params, qm1=1.2)
    assert drop(params, qm2=2.2)
    assert not drop(params, qm1=-3.5)
    assert not drop(params, qm2=-3.75)
    assert drop(params, qm2=1.2)
    assert drop(params, qm1=2.2)

    # three-element lists are rejected
    with pytest.raises(ValueError):
        drop(params, qm1=[0.5, 1, 5])
    with pytest.raises(ValueError):
        drop(params, qm2=[1, 5, 2.5])
def test_openQCD_filter() -> None:
    """Expect a warning when ``find.openQCD_filter`` is called with keyword ``a``.

    The input frame holds four records that differ only in their name
    (two ``f_A`` and two ``f_P``).
    """
    rows = [
        [
            name,
            "ensA",
            "sfcf",
            "archive/SF_A/f_A/Project_A.json.gz::asdfasdfasdf",
            "SF_A",
            '{"par_A": 5.0, "par_B": 5.0}',
            "projects/SF_A/input.in",
            '2025-03-26 12:55:18.229966',
            '2025-03-26 12:55:18.229966',
        ]
        for name in ("f_A", "f_A", "f_P", "f_P")
    ]
    columns = [
        "name",
        "ensemble",
        "code",
        "path",
        "project",
        "parameters",
        "parameter_file",
        "created_at",
        "updated_at",
    ]
    frame = pd.DataFrame(rows, columns=columns)
    with pytest.warns(Warning):
        find.openQCD_filter(frame, a="asdf")
def test_code_filter() -> None:
    """Check ``find._code_filter`` on an all-``sfcf`` and an all-``openQCD`` frame.

    Filtering each frame for its own code has to keep every record, and
    filtering for the code ``"asdf"`` has to raise ``ValueError``.
    """
    columns = [
        "name",
        "ensemble",
        "code",
        "path",
        "project",
        "parameters",
        "parameter_file",
        "created_at",
        "updated_at",
    ]

    def rows(code_name: str, names: tuple[str, ...]) -> list[list[str]]:
        # Records differ only in correlator name and code.
        return [
            [
                name,
                "ensA",
                code_name,
                "archive/SF_A/f_A/Project_A.json.gz::asdfasdfasdf",
                "SF_A",
                '{"par_A": 5.0, "par_B": 5.0}',
                "projects/SF_A/input.in",
                '2025-03-26 12:55:18.229966',
                '2025-03-26 12:55:18.229966',
            ]
            for name in names
        ]

    sfcf_rows = rows("sfcf", ("f_A", "f_A", "f_P", "f_P"))
    openqcd_rows = rows("openQCD", ("f_A", "f_A", "f_P", "f_P", "f_P"))

    sfcf_frame = pd.DataFrame(sfcf_rows, columns=columns)
    assert len(find._code_filter(sfcf_frame, "sfcf")) == 4

    openqcd_frame = pd.DataFrame(openqcd_rows, columns=columns)
    assert len(find._code_filter(openqcd_frame, "openQCD")) == 5

    with pytest.raises(ValueError):
        find._code_filter(openqcd_frame, "asdf")
def test_find_record() -> None:
    """Placeholder test: it only asserts ``True`` and does not call ``find``."""
    assert True
def test_find_project(tmp_path: Path) -> None:
    """Check ``find.find_project`` on a library created by ``cinit.create``.

    With one project carrying the alias, ``find_project`` must return its id.
    After a second project with the same alias is inserted, the block guarded
    by ``pytest.raises`` has to raise.
    """
    cinit.create(tmp_path)
    db_file = tmp_path / "backlogger.db"
    # The database file is unlocked through datalad before it is written to.
    dl.unlock(str(db_file), dataset=str(tmp_path))

    statement = (
        "INSERT INTO projects (id, aliases, customTags, owner, code, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))"
    )
    # Column values shared by both inserted projects: aliases, customTags, owner, code.
    shared = ("fun_project", "tt", "tester", "test_code")

    connection = sqlite3.connect(db_file)
    cursor = connection.cursor()

    first_id = "test_uuid"
    cursor.execute(statement, (first_id, *shared))
    connection.commit()
    assert first_id == find.find_project(tmp_path, "fun_project")

    second_id = "test_uuid2"
    cursor.execute(statement, (second_id, *shared))
    connection.commit()
    # NOTE(review): this uses the private alias lookup instead of
    # find.find_project, and pytest.raises(Exception) is also satisfied by the
    # AssertionError of the comparison - confirm both are intended.
    with pytest.raises(Exception):
        assert second_id == find._project_lookup_by_alias(tmp_path, "fun_project")
    connection.close()
def test_list_projects(tmp_path: Path) -> None:
    """Insert four projects and check the shape of ``find.list_projects``.

    The result must contain four entries, each of length two.
    """
    cinit.create(tmp_path)
    db_file = tmp_path / "backlogger.db"
    # The database file is unlocked through datalad before it is written to.
    dl.unlock(str(db_file), dataset=str(tmp_path))

    statement = (
        "INSERT INTO projects (id, aliases, customTags, owner, code, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, datetime('now'), datetime('now'))"
    )
    connection = sqlite3.connect(db_file)
    cursor = connection.cursor()
    # Ids and aliases are "test_uuid"/"fun_project" plus an optional numeric suffix.
    for suffix in ("", "2", "3", "4"):
        cursor.execute(
            statement,
            ("test_uuid" + suffix, "fun_project" + suffix, "tt", "tester", "test_code"),
        )
    connection.commit()
    connection.close()

    listed = find.list_projects(tmp_path)
    assert len(listed) == 4
    assert all(len(listed[index]) == 2 for index in range(4))