# Source code for vdat.command_interpreter.core

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""This module implements the core of the command interpreter, i.e.
:class:`CommandInterpreter`, and everything essential for running it, like
:func:`run_command`, the job that builds and executes the command for one
primary file.
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import copy
from distutils.spawn import find_executable
import logging
import shlex
from string import Template
import subprocess as sp
import textwrap
import traceback

import pyhetdex.tools.processes as proc
import six

from . import exceptions
from . import types as citypes
from .signals import get_signal, get_signal_names

# the only public name of this module
__all__ = ["CommandInterpreter"]

# name used with ``proc.get_worker``/``proc.remove_worker`` for the worker that
# executes the commands built by the interpreter
WORKER_NAME = 'command_interpreter'


class CommandInterpreter(object):
    """Interpret and execute the command.

    See :ref:`interpreter` section in the documentation for more details.

    All the custom errors are defined in
    :mod:`vdat.command_interpreter.exceptions`. The ones raised in the
    constructor are derived from :exc:`~.exceptions.CIValidationError`.

    Parameters
    ----------
    command : string
        command to parse
    command_config : dict
        dictionary containing the instructions to execute the ``command``.
        A deep copy is stored, so the dictionary passed by the caller is never
        modified
    selected : list-like, optional
        None or a list of the selected items; if ``None`` no filtering of the
        primary files is done; otherwise must be an object supporting the
        membership test operator ``in``.
    multiprocessing : bool
        run the command using multiprocessing; forwarded to
        :func:`pyhetdex.tools.processes.get_worker`
    processes : int
        number of processes to use; forwarded to
        :func:`pyhetdex.tools.processes.get_worker`

    Raises
    ------
    CIParseError
        if the command is empty or if there is some error when extracting the
        keywords
    CINoExeError
        if the executable does not exist
    CIKeywordValidationError
        if the primary, mandatory or any other keyword is missing or not
        consistent between the command and the configuration
    CIKeywordError
        for keywords whose value has the wrong type
    CIKeywordTypeError
        if the type of the keywords is not among the known ones
    CIValidationError
        if ``selected`` does not support the ``in`` operator
    """
    # the public interface
    def __init__(self, command, command_config, selected=None,
                 multiprocessing=False, processes=None):
        super(CommandInterpreter, self).__init__()
        # create the type objects
        self.primary_types = citypes.PrimaryTypes()
        self.keyword_types = citypes.KeywordTypes()
        self.execute_types = citypes.ExecuteTypes()
        # connect the default signals
        self.make_signals()

        # an empty command has no executable: fail with a meaningful error
        # instead of an ``IndexError``
        command_words = command.split()
        if not command_words:
            raise exceptions.CIParseError("The command to interpret is empty")

        self.command = self.original_command = command
        self.exe = self.original_exe = command_words[0]
        self.config = copy.deepcopy(command_config)
        self.selected = selected
        self.multiprocessing = multiprocessing
        self.processes = processes

        # resolve aliases and check the executable
        self._replace_alias()
        self._check_exe()

        # extract the keys
        self._keys = self._get_keys(self.command)

        # check that all the required keywords are in the configuration and
        # that all the mandatory ones exists
        self._validate_mandatory()
        self._validate_keywords()

        # get the correct primary out of the list of primaries
        self._replace_primary()

        # get the functions to deal with the keywords
        self._primary_func, self._keyword_map = self._key_types(self._keys)

        # check if the selection needs to happen and that the ``selected`` type
        # is acceptable
        self.filter_func = self._filter_selected()

        # get the function to use to decide whether to run on step or not
        self.execute = self._execute()

        # create the template
        self.template = Template(self.command)

    def make_signals(self):
        '''Get the signals from the :mod:`~vdat.command_interpreter.signals`
        and save them in attributes with the same names.

        Reimplement this method to use custom signals. Refer to the
        :mod:`~vdat.command_interpreter.signals` documentation for the list
        and names of signals to implement.
        '''
        for name in get_signal_names():
            setattr(self, name, get_signal(name))

    def run(self):
        """Collect the primary files, expand the command for each of them and
        run it.

        Progress and results are reported through the signals created by
        :meth:`make_signals`: ``global_logger``, ``n_primaries``,
        ``progress``, ``command_string``, ``command_logger`` and
        ``command_done``.

        Exceptions raised while building or executing a single command (e.g.
        :exc:`~.exceptions.CICommandFmtError`) are not propagated: they are
        reported, with their traceback, as ``critical`` through
        ``command_logger``.
        """
        target_dir = self._get_value_as_dict('target_dir')['value']
        self.global_logger.emit(logging.INFO, "Starting the execution of"
                                " command <b>{}</b> on directory"
                                " <b>{}</b>".format(self.original_command,
                                                    target_dir))

        primaries = self._primary_func(target_dir,
                                       self.config[self.config['primary']])
        # filter the primaries according to the ``selected`` list
        primaries = list(filter(self.filter_func, primaries))

        n_tot = len(primaries)
        if n_tot == 0:
            msg = "No primary file collected"
            self.global_logger.emit(logging.WARNING, msg)
            self.command_done.emit(True)
            return
        self.global_logger.emit(logging.DEBUG,
                                "Found {} primaries".format(n_tot))
        self.n_primaries.emit(n_tot)

        worker = proc.get_worker(name=WORKER_NAME,
                                 result_class=proc.DeferredResult,
                                 multiprocessing=self.multiprocessing,
                                 processes=self.processes)
        n_exists = 0
        primaries_jobs = []
        try:
            # submit one job per primary, unless ``execute`` says to skip it
            for fn in primaries:
                try:
                    do_execute = self.execute(fn)
                except Exception as e:
                    # if the check itself fails, err on the side of running
                    msg = ('Failed to run function to decide whether'
                           ' to execute the next step of the reduction because'
                           ' of {}'.format(e))
                    self.global_logger.emit(logging.WARNING, msg)
                    do_execute = True
                if do_execute:
                    job = worker(run_command, self.template, self.config, fn,
                                 self._keyword_map)
                    primaries_jobs.append([fn, job])
                else:
                    n_exists += 1
                ndone, nerror, _ = worker.jobs_stat()
                self.progress.emit(n_tot, ndone + n_exists, n_exists, nerror)

            # collect the results; ``failures`` counts commands that ran but
            # returned a non-zero exit code
            failures = 0
            for index, (primary, job) in enumerate(primaries_jobs):
                command = self.command
                info = warning = error = critical = ''
                try:
                    # command, stdout, stderr, returncode
                    command, info, warning, returncode = job.get()
                    self.command_string.emit(index, command)
                    if returncode != 0:
                        failures += 1
                        error = "Command '{}' failed with return code '{}'"
                        error = error.format(command, returncode)
                except Exception:
                    # any error in the job is reported, not propagated
                    critical = traceback.format_exc()
                self.command_logger.emit(primary, command, info, warning,
                                         error, critical, self.config)
                self.command_done.emit(False)

                ndone, nerror, _ = worker.jobs_stat()
                self.progress.emit(n_tot, ndone + n_exists, n_exists,
                                   nerror + failures)
            worker.clear_jobs()
        finally:
            # always release the worker, even if something above failed
            worker.terminate()
            proc.remove_worker(name=WORKER_NAME)

        n_failed = nerror + failures
        msg = textwrap.dedent("""
            <b> {} </b> run {} times with {} failures.
            Skipped {} times, because the output file(s) already existed.
            Logs are under {}.""").format(self.original_exe, ndone, n_failed,
                                          n_exists, self.exe)
        if n_failed == 0:
            level = logging.INFO
        elif n_failed == ndone:
            level = logging.ERROR
        else:
            level = logging.WARNING
        self.global_logger.emit(level, msg)
        self.command_done.emit(True)

    # init helpers
    def _replace_alias(self):
        """If the command configuration has the ``is_alias_of`` key, replace
        the executable name, both in :attr:`exe` and in :attr:`command`, with
        its value."""
        try:
            alias_name = self.config['is_alias_of']
        except KeyError:
            # no alias
            return
        self.command = self.command.replace(self.exe, alias_name, 1)
        self.exe = alias_name

    def _check_exe(self):
        """Check that the executable can be found and replace it in
        :attr:`command` with the full path returned by
        :func:`distutils.spawn.find_executable`.

        Raises
        ------
        CINoExeError
            if the executable cannot be found
        """
        exe = find_executable(self.exe)
        if not exe:
            raise exceptions.CINoExeError(self.exe)
        # ``str.replace`` returns a new string, so the result must be stored.
        # The path is shell-quoted, so that ``shlex.split`` keeps it as a
        # single token even if it contains spaces, and any ``$`` is doubled,
        # so that ``string.Template`` does not treat it as a placeholder.
        full_path = six.moves.shlex_quote(exe).replace('$', '$$')
        self.command = self.command.replace(self.exe, full_path, 1)

    def _get_keys(self, command):
        """Extract the keywords from the command.

        Parameters
        ----------
        command : string
            command to scan

        Returns
        -------
        keys : list of strings
            names of the ``$key``/``${key}`` placeholders, in order of
            appearance (duplicates included); ``$$`` escapes are skipped

        Raises
        ------
        CIParseError
            if the command contains an isolated ``$``
        """
        keys = []
        # ``Template.pattern`` has the groups (escaped, named, braced,
        # invalid): ``$$`` gives '$' in ``escaped``, while an isolated ``$``
        # only matches the empty ``invalid`` group
        for groups in Template.pattern.findall(command):
            kjoin = ''.join(groups).strip()
            if not kjoin:
                msg = ("In the command there is an isolated '$'"
                       " that is not"
                       " allowed by the keyword"
                       " expansion. Use '$$' as an"
                       " escape")
                raise exceptions.CIParseError(msg)
            elif kjoin != '$':
                keys.append(kjoin)
        return keys

    def _validate_mandatory(self):
        """Check the primary keyword (see :meth:`_validate_primary`) and that
        all the keywords listed in ``mandatory`` are used in the command.

        If ``mandatory`` is not in the configuration, only the primary is
        checked.

        Raises
        ------
        CIKeywordValidationError
            if any mandatory keyword is missing from the command
        """
        self._validate_primary()
        if 'mandatory' not in self.config:
            return
        # if the mandatory keys are not a subset of the given keys, complain
        missing_mandatory = set(self.config['mandatory']) - set(self._keys)
        if missing_mandatory:
            msg = ("The mandatory key(s) '{}' is(are) missing from the"
                   " provided command".format(
                       ', '.join(sorted(missing_mandatory))))
            raise exceptions.CIKeywordValidationError(msg)

    def _validate_primary(self):
        '''Check that the primary key is present and that, if it has more than
        one value, exactly one of them is present among the command keys.

        Raises
        ------
        CIKeywordValidationError
            if ``primary`` or any of the keywords it points to are not in the
            configuration, or if the command does not use exactly one of them
        '''
        if 'primary' not in self.config:
            msg = ("The mandatory keyword 'primary' is not in the"
                   " configuration. Please provide it."
                   " If you think that there are good reasons not to have the"
                   " 'primary' keyword as mandatory, please contact the"
                   " developers with your case.")
            raise exceptions.CIKeywordValidationError(msg)

        primaries = self.config['primary']
        if isinstance(primaries, six.string_types):
            primaries = [primaries, ]

        if not all(p in self.config for p in primaries):
            msg = ("At least one of the keywords '{}' pointed to by 'primary'"
                   " is not in the configuration. Please provide it.")
            msg = msg.format(primaries)
            raise exceptions.CIKeywordValidationError(msg)

        n_primary_in_keys = sum(p in self._keys for p in primaries)
        if n_primary_in_keys != 1:
            msg = ("The number of the primary keywords '{}' in the input"
                   " command '{}' must be 1, not {}")
            msg = msg.format(primaries, self.original_command,
                             n_primary_in_keys)
            raise exceptions.CIKeywordValidationError(msg)

    def _validate_keywords(self):
        """Check that all the keywords used in the command are in the
        configuration.

        Raises
        ------
        CIKeywordValidationError
            if any of the keywords is unknown
        """
        missing_keys = set(self._keys) - set(self.config.keys())
        if missing_keys:
            msg = ("One or more of the keywords ({}) is unknown."
                   " Edit the command or the configuration to"
                   " sync the keys.".format(", ".join(sorted(missing_keys))))
            raise exceptions.CIKeywordValidationError(msg)

    def _replace_primary(self):
        '''If the value of ``primary`` is a string, do nothing. Otherwise find
        which of the primaries is used in the command and replace
        ``self.config['primary']`` with it.

        Since :meth:`_validate_primary` already checks the primaries, this
        function can assume that exactly one primary is used in the command.
        '''
        primaries = self.config['primary']
        if isinstance(primaries, six.string_types):
            return
        for p in primaries:
            if p in self._keys:
                self.config['primary'] = p
                break

    def _key_types(self, keys):
        """Scan the keys and check that the interpreter knows how to deal with
        them.

        Parameters
        ----------
        keys : list of strings
            keys extracted from the command

        Returns
        -------
        primary_func : callable
            function to call to get the list of primary files
        keyword_map : dictionary
            map non-primary keywords to the function used to handle them

        Raises
        ------
        CIKeywordTypeError
            if the type of the keyword is not known
        """
        # get the function to use to get the primary key
        primary_key = self.config['primary']
        primary_value = self._get_value_as_dict(primary_key)
        try:
            primary_func = self.primary_types[primary_value['type']]
        except KeyError as e:
            msg = ("Unknown primary type '{}'; either edit the configuration"
                   " file or create the function to handle it. Consult the"
                   " documentation for more info.")
            msg = msg.format(primary_value['type'])
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

        # map every keyword, excluding the primary, to the function handling
        # its type
        keyword_map = {}
        for k in keys:
            if k == primary_key:
                continue
            value = self._get_value_as_dict(k)
            keyword_map[k] = self._get_type_function(self.keyword_types, k,
                                                     value)
        return primary_func, keyword_map

    def _get_type_function(self, types, key, value):
        """Return the function registered in ``types`` for ``value['type']``.

        Parameters
        ----------
        types : mapping
            type name -> function (e.g. :attr:`keyword_types`)
        key : string
            name of the keyword, used in the error message
        value : dict
            value of the keyword, as returned by :meth:`_get_value_as_dict`

        Raises
        ------
        CIKeywordTypeError
            if the type is not in ``types``
        """
        try:
            return types[value['type']]
        except KeyError as e:
            msg = ("Unknown type '{}' for keyword '{}'; either edit the"
                   " configuration file or create the function to handle"
                   " it. Consult the documentation for more info.")
            msg = msg.format(value['type'], key)
            six.raise_from(exceptions.CIKeywordTypeError(msg), e)

    def _get_value_as_dict(self, key):
        """Get the value of ``key`` from the configuration, normalized by
        :func:`citypes.value_to_dict`, and store the normalized value back in
        the configuration dictionary.

        A string is expected to become a dictionary with two entries:

        * type: ``plain``
        * value: ``value``

        Parameters
        ----------
        key : string
            key to get

        Returns
        -------
        value : dictionary
            dictionary defining the type

        Raises
        ------
        CIKeywordError
            if ``value`` is not a dictionary or a string
        """
        value = self.config[key]
        try:
            value = citypes.value_to_dict(value)
            self.config[key] = value
        except exceptions.CIKeywordError as e:
            msg = ("The value associated with the keyword '{}' is either of"
                   " the wrong type or doesn't have the required ``type`` key")
            six.raise_from(exceptions.CIKeywordError(msg.format(key)), e)
        return value

    def _filter_selected(self):
        """Look for the ``filter_selected`` option and check that both its
        type and the type of ``selected`` are correct.

        Returns
        -------
        filter_func : function
            function that accepts one string (one element of the primary list)
            and returns ``True`` or ``False`` if that element is valid or not;
            it always returns ``True`` if ``selected`` is ``None`` or
            ``filter_selected`` is not in the configuration

        Raises
        ------
        CIValidationError
            if ``selected`` does not support the ``in`` operator
        CIKeywordTypeError
            if the type of ``filter_selected`` is not known
        """
        if self.selected is None or 'filter_selected' not in self.config:
            return self._true

        # make sure that ``selected`` supports the ``in`` operator
        try:
            '' in self.selected
        except TypeError as e:
            msg = ("The type of ``selected`` is not correct. It must support"
                   " the ``in`` statement")
            six.raise_from(exceptions.CIValidationError(msg), e)

        # get the function to deal with ``filter_selected``
        value = self._get_value_as_dict('filter_selected')
        filter_ = self._get_type_function(self.keyword_types,
                                          'filter_selected', value)

        def filter_func(primary):
            return filter_(primary, value) in self.selected

        return filter_func

    def _execute(self):
        """Look for the ``execute`` option in the configuration and check that
        its type is known.

        Returns
        -------
        execute_func : function
            function that accepts one string (one element of the primary list)
            and returns ``True`` or ``False`` if that element must be run or
            not; it always returns ``True`` if ``execute`` is not in the
            configuration

        Raises
        ------
        CIKeywordTypeError
            if the type of ``execute`` is not known
        """
        if 'execute' not in self.config:
            return self._true

        # get the function to deal with ``execute``
        value = self._get_value_as_dict('execute')
        execute_ = self._get_type_function(self.execute_types, 'execute',
                                           value)

        def execute_func(primary):
            # the execute functions receive the whole configuration
            return execute_(primary, self.config)

        return execute_func

    # generic helpers
    def _true(self, *_, **__):
        """Always return ``True``, whatever the arguments."""
        return True
def run_command(command_template, command_config, primary, keyword_map):
    """Expand the keywords for one primary, build the command and run it.

    This is the job that :class:`CommandInterpreter` submits to the worker
    for every primary file.

    Parameters
    ----------
    command_template : :class:`string.Template`
        template of the command to execute
    command_config : dict
        configuration of the command: ``command_config['primary']`` is the
        name of the primary keyword, while the entries of the other keywords
        are handed to the functions in ``keyword_map``
    primary : string
        primary file(s); replaces the primary keyword and is passed to the
        functions handling the other keywords
    keyword_map : dict
        maps the name of each keyword to substitute to the function that
        computes its value, called as ``func(primary, command_config[key])``

    Returns
    -------
    command : string
        the command that has been executed
    stdout, stderr : string
        standard output and standard error of the command
    returncode : int
        exit status of the command

    Raises
    ------
    CIKeywordError
        if any of the functions handling the keywords fails
    CICommandFmtError
        if the template substitution or the splitting of the command fails
    CISubprocessError
        if starting the command or communicating with it fails
    """
    # the primary keyword is replaced verbatim by the primary file(s)
    substitutions = {command_config['primary']: primary}
    for name, handler in keyword_map.items():
        try:
            substitutions[name] = handler(primary, command_config[name])
        except Exception as exc:
            msg = 'Failed to handle key {} because of {}'.format(name,
                                                                 str(exc))
            six.raise_from(exceptions.CIKeywordError(msg), exc)

    try:
        command = command_template.substitute(**substitutions)
    except KeyError as exc:
        msg = ("Failed to replace key '{}' in the string '{}' when"
               " creating the command to execute").format(
                   exc.args[0], command_template.template)
        six.raise_from(exceptions.CICommandFmtError(msg), exc)

    # split without a shell, so that the command is executed directly
    try:
        argv = shlex.split(command)
    except ValueError as exc:
        msg = "Unable to split the command '{}' because of '{}'".format(
            command, exc.args[0])
        six.raise_from(exceptions.CICommandFmtError(msg), exc)

    try:
        process = sp.Popen(argv, stdout=sp.PIPE, stderr=sp.PIPE,
                           universal_newlines=True)
        stdout, stderr = process.communicate()
    except Exception as exc:
        msg = "Command '{}' failed with the following error: {}".format(
            command, str(exc))
        six.raise_from(exceptions.CISubprocessError(msg), exc)

    return command, stdout, stderr, process.returncode