# Source code for vdat.gui.utils

# Virus Data Analysis Tool: a data reduction GUI for HETDEX/VIRUS data
# Copyright (C) 2016, 2017, 2018  "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''Utilities for qt/gui stuff'''

import configparser
from contextlib import contextmanager
import glob
import itertools as it
import os

import astropy
from astropy.io import fits
import numpy as np
from pkg_resources import parse_version
from pyhetdex.het import reconstruct_ifu as recon
from pyhetdex.tools import six_ext
from qtpy import QtCore, QtWidgets
from qtpy.QtWidgets import QApplication
from qtpy.QtGui import QCursor

import vdat.config as vconf

# Directory name and file name prefixes for thumbnails and reconstructed
# images; each constant is described by the string that follows it.
THUMB_DIR = 'thumbs'
'Name of the directory containing thumbnails'
THUMB_PREFIX = 'thmb_'
'Prefix of the thumbnail images'
RECON_PREFIX = 'rec_'
'Prefix of the reconstructed images'

# Starting with astropy 1.3 the ``overwrite`` keyword replaces ``clobber``.
# ``astropy_overwrite`` holds whichever keyword the installed version expects,
# so that it can be unpacked into ``writeto`` calls (see ``rebin_fits``).
min_version = parse_version('1.3')
if parse_version(astropy.__version__) < min_version:  # pragma: no cover
    astropy_overwrite = {'clobber': True}
else:
    astropy_overwrite = {'overwrite': True}

# Cache for the reconstruction object: empty until ``get_reconstructed`` is
# called for the first time, afterwards it holds the object (or ``None`` if
# the reconstruction failed) as its only element.
_recon = []


def static_directory():
    """Return the absolute path of the static directory.

    The path is derived from the location of this module: it points to the
    ``static`` directory that lives alongside it.

    Returns
    -------
    string
        name of the directory
    """
    module_dir, _ = os.path.split(os.path.abspath(__file__))
    return os.path.join(module_dir, "static")
def help_collection():
    """Return the absolute path of the help collection file.

    The file is searched among the ``*.qhc`` files in the directory returned
    by :func:`static_directory`.

    Returns
    -------
    string or None
        name of the file if exactly one is found; ``None`` if there are no
        matching files or more than one
    """
    help_files = glob.glob(os.path.join(static_directory(), '*.qhc'))
    if len(help_files) == 1:
        return help_files[0]
    # zero or multiple candidates: there is no unambiguous collection file
    return None
@contextmanager
def wait_cursor():
    '''Context manager that replaces the default cursor with a WaitCursor and
    restores the previous one upon exiting.

    The override cursor is set *before* entering the ``try`` block, so that
    the restore in the ``finally`` clause runs only if the override has
    actually been installed and the two calls always stay paired.
    '''
    QApplication.setOverrideCursor(QCursor(QtCore.Qt.WaitCursor))
    try:
        yield
    finally:
        # executed also when the body of the ``with`` statement raises
        QApplication.restoreOverrideCursor()
class FuncQThread(QtCore.QThread):
    '''Thread that runs a user provided callable.

    The callable and its arguments are stored by the constructor and the
    callable is invoked with them in the :meth:`run` method.

    Parameters
    ----------
    func : callable
        function to execute
    args :
        positional arguments of the function
    kwargs['parent'] : :class:`PyQt5.QtWidgets.QWidget` or derivate
        if present, it is removed from ``kwargs`` and used as parent of the
        thread; defaults to ``None``
    kwargs :
        keyword arguments of the function
    '''
    def __init__(self, func, *args, **kwargs):
        # ``parent`` is meant for the thread, not for ``func``: take it out
        # of the keyword arguments before storing them
        super(FuncQThread, self).__init__(parent=kwargs.pop('parent', None))
        self.func, self.args, self.kwargs = func, args, kwargs

    def run(self):  # pragma: no cover
        # executed in the new thread once it has been started; the return
        # value of the function is discarded
        self.func(*self.args, **self.kwargs)
def delete_files(paths, matches):
    '''Remove the files matching any of the ``matches`` in any of the
    ``paths``.

    Every path is combined with every match and the resulting pattern is
    expanded; directories among the results are left untouched.

    Parameters
    ----------
    paths : list of strings
        paths containing the files to remove
    matches : list of string
        file names or wildcards pattern for removal
    '''
    patterns = (os.path.join(path, match)
                for path, match in it.product(paths, matches))
    # the patterns are expanded lazily, one after the other
    for candidate in it.chain.from_iterable(map(glob.iglob, patterns)):
        if os.path.isdir(candidate):
            continue
        os.remove(candidate)
def run_wait_func_qthread(func, *args, **kwargs):
    '''Run ``func`` in a :class:`FuncQThread` and block until it is done.

    While waiting the cursor is replaced using :func:`wait_cursor`; the
    thread is marked for deletion before returning.

    Parameters
    ----------
    msecs : int, optional
        if given and not ``None`` force the application to process events
        every ``msecs`` milliseconds; otherwise wait until the thread is done
    all :
        see :class:`FuncQThread`
    '''
    msecs = kwargs.pop('msecs', None)
    # without ``msecs`` rely on the default timeout of ``QThread.wait``
    wait_kwargs = {} if msecs is None else {'msecs': msecs}

    with wait_cursor():
        worker = FuncQThread(func, *args, **kwargs)
        worker.start()
        # ``wait`` returns a false value when it times out before the thread
        # has finished: let the application handle pending events and retry
        finished = worker.wait(**wait_kwargs)
        while not finished:
            QApplication.processEvents()
            finished = worker.wait(**wait_kwargs)
        worker.deleteLater()
def get_reconstructed():
    '''Return the shared
    :class:`pyhetdex.het.reconstruct_ifu.QuickReconstructedIFU` object.

    The object is built on the first call using the ``reconstruction``
    section of the ``main`` configuration and then cached, so that the
    expensive initialization happens only once; later calls always return
    the cached value. If the reconstruction fails, ``None`` is cached and
    returned.

    Returns
    -------
    reconstructed : :class:`pyhetdex.het.reconstruct_ifu.QuickReconstructedIFU`
        a newly created or cached reconstructed object, or ``None``
    '''
    if _recon:
        # already created (or already failed) in a previous call
        return _recon[0]

    conf = vconf.get_config('main')
    try:
        ifucen = conf.get('reconstruction', 'ifucen')
        options = {
            'dist_r': conf.get('reconstruction', 'default_dist_r'),
            'dist_l': conf.get('reconstruction', 'default_dist_l'),
            'pixscale': conf.getfloat('reconstruction', 'pixscale'),
        }
        with wait_cursor():
            reconstructed = recon.QuickReconstructedIFU(ifucen, **options)
    except (configparser.Error, six_ext.FileOpenError,
            recon.ReconstructError):
        reconstructed = None

    _recon.append(reconstructed)
    return reconstructed
def rebin_fits(infile, outfile=None, rebin=20, z_indx=None, ext=0):
    '''Build a thumbnail from the fits file ``infile``, optionally save it
    and return it.

    Data cubes are first collapsed to an image taking the nan-aware median
    along the third axis; the image is then rebinned.

    Parameters
    ----------
    infile : string
        name of the fits file to use to create the thumbnail
    outfile : string, optional
        if not ``None`` save the thumbnail as a fits file
    rebin : integer, optional
        reduce the number of bins by ``rebin``, if smaller or equal to 1, no
        rebinning happens
    z_indx : tuple of two integers, optional
        if the input file is a cube, flatten only [z_indx[0], z_indx[1])
    ext : int or string, optional
        extension number or name to use

    Returns
    -------
    data : numpy.ndarray
        rebinned data
    '''
    with fits.open(infile, memmap=False) as hdulist:
        image = hdulist[ext].data

        # the third axis of a cube is the first one for python/astropy
        if image.ndim == 3:
            if z_indx is not None:
                z_start, z_stop = z_indx[0], z_indx[1]
                image = image[z_start:z_stop, :, :]
            image = np.nanmedian(image, axis=0)

        if rebin > 1:
            image = bin_image(image, rebin)

        if outfile is not None:
            # reuse the header of the input extension for the thumbnail
            thumb_hdu = fits.PrimaryHDU(image, hdulist[ext].header)
            thumb_hdu.header['HISTORY'] = 'Thumbnail created by VDAT'
            with fits.HDUList(thumb_hdu) as thumb_list:
                thumb_list.writeto(outfile, **astropy_overwrite)

    return image
def bin_image(data, b):
    '''Rebin ``data`` averaging groups of ``b*b`` bins.

    If the size of ``data`` is not a multiple of ``b``, the pixels exceeding
    the last complete group along each axis are dropped.

    Parameters
    ----------
    data : numpy.ndarray
        array to rebin
    b : int
        number of bins to join

    Returns
    -------
    b_data : numpy.ndarray
        rebinned data
    '''
    n_rows = data.shape[0] // b
    n_cols = data.shape[1] // b
    row_stop = n_rows * b
    col_stop = n_cols * b

    accumulated = np.zeros([n_rows, n_cols])
    # each (row, column) offset selects one pixel out of every ``b*b`` group
    for row_offset in range(b):
        for col_offset in range(b):
            accumulated += data[row_offset:row_stop + row_offset:b,
                                col_offset:col_stop + col_offset:b]
    return accumulated / (b * b)
def line_separator(parent=None):
    '''Build a sunken vertical line to be used as separator.

    Parameters
    ----------
    parent : :class:`QWidget` or derived instance, optional
        the parent of the current widget

    Returns
    -------
    line : :class:`PyQt5.QtWidgets.QFrame`
        vertical sunken line
    '''
    frame_cls = QtWidgets.QFrame
    separator = frame_cls(parent=parent)
    separator.setFrameShape(frame_cls.VLine)
    separator.setFrameShadow(frame_cls.Sunken)
    return separator