mirror of
https://github.com/fjosw/pyerrors.git
synced 2025-03-15 14:50:25 +01:00
Some changes on dobs
This commit is contained in:
parent
be1eeccd8e
commit
0f735c0d2d
2 changed files with 192 additions and 297 deletions
|
@ -13,9 +13,9 @@ from ..covobs import Covobs
|
|||
from .. import version as pyerrorsversion
|
||||
|
||||
|
||||
# standard pobs
|
||||
# https://stackoverflow.com/a/10076823
|
||||
# Based on https://stackoverflow.com/a/10076823
|
||||
def _etree_to_dict(t):
|
||||
""" Convert the content of an XML file to a python dict"""
|
||||
d = {t.tag: {} if t.attrib else None}
|
||||
children = list(t)
|
||||
if children:
|
||||
|
@ -38,164 +38,6 @@ def _etree_to_dict(t):
|
|||
return d
|
||||
|
||||
|
||||
def _get_data_from_root(d, root):
|
||||
tree = et.ElementTree(root)
|
||||
nl = []
|
||||
pl = []
|
||||
for node in root.findall('.//array'):
|
||||
nl.append(node)
|
||||
pl.append(tree.getpath(node))
|
||||
|
||||
for nd in range(len(nl)):
|
||||
s = pl[nd].split('/')
|
||||
ad = d
|
||||
for i in range(len(s) - 1):
|
||||
if len(s[i]) > 0:
|
||||
ad = ad[s[i]]
|
||||
nr = int(ad['nr'])
|
||||
ad = ad['array']
|
||||
if '[' in s[-1]:
|
||||
ai = int(s[-1].lstrip('array[').rstrip(']')) - 1
|
||||
ad = ad[ai]
|
||||
if nr == 1:
|
||||
dats = [t for t in nl[nd].itertext()][0]
|
||||
else:
|
||||
dats = [t for t in nl[nd].itertext()][-1]
|
||||
dats = [o.strip() for o in dats.split('\n')]
|
||||
|
||||
layout = ad['layout']
|
||||
sl = layout.split()
|
||||
datl = []
|
||||
idx = []
|
||||
for di in dats:
|
||||
if len(di) > 0:
|
||||
ds = di.split()
|
||||
if sl[1] == 'i':
|
||||
i = int(ds[0])
|
||||
idx.append(i)
|
||||
df = np.asarray(ds[1:], dtype=np.float64)
|
||||
else:
|
||||
df = np.asarray(ds, dtype=np.float64)
|
||||
nfloat = int(sl[-1].lstrip('f'))
|
||||
assert len(df) == nfloat
|
||||
datl.append(df)
|
||||
ad['idx'] = idx
|
||||
ad['#data'] = np.transpose(np.array(datl))
|
||||
return d
|
||||
|
||||
|
||||
# https://stackoverflow.com/a/9808122
|
||||
def _find(key, value):
|
||||
for k, v in (value.items() if isinstance(value, dict) else enumerate(value) if isinstance(value, list) else []):
|
||||
if k == key:
|
||||
yield v
|
||||
elif isinstance(v, (dict, list)):
|
||||
for result in _find(key, v):
|
||||
yield result
|
||||
|
||||
|
||||
# generate obs from all 'array' entries inside a dict
|
||||
def _gen_obs_from_dict(d):
|
||||
pobs_list = [a for a in _find('array', d)]
|
||||
ol = []
|
||||
for po in pobs_list:
|
||||
if isinstance(po, list):
|
||||
names = []
|
||||
idl = [poi['idx'] for poi in po]
|
||||
names = [poi['id'] for poi in po]
|
||||
for i in range(len(po[0])):
|
||||
tag = po[0]['symbol'].split()[-8 + i]
|
||||
data = [poi['#data'][i] for poi in po]
|
||||
ol.append(Obs(data, names, idl=idl))
|
||||
ol[-1].tag = tag
|
||||
print(tag)
|
||||
return ol
|
||||
|
||||
|
||||
# generate obs from entries inside a 'pobs'-dict
|
||||
def _gen_obs_from_pobs_dict(d):
|
||||
po = d['array']
|
||||
if isinstance(po, dict):
|
||||
po = [po]
|
||||
enstag = d['enstag']
|
||||
nr = int(d['nr'])
|
||||
assert len(po) == nr, 'Lenght of array %d does not match nr=%d' % (len(po), nr)
|
||||
ol = []
|
||||
names = []
|
||||
idl = [poi['idx'] for poi in po]
|
||||
names = [poi['id'] for poi in po]
|
||||
names = [name.replace(enstag, enstag + '|') for name in names]
|
||||
if len(po[0]['symbol'].split()) >= len(po[0]):
|
||||
gettag = True
|
||||
else:
|
||||
gettag = False
|
||||
nobs = np.shape(po[0]['#data'])[0]
|
||||
print(nobs, 'observables')
|
||||
for i in range(nobs):
|
||||
if gettag:
|
||||
tag = po[0]['symbol'].split()[i + 1]
|
||||
else:
|
||||
tag = po[0]['symbol']
|
||||
data = [poi['#data'][i] for poi in po]
|
||||
ol.append(Obs(data, names, idl=idl))
|
||||
ol[-1].tag = tag
|
||||
return ol
|
||||
|
||||
|
||||
def _read_pobs_traditional(fname, full_output=False):
|
||||
"""Import a list of Obs from an xml.gz file in the Zeuthen pobs format,
|
||||
this does not include pobs that are written using the dobs package.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fname : str
|
||||
Filename of the input file.
|
||||
full_output : bool
|
||||
If True, a dict containing auxiliary information and the data is returned.
|
||||
If False, only the data is returned as list.
|
||||
"""
|
||||
# open and read gzipped xml file
|
||||
infile = gzip.open(fname)
|
||||
content = infile.read()
|
||||
|
||||
# parse xml file content
|
||||
root = et.fromstring(content)
|
||||
|
||||
# parse to dict
|
||||
d = _etree_to_dict(root)
|
||||
|
||||
# get the actual data from the xml file
|
||||
d = _get_data_from_root(d, root)
|
||||
|
||||
# pobs dict:
|
||||
pod = d[list(d.keys())[0]]['pobs']
|
||||
|
||||
# convert data to Obs
|
||||
ol = _gen_obs_from_pobs_dict(pod)
|
||||
|
||||
if full_output:
|
||||
retd = {}
|
||||
retd['obsdata'] = ol
|
||||
for k in pod.keys():
|
||||
if k == 'spec' and isinstance(pod[k], dict):
|
||||
retd[k] = ''
|
||||
if '#data' in pod[k]:
|
||||
if isinstance(pod[k]['#data'], list):
|
||||
for el in pod[k]['#data']:
|
||||
retd[k] += el
|
||||
else:
|
||||
retd[k] = pod[k]['#data']
|
||||
for li in pod[k]:
|
||||
if not li == '#data':
|
||||
retd[k] += '\n<%s>\n ' % (li) + pod[k][li] + '\n<%s%s>' % ('/', li)
|
||||
elif not k == 'array':
|
||||
retd[k] = pod[k]
|
||||
|
||||
return retd
|
||||
else:
|
||||
return ol
|
||||
|
||||
|
||||
def _dict_to_xmlstring(d):
|
||||
if isinstance(d, dict):
|
||||
iters = ''
|
||||
|
@ -243,7 +85,7 @@ def _dict_to_xmlstring_spaces(d, space=' '):
|
|||
return o
|
||||
|
||||
|
||||
def create_pobs_string(obsl, name, spec='', origin='', symbol=[]):
|
||||
def create_pobs_string(obsl, name, spec='', origin='', symbol=[], enstag=None):
|
||||
"""Export a list of Obs or structures containing Obs to an xml string
|
||||
according to the Zeuthen pobs format.
|
||||
|
||||
|
@ -262,6 +104,8 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[]):
|
|||
Specify where the data has its origin.
|
||||
symbol : list
|
||||
A list of symbols that describe the observables to be written. May be empty.
|
||||
enstag : str
|
||||
Enstag that is written to pobs. If None, the ensemble name is used.
|
||||
"""
|
||||
|
||||
od = {}
|
||||
|
@ -288,6 +132,11 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[]):
|
|||
pd['spec'] = spec
|
||||
pd['origin'] = origin
|
||||
pd['name'] = name
|
||||
if enstag:
|
||||
if not isinstance(enstag, str):
|
||||
raise Exception('enstag has to be a string!')
|
||||
pd['enstag'] = enstag
|
||||
else:
|
||||
pd['enstag'] = ename
|
||||
pd['nr'] = '%d' % (nr)
|
||||
pd['array'] = []
|
||||
|
@ -322,7 +171,7 @@ def create_pobs_string(obsl, name, spec='', origin='', symbol=[]):
|
|||
return rs
|
||||
|
||||
|
||||
def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], gz=True):
|
||||
def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], enstag=None, gz=True):
|
||||
"""Export a list of Obs or structures containing Obs to a .xml.gz file
|
||||
according to the Zeuthen pobs format.
|
||||
|
||||
|
@ -343,10 +192,12 @@ def write_pobs(obsl, fname, name, spec='', origin='', symbol=[], gz=True):
|
|||
Specify where the data has its origin.
|
||||
symbol : list
|
||||
A list of symbols that describe the observables to be written. May be empty.
|
||||
enstag : str
|
||||
Enstag that is written to pobs. If None, the ensemble name is used.
|
||||
gz : bool
|
||||
If True, the output is a gzipped json. If False, the output is a json file.
|
||||
"""
|
||||
pobsstring = create_pobs_string(obsl, name, spec, origin, symbol)
|
||||
pobsstring = create_pobs_string(obsl, name, spec, origin, symbol, enstag)
|
||||
|
||||
if not fname.endswith('.xml') and not fname.endswith('.gz'):
|
||||
fname += '.xml'
|
||||
|
@ -372,11 +223,18 @@ def _check(condition):
|
|||
raise Exception("XML file format not supported")
|
||||
|
||||
|
||||
class _NoTagInDataError(Exception):
|
||||
"""Raised when tag is not in data"""
|
||||
def __init__(self, tag):
|
||||
self.tag = tag
|
||||
super().__init__('Tag %s not in data!' % (self.tag))
|
||||
|
||||
|
||||
def _find_tag(dat, tag):
|
||||
for i in range(len(dat)):
|
||||
if dat[i].tag == tag:
|
||||
return i
|
||||
raise Exception('Tag %s not in data!' % (tag))
|
||||
raise _NoTagInDataError(tag)
|
||||
|
||||
|
||||
def _import_array(arr):
|
||||
|
@ -384,7 +242,7 @@ def _import_array(arr):
|
|||
index = _find_tag(arr, 'layout')
|
||||
try:
|
||||
sindex = _find_tag(arr, 'symbol')
|
||||
except:
|
||||
except _NoTagInDataError:
|
||||
sindex = 0
|
||||
if sindex > index:
|
||||
tmp = _import_data(arr[sindex].tail)
|
||||
|
@ -430,7 +288,7 @@ def _import_cdata(cd):
|
|||
return cd[0].text.strip(), cov, grad
|
||||
|
||||
|
||||
def read_pobs(fname, full_output=False, gz=True):
|
||||
def read_pobs(fname, full_output=False, gz=True, separator_insertion=None):
|
||||
"""Import a list of Obs from an xml.gz file in the Zeuthen pobs format.
|
||||
|
||||
Tags are not written or recovered automatically.
|
||||
|
@ -442,6 +300,10 @@ def read_pobs(fname, full_output=False, gz=True):
|
|||
full_output : bool
|
||||
If True, a dict containing auxiliary information and the data is returned.
|
||||
If False, only the data is returned as list.
|
||||
separatior_insertion: str or int
|
||||
str: replace all occurences of "separator_insertion" within the replica names
|
||||
by "|%s" % (separator_insertion) when constructing the names of the replica.
|
||||
int: Insert the separator "|" at position separator_insertion
|
||||
"""
|
||||
|
||||
if not fname.endswith('.xml') and not fname.endswith('.gz'):
|
||||
|
@ -463,18 +325,6 @@ def read_pobs(fname, full_output=False, gz=True):
|
|||
_check(root[2].tag == 'pobs')
|
||||
pobs = root[2]
|
||||
|
||||
if root.tag == 'observables' and False:
|
||||
_check(root[0].tag in ['schema'])
|
||||
version = root[0][1].text.strip()
|
||||
|
||||
_check(root[1].tag == 'origin')
|
||||
file_origin = _etree_to_dict(root[1])['origin']
|
||||
|
||||
res = _read_pobs_traditional(fname)
|
||||
|
||||
else:
|
||||
# _check(root.tag in ['OBSERVABLES'])
|
||||
# _check(root[0].tag in ['SCHEMA'])
|
||||
version = root[0][1].text.strip()
|
||||
|
||||
_check(root[1].tag == 'origin')
|
||||
|
@ -486,6 +336,14 @@ def read_pobs(fname, full_output=False, gz=True):
|
|||
for i in range(5, len(pobs)):
|
||||
delta, name, idx = _import_rdata(pobs[i])
|
||||
deltas.append(delta)
|
||||
if separator_insertion is None:
|
||||
pass
|
||||
elif isinstance(separator_insertion, int):
|
||||
name = name[:separator_insertion] + '|' + name[separator_insertion:]
|
||||
elif isinstance(separator_insertion, str):
|
||||
name = name.replace(separator_insertion, "|%s" % (separator_insertion))
|
||||
else:
|
||||
raise Exception("separator_insertion has to be string or int, is ", type(separator_insertion))
|
||||
names.append(name)
|
||||
idl.append(idx)
|
||||
res = [Obs([d[i] for d in deltas], names, idl=idl) for i in range(len(deltas[0]))]
|
||||
|
@ -520,9 +378,13 @@ def read_pobs(fname, full_output=False, gz=True):
|
|||
return res
|
||||
|
||||
|
||||
# Reading (and writing) dobs is not yet working properly:
|
||||
# we have to loop over root[2:] because each entry is a dobs
|
||||
# But maybe this is just a problem with Ben's implementation
|
||||
|
||||
# this is based on Mattia Bruno's implementation at https://github.com/mbruno46/pyobs/blob/master/pyobs/IO/xml.py
|
||||
def import_dobs_string(content, noempty=False, full_output=False):
|
||||
"""Import a list of Obs from an xml.gz file in the Zeuthen dobs format.
|
||||
def import_dobs_string(content, noempty=False, full_output=False, separator_insertion=None):
|
||||
"""Import a list of Obs from a string in the Zeuthen dobs format.
|
||||
|
||||
Tags are not written or recovered automatically.
|
||||
|
||||
|
@ -536,6 +398,10 @@ def import_dobs_string(content, noempty=False, full_output=False):
|
|||
full_output : bool
|
||||
If True, a dict containing auxiliary information and the data is returned.
|
||||
If False, only the data is returned as list.
|
||||
separatior_insertion: str or int
|
||||
str: replace all occurences of "separator_insertion" within the replica names
|
||||
by "|%s" % (separator_insertion) when constructing the names of the replica.
|
||||
int: Insert the separator "|" at position separator_insertion
|
||||
"""
|
||||
|
||||
root = et.fromstring(content)
|
||||
|
@ -574,6 +440,7 @@ def import_dobs_string(content, noempty=False, full_output=False):
|
|||
gradd = {}
|
||||
names = []
|
||||
e_names = []
|
||||
enstags = {}
|
||||
for k in range(6, len(list(dobs))):
|
||||
if dobs[k].tag == "edata":
|
||||
_check(dobs[k][0].tag == "enstag")
|
||||
|
@ -583,8 +450,20 @@ def import_dobs_string(content, noempty=False, full_output=False):
|
|||
R = int(dobs[k][1].text.strip())
|
||||
for i in range(2, 2 + R):
|
||||
deltas, rname, idx = _import_rdata(dobs[k][i])
|
||||
if rname != ename:
|
||||
if separator_insertion is None:
|
||||
if rname.startswith(ename):
|
||||
rname = rname[:len(ename)] + '|' + rname[len(ename):]
|
||||
elif isinstance(separator_insertion, int):
|
||||
rname = rname[:separator_insertion] + '|' + rname[separator_insertion:]
|
||||
elif isinstance(separator_insertion, str):
|
||||
rname = rname.replace(separator_insertion, "|%s" % (separator_insertion))
|
||||
else:
|
||||
raise Exception("separator_insertion has to be string or int, is ", type(separator_insertion))
|
||||
if '|' in rname:
|
||||
new_ename = rname[:rname.index('|')]
|
||||
else:
|
||||
new_ename = ename
|
||||
enstags[new_ename] = ename
|
||||
idld[rname] = idx
|
||||
deltad[rname] = deltas
|
||||
names.append(rname)
|
||||
|
@ -602,6 +481,7 @@ def import_dobs_string(content, noempty=False, full_output=False):
|
|||
for name in names:
|
||||
for i in range(len(deltad[name])):
|
||||
deltad[name][i] = np.array(deltad[name][i]) + mean[i]
|
||||
|
||||
res = []
|
||||
for i in range(len(mean)):
|
||||
deltas = []
|
||||
|
@ -615,7 +495,7 @@ def import_dobs_string(content, noempty=False, full_output=False):
|
|||
obs_names.append(name)
|
||||
idl.append(idld[name])
|
||||
res.append(Obs(deltas, obs_names, idl=idl))
|
||||
|
||||
print(mean, 'vs', res)
|
||||
_check(len(e_names) == ne)
|
||||
|
||||
cnames = list(covd.keys())
|
||||
|
@ -654,13 +534,14 @@ def import_dobs_string(content, noempty=False, full_output=False):
|
|||
retd['date'] = file_origin['date']
|
||||
retd['host'] = file_origin['host']
|
||||
retd['description'] = descriptiond
|
||||
retd['enstags'] = enstags
|
||||
retd['obsdata'] = res
|
||||
return retd
|
||||
else:
|
||||
return res
|
||||
|
||||
|
||||
def read_dobs(fname, noempty=False, full_output=False, gz=True):
|
||||
def read_dobs(fname, noempty=False, full_output=False, gz=True, separator_insertion=None):
|
||||
"""Import a list of Obs from an xml.gz file in the Zeuthen dobs format.
|
||||
|
||||
Tags are not written or recovered automatically.
|
||||
|
@ -677,6 +558,10 @@ def read_dobs(fname, noempty=False, full_output=False, gz=True):
|
|||
If False, only the data is returned as list.
|
||||
gz : bool
|
||||
If True, assumes that data is gzipped. If False, assumes XML file.
|
||||
separatior_insertion: str or int
|
||||
str: replace all occurences of "separator_insertion" within the replica names
|
||||
by "|%s" % (separator_insertion) when constructing the names of the replica.
|
||||
int: Insert the separator "|" at position separator_insertion
|
||||
"""
|
||||
|
||||
if not fname.endswith('.xml') and not fname.endswith('.gz'):
|
||||
|
@ -696,7 +581,7 @@ def read_dobs(fname, noempty=False, full_output=False, gz=True):
|
|||
infile = gzip.open(fname)
|
||||
content = infile.read()
|
||||
|
||||
return import_dobs_string(content, noempty, full_output)
|
||||
return import_dobs_string(content, noempty, full_output, separator_insertion=separator_insertion)
|
||||
|
||||
|
||||
def _dobsdict_to_xmlstring(d):
|
||||
|
@ -758,7 +643,7 @@ def _dobsdict_to_xmlstring_spaces(d, space=' '):
|
|||
return o
|
||||
|
||||
|
||||
def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=None):
|
||||
def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=None, enstags={}):
|
||||
"""Generate the string for the export of a list of Obs or structures containing Obs
|
||||
to a .xml.gz file according to the Zeuthen dobs format.
|
||||
|
||||
|
@ -780,6 +665,9 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
|
|||
A list of symbols that describe the observables to be written. May be empty.
|
||||
who : str
|
||||
Provide the name of the person that exports the data.
|
||||
enstags : dict
|
||||
Provide alternative enstag for ensembles in the form enstags = {ename: enstag}
|
||||
Otherwise, the ensemble name is used.
|
||||
"""
|
||||
od = {}
|
||||
r_names = []
|
||||
|
@ -787,6 +675,9 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
|
|||
r_names += [name for name in o.names if name.split('|')[0] in o.mc_names]
|
||||
r_names = sorted(set(r_names))
|
||||
mc_names = sorted(set([n.split('|')[0] for n in r_names]))
|
||||
for tmpname in mc_names:
|
||||
if tmpname not in enstags:
|
||||
enstags[tmpname] = tmpname
|
||||
ne = len(set(mc_names))
|
||||
cov_names = []
|
||||
for o in obsl:
|
||||
|
@ -827,7 +718,7 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
|
|||
pd['edata'] = []
|
||||
for name in mc_names:
|
||||
ed = {}
|
||||
ed['enstag'] = name
|
||||
ed['enstag'] = enstags[name]
|
||||
onames = sorted([n for n in r_names if (n.startswith(name + '|') or n == name)])
|
||||
nr = len(onames)
|
||||
ed['nr'] = nr
|
||||
|
@ -852,7 +743,6 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
|
|||
data += '0 '
|
||||
continue
|
||||
if o.idl[repname][counters[oi]] == ci:
|
||||
# c = o.idl[repname][counters[oi]]
|
||||
num = o.deltas[repname][counters[oi]]
|
||||
if num == 0:
|
||||
data += '0 '
|
||||
|
@ -926,7 +816,7 @@ def create_dobs_string(obsl, name, spec='dobs v1.0', origin='', symbol=[], who=N
|
|||
return rs
|
||||
|
||||
|
||||
def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=None, gz=True):
|
||||
def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=None, enstags={}, gz=True):
|
||||
"""Export a list of Obs or structures containing Obs to a .xml.gz file
|
||||
according to the Zeuthen dobs format.
|
||||
|
||||
|
@ -950,11 +840,14 @@ def write_dobs(obsl, fname, name, spec='dobs v1.0', origin='', symbol=[], who=No
|
|||
A list of symbols that describe the observables to be written. May be empty.
|
||||
who : str
|
||||
Provide the name of the person that exports the data.
|
||||
enstags : dict
|
||||
Provide alternative enstag for ensembles in the form enstags = {ename: enstag}
|
||||
Otherwise, the ensemble name is used.
|
||||
gz : bool
|
||||
If True, the output is a gzipped json. If False, the output is a json file.
|
||||
If True, the output is a gzipped XML. If False, the output is a XML file.
|
||||
"""
|
||||
|
||||
dobsstring = create_dobs_string(obsl, name, spec, origin, symbol, who)
|
||||
dobsstring = create_dobs_string(obsl, name, spec, origin, symbol, who, enstags=enstags)
|
||||
|
||||
if not fname.endswith('.xml') and not fname.endswith('.gz'):
|
||||
fname += '.xml'
|
||||
|
|
|
@ -6,6 +6,7 @@ import pyerrors.input.json as jsonio
|
|||
import pyerrors.input.dobs as dobsio
|
||||
import pytest
|
||||
|
||||
|
||||
def test_jsonio():
|
||||
o = pe.pseudo_Obs(1.0, .2, 'one')
|
||||
o2 = pe.pseudo_Obs(0.5, .1, 'two|r1')
|
||||
|
@ -139,99 +140,6 @@ def test_json_corr_2d_io():
|
|||
assert my_corr.prange == recover.prange
|
||||
|
||||
|
||||
def test_dobsio():
|
||||
o = pe.pseudo_Obs(1.0, .2, 'one')
|
||||
o1 = pe.pseudo_Obs(1.5, .2, 'one')
|
||||
|
||||
ol = [o, o1]
|
||||
fname = 'test_rw'
|
||||
dobsio.write_pobs(ol, fname, 'Testobs')
|
||||
rl = dobsio.read_pobs(fname)
|
||||
|
||||
for o, r in zip(ol, rl):
|
||||
assert np.all(o == r)
|
||||
|
||||
od = {
|
||||
'obsdata': ol,
|
||||
'name': 'testn',
|
||||
'spec': 'tests',
|
||||
'origin': 'testo',
|
||||
'symbol': ['A', 'B']
|
||||
}
|
||||
dobsio.write_pobs(ol, fname, od['name'], od['spec'], od['origin'], od['symbol'])
|
||||
rd = dobsio.read_pobs(fname, full_output=True)
|
||||
|
||||
for o, r in zip(od['obsdata'], rd['obsdata']):
|
||||
assert np.all(o == r)
|
||||
assert(od['spec'] == rd['description']['spec'])
|
||||
assert(od['origin'] == rd['description']['origin'])
|
||||
assert(od['name'] == rd['description']['name'])
|
||||
assert(rd['description']['enstag'] == ol[0].e_names[0])
|
||||
|
||||
dobsio.write_dobs(ol, fname, 'Testobs')
|
||||
rl = dobsio.read_dobs(fname)
|
||||
|
||||
for o, r in zip(ol, rl):
|
||||
assert np.all(o == r)
|
||||
|
||||
dobsio.write_dobs(ol, fname, od['name'], od['spec'], od['origin'], od['symbol'])
|
||||
rd = dobsio.read_dobs(fname, full_output=True)
|
||||
|
||||
for o, r in zip(od['obsdata'], rd['obsdata']):
|
||||
assert np.all(o == r)
|
||||
assert(od['spec'] == rd['description']['spec'])
|
||||
assert(od['origin'] == rd['description']['origin'])
|
||||
assert(od['name'] == rd['description']['name'])
|
||||
|
||||
o2 = pe.pseudo_Obs(0.5, .1, 'two|r1')
|
||||
o3 = pe.pseudo_Obs(0.5, .1, 'two|r2')
|
||||
o4 = pe.merge_obs([o2, o3])
|
||||
otag = 'This has been merged!'
|
||||
o4.tag = otag
|
||||
do = o - .2 * o4
|
||||
co1 = pe.cov_Obs(1., .123, 'cov1')
|
||||
co3 = pe.cov_Obs(4., .1 ** 2, 'cov3')
|
||||
do *= co1 / co3
|
||||
do.tag = {'A': 2}
|
||||
|
||||
o5 = pe.pseudo_Obs(0.8, .1, 'two|r2')
|
||||
co2 = pe.cov_Obs([1, 2], [[.12, .004], [.004, .02]], 'cov2')
|
||||
o5 /= co2[0]
|
||||
#o3 /= co2[1]
|
||||
o5.tag = 2 * otag
|
||||
|
||||
tt1 = pe.Obs([np.random.rand(100)], ['t|r1'], idl=[range(2, 202, 2)])
|
||||
tt2 = pe.Obs([np.random.rand(100)], ['t|r2'], idl=[range(2, 202, 2)])
|
||||
tt3 = pe.Obs([np.random.rand(102)], ['qe'])
|
||||
|
||||
tt = tt1 + tt2 + tt3
|
||||
|
||||
tt.tag = 'Test Obs: Ä'
|
||||
|
||||
ol = [o2, o3, o4, do, o5, tt]
|
||||
print(ol)
|
||||
fname = 'test_rw'
|
||||
|
||||
dobsio.write_dobs(ol, fname, 'TEST')
|
||||
|
||||
rl = dobsio.read_dobs(fname, noempty=True)
|
||||
[o.gamma_method() for o in rl]
|
||||
|
||||
#os.remove(fname + '.xml.gz')
|
||||
|
||||
for o, r in zip(ol, rl):
|
||||
assert np.all(o == r)
|
||||
|
||||
for i in range(len(ol)):
|
||||
if isinstance(ol[i], pe.Obs):
|
||||
o = ol[i] - rl[i]
|
||||
assert(o.is_zero())
|
||||
or1 = np.ravel(ol[i])
|
||||
or2 = np.ravel(rl[i])
|
||||
for j in range(len(or1)):
|
||||
o = or1[j] - or2[j]
|
||||
assert(o.is_zero())
|
||||
|
||||
def test_json_dict_io():
|
||||
def check_dict_equality(d1, d2):
|
||||
def dict_check_obs(d1, d2):
|
||||
|
@ -344,3 +252,97 @@ def test_renorm_deriv_of_corr(tmp_path):
|
|||
pe.input.json.dump_to_json(c, (tmp_path / 'test').as_posix())
|
||||
recover = pe.input.json.load_json((tmp_path / 'test').as_posix())
|
||||
assert np.all([o == 0 for o in (c - recover)[1:-1]])
|
||||
|
||||
|
||||
def test_dobsio():
|
||||
o = pe.pseudo_Obs(1.0, .2, 'one')
|
||||
o1 = pe.pseudo_Obs(1.5, .2, 'one')
|
||||
|
||||
ol = [o, o1]
|
||||
fname = 'test_rw'
|
||||
dobsio.write_pobs(ol, fname, 'Testobs')
|
||||
rl = dobsio.read_pobs(fname)
|
||||
|
||||
for o, r in zip(ol, rl):
|
||||
assert np.all(o == r)
|
||||
|
||||
od = {
|
||||
'obsdata': ol,
|
||||
'name': 'testn',
|
||||
'spec': 'tests',
|
||||
'origin': 'testo',
|
||||
'symbol': ['A', 'B']
|
||||
}
|
||||
dobsio.write_pobs(ol, fname, od['name'], od['spec'], od['origin'], od['symbol'])
|
||||
rd = dobsio.read_pobs(fname, full_output=True)
|
||||
|
||||
for o, r in zip(od['obsdata'], rd['obsdata']):
|
||||
assert np.all(o == r)
|
||||
assert(od['spec'] == rd['description']['spec'])
|
||||
assert(od['origin'] == rd['description']['origin'])
|
||||
assert(od['name'] == rd['description']['name'])
|
||||
assert(rd['description']['enstag'] == ol[0].e_names[0])
|
||||
|
||||
dobsio.write_dobs(ol, fname, 'Testobs')
|
||||
rl = dobsio.read_dobs(fname)
|
||||
|
||||
for o, r in zip(ol, rl):
|
||||
assert np.all(o == r)
|
||||
|
||||
dobsio.write_dobs(ol, fname, od['name'], od['spec'], od['origin'], od['symbol'])
|
||||
rd = dobsio.read_dobs(fname, full_output=True)
|
||||
|
||||
for o, r in zip(od['obsdata'], rd['obsdata']):
|
||||
assert np.all(o == r)
|
||||
assert(od['spec'] == rd['description']['spec'])
|
||||
assert(od['origin'] == rd['description']['origin'])
|
||||
assert(od['name'] == rd['description']['name'])
|
||||
|
||||
o2 = pe.pseudo_Obs(0.5, .1, 'two|r1')
|
||||
o3 = pe.pseudo_Obs(0.5, .1, 'two|r2')
|
||||
o4 = pe.merge_obs([o2, o3])
|
||||
otag = 'This has been merged!'
|
||||
o4.tag = otag
|
||||
do = o - .2 * o4
|
||||
co1 = pe.cov_Obs(1., .123, 'cov1')
|
||||
co3 = pe.cov_Obs(4., .1 ** 2, 'cov3')
|
||||
do *= co1 / co3
|
||||
do.tag = {'A': 2}
|
||||
|
||||
o5 = pe.pseudo_Obs(0.8, .1, 'two|r2')
|
||||
co2 = pe.cov_Obs([1, 2], [[.12, .004], [.004, .02]], 'cov2')
|
||||
o5 /= co2[0]
|
||||
#o3 /= co2[1]
|
||||
o5.tag = 2 * otag
|
||||
|
||||
tt1 = pe.Obs([np.random.rand(100)], ['t|r1'], idl=[range(2, 202, 2)])
|
||||
tt2 = pe.Obs([np.random.rand(100)], ['t|r2'], idl=[range(2, 202, 2)])
|
||||
tt3 = pe.Obs([np.random.rand(102)], ['qe'])
|
||||
|
||||
tt = tt1 + tt2 + tt3
|
||||
|
||||
tt.tag = 'Test Obs: Ä'
|
||||
|
||||
ol = [o2, o3, o4, do, o5, tt]
|
||||
print(ol)
|
||||
fname = 'test_rw'
|
||||
|
||||
dobsio.write_dobs(ol, fname, 'TEST')
|
||||
|
||||
rl = dobsio.read_dobs(fname, noempty=True)
|
||||
[o.gamma_method() for o in rl]
|
||||
|
||||
#os.remove(fname + '.xml.gz')
|
||||
|
||||
for o, r in zip(ol, rl):
|
||||
assert np.all(o == r)
|
||||
|
||||
for i in range(len(ol)):
|
||||
if isinstance(ol[i], pe.Obs):
|
||||
o = ol[i] - rl[i]
|
||||
assert(o.is_zero())
|
||||
or1 = np.ravel(ol[i])
|
||||
or2 = np.ravel(rl[i])
|
||||
for j in range(len(or1)):
|
||||
o = or1[j] - or2[j]
|
||||
assert(o.is_zero())
|
||||
|
|
Loading…
Add table
Reference in a new issue