# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2015, 2016, 2017 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"Implementation of the ``vdat_config`` entry point"
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
from difflib import unified_diff
from distutils.version import StrictVersion
import os
import sys
import textwrap as tw
import traceback
from colorama import init, Fore, Style
import pyhetdex.tools.io_helpers as ioh
import pkg_resources
import six
import vdat
from . import versions
# Python 2 compatibility: rebind ``input`` to ``raw_input`` when the latter
# exists; where it does not, the lookup raises NameError and ``input`` is left
# untouched
try:
    input = raw_input
except NameError:
    pass

# initialize colorama with the automatic reset enabled
init(autoreset=True)
# command line code to get the configuration files
def main(argv=None):
    """Entry point of the ``vdat_config`` executable.

    Parse the command line with :func:`parse` and hand the resulting
    namespace over to the function stored in its ``func`` attribute.

    Parameters
    ----------
    argv : list
        command line, if None taken from ``sys.argv``
    """
    namespace = parse(argv=argv)
    handler = namespace.func
    handler(namespace)
def parse(argv=None):
    """Create the parser and parse the command line arguments.

    Two subcommands are defined, ``copy`` and ``compare``; both inherit the
    ``--verbose`` and ``--version`` options. The function implementing the
    selected subcommand (:func:`copy` or :func:`compare`) is stored in the
    ``func`` attribute of the returned namespace.

    Parameters
    ----------
    argv : list
        command line, if None taken from ``sys.argv``

    Returns
    -------
    Namespace
        parsed command line

    Raises
    ------
    SystemExit
        with exit status 2 if no subcommand is given, besides the exits
        triggered by :mod:`argparse` itself (e.g. ``-h`` or ``--version``)
    """
    # shared options
    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.add_argument("--verbose", '-v', action="store_true",
                               help="Increase verbosity")
    common_parser.add_argument('--version', '-V', action='version',
                               version=vdat.__version__)

    # main parser
    parser = argparse.ArgumentParser(
        description="""Deal with the vdat
        configuration files""",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    subparser = parser.add_subparsers(
        title='Subcommands',
        dest='subparser_name',
        description="""Type '%(prog)s cmd -h' for
        detailed information about the
        subcommands""")

    # copy parser
    description = ("Copy the files '{}' into the given"
                   " directory. If any of the files exists, it will ask the"
                   " user if she/he wants to overwrite the files; allowed"
                   " answers are 'y' and 'n'; EOF (Ctrl+D) is interpreted as"
                   " no".format("', '".join(versions.COPY_FILES)))
    copy_p = subparser.add_parser(
        'copy', description=description,
        help='Copy the configuration files',
        parents=[common_parser],
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    copy_p.set_defaults(func=copy)
    copy_p.add_argument('-t', '--to-dir', default='.',
                        help="Directory where to copy the files")
    copy_p.add_argument('-f', '--force', action='store_true',
                        help="""Force overwriting existing configuration files.
                        If this option is used, all local modifications will be
                        lost""")
    copy_p.add_argument('-b', '--backup', action='store_true',
                        help='''Backup existing files before copying the new
                        ones. If this option is used, the `-f/--force` is
                        ignored''')

    # compare parser
    description = ("Compare the configuration files shipped with VDAT with"
                   " those in the current directory")
    compare_p = subparser.add_parser(
        'compare', description=description,
        help='Compare the configuration files',
        parents=[common_parser],
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    compare_p.set_defaults(func=compare)
    compare_p.add_argument('-D', '--dir', default='.', dest='dir_',
                           metavar='DIR',
                           help="Directory with the configuration files")
    compare_p.add_argument('-d', '--diff', action='store_true',
                           help="""Show the difference between the files in
                           VDAT and the ones already in use""")
    compare_p.add_argument('-l', '--load', action='store_true',
                           help='''Try to load the files to check for errors
                           and software missmatches.''')
    compare_p.add_argument('-a', '--all', action='store_true',
                           help='''Run the diff and the loading on all the
                           available files, instead of only '{}'.
                           '''.format(', '.join(versions.IMPORTANT_FILES)))

    args = parser.parse_args(args=argv)
    if not args.subparser_name:
        # The subcommand is not enforced by the parser on every Python
        # version, so a bare call can get here without one. Let argparse
        # report it: ``error`` prints the usage and the message on stderr
        # using the parser's own program name, then exits with status 2.
        parser.error("too few arguments")
    return args
def resource_replace(file_content, filename, path):
    '''Fill in the ``<++>CONFIG_DIR<++>`` and ``<++>VERSION<++>``
    placeholders found in the content of a resource file.

    Parameters
    ----------
    file_content : string
        content of the resource
    filename : string
        name of the file the resource comes from; used to look up the name
        of the version variable and the version itself
    path : string
        directory where the file will be copied

    Returns
    -------
    resource : string
        content with the placeholders replaced
    '''
    # <++>CONFIG_DIR<++> becomes the absolute path of the target directory
    target_dir = os.path.abspath(path)
    content = file_content.replace('<++>CONFIG_DIR<++>', target_dir)

    # <++>VERSION<++> becomes ``VERSION_FILE_NAME = version``; when no
    # version is associated with the file the placeholder is left as it is
    version_key = versions.version_name(filename)
    file_version = versions.version_of_file(filename)
    if not file_version:
        return content
    return content.replace('<++>VERSION<++>',
                           version_key + ' = ' + file_version)
class VDATCopyResource(ioh.CopyResource):
    '''Specialisation of
    :class:`~pyhetdex.tools.io_helpers.CopyResource` whose
    :meth:`~pyhetdex.tools.io_helpers.CopyResource.manipulate_resource`
    replaces the placeholders in the resource content.'''

    def manipulate_resource(self, file_content):
        '''Pass ``file_content`` through :func:`resource_replace`, together
        with the ``filename`` and ``target_dir`` attributes of the
        instance.'''
        filename, target_dir = self.filename, self.target_dir
        return resource_replace(file_content, filename, target_dir)
def copy(args):
    """Copy the files listed in ``versions.COPY_FILES`` into ``args.to_dir``
    through a :class:`VDATCopyResource` and then call its ``report`` method
    with yellow headers.

    Parameters
    ----------
    args : Namespace
        parsed command line; the ``backup``, ``force``, ``verbose`` and
        ``to_dir`` attributes are used
    """
    copier = VDATCopyResource(__name__, backup=args.backup, force=args.force,
                              verbose=args.verbose)
    copier(versions.COPY_FILES, args.to_dir)

    def _yellow(text):
        '''Wrap ``text`` between the yellow and the reset colour codes'''
        return (Fore.YELLOW + '{}' + Fore.RESET).format(text)

    copier.report(header_written=_yellow("Copied files: "),
                  header_non_written=_yellow("Skipped files: "),
                  header_backedup=_yellow("Backed-up files: "))
def compare(args):
    """Compare the configuration files found in ``args.dir_`` with the ones
    shipped with VDAT.

    The existence of the files and their versions are always checked; the
    process exits with status 1 if none of the files exists. The content
    diff and the loading test run only if ``args.diff`` and ``args.load``
    are set. They act on all the existing files if ``args.all`` is set,
    otherwise only on those listed in ``versions.IMPORTANT_FILES``.

    Parameters
    ----------
    args : Namespace
        parsed command line of the ``compare`` subcommand
    """
    def _progress(text):
        '''Print ``text`` only in verbose mode'''
        if args.verbose:
            print(text)

    # first check that all files are there
    _progress('Checking that all the files exist.')
    found = _existing_files(args)
    if not found:
        sys.exit(1)

    _progress('Checking the versions.')
    _compare_versions(found, args)

    # select the files on which the optional checks run
    if args.all:
        selected = list(found)
    else:
        selected = [name for name in found
                    if name in versions.IMPORTANT_FILES]

    if args.diff:
        _progress('Comparing the file contents.')
        _file_diff(selected, args)

    if args.load:
        _progress('Try to load the files.')
        _file_load(selected, args)
def _existing_files(args):
    '''Sort the files listed in ``versions.COPY_FILES`` into existing and
    missing ones, looking for them in ``args.dir_``, and print a coloured
    summary.

    Parameters
    ----------
    args : Namespace
        parsed command line; only the ``dir_`` attribute is used

    Returns
    -------
    existing_files : list of strings
        names of the files that exist
    '''
    found, missing_important, missing_other = [], [], []
    for name in versions.COPY_FILES:
        if os.path.exists(os.path.join(args.dir_, name)):
            found.append(name)
        elif name in versions.IMPORTANT_FILES:
            missing_important.append(name)
        else:
            missing_other.append(name)

    # nothing found at all: probably the wrong directory
    if not found:
        text = ('None of the configuration files exist.'
                ' Either go to the correct directory or type'
                ' `{}vdat_config copy{}`.').format(Style.BRIGHT, Style.NORMAL)
        print(Fore.RED + tw.fill(text, initial_indent='ERROR: ',
                                 subsequent_indent=' ' * 7))
        return found

    # everything is in place
    if not (missing_important or missing_other):
        print(Fore.GREEN + 'INFO: All the expected files exist.')
        return found

    # some file is missing: important ones are errors, the others warnings
    if missing_important:
        names = ', '.join(missing_important)
        text = ('The important file(s) {}"{}"{} is(are) missing.'
                ' If you'
                ' renamed them or are using files from an other'
                ' location, it is fine, as long as you know what'
                ' you are you doing.').format(Style.BRIGHT, names,
                                              Style.NORMAL)
        print(Fore.RED + tw.fill(text, initial_indent='ERROR: ',
                                 subsequent_indent=' ' * 7))
    if missing_other:
        names = ', '.join(missing_other)
        text = ('The file(s) {}"{}"{} are missing. Some of the'
                ' functionalities or commands might not work as'
                ' expected.').format(Style.BRIGHT, names, Style.NORMAL)
        print(Fore.YELLOW + tw.fill(text, initial_indent='WARNING: ',
                                    subsequent_indent=' ' * 9))
    return found
def _compare_versions(fnames, args):
    '''Compare, for each file, the version found in the copy stored in
    ``args.dir_`` with the one expected by VDAT and print the outcome.

    A missing version or a different major version is reported as an error,
    any other difference as a warning. The final green message is printed
    only if no error or warning has been issued.

    Parameters
    ----------
    fnames : list of strings
        list of files for which to check the version
    args : Namespace
        parsed command line; only the ``dir_`` attribute is used

    Returns
    -------
    remaining_files : list of strings
        files for which no version is expected
    '''
    bright, normal = Style.BRIGHT, Style.NORMAL

    def _error(template, **fields):
        '''Format, wrap and print ``template`` as a red error'''
        text = template.format(sb=bright, sn=normal, **fields)
        print(Fore.RED + tw.fill(text, initial_indent='ERROR: ',
                                 subsequent_indent=' ' * 7))

    def _warning(template, **fields):
        '''Format, wrap and print ``template`` as a yellow warning'''
        text = template.format(sb=bright, sn=normal, **fields)
        print(Fore.YELLOW + tw.fill(text, initial_indent='WARNING: ',
                                    subsequent_indent=' ' * 9))

    unversioned, matching = [], []
    for name in fnames:
        expected = versions.version_of_file(name)
        found = versions.version_from_file(args.dir_, name)

        if not expected:
            # no version is associated with this file: nothing to compare
            unversioned.append(name)
            continue

        if not found:
            _error('The file {sb}"{fn}"{sn} does not have a version'
                   ' although one is expected. Either you are using'
                   ' an old version of the file or it has been'
                   ' modified. VDAT might not work properly. The'
                   ' current version is {sb}"{v}"{sn}.',
                   fn=name, v=expected)
            continue

        expected_tuple = StrictVersion(expected).version
        found_tuple = StrictVersion(found).version
        if expected_tuple == found_tuple:
            matching.append(name)
        elif expected_tuple[0] != found_tuple[0]:
            _error('The major version of file {sb}"{fn}"{sn} is'
                   ' different from the one expected code'
                   ' and VDAT will not work properly. Found:'
                   ' {sb}"{vf}"{sn}, expected: {sb}"{ve}"{sn}.'
                   ' Please update the file.',
                   fn=name, vf=found, ve=expected)
        else:
            _warning('The minor/patch version of file'
                     ' {sb}"{fn}"{sn} is'
                     ' different from the one expected code.'
                     ' This likely means that some new optional'
                     ' configuration variable has been added'
                     ' or that some value has been changed.'
                     ' Found: {sb}"{vf}"{sn}, expected:'
                     ' {sb}"{ve}"{sn}. Update the file the get'
                     ' the latest version.',
                     fn=name, vf=found, ve=expected)

    # every file either matches or needs no version: nothing was reported
    if len(matching) + len(unversioned) == len(fnames):
        if matching:
            text = ('The files {sb}"{fn}"{sn} have the correct version.'
                    ' The other files are not expected to have one.'
                    ''.format(sb=bright, sn=normal,
                              fn=', '.join(matching)))
            summary = tw.fill(text, initial_indent='INFO: ',
                              subsequent_indent=' ' * 6)
        else:
            summary = ('INFO: None of the file is expected to have a'
                       ' version number')
        print(Fore.GREEN + summary)
    return unversioned
def _file_diff(fnames, args):
    '''Print a coloured unified diff between each file shipped with VDAT and
    its counterpart in ``args.dir_``.

    The placeholders of the shipped file are replaced via
    :func:`resource_replace` before the comparison.

    Parameters
    ----------
    fnames : list of strings
        list of files to compare
    args : Namespace
        parsed command line; only the ``dir_`` attribute is used

    Returns
    -------
    equal_files, diff_files : list of strings
        files that are the same and that are different
    '''
    # colour of a diff line according to its first character
    prefix_colours = {'+': Fore.CYAN, '-': Fore.RED, '@': Fore.MAGENTA}

    unchanged, changed = [], []
    for name in fnames:
        shipped = ioh.get_resource_file(__name__, name)
        shipped_lines = resource_replace(shipped, name,
                                         args.dir_).splitlines()

        local_path = os.path.join(args.dir_, name)
        with open(local_path) as handle:
            local_lines = handle.read().splitlines()

        delta = list(unified_diff(shipped_lines, local_lines,
                                  fromfile=os.path.join('VDAT', name),
                                  tofile=local_path, n=2, lineterm=''))
        for index, line in enumerate(delta):
            if index == 0 and changed:
                # separate this diff from the previous one with an empty
                # line
                print()
            print(prefix_colours.get(line[:1], Fore.RESET) + line)

        # the file counts as different when the diff has more than one line
        if len(delta) > 1:
            changed.append(name)
        else:
            unchanged.append(name)

    if not changed:
        print(Fore.GREEN + 'INFO: No difference found in any file')
    return unchanged, changed
def _file_load(fnames, args):
    '''Run, for each file, the loader registered in
    ``versions.LOADERS_FILES`` to check that the file can be loaded.

    Parameters
    ----------
    fnames : list of strings
        list of files to load
    args : Namespace
        parsed command line; the ``dir_`` and ``verbose`` attributes are
        used

    Returns
    -------
    success_files, failed_files, skipped_files : list of strings
        files that were successfully loaded, that failed with a
        ``versions.LoaderError`` or that were skipped
    '''
    loaded, failed, skipped = [], [], []
    indent = ' ' * 7

    for name in fnames:
        path = os.path.join(args.dir_, name)
        try:
            versions.LOADERS_FILES[name](path)
        except KeyError:
            # no loader registered for this file; note that a KeyError
            # escaping from the loader itself ends up here too
            skipped.append(name)
        except versions.LoaderError as error:
            failed.append(name)
            text = ('The file {sb}"{fn}"{sn} failed to load because of '
                    '{sb}"{e}"{sn}. Use ``-v`` option for more info.'
                    ''.format(sb=Style.BRIGHT, sn=Style.NORMAL,
                              fn=name, e=error))
            print(Fore.RED + tw.fill(text, initial_indent='ERROR: ',
                                     subsequent_indent=indent))
            if args.verbose:
                # show the traceback aligned with the error message
                rows = traceback.format_exc().splitlines()
                print('\n'.join([indent + row for row in rows]))
        else:
            loaded.append(name)

    if not failed:
        print(Fore.GREEN + "INFO: All files loaded successfully.")
    return loaded, failed, skipped
# run the ``vdat_config`` entry point when the module is executed as a script
if __name__ == '__main__':  # pragma: no cover
    main()