pyerrors/pyerrors/input/pandas.py
Fabian Joswig 682d23604d
[Fix] Pandas 3 string type changes (#278)
* [Fix] Pandas 3 string type handling

* [ci] Temporarily remove WError because of scipy deprecation

* [Fix] Address edge cases in _deserialize_df

- Use pd.isna() instead of truthiness check for gzip null guard, fixing
  incorrect behavior when null is np.nan (which is truthy in Python)
- Add bounds check to while loop to prevent IndexError when all non-null
  values are empty strings converted to None by regex replace

* [Fix] Address edge cases in _deserialize_df and add string dtype tests

- Guard against IndexError on empty DataFrames and all-null columns
- Use is not None instead of pd.isna() for Obs objects in auto_gamma
- Add tests for string dtype columns (with/without None, CSV and SQL)
- Add test for empty DataFrame deserialization

* [Fix] Avoid skipping NA-to-None conversion and guard auto_gamma against None lists

- Replace continue with conditional to preserve NA-to-None conversion for all-null columns
- Guard auto_gamma list lambda against None values to prevent TypeError
- Add tests for all-empty-string columns and Obs lists with None + auto_gamma
2026-03-29 18:46:15 +02:00

214 lines
7.3 KiB
Python

import warnings
import gzip
import sqlite3
from contextlib import closing
import pandas as pd
from ..obs import Obs
from ..correlators import Corr
from .json import create_json_string, import_json_string
import numpy as np
def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs):
    """Store a DataFrame with Obs or Corr valued columns in a table of an sqlite database.

    The frame is first passed through `_serialize_df`; the serialized copy is
    then written with `pandas.DataFrame.to_sql` (the index is not stored).

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to be written to the database.
    table_name : str
        Name of the table in the database.
    db : str
        Path to the sqlite database.
    if_exists : str
        How to behave if table already exists. Options 'fail', 'replace', 'append'.
    gz : bool
        If True the json strings are gzipped.
    **kwargs
        Forwarded unchanged to `pandas.DataFrame.to_sql`.

    Returns
    -------
    None
    """
    # Serialize before opening the database so a serialization error never touches the file.
    serialized = _serialize_df(df, gz=gz)
    connection = sqlite3.connect(db)
    # `closing` guarantees the connection is closed even if the write fails.
    with closing(connection):
        serialized.to_sql(table_name, con=connection, if_exists=if_exists, index=False, **kwargs)
def read_sql(sql, db, auto_gamma=False, **kwargs):
    """Run an SQL query against an sqlite database and return the result as a DataFrame.

    The raw query result is passed through `_deserialize_df`, so columns holding
    serialized Obs or Corr objects are converted back into such objects.

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    db : str
        Path to the sqlite database.
    auto_gamma : bool
        If True applies the gamma_method to all imported Obs objects with the default parameters for
        the error analysis. Default False.
    **kwargs
        Forwarded unchanged to `pandas.read_sql`.

    Returns
    -------
    data : pandas.DataFrame
        Dataframe with the content of the sqlite database.
    """
    connection = sqlite3.connect(db)
    # `closing` guarantees the connection is closed even if the query fails.
    with closing(connection):
        raw_df = pd.read_sql(sql, con=connection, **kwargs)
    return _deserialize_df(raw_df, auto_gamma=auto_gamma)
def dump_df(df, fname, gz=True):
    """Exports a pandas DataFrame containing Obs valued columns to a (gzipped) csv file.

    Before making use of pandas to_csv functionality Obs objects are serialized via the standardized
    json format of pyerrors. The individual json strings are not gzipped; if requested, the
    compression is applied to the csv file as a whole.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to be dumped to a file.
    fname : str
        Filename of the output file. Missing '.csv' / '.gz' extensions are appended. With `gz=True`
        a name that already ends in '.gz' is used as is, matching the name resolution of `load_df`.
    gz : bool
        If True, the output is a gzipped csv file. If False, the output is a csv file.

    Returns
    -------
    None

    Warns
    -----
    UserWarning
        If a purely numeric column that is not serialized contains nan values.
    """
    for column in df:
        if _need_to_serialize(df[column]):
            continue
        values = df[column]
        # Only purely numeric columns are checked; np.isnan is not defined for arbitrary objects.
        if all(isinstance(entry, (int, np.integer, float, np.floating)) for entry in values):
            if any(np.isnan(entry) for entry in values):
                # f-string instead of `+` so that non-string column labels (e.g. integers) do not raise.
                warnings.warn(f"nan value in column {column} will be replaced by None", UserWarning)

    out = _serialize_df(df, gz=False)

    if gz is True:
        # A name already ending in '.gz' is complete. Appending '.csv.gz' again would produce
        # e.g. 'x.csv.gz.csv.gz', which load_df('x.csv.gz') cannot find.
        if not fname.endswith('.gz'):
            if not fname.endswith('.csv'):
                fname += '.csv'
            fname += '.gz'
        out.to_csv(fname, index=False, compression='gzip')
    else:
        if not fname.endswith('.csv'):
            fname += '.csv'
        out.to_csv(fname, index=False)
def load_df(fname, auto_gamma=False, gz=True):
    """Imports a pandas DataFrame from a csv.(gz) file in which Obs objects are serialized as json strings.

    The csv content is read with `keep_default_na=False` and then passed through
    `_deserialize_df`.

    Parameters
    ----------
    fname : str
        Filename of the input file. '.csv' is appended if the name ends in neither '.csv' nor '.gz';
        with `gz=True` a missing '.gz' is appended as well.
    auto_gamma : bool
        If True applies the gamma_method to all imported Obs objects with the default parameters for
        the error analysis. Default False.
    gz : bool
        If True, the file is opened with gzip. If False, the file name is handed to pandas directly
        and a warning is emitted when it ends in '.gz'.

    Returns
    -------
    data : pandas.DataFrame
        Dataframe with the content of the csv file.
    """
    if not fname.endswith(('.csv', '.gz')):
        fname += '.csv'

    if gz is not True:
        if fname.endswith('.gz'):
            warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
        raw_df = pd.read_csv(fname, keep_default_na=False)
        return _deserialize_df(raw_df, auto_gamma=auto_gamma)

    if not fname.endswith('.gz'):
        fname += '.gz'
    with gzip.open(fname) as handle:
        raw_df = pd.read_csv(handle, keep_default_na=False)
    return _deserialize_df(raw_df, auto_gamma=auto_gamma)
def _serialize_df(df, gz=False):
    """Return a copy of the DataFrame in which Obs or Corr valued columns are json strings.

    Columns are selected with `_need_to_serialize`; all other columns are copied unchanged.
    The strings follow the pyerrors json specification.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be serialized. It is not modified.
    gz: bool
        gzip the json string representation. Default False.
    """
    def _to_json(entry):
        # Null entries stay None so that they can be told apart from real objects later.
        if _is_null(entry):
            return None
        return create_json_string(entry, indent=0)

    def _to_gzip(text):
        # Null entries are stored as a gzipped empty byte string.
        if _is_null(text):
            return gzip.compress(b'')
        return gzip.compress(text.encode('utf-8'))

    out = df.copy()
    for column in out:
        if _need_to_serialize(out[column]) is not True:
            continue
        out[column] = out[column].transform(_to_json)
        if gz is True:
            out[column] = out[column].transform(_to_gzip)
    return out
def _deserialize_df(df, auto_gamma=False):
"""Deserializes all pyerrors json strings into Obs or Corr objects according to the pyerrors json specification.
Parameters
----------
df : pandas.DataFrame
DataFrame to be deserilized.
auto_gamma : bool
If True applies the gamma_method to all imported Obs objects with the default parameters for
the error analysis. Default False.
Notes:
------
In case any column of the DataFrame is gzipped it is gunzipped in the process.
"""
# In pandas 3+, string columns use 'str' dtype instead of 'object'
string_like_dtypes = ["object", "str"] if int(pd.__version__.split(".")[0]) >= 3 else ["object"]
for column in df.select_dtypes(include=string_like_dtypes):
if len(df[column]) == 0:
continue
if isinstance(df[column].iloc[0], bytes):
if df[column].iloc[0].startswith(b"\x1f\x8b\x08\x00"):
df[column] = df[column].transform(lambda x: gzip.decompress(x).decode('utf-8') if not pd.isna(x) else '')
if df[column].notna().any():
df[column] = df[column].replace({r'^$': None}, regex=True)
i = 0
while i < len(df[column]) and pd.isna(df[column].iloc[i]):
i += 1
if i < len(df[column]) and isinstance(df[column].iloc[i], str):
if '"program":' in df[column].iloc[i][:20]:
df[column] = df[column].transform(lambda x: import_json_string(x, verbose=False) if not pd.isna(x) else None)
if auto_gamma is True:
if isinstance(df[column].iloc[i], list):
df[column].apply(lambda x: [o.gm() if o is not None else x for o in x] if x is not None else x)
else:
df[column].apply(lambda x: x.gm() if x is not None else x)
# Convert NA values back to Python None for compatibility with `x is None` checks
if df[column].isna().any():
df[column] = df[column].astype(object).where(df[column].notna(), None)
return df
def _need_to_serialize(col):
    """Decide whether a column has to be serialized, based on its first non-null entry.

    Returns True if that entry is an Obs or Corr, or a list consisting only of Obs.
    Returns False otherwise, including for empty and all-null columns.
    """
    for position in range(len(col)):
        candidate = col.iloc[position]
        if _is_null(candidate):
            continue
        # Only the first non-null entry decides; later entries are never inspected.
        if isinstance(candidate, (Obs, Corr)):
            return True
        if isinstance(candidate, list):
            return all(isinstance(item, Obs) for item in candidate)
        return False
    return False
def _is_null(val):
"""Check if a value is null (None or NA), handling list/array values."""
return False if isinstance(val, (list, np.ndarray)) else pd.isna(val)