Source code for musered.recipes.recipe

import datetime
import itertools
import json
import logging
import os
import re
import time

import cpl
from cpl.param import ParameterList
from mpdaf.log import setup_logfile, setup_logging

__all__ = ("init_cpl_params", "BaseRecipe", "Recipe", "PythonRecipe")


def init_cpl_params(
    recipe_path=None,
    esorex_msg=None,
    esorex_msg_format=None,
    log_dir=".",
    msg="info",
    msg_format="%(levelname)s - %(name)s: %(message)s",
):
    """Load esorex.rc settings and override with the settings file."""

    try:
        cpl.esorex.init()
    except FileNotFoundError as e:
        print(e)

    if recipe_path is not None:
        cpl.Recipe.path = recipe_path
    if esorex_msg is not None:
        cpl.esorex.log.level = esorex_msg  # file logging
    if esorex_msg_format is not None:
        cpl.esorex.log.format = esorex_msg_format

    os.makedirs(log_dir, exist_ok=True)

    # terminal logging: disable cpl's logger as it uses the root logger.
    cpl.esorex.msg.level = "off"
    try:
        setup_logging(name="cpl", level=msg.upper(), color=True, fmt=msg_format)
    except Exception:
        # This fails for some reason when running unit tests on Gitlab CI when
        # using Click's runner...
        pass
    logging.getLogger("cpl").setLevel("DEBUG")


class BaseRecipe:
    """Base class for the recipes."""

    recipe_name = None
    """Name of the recipe."""

    DPR_TYPE = None
    """Type of data to process (DPR.TYPE). If None, cpl.Recipe.tag is used."""

    output_dir = None
    """Default output directory."""

    default_params = {}
    """Default parameters."""

    output_frames = []
    """Output frames."""

    exclude_frames = set()
    """Frames that must be excluded by default."""

    version = None
    """Recipe version."""

    n_inputs_min = None
    """Minimum number of input files, as required by the DRS."""

    def __init__(self, output_dir=None, log_dir=".", temp_dir=None, **kwargs):
        self.nbwarn = 0
        self.timeit = 0
        self.logger = logging.getLogger(__name__)
        self.outfiles = []
        self.log_dir = log_dir
        self.log_file = None
        self.param = {}  # recipe parameters
        self.calib = {}  # calib frames
        self.raw = {}  # raw frames

        self.temp_dir = temp_dir
        if temp_dir is not None:
            os.makedirs(temp_dir, exist_ok=True)

        if output_dir is not None:
            self.output_dir = output_dir

    @property
    def calib_frames(self):
        """Return the list of calibration frames."""
        return set()

    def dump_params(self, json_col=False):
        """Dump non-default parameters to a JSON string."""
        return json.dumps(self.param) if json_col else self.param

    def dump(self, include_files=False, json_col=False):
        """Dump recipe results, stats, parameters in a dict."""
        info = {
            "tottime": self.timeit,
            "nbwarn": self.nbwarn,
            "log_file": self.log_file,
            "params": self.dump_params(json_col=json_col),
            "recipe_version": self.version,
        }
        if include_files:
            calib = dict(iter(self.calib))
            info["raw"] = json.dumps(self.raw) if json_col else self.raw
            info["calib"] = json.dumps(calib) if json_col else calib
        return info

    def activate_file_logger(self):
        """Activate the logging to a file."""
        raise NotImplementedError

    def deactivate_file_logger(self):
        """Deactivate the logging to a file."""
        raise NotImplementedError

    def _run(self, *args, **kwargs):
        raise NotImplementedError

    def run(self, flist, *args, params=None, **kwargs):
        """Run the recipe.

        Subclasses must implement `BaseRecipe._run` to customize this.

        Parameters
        ----------
        flist : str, list of str, or dict
            List of input raw files. Can be a file, list of files, or a dict
            with DPR_TYPE as key and list of files as value.
        params : dict
            Parameters for the recipe.
        *args, **kwargs
            Additional arguments are passed to `BaseRecipe._run`.

        """
        t0 = time.time()
        info = self.logger.info
        self.results = None

        if isinstance(flist, str):
            flist = [flist]

        if isinstance(flist, (list, tuple)):
            nfiles = len(flist)
        elif isinstance(flist, dict):
            nfiles = len(list(itertools.chain(*flist.values())))
        else:
            raise ValueError("flist should be a list or dict")

        if nfiles == 0:
            raise ValueError("no exposure found")

        if self.n_inputs_min is not None and nfiles < self.n_inputs_min:
            raise ValueError(f"need at least {self.n_inputs_min} exposures")

        if "output_dir" in kwargs:
            self.output_dir = kwargs["output_dir"]

        os.makedirs(self.output_dir, exist_ok=True)

        info("- Log file           : %s", self.log_file)
        info("- Output directory   : %s", self.output_dir)
        info("- Non-default params :")

        for key, value in self.default_params.items():
            self.param[key] = value

        if params is not None:
            for key, value in params.items():
                self.param[key] = value

        param = (
            dict(iter(self.param)) if not isinstance(self.param, dict) else self.param
        )
        for key, value in param.items():
            default = (
                self.param[key].default
                if isinstance(self.param, ParameterList)
                else self.default_params.get(key, "")
            )
            if value != default:
                info("%15s = %s (%s)", key, value, default)

        if isinstance(flist, dict):
            assert self.DPR_TYPE in flist
            self.raw = flist
        else:
            self.raw = {self.DPR_TYPE: flist}
        results = self._run(flist, *args, **kwargs)

        self.results = results
        self.timeit = (time.time() - t0) / 60
        info("%s successfully run", self.recipe_name)
        info("Execution time %.2f minutes", self.timeit)
        return results


[docs]class PythonRecipe(BaseRecipe): """Class for Python recipes."""
[docs] def activate_file_logger(self): # setup a root logger, with a format similar to the DRS one date = datetime.datetime.now().isoformat() self.log_file = os.path.join(self.log_dir, f"{self.recipe_name}-{date}.log") fmt = "%(asctime)s [%(levelname)07s] %(name)s: %(message)s" kw = dict( name="", level="DEBUG", logfile=self.log_file, fmt=fmt, rotating=False ) try: setup_logfile(datefmt="%H:%M:%S", **kw) except TypeError: # mpdaf<=3.2 does not support the datefmt parameter setup_logfile(**kw) self.logger.info("starting at %s", date)
[docs] def deactivate_file_logger(self): # count the number of warnings/errors with open(self.log_file) as f: self.nbwarn = len(re.findall(r"\[WARNING\]|\[ ERROR\]", f.read())) self.logger.info("%d warnings", self.nbwarn) # remove the file logger logger = logging.getLogger("") for handler in logger.handlers: if isinstance(handler, logging.FileHandler): logger.removeHandler(handler)
[docs]class Recipe(BaseRecipe): """Base class for the DRS Recipes. Parameters ---------- output_dir : str Name of output directory. use_drs_output : bool If True, output files are saved by the DRS, else files are saved by musered with custom names (experimental!). temp_dir : str Base directory for temporary directories where the recipe is executed. The working dir is created as a subdir with a random file name. If set to None (default), the system temp dir is used. log_dir : str Directory for log files. version : str Version of the recipe. By default the latest version is used. nifu : int IFU to handle. If set to 0, all IFUs are processed serially. If set to -1 (default), all IFUs are processed in parallel. tag : str Tag used by the recipe, default to cpl.Recipe.tag. Can be used to change the type of processed input for e.g. muse_scibasic. """ recipe_name_drs = None """Name of the recipe for the DRS, in case it is renamed in musered.""" DPR_TYPES = {} """Same as DPR_TYPE but when a recipe can process multiples types, e.g. muse_scibasic. If None, cpl.Recipe.tag is used.""" n_inputs_min = 1 """Minimum number of input files, as required by the DRS.""" n_inputs_rec = None """Recommended number of input files, typically for calibration files.""" env = None """Default environment variables.""" exclude_frames = ("MASTER_DARK", "NONLINEARITY_GAIN") """Frames that must be excluded by default.""" use_illum = None """If True, the recipe should use an ILLUM exposure.""" QC_keywords = {} """QC keywords to show, for each frame.""" def __init__( self, output_dir=None, use_drs_output=True, temp_dir=".", log_dir=".", version=None, nifu=-1, tag=None, verbose=True, ): super().__init__(output_dir=output_dir, log_dir=log_dir, temp_dir=temp_dir) self.use_drs_output = use_drs_output recipe_name = self.recipe_name_drs or self.recipe_name self._recipe = cpl.Recipe(recipe_name, version=version) if tag is not None: if tag not in self._recipe.tags: raise ValueError( f"invalid tag {tag} for {recipe_name}, " "should be in {self._recipe.tags}" ) self._recipe.tag = tag if self.DPR_TYPE is None: self.DPR_TYPE = self.DPR_TYPES.get(self._recipe.tag, self._recipe.tag) elif self.output_dir is None: self.output_dir = self.output_frames[0] self._recipe.output_dir = self.output_dir if use_drs_output else None self._recipe.temp_dir = temp_dir self.calib = self._recipe.calib self.param = self._recipe.param if self.env is not None: self._recipe.env.update(self.env) if "nifu" in self.param: self.param["nifu"] = nifu if verbose: msg = "%s recipe (DRS v%s from %s)" self.logger.info(msg, recipe_name, self._recipe.version[1], cpl.Recipe.path) @property def calib_frames(self): return set(dict(iter(self.calib)).keys()) @property def output_frames(self): """Return the list of output frames.""" frames = self._recipe.output[self._recipe.tag] if self.recipe_name == "muse_scipost": # special case for scipost, for which some output frames are # missing from cpl's generated list. This was fixed in the DRS and # should appear in version > v2.5.2 if "DATACUBE_FINAL" not in frames: frames = [ "DATACUBE_FINAL", "IMAGE_FOV", "OBJECT_RESAMPLED", "PIXTABLE_REDUCED", "PIXTABLE_POSITIONED", "PIXTABLE_COMBINED", ] + frames return frames @property def version(self): """Return the recipe version""" return f"DRS-{self._recipe.version[1]}"
[docs] def dump_params(self, json_col=False): params = {p.name: p.value for p in self.param if p.value is not None} return json.dumps(params) if json_col else params
[docs] def dump(self, include_files=False, json_col=False): info = super().dump(include_files=include_files, json_col=json_col) info.update( { "user_time": self.results.stat.user_time, "sys_time": self.results.stat.sys_time, } ) return info
[docs] def activate_file_logger(self): date = datetime.datetime.now().isoformat() self.log_file = os.path.join(self.log_dir, f"{self.recipe_name}-{date}.log") cpl.esorex.log.filename = self.log_file self.logger.info("starting at %s", date)
[docs] def deactivate_file_logger(self): cpl.esorex.log.filename = None
def _run(self, flist, *args, **kwargs): if self.n_inputs_rec and len(flist) != self.n_inputs_rec: msg = "Got %d files though the recommended number is %d" self.logger.warning(msg, len(flist), self.n_inputs_rec) if self._recipe.output_dir is None: raise NotImplementedError # self.save_results(results, name=name) for frame in self.calib_frames: try: self.calib[frame] = kwargs.pop(frame) except KeyError: pass self.raw = {self._recipe.tag: flist} if self.use_illum and kwargs.get("illum"): self.raw["ILLUM"] = kwargs.pop("illum") results = self._recipe(raw=self.raw, **kwargs) self.nbwarn = len(results.log.warning) msg = "DRS user time: %s, sys: %s" self.logger.info(msg, results.stat.user_time, results.stat.sys_time) self.logger.info("%d warnings", self.nbwarn) return results