Source code for vdat.libvdat.symlink

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Symlink raw files into a redux directory

Multiprocessing is disabled for the following reasons:

* Calibration and science frames need locking to correctly deal with grouping
  and renaming if multiple shots have the same name from same objects;
* a lot of ``peewee.OperationalError: database is locked`` errors are raised
* symlinking is going to run only now and then and is always going to be much
  faster than any of the reduction steps
* if the symlink is run from the gui, there is no risk that it interferes with
  reduction steps that are running
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import datetime as dt
import logging
import os
import sys
import re

from astropy.io import fits
import colorama
import peewee
import six

from pyhetdex.doc import docstring
import pyhetdex.tools.files.file_tools as ft
import pyhetdex.tools.processes as proc
from pyhetdex.tools import six_ext
from pyhetdex.tools.db_helpers import SQLITE_MAX_VARIABLE_NUMBER

import vdat.database as db
import vdat.utilities as vutil
import vdat.config as confp

# Initialise colorama once, when the module is imported; ``autoreset=True`` is
# passed so that the colour/style prefixes used in the error messages below
# do not need an explicit reset after each print
colorama.init(autoreset=True)

# date formats
# strftime pattern: year, month, day, underscore, hours, minutes, seconds
FMT_DATE_DIR = "%Y%m%d_%H%M%S"
"""Format for converting a :class:`~datetime.datetime` instance into a string
used as directory name"""

# Name associated with the symlink worker.
# NOTE(review): its use is not visible in this part of the file -- confirm
# against the callers before renaming it.
WORKER_NAME = 'vdat_symlink'





@docstring.format_docstring(tmp=vutil.SHOT_FILE, exp=vutil.EXPS_FILE)
def _scan_dirs(redux_dir):
    """Scan the redux directories and fill the database with entries from
    existing '{tmp}' and '{exp}' files.

    It also updates the '{tmp}' file if the redux directory has changed.

    If the database rejects the entries (repeated or missing values) or the
    meta data contain unexpected keys, an explanatory message is printed and
    the program exits with status 1.

    Parameters
    ----------
    redux_dir : string
        name of the redux directory

    Returns
    -------
    int, int
        Number of VDATDir and VDATExposures entries added to the database
    """
    def _insert_in_chunks(model, rows):
        """Insert ``rows`` into ``model`` in slices small enough to respect
        the SQLite limit on the number of variables per statement"""
        if not rows:
            # nothing to insert; this also avoids indexing an empty list
            return
        # each row binds one variable per field. Never let the slice size
        # drop below one, otherwise ``range`` gets a non positive step
        size = max(1, (SQLITE_MAX_VARIABLE_NUMBER // len(rows[0])) - 1)
        for start in range(0, len(rows), size):
            model.insert_many(rows[start:start + size]).execute()

    # collect the database entries from the meta data files
    shots, exps = [], []
    for shot, exp in vutil.collect_metadata(redux_dir, skip_empty=True,
                                            repair_redux=True,
                                            merge_shot=True):
        shots.extend(shot)
        exps.extend(exp)

    if not shots:  # reduction directory empty
        return 0, 0

    with db.connect():
        try:
            _insert_in_chunks(db.VDATDir, shots)
            _insert_in_chunks(db.VDATExposures, exps)
        except peewee.IntegrityError as e:
            if 'UNIQUE' in str(e):
                msg = ('Some of the database entries are repeated and this'
                       ' should not happen. This can happen when copying by'
                       ' hand a redux directory. Please use the ``clone``'
                       ' functionality to do this. To identify the offending'
                       ' directory you can run ``vdat_db check -v``.'
                       ' The directory should be removed or the metadata'
                       ' updated.')
            elif 'NOT NULL' in str(e):
                # the last word of the error message is the offending value
                msg = ('The value "{}" is missing, run'
                       ' ``vdat_db update`` to try repairing the meta data.'
                       ' If the reparation fails, the meta data files are'
                       ' moved to a backup and vdat should not fail the next'
                       ' time you run it.'.format(str(e).split()[-1]))
            else:  # pragma: no cover
                raise
            print(colorama.Fore.RED + colorama.Style.BRIGHT + msg)
            sys.exit(1)
        except (KeyError, AttributeError) as e:
            # the first line used to end with a backspace (``\b``) instead
            # of a new line, which garbled the message on the terminal
            print(colorama.Fore.RED + colorama.Style.BRIGHT +
                  'Some of the meta data files contain extra key(s).\n'
                  'The error message says "{e}".\n'
                  'You can run ``vdat_db check`` to inspect the meta'
                  ' data and, if there are not other issues, should be'
                  ' possible repair them with the'
                  ' ``vdat_db update`` command'.format(e=e))
            sys.exit(1)

    return len(shots), len(exps)
def _save_exposures(shot_dir, fits_files, vdat_dir):
    """Create the :class:`~vdat.database.VDATExposures` entries of a shot.

    Each file in ``fits_files`` is associated with an exposure, i.e. the
    first component of its path relative to ``shot_dir``, and with a base
    name, i.e. the part of the file name that precedes the first underscore.
    If more files belong to the same exposure, the base name of the last one
    in ``fits_files`` is retained. For every exposure a database entry is
    retrieved or created; newly created entries are also written to the
    exposures file in the directory of ``vdat_dir``.

    Parameters
    ----------
    shot_dir : string
        name of the shot directory
    fits_files : list of strings
        symlinked files
    vdat_dir : :class:`vdat.database.VDATDir` instance
        database entry related to the input files
    """
    # exposure name -> base name of the files in that exposure
    basename_of = {}
    for fname in fits_files:
        parts = os.path.relpath(fname, start=shot_dir).split(os.path.sep)
        basename_of[parts[0]] = parts[-1].split('_')[0]

    # values shared by all the exposures of the shot
    common = {'name': vdat_dir.name,
              'path': vdat_dir.path,
              'exptype': vdat_dir.type_,
              'original_type': vdat_dir.original_type_,
              'version_f': db.VDATExposures.version,
              'object_': vdat_dir.object_}

    with db.connect():
        for expname, basename in basename_of.items():
            entry = dict(common)
            entry['expname'] = expname
            entry['basename'] = basename
            _, is_new = db.VDATExposures.get_or_create(**entry)
            if is_new:
                vutil.write_to_exps_file(vdat_dir.path, **entry)
def _get_imagetype_datetime(conf, log, shot_dir, fnames):
    """Derive the image type and the average time stamp from file names.

    The ``image_type_pattern`` and ``datetime_pattern`` options of the
    ``symlink`` section of ``conf`` provide, each, a regular expression and
    its replacement string. Both substitutions are applied to every file
    name and each must happen exactly once.

    Parameters
    ----------
    conf : configuration object
        must provide ``get_list(section, option)`` and
        ``conf[section][option]`` access
    log : logging object
        not used by this function
    shot_dir : string
        name of the shot directory; used only in error messages
    fnames : list
        file names to scan

    Returns
    -------
    image_type : string
        type of the images
    avg_date : :class:`datetime`
        average timestamp
    avg_date_string : string
        string representation of the average time stamp

    Raises
    ------
    VDATFitsParseError
        if it fails parsing the file names
    VDATFitsTypeError
        if the number of image types found is not exactly one
    """
    # each option is a (regular expression, replacement) pair
    type_spec = conf.get_list('symlink', 'image_type_pattern')
    type_regex, type_repl = re.compile(type_spec[0]), type_spec[1]
    date_spec = conf.get_list('symlink', 'datetime_pattern')
    date_regex, date_repl = re.compile(date_spec[0]), date_spec[1]

    found_types, found_dates = set(), set()
    for fname in fnames:
        ftype, n_type = type_regex.subn(type_repl, fname)
        fdate, n_date = date_regex.subn(date_repl, fname)
        if not (n_type == 1 and n_date == 1):
            template = ('Failed extracting the image type and the date time'
                        ' from the file "{}"; obtained image type: {},'
                        ' datetime: {}')
            raise vutil.VDATFitsParseError(template.format(fname, ftype,
                                                           fdate))
        found_types.add(ftype)
        found_dates.add(fdate)

    # a shot must contain files of one single type
    if len(found_types) != 1:
        template = ("Not all the fits files in the shot '{}' have the same"
                    " image type; instead found: '{}'")
        raise vutil.VDATFitsTypeError(template.format(shot_dir, found_types))

    (image_type,) = found_types
    date_fmt = conf['symlink']['datetime_fmt']
    avg_date, avg_date_string = _average_timestamps(found_dates, date_fmt)
    return image_type, avg_date, avg_date_string
def _get_unique_keyword(fits_files, keyword, shot):
    """Return the value that the header ``keyword`` has in all the files.

    The value is read from each file, stripped of leading and trailing white
    spaces and must be the same everywhere.

    Parameters
    ----------
    fits_files : list of strings
        names of the fits files
    keyword : string
        name of the keyword to extract
    shot : string
        name of the shot; used only in error messages

    Returns
    -------
    string
        value of the header keyword

    Raises
    ------
    VDATFitsParseError
        if the header keyword does not exist in at least one file
    VDATFitsTypeError
        if the number of distinct values found is not exactly one
    """
    found = set()
    try:
        for fname in fits_files:
            found.add(fits.getval(fname, keyword, memmap=False).strip())
    except KeyError:
        template = ("At least one of the files in the shot '{}' doesn't have"
                    " the required '{}' header keyword")
        raise vutil.VDATFitsParseError(template.format(shot, keyword))

    # exactly one distinct value is allowed
    if len(found) != 1:
        template = ("Not all the fits files in the shot '{}' represent the"
                    " same object type. Found '{}'. Aborting shot symlinking")
        raise vutil.VDATFitsTypeError(template.format(shot, found))

    return next(iter(found))
def _mkdir(dirname, log, failsafe=True):
    """Create the directory ``dirname``, with all the intermediate ones.

    If the directory already exists and ``failsafe`` is true, the failure is
    logged at debug level and otherwise ignored. Any other failure, or an
    existing directory when ``failsafe`` is false, is re-raised as
    :class:`~vdat.utilities.VDATDirError`.

    Parameters
    ----------
    dirname : string
        name of the directory to create
    log : :class:`logging.Logger` instance
        log messages to this logger
    failsafe : bool, optional
        if true silently ignores :class:`OSError` due to existing directories

    Raises
    ------
    :class:`~vdat.utilities.VDATDirError`
        if the creation fails with a :class:`OSError` that is not ignored
    """
    # imported here to keep the function self contained
    import errno

    try:
        os.makedirs(dirname)
    except OSError as e:
        # compare against the symbolic constant: the numeric value and the
        # text of ``strerror`` depend on the platform and on the locale
        if failsafe and e.errno == errno.EEXIST:
            log.debug("Cannot create output directory '%s'. Error: %s",
                      dirname, str(e))
        else:
            six.raise_from(vutil.VDATDirError(e), e)
    else:
        log.debug("Directory '%s' created", dirname)
def _average_timestamps(dates, infmt, outfmt=FMT_DATE_DIR):
    """Average the timestamps.

    Parameters
    ----------
    dates : iterable of strings
        strings containing timestamps
    infmt : string
        format of ``dates``
    outfmt : string, optional
        format of the output time stamp

    Returns
    -------
    avg_timestamp : :class:`datetime.datetime` instance
        average time
    string
        ``avg_timestamp`` formatted according to ``outfmt``

    Raises
    ------
    :class:`~vdat.utilities.VDATDateError`
        if it fails to parse the dates or if ``dates`` is empty
    """
    try:
        timestamps = [dt.datetime.strptime(d, infmt) for d in dates]
    except ValueError as e:
        six.raise_from(vutil.VDATDateError(e), e)

    if not timestamps:
        # without this guard an empty input fails with a ZeroDivisionError
        raise vutil.VDATDateError('Cannot average an empty list of'
                                  ' timestamps')

    # datetimes cannot be summed: average the offsets from the first one
    reference = timestamps[0]
    total_offset = sum((t - reference for t in timestamps), dt.timedelta())
    avg_timestamp = reference + total_offset // len(timestamps)

    return avg_timestamp, avg_timestamp.strftime(outfmt)
def _find_nearest(q, timestamp, n_nearest=1, nearest_then=None):
    """Return the entries of ``q`` closest in time to ``timestamp``.

    The entries are sorted by the absolute difference between their
    ``timestamp`` attribute and ``timestamp``, then the list is shortened to
    ``n_nearest`` elements and finally filtered with ``nearest_then``.

    Parameters
    ----------
    q : :class:`peewee.SelectQuery`
        query to use; any iterable of objects with a ``timestamp`` attribute
        is handled in the same way
    timestamp : :class:`~datetime.datetime` instance
        timestamp to use as reference
    n_nearest : int, optional
        maximum number of entries returned; if zero or negative all of them
        are kept
    nearest_then : :class:`~datetime.timedelta` instance
        if it evaluates to true, keep only the entries whose distance from
        ``timestamp`` is strictly smaller than ``nearest_then``; applied
        after ``n_nearest``

    Returns
    -------
    list
        query results ordered with respect to their distance from
        ``timestamp``
    """
    def _distance(entry):
        """Absolute time difference between ``entry`` and ``timestamp``"""
        return abs(entry.timestamp - timestamp)

    nearest = sorted(q, key=_distance)

    if n_nearest > 0:
        nearest = nearest[:n_nearest]

    if nearest_then:
        nearest = [entry for entry in nearest
                   if _distance(entry) < nearest_then]

    return nearest
def db_create_references():
    """Link every directory in the database to its reference directories.

    The candidates are the non-clone directories of type ``zro`` and ``cal``.
    For each :class:`~vdat.database.VDATDir` entry:

    * if it is not of type ``zro``, ``zero_dir`` is set to the ``zro``
      candidate nearest in time, if any;
    * if it is neither of type ``zro`` nor ``cal``, ``cal_dir`` is set to the
      ``cal`` candidate nearest in time, if any.

    Every entry is saved afterwards.
    """
    def _originals(kind):
        """Query selecting the non-clone directories of type ``kind``"""
        # peewee builds the SQL expression from ``==``: do not use ``is``
        return db.VDATDir.select().where(
            (db.VDATDir.type_ == kind) &
            (db.VDATDir.is_clone == False))  # noqa: E712

    with db.connect():
        zero_dirs = _originals('zro')
        cal_dirs = _originals('cal')

        for vdir in db.VDATDir.select():
            nearest_zero = _find_nearest(zero_dirs, vdir.timestamp)
            if vdir.type_ != 'zro' and nearest_zero:
                vdir.zero_dir = nearest_zero[0]

            if vdir.type_ not in ('zro', 'cal'):
                nearest_cal = _find_nearest(cal_dirs, vdir.timestamp)
                if nearest_cal:
                    vdir.cal_dir = nearest_cal[0]

            vdir.save()