Mirror of https://github.com/fjosw/pyerrors.git, synced 2025-05-14 19:43:41 +02:00.
Merge branch 'develop' into documentation
This commit is contained in:
commit
e937e0797f
4 changed files with 140 additions and 14 deletions
|
@ -1,5 +1,6 @@
|
|||
import os
|
||||
import gzip
|
||||
import rapidjson
|
||||
import numpy as np
|
||||
import pyerrors as pe
|
||||
import pyerrors.input.json as jsonio
|
||||
|
@ -378,6 +379,11 @@ def test_reconstruct_non_linear_r_obs_list(tmp_path):
|
|||
assert assert_equal_Obs(oa, ob)
|
||||
|
||||
|
||||
def test_import_non_json_string():
|
||||
with pytest.raises(rapidjson.JSONDecodeError):
|
||||
pe.input.json.import_json_string("this is garbage")
|
||||
|
||||
|
||||
def assert_equal_Obs(to, ro):
|
||||
for kw in ["N", "cov_names", "covobs", "ddvalue", "dvalue", "e_content",
|
||||
"e_names", "idl", "mc_names", "names",
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyerrors as pe
|
||||
import pytest
|
||||
|
||||
def test_df_export_import(tmp_path):
|
||||
my_dict = {"int": 1,
|
||||
|
@ -28,3 +29,46 @@ def test_df_Corr(tmp_path):
|
|||
|
||||
pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix())
|
||||
reconstructed_df = pe.input.pandas.load_df((tmp_path / 'df_output').as_posix(), auto_gamma=True)
|
||||
|
||||
|
||||
def test_default_export_pe_import(tmp_path):
|
||||
df = pd.DataFrame([{"Column1": 1.1, "Column2": 2, "Column3": "my string£"}])
|
||||
df.to_csv((tmp_path / 'plain_df.csv').as_posix(), index=False)
|
||||
re_df = pe.input.pandas.load_df((tmp_path / 'plain_df').as_posix(), gz=False)
|
||||
assert np.all(df == re_df)
|
||||
|
||||
|
||||
def test_pe_export_default_import(tmp_path):
|
||||
df = pd.DataFrame([{"Column1": 1.1, "Column2": 2, "Column3": "my string£"}])
|
||||
pe.input.pandas.dump_df(df, (tmp_path / 'pe_df').as_posix(), gz=False)
|
||||
re_df = pd.read_csv((tmp_path / 'pe_df.csv').as_posix())
|
||||
assert np.all(df == re_df)
|
||||
|
||||
|
||||
def test_gz_serialization():
|
||||
my_obs = pe.pseudo_Obs(0.1, 0.01, "pandas DataFrame ensemble only for test purposes.")
|
||||
my_df = pd.DataFrame([{"Label": 1, "Obs": my_obs}])
|
||||
for gz in [False, True]:
|
||||
ser = pe.input.pandas._serialize_df(my_df, gz=gz)
|
||||
deser = pe.input.pandas._deserialize_df(ser)
|
||||
assert np.all(my_df == deser)
|
||||
|
||||
|
||||
def test_sql(tmp_path):
|
||||
my_list = [{"Label": i, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2 * i), 0.01, "test_ensemble", 20)} for i in range(15)]
|
||||
pe_df = pd.DataFrame(my_list)
|
||||
my_db = (tmp_path / "test_db.sqlite").as_posix()
|
||||
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
|
||||
for auto_gamma in [False, True]:
|
||||
re_df = pe.input.pandas.read_sql("SELECT * from My_table", my_db, auto_gamma=auto_gamma)
|
||||
assert np.all(re_df == pe_df)
|
||||
|
||||
|
||||
def test_sql_if_exists_fail(tmp_path):
|
||||
pe_df = pd.DataFrame([{"Label": 1, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2), 0.01, "test_ensemble", 20)}])
|
||||
my_db = (tmp_path / "test_db.sqlite").as_posix()
|
||||
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
|
||||
with pytest.raises(ValueError):
|
||||
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
|
||||
pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='append')
|
||||
pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='replace')
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue