Source code for musered.musered

import datetime
import fnmatch
import inspect
import json
import logging
import operator
import os
import shutil
from collections import defaultdict
from glob import glob, iglob
from os.path import join

import numpy as np
from astropy.io import fits
from astropy.table import Table, vstack
from astropy.utils.decorators import lazyproperty
from mpdaf.tools import progressbar
from sqlalchemy import column as sql_column
from sqlalchemy import sql
from sqlalchemy import text as sql_text

from .flags import QAFlags
from .frames import FramesFinder
from .recipes import get_recipe_cls, init_cpl_params, normalize_recipe_name
from .reporter import Reporter
from .utils import (
    dict_values,
    ensure_list,
    load_db,
    load_table,
    load_yaml_config,
    parse_gto_db,
    parse_qc_keywords,
    parse_raw_keywords,
    parse_weather_conditions,
    upsert_many,
)
from .version import version as __version__

__all__ = ("MuseRed",)


[docs]class MuseRed(Reporter): """The main class handling all MuseRed's logic. This class manages the database, and use the settings file, to provide all the methods to operate on the datasets. """ def __init__( self, settings_file="settings.yml", report_format="txt", version=None, settings_kw=None, loglevel=None, ): super().__init__(report_format=report_format) self.logger = logging.getLogger(__name__) self.logger.debug("loading settings from %s", settings_file) self.settings_file = settings_file self.conf = load_yaml_config(settings_file) if settings_kw: self.conf.update(settings_kw) self.set_loglevel(loglevel or self.conf.get("loglevel", "INFO")) self.version = version or self.conf.get("version", "0.1") self.datasets = self.conf["datasets"] self.raw_path = self.conf["raw_path"] self.reduced_path = self.conf["reduced_path"] self.db = load_db(filename=self.conf.get("db"), db_env=self.conf.get("db_env")) version = self.version.replace(".", "_") self.tables = { "gto_logs": "gto_logs", "qa_raw": "qa_raw", "qa_reduced": f"qa_reduced_{version}", "qc_info": "qc_info", "raw": "raw", "reduced": f"reduced_{version}", "weather_conditions": "weather_conditions", } for attrname, tablename in self.tables.items(): setattr(self, attrname, self.db.create_table(tablename)) # we defined it here as we don't want to create the table and attribute # yet, this will be done by the property self.tables["flags"] = f"flags_{version}" self.execute = self.db.executable.execute self.frames = FramesFinder(self) # configure cpl cpl_conf = self.conf["cpl"] cpl_conf.setdefault("log_dir", join(self.reduced_path, "logs")) init_cpl_params(**cpl_conf) # default params for recipes params = self.conf["recipes"].setdefault("common", {}) params.setdefault("log_dir", cpl_conf["log_dir"]) # params.setdefault('temp_dir', join(self.reduced_path, 'tmp')) @property def rawc(self): """The SQLAlchemy columns object for the raw table.""" return self.raw.table.c @lazyproperty def nights(self): """Return the list of nights for which data is available.""" if "night" not in self.raw.columns: return [] return self.select_column("night", distinct=True) @lazyproperty def runs(self): """Return the list of runs for which data is available.""" if "run" not in self.raw.columns: return [] return sorted(self.select_column("run", distinct=True)) @lazyproperty def calib_exposures(self): """Return the calibration sequences (TPL.START) for each DPR_TYPE.""" out = defaultdict(list) if "night" not in self.raw.columns: return out for dpr_type, tpl_start in self.execute( sql.select([self.rawc.DPR_TYPE, self.rawc.TPL_START]) .where(self.rawc.DPR_TYPE.isnot(None)) .group_by(self.rawc.DPR_TYPE, self.rawc.TPL_START) ): if self.frames.is_valid(tpl_start, dpr_type, column="TPL_START"): out[dpr_type].append(tpl_start) return out @lazyproperty def exposures(self): """Return a dict of science exposure per target.""" out = defaultdict(list) if "night" not in self.raw.columns: return out # make a dict of (OBJECT, explist) where OBJECT comes from the FITS # headers but has been remapped to the dataset names in update_db query = ( sql.select([self.rawc.OBJECT, self.rawc.name]) .order_by(self.rawc.name) .where(self.rawc.DPR_TYPE == "OBJECT") ) for obj, name in self.execute(query): if self.frames.is_valid(name, "OBJECT"): out[obj].append(name) return out @lazyproperty def flags(self): """Return the `QAFlags` object to manage flags.""" version = self.version.replace(".", "_") flags_tbl = self.db.create_table(f"flags_{version}") return QAFlags(flags_tbl, additional_flags=self.conf.get("flags"))
[docs] def set_loglevel(self, level, cpl=False): """Set the logging level for the root logger or the cpl logger.""" logger = logging.getLogger("cpl" if cpl else "") level = level.upper() logger.setLevel(level) logger.handlers[0].setLevel(level)
[docs] def get_table(self, name): """Return the dataset.Table from the database.""" name = self.tables.get(name, name) if name not in self.db: raise ValueError(f"unknown table {name}") return self.db[name]
[docs] def get_astropy_table(self, name, indexes=None): """Return a table from the database as an astropy Table.""" name = self.tables.get(name, name) if name not in self.db: raise ValueError("unknown table") return load_table(self.db, name, indexes=indexes)
[docs] def copy_reduced(self, version, recipes): """Copy reduced rows from another version. This is needed to start a new version without reprocessing from the start. It allows to copy the database records from the previous version to the reduced table of the current version. Parameters ---------- version : str The version to copy from. recipes : list of str List of recipe names to copy. """ version = version.replace(".", "_") if f"reduced_{version}" not in self.db.tables: raise ValueError tbl = self.db[f"reduced_{version}"] keys = ("name", "recipe_name", "DPR_TYPE") # To force the creation of the table row = tbl.find_one(recipe_name=recipes) if row is None: self.logger.error("could not find recipes %s", recipes) return del row["id"] self.reduced.upsert(row, keys) with self.db as tx: table = tx[self.reduced.name] for row in tbl.find(recipe_name=recipes): del row["id"] table.upsert(row, keys=keys)
[docs] def select_column( self, name, notnull=True, distinct=False, where=None, table="raw" ): """Select values from a column of the database.""" col = self.get_table(table).table.c[name] wc = col.isnot(None) if notnull else None if where is not None: wc = sql.and_(where, wc) select = sql.select([col], whereclause=wc) if distinct: select = select.distinct(col) return [x[0] for x in self.execute(select)]
[docs] def select_dates( self, dpr_type=None, table="raw", column="name", where=None, **kwargs ): """Select the list of dates to process.""" tbl = self.get_table(table) if dpr_type is not None: wc = tbl.table.c.DPR_TYPE == dpr_type if where is not None: where = where & wc else: where = wc dates = self.select_column(column, where=where, table=table, **kwargs) dates = self.frames.filter_valid(dates, DPR_TYPE=dpr_type, column=column) return list(sorted(dates))
[docs] def get_processed(self, table="reduced", filter_names=None, **clauses): """Return the list of processed names for a given query.""" try: tbl = self.get_table(table) except ValueError: # table does not exist return [] processed = {o["name"] for o in tbl.find(**clauses)} if filter_names: processed = processed & set(filter_names) if len(processed) > 0: self.logger.debug("Processed:") for exp in sorted(processed): self.logger.debug("- %s", exp) return processed
[docs] def get_files( self, DPR_TYPE, first_only=False, remove_excludes=False, select=None, exclude_flags=None, return_explist=False, **clauses, ): """Return the list of files for a given DPR_TYPE and a query. Parameters ---------- DPR_TYPE : str DPR.TYPE of files to find. first_only : bool When an item gives several files, return only the first one. remove_excludes : bool If True removes files that are listed in the exclude settings. select : dict Allows to make selection on any table. This should be a dict of (tablename, dict), where the dict value defines a query to make with dataset's find method. exclude_flags : list of flags List of flags to exclude. return_explist : bool If True, also return the list of exposure names. **clauses Parameters passed to `self.reduced.find`. """ flist = [] exc = set() if remove_excludes: exc.update(self.frames.get_excludes(DPR_TYPE=DPR_TYPE)) if exclude_flags is not None: exc.update(self.get_flagged(exclude_flags)) if select is not None: names = set() for key, val in select.items(): if isinstance(val, str): # raw sql query res = [ x[0] for x in self.execute( sql.select([sql_column("name")]) .where(sql_text(val)) .select_from(self.get_table(key).table) ) ] elif isinstance(val, dict): tbl = self.get_table(key) if key == "raw" and "OBJECT" in clauses: kw = dict(OBJECT=clauses["OBJECT"]) else: kw = {} res = [x["name"] for x in tbl.find(**val, **kw)] else: raise ValueError("query should be a string or dict") names.update(res) clauses["name"] = list(names) ntot = 0 explist = [] for r in self.reduced.find(DPR_TYPE=DPR_TYPE, **clauses): ntot += 1 if r["name"] in exc: continue explist.append(r["name"]) files = iglob(f"{r['path']}/{DPR_TYPE}*.fits") if first_only: flist.append(next(files)) else: flist.extend(files) self.logger.info("Selected %d files out of %d", len(flist), ntot) if return_explist: return flist, explist else: return flist
[docs] def get_flagged(self, exclude_flags): """Return the list of flagged exposures. Parameters ---------- exclude_flags : list or bool List of flags, or True to use all flags. """ if isinstance(exclude_flags, (list, tuple)): return self.flags.find(*exclude_flags) elif exclude_flags is True: # exclude all flagged exposures return self.flags.find(*list(self.flags.flags)) else: raise ValueError( "wrong format for exclude_flags, it should " "be a dict or True to exclude all flags" )
[docs] def update_db(self, force=False): """Create or update the database containing FITS keywords.""" # Already parsed raw files known_files = ( self.select_column("filename") if "filename" in self.raw.columns else [] ) # Get the list of FITS files in the raw directory nskip = 0 flist = [] for root, dirs, files in os.walk(self.raw_path): if root.endswith(".cache"): # skip the cache directory self.logger.debug("skipping %s", root) continue for f in files: if f.endswith((".fits", ".fits.fz")): if f in known_files: nskip += 1 if not force: continue flist.append(join(root, f)) self.logger.info("%d new FITS files, %d known", len(flist), nskip) # Parse FITS headers to get the keyword values rows = parse_raw_keywords( flist, self.datasets, runs=self.conf.get("runs"), additional_keywords=self.conf.get("additional_keywords"), ) with self.db as tx: raw = tx["raw"] reduced = tx[self.reduced.name] # Insert or update lines in the raw table if force: raw.delete() raw.insert_many(rows) self.logger.info("updated %d rows", len(rows)) else: raw.insert_many(rows) self.logger.info("inserted %d rows, skipped %d", len(rows), nskip) # Create indexes if needed for name in ("night", "name", "DATE_OBS", "DPR_TYPE"): if not raw.has_index([name]): raw.create_index([name]) for name in ("recipe_name", "name", "DATE_OBS", "DPR_TYPE"): if name not in reduced.columns: reduced.create_column_by_example(name, "") if len(reduced) and not reduced.has_index([name]): reduced.create_index([name]) # weather conditions parse_weather_conditions(self, force=force) # GTO logs if "GTO_logs" in self.conf: self.logger.info("Importing GTO logs") parse_gto_db(self.db, self.conf["GTO_logs"]["db"]) # Cleanup cached attributes del self.nights, self.runs, self.exposures
[docs] def update_qc(self, dpr_types=None, recipe_name=None, force=False): """Create or update the tables containing QC keywords.""" if recipe_name is not None: recipe_name = normalize_recipe_name(recipe_name) # select all types for a given recipe dpr_types = self.select_column( "DPR_TYPE", table="reduced", distinct=True, where=(self.reduced.table.c.recipe_name == recipe_name), ) elif not dpr_types: # select all types dpr_types = self.select_column("DPR_TYPE", table="reduced", distinct=True) # Remove types for which there is no QC params excludes = ("TWILIGHT_CUBE", "TRACE_SAMPLES", "STD_FLUXES", "STD_TELLURIC") for exc in excludes: if exc in dpr_types: dpr_types.remove(exc) now = datetime.datetime.now().isoformat() for dpr_type in dpr_types: self.logger.info("Parsing %s files", dpr_type) tbl = self.db[f"qc_{dpr_type}"] if not force: processed = self.get_processed(table=tbl.name, version=self.version) self.logger.info("Found %d processed exps", len(processed)) else: processed = set() rows = [] items = list(self.reduced.find(DPR_TYPE=dpr_type)) for item in progressbar(items): if item["name"] in processed: continue keys = { "name": item["name"], "reduced_id": item["id"], "version": self.version, "recipe_name": item["recipe_name"], "date_parsed": now, **{k: item[k] for k in ("DATE_OBS", "INS_MODE", "run")}, } flist = sorted(iglob(f"{item['path']}/{dpr_type}*.fits")) for row in parse_qc_keywords(flist): rows.append({**keys, **row}) if len(rows) == 0: self.logger.info("nothing to do") continue tbl.insert_many(rows) self.logger.info("inserted %d rows", len(rows)) self.qc_info.upsert( { "DPR_TYPE": dpr_type, "table": f"qc_{dpr_type}", "version": self.version, "date_updated": now, "nrows": tbl.count(version=self.version), }, ["DPR_TYPE", "version"], )
[docs] def clean( self, recipe_list=None, date_list=None, night_list=None, remove_files=True, force=False, ): """Remove database entries and files.""" date_list = ensure_list(date_list) night_list = ensure_list(night_list) recipe_list = ensure_list(recipe_list) kw = {} if recipe_list: kw = {"recipe_name": [normalize_recipe_name(rec) for rec in recipe_list]} if date_list: kw["name"] = self.prepare_dates(date_list, datecol="name", table="reduced") if night_list: kw["night"] = self.prepare_dates( night_list, datecol="night", table="reduced" ) items = list(self.reduced.find(**kw)) count = len(items) countexp = len(set(o["name"] for o in items)) action = "Remove" if force else "Would remove" if not force: self.logger.info("Dry-run mode, nothing will be done") if remove_files: for item in {o["path"] for o in items}: if os.path.exists(item): self.logger.info("%s %s", action, item) if force: shutil.rmtree(item) msg = "%s %d items (%d exposures/nights) from the database" self.logger.info(msg, action, count, countexp) if force: self.reduced.delete(**kw)
[docs] def find_illum(self, night, ref_temp, ref_mjd_date): """Find the best ILLUM exposure for the night. First, illums are sorted by date to find the closest one in time, then if there are multiple illums within 2 hours, the one with the closest temperature is used. """ illums = [ { "date": o["DATE_OBS"], # Date "temp": abs(o["INS_TEMP7_VAL"] - ref_temp), # Temperature diff "mjd": abs(o["MJD_OBS"] - ref_mjd_date) * 24, # Date diff (hours) "path": o["path"], # File path } for o in self.raw.find(DPR_TYPE="FLAT,LAMP,ILLUM", night=night) ] logger = self.logger if len(illums) == 0: logger.warning("No ILLUM found") return # sort by time difference illums.sort(key=operator.itemgetter("mjd")) # Filter illums to keep the ones within 2 hours close_illums = [illum for illum in illums if illum["mjd"] < 2] if len(close_illums) == 0: logger.warning("No ILLUM in less than 2h, taking the closest one") res = illums[0] elif len(close_illums) == 1: logger.debug("Only one ILLUM in less than 2h") res = close_illums[0] else: logger.debug( "More than one ILLUM in less than 2h, take closest temperature" ) # Sort by temperature difference close_illums.sort(key=operator.itemgetter("temp")) res = close_illums[0] msg = "%s Temp diff=%.2f Time diff=%.2f" for illum in close_illums: logger.debug(msg, illum["date"], illum["temp"], illum["mjd"] * 60) msg = "Found ILLUM : %s (Temp diff: %.3f, Time diff: %.2f min.)" logger.info(msg, res["date"], res["temp"], res["mjd"] * 60) if res["temp"] > 1: logger.warning("ILLUM with Temp difference > 1°, not using it") return None return res["path"]
def _run_recipe_loop( self, recipe_cls, date_list, skip=False, calib=False, params_name=None, recipe_kwargs=None, dry_run=False, use_reduced=False, **kwargs, ): """Main method used to run a recipe on a list of exposures/nights. Parameters ---------- recipe_cls : cls Must be a subclass of `musered.Recipe`. date_list : list of str List of dates (nights or exposure names) to process. skip : bool If True, dates already processed are skipped (default: False). calib : bool If True, process Calibration data, grouped by "night" (though this is also for day calibration !). This changes the columns used for queries. params_name : str By default the recipe name is obtained from the recipe class, but this parameter allows to change the name, which can be useful to have different parameters, and outputs stored under a different name. recipe_kwargs : dict Additional arguments passed to the `musered.Recipe` instantiation. use_reduced : bool If True, find data in the reduced table, otherwise on raw. **kwargs Additional arguments passed to `musered.Recipe.run`. """ recipe_name = params_name or recipe_cls.recipe_name # for calibrations we find exposures with TPL_START, and for # exposures we use DATE_OBS if calib: label = "calibration sequence" datecol = namecol = "TPL_START" else: label = "exposure" datecol = "DATE_OBS" namecol = "name" info = self.logger.info ndates = len(date_list) labelc = label.capitalize() info("Running %s for %d %ss", recipe_name, ndates, label) # build the list of already processed exposures if skip and len(self.reduced) > 0: processed = self.get_processed( recipe_name=recipe_name, filter_names=date_list ) if processed == set(date_list): info("Already processed, nothing to do") return info("Found %d processed exps", len(processed)) else: processed = set() # Instantiate the recipe object recipe_conf = self._get_recipe_conf(recipe_name, params_name) recipe = self._instantiate_recipe(recipe_cls, recipe_name, kwargs=recipe_kwargs) # save recipe's output dir as we will modify it later output_dir = recipe.output_dir table = self.reduced if use_reduced else self.raw for i, date in enumerate(date_list, start=1): if skip and date in processed: self.logger.debug("%s already processed", date) continue # find the exposure to process DPR_TYPE = recipe_conf.get("DPR_TYPE", recipe.DPR_TYPE) select_args = {namecol: date, "DPR_TYPE": DPR_TYPE} if recipe_conf.get("from_recipe"): select_args["recipe_name"] = recipe_conf.get("from_recipe") res = list(table.find(**select_args)) if use_reduced: # for reduced files we should find only one result, otherwise # it means that the query is bad (if 0), or that previous # recipes were not processed (if 0), or that the recipe was run # multiple times with different parameters, in which case the # user has to specify a recipe with from_recipe if len(res) == 0: raise RuntimeError("could not find exposures") elif len(res) > 1: raise RuntimeError( "found several input frames instead of " "one. Maybe use 'from_recipe' ?" ) flist = sorted(glob(f"{res[0]['path']}/{DPR_TYPE}*.fits")) ins_mode = res[0]["INS_MODE"] else: flist = [o["path"] for o in res] ins_mode = {o["INS_MODE"] for o in res} if len(ins_mode) > 1: raise ValueError( f"{label} with multiple INS.MODE, not supported yet" ) ins_mode = ins_mode.pop() # activate the file logger and start to log useful information recipe.activate_file_logger() try: night = res[0]["night"] msg = "%d/%d - %s %s : %d %s file(s), mode=%s" info(msg, i, ndates, labelc, date, len(flist), DPR_TYPE, ins_mode) if getattr(recipe, "use_drs_output", True): out = f"{date}.{ins_mode}" if calib else date kwargs["output_dir"] = join(self.reduced_path, output_dir, out) else: kwargs["output_dir"] = join(self.reduced_path, output_dir) # get the associated calibration frames kwargs.update( self.frames.get_frames( recipe, night=night, ins_mode=ins_mode, dry_run=dry_run, recipe_conf=recipe_conf, OBJECT=res[0]["OBJECT"], ) ) if getattr(recipe, "use_illum", False): ref_temp = np.mean([o["INS_TEMP7_VAL"] for o in res]) ref_date = np.mean([o["MJD_OBS"] for o in res]) kwargs["illum"] = self.find_illum(night, ref_temp, ref_date) if dry_run: continue # we can now run the recipe params = recipe_conf.get("params") recipe.run(flist, name=date, params=params, **kwargs) finally: recipe.deactivate_file_logger() # and we save all the useful information in the reduced table self._save_reduced( recipe, keys=("name", "recipe_name", "DPR_TYPE"), **{ "night": night, "name": res[0][namecol], "run": res[0].get("run"), "recipe_name": recipe_name, "DATE_OBS": res[0][datecol], "DPR_CATG": res[0]["DPR_CATG"], "OBJECT": res[0]["OBJECT"], "INS_MODE": ins_mode, }, ) if ndates > 1: info("===================================================") def _run_recipe_simple( self, recipe_cls, name, OBJECT, flist, params_name=None, save_kwargs=None, dry_run=False, **kwargs, ): """Run a recipe once, simpler than _run_recipe_loop. This is to run recipes like exp_align, exp_combine. Takes a list of files and process them. """ self.logger.info("Running %s", recipe_cls.recipe_name) if isinstance(flist, (list, tuple)): self.logger.info("%d files", len(flist)) self.logger.debug("- " + "\n- ".join(flist)) elif isinstance(flist, dict): self.logger.info("%d files", len(dict_values(flist))) self.logger.debug("- " + "\n- ".join(flist)) # FIXME: improve this # Instantiate the recipe object recipe_name = params_name or recipe_cls.recipe_name recipe_conf = self._get_recipe_conf(recipe_name, params_name) recipe = self._instantiate_recipe(recipe_cls, recipe_name) recipe.activate_file_logger() try: kwargs["output_dir"] = join(self.reduced_path, recipe.output_dir, name) kwargs.update( self.frames.get_frames(recipe, recipe_conf=recipe_conf, OBJECT=OBJECT) ) if dry_run: return recipe.run(flist, params=recipe_conf.get("params"), **kwargs) finally: recipe.deactivate_file_logger() self._save_reduced( recipe, keys=("name", "recipe_name", "DPR_TYPE"), name=name, OBJECT=OBJECT, recipe_name=recipe_name, **(save_kwargs or {}), )
[docs] def prepare_dates(self, dates, DPR_TYPE=None, datecol="name", table="raw"): """Compute the list of dates (nights, exposures) to process.""" alldates = self.select_dates( dpr_type=DPR_TYPE, column=datecol, distinct=True, table=table ) if dates is None: date_list = alldates else: if isinstance(dates, str): dates = [dates] date_list = [] tbl = self.get_table(table).table for date in dates: if date in self.runs: where = tbl.c.run == date if DPR_TYPE is not None: where &= tbl.c.DPR_TYPE == DPR_TYPE d = self.select_column( datecol, distinct=True, where=where, table=table ) if d: d = self.frames.filter_valid( d, DPR_TYPE=DPR_TYPE, column=datecol ) date_list += d elif date in self.nights: where = tbl.c.night == date if DPR_TYPE is not None: where &= tbl.c.DPR_TYPE == DPR_TYPE d = self.select_column( datecol, distinct=True, where=where, table=table ) if d: d = self.frames.filter_valid( d, DPR_TYPE=DPR_TYPE, column=datecol ) date_list += d elif date in alldates: date_list.append(date) elif "*" in date: date_list += fnmatch.filter(alldates, date) else: self.logger.warning("Date %s not found", date) if date_list: date_list.sort() self.logger.debug("Selected dates:") for date in date_list: self.logger.debug("- %s", date) else: self.logger.warning("No valid date found") return date_list
[docs] def process_calib(self, recipe_name, dates=None, params_name=None, **kwargs): """Run a calibration recipe. Parameters ---------- recipe_name : str Recipe to run. dates : str or list of str List of dates to process. params_name : str Name of the parameter block, default to recipe_name. **kwargs : Additional arguments passed to _run_recipe_loop. """ recipe_cls = get_recipe_cls(recipe_name) # get the list of nights to process dates = self.prepare_dates( dates, DPR_TYPE=recipe_cls.DPR_TYPE, datecol="TPL_START" ) self._run_recipe_loop( recipe_cls, dates, calib=True, params_name=params_name, **kwargs )
[docs] def process_exp( self, recipe_name, dates=None, dataset=None, params_name=None, **kwargs ): """Run a science recipe. Parameters ---------- recipe_name : str Recipe to run. dates : str or list of str List of dates to process. dataset : str If given, all exposures from this dataset are processed. params_name : str Name of the parameter block, default to recipe_name. **kwargs : Additional arguments passed to _run_recipe_loop. """ recipe_conf = self._get_recipe_conf(recipe_name, params_name) redc = self.reduced.table.c # get the list of dates to process if dates is None and dataset: dates = self.exposures[dataset] elif dates is None and "from_recipe" in recipe_conf: dates = [ o["name"] for o in self.reduced.find(recipe_name=recipe_conf["from_recipe"]) ] dates = self.select_dates( table="reduced", distinct=True, where=(redc.recipe_name == recipe_conf["from_recipe"]), ) else: dates = self.prepare_dates(dates, DPR_TYPE="OBJECT") if recipe_name == "superflat": # Build a Table (name, run, path) for PIXTABLE_REDUCED that can # be used for the superflats rawc = self.rawc wc = redc.DPR_TYPE == "PIXTABLE_REDUCED" if "from_recipe" in recipe_conf: wc = wc & (redc.recipe_name == recipe_conf["superflat_from"]) exps = [ (name, run, path) for (name, run, path) in self.execute( sql.select([rawc.name, rawc.run, redc.path]) .select_from( self.reduced.table.join(self.raw.table, redc.name == rawc.name) ) .where(wc) .order_by(rawc.name) ) ] tbl = Table(rows=exps, names=("name", "run", "path")) if "exclude_flags" in recipe_conf: # exclude flagged exposures excludes = self.get_flagged(recipe_conf["exclude_flags"]) tbl["excluded"] = np.in1d(tbl["name"], excludes) else: tbl["excluded"] = False kwargs["exposures"] = tbl if "max_exps" in recipe_conf: kwargs["max_exps"] = recipe_conf["max_exps"] elif recipe_name == "fsf": # Find the IMPHOT table for each exposure imphot_recipe = recipe_conf.get("imphot_recipe") if imphot_recipe is not None: kwargs["imphot_tables"] = { o["name"]: f"{o['path']}/IMPHOT.fits" for o in self.reduced.find( name=dates, recipe_name=imphot_recipe, DPR_TYPE="IMPHOT", order_by="name", ) } if len(kwargs["imphot_tables"]) < len(dates): self.logger.error( "Missing %d matching imphot tables for the %d exposures", len(dates) - len(kwargs["imphot_tables"]), len(dates), ) raise ValueError("Missing matching imphot table for exposures") kwargs["filters"] = [ recipe_conf.get("filters"), recipe_conf.get("mean_waves"), ] recipe_cls = get_recipe_cls(recipe_name) use_reduced = recipe_cls.recipe_name not in ("muse_scibasic", "fsf") self._run_recipe_loop( recipe_cls, dates, params_name=params_name, use_reduced=use_reduced, **kwargs, )
[docs] def process_standard(self, recipe_name="muse_standard", dates=None, **kwargs): """Reduce a standard exposure. Running both muse_scibasic and muse_standard. Parameters ---------- recipe_name : str Recipe to run. dates : str or list of str List of dates to process. **kwargs : Additional arguments passed to _run_recipe_loop. """ recipe_sci = get_recipe_cls("muse_scibasic") recipe_std = get_recipe_cls(recipe_name) # get the list of dates to process dates = self.prepare_dates(dates, DPR_TYPE="STD") # run muse_scibasic with specific parameters (tag: STD) recipe = self._instantiate_recipe(recipe_std, recipe_name, verbose=False) recipe_kw = {"tag": "STD", "output_dir": recipe.output_dir} self._run_recipe_loop(recipe_sci, dates, recipe_kwargs=recipe_kw, **kwargs) # run muse_standard self._run_recipe_loop(recipe_std, dates, use_reduced=True, **kwargs)
[docs] def exp_align( self, dataset, recipe_name="muse_exp_align", filt="white", params_name=None, name=None, exps=None, force=False, **kwargs, ): """Compute offsets between exposures of a dataset. Parameters ---------- dataset : str Dataset name. recipe_name : str Recipe to run: 'muse_exp_align' (DRS, default), or 'imphot'. filt : str Filter to use for the images, only for muse_exp_align. params_name : str Name of the parameter block, default to recipe_name. name : str Name of the output record, default to '{dataset}_{recipe_name}'. exps : list of str List of exposures to process. By default all exposures of the dataset are processed. force : bool Force processing if it was already done previously. """ recipe_name = normalize_recipe_name(recipe_name) params_name = params_name or recipe_name recipe_conf = self._get_recipe_conf(recipe_name, params_name) recipe_cls = get_recipe_cls(recipe_name) processed = set() if recipe_name == "imphot": if force: kwargs["force"] = force else: # Find already processed files kwargs["processed"] = processed = self.get_processed( OBJECT=dataset, DPR_TYPE="IMPHOT", recipe_name=params_name ) self.logger.info("Found %d processed exps", len(processed)) DPR_TYPE = recipe_cls.DPR_TYPE name = ( name or recipe_conf.get("name") or "OFFSET_LIST_{}".format( "drs" if recipe_name == "muse_exp_align" else recipe_name ) ) # get the list of dates to process from_recipe = recipe_conf.get("from_recipe", "muse_scipost") querykw = {} if exps: querykw["name"] = self.prepare_dates(exps, DPR_TYPE="OBJECT") query = list( self.reduced.find( OBJECT=dataset, DPR_TYPE=DPR_TYPE, recipe_name=from_recipe, order_by="name", **querykw, ) ) flist = [f for r in query for f in iglob(f"{r['path']}/{DPR_TYPE}*.fits")] filt = recipe_conf.get("filt", filt) if recipe_name == "muse_exp_align" and filt: flist = [ f for f in flist if fits.getval(f, "ESO DRS MUSE FILTER NAME") == filt ] self._run_recipe_simple( recipe_cls, name, dataset, flist, params_name=params_name, **kwargs ) if recipe_name == "imphot": # Special case to insert IMPHOT result files in the table info = self.reduced.find_one(name=name) del info["id"] rows = [ { **info, "name": item["name"], "DPR_TYPE": "IMPHOT", "path": join(info["path"], item["name"]), } for item in query if item["name"] not in processed ] keys = ("name", "recipe_name", "DPR_TYPE") upsert_many(self.db, self.reduced.name, rows, keys=keys)
[docs] def exp_combine( self, dataset, recipe_name="muse_exp_combine", name=None, params_name=None, force=False, **kwargs, ): """Combine exposures for a dataset. Parameters ---------- dataset : str Dataset name. recipe_name : str Recipe to run: 'muse_exp_combine' (DRS, default), or 'mpdaf_combine' (cube combination with MPDAF). name : str Name of the output record, default to '{dataset}_{recipe_name}'. params_name : str Name of the parameter block, default to recipe_name. force : bool Force processing if it was already done previously. """ recipe_name = normalize_recipe_name(recipe_name) recipe_conf = self._get_recipe_conf(recipe_name, params_name) from_recipe = recipe_conf.get("from_recipe", "muse_scipost") fsf_recipe = recipe_conf.get("fsf_recipe") recipe_cls = get_recipe_cls(recipe_name) DPR_TYPE = recipe_cls.DPR_TYPE name_dict = {"muse_exp_combine": "drs", "mpdaf_combine": "mpdaf"} names_select = recipe_conf.get("names_with_selection") if not names_select: name = ( name or recipe_conf.get("name") or "{}_{}".format(dataset, name_dict.get(recipe_name, recipe_name)) ) names_select = {name: recipe_conf.get("select")} if name is not None: names_select = {name: names_select[name]} use_scale = recipe_conf.get("use_scale") params = params_name or recipe_name processed = self.get_processed(recipe_name=params) flags = recipe_conf.get("exclude_flags") for name, select in names_select.items(): if not force and name in processed: self.logger.info("Skipping %s, already processed", name) continue # get the list of files to process self.logger.info("Processing %s", name) flist, explist = self.get_files( DPR_TYPE, first_only=True, OBJECT=dataset, recipe_name=from_recipe, select=select, remove_excludes=True, exclude_flags=flags, order_by="name", return_explist=True, ) if use_scale: scale_tbl = self._get_scales( explist, use_scale["from"], use_scale["bands"] ) else: scale_tbl = None if fsf_recipe: # find the corresponding FSF for each exposures fsf_tables = { o["name"]: f"{o['path']}/FSF.fits" for o in self.reduced.find( name=explist, recipe_name=fsf_recipe, DPR_TYPE="FSF", order_by="name", ) } if len(fsf_tables) < len(explist): self.logger.warning( "Missing %d FSF tables for the %d exposures", len(explist) - len(fsf_tables), len(explist), ) fsf_tables = None else: self.logger.debug("Found %d FSF tables", len(fsf_tables)) else: fsf_tables = None self._run_recipe_simple( recipe_cls, name, dataset, flist, params_name=params_name, scale_table=scale_tbl, fsf_tables=fsf_tables, **kwargs, )
[docs] def std_combine( self, runs, recipe_name="muse_std_combine", name=None, params_name=None, force=False, **kwargs, ): """Combine std stars for a list of runs. Parameters ---------- runs : list of str List of run names. recipe_name : str Recipe to run: 'muse_exp_combine' (DRS, default), or 'mpdaf_combine' (cube combination with MPDAF). name : str Name of the output record, default to '{dataset}_{recipe_name}'. params_name : str Name of the parameter block, default to recipe_name. force : bool Force processing if it was already done previously. """ recipe_name = normalize_recipe_name(recipe_name) recipe_conf = self._get_recipe_conf(recipe_name, params_name) from_recipe = recipe_conf.get("from_recipe", "muse_standard") recipe_cls = get_recipe_cls(recipe_name) DPR_TYPES = recipe_cls.DPR_TYPES params = params_name or recipe_name processed = self.get_processed(recipe_name=params) for run in runs: if not force and run in processed: self.logger.info("Skipping run %s, already processed", run) continue else: self.logger.info("Processing run %s", run) # get the list of files to process flist = { DPR_TYPE: self.get_files( DPR_TYPE, first_only=True, run=run, recipe_name=from_recipe, remove_excludes=True, ) for DPR_TYPE in DPR_TYPES } if any(len(v) == 0 for v in flist.values()): self.logger.error("No files for run %s", run) continue # this is the name used for the output_dir rec_name = name or run self._run_recipe_simple( recipe_cls, rec_name, run, flist, params_name=params_name, save_kwargs={"run": run}, **kwargs, )
def _get_recipe_conf(self, recipe_name, params_name=None, item=None): """Get config dict for a recipe.""" if params_name is not None: if params_name not in self.conf["recipes"]: raise ValueError( f"could not find the '{params_name}' " "parameters in the settings file" ) recipe_name = params_name recipe_conf = self.conf["recipes"].get(recipe_name, {}) if item is not None: return recipe_conf.get(item, {}) else: return recipe_conf def _instantiate_recipe(self, recipe_cls, recipe_name, kwargs=None, verbose=True): """Instantiate the recipe object. Use parameters from the settings, common first, and then from recipe_name.init, and from kwargs. """ recipe_kw = { **self.conf["recipes"]["common"], **self._get_recipe_conf(recipe_name, item="init"), } if kwargs is not None: recipe_kw.update(kwargs) # filter kwargs to match the signature sig = inspect.signature(recipe_cls) recipe_kw = {k: v for k, v in recipe_kw.items() if k in sig.parameters} return recipe_cls(**recipe_kw, verbose=verbose) def _save_reduced( self, recipe, keys, DPR_CATG="SCIENCE", recipe_name=None, **kwargs ): """Save info about reduced data. - A JSON file is saved in the output directory, with all information including the complete list of calibration and input files. - For each output frame, the same information are saved except the calibration and input files, and after checking that files were created for each frame (some are optional). """ date_run = datetime.datetime.now().isoformat() recipe_name = recipe_name or recipe.recipe_name info = { "date_run": date_run, "recipe_name": recipe_name, "path": recipe.output_dir, "DPR_CATG": DPR_CATG, "musered_version": __version__, **kwargs, } recipe_file = f"{recipe.output_dir}/{recipe_name}.json" with open(recipe_file, mode="w") as f: json.dump({**info, **recipe.dump(include_files=True)}, f, indent=4) rows = [ { "DPR_TYPE": out_frame, **info, "recipe_file": recipe_file, **recipe.dump(json_col=True), } for out_frame in recipe.output_frames if any(iglob(f"{recipe.output_dir}/{out_frame}*.fits")) ] upsert_many(self.db, self.reduced.name, rows, keys=keys) if len(rows) == 0: raise RuntimeError("could not find output files") self.logger.info( "Processed data (%s) available in %s", ", ".join(row["DPR_TYPE"] for row in rows), recipe.output_dir, ) def _get_scales(self, explist, from_recipe, bands): """Get scale and offset factors from the imphot tables.""" # read imphot tables, select useful columns, and stack them tables = [] for o in self.reduced.find( name=explist, recipe_name=from_recipe, DPR_TYPE="IMPHOT", order_by="name" ): data = fits.getdata(f"{o['path']}/IMPHOT.fits") t = Table( [[o["name"]] * len(data), data["filter"], data["scale"], data["bg"]], names=("name", "filter", "scale", "offset"), ) tables.append(t) tbl = vstack(tables) # select values for the bands of interest if isinstance(bands, str): scale_tbl = tbl[tbl["filter"] == bands] else: # compute the mean over the bands tbl = tbl[np.logical_or.reduce([tbl["filter"] == f for f in bands], axis=0)] scale_tbl = tbl.group_by("name").groups.aggregate(np.mean) return scale_tbl