import argparse
import os
from typing import Optional, List
import re
from astropy.io import fits
# needed for version info
import numpy as np
import astropy
import pypeit
import dbsp_drp
from pypeit.history import History
from dbsp_drp import splicing, adjust_splicing
def entrypoint():
    """Command-line entry point: parse ``sys.argv`` and hand the result to :func:`main`."""
    cli_args = parse()
    main(cli_args)
def parse(options: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the command-line arguments for manual splicing.

    Args:
        options: Argument strings to parse. When ``None`` (the default),
            ``argparse`` falls back to ``sys.argv[1:]``.

    Returns:
        Namespace with the attributes ``raw_data_path``, ``outfile``,
        ``red_file`` and ``blue_file`` (the latter two default to ``None``).
    """
    argparser = argparse.ArgumentParser(
        description=(
            "Manually splice two coadded files together for DBSP.\n"
            "After preparing the spliced spectrum, the dbsp_adjust_splicing GUI pops up to allow for the splicing "
            "to be adjusted. To save the manually spliced spectrum, you MUST press save in the GUI."
        ),
        # Raw formatter keeps the explicit newline in the description above.
        formatter_class=argparse.RawTextHelpFormatter,
    )

    argparser.add_argument("raw_data_path", type=str,
                           help="Path to raw data from this reduction")
    argparser.add_argument("outfile", type=str,
                           help="Destination of spliced spectrum.")

    argparser.add_argument("-r", "--red_file", type=str, default=None,
                           help="redNNNN-redMMMM_SPATXXXX.fits file.")
    # Help text previously read "blueNNNN-redMMMM", a copy-paste slip from
    # the red option above.
    argparser.add_argument("-b", "--blue_file", type=str, default=None,
                           help="blueNNNN-blueMMMM_SPATXXXX.fits file.")

    # parse_args(None) already reads sys.argv[1:], so no branching is needed.
    return argparser.parse_args(options)
def find_spec1ds_spats(history: History):
    """Extract ``(spec1d filename, SPAT number)`` pairs from a history record.

    Two kinds of lines in ``history.history`` are recognised:

    * ``File <n> "<name>"`` associates a file number with a file name.
    * ``Object ID <id> from file <n>`` refers back to one of those numbers;
      the integer following ``SPAT`` inside ``<id>`` is taken as the spat.

    Args:
        history: Object whose ``history`` attribute is an iterable of strings.

    Returns:
        List of ``(filename, spat)`` tuples, one per ``Object ID`` line, in
        the order those lines appear.
    """
    # First pass: file number (kept as a string) -> file name. A later line
    # with the same number overrides an earlier one.
    file_hits = (re.search(r'File (\d+) "(.*)"', entry) for entry in history.history)
    names_by_number = {hit.group(1): hit.group(2) for hit in file_hits if hit}

    # Second pass: resolve every object line against the table built above.
    pairs = []
    for entry in history.history:
        object_hit = re.search(r"Object ID (.*) from file (\d+)", entry)
        if not object_hit:
            continue
        object_id, file_number = object_hit.groups()
        filename = names_by_number[file_number]
        spat_digits = re.search(r"SPAT(\d+)", object_id).group(1)
        pairs.append((filename, int(spat_digits)))
    return pairs
def _read_coadd(path):
    """Return ``(table data, History)`` for one coadd file.

    When *path* is ``None`` the result is ``(None, History())`` so the caller
    can treat a missing arm uniformly.
    """
    if path is None:
        return None, History()
    # The table data and header are pulled out inside the ``with`` so the
    # file handle is not left open for the lifetime of the process.
    with fits.open(path) as hdul:
        return hdul[1].data, History(hdul[0].header)


def _spectrum_table_hdu(wave, flux, sigma, name):
    """Build a binary-table HDU called *name* with wave/flux/sigma columns."""
    flux_unit = 'E-17 ERG/S/CM^2/ANG'
    columns = [
        fits.Column(name='wave', array=wave, unit='ANGSTROM', format='D'),
        fits.Column(name='flux', array=flux, unit=flux_unit, format='D'),
        fits.Column(name='sigma', array=sigma, unit=flux_unit, format='D'),
    ]
    return fits.BinTableHDU.from_columns(columns, name=name)


def main(args: argparse.Namespace):
    """Splice a red and/or blue coadd and open the splicing-adjustment GUI.

    The assembled ``HDUList`` holds, in order: a primary HDU with version and
    provenance keywords, the raw red HDUs, the raw blue HDUs, and three
    tables named ``RED``, ``BLUE`` and ``SPLICED``. Nothing is written to
    disk here; the list and ``args.outfile`` are handed to
    ``adjust_splicing.adjust_splicing_GUI``.

    Args:
        args: Namespace providing ``raw_data_path``, ``outfile``,
            ``red_file`` and ``blue_file`` (see :func:`parse`).

    Raises:
        ValueError: If both ``args.red_file`` and ``args.blue_file`` are
            ``None``.
    """
    if args.red_file is None and args.blue_file is None:
        raise ValueError("You must provide at least one of red_file and blue_file")

    # infer output dir: the parent of the directory holding the coadd file
    # (red is preferred when both are supplied)
    coadd_path = args.red_file if args.red_file is not None else args.blue_file
    output_path = os.path.abspath(os.path.join(os.path.dirname(coadd_path), ".."))

    # read them in; an arm that was not supplied yields (None, empty History)
    spec_b, blue_history = _read_coadd(args.blue_file)
    spec_r, red_history = _read_coadd(args.red_file)

    # find the spec1ds and spats
    blue_spec1ds_spats = find_spec1ds_spats(blue_history)
    red_spec1ds_spats = find_spec1ds_spats(red_history)

    # initial adjust-and-combine-overlap; the third positional argument is
    # passed as False, with its meaning defined by the splicing module
    ((final_wvs, final_flam, final_flam_sig),
     (red_wvs, red_flam, red_sig),
     (blue_wvs, blue_flam, blue_sig)) = splicing.adjust_and_combine_overlap(spec_b, spec_r, False)

    # build the final data product
    primary_header = fits.Header()
    # HIERARCH is used for keywords longer than the 8-character FITS limit
    primary_header['HIERARCH DBSP_DRP_V'] = dbsp_drp.__version__
    primary_header['PYPEIT_V'] = pypeit.__version__
    primary_header['NUMPY_V'] = np.__version__
    primary_header['HIERARCH ASTROPY_V'] = astropy.__version__
    primary_header['B_COADD'] = args.blue_file
    primary_header['R_COADD'] = args.red_file
    primary_hdu = fits.PrimaryHDU(header=primary_header)

    raw_red_hdus = splicing.get_raw_hdus_from_spec1d(red_spec1ds_spats, args.raw_data_path, output_path)
    raw_blue_hdus = splicing.get_raw_hdus_from_spec1d(blue_spec1ds_spats, args.raw_data_path, output_path)

    red_hdu = _spectrum_table_hdu(red_wvs, red_flam, red_sig, "RED")
    blue_hdu = _spectrum_table_hdu(blue_wvs, blue_flam, blue_sig, "BLUE")
    table_hdu = _spectrum_table_hdu(final_wvs, final_flam, final_flam_sig, "SPLICED")

    hdul = fits.HDUList(hdus=[primary_hdu, *raw_red_hdus, *raw_blue_hdus, red_hdu, blue_hdu, table_hdu])

    # before we save, run adjust splicing GUI
    adjust_splicing.adjust_splicing_GUI(hdul, args.outfile)