Source code for musered.recipes.superflat

import logging
import os
import pathlib
import platform
import shutil
import sys
from glob import glob
from os.path import join
from tempfile import TemporaryDirectory
from time import time

import numpy as np
from astropy.io import fits
from astropy.table import Table
from mpdaf.obj import Cube, CubeList

from ..masking import mask_sources
from ..utils import make_band_images
from .recipe import PythonRecipe
from .science import SCIPOST


[docs]class SUPERFLAT(PythonRecipe): """Recipe to compute and subtract a superflat.""" recipe_name = "superflat" DPR_TYPE = "DATACUBE_FINAL" output_dir = "superflat" output_frames = [ "DATACUBE_FINAL", "IMAGE_FOV", "DATACUBE_SUPERFLAT", "IMAGE_SUPERFLAT", "DATACUBE_EXPMAP", "IMAGE_EXPMAP", "STATPIX", ] version = "0.2" # Save the V,R,I images default_params = { "cache_pixtables": False, "filter": "white,Johnson_V,Cousins_R,Cousins_I", "keep_cubes": False, "method": "sigclip", "scipost": {"filter": "white", "save": "cube", "skymethod": "none"}, "temp_dir": None, } @property def calib_frames(self): # We suppose that most of the scipost steps were already run (flux # calibration, sky subtraction, autocalib), so we exclude all these # frames to speedup the frame association. exclude = ( "RAMAN_LINES", "STD_RESPONSE", "STD_TELLURIC", "LSF_PROFILE", "SKY_CONTINUUM", "SKY_LINES", ) return [f for f in SCIPOST(verbose=False).calib_frames if f not in exclude]
[docs] def get_pixtables(self, name, path): """Return pixtables path, after optionally caching them.""" cachedir = self.param["cache_pixtables"] if isinstance(cachedir, dict): # get cachedir specific to a given hostname cachedir = cachedir.get(platform.node()) if cachedir: cachedir = pathlib.Path(cachedir) / name if not cachedir.exists(): cachedir.mkdir(parents=True) for f in glob(f"{path}/PIXTABLE_REDUCED*.fits"): self.logger.debug("copy %s to %s", f, cachedir) shutil.copy(f, cachedir) return glob(f"{cachedir}/PIXTABLE_REDUCED*.fits") else: return glob(f"{path}/PIXTABLE_REDUCED*.fits")
def _run(self, flist, *args, exposures=None, name=None, **kwargs): hdr = fits.getheader(flist[0]) ra, dec = hdr["RA"], hdr["DEC"] info = self.logger.info # 1. Run scipost for all exposures used to build the superflat run = exposures[exposures["name"] == name]["run"][0] exps = exposures[ (exposures["run"] == run) & (~exposures["excluded"]) & (exposures["name"] != name) ] nexps = len(exps) info("Found %d exposures for run %s", nexps, run) # if a maximum number of exposures has been set, truncate the list if "max_exps" in self.param: if nexps > self.param["max_exps"]: exps = exps[0:self.param["max_exps"]] nexps = len(exps) info("Only the first %d exposures will be used for the superflat", nexps) # Fix the RA/DEC/DROT values for all exposures to the values of the # reference exp. Take into account the offset of the exposure, as we # need the superflat to be aligned with the exposure. The offset is # applied them directly to the RA/DEC values, otherwise the DRS checks # the exposure name. if "OFFSET_LIST" in kwargs: offsets = Table.read(kwargs["OFFSET_LIST"]) offsets = offsets[offsets["DATE_OBS"] == hdr["DATE-OBS"]] ra -= offsets["RA_OFFSET"][0] dec -= offsets["DEC_OFFSET"][0] os.environ["MUSE_SUPERFLAT_POS"] = ",".join( map(str, (ra, dec, hdr["ESO INS DROT POSANG"])) ) # Prepare scipost recipe and args, and remove OFFSET_LIST as offsets # are applied manually above recipe = SCIPOST(log_dir=self.log_dir) recipe_kw = { key: kwargs[key] for key in self.calib_frames if key in kwargs and key != "OFFSET_LIST" } cubelist = [] self.raw["SUPERFLAT_EXPS"] = [] # cubes directory: either inside tmpdir or inside output_dir if the # keep_cubes option is True if self.param["keep_cubes"]: cubesdir = join(self.output_dir, "cubes") else: temp_dir = self.param["temp_dir"] or self.temp_dir if isinstance(temp_dir, dict): # get temp_dir specific to a given hostname temp_dir = temp_dir.get(platform.node()) os.makedirs(temp_dir, exist_ok=True) temp_dir = TemporaryDirectory(dir=temp_dir) cubesdir = temp_dir.name t0 = time() for i, exp in enumerate(exps, start=1): outdir = join(cubesdir, exp["name"]) outname = f"{outdir}/DATACUBE_FINAL.fits" if os.path.exists(outname): info("%d/%d : %s already processed", i, nexps, exp["name"]) else: info("%d/%d : %s processing", i, nexps, exp["name"]) explist = self.get_pixtables(exp["name"], exp["path"]) self.raw["SUPERFLAT_EXPS"] += explist recipe.run( explist, output_dir=outdir, params=self.param["scipost"], **recipe_kw, ) # Mask sources mask_cube(outname) cubelist.append(outname) info("Scipost and masking done, took %.2f sec.", time() - t0) # 2. Combine exposures to obtain the superflat t0 = time() method = self.param["method"] info("Combining cubes with method %s", method) cubes = CubeList(cubelist) if method == "median": supercube, expmap, stat = cubes.median() elif method == "sigclip": supercube, expmap, stat = cubes.combine(var="propagate", mad=True) # Mask values where the variance is NaN supercube.mask |= np.isnan(supercube._var) else: raise ValueError(f"unknown method {method}") info("Combine done, took %.2f sec.", time() - t0) # remove temp directory if not self.param["keep_cubes"]: info("Removing temporary files") temp_dir.cleanup() info("Saving superflat cube and images") superim = supercube.mean(axis=0) expim = expmap.mean(axis=0) outdir = self.output_dir supercube.write(join(outdir, "DATACUBE_SUPERFLAT.fits.gz"), savemask="nan") expmap.write(join(outdir, "DATACUBE_EXPMAP.fits.gz"), savemask="nan") expim.write(join(outdir, "IMAGE_EXPMAP.fits"), savemask="nan") stat.write(join(outdir, "STATPIX.fits"), overwrite=True) superim.write(join(outdir, "IMAGE_SUPERFLAT.fits"), savemask="nan") # 3. Subtract superflat info("Applying superflat to %s", flist[0]) expcube = Cube(flist[0]) assert expcube.shape == supercube.shape # Do nothing for masked values mask = supercube.mask.copy() supercube.data[mask] = 0 if supercube.var is not None: supercube.var[mask] = 0 expcube -= supercube im = expcube.mean(axis=0) expcube.write(join(outdir, "DATACUBE_FINAL.fits"), savemask="nan") im.write(join(outdir, "IMAGE_FOV.fits"), savemask="nan") filt = self.param["filter"] if filt: info("Making band images") make_band_images(expcube, join(outdir, "IMAGE_FOV_{filt}.fits"), filt) make_band_images( supercube, join(outdir, "IMAGE_SUPERFLAT_{filt}.fits"), filt )
def mask_cube(cubef): logger = logging.getLogger(__name__) t0 = time() cubedir = os.path.dirname(cubef) try: fits.getval(cubef, "MASKED", extname="DATA") except KeyError: pass else: logger.info("%s is already masked", cubef) return mask = mask_sources( join(cubedir, "IMAGE_FOV_0001.fits"), sigma=5.0, iterations=2, opening_iterations=1, return_image=False, ) with fits.open(cubef) as hdul: hdul["DATA"].header["MASKED"] = True hdul["DATA"].data[:, mask] = np.nan # for some reason this is much faster than updating the # cube in-place with mode='update' hdul.writeto(cubef, overwrite=True) logger.info("Masking done, took %.2f sec.", time() - t0)