Merge pull request #115 from fjosw/feature/refactor_pandas

Pandas SQL support
This commit is contained in:
Fabian Joswig 2022-07-06 14:33:24 +01:00 committed by GitHub
commit 4f37804fe7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 140 additions and 14 deletions

View file

@ -1,11 +1,53 @@
import warnings
import gzip
import sqlite3
import pandas as pd
from ..obs import Obs
from ..correlators import Corr
from .json import create_json_string, import_json_string
def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs):
"""Write DataFrame including Obs or Corr valued columns to sqlite database.
Parameters
----------
df : pandas.DataFrame
Dataframe to be written to the database.
table_name : str
Name of the table in the database.
db : str
Path to the sqlite database.
if exists : str
How to behave if table already exists. Options 'fail', 'replace', 'append'.
gz : bool
If True the json strings are gzipped.
"""
se_df = _serialize_df(df, gz=gz)
con = sqlite3.connect(db)
se_df.to_sql(table_name, con, if_exists=if_exists, index=False, **kwargs)
con.close()
def read_sql(sql, db, auto_gamma=False, **kwargs):
"""Execute SQL query on sqlite database and obtain DataFrame including Obs or Corr valued columns.
Parameters
----------
sql : str
SQL query to be executed.
db : str
Path to the sqlite database.
auto_gamma : bool
If True applies the gamma_method to all imported Obs objects with the default parameters for
the error analysis. Default False.
"""
con = sqlite3.connect(db)
extract_df = pd.read_sql(sql, con, **kwargs)
con.close()
return _deserialize_df(extract_df, auto_gamma=auto_gamma)
def dump_df(df, fname, gz=True):
"""Exports a pandas DataFrame containing Obs valued columns to a (gzipped) csv file.
@ -21,11 +63,7 @@ def dump_df(df, fname, gz=True):
gz : bool
If True, the output is a gzipped csv file. If False, the output is a csv file.
"""
out = df.copy()
for column in out:
if isinstance(out[column][0], (Obs, Corr)):
out[column] = out[column].transform(lambda x: create_json_string(x, indent=0))
out = _serialize_df(df, gz=False)
if not fname.endswith('.csv'):
fname += '.csv'
@ -51,7 +89,6 @@ def load_df(fname, auto_gamma=False, gz=True):
gz : bool
If True, assumes that data is gzipped. If False, assumes JSON file.
"""
if not fname.endswith('.csv') and not fname.endswith('.gz'):
fname += '.csv'
@ -65,11 +102,50 @@ def load_df(fname, auto_gamma=False, gz=True):
warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
re_import = pd.read_csv(fname)
for column in re_import.select_dtypes(include="object"):
if isinstance(re_import[column][0], str):
if re_import[column][0][:20] == '{"program":"pyerrors':
re_import[column] = re_import[column].transform(lambda x: import_json_string(x, verbose=False))
if auto_gamma is True:
re_import[column].apply(lambda x: x.gamma_method())
return _deserialize_df(re_import, auto_gamma=auto_gamma)
return re_import
def _serialize_df(df, gz=False):
"""Serializes all Obs or Corr valued columns into json strings according to the pyerrors json specification.
Parameters
----------
df : pandas.DataFrame
DataFrame to be serilized.
gz: bool
gzip the json string representation. Default False.
"""
out = df.copy()
for column in out:
if isinstance(out[column][0], (Obs, Corr)):
out[column] = out[column].transform(lambda x: create_json_string(x, indent=0))
if gz is True:
out[column] = out[column].transform(lambda x: gzip.compress(x.encode('utf-8')))
return out
def _deserialize_df(df, auto_gamma=False):
"""Deserializes all pyerrors json strings into Obs or Corr objects according to the pyerrors json specification.
Parameters
----------
df : pandas.DataFrame
DataFrame to be deserilized.
auto_gamma : bool
If True applies the gamma_method to all imported Obs objects with the default parameters for
the error analysis. Default False.
Notes:
------
In case any column of the DataFrame is gzipped it is gunzipped in the process.
"""
for column in df.select_dtypes(include="object"):
if isinstance(df[column][0], bytes):
if df[column][0].startswith(b"\x1f\x8b\x08\x00"):
df[column] = df[column].transform(lambda x: gzip.decompress(x).decode('utf-8'))
if isinstance(df[column][0], str):
if '"program":' in df[column][0][:20]:
df[column] = df[column].transform(lambda x: import_json_string(x, verbose=False))
if auto_gamma is True:
df[column].apply(lambda x: x.gamma_method())
return df

View file

@ -25,7 +25,7 @@ setup(name='pyerrors',
license="MIT",
packages=find_packages(),
python_requires='>=3.6.0',
install_requires=['numpy>=1.16', 'autograd>=1.4', 'numdifftools', 'matplotlib>=3.3', 'scipy>=1', 'iminuit>=2', 'h5py>=3', 'lxml>=4', 'python-rapidjson>=1', 'pandas>=1.1'],
install_requires=['numpy>=1.16', 'autograd>=1.4', 'numdifftools', 'matplotlib>=3.3', 'scipy>=1', 'iminuit>=2', 'h5py>=3', 'lxml>=4', 'python-rapidjson>=1', 'pandas>=1.1', 'pysqlite3>=0.4'],
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Science/Research',

View file

@ -1,5 +1,6 @@
import os
import gzip
import rapidjson
import numpy as np
import pyerrors as pe
import pyerrors.input.json as jsonio
@ -378,6 +379,11 @@ def test_reconstruct_non_linear_r_obs_list(tmp_path):
assert assert_equal_Obs(oa, ob)
def test_import_non_json_string():
with pytest.raises(rapidjson.JSONDecodeError):
pe.input.json.import_json_string("this is garbage")
def assert_equal_Obs(to, ro):
for kw in ["N", "cov_names", "covobs", "ddvalue", "dvalue", "e_content",
"e_names", "idl", "mc_names", "names",

View file

@ -1,6 +1,7 @@
import numpy as np
import pandas as pd
import pyerrors as pe
import pytest
def test_df_export_import(tmp_path):
my_dict = {"int": 1,
@ -28,3 +29,46 @@ def test_df_Corr(tmp_path):
pe.input.pandas.dump_df(my_df, (tmp_path / 'df_output').as_posix())
reconstructed_df = pe.input.pandas.load_df((tmp_path / 'df_output').as_posix(), auto_gamma=True)
def test_default_export_pe_import(tmp_path):
df = pd.DataFrame([{"Column1": 1.1, "Column2": 2, "Column3": "my string£"}])
df.to_csv((tmp_path / 'plain_df.csv').as_posix(), index=False)
re_df = pe.input.pandas.load_df((tmp_path / 'plain_df').as_posix(), gz=False)
assert np.all(df == re_df)
def test_pe_export_default_import(tmp_path):
df = pd.DataFrame([{"Column1": 1.1, "Column2": 2, "Column3": "my string£"}])
pe.input.pandas.dump_df(df, (tmp_path / 'pe_df').as_posix(), gz=False)
re_df = pd.read_csv((tmp_path / 'pe_df.csv').as_posix())
assert np.all(df == re_df)
def test_gz_serialization():
my_obs = pe.pseudo_Obs(0.1, 0.01, "pandas DataFrame ensemble only for test purposes.")
my_df = pd.DataFrame([{"Label": 1, "Obs": my_obs}])
for gz in [False, True]:
ser = pe.input.pandas._serialize_df(my_df, gz=gz)
deser = pe.input.pandas._deserialize_df(ser)
assert np.all(my_df == deser)
def test_sql(tmp_path):
my_list = [{"Label": i, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2 * i), 0.01, "test_ensemble", 20)} for i in range(15)]
pe_df = pd.DataFrame(my_list)
my_db = (tmp_path / "test_db.sqlite").as_posix()
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
for auto_gamma in [False, True]:
re_df = pe.input.pandas.read_sql("SELECT * from My_table", my_db, auto_gamma=auto_gamma)
assert np.all(re_df == pe_df)
def test_sql_if_exists_fail(tmp_path):
pe_df = pd.DataFrame([{"Label": 1, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2), 0.01, "test_ensemble", 20)}])
my_db = (tmp_path / "test_db.sqlite").as_posix()
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
with pytest.raises(ValueError):
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='append')
pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='replace')