Merge branch 'develop' into documentation

This commit is contained in:
s-kuberski 2021-12-10 08:59:48 +00:00
commit 6da0c6f619
4 changed files with 57 additions and 29 deletions

View file

@ -203,20 +203,18 @@ def dump_to_json(ol, fname, description='', indent=1, gz=True):
fp.close() fp.close()
def load_json(fname, verbose=True, gz=True, full_output=False): def import_json_string(json_string, verbose=True, full_output=False):
"""Import a list of Obs or structures containing Obs from a .json.gz file. """Reconstruct a list of Obs or structures containing Obs from a json string.
The following structures are supported: Obs, list, numpy.ndarray The following structures are supported: Obs, list, numpy.ndarray
If the list contains only one element, it is unpacked from the list. If the list contains only one element, it is unpacked from the list.
Parameters Parameters
---------- ----------
fname : str json_string : str
Filename of the input file. json string containing the data.
verbose : bool verbose : bool
Print additional information that was written to the file. Print additional information that was written to the file.
gz : bool
If True, assumes that data is gzipped. If False, assumes JSON file.
full_output : bool full_output : bool
If True, a dict containing auxiliary information and the data is returned. If True, a dict containing auxiliary information and the data is returned.
If False, only the data is returned. If False, only the data is returned.
@ -281,35 +279,22 @@ def load_json(fname, verbose=True, gz=True, full_output=False):
ret[-1].tag = taglist[i] ret[-1].tag = taglist[i]
return np.reshape(ret, layout) return np.reshape(ret, layout)
if not fname.endswith('.json') and not fname.endswith('.gz'): prog = json_string.get('program', '')
fname += '.json' version = json_string.get('version', '')
if gz: who = json_string.get('who', '')
if not fname.endswith('.gz'): date = json_string.get('date', '')
fname += '.gz' host = json_string.get('host', '')
with gzip.open(fname, 'r') as fin:
d = json.loads(fin.read().decode('utf-8'))
else:
if fname.endswith('.gz'):
warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
with open(fname, 'r', encoding='utf-8') as fin:
d = json.loads(fin.read())
prog = d.get('program', '')
version = d.get('version', '')
who = d.get('who', '')
date = d.get('date', '')
host = d.get('host', '')
if prog and verbose: if prog and verbose:
print('Data has been written using %s.' % (prog)) print('Data has been written using %s.' % (prog))
if version and verbose: if version and verbose:
print('Format version %s' % (version)) print('Format version %s' % (version))
if np.any([who, date, host] and verbose): if np.any([who, date, host] and verbose):
print('Written by %s on %s on host %s' % (who, date, host)) print('Written by %s on %s on host %s' % (who, date, host))
description = d.get('description', '') description = json_string.get('description', '')
if description and verbose: if description and verbose:
print() print()
print('Description: ', description) print('Description: ', description)
obsdata = d['obsdata'] obsdata = json_string['obsdata']
ol = [] ol = []
for io in obsdata: for io in obsdata:
if io['type'] == 'Obs': if io['type'] == 'Obs':
@ -335,3 +320,37 @@ def load_json(fname, verbose=True, gz=True, full_output=False):
ol = ol[0] ol = ol[0]
return ol return ol
def load_json(fname, verbose=True, gz=True, full_output=False):
"""Import a list of Obs or structures containing Obs from a .json.gz file.
The following structures are supported: Obs, list, numpy.ndarray
If the list contains only one element, it is unpacked from the list.
Parameters
----------
fname : str
Filename of the input file.
verbose : bool
Print additional information that was written to the file.
gz : bool
If True, assumes that data is gzipped. If False, assumes JSON file.
full_output : bool
If True, a dict containing auxiliary information and the data is returned.
If False, only the data is returned.
"""
if not fname.endswith('.json') and not fname.endswith('.gz'):
fname += '.json'
if gz:
if not fname.endswith('.gz'):
fname += '.gz'
with gzip.open(fname, 'r') as fin:
d = json.loads(fin.read().decode('utf-8'))
else:
if fname.endswith('.gz'):
warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
with open(fname, 'r', encoding='utf-8') as fin:
d = json.loads(fin.read())
return import_json_string(d, verbose, full_output)

View file

@ -11,6 +11,7 @@ def test_covobs():
name = 'Covariance' name = 'Covariance'
co = pe.cov_Obs(val, cov, name) co = pe.cov_Obs(val, cov, name)
co.gamma_method() co.gamma_method()
co.details()
assert (co.dvalue == np.sqrt(cov)) assert (co.dvalue == np.sqrt(cov))
assert (co.value == val) assert (co.value == val)

View file

@ -41,6 +41,9 @@ def test_jsonio():
os.remove(fname + '.json.gz') os.remove(fname + '.json.gz')
for o, r in zip(ol, rl):
assert np.all(o == r)
for i in range(len(rl)): for i in range(len(rl)):
if isinstance(ol[i], pe.Obs): if isinstance(ol[i], pe.Obs):
o = ol[i] - rl[i] o = ol[i] - rl[i]
@ -57,6 +60,9 @@ def test_jsonio():
rl = jsonio.load_json(fname, gz=False, full_output=True) rl = jsonio.load_json(fname, gz=False, full_output=True)
assert(description == rl['description'])
os.remove(fname + '.json') os.remove(fname + '.json')
for o, r in zip(ol, rl['obsdata']):
assert np.all(o == r)
assert(description == rl['description'])

View file

@ -46,8 +46,9 @@ def test_Obs_exceptions():
my_obs.gamma_method(tau_exp=2.3) my_obs.gamma_method(tau_exp=2.3)
my_obs.gamma_method() my_obs.gamma_method()
my_obs.details() my_obs.details()
my_obs.plot_rep_dist()
my_obs += pe.Obs([np.random.rand(6)], ['name2|r1']) my_obs += pe.Obs([np.random.rand(6)], ['name2|r1'], idl=[[1, 3, 4, 5, 6, 7]])
my_obs += pe.Obs([np.random.rand(6)], ['name2|r2']) my_obs += pe.Obs([np.random.rand(6)], ['name2|r2'])
my_obs.gamma_method() my_obs.gamma_method()
my_obs.details() my_obs.details()
@ -475,6 +476,7 @@ def test_irregular_error_propagation():
pe.Obs([np.random.rand(6)], ['t'], idl=[[4, 18, 27, 29, 57, 80]]), pe.Obs([np.random.rand(6)], ['t'], idl=[[4, 18, 27, 29, 57, 80]]),
pe.Obs([np.random.rand(50)], ['t'], idl=[list(range(1, 26)) + list(range(50, 100, 2))])] pe.Obs([np.random.rand(50)], ['t'], idl=[list(range(1, 26)) + list(range(50, 100, 2))])]
for obs1 in obs_list: for obs1 in obs_list:
obs1.details()
for obs2 in obs_list: for obs2 in obs_list:
assert obs1 == (obs1 / obs2) * obs2 assert obs1 == (obs1 / obs2) * obs2
assert obs1 == (obs1 * obs2) / obs2 assert obs1 == (obs1 * obs2) / obs2