Source code for dbsp_drp.p200_redux

"""
Automatic Reduction Pipeline for P200 DBSP.
"""

import argparse
import os
import time
import multiprocessing
from typing import Optional, List

import numpy as np
from matplotlib import pyplot as plt
from astropy.io import fits
from astropy.table import Table, Column, Row

from pypeit.pypeitsetup import PypeItSetup
import pypeit.display
from pypeit.spectrographs.util import load_spectrograph

import tqdm

from dbsp_drp import reduction, qa, fluxing, coadding, telluric, splicing
from dbsp_drp import table_edit
from dbsp_drp import fix_headers


def entrypoint():
    main(parser())

def parser(options: Optional[List[str]] = None) -> argparse.Namespace:
    """Parses command line arguments

    Args:
        options (Optional[List[str]], optional): List of command line arguments.
            Defaults to sys.argv[1:].

    Returns:
        argparse.Namespace: Parsed arguments
    """
    # Define command line arguments.
    argparser = argparse.ArgumentParser(description="Automatic Data Reduction Pipeline for P200 DBSP",
                                        formatter_class=argparse.RawTextHelpFormatter)

    # Argument for fully-automatic (i.e. nightly) vs. user-checked file typing
    argparser.add_argument('-i', '--no-interactive', default=False, action='store_true',
                           help='Skip interactive file-checking')

    # Argument for input file directory
    argparser.add_argument('-r', '--root', type=os.path.abspath, default=None,
                           required=True,
                           help='File path+root, e.g. /data/DBSP_20200127')

    argparser.add_argument('-d', '--output_path', type=os.path.abspath,
                           default='.',
                           help='Path to top-level output directory. '
                                'Default is the current working directory.')

    # Argument for specifying only red/blue
    argparser.add_argument('-a', '--arm', default=None, choices=['red', 'blue'],
                           help='[red, blue] to only reduce one arm (null splicing)')

    argparser.add_argument('-m', '--manual-extraction', default=False, action='store_true',
                           help='manual extraction')

    argparser.add_argument('--debug', default=False, action='store_true',
                           help='debug')

    argparser.add_argument('-j', '--jobs', type=int, default=1, metavar='N',
                           help='Number of processes to use')

    argparser.add_argument('-p', '--parameter-file', type=str, default="",
                           help="Path to parameter file. The parameter file should be formatted as follows:\n\n"
                                "[blue]\n"
                                "** PypeIt parameters for the blue side go here **\n"
                                "[red]\n"
                                "** PypeIt parameters for the red side go here **\n"
                                "EOF\n\n"
                                "The [red/blue] parameter blocks are optional, and their order does not matter.")

    argparser.add_argument('-t', '--skip-telluric', default=False, action='store_true',
                           help='Skip telluric correction')

    argparser.add_argument('-c', '--null-coadd', default=False, action='store_true',
                           help="Don't coadd consecutive exposures of the same target.\n"
                                "By default consecutive exposures will be coadded.")

    argparser.add_argument('--splicing-interpolate-gaps', default=False, action='store_true',
                           help="Use this option to linearly interpolate across large gaps\n"
                                "in the spectrum during splicing. The default behavior is to\n"
                                "only use data from one detector in these gaps, which results\n"
                                "in a slightly noisier spliced spectrum.")

    return argparser.parse_args() if options is None else argparser.parse_args(options)
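
# Usage sketch (illustrative only; the output path and job count below are
# hypothetical): the installed console script calls entrypoint(), but the same
# flow can also be driven programmatically:
#
#     args = parser(['-r', '/data/DBSP_20200127', '-d', '/data/reduced', '-j', '4'])
#     main(args)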

def interactive_correction(ps: PypeItSetup) -> None:
    """
    Allows for human correction of FITS headers and frame typing.

    Launches a GUI via dbsp_drp.table_edit, which handles saving updated FITS
    headers. table_edit depends on the current DBSP headers.

    Todo:
        Make table to FITS header mapping mutable

    Args:
        ps (PypeItSetup): PypeItSetup object created in dbsp_drp.reduction.setup
    """
    # function for interactively correcting the fits table
    fitstbl = ps.fitstbl
    deleted_files = []
    table_edit.main(fitstbl.table, deleted_files)
    files_to_remove = []
    for rm_file in deleted_files:
        for data_file in ps.file_list:
            if rm_file in data_file:
                files_to_remove.append(data_file)
                break
    for rm_file in files_to_remove:
        ps.file_list.remove(rm_file)
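
# Note (descriptive): table_edit.main() reports the frames the user deleted in
# the GUI via `deleted_files`; each entry is then matched as a substring against
# the full paths in ps.file_list before removal, so deleting e.g. 'red0042.fits'
# (hypothetical name) also drops '/data/.../red/red0042.fits' from the list.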

def main(args):
    t = time.perf_counter()

    if os.path.isdir(args.output_path):
        os.chdir(args.output_path)
    else:
        os.makedirs(args.output_path, exist_ok=True)

    if args.arm:
        do_red = args.arm.lower() == 'red'
        do_blue = args.arm.lower() == 'blue'
    else:
        do_red = True
        do_blue = True

    red_root = os.path.join(args.root, 'red')
    blue_root = os.path.join(args.root, 'blue')

    qa_dict = {}

    if args.parameter_file:
        blue_user_config_lines = reduction.parse_pypeit_parameter_file(args.parameter_file, 'p200_dbsp_blue')
        red_user_config_lines = reduction.parse_pypeit_parameter_file(args.parameter_file, 'p200_dbsp_red')
    else:
        blue_user_config_lines = []
        red_user_config_lines = []

    if args.debug:
        pypeit.display.display.connect_to_ginga(raise_err=True, allow_new=True)

    if do_red:
        red_files = fix_headers.main(red_root, args.no_interactive, args.no_interactive)
        context = reduction.setup(red_files, args.output_path, 'p200_dbsp_red')
        # optionally use interactive correction
        if not args.no_interactive:
            interactive_correction(context[0])
        pypeit_file_red = reduction.write_setup(context, 'all', 'p200_dbsp_red', red_user_config_lines)[0]

    if do_blue:
        blue_files = fix_headers.main(blue_root, args.no_interactive, args.no_interactive)
        context = reduction.setup(blue_files, args.output_path, 'p200_dbsp_blue')
        if not args.no_interactive:
            interactive_correction(context[0])
        pypeit_file_blue = reduction.write_setup(context, 'all', 'p200_dbsp_blue', blue_user_config_lines)[0]

    plt.switch_backend("agg")

    # TODO: parallelize this. It would require splitting the .pypeit files into
    # bits and pieces, or doing the calibration first and then parallelizing
    # the reduction.
    output_spec1ds_blue = set()
    output_spec1ds_red = set()
    if do_red:
        output_spec1ds_red, output_spec2ds_red = reduction.redux(pypeit_file_red, args.output_path)
        qa_dict = qa.save_2dspecs(qa_dict, output_spec2ds_red, args.output_path, 'p200_dbsp_red')
    if do_blue:
        output_spec1ds_blue, output_spec2ds_blue = reduction.redux(pypeit_file_blue, args.output_path)
        qa_dict = qa.save_2dspecs(qa_dict, output_spec2ds_blue, args.output_path, 'p200_dbsp_blue')

    if do_red or do_blue:
        qa.write_extraction_QA(qa_dict, args.output_path)

    if do_red:
        verification_counter = 0
        red_pypeit_files = reduction.verify_spec1ds(output_spec1ds_red, verification_counter, args.output_path)
        while red_pypeit_files:
            verification_counter += 1

            out_1d, out_2d = reduction.re_redux(red_pypeit_files, args.output_path)
            red_pypeit_files = reduction.verify_spec1ds(out_1d, verification_counter, args.output_path)
            qa_dict = qa.save_2dspecs(qa_dict, out_2d, args.output_path, 'p200_dbsp_red')

            output_spec1ds_red |= out_1d
            output_spec2ds_red |= out_2d
    if do_blue:
        verification_counter = 0
        blue_pypeit_files = reduction.verify_spec1ds(output_spec1ds_blue, verification_counter, args.output_path)
        while blue_pypeit_files:
            verification_counter += 1

            out_1d, out_2d = reduction.re_redux(blue_pypeit_files, args.output_path)
            blue_pypeit_files = reduction.verify_spec1ds(out_1d, verification_counter, args.output_path)
            qa_dict = qa.save_2dspecs(qa_dict, out_2d, args.output_path, 'p200_dbsp_blue')

            output_spec1ds_blue |= out_1d
            output_spec2ds_blue |= out_2d
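
    # Note on the verification loops above (descriptive): verify_spec1ds()
    # writes new .pypeit files for exposures whose 1D extractions look suspect,
    # re_redux() re-reduces only those files, and the loop repeats until nothing
    # is flagged, accumulating every output into the output_spec1ds_* and
    # output_spec2ds_* sets.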

    # TODO: use a do/while loop to iterate on the manual extraction GUI until the user is satisfied
    if args.manual_extraction:
        # wait for user acknowledgement
        input("Ready for manual extraction?\n"
              "If using GNU screen/tmux behind ssh, make sure to check that $DISPLAY is correct.")

        plt.switch_backend("Qt5Agg")

        if do_red:
            red_manual_pypeit_files = reduction.manual_extraction(output_spec2ds_red, pypeit_file_red, args.output_path)
        if do_blue:
            blue_manual_pypeit_files = reduction.manual_extraction(output_spec2ds_blue, pypeit_file_blue, args.output_path)
        if do_red and red_manual_pypeit_files:
            out_1d, out_2d = reduction.re_redux(red_manual_pypeit_files, args.output_path)
            qa.save_2dspecs(qa_dict, out_2d, args.output_path, 'p200_dbsp_red')

            output_spec1ds_red |= out_1d
            output_spec2ds_red |= out_2d
        if do_blue and blue_manual_pypeit_files:
            out_1d, out_2d = reduction.re_redux(blue_manual_pypeit_files, args.output_path)
            qa.save_2dspecs(qa_dict, out_2d, args.output_path, 'p200_dbsp_blue')

            output_spec1ds_blue |= out_1d
            output_spec2ds_blue |= out_2d

    # Find standards and make sensitivity functions
    spec1d_table = Table(names=('filename', 'arm', 'object', 'frametype',
                                'airmass', 'mjd', 'sensfunc', 'exptime'),
                         dtype=('U255', 'U4', 'U255', 'U8',
                                float, float, 'U255', float))

    # Ingest spec1d tables
    spec1ds = output_spec1ds_red | output_spec1ds_blue
    for spec1d in spec1ds:
        path = os.path.join(args.output_path, 'Science', spec1d)
        with fits.open(path) as hdul:
            head0 = hdul[0].header
            head1 = hdul[1].header
            arm = 'red' if 'red' in head0['PYP_SPEC'] else 'blue'
            spec1d_table.add_row((spec1d, arm, head0['TARGET'],
                                  head1['OBJTYPE'], head0['AIRMASS'],
                                  head0['MJD'], '', head0['EXPTIME']))
    spec1d_table.add_index('filename')
    spec1d_table.sort(['arm', 'mjd'])

    if do_red:
        for row in spec1d_table[(spec1d_table['arm'] == 'red') & (spec1d_table['frametype'] == 'standard')]:
            sensfunc = fluxing.make_sensfunc(row['filename'], args.output_path, 'p200_dbsp_red', red_user_config_lines)
            if sensfunc == "":
                spec1d_table['frametype'][spec1d_table['filename'] == row['filename']] = 'science'
            else:
                spec1d_table['sensfunc'][spec1d_table['filename'] == row['filename']] = sensfunc
    if do_blue:
        for row in spec1d_table[(spec1d_table['arm'] == 'blue') & (spec1d_table['frametype'] == 'standard')]:
            sensfunc = fluxing.make_sensfunc(row['filename'], args.output_path, 'p200_dbsp_blue', blue_user_config_lines)
            if sensfunc == "":
                spec1d_table['frametype'][spec1d_table['filename'] == row['filename']] = 'science'
            else:
                spec1d_table['sensfunc'][spec1d_table['filename'] == row['filename']] = sensfunc
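
    # How sensitivity functions are assigned below (descriptive): each science
    # frame gets the sensfunc of the standard closest in airmass; a standard
    # gets the second-closest standard (argsort()[1]) so it is not fluxed
    # against itself, unless it is the only standard on that arm. If an arm has
    # no standards, the `sensfunc` column instead holds a configuration string,
    # e.g. 'red_316_7500_d55' for a 316/7500 grating with the D55 dichroic
    # (illustrative values), presumably so downstream fluxing can fall back on
    # an archived sensitivity function for that setup.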

    if do_red:
        arm = spec1d_table['arm'] == 'red'
        stds = (spec1d_table['frametype'] == 'standard') & arm
        red_arm = load_spectrograph('p200_dbsp_red')
        rawfile = os.path.join(args.root,
                               spec1d_table[arm][0]['filename'].split('_')[1].split('-')[0] + '.fits')
        config = '_'.join([
            'red',
            red_arm.get_meta_value(rawfile, 'dispname').replace('/', '_'),
            red_arm.get_meta_value(rawfile, 'dichroic').lower()
        ])
        if np.any(stds):
            for row in spec1d_table[arm]:
                if row['frametype'] == 'science':
                    best_sens = spec1d_table[stds]['sensfunc'][np.abs(spec1d_table[stds]['airmass'] - row['airmass']).argmin()]
                elif row['frametype'] == 'standard':
                    if stds.sum() == 1:
                        best_sens = spec1d_table[stds]['sensfunc'][np.abs(spec1d_table[stds]['airmass'] - row['airmass']).argmin()]
                    else:
                        best_sens = spec1d_table[stds]['sensfunc'][np.abs(spec1d_table[stds]['airmass'] - row['airmass']).argsort()[1]]
                spec1d_table.loc[row['filename']]['sensfunc'] = best_sens
        else:
            for filename in spec1d_table[arm]['filename']:
                spec1d_table.loc[filename]['sensfunc'] = config
    if do_blue:
        arm = spec1d_table['arm'] == 'blue'
        stds = (spec1d_table['frametype'] == 'standard') & arm
        blue_arm = load_spectrograph('p200_dbsp_blue')
        rawfile = os.path.join(args.root,
                               spec1d_table[arm][0]['filename'].split('_')[1].split('-')[0] + '.fits')
        config = '_'.join([
            'blue',
            blue_arm.get_meta_value(rawfile, 'dispname').replace('/', '_'),
            blue_arm.get_meta_value(rawfile, 'dichroic').lower()
        ])
        if np.any(stds):
            for row in spec1d_table[arm]:
                if row['frametype'] == 'science':
                    best_sens = spec1d_table[stds]['sensfunc'][np.abs(spec1d_table[stds]['airmass'] - row['airmass']).argmin()]
                elif row['frametype'] == 'standard':
                    if stds.sum() == 1:
                        best_sens = spec1d_table[stds]['sensfunc'][np.abs(spec1d_table[stds]['airmass'] - row['airmass']).argmin()]
                    else:
                        best_sens = spec1d_table[stds]['sensfunc'][np.abs(spec1d_table[stds]['airmass'] - row['airmass']).argsort()[1]]
                spec1d_table.loc[row['filename']]['sensfunc'] = best_sens
        else:
            for filename in spec1d_table[arm]['filename']:
                spec1d_table.loc[filename]['sensfunc'] = config

    # build fluxfile
    if do_red:
        spec1d_to_sensfunc = {row['filename']: row['sensfunc'] for row in spec1d_table if row['arm'] == 'red'}
        red_fluxfile = fluxing.build_fluxfile(spec1d_to_sensfunc, args.output_path, 'p200_dbsp_red', red_user_config_lines)
    if do_blue:
        spec1d_to_sensfunc = {row['filename']: row['sensfunc'] for row in spec1d_table if row['arm'] == 'blue'}
        blue_fluxfile = fluxing.build_fluxfile(spec1d_to_sensfunc, args.output_path, 'p200_dbsp_blue', blue_user_config_lines)

    # flux data
    if do_red:
        fluxing.flux(red_fluxfile, args.output_path)
    if do_blue:
        fluxing.flux(blue_fluxfile, args.output_path)

    # coadd - intelligent coadding of multiple files
    # first make a column "coadd_id" that is the same for frames to be coadded
    # TODO: when there are multiple exposures of an object, splice/output all of them
    coaddIDs = []
    if args.null_coadd:
        coaddIDs = range(len(spec1d_table))
    else:
        previous_row: Row = None
        S_PER_DAY = 24 * 60 * 60
        for i, row in enumerate(spec1d_table):
            if i == 0:
                coaddIDs.append(0)
            else:
                # if this is the same object as the last one
                # and they were taken consecutively
                if ((row['arm'] == previous_row['arm']) and
                        (row['object'] == previous_row['object']) and
                        ((row['mjd']*S_PER_DAY - previous_row['mjd']*S_PER_DAY
                          - previous_row['exptime']) < previous_row['exptime'])):
                    coaddIDs.append(coaddIDs[-1])
                else:
                    coaddIDs.append(coaddIDs[-1] + 1)
            previous_row = row
    spec1d_table.add_column(coaddIDs, name="coadd_id")
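
    # Worked example of the grouping rule above (illustrative numbers): after a
    # 600 s exposure of a target, a following exposure of the same target on the
    # same arm joins the same coadd group as long as it starts less than 600 s
    # after the previous exposure ends, i.e. the start-to-start separation is
    # under 2 * 600 s = 1200 s.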

    # figure out where on the detector the likely target is
    spec1d_table.add_column(Column(name="spats", dtype=object, length=len(spec1d_table)))
    spec1d_table.add_column(Column(name="fracpos", dtype=object, length=len(spec1d_table)))
    all_spats = []
    all_fracpos = []
    # for each spec1d file
    for filename in spec1d_table['filename']:
        path = os.path.join(args.output_path, 'Science', filename)
        with fits.open(path) as hdul:
            spats = []
            fracpos = []
            for i in range(1, len(hdul) - 1):
                # grab all of its extensions' spatial positions
                spats.append(int(hdul[i].name.split('-')[0].lstrip('SPAT')))
                fracpos.append(hdul[i].header['SPAT_FRACPOS'])
            spats.sort()
            fracpos.sort()
            all_spats.append(spats)
            all_fracpos.append(fracpos)
            spec1d_table.loc[filename]['spats'] = spats
            spec1d_table.loc[filename]['fracpos'] = fracpos
    # the coadds column needs dtype=object to allow for variable-length lists
    spec1d_table.add_column(Column(name="coadds", dtype=object, length=len(spec1d_table)))
    spec1d_table.add_column([False]*len(all_spats), name="processed")

    # coadd
    # iterate over coadd_ids
    coadd_to_spec1d = {}
    for coadd_id in set(coaddIDs):
        subtable = spec1d_table[spec1d_table['coadd_id'] == coadd_id]
        fname_spats = {row['filename']: row['spats'].copy() for row in subtable}
        grouped_spats_list = coadding.group_coadds(fname_spats)
        if all(subtable['arm'] == 'red'):
            coadds = coadding.coadd(grouped_spats_list, args.output_path, 'p200_dbsp_red', red_user_config_lines)
        if all(subtable['arm'] == 'blue'):
            coadds = coadding.coadd(grouped_spats_list, args.output_path, 'p200_dbsp_blue', blue_user_config_lines)
        assert all(subtable['arm'] == 'red') or all(subtable['arm'] == 'blue'), \
            "Something went wrong with coadding..."
        for row in subtable:
            spec1d_table.loc[row['filename']]['coadds'] = coadds
        for i, coadd in enumerate(coadds):
            coadd_to_spec1d[coadd] = list(zip(grouped_spats_list[i]['fnames'], grouped_spats_list[i]['spats']))

    if not args.skip_telluric:
        # telluric correct
        if do_red:
            tellcorr_inputs = []
            tell_coadd_fnames = set()
            for row in spec1d_table[spec1d_table['arm'] == 'red']:
                if isinstance(row['coadds'], list):
                    for obj in row['coadds']:
                        if obj not in tell_coadd_fnames:
                            tmp = (obj, args.output_path, 'p200_dbsp_red', red_user_config_lines)
                            tellcorr_inputs.append(tmp)
                            tell_coadd_fnames.add(obj)
            if args.jobs == 1:
                # do it in series
                for tellcorr_input in tqdm.tqdm(tellcorr_inputs):
                    telluric.telluric_correct(*tellcorr_input)
            else:
                pool = multiprocessing.Pool(args.jobs)
                list(tqdm.tqdm(pool.imap(telluric.picklable_telluric_correct, tellcorr_inputs),
                               total=len(tellcorr_inputs)))
                pool.close()
                pool.join()

            # TODO: maybe verify here that telluric correction succeeded,
            # and only then change the coadd names
            for coadd in tell_coadd_fnames:
                tell = coadd.replace(".fits", "_tellcorr.fits")
                tellpath = os.path.join(args.output_path, 'Science', tell)
                coaddpath = os.path.join(args.output_path, 'Science', coadd)
                # check that the telluric-corrected file exists and is newer than the coadd
                if os.path.isfile(tellpath) and (os.path.getmtime(tellpath) > os.path.getmtime(coaddpath)):
                    # swap the coadd name for the telluric-corrected name
                    for row in spec1d_table:
                        if coadd in row['coadds']:
                            ix = row['coadds'].index(coadd)
                            spec1d_table.loc[row['filename']]['coadds'][ix] = tell
                    coadd_to_spec1d[tell] = coadd_to_spec1d[coadd]
                    del coadd_to_spec1d[coadd]

    # current splicing - make sure the spatial fraction is similar on blue/red
    # TODO: handle multiple observations of the same target throughout the night with null coadding
    # splice data
    splicing_dict = {}
    blue_mask = spec1d_table['arm'] == 'blue'
    red_mask = spec1d_table['arm'] == 'red'

    os.makedirs(os.path.join(args.output_path, 'spliced'), exist_ok=True)

    def get_std_trace(std_path: str) -> float:
        max_sn = -1
        max_fracpos = -1
        with fits.open(std_path) as hdul:
            # loop through trace hdus
            for hdu in hdul:
                if 'SPAT' not in hdu.name:
                    continue

                # look at s/n
                if 'OPT_COUNTS' in hdu.data.dtype.names:
                    this_sn = np.nanmedian(hdu.data['OPT_COUNTS']/hdu.data['OPT_COUNTS_SIG'])
                elif 'BOX_COUNTS' in hdu.data.dtype.names:
                    this_sn = np.nanmedian(hdu.data['BOX_COUNTS']/hdu.data['BOX_COUNTS_SIG'])
                else:
                    this_sn = -1

                if this_sn > max_sn:
                    max_sn = this_sn
                    max_fracpos = hdu.header['SPAT_FRACPOS']
        if max_fracpos == -1:
            raise Exception(f"Error! No HDUs in {os.path.basename(std_path)} have median S/N > 0.")
        return max_fracpos
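
    # get_std_trace() note (descriptive): it returns the SPAT_FRACPOS of the
    # highest-S/N trace in a standard's spec1d file, preferring optimal
    # (OPT_COUNTS) over boxcar (BOX_COUNTS) extractions. The red+blue sums of
    # these values for matched standards calibrate FRACPOS_SUM below, the
    # expected red-plus-blue fracpos total used to pair traces across arms.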

    # Need to find the red + blue fracpos for standards.
    # Hopefully standards only have one star each; otherwise we would need to
    # do real matching here.
    stds = spec1d_table['frametype'] == 'standard'
    if do_red or do_blue:
        FRACPOS_SUM = 1.0
        FRACPOS_TOL = 0.05
        if do_red and do_blue:
            # real matching + splicing
            std_fracpos_sums = []
            if (stds & blue_mask).any() and (stds & red_mask).any():
                for row in spec1d_table[stds]:
                    # find the closest-mjd frame of the other arm
                    if not row['processed']:
                        other_arm = spec1d_table['arm'] != row['arm']
                        corresponding_row = spec1d_table[other_arm & stds][np.abs(spec1d_table[other_arm & stds]['mjd'] - row['mjd']).argmin()]
                        this_path = os.path.join(args.output_path, 'Science', row['filename'])
                        corresponding_path = os.path.join(args.output_path, 'Science', corresponding_row['filename'])
                        std_fracpos_sums.append(get_std_trace(this_path) + get_std_trace(corresponding_path))
                        spec1d_table.loc[row['filename']]['processed'] = True
                        spec1d_table.loc[corresponding_row['filename']]['processed'] = True
                FRACPOS_SUM = np.mean(std_fracpos_sums)
                FRACPOS_TOL = FRACPOS_SUM * 0.025

        # set up the splicing dict
        splicing_dict = {}
        # for each target
        for row in spec1d_table:
            target = row['object']
            arm = row['arm']
            # for each of its fracpos
            for i, fracpos in enumerate(row['fracpos']):
                coadd = row['coadds'][i]
                targ_dict = splicing_dict.get(target)
                # normalize fracpos to red
                if do_red and do_blue and arm == 'blue':
                    fracpos = FRACPOS_SUM - fracpos
                # if it's not in the dict
                if targ_dict is None:
                    # put it in the dict
                    splicing_dict[target] = {fracpos: {
                        arm: {
                            'spec1ds': coadd_to_spec1d[coadd],
                            'coadd': coadd
                        }
                    }}
                else:
                    close_enough = False
                    # for each existing fracpos
                    for fracpos_existing in list(targ_dict):
                        # if it's close enough
                        if abs(fracpos_existing - fracpos) < FRACPOS_TOL:
                            # put it in the dict
                            splicing_dict[target][fracpos_existing][arm] = {
                                'spec1ds': coadd_to_spec1d[coadd],
                                'coadd': coadd
                            }
                            close_enough = True
                            break
                    if not close_enough:
                        # this fracpos isn't close enough to any others
                        splicing_dict[target][fracpos] = {arm: {
                            'spec1ds': coadd_to_spec1d[coadd],
                            'coadd': coadd
                        }}
        # And now, actually splice!
        splicing.splice(splicing_dict, args.splicing_interpolate_gaps, red_root, args.output_path)

    print('Elapsed time: {0} seconds'.format(time.perf_counter() - t))
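
# For reference, the splicing_dict assembled in main() has this nested shape
# (target name and numbers are illustrative placeholders):
#
#     {
#         'some_target': {
#             0.51: {
#                 'red':  {'spec1ds': [('<spec1d filename>', <spat>), ...],
#                          'coadd': '<coadd filename>'},
#                 'blue': {'spec1ds': [...], 'coadd': '<coadd filename>'},
#             },
#         },
#     }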