mirror of
https://github.com/fjosw/pyerrors.git
synced 2025-05-14 11:33:42 +02:00
feat: default value for if_exist in to_sql changed to fail, test for
this behavior added.
This commit is contained in:
parent
03d70d3757
commit
a6ebcb59bb
2 changed files with 14 additions and 3 deletions
|
@ -7,7 +7,7 @@ from ..correlators import Corr
|
||||||
from .json import create_json_string, import_json_string
|
from .json import create_json_string, import_json_string
|
||||||
|
|
||||||
|
|
||||||
def to_sql(df, table_name, db, if_exists="replace", gz=True):
|
def to_sql(df, table_name, db, if_exists='fail', gz=True):
|
||||||
"""Write DataFrame including Obs or Corr valued columns to sqlite database.
|
"""Write DataFrame including Obs or Corr valued columns to sqlite database.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
|
@ -30,7 +30,7 @@ def to_sql(df, table_name, db, if_exists="replace", gz=True):
|
||||||
|
|
||||||
|
|
||||||
def read_sql_query(sql, db, auto_gamma=False):
|
def read_sql_query(sql, db, auto_gamma=False):
|
||||||
"""Execute SQL query on sqlite database and obatin DataFrame including Obs or Corr valued columns.
|
"""Execute SQL query on sqlite database and obtain DataFrame including Obs or Corr valued columns.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
|
@ -113,7 +113,7 @@ def _serialize_df(df, gz=False):
|
||||||
df : pandas.DataFrame
|
df : pandas.DataFrame
|
||||||
DataFrame to be serilized.
|
DataFrame to be serilized.
|
||||||
gz: bool
|
gz: bool
|
||||||
gzip the json string represenation. Default False.
|
gzip the json string representation. Default False.
|
||||||
"""
|
"""
|
||||||
out = df.copy()
|
out = df.copy()
|
||||||
for column in out:
|
for column in out:
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import pyerrors as pe
|
import pyerrors as pe
|
||||||
|
import pytest
|
||||||
|
|
||||||
def test_df_export_import(tmp_path):
|
def test_df_export_import(tmp_path):
|
||||||
my_dict = {"int": 1,
|
my_dict = {"int": 1,
|
||||||
|
@ -61,3 +62,13 @@ def test_sql(tmp_path):
|
||||||
for auto_gamma in [False, True]:
|
for auto_gamma in [False, True]:
|
||||||
re_df = pe.input.pandas.read_sql_query("SELECT * from My_table", my_db, auto_gamma=auto_gamma)
|
re_df = pe.input.pandas.read_sql_query("SELECT * from My_table", my_db, auto_gamma=auto_gamma)
|
||||||
assert np.all(re_df == pe_df)
|
assert np.all(re_df == pe_df)
|
||||||
|
|
||||||
|
|
||||||
|
def test_sql_if_exists_fail(tmp_path):
|
||||||
|
pe_df = pd.DataFrame([{"Label": 1, "Obs": pe.pseudo_Obs(5 * np.exp(-0.2), 0.01, "test_ensemble", 20)}])
|
||||||
|
my_db = (tmp_path / "test_db.sqlite").as_posix()
|
||||||
|
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
pe.input.pandas.to_sql(pe_df, "My_table", my_db)
|
||||||
|
pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='append')
|
||||||
|
pe.input.pandas.to_sql(pe_df, "My_table", my_db, if_exists='replace')
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue