Source code for musered.recipes.zap

import logging
import os
import sys
from os.path import join

import numpy as np
from astropy.io import fits
from mpdaf.scripts.make_white_image import make_white_image

from ..utils import make_band_images
from .recipe import PythonRecipe

try:
    import zap
except ImportError:
    ZAP_VERSION = "unknown"
    zap = None
else:
    ZAP_VERSION = zap.__version__


def do_zap(
    inputfile,
    outputfile,
    skyfile=None,
    imgfile=None,
    cfwidthSVD=100,
    cfwidthSP=100,
    cftype="fit",
    zlevel="median",
    mask=None,
    compute_mask=False,
    additional_mask=None,
    mask_edges=False,
    n_components=None,
    ncpu=None,
    varcurvefile=None,
    filter=None,
):
    logger = logging.getLogger(__name__)

    if zap is None:
        logger.error("zap is not installed")
        sys.exit(1)

    if compute_mask:
        from mpdaf.obj import mask_sources

        logger.info("Computing mask ...")
        img = inputfile.replace("DATACUBE", "IMAGE")
        mask = outputfile.replace("DATACUBE", "MASK")
        if not os.path.exists(img):
            make_white_image(inputfile, img)
        im_mask = mask_sources(img, iterations=1, sigma=3.0)
        if additional_mask is not None:
            logger.info("Combining mask with %s ...", additional_mask)
            addmask = fits.getdata(additional_mask)
            im_mask.data |= addmask
        im_mask.write(mask, savemask="none")
        logger.info("Saved mask to %s", mask)

    logger.info("Zapping %s", inputfile)
    zap.process(
        inputfile,
        outcubefits=outputfile,
        clean=True,
        zlevel=zlevel,
        cftype=cftype,
        cfwidthSVD=cfwidthSVD,
        cfwidthSP=cfwidthSP,
        skycubefits=skyfile,
        mask=mask,
        ncpu=ncpu,
        overwrite=True,
        n_components=n_components,
        varcurvefits=varcurvefile,
    )

    if imgfile is not None:
        make_white_image(outputfile, imgfile)

    if mask_edges:
        if imgfile is None:
            raise ValueError("White light image must be computed")

        logger.info("Masking NaN edges from %s", imgfile)
        maskfile = imgfile.replace("IMAGE", "MASK-EDGES")
        mask, cube = zap.mask_nan_edges(imgfile, outputfile, threshold=10.0)
        mask = mask.astype(np.uint8)
        im = cube.mean(axis=0)
        im.write(imgfile, savemask="nan")
        cube.write(outputfile, savemask="nan")
        fits.writeto(maskfile, mask, header=im.get_wcs_header(), clobber=True)
    else:
        cube = None

    if filter:
        bandname = imgfile.replace(".fits", "_{filt}.fits")
        make_band_images(cube or outputfile, bandname, filter)


[docs]class ZAP(PythonRecipe): """Recipe to subtract sky with ZAP.""" recipe_name = "zap" DPR_TYPE = "DATACUBE_FINAL" output_dir = "zap" output_frames = ["DATACUBE_ZAP", "IMAGE_ZAP", "SKYCUBE_ZAP", "VARCURVE_ZAP"] version = f"zap-{ZAP_VERSION}" n_inputs_rec = 1 default_params = dict( cftype="median", cfwidthSVD=300, cfwidthSP=300, compute_mask=False, mask_edges=False, n_components=None, ncpu=None, zlevel="median", filter="white,Johnson_V,Cousins_R,Cousins_I", ) @property def calib_frames(self): """Return the list of calibration frames.""" return {"SOURCE_MASK", "ADDITIONAL_MASK"} def _run(self, flist, *args, **kwargs): out = dict( cube=join(self.output_dir, f"DATACUBE_ZAP.fits"), image=join(self.output_dir, f"IMAGE_ZAP.fits"), skycube=join(self.output_dir, f"SKYCUBE_ZAP.fits"), varcurve=join(self.output_dir, f"VARCURVE_ZAP.fits"), ) do_zap( flist[0], out["cube"], skyfile=out["skycube"], imgfile=out["image"], varcurvefile=out["varcurve"], additional_mask=kwargs.get("ADDITIONAL_MASK"), mask=kwargs.get("SOURCE_MASK"), **self.param, ) return out