diff --git a/pyerrors/input/pandas.py b/pyerrors/input/pandas.py index caf3e0b6..53e6efd0 100644 --- a/pyerrors/input/pandas.py +++ b/pyerrors/input/pandas.py @@ -1,11 +1,53 @@ import warnings import gzip +import sqlite3 import pandas as pd from ..obs import Obs from ..correlators import Corr from .json import create_json_string, import_json_string +def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs): + """Write DataFrame including Obs or Corr valued columns to sqlite database. + + Parameters + ---------- + df : pandas.DataFrame + Dataframe to be written to the database. + table_name : str + Name of the table in the database. + db : str + Path to the sqlite database. + if exists : str + How to behave if table already exists. Options 'fail', 'replace', 'append'. + gz : bool + If True the json strings are gzipped. + """ + se_df = _serialize_df(df, gz=gz) + con = sqlite3.connect(db) + se_df.to_sql(table_name, con, if_exists=if_exists, index=False, **kwargs) + con.close() + + +def read_sql(sql, db, auto_gamma=False, **kwargs): + """Execute SQL query on sqlite database and obtain DataFrame including Obs or Corr valued columns. + + Parameters + ---------- + sql : str + SQL query to be executed. + db : str + Path to the sqlite database. + auto_gamma : bool + If True applies the gamma_method to all imported Obs objects with the default parameters for + the error analysis. Default False. + """ + con = sqlite3.connect(db) + extract_df = pd.read_sql(sql, con, **kwargs) + con.close() + return _deserialize_df(extract_df, auto_gamma=auto_gamma) + + def dump_df(df, fname, gz=True): """Exports a pandas DataFrame containing Obs valued columns to a (gzipped) csv file. @@ -21,11 +63,7 @@ def dump_df(df, fname, gz=True): gz : bool If True, the output is a gzipped csv file. If False, the output is a csv file. """ - - out = df.copy() - for column in out: - if isinstance(out[column][0], (Obs, Corr)): - out[column] = out[column].transform(lambda x: create_json_string(x, indent=0)) + out = _serialize_df(df, gz=False) if not fname.endswith('.csv'): fname += '.csv' @@ -51,7 +89,6 @@ def load_df(fname, auto_gamma=False, gz=True): gz : bool If True, assumes that data is gzipped. If False, assumes JSON file. """ - if not fname.endswith('.csv') and not fname.endswith('.gz'): fname += '.csv' @@ -65,11 +102,50 @@ def load_df(fname, auto_gamma=False, gz=True): warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning) re_import = pd.read_csv(fname) - for column in re_import.select_dtypes(include="object"): - if isinstance(re_import[column][0], str): - if re_import[column][0][:20] == '{"program":"pyerrors': - re_import[column] = re_import[column].transform(lambda x: import_json_string(x, verbose=False)) - if auto_gamma is True: - re_import[column].apply(lambda x: x.gamma_method()) + return _deserialize_df(re_import, auto_gamma=auto_gamma) - return re_import + +def _serialize_df(df, gz=False): + """Serializes all Obs or Corr valued columns into json strings according to the pyerrors json specification. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame to be serilized. + gz: bool + gzip the json string representation. Default False. + """ + out = df.copy() + for column in out: + if isinstance(out[column][0], (Obs, Corr)): + out[column] = out[column].transform(lambda x: create_json_string(x, indent=0)) + if gz is True: + out[column] = out[column].transform(lambda x: gzip.compress(x.encode('utf-8'))) + return out + + +def _deserialize_df(df, auto_gamma=False): + """Deserializes all pyerrors json strings into Obs or Corr objects according to the pyerrors json specification. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame to be deserilized. + auto_gamma : bool + If True applies the gamma_method to all imported Obs objects with the default parameters for + the error analysis. Default False. + + Notes: + ------ + In case any column of the DataFrame is gzipped it is gunzipped in the process. + """ + for column in df.select_dtypes(include="object"): + if isinstance(df[column][0], bytes): + if df[column][0].startswith(b"\x1f\x8b\x08\x00"): + df[column] = df[column].transform(lambda x: gzip.decompress(x).decode('utf-8')) + if isinstance(df[column][0], str): + if '"program":' in df[column][0][:20]: + df[column] = df[column].transform(lambda x: import_json_string(x, verbose=False)) + if auto_gamma is True: + df[column].apply(lambda x: x.gamma_method()) + return df diff --git a/setup.py b/setup.py index 33bde5bc..61f27767 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ setup(name='pyerrors', license="MIT", packages=find_packages(), python_requires='>=3.6.0', - install_requires=['numpy>=1.16', 'autograd>=1.4', 'numdifftools', 'matplotlib>=3.3', 'scipy>=1', 'iminuit>=2', 'h5py>=3', 'lxml>=4', 'python-rapidjson>=1', 'pandas>=1.1'], + install_requires=['numpy>=1.16', 'autograd>=1.4', 'numdifftools', 'matplotlib>=3.3', 'scipy>=1', 'iminuit>=2', 'h5py>=3', 'lxml>=4', 'python-rapidjson>=1', 'pandas>=1.1', 'pysqlite3>=0.4'], classifiers=[ 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Science/Research', diff --git a/tests/json_io_test.py b/tests/json_io_test.py index 5474c8ce..b161ea5c 100644 --- a/tests/json_io_test.py +++ b/tests/json_io_test.py @@ -1,5 +1,6 @@ import os import gzip +import rapidjson import numpy as np import pyerrors as pe import pyerrors.input.json as jsonio @@ -378,6 +379,11 @@ def test_reconstruct_non_linear_r_obs_list(tmp_path): assert assert_equal_Obs(oa, ob) +def test_import_non_json_string(): + with pytest.raises(rapidjson.JSONDecodeError): + pe.input.json.import_json_string("this is garbage") + + def assert_equal_Obs(to, ro): for kw in ["N", "cov_names", "covobs", "ddvalue", "dvalue", "e_content", "e_names", "idl", "mc_names", "names", diff --git a/tests/pandas_test.py b/tests/pandas_test.py index 658f4375..71961d38 100644 --- a/tests/pandas_test.py +++ b/tests/pandas_test.py @@ -1,6 +1,7 @@ import numpy as np import pandas as pd import pyerrors as pe +import pytest def test_df_export_import(tmp_path): my_dict = {"int": 1, @@ -28,3 +29,46 @@ def test_df_Corr(tmp_path): pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix()) reconstructed_df = pe.input.pandas.load_df((tmp_path / 'df_output').as_posix(), auto_gamma=True) + + +def test_default_export_pe_import(tmp_path): + df = pd.DataFrame([{"Column1": 1.1, "Column2": 2, "Column3": "my string£"}]) + df.to_csv((tmp_path / 'plain_df.csv').as_posix(), index=False) + re_df = pe.input.pandas.load_df((tmp_path / 'plain_df').as_posix(), gz=False) + assert np.all(df == re_df) + + +def test_pe_export_default_import(tmp_path): + df = pd.DataFrame([{"Column1": 1.1, "Column2": 2, "Column3": "my string£"}]) + pe.input.pandas.dump_df(df, (tmp_path / 'pe_df').as_posix(), gz=False) + re_df = pd.read_csv((tmp_path / 'pe_df.csv').as_posix()) + assert np.all(df == re_df) + + +def test_gz_serialization(): + my_obs = pe.pseudo_Obs(0.1, 0.01, "pandas DataFrame ensemble only for test purposes.") + my_df = pd.DataFrame([{"Label": 1, "Obs": my_obs}]) + for gz in [False, True]: + ser = pe.input.pandas._serialize_df(my_df, gz=gz) + deser = pe.input.pandas._deserialize_df(ser) + assert np.all(my_df == deser) + + +def test_sql(tmp_path): + my_list = [{"Label": i, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2 * i), 0.01, "test_ensemble", 20)} for i in range(15)] + pe_df = pd.DataFrame(my_list) + my_db = (tmp_path / "test_db.sqlite").as_posix() + pe.input.pandas.to_sql(pe_df, "My_table", my_db) + for auto_gamma in [False, True]: + re_df = pe.input.pandas.read_sql("SELECT * from My_table", my_db, auto_gamma=auto_gamma) + assert np.all(re_df == pe_df) + + +def test_sql_if_exists_fail(tmp_path): + pe_df = pd.DataFrame([{"Label": 1, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2), 0.01, "test_ensemble", 20)}]) + my_db = (tmp_path / "test_db.sqlite").as_posix() + pe.input.pandas.to_sql(pe_df, "My_table", my_db) + with pytest.raises(ValueError): + pe.input.pandas.to_sql(pe_df, "My_table", my_db) + pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='append') + pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='replace')