import json
import gzip
import numpy as np
import getpass
import socket
import datetime
import platform
import warnings
from ..obs import Obs
from .. import version as pyerrorsversion


def create_json_string(ol, description='', indent=1):
    """Generate the string for the export of a list of Obs or structures containing Obs
    to a .json(.gz) file

    Parameters
    ----------
    ol : list
        List of objects that will be exported. At the moment, these objects can be
        either of: Obs, list, numpy.ndarray.
        All Obs inside a structure have to be defined on the same set of configurations.
        A single object that is not a list is wrapped into a list. Entries of any
        other type are skipped.
    description : str
        Optional string that describes the contents of the json file.
    indent : int
        Specify the indentation level of the json file. None or 0 is permissible and
        saves disk space.

    Returns
    -------
    str
        The json representation of `ol` together with some metadata
        (program, format version, user, date, host, description).

    Raises
    ------
    Exception
        If a list or array contains objects that are not Obs or Obs that differ in
        `is_merged`, `reweighted`, `e_content` or `idl`.
    """

    class _StringifyingEncoder(json.JSONEncoder):
        """Encoder that falls back to str(obj) for objects json cannot serialize.

        A dedicated subclass keeps this fallback local to this function; assigning
        to json.JSONEncoder.default instead would alter every json.dumps call in
        the running process.
        """

        def default(self, obj):
            return str(obj)

    class Deltalist:
        """One row of the 'deltas' table: a configuration number followed by the
        deltas of all Obs on that configuration."""

        def __init__(self, li):
            self.cnfg = li[0]
            self.deltas = li[1:]

        def __repr__(self):
            return '[%d' % (self.cnfg) + ''.join(', %1.15e' % (d) for d in self.deltas) + ']'

        def __str__(self):
            return self.__repr__()

    def _gen_data_d_from_list(ol):
        """Build the 'data' entry (ensembles -> replica -> delta rows) shared by all Obs in ol."""
        first = ol[0]
        dl = []
        for name in first.e_names:
            ed = {'id': name, 'replica': []}
            for r_name in first.e_content[name]:
                rd = {'name': r_name}
                if first.is_merged.get(r_name, False):
                    rd['is_merged'] = True
                # Each row holds the configuration number and one delta per Obs.
                rd['deltas'] = [Deltalist([cnfg] + [o.deltas[r_name][i] for o in ol])
                                for i, cnfg in enumerate(first.idl[r_name])]
                ed['replica'].append(rd)
            dl.append(ed)
        return dl

    def _assert_equal_properties(ol, otype=Obs):
        """Raise if ol contains non-Obs objects or Obs that cannot share one data table."""
        for o in ol:
            if not isinstance(o, otype):
                raise Exception("Wrong data type in list.")
        for o in ol[1:]:
            if not ol[0].is_merged == o.is_merged:
                raise Exception("All Obs in list have to be defined on the same set of configs.")
            if not ol[0].reweighted == o.reweighted:
                raise Exception("All Obs in list have to have the same property 'reweighted'.")
            if not ol[0].e_content == o.e_content:
                raise Exception("All Obs in list have to be defined on the same set of configs.")
            if not ol[0].idl == o.idl:
                raise Exception("All Obs in list have to be defined on the same set of configurations.")

    def write_Obs_to_dict(o):
        """Convert a single Obs into its dictionary representation."""
        d = {}
        d['type'] = 'Obs'
        d['layout'] = '1'
        if o.tag:
            d['tag'] = [o.tag]
        if o.reweighted:
            d['reweighted'] = o.reweighted
        d['value'] = [o.value]
        d['data'] = _gen_data_d_from_list([o])
        return d

    def _write_collection_to_dict(ol, type_name, layout):
        """Shared implementation for flat sequences of Obs (lists and raveled arrays)."""
        _assert_equal_properties(ol)
        d = {}
        d['type'] = type_name
        d['layout'] = layout
        taglist = [o.tag for o in ol]
        if any(tag is not None for tag in taglist):
            d['tag'] = taglist
        if ol[0].reweighted:
            d['reweighted'] = ol[0].reweighted
        d['value'] = [o.value for o in ol]
        d['data'] = _gen_data_d_from_list(ol)
        return d

    def write_List_to_dict(ol):
        """Convert a list of Obs into its dictionary representation."""
        return _write_collection_to_dict(ol, 'List', '%d' % len(ol))

    def write_Array_to_dict(oa):
        """Convert a numpy.ndarray of Obs into its dictionary representation."""
        # The layout is the shape written as 'n0, n1, ...' (empty for a 0-d array).
        layout = ', '.join(str(n) for n in oa.shape)
        return _write_collection_to_dict(np.ravel(oa), 'Array', layout)

    def remove_quotationmarks(s):
        """Workaround for un-quoting of delta lists, adds 5% of work
        but is safe, compared to a simple replace that could destroy the structure

        Only lines that follow a '"deltas":' key are touched, up to and including
        the first such line that ends with ']'.
        """
        in_deltas = False
        lines = s.split('\n')
        for i, line in enumerate(lines):
            if '"deltas":' in line:
                in_deltas = True
            elif in_deltas:
                line = line.replace('"[', '[').replace(']"', ']')
                lines[i] = line
                if line.endswith(']'):
                    in_deltas = False
        return '\n'.join(lines)

    if not isinstance(ol, list):
        ol = [ol]

    d = {}
    d['program'] = 'pyerrors %s' % (pyerrorsversion.__version__)
    d['version'] = '0.1'
    d['who'] = getpass.getuser()
    d['date'] = datetime.datetime.now().astimezone().strftime('%Y-%m-%d %H:%M:%S %Z')
    d['host'] = socket.gethostname() + ', ' + platform.platform()
    if description:
        d['description'] = description
    d['obsdata'] = []
    for io in ol:
        if isinstance(io, Obs):
            d['obsdata'].append(write_Obs_to_dict(io))
        elif isinstance(io, list):
            d['obsdata'].append(write_List_to_dict(io))
        elif isinstance(io, np.ndarray):
            d['obsdata'].append(write_Array_to_dict(io))

    # With indent=None json.dumps writes everything on one line, which the
    # line-based un-quoting cannot process: the delta rows would stay quoted and
    # the result could not be read back. Serialize with indent=0 (one item per
    # line, no leading whitespace) instead and drop the line breaks afterwards.
    # json escapes newlines inside strings, so every raw '\n' is structural.
    compact = indent is None
    jsonstring = json.dumps(d, indent=0 if compact else indent, cls=_StringifyingEncoder, ensure_ascii=False)
    jsonstring = remove_quotationmarks(jsonstring)
    if compact:
        jsonstring = jsonstring.replace('\n', '')
    return jsonstring


def dump_to_json(ol, fname, description='', indent=1, gz=True):
    """Export a list of Obs or structures containing Obs to a .json(.gz) file

    Parameters
    ----------
    ol : list
        List of objects that will be exported. At the moment, these objects can be
        either of: Obs, list, numpy.ndarray.
        All Obs inside a structure have to be defined on the same set of configurations.
    fname : str
        Filename of the output file. '.json' is appended if the name ends with
        neither '.json' nor '.gz'; '.gz' is appended if `gz` is True and the name
        does not end with '.gz'.
    description : str
        Optional string that describes the contents of the json file.
    indent : int
        Specify the indentation level of the json file. None or 0 is permissible and
        saves disk space.
    gz : bool
        If True, the output is a gzipped json. If False, the output is a json file.
    """
    jsonstring = create_json_string(ol, description, indent)

    if not fname.endswith(('.json', '.gz')):
        fname += '.json'

    # Context managers make sure the file is closed even if writing fails.
    if gz:
        if not fname.endswith('.gz'):
            fname += '.gz'
        with gzip.open(fname, 'wb') as fp:
            fp.write(jsonstring.encode('utf-8'))
    else:
        with open(fname, 'w', encoding='utf-8') as fp:
            fp.write(jsonstring)


def load_json(fname, verbose=True, gz=True, full_output=False):
    """Import a list of Obs or structures containing Obs from a .json.gz file.
    The following structures are supported: Obs, list, numpy.ndarray
    If the list contains only one element, it is unpacked from the list.

    Parameters
    ----------
    fname : str
        Filename of the input file. '.json' is appended if the name ends with
        neither '.json' nor '.gz'; '.gz' is appended if `gz` is True and the name
        does not end with '.gz'.
    verbose : bool
        Print additional information that was written to the file.
    gz : bool
        If True, assumes that data is gzipped. If False, assumes JSON file.
    full_output : bool
        If True, a dict containing auxiliary information and the data is returned.
        If False, only the data is returned.

    Returns
    -------
    dict or list or Obs or numpy.ndarray
        With full_output=True a dict with the keys 'program', 'version', 'who',
        'date', 'host', 'description' and 'obsdata'. Otherwise the imported
        objects, unpacked from the list if the file holds exactly one entry.
    """

    def _gen_obsd_from_datad(d):
        """Flatten the 'data' entry into parallel lists of replica names, idl and delta tables."""
        retd = {'names': [], 'idl': [], 'deltas': [], 'is_merged': {}}
        for ens in d:
            for rep in ens['replica']:
                retd['names'].append(rep['name'])
                # Every row is [configuration number, delta of Obs 0, delta of Obs 1, ...].
                retd['idl'].append([di[0] for di in rep['deltas']])
                retd['deltas'].append(np.array([di[1:] for di in rep['deltas']]))
                retd['is_merged'][rep['name']] = rep.get('is_merged', False)
        return retd

    def get_Obs_from_dict(o):
        """Reconstruct a single Obs from its dictionary representation."""
        layouts = o.get('layout', '1').strip()
        if layouts != '1':
            raise Exception("layout is %s has to be 1 for type Obs." % (layouts))

        values = o['value']
        od = _gen_obsd_from_datad(o['data'])

        # The file stores deltas; adding the central value restores the samples.
        ret = Obs([[ddi[0] + values[0] for ddi in di] for di in od['deltas']], od['names'], idl=od['idl'])
        ret.reweighted = o.get('reweighted', False)
        ret.is_merged = od['is_merged']
        ret.tag = o.get('tag', [None])[0]
        return ret

    def get_List_from_dict(o):
        """Reconstruct a list of Obs from its dictionary representation."""
        layouts = o.get('layout', '1').strip()
        layout = int(layouts)
        values = o['value']
        od = _gen_obsd_from_datad(o['data'])

        ret = []
        taglist = o.get('tag', layout * [None])
        for i in range(layout):
            ret.append(Obs([list(di[:, i] + values[i]) for di in od['deltas']], od['names'], idl=od['idl']))
            ret[-1].reweighted = o.get('reweighted', False)
            ret[-1].is_merged = od['is_merged']
            ret[-1].tag = taglist[i]
        return ret

    def get_Array_from_dict(o):
        """Reconstruct a numpy.ndarray of Obs from its dictionary representation."""
        layouts = o.get('layout', '1').strip()
        layout = [int(ls.strip()) for ls in layouts.split(',') if len(ls) > 0]
        # np.prod of an empty layout is the float 1.0; a plain int is required
        # for range() and for repeating the default tag list.
        N = int(np.prod(layout))
        values = o['value']
        od = _gen_obsd_from_datad(o['data'])

        ret = []
        taglist = o.get('tag', N * [None])
        for i in range(N):
            ret.append(Obs([di[:, i] + values[i] for di in od['deltas']], od['names'], idl=od['idl']))
            ret[-1].reweighted = o.get('reweighted', False)
            ret[-1].is_merged = od['is_merged']
            ret[-1].tag = taglist[i]
        return np.reshape(ret, layout)

    if not fname.endswith(('.json', '.gz')):
        fname += '.json'

    if gz:
        if not fname.endswith('.gz'):
            fname += '.gz'
        with gzip.open(fname, 'r') as fin:
            d = json.loads(fin.read().decode('utf-8'))
    else:
        if fname.endswith('.gz'):
            warnings.warn("Trying to read from %s without unzipping!" % fname, UserWarning)
        with open(fname, 'r', encoding='utf-8') as fin:
            d = json.loads(fin.read())

    prog = d.get('program', '')
    version = d.get('version', '')
    who = d.get('who', '')
    date = d.get('date', '')
    host = d.get('host', '')
    if prog and verbose:
        print('Data has been written using %s.' % (prog))
    if version and verbose:
        print('Format version %s' % (version))
    # Only report the origin if at least one of the three fields is present.
    if any([who, date, host]) and verbose:
        print('Written by %s on %s on host %s' % (who, date, host))
    description = d.get('description', '')
    if description and verbose:
        print()
        print('Description: ', description)

    obsdata = d['obsdata']
    ol = []
    for io in obsdata:
        if io['type'] == 'Obs':
            ol.append(get_Obs_from_dict(io))
        elif io['type'] == 'List':
            ol.append(get_List_from_dict(io))
        elif io['type'] == 'Array':
            ol.append(get_Array_from_dict(io))

    if full_output:
        retd = {}
        retd['program'] = prog
        retd['version'] = version
        retd['who'] = who
        retd['date'] = date
        retd['host'] = host
        retd['description'] = description
        retd['obsdata'] = ol
        return retd

    if len(obsdata) == 1:
        ol = ol[0]
    return ol