Source code for vdat.command_interpreter.types

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Define enum-like classes that map keys to key types and to the functions
that need to be called to deal with each of them.

It uses pkg_resources and entry points to make the framework extensible.
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import abc
import copy
import functools
import itertools as it
import os
import re
import shlex

from astropy.io import fits
import pkg_resources
from pyhetdex.het import fplane
import pyhetdex.tools.files.file_tools as pyhft
import six

from vdat.command_interpreter import exceptions
from vdat.command_interpreter import utils


def _load_entrypoints(group):
    """Load every entry point registered under ``group``.

    Parameters
    ----------
    group : string
        name of the entry point group to scan

    Returns
    -------
    entry_points : dictionary
        key: name of the entry point; value: object obtained loading the
        entry point
    """
    return {entry.name: entry.load()
            for entry in pkg_resources.iter_entry_points(group)}
@six.add_metaclass(abc.ABCMeta)
class _Types(object):
    """Abstract base of the registries mapping type names to callables.

    The registry is filled, at construction time, with the entry points of
    the group named by :attr:`entry_point_group`. If a type ``loop`` exists,
    it can be retrieved as ``instance.loop`` or ``instance['loop']``.

    Attributes
    ----------
    known_types : list of strings
    entry_point_group : string
    """

    def __init__(self):
        # type name --> function implementing the type
        self._map_types = {}
        loaded = _load_entrypoints(self.entry_point_group)
        self._map_types.update(loaded)

    @utils.abstractproperty
    def entry_point_group(self):  # pragma: no cover
        """Abstract property with the name of the group to load"""
        return None

    def __contains__(self, item):
        """Implement ``item in instance``: true for registered types"""
        return item in self._map_types

    def __getattr__(self, name):
        """Expose the registered types as attributes of the instance"""
        def _no_attribute():
            template = "'{}' object has no attribute '{}'"
            return AttributeError(template.format(self.__class__.__name__,
                                                  name))

        # ``_map_types`` itself is looked up here only when it has not been
        # set on the instance: fail right away instead of recursing
        if name == '_map_types':
            raise _no_attribute()
        try:
            return self._map_types[name]
        except KeyError:
            raise _no_attribute()

    def __getitem__(self, name):
        """Expose the registered types with the dictionary syntax"""
        return self._map_types[name]

    @property
    def known_types(self):
        """list of known types"""
        return list(self._map_types)
class PrimaryTypes(_Types):
    """Registry of the primary types.

    The type<-->function mapping is filled using the ``vdat.cit.primary``
    entry point.
    """

    @property
    def entry_point_group(self):
        """Name of the entry point group providing the primary types"""
        group = 'vdat.cit.primary'
        return group
class KeywordTypes(_Types):
    """Registry of the keyword types.

    The type<-->function mapping is filled using the ``vdat.cit.keyword``
    entry point.
    """

    @property
    def entry_point_group(self):
        """Name of the entry point group providing the keyword types"""
        group = 'vdat.cit.keyword'
        return group
class ExecuteTypes(_Types):
    """Registry of the execute types.

    The type<-->function mapping is filled using the ``vdat.cit.execute``
    entry point.
    """

    @property
    def entry_point_group(self):
        """Name of the entry point group providing the execute types"""
        group = 'vdat.cit.execute'
        return group
def primary_template(target_dir, key_val):  # pragma: no cover
    """Template of the functions handling a primary keyword.

    An implementation collects the files from ``target_dir`` following the
    instructions stored in ``key_val``, if any, and either yields the values
    one by one or returns an iterable of them.

    This template does nothing and returns ``None``.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Yields
    ------
    yield a string or iterable of strings

    Raises
    ------
    CIPrimaryError
        if something goes wrong when handling the primary key
    """
    return None
def keyword_template(primary, key_val):  # pragma: no cover
    """Template of the functions handling a non-primary keyword.

    The value of a keyword is either statically stored in ``key_val`` or
    needs to be extracted from the value of the primary file(s).

    This template does nothing and returns ``None``.

    Parameters
    ----------
    primary : string
        the value of one of the items returned by :func:`.primary_template`
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword

    Raises
    ------
    CIKeywordError
        if something goes wrong when handling the key
    """
    return None
def execute_template(primary, config):  # pragma: no cover
    """Template of the functions deciding whether a command must run.

    An implementation is called for each of the primary entries and tells
    whether the command must be executed or skipped for that entry.

    This template does nothing and returns ``None``.

    Parameters
    ----------
    primary : string
        the value of one of the items returned by :func:`.primary_template`
    config : dictionary
        configuration for the command

    Returns
    -------
    bool
        ``True``: the command is executed; ``False``: the command is skipped
    """
    return None
# implementation of the types # primary types
def primary_plain(target_dir, key_val):
    """Collect the files in ``target_dir`` matching ``key_val['value']``.

    The value is interpreted as a regular expression if
    ``key_val['is_regex']`` is true, otherwise as a wildcard pattern. If the
    key ``returns`` is present, each file name is converted using the keyword
    type given in ``key_val['returns']['type']``.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    iterable
        the file names as returned by ``scan_files``; if ``returns`` is
        given, a list with the converted values
    """
    use_regex = key_val.get('is_regex', False)
    prefix = '.*' if use_regex else '*'
    pattern = os.path.join(prefix, key_val['value'])
    found = pyhft.scan_files(target_dir, is_matches_regex=use_regex,
                             matches=pattern, recursive=False)

    if 'returns' not in key_val:
        return found

    returns_conf = key_val['returns']
    convert = KeywordTypes()[returns_conf['type']]
    return [convert(file_name, returns_conf) for file_name in found]
def primary_loop(target_dir, key_val):
    """Collect the files looping over the product of the given keys.

    For each step of the nested loop over ``key_val['keys']``, the pattern is
    built from ``key_val['value']`` using python `format string syntax
    <https://docs.python.org/3/library/string.html#format-string-syntax>`_
    and the files matching it are collected.

    If the key ``returns`` is found, the output string is manipulated
    according to the instructions in its value; the type of ``returns`` can
    be any available keyword type.

    The steps that do not produce any file do not yield any value.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Yields
    ------
    string of space separated file names
    """
    # names and values to loop over; strings are expanded into lists
    loop_keys, loop_values = [], []
    for name, spec in six.iteritems(key_val['keys']):
        if isinstance(spec, six.string_types):
            spec = list(utils.SliceLike(spec).range())
        loop_keys.append(name)
        loop_values.append(spec)

    # function applied to the collected files before yielding them
    if 'returns' in key_val:
        return_val = key_val['returns']
        handler = KeywordTypes()[return_val['type']]
        finalize = functools.partial(utils.flip(handler), return_val)
    else:
        finalize = utils.id_

    use_regex = key_val.get('is_regex', False)
    prefix = '.*' if use_regex else '*'

    for combination in it.product(*loop_values):
        substitutions = dict(zip(loop_keys, combination))
        pattern = os.path.join(prefix,
                               key_val['value'].format(**substitutions))
        found = ' '.join(pyhft.scan_files(target_dir, matches=pattern,
                                          is_matches_regex=use_regex,
                                          recursive=False))
        if found:
            yield finalize(found)
def primary_groupby(target_dir, key_val):
    """Group each file matching ``value`` with names derived from it.

    For each of the files matching the ``value`` entry, a list of file names
    is created substituting the regex in ``match`` with each of the elements
    of ``replace``, using :func:`keyword_regex`.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Yields
    ------
    string of space separated file names
    """
    # configuration handed over to keyword_regex, with the regex compiled
    # only once
    regex_conf = copy.deepcopy(key_val)
    regex_conf['match'] = re.compile(regex_conf['match'])

    def _derive(file_name, replacement):
        regex_conf['replace'] = replacement
        return keyword_regex(file_name, regex_conf)

    use_regex = regex_conf.get('is_regex', False)
    pattern = os.path.join('.*' if use_regex else '*', key_val['value'])

    for fn in pyhft.scan_files(target_dir, matches=pattern,
                               is_matches_regex=use_regex, recursive=False):
        group = [fn] + [_derive(fn, r) for r in key_val['replace']]
        yield ' '.join(group)
def primary_all_files(target_dir, key_val):
    """Collect the files matching ``key_val['value']`` in a single string.

    All the files, or values, are returned together so that they can be used
    all at once in a command. The collection itself is delegated to
    :func:`primary_plain`.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    files : list of one element
        space separated list of file names or return values.
    """
    joined = ' '.join(primary_plain(target_dir, key_val))
    return [joined]
# secondary types
def keyword_plain(_, key_val):
    """Return the value statically stored in the keyword configuration.

    Parameters
    ----------
    _ : string
        primary file name(s); ignored
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword
    """
    static_value = key_val['value']
    return static_value
def keyword_regex(primary, key_val):
    """Build a string from the primary using regular expression substitution.

    If ``do_split`` is true (default), the primary is split on white spaces
    and only the first entry is used; if it is false the primary is used as
    it is.

    After the substitution, the number of substitutions performed is compared
    with the expected one, given by the option ``n_subs`` (default: the
    number of white space separated entries in the primary used for the
    substitution), with the following meaning:

    * negative integer: no check performed
    * non-negative integer: exactly ``n_subs`` must be performed
    * list of integers: the number of substitutions must be in ``n_subs``
    * string: interpreted as ``[start]:[stop][:step]`` or
      ``[start],[stop][,step]`` and used to initialise
      :class:`vdat.command_interpreter.utils.SliceLike`; the number of
      substitutions must be in ``n_subs``, as defined by the above class.

    Parameters
    ----------
    primary : string
        primary file name(s)
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        string built from the primary file name

    Raises
    ------
    CIKeywordError
        if the number of substitutions is not the expected or the value of
        the ``n_subs`` key is not correct
    """
    if key_val.get('do_split', True):
        # get only one file name
        primary = shlex.split(primary)[0]

    value, n_subs = re.subn(key_val['match'], key_val['replace'], primary)

    expected = key_val.get('n_subs', len(shlex.split(primary)))
    # container of the accepted number of substitutions; ``None`` disables
    # the check
    if isinstance(expected, int):
        allowed = [expected, ] if expected >= 0 else None
    elif isinstance(expected, list):
        allowed = expected
    elif isinstance(expected, six.string_types):
        allowed = utils.SliceLike(expected)
    else:
        raise exceptions.CIKeywordError('The value of the option ``n_subs:'
                                        ' {}`` is not a valid'
                                        ' type'.format(expected))

    if allowed is not None and n_subs not in allowed:
        msg = ('The substitution of "{}" with "{}" in "{}" happened "{}" times'
               ' instead of the expected "{}" times')
        raise exceptions.CIKeywordError(msg.format(key_val['match'],
                                                   key_val['replace'],
                                                   primary, n_subs, allowed))
    return value
def keyword_header(primary, key_val):
    """Extract and parse a fits header keyword from the primary file(s).

    The header keyword named by ``value`` is read from the first file. If
    ``formatter`` is not given, the value is cast to a string, otherwise it
    is converted to a string using the given formatter; e.g. ``"{0:03d}"``
    assumes that the value is an integer and converts it into a zero padded
    three digits string; see `format string syntax
    <https://docs.python.org/3/library/string.html#format-string-syntax>`_

    If ``extract`` is in the configuration, its first two elements are used
    as the ``match`` and ``replace`` of :func:`keyword_regex` to build the
    output out of the extracted header value; exactly one substitution is
    required.

    If the ``do_split`` keyword is given and is ``False``, the ``value`` is
    extracted from the header of every file, converted to a string and all
    the values are concatenated with white spaces.

    Parameters
    ----------
    primary : string
        primary file name(s)
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword
    """
    file_names = shlex.split(primary)
    if key_val.get('do_split', True):
        # get only one file name
        file_names = file_names[:1]

    formatter = key_val.get('formatter', '{}')
    header_values = []
    for fn in file_names:
        raw_value = fits.getval(fn, key_val["value"], memmap=False)
        header_values.append(formatter.format(raw_value))

    if "extract" in key_val:
        regex_conf = copy.deepcopy(key_val)
        regex_conf.update({'match': key_val['extract'][0],
                           'replace': key_val['extract'][1],
                           'do_split': False,
                           'n_subs': 1})
        header_values = [keyword_regex(hv, regex_conf)
                         for hv in header_values]

    return " ".join(header_values)
def keyword_format(primary, key_val):
    """Create a new string formatting ``value`` according to the provided
    ``keys``.

    The keys are substituted using `format string syntax
    <https://docs.python.org/3/library/string.html#format-string-syntax>`_.

    The value of ``keys`` is a map between the names to substitute in
    ``value`` and the keyword types used to extract them from the primary
    file name. Strings are interpreted as of type ``plain``. The type
    ``format`` cannot be nested.

    If ``do_split`` is true (default) only the first of the primary file
    names is used; otherwise one string per file name is created and they
    are joined with white spaces.

    Parameters
    ----------
    primary : string
        primary file name(s)
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword

    Raises
    ------
    CIKeywordTypeError
        if one of the keys is of type ``format`` or a ``KeyError`` is raised
        while handling one of the keys, e.g. because its type is not known
    CIKeywordError
        if ``value`` refers to a name that is not in ``keys``
    """
    # get only one file name, unless instructed otherwise
    primary = shlex.split(primary)
    if key_val.get('do_split', True):
        primary = primary[:1]

    types = KeywordTypes()

    values = []
    for p in primary:
        # get the value of each key for the current primary
        keys = {}
        for k, v in six.iteritems(key_val['keys']):
            v = value_to_dict(v)
            if v['type'] == 'format':
                msg = "'format' type is not valid for keywords in a 'format'."
                raise exceptions.CIKeywordTypeError(msg)
            try:
                keys[k] = types[v['type']](p, v)
            except KeyError as e:
                # ``k`` is a single keyword name: it must be formatted as
                # it is, joining it would split it into its characters
                msg = ("The keywords ({}) is unknown."
                       " Edit the command or the configuration to"
                       " sync the keys.".format(k))
                six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        try:
            values.append(key_val['value'].format(**keys))
        except KeyError as e:
            msg = ("Failed to create format the string {} because {}"
                   " is missing from the list of keys".format(key_val['value'],
                                                              e))
            six.raise_from(exceptions.CIKeywordError(msg), e)

    return ' '.join(values)
def keyword_fplane_map(primary, key_val):
    """Create a new ID from the one extracted from ``primary`` using the
    fplane file for the mapping.

    The input ID is extracted from each primary file name using the keyword
    type described by ``in_id``; the IFU with that ID of type ``in_id_type``
    is looked up in the fplane file and its ``out_id_type`` attribute is
    returned as a string, optionally built using ``formatter``.

    For informations about the fplane file parser and the internals,
    specifically the type of the IDs, see
    :class:`pyhetdex.het.fplane.FPlane` and :class:`pyhetdex.het.fplane.IFU`.

    .. warning::
        currently the ``fplane_file`` value is a string. If we need more
        flexibility, we can very easily modify it to act like the ``in_id``
        key.

    Parameters
    ----------
    primary : string
        primary file name(s)
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword

    Raises
    ------
    CIKeywordTypeError
        if the type of ``in_id`` is not a known keyword type
    CIKeywordError
        if the fplane parser does not know the ``in_id_type`` or the IFU
        does not have the ``out_id_type`` attribute
    """
    # get only one file name, unless instructed otherwise
    primary = shlex.split(primary)
    if key_val.get('do_split', True):
        primary = primary[:1]

    # get the fplane file.
    fp = fplane.FPlane(key_val['fplane_file'])
    # get the formatter
    formatter = key_val.get('formatter', '{}')

    types = KeywordTypes()

    values = []
    for p in primary:
        # get the ID from the primary
        in_id_dict = value_to_dict(key_val['in_id'])
        try:
            in_id_func = types[in_id_dict['type']]
        except KeyError as e:
            # the type is a single string: it must be formatted as it is,
            # joining it would split it into its characters
            msg = ("The keywords ({}) is unknown."
                   " Edit the command or the configuration to"
                   " sync the keys.".format(in_id_dict['type']))
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)
        in_id = in_id_func(p, in_id_dict)

        # get the IFU for the given ID
        in_id_type = key_val['in_id_type']
        try:
            ifu = fp.by_id(in_id, in_id_type)
        except fplane.UnknownIDTypeError as e:
            msg = ('The fplane parser cannot handle the "{}"'
                   ' ID type'.format(in_id_type))
            six.raise_from(exceptions.CIKeywordError(msg), e)

        # get the corresponding ID to return
        out_id_type = key_val['out_id_type']
        try:
            out_id = getattr(ifu, out_id_type)
        except AttributeError as e:
            msg = ('The IFUs do not have the "{}"'
                   ' ID type'.format(out_id_type))
            six.raise_from(exceptions.CIKeywordError(msg), e)

        # for safety convert the ID to string, optionally using the formatter
        values.append(formatter.format(out_id))

    return ' '.join(values)
# execute types
def execute_new_file(primary, config):
    """Tell whether the file built from the primary entry is a new file.

    Following the instructions, the ``new_file`` type constructs a string
    for the primary entry and checks whether it exists on the file system as
    a file; if it does not, ``True`` is returned.

    The instructions on how to build the string are encoded in the mandatory
    key ``value``, whose value can be any of the available keyword types.

    If the path to the file cannot be easily extracted from the primary
    itself, it is possible to build it using the ``path`` optional key. If
    ``path`` is present, the value of ``path`` and the basename from
    ``value`` are joined. ``path`` can be either one of the available keyword
    types or a ``$identifier``, where ``identifier`` is another key in the
    command configuration (**not** the ``execute`` configuration).

    Parameters
    ----------
    primary : string
        the value of one of the items returned by :func:`.primary_template`
    config : dictionary
        configuration for the command (not for the type)

    Returns
    -------
    bool
        ``True``: if the output of the keyword handling does not exist

    Raises
    ------
    CIKeywordError
        if ``value`` is missing, a ``KeyError`` is raised while handling it
        or the handling of ``path`` fails
    """
    # configuration of the type itself
    type_conf = config['execute']
    types = KeywordTypes()

    # build the name of the file from the mandatory ``value``
    try:
        value = value_to_dict(type_conf['value'])
    except KeyError as e:
        msg = "The keyword 'value' is mandatory in the 'new_file' type"
        six.raise_from(exceptions.CIKeywordError(msg), e)
    try:
        ofile = types[value['type']](primary, value)
    except KeyError as e:
        msg = ("The keywords ({}) is unknown."
               " Edit the command or the configuration to"
               " sync the keys.".format(e.args[0]))
        six.raise_from(exceptions.CIKeywordError(msg), e)

    if 'path' not in type_conf:
        return not os.path.isfile(ofile)

    path = value_to_dict(type_conf['path'])
    if path['type'] == 'plain' and path['value'].startswith('$'):
        # ``$identifier``: use the key ``identifier`` of the command
        identifier = path['value'][1:]
        path = value_to_dict(config[identifier])
    try:
        opath = types[path['type']](primary, path)
    except Exception as e:
        msg = ("new_file: failed to handle the ``path`` because of {}")
        six.raise_from(exceptions.CIKeywordError(msg.format(e)), e)

    return not os.path.isfile(os.path.join(opath, os.path.basename(ofile)))
# helper functions for the types
def value_to_dict(value):
    """Make sure that ``value`` is a dictionary defining a type.

    A string is converted into a dictionary with two entries:

    * type: ``plain``
    * value: ``value``

    Any other input is returned as it is, after checking that it has the
    ``type`` entry.

    Parameters
    ----------
    value : string or dict
        value to check

    Returns
    -------
    value : dictionary
        dictionary defining the type

    Raises
    ------
    CIKeywordError
        if ``value`` is not a string and the ``type`` entry cannot be
        retrieved from it
    """
    if isinstance(value, six.string_types):
        return {'type': 'plain', 'value': value}

    try:
        value['type']
    except Exception as e:
        msg = ("The value '{}' is either of the wrong type or doesn't have"
               " the required ``type`` key")
        six.raise_from(exceptions.CIKeywordError(msg.format(value)), e)
    return value