# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2016, 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''Utilities for qt/gui stuff'''
import configparser
from contextlib import contextmanager
import glob
import itertools as it
import os
import astropy
from astropy.io import fits
import numpy as np
from pkg_resources import parse_version
from pyhetdex.het import reconstruct_ifu as recon
from pyhetdex.tools import six_ext
from qtpy import QtCore, QtWidgets
from qtpy.QtWidgets import QApplication
from qtpy.QtGui import QCursor
import vdat.config as vconf
THUMB_DIR = 'thumbs'
'Name of the directory containing thumbnails'
THUMB_PREFIX = 'thmb_'
'Prefix of the thumbnail images'
RECON_PREFIX = 'rec_'
'Prefix of the reconstructed images'

# Starting with astropy 1.3 the ``overwrite`` keyword replaces ``clobber``:
# build the keyword argument understood by the installed astropy version so
# that it can be passed as ``**astropy_overwrite`` when writing fits files
min_version = parse_version('1.3')
astropy_overwrite = ({'clobber': True}
                     if parse_version(astropy.__version__) < min_version
                     else {'overwrite': True})

# cache for the reconstruction object: it is either empty or holds one entry
_recon = []
def static_directory():
    """Return the absolute path of the ``static`` directory, which is
    expected to sit next to this module.

    Returns
    -------
    string
        name of the directory
    """
    module_file = os.path.abspath(__file__)
    return os.path.join(os.path.dirname(module_file), "static")
def help_collection():
    """Return the absolute path of the help collection file.

    The static directory is searched for ``*.qhc`` files; the search result
    is used only if it is unambiguous.

    Returns
    -------
    string or None
        name of the file, if exactly one collection file is found; ``None``
        if no file or more than one file is found
    """
    help_files = glob.glob(os.path.join(static_directory(), '*.qhc'))
    if len(help_files) == 1:
        return help_files[0]
    # zero or multiple candidates: explicitly report that there is no usable
    # collection file
    return None
@contextmanager
def wait_cursor():
    '''Context manager that replaces the default cursor with a WaitCursor and
    then resets it upon exiting, also when the body raises an exception.
    '''
    # Push the override cursor *before* entering the ``try`` block: if
    # creating or setting the cursor fails, ``restoreOverrideCursor`` must not
    # run, otherwise it would remove an override that this context manager
    # never installed (e.g. the one of an enclosing ``wait_cursor``).
    QApplication.setOverrideCursor(QCursor(QtCore.Qt.WaitCursor))
    try:
        yield
    finally:
        QApplication.restoreOverrideCursor()
class FuncQThread(QtCore.QThread):
    '''QThread that stores a callable together with its arguments and calls
    it when the thread is run.

    Parameters
    ----------
    func : callable
        function to execute
    args :
        positional arguments passed to ``func``
    kwargs['parent'] : :class:`PyQt5.QtWidgets.QWidget` or derivate
        parent object of the thread; it is removed from ``kwargs`` and is not
        passed to ``func``
    kwargs :
        remaining keyword arguments, passed to ``func``
    '''

    def __init__(self, func, *args, **kwargs):
        # ``parent`` belongs to the QThread constructor, not to ``func``
        qt_parent = kwargs.pop('parent', None)
        super(FuncQThread, self).__init__(parent=qt_parent)

        self.func, self.args, self.kwargs = func, args, kwargs

    def run(self):  # pragma: no cover  # there is no way
        '''Call the stored function with the stored arguments'''
        self.func(*self.args, **self.kwargs)
def delete_files(paths, matches):
    '''Remove, from every directory in ``paths``, the files whose name matches
    any of the ``matches``. Directories matching a pattern are left untouched.

    Parameters
    ----------
    paths : list of strings
        paths containing the files to remove
    matches : list of string
        file names or wildcards pattern for removal
    '''
    # lazily walk every (path, match) combination and the files each one
    # expands to
    candidates = it.chain.from_iterable(
        glob.iglob(os.path.join(directory, pattern))
        for directory, pattern in it.product(paths, matches))

    for candidate in candidates:
        if os.path.isdir(candidate):
            continue
        os.remove(candidate)
def run_wait_func_qthread(func, *args, **kwargs):
    '''Run ``func`` in a :class:`FuncQThread` and wait for it to finish.

    While waiting the cursor is replaced using :func:`wait_cursor`. Once the
    thread is done, it is marked for deletion.

    Parameters
    ----------
    msecs : int, optional
        if given and not ``None`` force the application to process events
        every ``msecs`` milliseconds; otherwise wait until the thread is done
    all :
        every other positional or keyword argument is passed to
        :class:`FuncQThread`
    '''
    msecs = kwargs.pop('msecs', None)
    # give a timeout to ``wait`` only if the caller asked for one
    wait_kwargs = {} if msecs is None else {'msecs': msecs}

    with wait_cursor():
        worker = FuncQThread(func, *args, **kwargs)
        worker.start()
        # ``wait`` returns a false value when it gives up before the thread
        # has finished: process the pending events and wait again
        while not worker.wait(**wait_kwargs):
            QApplication.processEvents()
        worker.deleteLater()
def get_reconstructed():
    '''Return the cached
    :class:`pyhetdex.het.reconstruct_ifu.QuickReconstructedIFU` object,
    creating it on the first call.

    The object is built from the ``reconstruction`` section of the ``main``
    configuration. If the construction fails, ``None`` is cached and returned
    instead. Every following call returns the cached value without trying
    again.

    Returns
    -------
    reconstructed : :class:`pyhetdex.het.reconstruct_ifu.QuickReconstructedIFU`
        a newly created or cached reconstructed object, or ``None``
    '''
    if _recon:
        # already created (or already failed) by a previous call
        return _recon[0]

    conf = vconf.get_config('main')
    section = 'reconstruction'
    # Create the object only once, to avoid repeating the expensive
    # initialization
    try:
        ifucen = conf.get(section, 'ifucen')
        recon_kwargs = {'dist_r': conf.get(section, 'default_dist_r'),
                        'dist_l': conf.get(section, 'default_dist_l'),
                        'pixscale': conf.getfloat(section, 'pixscale')}
        with wait_cursor():
            reconstructed = recon.QuickReconstructedIFU(ifucen,
                                                        **recon_kwargs)
    except (configparser.Error, six_ext.FileOpenError,
            recon.ReconstructError):
        reconstructed = None

    _recon.append(reconstructed)
    return reconstructed
def rebin_fits(infile, outfile=None, rebin=20, z_indx=None, ext=0):
    '''Build a thumbnail from the fits file ``infile``, optionally save it and
    return it.

    Three dimensional data are first collapsed along the first numpy axis
    with a nan-aware median; then the image is rebinned.

    Parameters
    ----------
    infile : string
        name of the fits file to use to create the thumbnail
    outfile : string, optional
        if not ``None`` save the thumbnail as a fits file
    rebin : integer, optional
        reduce the number of bins by ``rebin``, if smaller or equal to 1, no
        rebinning happens
    z_indx : tuple of two integers, optional
        if the input file is a cube, flatten only [z_indx[0], z_indx[1])
    ext : int or string, optional
        extension number or name to use

    Returns
    -------
    data : numpy.ndarray
        rebinned data
    '''
    with fits.open(infile, memmap=False) as hdulist:
        source_hdu = hdulist[ext]
        data = source_hdu.data

        if data.ndim == 3:
            # data cube: the third fits axis is the first one for
            # python/astropy; restrict it to the requested range, if any,
            # and collapse it
            if z_indx is not None:
                data = data[slice(z_indx[0], z_indx[1]), :, :]
            data = np.nanmedian(data, axis=0)

        if rebin > 1:
            data = bin_image(data, rebin)

        if outfile is not None:
            # reuse the header of the input extension and record where the
            # file comes from
            thumb_hdu = fits.PrimaryHDU(data, source_hdu.header)
            thumb_hdu.header['HISTORY'] = 'Thumbnail created by VDAT'
            with fits.HDUList(thumb_hdu) as thumb_list:
                thumb_list.writeto(outfile, **astropy_overwrite)

    return data
def bin_image(data, b):
    '''Rebin a two dimensional array averaging blocks of ``b*b`` pixels. If
    the size of ``data`` is not a multiple of ``b``, the trailing rows and/or
    columns that do not fill a whole block are ignored.

    Parameters
    ----------
    data : numpy.ndarray
        array to rebin
    b : int
        number of bins to join

    Returns
    -------
    b_data : numpy.ndarray
        rebinned data
    '''
    n_rows = data.shape[0] // b
    n_cols = data.shape[1] // b
    row_span, col_span = n_rows * b, n_cols * b

    summed = np.zeros([n_rows, n_cols])
    # every (row_off, col_off) pair selects one pixel out of each block:
    # adding up all the pairs gives the sum over each block
    for row_off in range(b):
        for col_off in range(b):
            summed += data[row_off:row_span + row_off:b,
                           col_off:col_span + col_off:b]

    return summed / (b * b)
def line_separator(parent=None):
    '''Build a frame drawn as a sunken vertical line, to be used as a
    separator between widgets.

    Parameters
    ----------
    parent : :class:`QWidget` or derived instance, optional
        the parent of the current widget

    Returns
    -------
    line : :class:`PyQt5.QtWidgets.QFrame`
        vertical sunken line
    '''
    frame_cls = QtWidgets.QFrame
    separator = frame_cls(parent=parent)
    separator.setFrameShape(frame_cls.VLine)
    separator.setFrameShadow(frame_cls.Sunken)
    return separator