Merge pull request #53 from s-kuberski/feature/json_io

Introduced JSON I/O for dictionaries
This commit is contained in:
Fabian Joswig 2022-02-03 10:31:43 +00:00 committed by GitHub
commit eab2ba45ff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 322 additions and 3 deletions

View file

@ -6,6 +6,7 @@ import socket
import datetime
import platform
import warnings
import re
from ..obs import Obs
from ..covobs import Covobs
from ..correlators import Corr
@ -20,7 +21,7 @@ def create_json_string(ol, description='', indent=1):
Parameters
----------
ol : list
List of objects that will be exported. At the moments, these objects can be
List of objects that will be exported. At the moment, these objects can be
either of: Obs, list, numpy.ndarray, Corr.
All Obs inside a structure have to be defined on the same set of configurations.
description : str
@ -246,7 +247,7 @@ def dump_to_json(ol, fname, description='', indent=1, gz=True):
Parameters
----------
ol : list
List of objects that will be exported. At the moments, these objects can be
List of objects that will be exported. At the moment, these objects can be
either of: Obs, list, numpy.ndarray, Corr.
All Obs inside a structure have to be defined on the same set of configurations.
fname : str
@ -476,7 +477,7 @@ def import_json_string(json_string, verbose=True, full_output=False):
def load_json(fname, verbose=True, gz=True, full_output=False):
"""Import a list of Obs or structures containing Obs from a .json.gz file.
"""Import a list of Obs or structures containing Obs from a .json(.gz) file.
The following structures are supported: Obs, list, numpy.ndarray, Corr
If the list contains only one element, it is unpacked from the list.
@ -507,3 +508,215 @@ def load_json(fname, verbose=True, gz=True, full_output=False):
d = fin.read()
return import_json_string(d, verbose, full_output)
def _ol_from_dict(ind, reps='DICTOBS'):
"""Convert a dictionary of Obs objects to a list and a dictionary that contains
placeholders instead of the Obs objects.
Parameters
----------
ind : dict
Dict of JSON valid structures and objects that will be exported.
At the moment, these object can be either of: Obs, list, numpy.ndarray, Corr.
All Obs inside a structure have to be defined on the same set of configurations.
reps : str
Specify the structure of the placeholder in exported dict to be reps[0-9]+.
"""
obstypes = (Obs, Corr, np.ndarray)
if not reps.isalnum():
raise Exception('Placeholder string has to be alphanumeric!')
ol = []
counter = 0
def dict_replace_obs(d):
nonlocal ol
nonlocal counter
x = {}
for k, v in d.items():
if isinstance(v, dict):
v = dict_replace_obs(v)
elif isinstance(v, list) and all([isinstance(o, Obs) for o in v]):
v = obslist_replace_obs(v)
elif isinstance(v, list):
v = list_replace_obs(v)
elif isinstance(v, obstypes):
ol.append(v)
v = reps + '%d' % (counter)
counter += 1
elif isinstance(v, str):
if bool(re.match(r'%s[0-9]+' % (reps), v)):
raise Exception('Dict contains string %s that matches the placeholder! %s Cannot be savely exported.' % (v, reps))
x[k] = v
return x
def list_replace_obs(li):
nonlocal ol
nonlocal counter
x = []
for e in li:
if isinstance(e, list):
e = list_replace_obs(e)
elif isinstance(e, list) and all([isinstance(o, Obs) for o in e]):
e = obslist_replace_obs(e)
elif isinstance(e, dict):
e = dict_replace_obs(e)
elif isinstance(e, obstypes):
ol.append(e)
e = reps + '%d' % (counter)
counter += 1
elif isinstance(e, str):
if bool(re.match(r'%s[0-9]+' % (reps), e)):
raise Exception('Dict contains string %s that matches the placeholder! %s Cannot be savely exported.' % (e, reps))
x.append(e)
return x
def obslist_replace_obs(li):
nonlocal ol
nonlocal counter
il = []
for e in li:
il.append(e)
ol.append(il)
x = reps + '%d' % (counter)
counter += 1
return x
nd = dict_replace_obs(ind)
return ol, nd
def dump_dict_to_json(od, fname, description='', indent=1, reps='DICTOBS', gz=True):
"""Export a dict of Obs or structures containing Obs to a .json(.gz) file
Parameters
----------
od : dict
Dict of JSON valid structures and objects that will be exported.
At the moment, these objects can be either of: Obs, list, numpy.ndarray, Corr.
All Obs inside a structure have to be defined on the same set of configurations.
fname : str
Filename of the output file.
description : str
Optional string that describes the contents of the json file.
indent : int
Specify the indentation level of the json file. None or 0 is permissible and
saves disk space.
reps : str
Specify the structure of the placeholder in exported dict to be reps[0-9]+.
gz : bool
If True, the output is a gzipped json. If False, the output is a json file.
"""
if not isinstance(od, dict):
raise Exception('od has to be a dictionary. Did you want to use dump_to_json?')
infostring = ('This JSON file contains a python dictionary that has been parsed to a list of structures. '
'OBSDICT contains the dictionary, where Obs or other structures have been replaced by '
'' + reps + '[0-9]+. The field description contains the additional description of this JSON file. '
'This file may be parsed to a dict with the pyerrors routine load_json_dict.')
desc_dict = {'INFO': infostring, 'OBSDICT': {}, 'description': description}
ol, desc_dict['OBSDICT'] = _ol_from_dict(od, reps=reps)
dump_to_json(ol, fname, description=desc_dict, indent=indent, gz=gz)
def _od_from_list_and_dict(ol, ind, reps='DICTOBS'):
"""Parse a list of Obs or structures containing Obs and an accompanying
dict, where the structures have been replaced by placeholders to a
dict that contains the structures.
The following structures are supported: Obs, list, numpy.ndarray, Corr
Parameters
----------
ol : list
List of objects -
At the moment, these objects can be either of: Obs, list, numpy.ndarray, Corr.
All Obs inside a structure have to be defined on the same set of configurations.
ind : dict
Dict that defines the structure of the resulting dict and contains placeholders
reps : str
Specify the structure of the placeholder in imported dict to be reps[0-9]+.
"""
if not reps.isalnum():
raise Exception('Placeholder string has to be alphanumeric!')
counter = 0
def dict_replace_string(d):
nonlocal counter
nonlocal ol
x = {}
for k, v in d.items():
if isinstance(v, dict):
v = dict_replace_string(v)
elif isinstance(v, list):
v = list_replace_string(v)
elif isinstance(v, str) and bool(re.match(r'%s[0-9]+' % (reps), v)):
index = int(v[len(reps):])
v = ol[index]
counter += 1
x[k] = v
return x
def list_replace_string(li):
nonlocal counter
nonlocal ol
x = []
for e in li:
if isinstance(e, list):
e = list_replace_string(e)
elif isinstance(e, dict):
e = dict_replace_string(e)
elif isinstance(e, str) and bool(re.match(r'%s[0-9]+' % (reps), e)):
index = int(e[len(reps):])
e = ol[index]
counter += 1
x.append(e)
return x
nd = dict_replace_string(ind)
if counter == 0:
raise Exception('No placeholder has been replaced! Check if reps is set correctly.')
return nd
def load_json_dict(fname, verbose=True, gz=True, full_output=False, reps='DICTOBS'):
"""Import a dict of Obs or structures containing Obs from a .json(.gz) file.
The following structures are supported: Obs, list, numpy.ndarray, Corr
Parameters
----------
fname : str
Filename of the input file.
verbose : bool
Print additional information that was written to the file.
gz : bool
If True, assumes that data is gzipped. If False, assumes JSON file.
full_output : bool
If True, a dict containing auxiliary information and the data is returned.
If False, only the data is returned.
reps : str
Specify the structure of the placeholder in imported dict to be reps[0-9]+.
"""
indata = load_json(fname, verbose=verbose, gz=gz, full_output=True)
description = indata['description']['description']
indict = indata['description']['OBSDICT']
ol = indata['obsdata']
od = _od_from_list_and_dict(ol, indict, reps=reps)
if full_output:
indata['description'] = description
indata['obsdata'] = od
return indata
else:
return od

View file

@ -3,6 +3,7 @@ import gzip
import numpy as np
import pyerrors as pe
import pyerrors.input.json as jsonio
import pytest
def test_jsonio():
@ -136,3 +137,108 @@ def test_json_corr_2d_io():
assert recover[index] is None
assert my_corr.tag == recover.tag
assert my_corr.prange == recover.prange
def test_json_dict_io():
def check_dict_equality(d1, d2):
def dict_check_obs(d1, d2):
for k, v in d1.items():
if isinstance(v, dict):
v = dict_check_obs(v, d2[k])
elif isinstance(v, list) and all([isinstance(o, pe.Obs) for o in v]):
for i in range(len(v)):
assert((v[i] - d2[k][i]).is_zero())
elif isinstance(v, list):
v = list_check_obs(v, d2[k])
elif isinstance(v, pe.Obs):
assert((v - d2[k]).is_zero())
elif isinstance(v, pe.Corr):
for i in range(v.T):
assert((v[i] - d2[k][i]).is_zero())
elif isinstance(v, np.ndarray):
a1 = np.ravel(v)
a2 = np.ravel(d2[k])
for i in range(len(a1)):
assert((a1[i] - a2[i]).is_zero())
def list_check_obs(l1, l2):
for ei in range(len(l1)):
e = l1[ei]
if isinstance(e, list):
e = list_check_obs(e, l2[ei])
elif isinstance(e, list) and all([isinstance(o, pe.Obs) for o in e]):
for i in range(len(e)):
assert((e[i] - l2[ei][i]).is_zero())
elif isinstance(e, dict):
e = dict_check_obs(e, l2[ei])
elif isinstance(e, pe.Obs):
assert((e - l2[ei]).is_zero())
elif isinstance(e, pe.Corr):
for i in range(e.T):
assert((e[i] - l2[ei][i]).is_zero())
elif isinstance(e, np.ndarray):
a1 = np.ravel(e)
a2 = np.ravel(l2[ei])
for i in range(len(a1)):
assert((a1[i] - a2[i]).is_zero())
dict_check_obs(d1, d2)
return True
od = {
'l':
{
'a': pe.pseudo_Obs(1, .2, 'testa', samples=10),
'b': [pe.pseudo_Obs(1.1, .1, 'test', samples=10), pe.pseudo_Obs(1.2, .1, 'test', samples=10), pe.pseudo_Obs(1.3, .1, 'test', samples=10)],
'c': {
'd': 1,
'e': pe.pseudo_Obs(.2, .01, 'teste', samples=10),
'f': pe.Corr([pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10)]),
'g': np.reshape(np.asarray([pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10)]), (2, 2)),
}
},
's':
{
'a': 'Infor123',
'b': ['Some', 'list'],
'd': pe.pseudo_Obs(.01, .001, 'testd', samples=10) * pe.cov_Obs(1, .01, 'cov1'),
'se': None,
'sf': 1.2,
}
}
fname = 'test_rw'
desc = 'This is a random description'
with pytest.raises(Exception):
jsonio.dump_dict_to_json(od, fname, description=desc, reps='|Test')
jsonio.dump_dict_to_json(od, fname, description=desc, reps='TEST')
nd = jsonio.load_json_dict(fname, full_output=True, reps='TEST')
with pytest.raises(Exception):
nd = jsonio.load_json_dict(fname, full_output=True)
jsonio.dump_dict_to_json(od, fname, description=desc)
nd = jsonio.load_json_dict(fname, full_output=True)
assert (desc == nd['description'])
assert(check_dict_equality(od, nd['obsdata']))
nd = jsonio.load_json_dict(fname, full_output=False)
assert(check_dict_equality(od, nd))
nl = jsonio.load_json(fname, full_output=True)
nl = jsonio.load_json(fname, full_output=False)
with pytest.raises(Exception):
jsonio.dump_dict_to_json(nl, fname, description=desc)
od['k'] = 'DICTOBS2'
with pytest.raises(Exception):
jsonio.dump_dict_to_json(od, fname, description=desc)
od['k'] = ['DICTOBS2']
with pytest.raises(Exception):
jsonio.dump_dict_to_json(od, fname, description=desc)
os.remove(fname + '.json.gz')