Source code for dbsp_drp.splicing

"""
Automated splicing for P200 DBSP.
"""

import os
from typing import Tuple, List

import numpy as np

from astropy.io import fits
import astropy.stats
import astropy.table

import pypeit
import pypeit.pypeit
from pypeit import msgs

import dbsp_drp

[docs]def get_raw_hdus_from_spec1d(spec1d_list: List[Tuple[str, int]], root: str, output_path: str) -> List[fits.BinTableHDU]: """ Returns list of ``fits.BinTableHDU`` s, each containing the raw header and the 1D spectrum, of the input spec1d files. Args: spec1d_list (List[Tuple[str, int]]): List of (spec1d filename, spatial pixel coordinate) root (str): Path to raw data files, possibly including filename prefix. output_path (str): reduction output path Returns: List[fits.BinTableHDU]: List of raw data headers and data from input spec1d files. """ ret = [] for (spec1d, spat) in spec1d_list: raw_fname = os.path.join(os.path.dirname(root), f"{os.path.basename(spec1d).split('_')[1].split('-')[0]}.fits") # get the raw header with fits.open(raw_fname) as raw_hdul: raw_header = raw_hdul[0].header.copy() # get the spectrum spec1d_path = os.path.join(output_path, 'Science', spec1d) with fits.open(spec1d_path) as spec1d_hdul: for hdu in spec1d_hdul: if f'SPAT{spat:04d}' in hdu.name: raw_data = hdu.data.copy() wave_col = fits.Column(name='wave', array=raw_data['OPT_WAVE'], unit='ANGSTROM', format='D') flux_col = fits.Column(name='flux', array=raw_data['OPT_FLAM'], unit='E-17 ERG/S/CM^2/ANG', format='D') sigma_col = fits.Column(name='sigma', array=raw_data['OPT_FLAM_SIG'], unit='E-17 ERG/S/CM^2/ANG', format='D') ret.append(fits.BinTableHDU.from_columns([wave_col, flux_col, sigma_col], name=os.path.splitext(os.path.basename(raw_fname))[0].upper(), header=raw_header)) return ret
[docs]def splice(splicing_dict: dict, interpolate_gaps: bool, root: str, output_path: str) -> None: """ Splices red and blue spectra together. .. code-block:: splicing_dict[target_name][position_along_slit][arm] = { 'spec1ds': [(spec1d_filename_1, spatial_pixel_1), (spec1d_filename_2, spatial_pixel_2)], 'coadd': coadd_filename } Args: splicing_dict (dict): Guides splicing. interpolate_gaps (bool): Interpolate across gaps in wavelength coverage? root (str): Path to raw data files, possibly including filename prefix. output_path (str): reduction output path """ for target, targets_dict in splicing_dict.items(): label = 'a' for _, arm_dict in targets_dict.items(): red_dict = arm_dict.get('red', {}) blue_dict = arm_dict.get('blue', {}) bluefile = blue_dict.get('coadd') redfile = red_dict.get('coadd') spec_b = None spec_r = None if bluefile is not None: bluefile = os.path.join(output_path, 'Science', bluefile) spec_b = fits.open(bluefile)[1].data if redfile is not None: redfile = os.path.join(output_path, 'Science', redfile) spec_r = fits.open(redfile)[1].data if bluefile is None and redfile is None: continue ((final_wvs, final_flam, final_flam_sig), (red_wvs, red_flam, red_sig), (blue_wvs, blue_flam, blue_sig)) = adjust_and_combine_overlap(spec_b, spec_r, interpolate_gaps) primary_header = fits.Header() primary_header['HIERARCH DBSP_DRP_V'] = dbsp_drp.__version__ primary_header['PYPEIT_V'] = pypeit.__version__ primary_header['NUMPY_V'] = np.__version__ primary_header['HIERARCH ASTROPY_V'] = astropy.__version__ primary_header['B_COADD'] = bluefile primary_header['R_COADD'] = redfile primary_hdu = fits.PrimaryHDU(header=primary_header) raw_red_hdus = get_raw_hdus_from_spec1d(red_dict.get('spec1ds', []), root, output_path) raw_blue_hdus = get_raw_hdus_from_spec1d(blue_dict.get('spec1ds', []), root, output_path) col_wvs = fits.Column(name='wave', array=red_wvs, unit='ANGSTROM', format='D') col_flux = fits.Column(name='flux', array=red_flam, unit='E-17 ERG/S/CM^2/ANG', format='D') col_error = fits.Column(name='sigma', array=red_sig, unit='E-17 ERG/S/CM^2/ANG', format='D') red_hdu = fits.BinTableHDU.from_columns([col_wvs, col_flux, col_error], name="RED") col_wvs = fits.Column(name='wave', array=blue_wvs, unit='ANGSTROM', format='D') col_flux = fits.Column(name='flux', array=blue_flam, unit='E-17 ERG/S/CM^2/ANG', format='D') col_error = fits.Column(name='sigma', array=blue_sig, unit='E-17 ERG/S/CM^2/ANG', format='D') blue_hdu = fits.BinTableHDU.from_columns([col_wvs, col_flux, col_error], name="BLUE") col_wvs = fits.Column(name='wave', array=final_wvs, unit='ANGSTROM', format='D') col_flux = fits.Column(name='flux', array=final_flam, unit='E-17 ERG/S/CM^2/ANG', format='D') col_error = fits.Column(name='sigma', array=final_flam_sig, unit='E-17 ERG/S/CM^2/ANG', format='D') table_hdu = fits.BinTableHDU.from_columns([col_wvs, col_flux, col_error], name="SPLICED") table_hdu.header['HIERARCH INTERP_GAPS'] = interpolate_gaps hdul = fits.HDUList(hdus=[primary_hdu, *raw_red_hdus, *raw_blue_hdus, red_hdu, blue_hdu, table_hdu]) log_msg = f"{target}_{label}.fits contains " if redfile is None: log_msg += f"{os.path.basename(bluefile)}" elif bluefile is None: log_msg += f"{os.path.basename(redfile)}" else: log_msg += f"{os.path.basename(redfile)} and {os.path.basename(bluefile)}" print(log_msg) hdul.writeto(os.path.join(output_path, "spliced", f'{target}_{label}.fits'), overwrite=True) label = chr(ord(label) + 1)
[docs]def adjust_and_combine_overlap( spec_b: fits.FITS_rec, spec_r: fits.FITS_rec, interpolate_gaps: bool, red_mult: float = 1.0 ) -> Tuple[ Tuple[np.ndarray, np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray] ]: """ Takes in red and blue spectra, adjusts overall flux level by red_mult, and combines overlap region. In the overlap region, the red spectrum is linearly interpolated to match the blue spectrum's wavelength spacing. Args: spec_b (fits.FITS_rec): blue spectrum spec_r (fits.FITS_rec): red spectrum. interpolate_gaps (bool): Interpolate across gaps in wavelength coverage? red_mult (float, optional): Factor multiplied into the red spectrum to match overal flux level with the blue spectrum. Defaults to 1.0. Raises: ValueError: Raised when both `spec_b` and `spec_r` are empty or None. Returns: Tuple[ Tuple[np.ndarray, np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray], Tuple[np.ndarray, np.ndarray, np.ndarray] ]: (blue, red, combined) spectra, where each spectrum is a tuple of (wavelengths, flux, error) """ if ((spec_b is None or not spec_b['wave'].shape[0]) and (spec_r is None or not spec_r['wave'].shape[0])): raise ValueError("Both arguments cannot be empty or None.") # TODO: propagate input masks if spec_r is None or not spec_r['wave'].shape[0]: return ((spec_b['wave'], spec_b['flux'], spec_b['ivar'] ** -0.5), (None, None, None), (spec_b['wave'], spec_b['flux'], spec_b['ivar'] ** -0.5)) if spec_b is None or not spec_b['wave'].shape[0]: return ((spec_r['wave'], red_mult*spec_r['flux'], red_mult*spec_r['ivar'] ** -0.5), (spec_r['wave'], spec_r['flux'], spec_r['ivar'] ** -0.5), (None, None, None)) # combination steps overlap_lo = spec_r['wave'][0] overlap_hi = spec_b['wave'][-1] # maybe need to fix the overlaps? # need more finely spaced grid to be completely contained within coarser grid if overlap_lo > overlap_hi: # there is no overlap! # we can't adjust the flux level # so we just concatenate! final_wvs = np.concatenate([spec_b['wave'], spec_r['wave']]) final_flam = np.concatenate([spec_b['flux'], spec_r['flux']*red_mult]) final_flam_sig = np.concatenate([spec_b['ivar'] ** -0.5, (spec_r['ivar'] ** -0.5) * red_mult]) return ((final_wvs, final_flam, final_flam_sig), (spec_r['wave'], spec_r['flux'], spec_r['ivar'] ** -0.5), (spec_b['wave'], spec_b['flux'], spec_b['ivar'] ** -0.5)) olap_r = (spec_r['wave'] < overlap_hi) olap_b = (spec_b['wave'] > overlap_lo) ## 05/25/2021 red_mult is not really necessary, spectra look better without it. ## 06/25/2021 keeping red_mult as an argument for manual_splicing #red_mult = (astropy.stats.sigma_clipped_stats(spec_b['flux'][olap_b])[1] / # astropy.stats.sigma_clipped_stats(spec_r['flux'][olap_r])[1]) # different dispersion. wvs_b = spec_b['wave'][~olap_b] wvs_r = spec_r['wave'][~olap_r] flam_b = spec_b['flux'][~olap_b] flam_r = spec_r['flux'][~olap_r] flam_sig_b = spec_b['ivar'][~olap_b] ** -0.5 flam_sig_r = spec_r['ivar'][~olap_r] ** -0.5 olap_wvs_r = spec_r['wave'][olap_r] olap_flam_r = red_mult * spec_r['flux'][olap_r] olap_flam_sig_r = red_mult * spec_r['ivar'][olap_r] ** -0.5 olap_wvs_b = spec_b['wave'][olap_b][:-1] olap_flam_b = spec_b['flux'][olap_b][:-1] olap_flam_sig_b = spec_b['ivar'][olap_b][:-1] ** -0.5 olap_flam_r_interp, olap_flam_sig_r_interp = interp_w_error(olap_wvs_b, olap_wvs_r, olap_flam_r, olap_flam_sig_r, interpolate_gaps) olap_flams = np.array([olap_flam_r_interp, olap_flam_b]) sigs = np.array([olap_flam_sig_r_interp, olap_flam_sig_b]) weights = sigs ** -2.0 olap_flam_avgd = np.average(olap_flams, axis=0, weights=weights) olap_flam_sig_avgd = 1.0 / np.sqrt(np.mean(weights, axis=0)) final_wvs = np.concatenate((wvs_b, olap_wvs_b, wvs_r)) final_flam = np.concatenate((flam_b, olap_flam_avgd, red_mult * flam_r)) final_flam_sig = np.concatenate((flam_sig_b, olap_flam_sig_avgd, red_mult * flam_sig_r)) return ((final_wvs, final_flam, final_flam_sig), (spec_r['wave'], spec_r['flux'], spec_r['ivar'] ** -0.5), (spec_b['wave'], spec_b['flux'], spec_b['ivar'] ** -0.5))
[docs]def interp_w_error(x: np.ndarray, xp: np.ndarray, yp: np.ndarray, err_yp: np.ndarray, interpolate_gaps: bool) -> Tuple[np.ndarray, np.ndarray]: """ Linearly interpolate the data points (``xp``, ``yp``) with ``err_yp`` uncertainty onto the grid ``x``. Args: x (np.ndarray): destination x data xp (np.ndarray): source x data yp (np.ndarray): source y data err_yp (np.ndarray): source y error data interpolate_gaps (bool): Interpolate across gaps in ``xp``? Returns: Tuple[np.ndarray, np.ndarray]: Interpolated y and error. """ if len(xp) == 1: return np.ones_like(x) * yp[0], np.ones_like(x) * err_yp[0] y = np.zeros_like(x) yerr = np.zeros_like(x) slopes = np.zeros(xp.shape[0] - 1) dxp = np.diff(xp) mean_dxp, _, _ = astropy.stats.sigma_clipped_stats(dxp) for i in range(len(slopes)): slopes[i] = (yp[i+1] - yp[i])/dxp[i] #slopes[-1] = slopes[-2] for i in range(len(x)): # find the index j into xp such that xp[j-1] <= x[i] < xp[j] j = np.searchsorted(xp, x[i], side='right') if (x[i] == xp[j-1]): y[i] = yp[j-1] yerr[i] = err_yp[j-1] elif (j == len(xp)): # extrapolating outside domain!!! y[i] = yp[-1]# + slopes[j-2]*(x[i] - xp[-1]) yerr[i] = np.sqrt((((x[i] - xp[-2])*err_yp[-1]) ** 2 + ((x[i] - xp[-1])*err_yp[-2]) ** 2) / ((xp[-2] - xp[-1]) ** 2)) elif (j == 0): # extrapolating outside domain!!! y[i] = yp[0]# + slopes[j]*(x[i] - xp[0]) yerr[i] = np.sqrt((((x[i] - xp[0])*err_yp[1]) ** 2 + ((x[i] - xp[1])*err_yp[0]) ** 2) / ((xp[1] - xp[0]) ** 2)) else: y[i] = yp[j-1] + slopes[j-1]*(x[i] - xp[j-1]) # If we are interpolating a gap larger than 5 times the avg d lambda if (xp[j] - xp[j-1]) > mean_dxp * 5: if interpolate_gaps: # err is max of edge points yerr[i] = max(err_yp[j], err_yp[j-1]) else: # err is infinite, so this point is completely discounted yerr[i] = np.inf else: yerr[i] = np.sqrt((((x[i] - xp[j])*err_yp[j-1]) ** 2 + ((x[i] - xp[j-1])*err_yp[j]) ** 2) / ((xp[j-1] - xp[j]) ** 2)) return y, yerr