# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Define enumerate-like classes that allows to map from keys to key types and
to the functions that need to be called to deal with any of them.
It uses pkg_resources and entry points to make the framework extensible.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import abc
import copy
import functools
import itertools as it
import os
import re
import shlex
from astropy.io import fits
import pkg_resources
from pyhetdex.het import fplane
import pyhetdex.tools.files.file_tools as pyhft
import six
from vdat.command_interpreter import exceptions
from vdat.command_interpreter import utils
def _load_entrypoints(group):
    """Load every entry point registered under ``group``.

    Parameters
    ----------
    group : string
        name of the entry point group to scan

    Returns
    -------
    entry_points : dictionary
        key: name of the entry point; value: object returned by loading it
    """
    # if two entry points share a name, the one iterated last wins
    return {entry.name: entry.load()
            for entry in pkg_resources.iter_entry_points(group)}
@six.add_metaclass(abc.ABCMeta)
class _Types(object):
    """Abstract base class for the containers of types.

    At construction the mapping between type names and callables is loaded
    from the entry point group named by ``entry_point_group``. A type called
    ``loop`` can then be retrieved either as ``instance.loop`` or as
    ``instance['loop']``.

    Attributes
    ----------
    known_types : list of strings
    entry_point_group : string
    """

    def __init__(self):
        # type name --> callable implementing the type
        self._map_types = {}
        loaded = _load_entrypoints(self.entry_point_group)
        self._map_types.update(loaded)

    @utils.abstractproperty
    def entry_point_group(self):  # pragma: no cover
        """Abstract property: name of the entry point group to load"""
        return None

    def __contains__(self, item):
        """Implement ``item in instance`` on the names of the known types"""
        return item in self._map_types

    def __getattr__(self, name):
        """Expose the entries of the internal types dictionary as instance
        attributes"""
        error_text = "'{}' object has no attribute '{}'".format(
            self.__class__.__name__, name)
        if name != '_map_types':
            try:
                return self._map_types[name]
            except KeyError:
                raise AttributeError(error_text)
        # this method runs only when the normal lookup fails: if
        # ``_map_types`` itself is requested, the dictionary is not set on
        # the instance and reading it above would call this method again
        raise AttributeError(error_text)

    def __getitem__(self, name):
        """Expose the entries of the internal types dictionary with the
        ``instance[name]`` syntax; unknown names raise ``KeyError``"""
        return self._map_types[name]

    @property
    def known_types(self):
        """list with the names of the known types"""
        return list(self._map_types)
class PrimaryTypes(_Types):
    """Type<-->function mapping filled from the ``vdat.cit.primary`` entry
    point group.
    """

    @property
    def entry_point_group(self):
        """Name of the entry point group loaded by this class"""
        group = 'vdat.cit.primary'
        return group
class KeywordTypes(_Types):
    """Type<-->function mapping filled from the ``vdat.cit.keyword`` entry
    point group.
    """

    @property
    def entry_point_group(self):
        """Name of the entry point group loaded by this class"""
        group = 'vdat.cit.keyword'
        return group
class ExecuteTypes(_Types):
    """Type<-->function mapping filled from the ``vdat.cit.execute`` entry
    point group.
    """

    @property
    def entry_point_group(self):
        """Name of the entry point group loaded by this class"""
        group = 'vdat.cit.execute'
        return group
def primary_template(target_dir, key_val):  # pragma: no cover
    """Template of the functions handling a primary keyword.

    An implementation collects the files from ``target_dir`` following the
    instructions in ``key_val``, if any, and either ``yield`` the values or
    returns an iterable. This template does nothing and returns ``None``.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Yields
    ------
    yield a string or iterable of strings

    Raises
    ------
    CIPrimaryError
        if something goes wrong when handling the primary key
    """
    return None
def keyword_template(primary, key_val):  # pragma: no cover
    """Template of the functions handling a non-primary keyword.

    The value of a keyword is either statically stored in ``key_val`` or
    must be extracted from the value of the primary file(s). This template
    does nothing and returns ``None``.

    Parameters
    ----------
    primary : string
        the value of one of the items returned by :func:`.primary_template`
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword

    Raises
    ------
    CIKeywordError
        if something goes wrong when handling the key
    """
    return None
def execute_template(primary, config):  # pragma: no cover
    """Template of the functions deciding, for each primary entry, whether
    the command is executed or skipped.

    This template does nothing and returns ``None``.

    Parameters
    ----------
    primary : string
        the value of one of the items returned by :func:`.primary_template`
    config : dictionary
        configuration for the command

    Returns
    -------
    bool
        ``True``: the command is executed; ``False``: the command is skipped
    """
    return None
# implementation of the types
# primary types
def primary_plain(target_dir, key_val):
    """Collect the files in ``target_dir`` matching ``key_val['value']``.

    The value is prefixed with a wildcard directory component (``.*`` when
    ``key_val['is_regex']`` is true, ``*`` otherwise) and handed to
    :func:`pyhetdex.tools.files.file_tools.scan_files` with
    ``recursive=False``.

    If the key ``returns`` is present, each file name is converted by the
    keyword type named in ``key_val['returns']['type']``.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    iterable
        the output of ``scan_files`` or, if ``returns`` is given, the list
        of the converted values
    """
    use_regex = key_val.get('is_regex', False)
    wildcard = '.*' if use_regex else '*'
    pattern = os.path.join(wildcard, key_val['value'])
    found = pyhft.scan_files(target_dir, matches=pattern,
                             is_matches_regex=use_regex, recursive=False)

    if 'returns' not in key_val:
        return found

    # convert every file name with the requested keyword type
    returns_conf = key_val['returns']
    convert = KeywordTypes()[returns_conf['type']]
    return [convert(file_name, returns_conf) for file_name in found]
def primary_loop(target_dir, key_val):
    """Run a nested loop over the given keys and, at each step, build the
    value to match using python `format string syntax
    <https://docs.python.org/3/library/string.html#format-string-syntax>`_
    and collect all the files matching it.

    The entries of ``key_val['keys']`` given as strings are expanded with
    :class:`vdat.command_interpreter.utils.SliceLike`; any other entry is
    used as it is.

    If the key ``returns`` is found, the output string is manipulated
    according to the instruction in the value of ``returns``. The type of
    returns can be any available keyword type.

    If a step doesn't produce any file, no value is yielded for it.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Yields
    ------
    string of space separated file names
    """
    # names of the loop variables and the values each of them goes through
    names, ranges = [], []
    for name, spec in six.iteritems(key_val['keys']):
        if isinstance(spec, six.string_types):
            spec = list(utils.SliceLike(spec).range())
        names.append(name)
        ranges.append(spec)

    # post-processing of the matched files: the keyword type requested by
    # ``returns``, with its configuration already bound, or the identity
    if 'returns' in key_val:
        returns_conf = key_val['returns']
        keyword_types = KeywordTypes()
        post_process = functools.partial(
            utils.flip(keyword_types[returns_conf['type']]), returns_conf)
    else:
        post_process = utils.id_

    use_regex = key_val.get('is_regex', False)

    for combination in it.product(*ranges):
        substitutions = dict(zip(names, combination))
        pattern = key_val['value'].format(**substitutions)
        pattern = os.path.join('.*' if use_regex else '*', pattern)
        found = pyhft.scan_files(target_dir, matches=pattern,
                                 is_matches_regex=use_regex, recursive=False)
        joined = ' '.join(found)
        if joined:
            yield post_process(joined)
def primary_groupby(target_dir, key_val):
    """Loop over all the files matching the ``value`` entry. For each one,
    create a list of file names: the file itself followed by the names
    obtained via :func:`keyword_regex`, substituting the regex in ``match``
    with each of the elements of ``replace``.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Yields
    ------
    string of space separated file names
    """
    # work on a copy, as the ``match`` and ``replace`` entries are overwritten
    regex_conf = copy.deepcopy(key_val)
    regex_conf['match'] = re.compile(regex_conf['match'])

    use_regex = regex_conf.get('is_regex', False)
    pattern = os.path.join('.*' if use_regex else '*', key_val['value'])
    found = pyhft.scan_files(target_dir, matches=pattern,
                             is_matches_regex=use_regex, recursive=False)

    for file_name in found:
        group = [file_name]
        for replacement in key_val['replace']:
            regex_conf['replace'] = replacement
            group.append(keyword_regex(file_name, regex_conf))
        yield ' '.join(group)
def primary_all_files(target_dir, key_val):
    """Get all the files in ``target_dir`` matching the string in
    ``key_val['value']`` and return them joined into a single string, so
    that they can be used all at once in a command.

    The files, or the values, are collected by :func:`primary_plain`.

    Parameters
    ----------
    target_dir : string
        directory in which the files must be collected
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    files : list of one element
        space separated list of file names or return values.
    """
    joined = ' '.join(primary_plain(target_dir, key_val))
    return [joined]
# secondary types
def keyword_plain(_, key_val):
    """Return the content of the ``value`` entry of the keyword.

    Parameters
    ----------
    _ : string
        primary value; ignored
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword
    """
    value = key_val['value']
    return value
def keyword_regex(primary, key_val):
    """Extract a string from the primary using regular expression substitution.

    If ``do_split`` is true (the default), the primary is split on white
    spaces with :func:`shlex.split` and only the first entry is used; an
    empty primary is treated as an empty file name. If ``do_split`` is false
    the whole primary is used as it is.

    After performing the substitution, it checks that the expected number of
    substitutions is performed; the number is given by the option ``n_subs``
    (default: the number of names obtained splitting the primary actually
    used), with the following meaning:

    * negative: no check performed
    * non-negative integer: exactly ``n_subs`` must be performed
    * list of integers: the number of substitutions must be in ``n_subs``
    * string: interpreted a ``[start]:[stop][:step]`` or
      ``[start],[stop][,step]`` and is used to initialise
      :class:`vdat.command_interpreter.utils.SliceLike`; the number of
      substitutions must be in ``n_subs``, as defined by the above class.

    Parameters
    ----------
    primary : string
        primary file name(s)
    key_val : dictionary
        configuration for the key handle; ``match`` and ``replace`` are
        mandatory, ``do_split`` and ``n_subs`` optional

    Returns
    -------
    string
        string built from the primary file name

    Raises
    ------
    CIKeywordError
        if the number of substitutions is not the expected or the value of the
        ``n_subs`` key is not correct
    """
    if key_val.get('do_split', True):
        # get only one file name; an empty or blank primary has no entries:
        # use an empty name instead of failing with an IndexError, as the
        # ``do_split=False`` branch already does
        names = shlex.split(primary)
        primary = names[0] if names else ''

    value, n_subs = re.subn(key_val['match'], key_val['replace'], primary)

    exp_n_subs = key_val.get('n_subs', len(shlex.split(primary)))

    # ``allowed`` is the container of the accepted number of substitutions;
    # ``None`` disables the check
    if isinstance(exp_n_subs, int):
        allowed = [exp_n_subs, ] if exp_n_subs >= 0 else None
    elif isinstance(exp_n_subs, list):
        allowed = exp_n_subs
    elif isinstance(exp_n_subs, six.string_types):
        allowed = utils.SliceLike(exp_n_subs)
    else:
        raise exceptions.CIKeywordError('The value of the option ``n_subs:'
                                        ' {}`` is not a valid'
                                        ' type'.format(exp_n_subs))

    if allowed is not None and n_subs not in allowed:
        msg = ('The substitution of "{}" with "{}" in "{}" happened "{}" times'
               ' instead of the expected "{}" times')
        raise exceptions.CIKeywordError(msg.format(key_val['match'],
                                                   key_val['replace'], primary,
                                                   n_subs, allowed))

    return value
def keyword_fplane_map(primary, key_val):
    """Create a new ID from the one extracted from ``primary`` using the fplane
    file for the mapping.

    For informations about the fplane file parser and the internals,
    specifically the type of the IDs, see :class:`pyhetdex.het.fplane.FPlane`
    and :class:`pyhetdex.het.fplane.IFU`.

    The following entries of ``key_val`` are used:

    * ``fplane_file``: name of the fplane file to parse
    * ``in_id``: instructions, as a string or a keyword type dictionary, to
      extract the input ID from each primary file name
    * ``in_id_type``: type of the input ID, used to look up the IFU
    * ``out_id_type``: name of the IFU attribute to return
    * ``formatter`` (optional, default ``'{}'``): format string applied to the
      output ID
    * ``do_split`` (optional, default ``True``): if true only the first of
      the primary file names is used, otherwise all of them

    .. warning::

        currently the ``fplane_file`` value is a string. If we need more
        flexibility, we can very easily modify it to act like the ``in_id``
        key.

    Parameters
    ----------
    primary : string
        primary file name(s)
    key_val : dictionary
        configuration for the key handle

    Returns
    -------
    string
        value to associate to the keyword; with multiple file names, the
        space separated list of the IDs

    Raises
    ------
    CIKeywordTypeError
        if the type requested by ``in_id`` is not a known keyword type
    CIKeywordError
        if the fplane parser does not know ``in_id_type`` or the IFU does not
        have the ``out_id_type`` attribute
    """
    # split the primary into file names and optionally keep only the first
    primary = shlex.split(primary)
    if key_val.get('do_split', True):
        primary = primary[:1]

    # get the fplane file.
    fp = fplane.FPlane(key_val['fplane_file'])
    # get the formatter
    formatter = key_val.get('formatter', '{}')

    types = KeywordTypes()

    values = []
    for p in primary:
        # get the ID from the primary
        in_id_dict = value_to_dict(key_val['in_id'])
        try:
            in_id_func = types[in_id_dict['type']]
        except KeyError as e:
            # report the type name as it is: joining the string with ", "
            # would spell it out one character at a time
            msg = ("The keywords ({}) is unknown."
                   " Edit the command or the configuration to"
                   " sync the keys.".format(in_id_dict['type']))
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)
        in_id = in_id_func(p, in_id_dict)

        # get the IFU for the given ID
        in_id_type = key_val['in_id_type']
        try:
            ifu = fp.by_id(in_id, in_id_type)
        except fplane.UnknownIDTypeError as e:
            msg = ('The fplane parser cannot handle the "{}"'
                   ' ID type'.format(in_id_type))
            six.raise_from(exceptions.CIKeywordError(msg), e)

        # get the corresponding ID to return
        out_id_type = key_val['out_id_type']
        try:
            out_id = getattr(ifu, out_id_type)
        except AttributeError as e:
            msg = ('The IFUs do not have the "{}"'
                   ' ID type'.format(out_id_type))
            six.raise_from(exceptions.CIKeywordError(msg), e)

        # for safety convert the ID to string, optionally using the formatter
        out_id = formatter.format(out_id)
        values.append(out_id)

    return ' '.join(values)
# execute types
def execute_new_file(primary, config):
    """For each primary entry, build a string following the instructions of
    the ``new_file`` type and check whether it exists on the file system as
    a file. If it does not, i.e. the file is new, return ``True``.

    The instruction on how to build the string are encoded in the mandatory
    key ``value``, whose value can be any of the available keyword types.

    If the path to the file cannot be easily extracted from the primary
    itself, it is possible to build it using the ``path`` optional key. If
    ``path`` is present, the value of ``path`` and the basename from
    ``value`` are joined. ``path`` can be either one of the available keyword
    types or a ``$identifier``, where ``identifier`` is an other key in the
    command configuration (**not** the ``execute`` configuration).

    Parameters
    ----------
    primary : string
        the value of one of the items returned by :func:`.primary_template`
    config : dictionary
        configuration for the command (not for the type)

    Returns
    -------
    bool
        ``True``: if the output of the keyword handling does not exist

    Raises
    ------
    CIKeywordError
        if the ``value`` key is missing, the handling of ``value`` fails with
        a ``KeyError`` or the handling of ``path`` fails
    """
    # configuration of the type itself
    settings = config['execute']
    keyword_types = KeywordTypes()

    # build the candidate file name from ``value``
    try:
        value_conf = value_to_dict(settings['value'])
    except KeyError as err:
        msg = "The keyword 'value' is mandatory in the 'new_file' type"
        six.raise_from(exceptions.CIKeywordError(msg), err)

    try:
        candidate = keyword_types[value_conf['type']](primary, value_conf)
    except KeyError as err:
        msg = ("The keywords ({}) is unknown."
               " Edit the command or the configuration to"
               " sync the keys.".format(err.args[0]))
        six.raise_from(exceptions.CIKeywordError(msg), err)

    if 'path' in settings:
        path_conf = value_to_dict(settings['path'])
        if (path_conf['type'] == 'plain'
                and path_conf['value'].startswith('$')):
            # ``$identifier``: use the ``identifier`` key of the command
            # configuration
            identifier = path_conf['value'].replace('$', '', 1)
            path_conf = value_to_dict(config[identifier])
        try:
            directory = keyword_types[path_conf['type']](primary, path_conf)
        except Exception as err:
            msg = ("new_file: failed to handle the ``path`` because of {}")
            six.raise_from(exceptions.CIKeywordError(msg.format(err)), err)
        # keep only the file name of ``value`` and put it into ``path``
        candidate = os.path.join(directory, os.path.basename(candidate))

    return not os.path.isfile(candidate)
# helper functions for the types
def value_to_dict(value):
    """Make sure that ``value`` is a dictionary defining a type.

    A string is converted to a dictionary with two entries:

    * type: ``plain``
    * value: ``value``

    Any other input is returned untouched, after checking that the ``type``
    entry can be retrieved from it.

    Parameters
    ----------
    value : string or dict
        value to check

    Returns
    -------
    value : dictionary
        dictionary defining the type

    Raises
    ------
    CIKeywordError
        if ``value`` is not a string and the ``type`` entry cannot be
        retrieved from it
    """
    if isinstance(value, six.string_types):
        return {'type': 'plain', 'value': value}

    try:
        value['type']
    except Exception as err:
        template = ("The value '{}' is either of the wrong type or doesn't"
                    " have the required ``type`` key")
        six.raise_from(exceptions.CIKeywordError(template.format(value)), err)
    return value