from enum import Enum
import numpy as np
from astropy.utils.decorators import lazyproperty
from sqlalchemy import sql
from .utils import ensure_list, load_table
FLAGS = {
"BAD_CENTERING": "Centering offset is wrong",
"BAD_IMAQUALITY": "Bad image quality",
"BAD_SKY_FLUX": "Sky flux looks wrong",
"BAD_SKY_SUB": "Sky subtraction is bad",
"BAD_SLICE": "Slices with wrong flux",
"IMPHOT_BAD_SCALE": "Flux scale value computed by Imphot seems wrong",
"IMPHOT_HIGH_BKG": "Flux offset value computed by Imphot seems wrong",
"IMPHOT_OUTLIER_OFFSET": "Centering value computed by Imphot seems wrong",
"SATELLITE": "Satellite track",
"SHORT_EXPTIME": "Incomplete observation",
"SLICE_GRADIENT": "Slices show a flux gradient",
}
"""List of pre-defined quality flags, can be extended in the settings file."""
def flag_name(flag):
return flag.name if isinstance(flag, Enum) else flag
[docs]class QAFlags:
"""Manage QA flags.
Parameters
----------
table : dataset.Table
The table containing the flags.
additional_flags : dict
Additional flags, added to the default FLAGS dict.
"""
def __init__(self, table, additional_flags=None):
flags = FLAGS.copy()
if additional_flags:
flags.update(additional_flags)
self.flags = Enum("flags", flags.items())
# create integer columns for all flags
self.table = table
self.table._sync_columns({"name": "", **{k: 1 for k in self.names}}, True)
self.execute = self.table.db.executable.execute
@lazyproperty
def names(self):
"""Return the list of flag names."""
return [f.name for f in self.flags]
def __getattr__(self, name):
try:
return self.flags[name]
except KeyError:
raise AttributeError
def __dir__(self):
return self.names + super().__dir__()
def _upsert_many(self, rows, keys=["name"]):
with self.table.db as tx:
table = tx[self.table.name]
for row in rows:
table.upsert(row, keys=keys)
[docs] def add(self, exps, *flags, value=1):
"""Add flags to exposures."""
flags = {flag_name(flag): value for flag in flags}
rows = []
for e in ensure_list(exps):
if not isinstance(e, str):
raise TypeError("exposure names should be given as str")
rows.append({"name": e, **flags})
self._upsert_many(rows)
[docs] def remove(self, exps, *flags):
"""Remove flags from exposures."""
# TODO: add a mode where we check that the flags were present
self.add(exps, *flags, value=None)
[docs] def list(self, exps):
"""List flags for exposures."""
exps = ensure_list(exps)
res = {o["name"]: o for o in self.table.find(name=exps)}
out = []
for exp in exps:
if exp not in res:
out.append([])
else:
expf = res[exp]
out.append(
[
flag
for flag in self.flags
if (
expf[flag_name(flag)] is not None
and expf[flag_name(flag)] > 0
)
]
)
return out[0] if len(exps) == 1 else out
[docs] def find(self, *flags, _and=False, flag_dict=None):
"""Find exposures that have some flags.
Examples::
>>> mr.flags.find(mr.flags.SHORT_EXPTIME) # doctest: +SKIP
>>> flags.find(flag_dict={flags.SHORT_EXPTIME: 2,
... flags.IMPHOT_BAD_SCALE: 2}) # doctest: +SKIP
"""
col = self.table.table.c
clauses = [col[flag_name(flag)] > 0 for flag in flags]
if flag_dict:
clauses += [col[flag_name(flag)] == val for flag, val in flag_dict.items()]
if len(clauses) > 1:
func = sql.and_ if _and else sql.or_
wc = func(*clauses)
else:
wc = clauses[0]
return [x[0] for x in self.execute(sql.select([col.name], whereclause=wc))]
[docs] def as_table(self, indexes=None, remove_empty_columns=True):
"""Return the flags table as an astropy Table."""
# For some reason columns are created as float instead of int. So we
# need to convert them. We also remove the columns with no flagged
# exposure.
tbl = load_table(self.table.db, self.table.name, indexes=indexes)
to_remove = ["id"]
for name, col in tbl.columns.items():
if col.info.dtype.kind == "f":
col = col.astype(int)
tbl.replace_column(name, col)
if remove_empty_columns and col.sum() is np.ma.masked:
to_remove.append(name)
tbl.remove_columns(to_remove)
tbl.sort("name")
return tbl