Source code for vdat.libvdat.symlink

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017, 2018  "The HETDEX collaboration"
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Symlink raw files into a redux directory

Multiprocessing is disabled for the following reasons:

* Calibration and science frames need locking to correctly deal with grouping
  and renaming if multiple shots have the same name from same objects;
* a lot of ``peewee.OperationalError: database is locked`` errors are raised
* symlinking is going to run only now and then and is always going to be much
  faster than any of the reduction steps
* if the symlink is run from the gui, there is no risk that it interferes with
  reduction steps running
"""
from __future__ import (absolute_import, division, print_function,

import datetime as dt
import logging
import os
import sys
import re

from import fits
import colorama
import peewee
import six

from pyhetdex.doc import docstring
import as ft
import as proc
from import six_ext

import vdat.database as db
import vdat.utilities as vutil
import vdat.config as confp


# date formats
FMT_DATE_DIR = "%Y%m%d_%H%M%S"
"""Format for converting a :class:`~datetime.datetime` instance into a string
used as directory name"""

# Identifier string for the symlink step. It is not referenced in the part of
# the module visible here.
# NOTE(review): presumably the name under which the symlink worker is
# registered/looked up elsewhere in vdat -- confirm against the callers.
WORKER_NAME = 'vdat_symlink'

def _insert_in_batches(model, rows, max_args):
    """Insert ``rows`` into ``model`` in batches bounded by ``max_args``.

    Parameters
    ----------
    model : peewee model class
        model exposing ``insert_many``
    rows : list
        rows to insert; nothing is done if the list is empty
    max_args : int
        maximum number of variables allowed in a single statement
    """
    if not rows:
        return
    # every row contributes ``len(row)`` variables to the statement; never let
    # the batch size drop below one, otherwise ``range`` refuses the step
    size = max(1, (max_args // len(rows[0])) - 1)
    for start in range(0, len(rows), size):
        model.insert_many(rows[start:start + size]).execute()


@docstring.format_docstring(tmp=vutil.SHOT_FILE, exp=vutil.EXPS_FILE)
def _scan_dirs(redux_dir):
    """Scan the redux directories and fill the database with entries from
    existing '{tmp}' and '{exp}' files. It also updates the '{tmp}' file if
    the redux directory has changed.

    If the database rejects the entries (repeated or missing values) or the
    meta data contain unexpected keys, an explanation is printed and the
    program exits with status 1.

    Parameters
    ----------
    redux_dir : string
        name of the redux directory

    Returns
    -------
    int, int
        Number of VDATDir and VDATExposures entries added to the database
    """
    # NOTE: this docstring is passed through ``str.format`` by the decorator;
    # do not add literal curly braces to it.

    # create the database entries
    shots, exps = [], []
    for shot, exp in vutil.collect_metadata(redux_dir, skip_empty=True,
                                            repair_redux=True,
                                            merge_shot=True):
        shots.extend(shot)
        exps.extend(exp)

    if not shots:  # reduction directory empty
        return 0, 0

    with db.connect():
        max_args = SQLITE_MAX_VARIABLE_NUMBER
        try:
            _insert_in_batches(db.VDATDir, shots, max_args)
            _insert_in_batches(db.VDATExposures, exps, max_args)
        except peewee.IntegrityError as e:
            if 'UNIQUE' in str(e):
                msg = ('Some of the database entries are repeated and this'
                       ' should not happen. This can happen when copying by'
                       ' hand a redux directory. Please use the ``clone``'
                       ' functionality to do this. To identify the offending'
                       ' directory you can run ``vdat_db check -v``.'
                       ' The directory should be removed or the metadata'
                       ' updated.')
            elif 'NOT NULL' in str(e):
                # the last word of the error message is used as the name of
                # the missing value
                msg = ('The value "{}" is missing, run'
                       ' ``vdat_db update`` to try repairing the meta data.'
                       ' If the reparation fails, the meta data files are'
                       ' moved to a backup and vdat should not fail the next'
                       ' time you run it.'.format(str(e).split()[-1]))
            else:  # pragma: no cover
                raise
            print(colorama.Fore.RED + colorama.Style.BRIGHT + msg)
            sys.exit(1)
        except (KeyError, AttributeError) as e:
            print(colorama.Fore.RED + colorama.Style.BRIGHT +
                  'Some of the meta data files contain extra key(s).\n'
                  'The error message says "{e}".\n'
                  'You can run ``vdat_db check`` to inspect the meta'
                  ' data and, if there are not other issues, should be'
                  ' possible repair them with the'
                  ' ``vdat_db update`` command'.format(e=e))
            sys.exit(1)

    return len(shots), len(exps)
def _save_exposures(shot_dir, fits_files, vdat_dir):
    """Given the list of files belonging to a shot and the VDATDir database
    entry ``vdat_dir``, create the necessary
    :class:`~vdat.database.VDATExposures` entries, one per exposure.

    The exposure name is the first component of each file path relative to
    ``shot_dir``; the basename is the part of the file name before the first
    underscore. If an exposure contains several files, the basename of the
    last one listed in ``fits_files`` is used.

    Parameters
    ----------
    shot_dir : string
        name of the shot directory
    fits_files : list of strings
        symlinked files
    vdat_dir : :class:`vdat.database.VDATDir` instance
        database entry related to the input files
    """
    exps_basename = {}
    for fn in fits_files:
        relative_split = os.path.relpath(fn, start=shot_dir).split(os.path.sep)
        # <exposure>/.../<basename>_<rest>: later files of the same exposure
        # overwrite earlier ones
        exps_basename[relative_split[0]] = relative_split[-1].split('_')[0]

    # NOTE(review): the value of 'name' was lost when this source was
    # extracted; ``vdat_dir.name`` is the reconstruction consistent with the
    # other fields -- confirm against the repository.
    vdat_exp_dict = {'name': vdat_dir.name,
                     'path': vdat_dir.path,
                     'exptype': vdat_dir.type_,
                     'original_type': vdat_dir.original_type_,
                     'version_f': db.VDATExposures.version,
                     'object_': vdat_dir.object_}

    with db.connect():
        for expname, basename in six.iteritems(exps_basename):
            vdat_exp_d = vdat_exp_dict.copy()
            vdat_exp_d.update({'expname': expname, 'basename': basename})
            _, created = db.VDATExposures.get_or_create(**vdat_exp_d)
            # write the exposure meta data only for newly created entries
            if created:
                vutil.write_to_exps_file(vdat_dir.path, **vdat_exp_d)
def _get_imagetype_datetime(conf, log, shot_dir, fnames):
    """Derive the image type and the average time stamp of a shot from the
    names of its files.

    The ``[symlink]`` options ``image_type_pattern`` and ``datetime_pattern``
    each provide a regular expression and its replacement string; every file
    name must be substituted exactly once by both of them.

    Parameters
    ----------
    conf : configuration object
        object providing ``get_list(section, option)`` and dictionary-like
        access to the ``symlink`` section
    log : logging object
        not used by this function
    shot_dir : string
        name of the shot directory, used in error messages
    fnames : list
        file names to scan

    Returns
    -------
    image_type : string
        type of the images
    avg_date : :class:`datetime`
        average timestamp
    avg_date_string : string
        string representation of the average time stamp

    Raises
    ------
    VDATFitsParseError
        if it fails parsing the file names
    VDATFitsTypeError
        if there are more than one image types
    """
    # each option is a (regular expression, replacement) pair
    type_spec = conf.get_list('symlink', 'image_type_pattern')
    type_regex = re.compile(type_spec[0])
    type_repl = type_spec[1]

    date_spec = conf.get_list('symlink', 'datetime_pattern')
    date_regex = re.compile(date_spec[0])
    date_repl = date_spec[1]

    found_types = set()
    found_dates = set()
    for fname in fnames:
        type_hit, n_type = type_regex.subn(type_repl, fname)
        date_hit, n_date = date_regex.subn(date_repl, fname)
        if not (n_type == 1 and n_date == 1):
            template = ('Failed extracting the image type and the date time from'
                        ' the file "{}"; obtained image type: {}, datetime: {}')
            raise vutil.VDATFitsParseError(template.format(fname, type_hit,
                                                           date_hit))
        found_types.add(type_hit)
        found_dates.add(date_hit)

    # a shot must contain exactly one image type
    if len(found_types) != 1:
        template = ("Not all the fits files in the shot '{}' have the same image"
                    " type; instead found: '{}'")
        raise vutil.VDATFitsTypeError(template.format(shot_dir, found_types))
    image_type = next(iter(found_types))

    avg_date, avg_date_string = _average_timestamps(
        found_dates, conf['symlink']['datetime_fmt'])
    return image_type, avg_date, avg_date_string
def _get_unique_keyword(fits_files, keyword, shot):
    """Read the header ``keyword`` from every fits file and return its value,
    provided that all the files agree on it.

    Parameters
    ----------
    fits_files : list of strings
        names of the fits files
    keyword : string
        name of the keyword to extract
    shot : string
        name of the shot, used in error messages

    Returns
    -------
    string
        value of the header keyword, stripped of surrounding white spaces

    Raises
    ------
    VDATFitsParseError
        if the header keyword does not exist
    VDATFitsTypeError
        if there are more than one values for the header keywords
    """
    found = set()
    try:
        for fname in fits_files:
            found.add(fits.getval(fname, keyword, memmap=False).strip())
    except KeyError:
        template = ("At least one of the files in the shot '{}' doesn't have the"
                    " required '{}' header keyword")
        raise vutil.VDATFitsParseError(template.format(shot, keyword))

    # exactly one distinct value is acceptable
    if len(found) == 1:
        return next(iter(found))

    template = ("Not all the fits files in the shot '{}' represent the same"
                " object type. Found '{}'. Aborting shot symlinking")
    raise vutil.VDATFitsTypeError(template.format(shot, found))
def _mkdir(dirname, log, failsafe=True):
    """Create the directory, including the missing parents.

    If the directory already exists and ``failsafe`` is true, the failure is
    logged at debug level and ignored. Any other failure is re-raised as
    :class:`~vdat.utilities.VDATDirError`.

    Parameters
    ----------
    dirname : string
        name of the directory to create
    log : :class:`logging.Logger` instance
        log messages to this logger
    failsafe : bool, optional
        if true silently ignores :class:`OSError` due to existing directories

    Raises
    ------
    :class:`~vdat.utilities.VDATDirError`
        if the creation fails with a :class:`OSError` and either ``failsafe``
        is False or the error is not due to an existing directory
    """
    # function-scope import: keeps this block self-contained
    import errno

    try:
        os.makedirs(dirname)
    except OSError as e:
        # rely on the error code only: the text of ``strerror`` depends on
        # the platform and on the locale
        if failsafe and e.errno == errno.EEXIST:
            log.debug("Cannot create output directory '%s'. Error: %s",
                      dirname, str(e))
        else:
            six.raise_from(vutil.VDATDirError(e), e)
    else:
        log.debug("Directory '%s' created", dirname)
def _average_timestamps(dates, infmt, outfmt=FMT_DATE_DIR):
    """Average a collection of timestamps.

    Parameters
    ----------
    dates : iterable of strings
        strings containing timestamps
    infmt : string
        format of ``dates``, as understood by
        :meth:`datetime.datetime.strptime`
    outfmt : string, optional
        format of the output time stamp

    Returns
    -------
    avg_timestamp : :class:`datetime.datetime` instance
        average time
    string
        ``avg_timestamp`` formatted according to ``outfmt``

    Raises
    ------
    :class:`~vdat.utilities.VDATDateError`
        if it fails to parse the dates or if ``dates`` is empty
    """
    try:
        timestamps = [dt.datetime.strptime(d, infmt) for d in dates]
    except ValueError as e:
        six.raise_from(vutil.VDATDateError(e), e)

    if not timestamps:
        # without this guard the division below fails with an unrelated
        # ZeroDivisionError
        raise vutil.VDATDateError('Cannot average an empty list of'
                                  ' timestamps')

    # datetimes cannot be summed directly: average the offsets from the
    # first timestamp and add the result back to it
    reference = timestamps[0]
    total_delta = sum((t - reference for t in timestamps), dt.timedelta())
    avg_timestamp = reference + total_delta // len(timestamps)

    return avg_timestamp, avg_timestamp.strftime(outfmt)
def _find_nearest(q, timestamp, n_nearest=1, nearest_then=None):
    """Order the query results by their absolute distance in time from
    ``timestamp`` and return the nearest ones.

    Parameters
    ----------
    q : :class:`peewee.SelectQuery`
        query to use; any iterable of objects with a ``timestamp`` attribute
        works
    timestamp : :class:`~datetime.datetime` instance
        timestamp to use as reference
    n_nearest : int, optional
        maximum number of directories returned; set it to zero or negative to
        return all
    nearest_then : :class:`~datetime.timedelta` instance
        if not None, keep only the directories whose distance in time is
        strictly smaller than ``nearest_then``; applied after ``n_nearest``

    Returns
    -------
    sorted_q : list of query results
        ordered with respect to the timestamp
    """
    def _distance(element):
        """Absolute time difference between ``element`` and ``timestamp``"""
        return abs(element.timestamp - timestamp)

    sorted_q = sorted(q, key=_distance)
    if n_nearest > 0:
        sorted_q = sorted_q[:n_nearest]
    # compare with None explicitly: a zero timedelta is falsy but is still a
    # valid limit
    if nearest_then is not None:
        sorted_q = [element for element in sorted_q
                    if _distance(element) < nearest_then]

    return sorted_q
[docs]def db_create_references(): """search reference zero and calibration directories and add them to the database""" with db.connect(): qzro = == 'zro') & (db.VDATDir.is_clone == False)) qcal = == 'cal') & (db.VDATDir.is_clone == False)) for vdir in # for both find the reference zero directory ref_zro = _find_nearest(qzro, vdir.timestamp) if ref_zro and vdir.type_ != 'zro': vdir.zero_dir = ref_zro[0] if vdir.type_ not in ['zro', 'cal']: ref_cal = _find_nearest(qcal, vdir.timestamp) if ref_cal: vdir.cal_dir = ref_cal[0]