mirror of
				https://github.com/fjosw/pyerrors.git
				synced 2025-11-04 01:25:46 +01:00 
			
		
		
		
	* [Fix] Removed the possibility to create an Obs from data on several replica * [Fix] extended tests and corrected a small bug in the previous commit --------- Co-authored-by: Simon Kuberski <simon.kuberski@cern.ch>
		
			
				
	
	
		
			429 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			429 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import gzip
 | 
						|
import rapidjson
 | 
						|
import numpy as np
 | 
						|
import pyerrors as pe
 | 
						|
import pyerrors.input.json as jsonio
 | 
						|
import pyerrors.input.dobs as dobsio
 | 
						|
import pytest
 | 
						|
 | 
						|
 | 
						|
def test_jsonio():
 | 
						|
    o = pe.pseudo_Obs(1.0, .2, 'one')
 | 
						|
    o2 = pe.pseudo_Obs(0.5, .1, 'two|r1')
 | 
						|
    o3 = pe.pseudo_Obs(0.5, .1, 'two|r2')
 | 
						|
    o4 = pe.merge_obs([o2, o3, pe.pseudo_Obs(0.5, .1, 'two|r3', samples=3221)])
 | 
						|
    otag = 'This has been merged!'
 | 
						|
    o4.tag = otag
 | 
						|
    do = o - .2 * o4
 | 
						|
    co1 = pe.cov_Obs(1., .123, 'cov1')
 | 
						|
    co3 = pe.cov_Obs(4., .1 ** 2, 'cov3')
 | 
						|
    do *= co1 / co3
 | 
						|
    do.tag = {'A': 2}
 | 
						|
 | 
						|
    o5 = pe.pseudo_Obs(0.8, .1, 'two|r2')
 | 
						|
    co2 = pe.cov_Obs([1, 2], [[.12, .004], [.004, .02]], 'cov2')
 | 
						|
    o5 /= co2[0]
 | 
						|
    o3 /= co2[1]
 | 
						|
    o5.tag = 2 * otag
 | 
						|
    testl = [o3, o5]
 | 
						|
 | 
						|
    arr = np.array([o3, o5])
 | 
						|
    mat = np.array([[pe.pseudo_Obs(1.0, .1, 'mat'), pe.pseudo_Obs(0.3, .1, 'mat')], [pe.pseudo_Obs(0.2, .1, 'mat'), pe.pseudo_Obs(2.0, .4, 'mat')]])
 | 
						|
    mat[0][1].tag = ['This', 'is', 2, None]
 | 
						|
    mat[1][0].tag = '{testt}'
 | 
						|
    mat[1][1].tag = '[tag]'
 | 
						|
 | 
						|
    tt1 = pe.Obs([np.random.rand(100), np.random.rand(100)], ['t|r1', 't|r2'], idl=[range(2, 202, 2), range(22, 222, 2)])
 | 
						|
    tt3 = pe.Obs([np.random.rand(102)], ['qe'])
 | 
						|
 | 
						|
    tt = tt1 + tt3
 | 
						|
 | 
						|
    tt.tag = 'Test Obs: Ä'
 | 
						|
 | 
						|
    ol = [o4, do, testl, mat, arr, np.array([o]), np.array([tt, tt]), [tt, tt], co1, co2, np.array(co2), co1 / co2[0]]
 | 
						|
    fname = 'test_rw'
 | 
						|
 | 
						|
    jsonio.dump_to_json(ol, fname, indent=1, description='[I am a tricky description]')
 | 
						|
 | 
						|
    rl = jsonio.load_json(fname)
 | 
						|
 | 
						|
    os.remove(fname + '.json.gz')
 | 
						|
 | 
						|
    for o, r in zip(ol, rl):
 | 
						|
        assert np.all(o == r)
 | 
						|
 | 
						|
    for i in range(len(ol)):
 | 
						|
        if isinstance(ol[i], pe.Obs):
 | 
						|
            o = ol[i] - rl[i]
 | 
						|
            assert(o.is_zero())
 | 
						|
            assert(ol[i].tag == rl[i].tag)
 | 
						|
        or1 = np.ravel(ol[i])
 | 
						|
        or2 = np.ravel(rl[i])
 | 
						|
        for j in range(len(or1)):
 | 
						|
            o = or1[j] - or2[j]
 | 
						|
            assert(o.is_zero())
 | 
						|
        if isinstance(ol[i], pe.Obs):
 | 
						|
            for name in ol[i].r_values:
 | 
						|
                assert(np.isclose(ol[i].r_values[name], rl[i].r_values[name]))
 | 
						|
        elif isinstance(ol[i], list):
 | 
						|
            for j in range(len(ol[i])):
 | 
						|
                for name in ol[i][j].r_values:
 | 
						|
                    assert(np.isclose(ol[i][j].r_values[name], rl[i][j].r_values[name]))
 | 
						|
 | 
						|
    description = {'I': {'Am': {'a': 'nested dictionary!'}}}
 | 
						|
    jsonio.dump_to_json(ol, fname, indent=0, gz=False, description=description)
 | 
						|
 | 
						|
    rl = jsonio.load_json(fname, gz=False, full_output=True)
 | 
						|
 | 
						|
    os.remove(fname + '.json')
 | 
						|
 | 
						|
    for o, r in zip(ol, rl['obsdata']):
 | 
						|
        assert np.all(o == r)
 | 
						|
 | 
						|
    assert(description == rl['description'])
 | 
						|
 | 
						|
 | 
						|
def test_json_string_reconstruction():
 | 
						|
    my_obs = pe.Obs([np.random.rand(100)], ['name'])
 | 
						|
 | 
						|
    json_string = pe.input.json.create_json_string(my_obs)
 | 
						|
    reconstructed_obs1 = pe.input.json.import_json_string(json_string)
 | 
						|
    assert my_obs == reconstructed_obs1
 | 
						|
 | 
						|
    compressed_string = gzip.compress(json_string.encode('utf-8'))
 | 
						|
 | 
						|
    reconstructed_string = gzip.decompress(compressed_string).decode('utf-8')
 | 
						|
    reconstructed_obs2 = pe.input.json.import_json_string(reconstructed_string)
 | 
						|
 | 
						|
    assert reconstructed_string == json_string
 | 
						|
    assert my_obs == reconstructed_obs2
 | 
						|
 | 
						|
 | 
						|
def test_json_corr_io():
 | 
						|
    my_list = [pe.Obs([np.random.normal(1.0, 0.1, 100), np.random.normal(1.0, 0.1, 321)], ['ens1|r1', 'ens1|r2'], idl=[range(1, 201, 2), range(321)]) for o in range(8)]
 | 
						|
    rw_list = pe.reweight(pe.Obs([np.random.normal(1.0, 0.1, 100), np.random.normal(1.0, 0.1, 321)], ['ens1|r1', 'ens1|r2'], idl=[range(1, 201, 2), range(321)]), my_list)
 | 
						|
 | 
						|
    for obs_list in [my_list, rw_list]:
 | 
						|
        for tag in [None, "test"]:
 | 
						|
            obs_list[3].tag = tag
 | 
						|
            for pad in [0, 2]:
 | 
						|
                for corr_tag in [None, 'my_Corr_tag']:
 | 
						|
                    for prange in [None, [3, 6]]:
 | 
						|
                        for gap in [False, True]:
 | 
						|
                            for mult in [1., pe.cov_Obs([12.22, 1.21], [.212**2, .11**2], 'renorm')[0]]:
 | 
						|
                                my_corr = mult * pe.Corr(obs_list, padding=[pad, pad], prange=prange)
 | 
						|
                                my_corr.tag = corr_tag
 | 
						|
                                if gap:
 | 
						|
                                    my_corr.content[4] = None
 | 
						|
                                pe.input.json.dump_to_json(my_corr, 'corr')
 | 
						|
                                recover = pe.input.json.load_json('corr')
 | 
						|
                                os.remove('corr.json.gz')
 | 
						|
                                assert np.all([o.is_zero() for o in [x for x in (my_corr - recover) if x is not None]])
 | 
						|
                                for index, entry in enumerate(my_corr):
 | 
						|
                                    if entry is None:
 | 
						|
                                        assert recover[index] is None
 | 
						|
                                assert my_corr.tag == recover.tag
 | 
						|
                                assert my_corr.prange == recover.prange
 | 
						|
                                assert my_corr.reweighted == recover.reweighted
 | 
						|
 | 
						|
 | 
						|
def test_json_corr_2d_io():
 | 
						|
    obs_list = [np.array([
 | 
						|
        [
 | 
						|
         pe.merge_obs([pe.pseudo_Obs(1.0 + i, 0.1 * i, 'test|r2'), pe.pseudo_Obs(1.0 + i, 0.1 * i, 'test|r1', samples=321)]), 
 | 
						|
         pe.merge_obs([pe.pseudo_Obs(0.0, 0.1 * i, 'test|r2'), pe.pseudo_Obs(0.0, 0.1 * i, 'test|r1', samples=321)]),
 | 
						|
        ], 
 | 
						|
        [
 | 
						|
         pe.merge_obs([pe.pseudo_Obs(0.0, 0.1 * i, 'test|r2'), pe.pseudo_Obs(0.0, 0.1 * i, 'test|r1', samples=321),]),
 | 
						|
         pe.merge_obs([pe.pseudo_Obs(1.0 + i, 0.1 * i, 'test|r2'), pe.pseudo_Obs(1.0 + i, 0.1 * i, 'test|r1', samples=321)]),
 | 
						|
        ],
 | 
						|
        ]) for i in range(4)]
 | 
						|
 | 
						|
    for tag in [None, "test"]:
 | 
						|
        obs_list[3][0, 1].tag = tag
 | 
						|
        for padding in [0, 1]:
 | 
						|
            for prange in [None, [3, 6]]:
 | 
						|
                for mult in [1., pe.cov_Obs([12.22, 1.21], [.212**2, .11**2], 'renorm')[0]]:
 | 
						|
                    my_corr = mult * pe.Corr(obs_list, padding=[padding, padding], prange=prange)
 | 
						|
                    my_corr.tag = tag
 | 
						|
                    pe.input.json.dump_to_json(my_corr, 'corr')
 | 
						|
                    recover = pe.input.json.load_json('corr')
 | 
						|
                    os.remove('corr.json.gz')
 | 
						|
                    assert np.all([np.all([o.is_zero() for o in q]) for q in [x.ravel() for x in (my_corr - recover) if x is not None]])
 | 
						|
                    for index, entry in enumerate(my_corr):
 | 
						|
                        if entry is None:
 | 
						|
                            assert recover[index] is None
 | 
						|
                    assert my_corr.tag == recover.tag
 | 
						|
                    assert my_corr.prange == recover.prange
 | 
						|
 | 
						|
 | 
						|
def test_json_dict_io():
 | 
						|
    def check_dict_equality(d1, d2):
 | 
						|
        def dict_check_obs(d1, d2):
 | 
						|
            for k, v in d1.items():
 | 
						|
                if isinstance(v, dict):
 | 
						|
                    v = dict_check_obs(v, d2[k])
 | 
						|
                elif isinstance(v, list) and all([isinstance(o, pe.Obs) for o in v]):
 | 
						|
                    for i in range(len(v)):
 | 
						|
                        assert((v[i] - d2[k][i]).is_zero())
 | 
						|
                elif isinstance(v, list):
 | 
						|
                    v = list_check_obs(v, d2[k])
 | 
						|
                elif isinstance(v, pe.Obs):
 | 
						|
                    assert((v - d2[k]).is_zero())
 | 
						|
                elif isinstance(v, pe.Corr):
 | 
						|
                    for i in range(v.T):
 | 
						|
                        assert((v[i] - d2[k][i]).is_zero())
 | 
						|
                elif isinstance(v, np.ndarray):
 | 
						|
                    a1 = np.ravel(v)
 | 
						|
                    a2 = np.ravel(d2[k])
 | 
						|
                    for i in range(len(a1)):
 | 
						|
                        assert((a1[i] - a2[i]).is_zero())
 | 
						|
 | 
						|
        def list_check_obs(l1, l2):
 | 
						|
            for ei in range(len(l1)):
 | 
						|
                e = l1[ei]
 | 
						|
                if isinstance(e, list):
 | 
						|
                    e = list_check_obs(e, l2[ei])
 | 
						|
                elif isinstance(e, list) and all([isinstance(o, pe.Obs) for o in e]):
 | 
						|
                    for i in range(len(e)):
 | 
						|
                        assert((e[i] - l2[ei][i]).is_zero())
 | 
						|
                elif isinstance(e, dict):
 | 
						|
                    e = dict_check_obs(e, l2[ei])
 | 
						|
                elif isinstance(e, pe.Obs):
 | 
						|
                    assert((e - l2[ei]).is_zero())
 | 
						|
                elif isinstance(e, pe.Corr):
 | 
						|
                    for i in range(e.T):
 | 
						|
                        assert((e[i] - l2[ei][i]).is_zero())
 | 
						|
                elif isinstance(e, np.ndarray):
 | 
						|
                    a1 = np.ravel(e)
 | 
						|
                    a2 = np.ravel(l2[ei])
 | 
						|
                    for i in range(len(a1)):
 | 
						|
                        assert((a1[i] - a2[i]).is_zero())
 | 
						|
        dict_check_obs(d1, d2)
 | 
						|
        return True
 | 
						|
 | 
						|
    od = {
 | 
						|
        'l':
 | 
						|
        {
 | 
						|
            'a': pe.pseudo_Obs(1, .2, 'testa', samples=10),
 | 
						|
            'b': [pe.pseudo_Obs(1.1, .1, 'test', samples=10), pe.pseudo_Obs(1.2, .1, 'test', samples=10), pe.pseudo_Obs(1.3, .1, 'test', samples=10)],
 | 
						|
            'c': {
 | 
						|
                'd': 1,
 | 
						|
                'e': pe.pseudo_Obs(.2, .01, 'teste', samples=10),
 | 
						|
                'f': pe.Corr([pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10)]),
 | 
						|
                'g': np.reshape(np.asarray([pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10), pe.pseudo_Obs(.1, .01, 'a', samples=10)]), (2, 2)),
 | 
						|
            }
 | 
						|
        },
 | 
						|
        's':
 | 
						|
        {
 | 
						|
            'a': 'Infor123',
 | 
						|
            'b': ['Some', 'list'],
 | 
						|
            'd': pe.pseudo_Obs(.01, .001, 'testd', samples=10) * pe.cov_Obs(1, .01, 'cov1'),
 | 
						|
            'se': None,
 | 
						|
            'sf': 1.2,
 | 
						|
            'k': pe.cov_Obs(.1, .001**2, 'cov') * pe.merge_obs([pe.pseudo_Obs(1.0, 0.1, 'test|r2'), pe.pseudo_Obs(1.0, 0.1, 'test|r1', samples=321)]),
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    fname = 'test_rw'
 | 
						|
 | 
						|
    desc = 'This is a random description'
 | 
						|
 | 
						|
    with pytest.raises(Exception):
 | 
						|
        jsonio.dump_dict_to_json(od, fname, description=desc, reps='|Test')
 | 
						|
 | 
						|
    jsonio.dump_dict_to_json(od, fname, description=desc, reps='TEST')
 | 
						|
    nd = jsonio.load_json_dict(fname, full_output=True, reps='TEST')
 | 
						|
 | 
						|
    with pytest.raises(Exception):
 | 
						|
        nd = jsonio.load_json_dict(fname, full_output=True)
 | 
						|
 | 
						|
    jsonio.dump_dict_to_json(od, fname, description=desc)
 | 
						|
    nd = jsonio.load_json_dict(fname, full_output=True)
 | 
						|
    assert (desc == nd['description'])
 | 
						|
 | 
						|
    assert(check_dict_equality(od, nd['obsdata']))
 | 
						|
    nd = jsonio.load_json_dict(fname, full_output=False)
 | 
						|
    assert(check_dict_equality(od, nd))
 | 
						|
 | 
						|
    nl = jsonio.load_json(fname, full_output=True)
 | 
						|
    nl = jsonio.load_json(fname, full_output=False)
 | 
						|
 | 
						|
    with pytest.raises(Exception):
 | 
						|
        jsonio.dump_dict_to_json(nl, fname, description=desc)
 | 
						|
 | 
						|
    od['k'] = 'DICTOBS2'
 | 
						|
    with pytest.raises(Exception):
 | 
						|
        jsonio.dump_dict_to_json(od, fname, description=desc)
 | 
						|
 | 
						|
    od['k'] = ['DICTOBS2']
 | 
						|
    with pytest.raises(Exception):
 | 
						|
        jsonio.dump_dict_to_json(od, fname, description=desc)
 | 
						|
 | 
						|
    od = {1: 'test', False: 'True', None: 'None'}
 | 
						|
    jsonio.dump_dict_to_json(od, fname, description={np.int64(1): np.float64(2.444444)})
 | 
						|
    jsonio.dump_dict_to_json(od, fname, description=np.float32(2.444444))
 | 
						|
 | 
						|
    os.remove(fname + '.json.gz')
 | 
						|
 | 
						|
 | 
						|
def test_renorm_deriv_of_corr(tmp_path):
 | 
						|
    c = pe.Corr([pe.pseudo_Obs(i, .1, 'test') for i in range(10)])
 | 
						|
    c *= pe.cov_Obs(1., .1, '#ren')
 | 
						|
    c = c.deriv()
 | 
						|
    pe.input.json.dump_to_json(c, (tmp_path / 'test').as_posix())
 | 
						|
    recover = pe.input.json.load_json((tmp_path / 'test').as_posix())
 | 
						|
    assert np.all([o == 0 for o in (c - recover)[1:-1]])
 | 
						|
 | 
						|
 | 
						|
def test_dobsio():
 | 
						|
    o = pe.pseudo_Obs(1.0, .2, 'one|r1')
 | 
						|
    o1 = pe.pseudo_Obs(1.5, .2, 'one|r1')
 | 
						|
 | 
						|
    ol = [o, o1]
 | 
						|
    fname = 'test_rw'
 | 
						|
    dobsio.write_pobs(ol, fname, 'Testobs')
 | 
						|
    rl = dobsio.read_pobs(fname)
 | 
						|
 | 
						|
    for i in range(len(ol)):
 | 
						|
        assert (ol[i] - rl[i].is_zero())
 | 
						|
 | 
						|
    od = {
 | 
						|
        'obsdata': ol,
 | 
						|
        'name': 'testn',
 | 
						|
        'spec': 'tests',
 | 
						|
        'origin': 'testo',
 | 
						|
        'symbol': ['A', 'B']
 | 
						|
    }
 | 
						|
    dobsio.write_pobs(ol, fname, od['name'], od['spec'], od['origin'], od['symbol'])
 | 
						|
    rd = dobsio.read_pobs(fname, full_output=True)
 | 
						|
 | 
						|
    for i in range(len(od['obsdata'])):
 | 
						|
        assert (od['obsdata'][i] - rd['obsdata'][i].is_zero())
 | 
						|
 | 
						|
    assert(od['spec'] == rd['description']['spec'])
 | 
						|
    assert(od['origin'] == rd['description']['origin'])
 | 
						|
    assert(od['name'] == rd['description']['name'])
 | 
						|
    assert(rd['description']['enstag'] == ol[0].e_names[0])
 | 
						|
 | 
						|
    dobsio.write_dobs(ol, fname, 'Testobs')
 | 
						|
    rl = dobsio.read_dobs(fname)
 | 
						|
 | 
						|
    for i in range(len(ol)):
 | 
						|
        assert (ol[i] - rl[i].is_zero())
 | 
						|
 | 
						|
    dobsio.write_dobs(ol, fname, od['name'], od['spec'], od['origin'], od['symbol'])
 | 
						|
    rd = dobsio.read_dobs(fname, full_output=True)
 | 
						|
    os.remove(fname + '.xml.gz')
 | 
						|
 | 
						|
    for i in range(len(od['obsdata'])):
 | 
						|
        assert (od['obsdata'][i] - rd['obsdata'][i].is_zero())
 | 
						|
 | 
						|
    assert(od['spec'] == rd['description']['spec'])
 | 
						|
    assert(od['origin'] == rd['description']['origin'])
 | 
						|
    assert(od['name'] == rd['description']['name'])
 | 
						|
 | 
						|
    o2 = pe.pseudo_Obs(0.5, .1, 'two|r1')
 | 
						|
    o3 = pe.pseudo_Obs(0.5, .1, 'two|r2')
 | 
						|
    o4 = pe.merge_obs([o2, o3, pe.pseudo_Obs(0.5, .1, 'two|r3', samples=3221)])
 | 
						|
    otag = 'This has been merged!'
 | 
						|
    o4.tag = otag
 | 
						|
    do = o - .2 * o4
 | 
						|
    co1 = pe.cov_Obs(1., .123, 'cov1')
 | 
						|
    co3 = pe.cov_Obs(4., .1 ** 2, 'cov3')
 | 
						|
    do *= co1 / co3
 | 
						|
    do.tag = {'A': 2}
 | 
						|
 | 
						|
    o5 = pe.pseudo_Obs(0.8, .1, 'two|r2')
 | 
						|
    co2 = pe.cov_Obs([1, 2], [[.12, .004], [.004, .02]], 'cov2')
 | 
						|
    o5 /= co2[0]
 | 
						|
    o5.tag = 2 * otag
 | 
						|
 | 
						|
    tt1 = pe.Obs([np.random.rand(100), np.random.rand(102)], ['t|r1', 't|r2'], idl=[range(2, 202, 2), range(22, 226, 2)])
 | 
						|
    tt3 = pe.Obs([np.random.rand(102)], ['qe|r1'])
 | 
						|
 | 
						|
    tt = tt1 + tt3
 | 
						|
 | 
						|
    tt.tag = 'Test Obs: Ä'
 | 
						|
 | 
						|
    tt4 = pe.Obs([np.random.rand(100), np.random.rand(100)], ['t|r1', 't|r2'], idl=[range(1, 101, 1), range(2, 202, 2)])
 | 
						|
 | 
						|
    ol = [o2, o3, o4, do, o5, tt, tt4, np.log(tt4 / o5**2), np.exp(o5 + np.log(co3 / tt3 + o4) / tt), o4.reweight(o4)]
 | 
						|
    print(ol)
 | 
						|
    fname = 'test_rw'
 | 
						|
 | 
						|
    dobsio.write_dobs(ol, fname, 'TEST')
 | 
						|
 | 
						|
    rl = dobsio.read_dobs(fname)
 | 
						|
    os.remove(fname + '.xml.gz')
 | 
						|
    [o.gamma_method() for o in rl]
 | 
						|
 | 
						|
    for i in range(len(ol)):
 | 
						|
        if isinstance(ol[i], pe.Obs):
 | 
						|
            o = ol[i] - rl[i]
 | 
						|
            assert(o.is_zero())
 | 
						|
        or1 = np.ravel(ol[i])
 | 
						|
        or2 = np.ravel(rl[i])
 | 
						|
        for j in range(len(or1)):
 | 
						|
            o = or1[j] - or2[j]
 | 
						|
            assert(o.is_zero())
 | 
						|
        if isinstance(ol[i], pe.Obs):
 | 
						|
            for name in ol[i].r_values:
 | 
						|
                assert(np.isclose(ol[i].r_values[name], rl[i].r_values[name]))
 | 
						|
 | 
						|
 | 
						|
def test_reconstruct_non_linear_r_obs(tmp_path):
 | 
						|
    to = (
 | 
						|
        pe.Obs([np.random.rand(500), np.random.rand(1200)],
 | 
						|
                ["e|r1", "e|r2", ],
 | 
						|
                idl=[range(1, 501), range(0, 1200)])
 | 
						|
        + pe.Obs([np.random.rand(111)], ["my_new_ensemble_54^£$|8'[@124435%6^7&()~#"], idl=[range(1, 999, 9)])
 | 
						|
        )
 | 
						|
    to = np.log(to ** 2) / to
 | 
						|
    to.dump((tmp_path / "test_equality").as_posix())
 | 
						|
    ro = pe.input.json.load_json((tmp_path / "test_equality").as_posix())
 | 
						|
    assert assert_equal_Obs(to, ro)
 | 
						|
 | 
						|
 | 
						|
def test_reconstruct_non_linear_r_obs_list(tmp_path):
 | 
						|
    to = (
 | 
						|
        pe.Obs([np.random.rand(500), np.random.rand(1200)],
 | 
						|
                ["e|r1", "e|r2", ],
 | 
						|
                idl=[range(1, 501), range(0, 1200)])
 | 
						|
        + pe.Obs([np.random.rand(111)], ["my_new_ensemble_54^£$|8'[@124435%6^7&()~#"], idl=[range(1, 999, 9)])
 | 
						|
        )
 | 
						|
    to = np.log(to ** 2) / to
 | 
						|
    for to_list in [[to, to, to], np.array([to, to, to])]:
 | 
						|
        pe.input.json.dump_to_json(to_list, (tmp_path / "test_equality_list").as_posix())
 | 
						|
        ro_list = pe.input.json.load_json((tmp_path / "test_equality_list").as_posix())
 | 
						|
        for oa, ob in zip(to_list, ro_list):
 | 
						|
            assert assert_equal_Obs(oa, ob)
 | 
						|
 | 
						|
 | 
						|
def test_import_non_json_string():
 | 
						|
    with pytest.raises(rapidjson.JSONDecodeError):
 | 
						|
        pe.input.json.import_json_string("this is garbage")
 | 
						|
 | 
						|
 | 
						|
def assert_equal_Obs(to, ro):
 | 
						|
    for kw in ["N", "cov_names", "covobs", "ddvalue", "dvalue", "e_content",
 | 
						|
               "e_names", "idl", "mc_names", "names",
 | 
						|
               "reweighted", "shape", "tag"]:
 | 
						|
        if not getattr(to, kw) == getattr(ro, kw):
 | 
						|
            print(kw, "does not match.")
 | 
						|
            return False
 | 
						|
 | 
						|
    for kw in ["value"]:
 | 
						|
        if not np.isclose(getattr(to, kw), getattr(ro, kw), atol=1e-14):
 | 
						|
            print(kw, "does not match.")
 | 
						|
            return False
 | 
						|
 | 
						|
 | 
						|
    for kw in ["r_values", "deltas"]:
 | 
						|
        for (k, v), (k2, v2) in zip(getattr(to, kw).items(), getattr(ro, kw).items()):
 | 
						|
            assert k == k2
 | 
						|
            if not np.allclose(v, v2, atol=1e-14):
 | 
						|
                print(kw, "does not match.")
 | 
						|
                return False
 | 
						|
    return True
 |