Source code for vdat.config.entry_point

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"Implementation of the ``vdat_config`` entry point"
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import argparse
from difflib import unified_diff
from distutils.version import StrictVersion
import os
import sys
import textwrap as tw
import traceback

from colorama import init, Fore, Style
import pyhetdex.tools.io_helpers as ioh
import pkg_resources
import six

import vdat
from . import versions

try:
    input = raw_input
except NameError:
    pass

# initialize colorama
init(autoreset=True)


# command line code to get the configuration files
[docs]def main(argv=None): """Main function of the implementation of the ``vdat_config`` executable Parameters ---------- argv : list command line, if None taken from ``sys.argv`` """ args = parse(argv=argv) args.func(args)
[docs]def parse(argv=None): """Create the parser and parse the command line arguments Parameters ---------- argv : list command line, if None taken from ``sys.argv`` Returns ------- Namespace parsed command line """ # shared options common_parser = argparse.ArgumentParser(add_help=False) common_parser.add_argument("--verbose", '-v', action="store_true", help="Increase verbosity") common_parser.add_argument('--version', '-V', action='version', version=vdat.__version__) # main parser parser = argparse.ArgumentParser(description="""Deal with the vdat configuration files""", formatter_class=argparse.ArgumentDefaultsHelpFormatter) subparser = parser.add_subparsers(title='Subcommands', dest='subparser_name', description="""Type '%(prog)s cmd -h' for detailed information about the subcommands""") # copy parser description = ("Copy the files '{}' into the given" " directory. If any of the files exists, it will ask the" " user if she/he wants to overwrite the files; allowed" " answers are 'y' and 'n'; EOF (Ctrl+D) is interpreted as" " no".format("', '".join(versions.COPY_FILES))) copy_p = subparser.add_parser('copy', description=description, help='Copy the configuration files', parents=[common_parser], formatter_class=argparse.ArgumentDefaultsHelpFormatter) copy_p.set_defaults(func=copy) copy_p.add_argument('-t', '--to-dir', default='.', help="Directory where to copy the files") copy_p.add_argument('-f', '--force', action='store_true', help="""Force overwriting existing configuration files. If this option is used, all local modifications will be lost""") copy_p.add_argument('-b', '--backup', action='store_true', help='''Backup existing files before copying the new ones. If this option is used, the `-f/--force` is ignored''') # compare parser description = ("Compare the configuration files shipped with VDAT with" " those in the current directory") compare_p = subparser.add_parser('compare', description=description, help='Compare the configuration files', parents=[common_parser], formatter_class=argparse.ArgumentDefaultsHelpFormatter) compare_p.set_defaults(func=compare) compare_p.add_argument('-D', '--dir', default='.', dest='dir_', metavar='DIR', help="Directory with the configuration files") compare_p.add_argument('-d', '--diff', action='store_true', help="""Show the difference between the files in VDAT and the ones already in use""") compare_p.add_argument('-l', '--load', action='store_true', help='''Try to load the files to check for errors and software missmatches.''') compare_p.add_argument('-a', '--all', action='store_true', help='''Run the diff and the loading on all the available files, instead of only '{}'. '''.format(', '.join(versions.IMPORTANT_FILES))) args = parser.parse_args(args=argv) if not args.subparser_name: parser.print_usage() print("{}: error: too few" " arguments".format(os.path.basename(sys.argv[0]))) sys.exit(2) return args
[docs]def resource_replace(file_content, filename, path): '''Replace in the file content the ``<++>CONFIG_DIR<++>`` and ``<++>VERSION<++>`` placeholders. Parameters ---------- file_content : string content of resource filename : string name of the file from which the resource is coming path : string path where the file will be copied Returns ------- resource : string modified resource ''' # replace the <++>CONFIG_DIR<++> placeholder with the directory # where the files is copied file_content = file_content.replace('<++>CONFIG_DIR<++>', os.path.abspath(path)) # replace the <++>VERSION<++> placeholder with # VERSION_FILE_NAME = version v_name = versions.version_name(filename) version = versions.version_of_file(filename) if version: file_content = file_content.replace('<++>VERSION<++>', v_name + ' = ' + version) return file_content
[docs]class VDATCopyResource(ioh.CopyResource): '''Overrides :meth:`~pyhetdex.tools.io_helpers.CopyResource.manipulate_resource` to replace placeholders'''
[docs] def manipulate_resource(self, file_content): '''call :func:`resource_replace`''' return resource_replace(file_content, self.filename, self.target_dir)
[docs]def copy(args): """Copy the configuration files Parameters ---------- args : Namespace arguments to make the copy run """ vdat_cr = VDATCopyResource(__name__, backup=args.backup, force=args.force, verbose=args.verbose) vdat_cr(versions.COPY_FILES, args.to_dir) msg = Fore.YELLOW + '{}' + Fore.RESET vdat_cr.report(header_written=msg.format("Copied files: "), header_non_written=msg.format("Skipped files: "), header_backedup=msg.format("Backed-up files: "))
[docs]def compare(args): """Compare the configuration files Parameters ---------- args : Namespace arguments to make the copy run """ # first check that all files are there if args.verbose: print('Checking that all the files exist.') existing_files = _existing_files(args) if not existing_files: sys.exit(1) if args.verbose: print('Checking the versions.') _compare_versions(existing_files, args) if args.all: diff_files = existing_files[:] else: diff_files = [i for i in existing_files if i in versions.IMPORTANT_FILES] if args.diff: if args.verbose: print('Comparing the file contents.') _file_diff(diff_files, args) if args.load: if args.verbose: print('Try to load the files.') _file_load(diff_files, args)
[docs]def _existing_files(args): '''Check which files exist and which not. Parameters ---------- args : Namespace arguments to make the copy run Returns ------- existing_files : list of strings existing files ''' important_missing_files, missing_files = [], [] existing_files = [] for fn in versions.COPY_FILES: if not os.path.exists(os.path.join(args.dir_, fn)): if fn in versions.IMPORTANT_FILES: important_missing_files.append(fn) else: missing_files.append(fn) else: existing_files.append(fn) if not existing_files: msg = tw.fill('None of the configuration files exist.' ' Either go to the correct directory or type' ' `{}vdat_config copy{}`.'.format(Style.BRIGHT, Style.NORMAL), initial_indent='ERROR: ', subsequent_indent=' '*7) print(Fore.RED + msg) elif important_missing_files or missing_files: if important_missing_files: files = ', '.join(important_missing_files) msg = tw.fill('The important file(s) {}"{}"{} is(are) missing.' ' If you' ' renamed them or are using files from an other' ' location, it is fine, as long as you know what' ' you are you doing.'.format(Style.BRIGHT, files, Style.NORMAL), initial_indent='ERROR: ', subsequent_indent=' '*7) print(Fore.RED + msg) if missing_files: files = ', '.join(missing_files) msg = tw.fill('The file(s) {}"{}"{} are missing. Some of the' ' functionalities or commands might not work as' ' expected.'.format(Style.BRIGHT, files, Style.NORMAL), initial_indent='WARNING: ', subsequent_indent=' '*9) print(Fore.YELLOW + msg) else: print(Fore.GREEN + 'INFO: All the expected files exist.') return existing_files
[docs]def _compare_versions(fnames, args): '''Check the version of the files. Parameters ---------- fnames : list of strings list of files for which to check the version args : Namespace arguments to make the copy run Returns ------- remaining_files : list of strings files for which no version is expected ''' remaining_files, good_version = [], [] for fn in fnames: expected_version = versions.version_of_file(fn) found_version = versions.version_from_file(args.dir_, fn) if not expected_version: remaining_files.append(fn) elif not found_version: msg = tw.fill('The file {sb}"{fn}"{sn} does not have a version' ' although one is expected. Either you are using' ' an old version of the file or it has been' ' modified. VDAT might not work properly. The' ' current version is {sb}"{v}"{sn}.' ''.format(sb=Style.BRIGHT, sn=Style.NORMAL, fn=fn, v=expected_version), initial_indent='ERROR: ', subsequent_indent=' '*7) print(Fore.RED + msg) else: expected_tuple = StrictVersion(expected_version).version found_tuple = StrictVersion(found_version).version if expected_tuple == found_tuple: good_version.append(fn) elif expected_tuple[0] != found_tuple[0]: msg = tw.fill('The major version of file {sb}"{fn}"{sn} is' ' different from the one expected code' ' and VDAT will not work properly. Found:' ' {sb}"{vf}"{sn}, expected: {sb}"{ve}"{sn}.' ' Please update the file.' ''.format(sb=Style.BRIGHT, sn=Style.NORMAL, fn=fn, vf=found_version, ve=expected_version), initial_indent='ERROR: ', subsequent_indent=' '*7) print(Fore.RED + msg) else: msg = tw.fill('The minor/patch version of file' ' {sb}"{fn}"{sn} is' ' different from the one expected code.' ' This likely means that some new optional' ' configuration variable has been added' ' or that some value has been changed.' ' Found: {sb}"{vf}"{sn}, expected:' ' {sb}"{ve}"{sn}. Update the file the get' ' the latest version.' ''.format(sb=Style.BRIGHT, sn=Style.NORMAL, fn=fn, vf=found_version, ve=expected_version), initial_indent='WARNING: ', subsequent_indent=' '*9) print(Fore.YELLOW + msg) if len(good_version + remaining_files) == len(fnames): if good_version: msg = tw.fill('The files {sb}"{fn}"{sn} have the correct version.' ' The other files are not expected to have one.' ''.format(sb=Style.BRIGHT, sn=Style.NORMAL, fn=', '.join(good_version)), initial_indent='INFO: ', subsequent_indent=' '*6) else: msg = 'INFO: None of the file is expected to have a version number' print(Fore.GREEN + msg) return remaining_files
[docs]def _file_diff(fnames, args): '''Make a diff for all the files Parameters ---------- fnames : list of strings list of files for which to check the version args : Namespace arguments to make the copy run Returns ------- equal_files, diff_files : list of strings files that are the same and that are different ''' equal_files, diff_files = [], [] for fn in fnames: vdat_file = ioh.get_resource_file(__name__, fn) vdat_file = resource_replace(vdat_file, fn, args.dir_) vdat_file = vdat_file.splitlines() full_fn = os.path.join(args.dir_, fn) with open(full_fn) as f: other_file = f.read().splitlines() i = 0 for i, l in enumerate(unified_diff(vdat_file, other_file, fromfile=os.path.join('VDAT', fn), tofile=full_fn, n=2, lineterm='')): if diff_files and i == 0: # add an empty new line ad the beginning if at least one file # has already been diffed print() if l.startswith('+'): colour = Fore.CYAN elif l.startswith('-'): colour = Fore.RED elif l.startswith('@'): colour = Fore.MAGENTA else: colour = Fore.RESET print(colour + l) if i > 0: diff_files.append(fn) else: equal_files.append(fn) if not diff_files: print(Fore.GREEN + 'INFO: No difference found in any file') return equal_files, diff_files
[docs]def _file_load(fnames, args): '''Try to load the files with the appropriate function to check if they are reasonable Parameters ---------- fnames : list of strings list of files for which to check the version args : Namespace arguments to make the copy run Returns ------- success_files, failed_files, skipped_files : list of strings files that where successfully loaded, failed or where skipped ''' success_files, failed_files, skipped_files = [], [], [] for fn in fnames: full_fn = os.path.join(args.dir_, fn) try: loader = versions.LOADERS_FILES[fn] loader(full_fn) success_files.append(fn) except KeyError: skipped_files.append(fn) except versions.LoaderError as e: failed_files.append(fn) msg = tw.fill('The file {sb}"{fn}"{sn} failed to load because of ' '{sb}"{e}"{sn}. Use ``-v`` option for more info.' ''.format(sb=Style.BRIGHT, sn=Style.NORMAL, fn=fn, e=e), initial_indent='ERROR: ', subsequent_indent=' '*7) print(Fore.RED + msg) if args.verbose: tb = traceback.format_exc() tb = '\n'.join([' ' * 7 + l for l in tb.splitlines()]) print(tb) if not failed_files: print(Fore.GREEN + "INFO: All files loaded successfully.") return success_files, failed_files, skipped_files
if __name__ == '__main__': # pragma: no cover main()