# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""This module implements the core of the command interpreter and everything
essential to run it.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import copy
from distutils.spawn import find_executable
import logging
import shlex
from string import Template
import subprocess as sp
import textwrap
import traceback
import pyhetdex.tools.processes as proc
import six
from . import exceptions
from . import types as citypes
from .signals import get_signal, get_signal_names
# public API of the module
__all__ = ['CommandInterpreter', ]
# name passed to ``pyhetdex.tools.processes.get_worker`` and
# ``remove_worker`` by :meth:`CommandInterpreter.run`
WORKER_NAME = 'command_interpreter'
class CommandInterpreter(object):
    """Interpret and execute the command.

    See :ref:`interpreter` section in the documentation for more details.

    All the custom errors are defined in
    :mod:`vdat.command_interpreter.exceptions`. The ones raised in the
    constructor are derived from :exc:`~.exceptions.CIValidationError`.

    Parameters
    ----------
    command : string
        command to parse
    command_config : dict
        dictionary containing the instructions to execute the ``command``. A
        deep copy is executed
    selected : list-like, optional
        None or a list of the selected items; if ``None`` no filtering of the
        primary files is done; otherwise must be an object supporting the
        membership test operator ``in``.
    multiprocessing : bool
        run the command using multiprocessing
    processes : int
        number of processors to use

    Raises
    ------
    CIParseError
        if the command is empty or if there is some error when extracting the
        keywords
    CINoExeError
        if the executable does not exist
    CIKeywordValidationError
        if ``primary`` or ``mandatory`` keywords are missing or misused, or
        if the command uses keywords not present in the configuration
    CIKeywordError
        if the value of a keyword is of the wrong type
    CIKeywordTypeError
        if the type of the keywords is not among the known ones
    CIValidationError
        if ``selected`` does not support the ``in`` operator
    """

    # the public interface
    def __init__(self, command, command_config, selected=None,
                 multiprocessing=False, processes=None):
        super(CommandInterpreter, self).__init__()
        # create the type objects
        self.primary_types = citypes.PrimaryTypes()
        self.keyword_types = citypes.KeywordTypes()
        self.execute_types = citypes.ExecuteTypes()
        # connect the default signals
        self.make_signals()

        tokens = command.split()
        if not tokens:
            # without this check ``command.split()[0]`` would raise a bare
            # IndexError instead of one of the documented errors
            raise exceptions.CIParseError("The command to interpret is"
                                          " empty")
        self.command = self.original_command = command
        self.exe = self.original_exe = tokens[0]
        self.config = copy.deepcopy(command_config)
        self.selected = selected
        self.multiprocessing = multiprocessing
        self.processes = processes

        # check the executable
        self._replace_alias()
        self._check_exe()
        # extract the keys
        self._keys = self._get_keys(self.command)
        # check that all the required keywords are in the configuration and
        # that all the mandatory ones exist
        self._validate_mandatory()
        self._validate_keywords()
        # get the correct primary out of the list of primaries
        self._replace_primary()
        # get the functions to deal with the keywords
        self._primary_func, self._keyword_map = self._key_types(self._keys)
        # check if the selection needs to happen and that the ``selected``
        # type is acceptable
        self.filter_func = self._filter_selected()
        # get the function to use to decide whether to run one step or not
        self.execute = self._execute()
        # create the template
        self.template = Template(self.command)

    def make_signals(self):
        '''Get the signals from the :mod:`~vdat.command_interpreter.signals`
        and save them in attributes with the same names.

        Reimplement this method to use custom signals. Refer to the
        :mod:`~vdat.command_interpreter.signals` documentation for the list
        and names of signals to implement.
        '''
        for name in get_signal_names():
            setattr(self, name, get_signal(name))

    def run(self):
        """Collect the files, expand and run the required command.

        All the custom errors raised here derive from
        :exc:`~.exceptions.CIRunError`. Exceptions raised while retrieving
        the result of a single job are caught, formatted with
        :func:`traceback.format_exc` and sent through the ``command_logger``
        signal.

        Raises
        ------
        CICommandFmtError
            if the substitution of the command doesn't work
        """
        target_dir = self._get_value_as_dict('target_dir')['value']
        self.global_logger.emit(logging.INFO, "Starting the execution of"
                                " command <b>{}</b> on directory"
                                " <b>{}</b>".format(self.original_command,
                                                    target_dir))
        primaries = self._primary_func(target_dir,
                                       self.config[self.config['primary']])
        # filter the primaries according to the ``selected`` list
        primaries = list(filter(self.filter_func, primaries))
        n_tot = len(primaries)
        if n_tot == 0:
            msg = "No primary file collected"
            self.global_logger.emit(logging.WARNING, msg)
            self.command_done.emit(True)
            return
        self.global_logger.emit(logging.DEBUG,
                                "Found {} primaries".format(n_tot))
        self.n_primaries.emit(n_tot)

        worker = proc.get_worker(name=WORKER_NAME,
                                 result_class=proc.DeferredResult,
                                 multiprocessing=self.multiprocessing,
                                 processes=self.processes)
        n_exists = 0  # primaries skipped because ``execute`` said so
        primaries_jobs = []
        try:
            # submit one job per primary that needs to be executed
            for fn in primaries:
                try:
                    do_execute = self.execute(fn)
                except Exception as e:
                    msg = ('Failed to run function to decide whether'
                           ' to execute the next step of the reduction'
                           ' because of {}'.format(e))
                    self.global_logger.emit(logging.WARNING, msg)
                    # when in doubt, run the step
                    do_execute = True
                if do_execute:
                    job = worker(run_command, self.template, self.config, fn,
                                 self._keyword_map)
                    primaries_jobs.append([fn, job])
                else:
                    n_exists += 1
            ndone, nerror, _ = worker.jobs_stat()
            self.progress.emit(n_tot, ndone + n_exists, n_exists, nerror)

            # collect the results; ``failures`` counts the jobs that ran but
            # returned a non-zero exit code
            failures = 0
            for index, (primary, job) in enumerate(primaries_jobs):
                command = self.command
                info = warning = error = critical = ''
                try:
                    # command, stdout, stderr, returncode
                    command, info, warning, returncode = job.get()
                    self.command_string.emit(index, command)
                    if returncode != 0:
                        failures += 1
                        error = "Command '{}' failed with return code '{}'"
                        error = error.format(command, returncode)
                except Exception:
                    critical = traceback.format_exc()
                self.command_logger.emit(primary, command, info, warning,
                                         error, critical, self.config)
                self.command_done.emit(False)
                ndone, nerror, _ = worker.jobs_stat()
                self.progress.emit(n_tot, ndone + n_exists, n_exists,
                                   nerror + failures)
            worker.clear_jobs()
        finally:
            worker.terminate()
            proc.remove_worker(name=WORKER_NAME)

        msg = textwrap.dedent("""
                              <b> {} </b> run {} times with {} failures.
                              Skipped {} times, because the output file(s) already existed.
                              Logs are under {}.""").format(self.original_exe, ndone, nerror +
                                                            failures, n_exists, self.exe)
        n_failed = nerror + failures
        if n_failed == 0:
            level = logging.INFO
        elif n_failed == ndone:
            level = logging.ERROR
        else:
            level = logging.WARNING
        self.global_logger.emit(level, msg)
        self.command_done.emit(True)

    # init helpers
    def _replace_alias(self):
        """If the command configuration has the ``is_alias_of`` key, replace
        the executable name in the command with its value
        """
        if 'is_alias_of' not in self.config:
            # no alias: nothing to do
            return
        alias_name = self.config['is_alias_of']
        self.command = self.command.replace(self.exe, alias_name, 1)
        self.exe = alias_name

    def _check_exe(self):
        """Check that the executable can be found and replace it with the full
        path returned by :func:`distutils.spawn.find_executable`.

        Raises
        ------
        CINoExeError
            if the executable cannot be found
        """
        exe = find_executable(self.exe)
        if not exe:
            raise exceptions.CINoExeError(self.exe)
        # ``str.replace`` returns a new string: assign it back, otherwise the
        # substitution is silently lost. A path containing whitespace or
        # ``$`` would be mangled by ``shlex.split`` or by the keyword
        # template, so in that case the bare executable name is kept and
        # resolved at run time.
        if not any(c.isspace() or c == '$' for c in exe):
            self.command = self.command.replace(self.exe, exe, 1)

    def _get_keys(self, command):
        """Extract the keywords from the command.

        ``$$`` escapes are skipped.

        Raises
        ------
        CIParseError
            if an isolated ``$`` is found
        """
        keyGroups = Template.pattern.findall(command)
        keys = []
        for k in keyGroups:
            # each match is a tuple of (escaped, named, braced, invalid):
            # only one group is non-empty, unless the match is invalid
            kjoin = ''.join(k).strip()
            if not kjoin:
                msg = ("In the command there is an isolated '$'" " that is not"
                       " allowed by the keyword" " expansion. Use '$$' as an"
                       " escape")
                raise exceptions.CIParseError(msg)
            elif kjoin != '$':
                keys.append(kjoin)
        return keys

    def _validate_mandatory(self):
        """Check that all the mandatory keywords are provided.

        The ``primary`` keyword is always validated. If ``mandatory`` is not
        found, return without doing anything else. ``mandatory`` can be a
        single keyword or a list of keywords.

        Raises
        ------
        CIKeywordValidationError
            if any mandatory keyword is missing from the command
        """
        self._validate_primary()
        if 'mandatory' not in self.config:
            return
        mandatory = self.config['mandatory']
        if isinstance(mandatory, six.string_types):
            # a single keyword: avoid ``set`` splitting it into characters
            mandatory = [mandatory, ]
        # if the mandatory keys are not a subset of the given keys, complain
        missing_mandatory = set(mandatory) - set(self._keys)
        if missing_mandatory:
            msg = ("The mandatory key(s) '{}' is(are) missing from the"
                   " provided command".format(
                       ', '.join(sorted(missing_mandatory))))
            raise exceptions.CIKeywordValidationError(msg)

    def _validate_primary(self):
        '''Check that the primary key is present and that, if it has more than
        one value, only one primary key is present among the command keys.

        Raises
        ------
        CIKeywordValidationError
            if ``primary`` is missing, points to keys not in the
            configuration, or is not used exactly once in the command
        '''
        if 'primary' not in self.config:
            msg = ("The mandatory keyword 'primary' is not in the"
                   " configuration. Please provide it."
                   " If you think that there are good reasons not to have the"
                   " 'primary' keyword as mandatory, please contact the"
                   " developers with your case.")
            raise exceptions.CIKeywordValidationError(msg)
        primaries = self.config['primary']
        if isinstance(primaries, six.string_types):
            primaries = [primaries, ]
        primary_in_config = all(p in self.config for p in primaries)
        if not primary_in_config:
            msg = ("At least one of the keywords '{}' pointed to by 'primary'"
                   " is not in the configuration. Please provide it.")
            msg = msg.format(primaries)
            raise exceptions.CIKeywordValidationError(msg)
        n_primary_in_keys = sum(p in self._keys for p in primaries)
        if n_primary_in_keys != 1:
            msg = ("The number of the primary keywords '{}' in the input"
                   " command '{}' must be 1, not {}")
            msg = msg.format(primaries, self.original_command,
                             n_primary_in_keys)
            raise exceptions.CIKeywordValidationError(msg)

    def _validate_keywords(self):
        """Check that all the requested keywords are in the configuration.

        Raises
        ------
        CIKeywordValidationError
            if some keyword of the command is not in the configuration
        """
        missing_keys = set(self._keys) - set(self.config.keys())
        if missing_keys:
            msg = ("One or more of the keywords ({}) is unknown."
                   " Edit the command or the configuration to"
                   " sync the keys.".format(", ".join(sorted(missing_keys))))
            raise exceptions.CIKeywordValidationError(msg)

    def _replace_primary(self):
        '''If the value of ``primary`` is a string, does nothing. Otherwise
        find which of the primaries is used in the command and replace
        ``self.config['primary']`` with it.

        Since :meth:`_validate_primary` already checks the primaries, this
        function can assume that the primary is used only once in the command.
        '''
        primaries = self.config['primary']
        if not isinstance(primaries, six.string_types):
            for p in primaries:
                if p in self._keys:
                    self.config['primary'] = p
                    break

    @staticmethod
    def _unknown_keyword_type(type_, keyword):
        """Build the error for a keyword whose ``type`` has no handler"""
        msg = ("Unknown type '{}' for keyword '{}'; either edit the"
               " configuration file or create the function to handle"
               " it. Consult the documentation for more info.")
        return exceptions.CIKeywordTypeError(msg.format(type_, keyword))

    def _key_types(self, keys):
        """Scan the keys and check that the interpreter knows how to deal with
        them.

        Parameters
        ----------
        keys : list of strings
            keys extracted from the command

        Returns
        -------
        primary_func : callable
            function to call to get the list of primary files
        keyword_map : dictionary
            map non-primary keywords to the function used to handle them

        Raises
        ------
        CIKeywordTypeError
            if the type of the keyword is not known
        """
        # get the function to use to get the primary key
        primary_key = self.config['primary']
        primary_value = self._get_value_as_dict(primary_key)
        try:
            primary_func = self.primary_types[primary_value['type']]
        except KeyError as e:
            msg = ("Unknown primary type '{}'; either edit the configuration"
                   " file or create the function to handle it. Consult the"
                   " documentation for more info.")
            msg = msg.format(primary_value['type'])
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        # loop through all the keywords, excluding the primary, and map the
        # type with the function to handle it
        keyword_map = {}
        for k in keys:
            if k == primary_key:  # skip the primary key
                continue
            value = self._get_value_as_dict(k)
            try:
                keyword_map[k] = self.keyword_types[value['type']]
            except KeyError as e:
                six.raise_from(self._unknown_keyword_type(value['type'], k),
                               e)
        return primary_func, keyword_map

    def _get_value_as_dict(self, key):
        """Get the value of ``key`` from the configuration.

        If it's a string, convert it to a dictionary with two entries:

        * type: ``plain``
        * value: ``value``

        and re-add it in the configuration dictionary

        Parameters
        ----------
        key : string
            key to get

        Returns
        -------
        value : dictionary
            dictionary defining the type

        Raises
        ------
        CIKeywordError
            if ``value`` is not a dictionary or a string
        """
        value = self.config[key]
        try:
            value = citypes.value_to_dict(value)
            self.config[key] = value
        except exceptions.CIKeywordError as e:
            msg = ("The value associated with the keyword '{}' is either of"
                   " the wrong type or doesn't have the required ``type`` key")
            six.raise_from(exceptions.CIKeywordError(msg.format(key)), e)
        return value

    def _filter_selected(self):
        """Look for the existence of the ``filter_selected`` option and check
        that it is of the correct type and that ``selected`` is of the correct
        type

        Returns
        -------
        filter_func : function
            function that accepts one string (one element of the primary list)
            and returns ``True`` or ``False`` if that element is valid or not

        Raises
        ------
        CIValidationError
            if ``selected`` does not support the ``in`` operator
        CIKeywordTypeError
            if the type of ``filter_selected`` is not known
        """
        if self.selected is None or 'filter_selected' not in self.config:
            return self._true
        # otherwise check that ``selected`` type is correct
        try:
            '' in self.selected
        except TypeError:
            msg = ("The type of ``selected`` is not correct. It must support"
                   " the ``in`` statement")
            raise exceptions.CIValidationError(msg)
        # get the function to deal with ``filter_selected``
        value = self._get_value_as_dict('filter_selected')
        try:
            filter_ = self.keyword_types[value['type']]
        except KeyError as e:
            six.raise_from(self._unknown_keyword_type(value['type'],
                                                      'filter_selected'), e)

        def filter_func(primary):
            return filter_(primary, value) in self.selected
        return filter_func

    def _execute(self):
        """Look for the existence of the ``execute`` option in the
        configuration and check that it is of the correct type.

        Returns
        -------
        execute_func : function
            function that accepts one string (one element of the primary list)
            and returns ``True`` or ``False`` if that element must be run or
            not

        Raises
        ------
        CIKeywordTypeError
            if the type of ``execute`` is not known
        """
        if 'execute' not in self.config:
            return self._true
        # get the function to deal with ``execute``
        value = self._get_value_as_dict('execute')
        try:
            execute_ = self.execute_types[value['type']]
        except KeyError as e:
            six.raise_from(self._unknown_keyword_type(value['type'],
                                                      'execute'), e)

        def execute_func(primary):
            return execute_(primary, self.config)
        return execute_func

    # generic helpers
    def _true(self, *_, **__):
        """returns always true"""
        return True
def run_command(command_template, command_config, primary, keyword_map):
    """Build the command string for one primary file and execute it.

    Parameters
    ----------
    command_template : :class:`string.Template`
        template of the command
    command_config : dict
        dictionary containing the instructions to build the command from the
        ``command_template`` and execute it.
    primary : string
        primary file(s) to pass to the functions handling the keywords
    keyword_map : dict
        key: keys to substitute in the command to execute
        value: function implementing the keyword

    Returns
    -------
    command : string
        command executed
    stdout, stderr : string
        stdout and stderr from the command execution
    returncode : int
        success/error code returned by the executed command

    Raises
    ------
    CIKeywordError
        if any keyword handler fails
    CICommandFmtError
        if the template substitution or the shell-like split fails
    CISubprocessError
        if starting or communicating with the subprocess fails
    """
    # the primary keyword is replaced by the primary itself, every other
    # keyword by the value computed by its handler
    substitutions = {command_config['primary']: primary}
    for keyword, handler in six.iteritems(keyword_map):
        try:
            substitutions[keyword] = handler(primary, command_config[keyword])
        except Exception as exc:
            reason = 'Failed to handle key {} because of {}'.format(keyword,
                                                                    str(exc))
            six.raise_from(exceptions.CIKeywordError(reason), exc)

    try:
        command = command_template.substitute(**substitutions)
    except KeyError as exc:
        reason = ("Failed to replace key '{}' in the string '{}' when"
                  " creating the command to execute").format(
                      exc.args[0], command_template.template)
        six.raise_from(exceptions.CICommandFmtError(reason), exc)

    try:
        argv = shlex.split(command)
    except ValueError as exc:
        reason = "Unable to split the command '{}' because of '{}'".format(
            command, exc.args[0])
        six.raise_from(exceptions.CICommandFmtError(reason), exc)

    try:
        process = sp.Popen(argv, stdout=sp.PIPE, stderr=sp.PIPE,
                           universal_newlines=True)
        stdout, stderr = process.communicate()
    except Exception as exc:
        reason = "Command '{}' failed with the following error: {}".format(
            command, str(exc))
        six.raise_from(exceptions.CISubprocessError(reason), exc)

    return command, stdout, stderr, process.returncode