# Usage:
# fsf_exposure raw imphot
# compute full fsf from psfrec and optionaly rescale values using imphot
#
import logging
import os
import sys
from os.path import join
import numpy as np
from astropy.table import Column, Table
from mpdaf.log import clear_loggers
from mpdaf.MUSE import MoffatModel2
from ..utils import get_exp_name
from .recipe import PythonRecipe
try:
import muse_psfr
except ImportError:
PSFR_VERSION = "unknown"
muse_psfr = None
else:
PSFR_VERSION = muse_psfr.__version__
def do_fsf(
expname, inputfile, outputfile1, outputfile2, imphot_table=None, filters=None
):
logger = logging.getLogger(__name__)
if muse_psfr is None:
logger.error("muse_psfr is not installed")
sys.exit(1)
# clear muse_psfr logger and set to WARNING if not in DEBUG
clear_loggers("muse_psfr")
if logging.getLogger("").handlers[0].level > logging.DEBUG:
logging.getLogger("muse_psfr").setLevel("WARNING")
# run psfrec
fsfmodel = MoffatModel2.from_psfrec(inputfile, verbose=False)
lbrange = np.array([fsfmodel.lbrange[0],7000.0,fsfmodel.lbrange[1]])
fwhm = fsfmodel.get_fwhm(lbrange)
beta = fsfmodel.get_beta(lbrange)
logger.info(
"FSF PsfRec model FWHM %.2f-%.2f BETA %.2f-%.2f",
fwhm[0],
fwhm[-1],
beta[0],
beta[-1],
)
kernel = 0
if imphot_table is not None:
tab = Table.read(imphot_table)
if outputfile2 is not None:
# create output table with FWHM and BETA difference
vtab = Table(
names=[
"BAND",
"LBDA",
"PSREC_FWHM",
"PSFREC_BETA",
"IMPHOT_FWHM",
"IMPHOT_BETA",
"ERR_FWHM",
"ERR_BETA",
],
dtype=["S25"] + 7 * ["f8"],
)
for b, lb in zip(*filters):
irow = tab[tab["filter"] == b]
ifwhm = irow["fwhm"]
ibeta = irow["beta"]
pfwhm = fsfmodel.get_fwhm(lb)
pbeta = fsfmodel.get_beta(lb)
vtab.add_row(
[b, lb, pfwhm, pbeta, ifwhm, ibeta, ifwhm - pfwhm, ibeta - pbeta]
)
vtab.write(outputfile2, overwrite=True)
# computing convolution kernel
imphot_fwhms = tab[[e["filter"] in filters[0] for e in tab]]["fwhm"]
psfrec_fwhms = fsfmodel.get_fwhm(np.array(filters[1]))
kernels = np.sqrt(np.clip(imphot_fwhms ** 2 - psfrec_fwhms ** 2, 0, np.nan))
for band, wave, f1, f2, kernel in zip(
filters[0], filters[1], imphot_fwhms, psfrec_fwhms, kernels
):
logger.debug(
"Filter %s Wave: %.1f IMPHOT FWHM %.2f PSFREC FWHM %.2f Kernel %.2f",
band,
wave,
f1,
f2,
kernel,
)
kernel_max = np.mean(kernels)
if kernel_max == 0:
logger.debug("IMPHOT FSF is smaller than PSFREC FSF, no convolution")
else:
kernels = [
0.6 * kernel_max,
0.7 * kernel_max,
0.8 * kernel_max,
0.9 * kernel_max,
kernel_max,
]
diffs = []
for kernel in kernels:
logger.debug(
"Convolution of FSFmodel with a Gaussian Kernel FWHM: %.2f", kernel
)
cfsfmodel = fsfmodel.convolve(kernel)
final_fwhms = cfsfmodel.get_fwhm(np.array(filters[1]))
diff = []
for band, wave, f1, f2, f3 in zip(
filters[0], filters[1], imphot_fwhms, psfrec_fwhms, final_fwhms
):
logger.debug(
"Filter %s Wave: %.1f IMPHOT FWHM %.2f PSFREC FWHM %.2f COMPUTED %.2f DIFF %.2f",
band,
wave,
f1,
f2,
f3,
f3 - f1,
)
diff.append(f3 - f1)
diffs.append(np.mean(diff))
kernel = np.interp(0, diffs, kernels)
logger.debug("Final Gaussian Kernel FWHM: %.2f", kernel)
cfsfmodel = fsfmodel.convolve(kernel)
final_fwhms = cfsfmodel.get_fwhm(np.array(filters[1]))
for band, wave, f1, f2, f3 in zip(
filters[0], filters[1], imphot_fwhms, psfrec_fwhms, final_fwhms
):
logger.debug(
"Filter %s Wave: %.1f IMPHOT FWHM %.2f PSFREC FWHM %.2f FINAL %.2f DIFF %.2f",
band,
wave,
f1,
f2,
f3,
f3 - f1,
)
lbrange = np.array([fsfmodel.lbrange[0],7000.0,fsfmodel.lbrange[1]])
fwhm = cfsfmodel.get_fwhm(lbrange)
beta = cfsfmodel.get_beta(lbrange)
logger.info(
"FSF Final model FWHM %.2f-%.2f BETA %.2f-%.2f",
fwhm[0],
fwhm[-1],
beta[0],
beta[-1],
)
fsfmodel = cfsfmodel
tab = Table(
names=[
"NAME",
"LBDA0",
"LBDA1",
"FWHM_B",
"FWHM_V",
"FWHM_R",
"BETA_B",
"BETA_V",
"BETA_R",
"KERNEL",
"NCFWHM",
"NCBETA",
],
dtype=["S25"] + 9 * ["f8"] + 2 * ["i4"],
)
row = [
expname,
fsfmodel.lbrange[0],
fsfmodel.lbrange[1],
fwhm[0],
fwhm[1],
fwhm[2],
beta[0],
beta[1],
beta[2],
kernel,
len(fsfmodel.fwhm_pol),
len(fsfmodel.beta_pol),
]
for k, val in enumerate(fsfmodel.fwhm_pol):
tab.add_column(Column(name=f"FWHM_P{k}", dtype="f8"))
row.append(val)
for k, val in enumerate(fsfmodel.beta_pol):
tab.add_column(Column(name=f"BETA_P{k}", dtype="f8"))
row.append(val)
tab.add_row(row)
tab.write(outputfile1, overwrite=True)
[docs]class FSF(PythonRecipe):
"""Recipe to compute FSF for individual exposures"""
recipe_name = "fsf"
DPR_TYPE = "OBJECT"
output_dir = "fsf"
output_frames = ["FSF"]
version = f"muse_psfr-{PSFR_VERSION}"
n_inputs_rec = 1
default_params = dict()
def _run(self, flist, *args, imphot_tables=None, filters=None, **kwargs):
expname = get_exp_name(flist[0])
out1 = join(self.output_dir, f"FSF.fits")
if imphot_tables is not None:
out2 = join(self.output_dir, f"FSF_PSFREC_IMPHOT.fits")
self.imphot_table = imphot_tables.get(expname)
else:
out2 = None
self.imphot_table = None
do_fsf(
expname,
flist[0],
out1,
out2,
imphot_table=self.imphot_table,
filters=filters,
**self.param,
)
return out1, out2
[docs] def dump(self, include_files=False, json_col=False):
info = super().dump(include_files=include_files, json_col=json_col)
if include_files:
# Add the imphot table only in the json file
info.update({"imphot_table": self.imphot_table})
return info