Source code for vdat.utilities

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Utilities"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import collections
import datetime as dt
import itertools as it
import json
import os

import six
from pyhetdex.doc.docstring import format_docstring
import pyhetdex.tools.files.file_tools as ft
from pyhetdex.tools import six_ext


ISOTIME_FMT = "%Y-%m-%dT%H:%M:%S.%f"
"""Date time formatting in the json"""
SHOT_FILE = "shot_name.txt"
"""The file contains basic information about the type of files and the original
directory and is used to rebuild the database on subsequent runs of vdat"""
EXPS_FILE = 'exposure_names.txt'
"""Maps the base name of each virus fits file (basically the time stamp), with
the exposure number"""
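
# Illustrative example (not part of the original module): a datetime formatted
# with ISOTIME_FMT looks like this.
#
#   >>> dt.datetime(2016, 1, 2, 3, 4, 5).strftime(ISOTIME_FMT)
#   '2016-01-02T03:04:05.000000'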


# itertools' zip_longest was called izip_longest in Python 2
try:
    zip_longest = it.zip_longest
except AttributeError:
    zip_longest = it.izip_longest


# ==== json custom decoder and encoders ==== #
# serialise and de-serialize dictionaries using json. Takes care of datetime
class DatetimeEncoder(json.JSONEncoder):
    """Encodes :class:`datetime.date`, :class:`datetime.time` or
    :class:`datetime.datetime` as a dictionary::

        {"__datetime__": True, "date": formatted datetime,
         "type": datetime type, "fmt": format}

    where ``datetime type`` is one of: "datetime", "date", "time"

    Parameters
    ----------
    *args, **kwargs :
        same as :class:`json.JSONEncoder`
    dt_formatter : string
        formatter used to encode the datetime; defaults to :data:`ISOTIME_FMT`
    d_formatter : string
        formatter used to encode the date; defaults to the part before "T" in
        :data:`ISOTIME_FMT`
    t_formatter : string
        formatter used to encode the time; defaults to the part after "T" in
        :data:`ISOTIME_FMT`
    """
    def __init__(self, *args, **kwargs):
        self.dt_formatter = kwargs.pop("dt_formatter", ISOTIME_FMT)
        self.d_formatter = kwargs.pop("d_formatter",
                                      ISOTIME_FMT.split("T")[0])
        self.t_formatter = kwargs.pop("t_formatter",
                                      ISOTIME_FMT.split("T")[1])
        super(DatetimeEncoder, self).__init__(*args, **kwargs)

    def default(self, obj):
        """Encode object ``obj``. If it is not a date/time/datetime instance,
        delegate to the parent class ``default``

        Parameters
        ----------
        obj :
            object to be serialised

        Returns
        -------
        serialised object
        """
        if isinstance(obj, dt.datetime):
            return self._datetime_dic(obj.strftime(self.dt_formatter),
                                      "datetime", self.dt_formatter)
        elif isinstance(obj, dt.date):
            return self._datetime_dic(obj.strftime(self.d_formatter),
                                      "date", self.d_formatter)
        elif isinstance(obj, dt.time):
            return self._datetime_dic(obj.strftime(self.t_formatter),
                                      "time", self.t_formatter)
        else:
            return super(DatetimeEncoder, self).default(obj)

    def _datetime_dic(self, date, type_, fmt):
        """Create the dictionary to feed to the encoder"""
        return {"__datetime__": True, "date": date, "type": type_,
                "fmt": fmt}
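
# Illustrative example (not part of the original module): encoding a
# ``datetime.date`` with ``DatetimeEncoder`` yields the dictionary described
# in the class docstring. The key order shown assumes Python 3.7+, where
# dictionaries preserve insertion order.
#
#   >>> DatetimeEncoder().default(dt.date(2016, 1, 2))
#   {'__datetime__': True, 'date': '2016-01-02', 'type': 'date', 'fmt': '%Y-%m-%d'}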


def decode_datetime(dct):
    """If the input dictionary has a ``__datetime__`` key set to true, use the
    keys "date", "type" and "fmt" to decode the datetime encoded by
    :class:`DatetimeEncoder`

    Parameters
    ----------
    dct : dictionary
        object to decode

    Returns
    -------
    decoded object
    """
    if dct.get("__datetime__", False):
        try:
            deserialised = dt.datetime.strptime(dct['date'], dct['fmt'])
            # if it's a date or a time instance, extract it from the datetime
            if hasattr(deserialised, dct['type']):
                deserialised = getattr(deserialised, dct['type'])()
            return deserialised
        except Exception:
            # any exception here is treated as a sign that there is nothing
            # to convert back to datetime
            pass
    return dct


@format_docstring(ISOTIME_FMT)
def json_dumps(obj):
    """Serialise ``obj`` into a json using :class:`DatetimeEncoder` and the
    {} formatting for the dates

    Parameters
    ----------
    obj :
        object to serialise

    Returns
    -------
    string
        serialised json
    """
    return json.dumps(obj, cls=DatetimeEncoder)


def json_loads(s):
    """Deserialize the string ``s`` into a python object, undoing the datetime
    encoding done by :class:`DatetimeEncoder`

    Parameters
    ----------
    s : string
        string to deserialize

    Returns
    -------
    python object
    """
    return json.loads(s, object_hook=decode_datetime)
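
# Illustrative round-trip sketch (not part of the original module): a datetime
# value survives serialisation through json_dumps and json_loads unchanged.
#
#   >>> stamp = dt.datetime(2016, 1, 2, 3, 4, 5)
#   >>> json_loads(json_dumps({"obs": stamp}))["obs"] == stamp
#   True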


def read_json_file(fname, decode=True):
    """Read the content of the file and, if ``decode`` is ``True``, decode
    each line as a json entry

    Parameters
    ----------
    fname : string
        name of the file to read
    decode : bool, optional
        decode each line in the file as a json

    Returns
    -------
    list of strings or of objects
        lines of the file
    """
    with open(fname, 'r') as f:
        lines = f.readlines()
    if decode:
        return [json_loads(l) for l in lines]
    else:
        return lines


def write_to_json_file(fname, append=True, **kwargs):
    """Serialize the keyword arguments and write them as a single line to
    ``fname``.

    Parameters
    ----------
    fname : string
        name of the file where to write
    append : bool, optional
        if true append to the file, if false overwrite it
    kwargs : dictionary
        line to write
    """
    mode = 'a' if append else 'w'
    with open(fname, mode) as f:
        f.write(json_dumps(kwargs) + "\n")
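
# Illustrative sketch (not part of the original module): the file name
# '/tmp/vdat_example.txt' is hypothetical. Each call to write_to_json_file
# adds one json line; read_json_file returns the decoded lines (key order
# shown assumes Python 3.7+).
#
#   >>> write_to_json_file('/tmp/vdat_example.txt', append=False,
#   ...                    shot='001', ok=True)
#   >>> write_to_json_file('/tmp/vdat_example.txt', shot='002', ok=False)
#   >>> read_json_file('/tmp/vdat_example.txt')
#   [{'shot': '001', 'ok': True}, {'shot': '002', 'ok': False}]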


def _read_file(fname):
    """Wrap :func:`read_json_file` to pass only the directory name"""
    @format_docstring(fname)
    def wrapped(dir_, decode=True):
        """Read the content of the '{}' file in ``dir_``.

        Parameters
        ----------
        dir_ : string
            name of the directory where the file is located
        decode : bool, optional
            decode each line in the file as a json

        Returns
        -------
        list of strings or of objects
            lines of the file
        """
        return read_json_file(os.path.join(dir_, fname), decode=decode)
    return wrapped


def _write_file(fname):
    """Wrap :func:`write_to_json_file` to pass only the directory name"""
    @format_docstring(fname)
    def wrapped(dir_, append=True, **kwargs):
        """Serialize the keyword arguments as a single line to the {} file in
        directory ``dir_``.

        Parameters
        ----------
        dir_ : string
            name of the directory where the file is located
        append : bool, optional
            if true append to the file, if false overwrite it
        kwargs : dictionary
            line to write
        """
        write_to_json_file(os.path.join(dir_, fname), append=append,
                           **kwargs)
    return wrapped


# read and write the ``SHOT_FILE`` and ``EXPS_FILE`` using json to store the
# information
read_shot_file = _read_file(SHOT_FILE)
read_exps_file = _read_file(EXPS_FILE)
write_to_shot_file = _write_file(SHOT_FILE)
write_to_exps_file = _write_file(EXPS_FILE)
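
# Illustrative sketch (not part of the original module): 'redux/20160102' is a
# hypothetical, already existing directory and the keyword names are made up.
# write_to_shot_file appends one json line to SHOT_FILE in that directory and
# read_shot_file reads it back decoded.
#
#   >>> write_to_shot_file('redux/20160102', shot='001', obstype='sci')
#   >>> read_shot_file('redux/20160102')
#   [{'shot': '001', 'obstype': 'sci'}]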


def merge_dicts(dicts, exclude=[]):
    """Merge the dictionaries into one.

    Unique entries are copied verbatim. For repeated entries:

    * if string: join them with ", "
    * if date or datetime: average them
    * if bool: ``all`` is used, so the result is True only if all the entries
      are True

    Parameters
    ----------
    dicts : list of dictionaries
        dictionaries to merge
    exclude : list of strings
        entries to exclude from ``out_dict``

    Returns
    -------
    out_dict : dictionary
        merged dictionaries

    Raises
    ------
    VDATUnknownDictEntry
        if it doesn't know how to merge an entry
    """
    # merge the input list into a dictionary of sets to get rid of repetitions
    out_dict = collections.defaultdict(set)
    for l in dicts:
        for k, v in six.iteritems(l):
            if k not in exclude:
                out_dict[k].add(v)

    # scan the dictionary to merge the values
    for k, v in six.iteritems(out_dict):
        if len(v) == 1:
            out_dict[k] = v.pop()
        else:
            lv = list(v)  # convert to a list
            if isinstance(lv[0], six.string_types):
                out_dict[k] = ', '.join(lv)
            elif isinstance(lv[0], bool):
                out_dict[k] = all(lv)
            elif isinstance(lv[0], (dt.date, dt.datetime)):
                sum_deltas = sum((i - lv[0] for i in lv), dt.timedelta())
                out_dict[k] = lv[0] + sum_deltas // len(lv)
            else:
                msg = "I don't know how to merge list of type '{}'."
                raise VDATUnknownDictEntry(msg.format(type(lv[0])))

    return out_dict
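
# Illustrative sketch (not part of the original module) of the merging rules:
# repeated strings are joined, repeated booleans are combined with ``all``,
# and the 'id' key is excluded. The keys are made up for the example.
#
#   >>> merged = merge_dicts([{'id': 1, 'ifu': '001', 'ok': True},
#   ...                       {'id': 2, 'ifu': '002', 'ok': False}],
#   ...                      exclude=['id'])
#   >>> sorted(merged['ifu'].split(', ')), merged['ok']
#   (['001', '002'], False)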


@format_docstring(sf=SHOT_FILE, ef=EXPS_FILE)
def collect_metadata(redux_dir, skip_empty=False, repair_redux=False,
                     merge_shot=False, yield_dir=False):
    '''Recursively scan the ``redux_dir`` directory looking for {sf} and {ef}
    files and yield their content.

    Parameters
    ----------
    redux_dir : string
        name of the directory to scan
    skip_empty : bool, optional
        if any of the two files is empty or does not exist, skip the directory
    repair_redux : bool, optional
        if the ``redux_dir`` entry in the {sf} files is different from the
        input one, update it
    merge_shot : bool, optional
        if the {sf} file contains multiple lines, merge them into one
    yield_dir : bool, optional
        if True, also yield the name of the directory containing the files

    Yields
    ------
    shot_file, exps_file : list of dict
        content of the {sf} and the {ef} files
    '''
    for dir_ in ft.scan_dirs(redux_dir, followlinks=False):
        # if the shot file exists create the database entry
        shot_exist = exps_exist = True
        try:
            shot_file = read_shot_file(dir_)
        except six_ext.FileOpenError:
            shot_exist = False
            shot_file = []
        try:
            exps_file = read_exps_file(dir_)
        except six_ext.FileOpenError:
            exps_exist = False
            exps_file = []

        if (not shot_exist) and (not exps_exist):
            # skip directories if neither file exists in them
            continue
        if skip_empty and (len(shot_file) == 0 or len(exps_file) == 0):
            continue

        # if the redux directory has changed, update it in the shot file
        if shot_file and repair_redux:
            if shot_file[0]['redux_dir'] != redux_dir:
                append = False
                for l in shot_file:
                    l['path'] = l['path'].replace(l['redux_dir'], redux_dir)
                    l['redux_dir'] = redux_dir
                    write_to_shot_file(dir_, append=append, **l)
                    append = True

        if merge_shot:
            shot_file = [merge_dicts(shot_file,
                                     exclude=['id', 'zero_dir', 'cal_dir',
                                              'ifus']), ]

        if yield_dir:
            yield shot_file, exps_file, dir_
        else:
            yield shot_file, exps_file
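
# Illustrative usage sketch (not part of the original module): 'redux' is a
# hypothetical reduction directory. Every directory containing a shot and/or
# exposure file yields the decoded content of both files.
#
#   >>> for shot, exps in collect_metadata('redux', skip_empty=True):
#   ...     print(shot[0]['redux_dir'], len(exps))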


def grouper(iterable, n, fillvalue=None):
    '''Collect data into fixed-length chunks or blocks.

    From https://docs.python.org/3/library/itertools.html#itertools-recipes

    Examples
    --------
    >>> list(grouper('ABCDEFG', 3, 'x'))
    [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]

    Parameters
    ----------
    iterable :
        iterable to split in chunks
    n : int
        size of the chunks
    fillvalue : anything, optional
        if the size of the iterable isn't a multiple of ``n``, fill the last
        chunk with ``fillvalue``

    Returns
    -------
    iterable
        chunks of size ``n`` of the input iterable
    '''
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)
# VDAT errors
class VDATError(Exception):
    """Generic vdat error"""
    pass


class VDATDirError(VDATError, OSError):
    """Error raised when trying to create directories"""
    pass


class VDATSymlinkError(VDATError, OSError):
    """Generic error raised when performing the symlinking"""
    pass


class VDATFitsParseError(VDATSymlinkError, ValueError):
    """Exception raised when the parsing of the fits file name or headers to
    extract information during the symlinking fails"""
    pass


class VDATFitsTypeError(VDATSymlinkError, ValueError):
    """Error raised when the type of the fits files is wrong or unknown"""
    pass


class VDATDateError(VDATSymlinkError, ValueError):
    """Error raised when failing to parse dates"""
    pass


class VDATUnknownDictEntry(VDATSymlinkError, ValueError):
    """Error raised when the shot file is malformed or contains unexpected
    entries"""
    pass


class VDATDatabaseError(VDATError):
    """Database related errors"""
    pass


class VDATDatabaseUniquenessError(VDATDatabaseError):
    """The entry in the database is not unique"""
    pass