# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Utilities"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import collections
import datetime as dt
import itertools as it
import json
import os
import six
from pyhetdex.doc.docstring import format_docstring
import pyhetdex.tools.files.file_tools as ft
from pyhetdex.tools import six_ext
ISOTIME_FMT = "%Y-%m-%dT%H:%M:%S.%f"
"""Date time formatting in the json"""
SHOT_FILE = "shot_name.txt"
"""The file contains basic information about the type of files and the original
directory and is used to rebuild the database on subsequent runs of vdat"""
EXPS_FILE = 'exposure_names.txt'
"""Maps the base name of each virus fits file (basically the time stamp), with
the exposure number"""
# itertools' ``zip_longest`` was called ``izip_longest`` in Python 2;
# pick whichever name this interpreter provides
try:  # Python 3
    zip_longest = it.zip_longest
except AttributeError:  # Python 2
    zip_longest = it.izip_longest
# ==== json custom decoder and encoders ==== #
# serialise and de-serialize dictionaries using json. Takes care of datetime
class DatetimeEncoder(json.JSONEncoder):
    """Encode :class:`datetime.date`, :class:`datetime.time` or
    :class:`datetime.datetime` as dictionary::

        {"__datetime__": True, "date": formatted datetime,
         "type": datetime type, "fmt": format}

    with ``datetime type`` is one of: "datetime", "date", "time"

    Parameters
    ----------
    *args, **kwargs : same as :class:`json.JSONEncoder`
    dt_formatter : string
        formatter used to encode the datetime; defaults to :data:`ISOTIME_FMT`
    d_formatter : string
        formatter used to encode the date; defaults to the part before "T" in
        :data:`ISOTIME_FMT`
    t_formatter : string
        formatter used to encode the time; defaults to the part after "T" in
        :data:`ISOTIME_FMT`
    """
    def __init__(self, *args, **kwargs):
        # pop the custom options before delegating to json.JSONEncoder,
        # which would reject unknown keyword arguments
        self.dt_formatter = kwargs.pop("dt_formatter", ISOTIME_FMT)
        self.d_formatter = kwargs.pop("d_formatter", ISOTIME_FMT.split("T")[0])
        self.t_formatter = kwargs.pop("t_formatter", ISOTIME_FMT.split("T")[1])
        super(DatetimeEncoder, self).__init__(*args, **kwargs)

    def default(self, obj):
        """Encode object ``obj``. If it's not a date/time/datetime instance
        delegate to the parent class ``default``

        Parameters
        ----------
        obj : object to be serialised

        Returns
        -------
        serialised object
        """
        # test datetime before date: datetime is a subclass of date
        if isinstance(obj, dt.datetime):
            return self._datetime_dic(obj.strftime(self.dt_formatter),
                                      "datetime", self.dt_formatter)
        elif isinstance(obj, dt.date):
            return self._datetime_dic(obj.strftime(self.d_formatter), "date",
                                      self.d_formatter)
        elif isinstance(obj, dt.time):
            return self._datetime_dic(obj.strftime(self.t_formatter), "time",
                                      self.t_formatter)
        else:
            return super(DatetimeEncoder, self).default(obj)

    def _datetime_dic(self, date, type_, fmt):
        """Create the dictionary to feed to the encoder"""
        return {"__datetime__": True, "date": date, "type": type_, "fmt": fmt}
def decode_datetime(dct):
    """If the input dictionary has a ``__datetime__`` key set to true,
    uses the keys "date", "type" and "fmt" to decode the datetime encoded by
    :class:`DatetimeEncoder`

    Parameters
    ----------
    dct : dictionary
        object to decode

    Returns
    -------
    decoded object
    """
    if dct.get("__datetime__", False):
        try:
            deserialised = dt.datetime.strptime(dct['date'], dct['fmt'])
            # "date" and "time" are datetime methods: if the encoded type
            # was one of them, call it to extract the date/time instance
            if hasattr(deserialised, dct['type']):
                deserialised = getattr(deserialised, dct['type'])()
            return deserialised
        except Exception:
            # any exception here is treated as a sign that there is nothing to
            # convert back to datetime
            pass
    return dct
@format_docstring(ISOTIME_FMT)
def json_dumps(obj):
    """Serialise ``obj`` into a json using :class:`DatetimeEncoder`
    and {} formatting for the date

    Parameters
    ----------
    obj : object to serialise

    Returns
    -------
    string
        serialised json
    """
    return json.dumps(obj, cls=DatetimeEncoder)
def json_loads(s):
    """Deserialize the string ``s`` into a python object, undoing the datetime
    encoding done by :class:`DatetimeEncoder`

    Parameters
    ----------
    s : string to deserialize

    Returns
    -------
    python object
    """
    # decode_datetime is applied to every decoded dictionary and turns the
    # DatetimeEncoder payloads back into date/time/datetime instances
    return json.loads(s, object_hook=decode_datetime)
def read_json_file(fname, decode=True):
    """Read the content of the file and, if ``decode`` is ``True`` decode each
    line as a json entry

    Parameters
    ----------
    fname : string
        name of the file to read
    decode : bool, optional
        decode each line in the file as a json

    Returns
    -------
    list of string or of objects
        lines of the file
    """
    with open(fname, 'r') as f:
        lines = f.readlines()
    if decode:
        return [json_loads(line) for line in lines]
    else:
        return lines
def write_to_json_file(fname, append=True, **kwargs):
    """Serialize the keyword arguments and write them as a single line to
    ``fname``.

    Parameters
    ----------
    fname : string
        name of the file where to write
    append : bool, optional
        if true append to the file, if false write to it
    kwargs : dictionary
        line to write
    """
    # 'w' truncates any pre-existing file; 'a' keeps the previous lines
    mode = 'a' if append else 'w'
    with open(fname, mode) as f:
        f.write(json_dumps(kwargs) + "\n")
def _read_file(fname):
    """Wrap :func:`read_json_file` to pass only the directory name"""
    @format_docstring(fname)
    def wrapped(dir_, decode=True):
        """Read the content of the shot file '{}' in ``dir_``.

        Parameters
        ----------
        dir_ : string
            name of the directory where the file is located
        decode : bool, optional
            decode each line in the file as a json

        Returns
        -------
        list of string or of objects
            lines of the file
        """
        return read_json_file(os.path.join(dir_, fname), decode=decode)
    return wrapped
def _write_file(fname):
    """Wrap :func:`write_to_json_file` to pass only the directory name"""
    @format_docstring(fname)
    def wrapped(dir_, append=True, **kwargs):
        """Serialize the keyword arguments as a single line to the {} file in
        directory ``dir_``.

        Parameters
        ----------
        dir_ : string
            name of the directory where the file is located
        append : bool, optional
            if true append to the file, if false write to it
        kwargs : dictionary
            line to write
        """
        write_to_json_file(os.path.join(dir_, fname), append=append, **kwargs)
    return wrapped
# Convenience functions built from the factories above: read/write the
# ``SHOT_FILE`` and ``EXPS_FILE`` json files in a given directory
read_shot_file = _read_file(SHOT_FILE)
read_exps_file = _read_file(EXPS_FILE)
write_to_shot_file = _write_file(SHOT_FILE)
write_to_exps_file = _write_file(EXPS_FILE)
def merge_dicts(dicts, exclude=()):
    """Merge the dictionaries into one

    Unique entries are copied verbatim. For repeated entries:

    * if string: join them with ", "
    * if date or datetime: average them
    * if bool: all is used: so is True only if all the entries are True

    Parameters
    ----------
    dicts : list of dictionaries
        dictionaries to merge
    exclude : iterable of strings, optional
        exclude entries from ``out_dict``; an immutable default avoids the
        shared-mutable-default pitfall

    Returns
    -------
    out_dict : dictionary
        merged dictionaries

    Raises
    ------
    VDATUnknownDictEntry
        if it doesn't know what to do how to merge the entry
    """
    # merge the input list into a dictionary of sets to get rid of repetitions
    out_dict = collections.defaultdict(set)
    for dic in dicts:
        for k, v in six.iteritems(dic):
            if k not in exclude:
                out_dict[k].add(v)
    # scan the dictionary to merge the values; only values are replaced, so
    # modifying out_dict while iterating it is safe
    for k, v in six.iteritems(out_dict):
        if len(v) == 1:
            out_dict[k] = v.pop()
        else:
            lv = list(v)  # convert to a list
            if isinstance(lv[0], six.string_types):
                out_dict[k] = ', '.join(lv)
            elif isinstance(lv[0], bool):
                out_dict[k] = all(lv)
            elif isinstance(lv[0], (dt.date, dt.datetime)):
                # average: anchor on the first element and add the mean of
                # the timedelta offsets from it
                sum_deltas = sum((i - lv[0] for i in lv), dt.timedelta())
                out_dict[k] = lv[0] + sum_deltas // len(lv)
            else:
                msg = "I don't know how to merge list of type '{}'."
                raise VDATUnknownDictEntry(msg.format(type(lv[0])))
    return out_dict
def grouper(iterable, n, fillvalue=None):
    '''Collect data into fixed-length chunks or blocks.

    From https://docs.python.org/3/library/itertools.html#itertools-recipes

    Examples
    --------
    >>> list(grouper('ABCDEFG', 3, 'x'))
    [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]

    Parameters
    ----------
    iterable :
        iterable to split in chunks
    n : int
        size of the chunks
    fillvalue : anything, optional
        if the size of the iterable isn't a multiple of ``n``, fill the last
        chunk with ``fillvalue``

    Returns
    -------
    iterable
        chunk of size ``n`` of the input iterable
    '''
    # n references to the SAME iterator: each output tuple advances it n
    # steps, which is exactly the chunking behaviour we want
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
# VDAT errors: a small hierarchy rooted at VDATError so callers can catch
# everything vdat-related with a single except clause
class VDATError(Exception):
    """Generic vdat error"""
    pass


class VDATDirError(VDATError, OSError):
    """Error raised when trying to create directories"""
    pass


class VDATSymlinkError(VDATError, OSError):
    """Generic error raised when performing the symlinking"""
    pass


class VDATFitsParseError(VDATSymlinkError, ValueError):
    """Exception raised when the parsing of the fits file name or headers to
    extract information during the symlinking fails"""
    pass


class VDATFitsTypeError(VDATSymlinkError, ValueError):
    """Error raised when the type of the fits files is wrong or unknown"""
    pass


class VDATDateError(VDATSymlinkError, ValueError):
    """Error raised when failing to parse dates"""
    pass


class VDATUnknownDictEntry(VDATSymlinkError, ValueError):
    """Error raised when the shot file is malformed or contains unexpected
    entries"""
    pass


class VDATDatabaseError(VDATError):
    """Database related errors"""
    pass


class VDATDatabaseUniquenessError(VDATDatabaseError):
    """The entry in the database is not unique"""
    pass