mirror of
https://github.com/fjosw/pyerrors.git
synced 2025-03-15 06:40:24 +01:00
Introduced JSON I/O for dictionaries
This commit is contained in:
parent
9b63d8e2bd
commit
0dc4d4008a
2 changed files with 322 additions and 3 deletions
|
@ -6,6 +6,7 @@ import socket
|
|||
import datetime
|
||||
import platform
|
||||
import warnings
|
||||
import re
|
||||
from ..obs import Obs
|
||||
from ..covobs import Covobs
|
||||
from ..correlators import Corr
|
||||
|
@ -20,7 +21,7 @@ def create_json_string(ol, description='', indent=1):
|
|||
Parameters
|
||||
----------
|
||||
ol : list
|
||||
List of objects that will be exported. At the moments, these objects can be
|
||||
List of objects that will be exported. At the moment, these objects can be
|
||||
either of: Obs, list, numpy.ndarray, Corr.
|
||||
All Obs inside a structure have to be defined on the same set of configurations.
|
||||
description : str
|
||||
|
@ -246,7 +247,7 @@ def dump_to_json(ol, fname, description='', indent=1, gz=True):
|
|||
Parameters
|
||||
----------
|
||||
ol : list
|
||||
List of objects that will be exported. At the moments, these objects can be
|
||||
List of objects that will be exported. At the moment, these objects can be
|
||||
either of: Obs, list, numpy.ndarray, Corr.
|
||||
All Obs inside a structure have to be defined on the same set of configurations.
|
||||
fname : str
|
||||
|
@ -476,7 +477,7 @@ def import_json_string(json_string, verbose=True, full_output=False):
|
|||
|
||||
|
||||
def load_json(fname, verbose=True, gz=True, full_output=False):
|
||||
"""Import a list of Obs or structures containing Obs from a .json.gz file.
|
||||
"""Import a list of Obs or structures containing Obs from a .json(.gz) file.
|
||||
|
||||
The following structures are supported: Obs, list, numpy.ndarray, Corr
|
||||
If the list contains only one element, it is unpacked from the list.
|
||||
|
@ -507,3 +508,215 @@ def load_json(fname, verbose=True, gz=True, full_output=False):
|
|||
d = fin.read()
|
||||
|
||||
return import_json_string(d, verbose, full_output)
|
||||
|
||||
|
||||
def _ol_from_dict(ind, reps='DICTOBS'):
|
||||
"""Convert a dictionary of Obs objects to a list and a dictionary that contains
|
||||
placeholders instead of the Obs objects.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ind : dict
|
||||
Dict of JSON valid structures and objects that will be exported.
|
||||
At the moment, these object can be either of: Obs, list, numpy.ndarray, Corr.
|
||||
All Obs inside a structure have to be defined on the same set of configurations.
|
||||
reps : str
|
||||
Specify the structure of the placeholder in exported dict to be reps[0-9]+.
|
||||
"""
|
||||
|
||||
obstypes = (Obs, Corr, np.ndarray)
|
||||
|
||||
if not reps.isalnum():
|
||||
raise Exception('Placeholder string has to be alphanumeric!')
|
||||
ol = []
|
||||
counter = 0
|
||||
|
||||
def dict_replace_obs(d):
|
||||
nonlocal ol
|
||||
nonlocal counter
|
||||
x = {}
|
||||
for k, v in d.items():
|
||||
if isinstance(v, dict):
|
||||
v = dict_replace_obs(v)
|
||||
elif isinstance(v, list) and all([isinstance(o, Obs) for o in v]):
|
||||
v = obslist_replace_obs(v)
|
||||
elif isinstance(v, list):
|
||||
v = list_replace_obs(v)
|
||||
elif isinstance(v, obstypes):
|
||||
ol.append(v)
|
||||
v = reps + '%d' % (counter)
|
||||
counter += 1
|
||||
elif isinstance(v, str):
|
||||
if bool(re.match(r'%s[0-9]+' % (reps), v)):
|
||||
raise Exception('Dict contains string %s that matches the placeholder! %s Cannot be savely exported.' % (v, reps))
|
||||
x[k] = v
|
||||
return x
|
||||
|
||||
def list_replace_obs(li):
|
||||
nonlocal ol
|
||||
nonlocal counter
|
||||
x = []
|
||||
for e in li:
|
||||
if isinstance(e, list):
|
||||
e = list_replace_obs(e)
|
||||
elif isinstance(e, list) and all([isinstance(o, Obs) for o in e]):
|
||||
e = obslist_replace_obs(e)
|
||||
elif isinstance(e, dict):
|
||||
e = dict_replace_obs(e)
|
||||
elif isinstance(e, obstypes):
|
||||
ol.append(e)
|
||||
e = reps + '%d' % (counter)
|
||||
counter += 1
|
||||
elif isinstance(e, str):
|
||||
if bool(re.match(r'%s[0-9]+' % (reps), e)):
|
||||
raise Exception('Dict contains string %s that matches the placeholder! %s Cannot be savely exported.' % (e, reps))
|
||||
x.append(e)
|
||||
return x
|
||||
|
||||
def obslist_replace_obs(li):
|
||||
nonlocal ol
|
||||
nonlocal counter
|
||||
il = []
|
||||
for e in li:
|
||||
il.append(e)
|
||||
|
||||
ol.append(il)
|
||||
x = reps + '%d' % (counter)
|
||||
counter += 1
|
||||
return x
|
||||
|
||||
nd = dict_replace_obs(ind)
|
||||
|
||||
return ol, nd
|
||||
|
||||
|
||||
def dump_dict_to_json(od, fname, description='', indent=1, reps='DICTOBS', gz=True):
|
||||
"""Export a dict of Obs or structures containing Obs to a .json(.gz) file
|
||||
|
||||
Parameters
|
||||
----------
|
||||
od : dict
|
||||
Dict of JSON valid structures and objects that will be exported.
|
||||
At the moment, these objects can be either of: Obs, list, numpy.ndarray, Corr.
|
||||
All Obs inside a structure have to be defined on the same set of configurations.
|
||||
fname : str
|
||||
Filename of the output file.
|
||||
description : str
|
||||
Optional string that describes the contents of the json file.
|
||||
indent : int
|
||||
Specify the indentation level of the json file. None or 0 is permissible and
|
||||
saves disk space.
|
||||
reps : str
|
||||
Specify the structure of the placeholder in exported dict to be reps[0-9]+.
|
||||
gz : bool
|
||||
If True, the output is a gzipped json. If False, the output is a json file.
|
||||
"""
|
||||
|
||||
if not isinstance(od, dict):
|
||||
raise Exception('od has to be a dictionary. Did you want to use dump_to_json?')
|
||||
|
||||
infostring = ('This JSON file contains a python dictionary that has been parsed to a list of structures. '
|
||||
'OBSDICT contains the dictionary, where Obs or other structures have been replaced by '
|
||||
'' + reps + '[0-9]+. The field description contains the additional description of this JSON file. '
|
||||
'This file may be parsed to a dict with the pyerrors routine load_json_dict.')
|
||||
|
||||
desc_dict = {'INFO': infostring, 'OBSDICT': {}, 'description': description}
|
||||
ol, desc_dict['OBSDICT'] = _ol_from_dict(od, reps=reps)
|
||||
|
||||
dump_to_json(ol, fname, description=desc_dict, indent=indent, gz=gz)
|
||||
|
||||
|
||||
def _od_from_list_and_dict(ol, ind, reps='DICTOBS'):
|
||||
"""Parse a list of Obs or structures containing Obs and an accompanying
|
||||
dict, where the structures have been replaced by placeholders to a
|
||||
dict that contains the structures.
|
||||
|
||||
The following structures are supported: Obs, list, numpy.ndarray, Corr
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ol : list
|
||||
List of objects -
|
||||
At the moment, these objects can be either of: Obs, list, numpy.ndarray, Corr.
|
||||
All Obs inside a structure have to be defined on the same set of configurations.
|
||||
ind : dict
|
||||
Dict that defines the structure of the resulting dict and contains placeholders
|
||||
reps : str
|
||||
Specify the structure of the placeholder in imported dict to be reps[0-9]+.
|
||||
"""
|
||||
if not reps.isalnum():
|
||||
raise Exception('Placeholder string has to be alphanumeric!')
|
||||
|
||||
counter = 0
|
||||
|
||||
def dict_replace_string(d):
|
||||
nonlocal counter
|
||||
nonlocal ol
|
||||
x = {}
|
||||
for k, v in d.items():
|
||||
if isinstance(v, dict):
|
||||
v = dict_replace_string(v)
|
||||
elif isinstance(v, list):
|
||||
v = list_replace_string(v)
|
||||
elif isinstance(v, str) and bool(re.match(r'%s[0-9]+' % (reps), v)):
|
||||
index = int(v[len(reps):])
|
||||
v = ol[index]
|
||||
counter += 1
|
||||
x[k] = v
|
||||
return x
|
||||
|
||||
def list_replace_string(li):
|
||||
nonlocal counter
|
||||
nonlocal ol
|
||||
x = []
|
||||
for e in li:
|
||||
if isinstance(e, list):
|
||||
e = list_replace_string(e)
|
||||
elif isinstance(e, dict):
|
||||
e = dict_replace_string(e)
|
||||
elif isinstance(e, str) and bool(re.match(r'%s[0-9]+' % (reps), e)):
|
||||
index = int(e[len(reps):])
|
||||
e = ol[index]
|
||||
counter += 1
|
||||
x.append(e)
|
||||
return x
|
||||
|
||||
nd = dict_replace_string(ind)
|
||||
|
||||
if counter == 0:
|
||||
raise Exception('No placeholder has been replaced! Check if reps is set correctly.')
|
||||
|
||||
return nd
|
||||
|
||||
|
||||
def load_json_dict(fname, verbose=True, gz=True, full_output=False, reps='DICTOBS'):
|
||||
"""Import a dict of Obs or structures containing Obs from a .json(.gz) file.
|
||||
|
||||
The following structures are supported: Obs, list, numpy.ndarray, Corr
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fname : str
|
||||
Filename of the input file.
|
||||
verbose : bool
|
||||
Print additional information that was written to the file.
|
||||
gz : bool
|
||||
If True, assumes that data is gzipped. If False, assumes JSON file.
|
||||
full_output : bool
|
||||
If True, a dict containing auxiliary information and the data is returned.
|
||||
If False, only the data is returned.
|
||||
reps : str
|
||||
Specify the structure of the placeholder in imported dict to be reps[0-9]+.
|
||||
"""
|
||||
indata = load_json(fname, verbose=verbose, gz=gz, full_output=True)
|
||||
description = indata['description']['description']
|
||||
indict = indata['description']['OBSDICT']
|
||||
ol = indata['obsdata']
|
||||
od = _od_from_list_and_dict(ol, indict, reps=reps)
|
||||
|
||||
if full_output:
|
||||
indata['description'] = description
|
||||
indata['obsdata'] = od
|
||||
return indata
|
||||
else:
|
||||
return od
|
||||
|
|
106
tests/io_test.py
106
tests/io_test.py
|
@ -3,6 +3,7 @@ import gzip
|
|||
import numpy as np
|
||||
import pyerrors as pe
|
||||
import pyerrors.input.json as jsonio
|
||||
import pytest
|
||||
|
||||
|
||||
def test_jsonio():
|
||||
|
@ -136,3 +137,108 @@ def test_json_corr_2d_io():
|
|||
assert recover[index] is None
|
||||
assert my_corr.tag == recover.tag
|
||||
assert my_corr.prange == recover.prange
|
||||
|
||||
|
||||
def test_json_dict_io():
|
||||
def check_dict_equality(d1, d2):
|
||||
def dict_check_obs(d1, d2):
|
||||
for k, v in d1.items():
|
||||
if isinstance(v, dict):
|
||||
v = dict_check_obs(v, d2[k])
|
||||
elif isinstance(v, list) and all([isinstance(o, pe.Obs) for o in v]):
|
||||
for i in range(len(v)):
|
||||
assert((v[i] - d2[k][i]).is_zero())
|
||||
elif isinstance(v, list):
|
||||
v = list_check_obs(v, d2[k])
|
||||
elif isinstance(v, pe.Obs):
|
||||
assert((v - d2[k]).is_zero())
|
||||
elif isinstance(v, pe.Corr):
|
||||
for i in range(v.T):
|
||||
assert((v[i] - d2[k][i]).is_zero())
|
||||
elif isinstance(v, np.ndarray):
|
||||
a1 = np.ravel(v)
|
||||
a2 = np.ravel(d2[k])
|
||||
for i in range(len(a1)):
|
||||
assert((a1[i] - a2[i]).is_zero())
|
||||
|
||||
def list_check_obs(l1, l2):
|
||||
for ei in range(len(l1)):
|
||||
e = l1[ei]
|
||||
if isinstance(e, list):
|
||||
e = list_check_obs(e, l2[ei])
|
||||
elif isinstance(e, list) and all([isinstance(o, pe.Obs) for o in e]):
|
||||
for i in range(len(e)):
|
||||
assert((e[i] - l2[ei][i]).is_zero())
|
||||
elif isinstance(e, dict):
|
||||
e = dict_check_obs(e, l2[ei])
|
||||
elif isinstance(e, pe.Obs):
|
||||
assert((e - l2[ei]).is_zero())
|
||||
elif isinstance(e, pe.Corr):
|
||||
for i in range(e.T):
|
||||
assert((e[i] - l2[ei][i]).is_zero())
|
||||
elif isinstance(e, np.ndarray):
|
||||
a1 = np.ravel(e)
|
||||
a2 = np.ravel(l2[ei])
|
||||
for i in range(len(a1)):
|
||||
assert((a1[i] - a2[i]).is_zero())
|
||||
dict_check_obs(d1, d2)
|
||||
return True
|
||||
|
||||
od = {
|
||||
'l':
|
||||
{
|
||||
'a': pe.pseudo_Obs(1, .2, 'testa', samples=10),
|
||||
'b': [pe.pseudo_Obs(1.1, .1, 'test', samples=10), pe.pseudo_Obs(1.2, .1, 'test', samples=10), pe.pseudo_Obs(1.3, .1, 'test', samples=10)],
|
||||
'c': {
|
||||
'd': 1,
|
||||
'e': pe.pseudo_Obs(.2, .01, 'teste', samples=10),
|
||||
'f': pe.Corr([pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10)]),
|
||||
'g': np.reshape(np.asarray([pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10)]), (2, 2)),
|
||||
}
|
||||
},
|
||||
's':
|
||||
{
|
||||
'a': 'Infor123',
|
||||
'b': ['Some', 'list'],
|
||||
'd': pe.pseudo_Obs(.01, .001, 'testd', samples=10) * pe.cov_Obs(1, .01, 'cov1'),
|
||||
'se': None,
|
||||
'sf': 1.2,
|
||||
}
|
||||
}
|
||||
|
||||
fname = 'test_rw'
|
||||
|
||||
desc = 'This is a random description'
|
||||
|
||||
with pytest.raises(Exception):
|
||||
jsonio.dump_dict_to_json(od, fname, description=desc, reps='|Test')
|
||||
|
||||
jsonio.dump_dict_to_json(od, fname, description=desc, reps='TEST')
|
||||
nd = jsonio.load_json_dict(fname, full_output=True, reps='TEST')
|
||||
|
||||
with pytest.raises(Exception):
|
||||
nd = jsonio.load_json_dict(fname, full_output=True)
|
||||
|
||||
jsonio.dump_dict_to_json(od, fname, description=desc)
|
||||
nd = jsonio.load_json_dict(fname, full_output=True)
|
||||
assert (desc == nd['description'])
|
||||
|
||||
assert(check_dict_equality(od, nd['obsdata']))
|
||||
nd = jsonio.load_json_dict(fname, full_output=False)
|
||||
assert(check_dict_equality(od, nd))
|
||||
|
||||
nl = jsonio.load_json(fname, full_output=True)
|
||||
nl = jsonio.load_json(fname, full_output=False)
|
||||
|
||||
with pytest.raises(Exception):
|
||||
jsonio.dump_dict_to_json(nl, fname, description=desc)
|
||||
|
||||
od['k'] = 'DICTOBS2'
|
||||
with pytest.raises(Exception):
|
||||
jsonio.dump_dict_to_json(od, fname, description=desc)
|
||||
|
||||
od['k'] = ['DICTOBS2']
|
||||
with pytest.raises(Exception):
|
||||
jsonio.dump_dict_to_json(od, fname, description=desc)
|
||||
|
||||
os.remove(fname + '.json.gz')
|
||||
|
|
Loading…
Add table
Reference in a new issue