Source code for redback.transient.transient

from __future__ import annotations

from typing import Union

import matplotlib
import numpy as np
import pandas as pd

import redback
from redback.plotting import \
    LuminosityPlotter, FluxDensityPlotter, IntegratedFluxPlotter, MagnitudePlotter, \
    IntegratedFluxOpticalPlotter, SpectrumPlotter, LuminosityOpticalPlotter
from redback.model_library import all_models_dict
from collections import namedtuple

[docs] class Spectrum(object):
[docs] def __init__(self, angstroms: np.ndarray, flux_density: np.ndarray, flux_density_err: np.ndarray, time: str = None, name: str = '', **kwargs) -> None: """ A class to store spectral data. :param angstroms: Wavelength in angstroms. :param flux_density: flux density in ergs/s/cm^2/angstrom. :param flux_density_err: flux density error in ergs/s/cm^2/angstrom. :param time: Time of the spectrum. Could be a phase or time since burst. Only used for plotting. :param name: Name of the spectrum. """ self.angstroms = angstroms self.flux_density = flux_density self.flux_density_err = flux_density_err self.time = time self.name = name if self.time is None: self.plot_with_time_label = False else: self.plot_with_time_label = True self.directory_structure = redback.get_data.directory.spectrum_directory_structure(transient=name) self.data_mode = 'spectrum'
@property def xlabel(self) -> str: """ :return: xlabel used in plotting functions :rtype: str """ return r'Wavelength [$\mathrm{\AA}$]' @property def ylabel(self) -> str: """ :return: ylabel used in plotting functions :rtype: str """ return r'Flux ($10^{-17}$ erg s$^{-1}$ cm$^{-2}$ $\mathrm{\AA}$)'
[docs] def plot_data(self, axes: matplotlib.axes.Axes = None, filename: str = None, outdir: str = None, save: bool = True, show: bool = True, color: str = 'k', **kwargs) -> matplotlib.axes.Axes: """Plots the Transient data and returns Axes. :param axes: Matplotlib axes to plot the lightcurve into. Useful for user specific modifications to the plot. :param filename: Name of the file to be plotted in. :param outdir: The directory in which to save the file in. :param save: Whether to save the plot. (Default value = True) :param show: Whether to show the plot. (Default value = True) :param color: Color of the data. :param kwargs: Additional keyword arguments to pass in the Plotter methods. Available in the online documentation under at `redback.plotting.Plotter`. `print(Transient.plot_data.__doc__)` to see all options! :return: The axes with the plot. """ plotter = SpectrumPlotter(spectrum=self, color=color, filename=filename, outdir=outdir, **kwargs) return plotter.plot_data(axes=axes, save=save, show=show)
[docs] def plot_spectrum( self, model: callable, filename: str = None, outdir: str = None, axes: matplotlib.axes.Axes = None, save: bool = True, show: bool = True, random_models: int = 100, posterior: pd.DataFrame = None, model_kwargs: dict = None, **kwargs: None) -> matplotlib.axes.Axes: """ :param model: The model used to plot the lightcurve. :param filename: The output filename. Otherwise, use default which starts with the name attribute and ends with *lightcurve.png. :param axes: Axes to plot in if given. :param save: Whether to save the plot. :param show: Whether to show the plot. :param random_models: Number of random posterior samples plotted faintly. (Default value = 100) :param posterior: Posterior distribution to which to draw samples from. Is optional but must be given. :param outdir: Out directory in which to save the plot. Default is the current working directory. :param model_kwargs: Additional keyword arguments to be passed into the model. :param kwargs: Additional keyword arguments to pass in the Plotter methods. Available in the online documentation under at `redback.plotting.Plotter`. `print(Transient.plot_lightcurve.__doc__)` to see all options! :return: The axes. """ plotter = SpectrumPlotter( spectrum=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) return plotter.plot_spectrum(axes=axes, save=save, show=show)
[docs] def plot_residual(self, model: callable, filename: str = None, outdir: str = None, axes: matplotlib.axes.Axes = None, save: bool = True, show: bool = True, posterior: pd.DataFrame = None, model_kwargs: dict = None, **kwargs: None) -> matplotlib.axes.Axes: """ :param model: The model used to plot the lightcurve. :param filename: The output filename. Otherwise, use default which starts with the name attribute and ends with *lightcurve.png. :param axes: Axes to plot in if given. :param save: Whether to save the plot. :param show: Whether to show the plot. :param posterior: Posterior distribution to which to draw samples from. Is optional but must be given. :param outdir: Out directory in which to save the plot. Default is the current working directory. :param model_kwargs: Additional keyword arguments to be passed into the model. :param kwargs: Additional keyword arguments to pass in the Plotter methods. Available in the online documentation under at `redback.plotting.Plotter`. `print(Transient.plot_residual.__doc__)` to see all options! :return: The axes. """ plotter = SpectrumPlotter( spectrum=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, **kwargs) return plotter.plot_residuals(axes=axes, save=save, show=show)
LuminosityPlotter, FluxDensityPlotter, IntegratedFluxPlotter, MagnitudePlotter, IntegratedFluxOpticalPlotter
[docs] class Transient(object): DATA_MODES = ['luminosity', 'flux', 'flux_density', 'magnitude', 'counts', 'ttes'] _ATTRIBUTE_NAME_DICT = dict(luminosity="Lum50", flux="flux", flux_density="flux_density", counts="counts", magnitude="magnitude") ylabel_dict = dict(luminosity=r'Luminosity [$10^{50}$ erg s$^{-1}$]', magnitude=r'Magnitude', flux=r'Flux [erg cm$^{-2}$ s$^{-1}$]', flux_density=r'Flux density [mJy]', counts=r'Counts') luminosity_data = redback.utils.DataModeSwitch('luminosity') flux_data = redback.utils.DataModeSwitch('flux') flux_density_data = redback.utils.DataModeSwitch('flux_density') magnitude_data = redback.utils.DataModeSwitch('magnitude') counts_data = redback.utils.DataModeSwitch('counts') tte_data = redback.utils.DataModeSwitch('ttes')
[docs] def __init__( self, time: np.ndarray = None, time_err: np.ndarray = None, time_mjd: np.ndarray = None, time_mjd_err: np.ndarray = None, time_rest_frame: np.ndarray = None, time_rest_frame_err: np.ndarray = None, Lum50: np.ndarray = None, Lum50_err: np.ndarray = None, flux: np.ndarray = None, flux_err: np.ndarray = None, flux_density: np.ndarray = None, flux_density_err: np.ndarray = None, magnitude: np.ndarray = None, magnitude_err: np.ndarray = None, counts: np.ndarray = None, ttes: np.ndarray = None, bin_size: float = None, redshift: float = np.nan, data_mode: str = None, name: str = '', photon_index: float = np.nan, use_phase_model: bool = False, optical_data: bool = False, frequency: np.ndarray = None, system: np.ndarray = None, bands: np.ndarray = None, active_bands: Union[np.ndarray, str] = None, plotting_order: Union[np.ndarray, str] = None, detections: np.ndarray = None, upper_limit_sigma: Union[float, np.ndarray] = 3.0, **kwargs: None) -> None: """This is a general constructor for the Transient class. Note that you only need to give data corresponding to the data mode you are using. For luminosity data provide times in the rest frame, if using a phase model provide time in MJD, else use the default time (observer frame). :param time: Times in the observer frame. :type time: np.ndarray, optional :param time_err: Time errors in the observer frame. :type time_err: np.ndarray, optional :param time_mjd: Times in MJD. Used if using phase model. :type time_mjd: np.ndarray, optional :param time_mjd_err: Time errors in MJD. Used if using phase model. :type time_mjd_err: np.ndarray, optional :param time_rest_frame: Times in the rest frame. Used for luminosity data. :type time_rest_frame: np.ndarray, optional :param time_rest_frame_err: Time errors in the rest frame. Used for luminosity data. :type time_rest_frame_err: np.ndarray, optional :param Lum50: Luminosity values. :type Lum50: np.ndarray, optional :param Lum50_err: Luminosity error values. :type Lum50_err: np.ndarray, optional :param flux: Flux values. :type flux: np.ndarray, optional :param flux_err: Flux error values. :type flux_err: np.ndarray, optional :param flux_density: Flux density values. :type flux_density: np.ndarray, optional :param flux_density_err: Flux density error values. :type flux_density_err: np.ndarray, optional :param magnitude: Magnitude values for photometry data. :type magnitude: np.ndarray, optional :param magnitude_err: Magnitude error values for photometry data. :type magnitude_err: np.ndarray, optional :param counts: Counts for prompt data. :type counts: np.ndarray, optional :param ttes: Time-tagged events data for unbinned prompt data. :type ttes: np.ndarray, optional :param bin_size: Bin size for binning time-tagged event data. :type bin_size: float, optional :param redshift: Redshift value. :type redshift: float, optional :param data_mode: Data mode. Must be one from `Transient.DATA_MODES`. :type data_mode: str, optional :param name: Name of the transient. :type name: str, optional :param photon_index: Photon index value. :type photon_index: float, optional :param use_phase_model: Whether we are using a phase model. :type use_phase_model: bool, optional :param optical_data: Whether we are fitting optical data, useful for plotting. :type optical_data: bool, optional :param frequency: Array of band frequencies in photometry data. :type frequency: np.ndarray, optional :param system: System values. :type system: np.ndarray, optional :param bands: Band values. :type bands: np.ndarray, optional :param active_bands: List or array of active bands to be used in the analysis. Use all available bands if 'all' is given. :type active_bands: Union[list, np.ndarray], optional :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param detections: Boolean or integer array (same length as time) indicating which data points are detections (1/True) vs upper limits (0/False). None means all points are detections. :type detections: np.ndarray, optional :param upper_limit_sigma: The sigma level of the upper limits. Can be a single float (applied to all upper limits) or an array (one per data point). Default is 3.0 (3-sigma limits). :type upper_limit_sigma: Union[float, np.ndarray], optional :param kwargs: Additional callables: bands_to_frequency: Conversion function to convert a list of bands to frequencies. Use redback.utils.bands_to_frequency if not given. bin_ttes: Binning function for time-tagged event data. Use redback.utils.bands_to_frequency if not given. :type kwargs: None, optional """ self.bin_size = bin_size self.bin_ttes = kwargs.get("bin_ttes", redback.utils.bin_ttes) self.bands_to_frequency = kwargs.get("bands_to_frequency", redback.utils.bands_to_frequency) if data_mode == 'ttes': time, counts = self.bin_ttes(ttes, self.bin_size) self.time = time self.time_err = time_err self.time_mjd = time_mjd self.time_mjd_err = time_mjd_err self.time_rest_frame = time_rest_frame self.time_rest_frame_err = time_rest_frame_err self.Lum50 = Lum50 self.Lum50_err = Lum50_err self.flux = flux self.flux_err = flux_err self.flux_density = flux_density self.flux_density_err = flux_density_err self.magnitude = magnitude self.magnitude_err = magnitude_err self.counts = counts self.counts_err = np.sqrt(counts) if counts is not None else None self.ttes = ttes self._frequency = None self._bands = None self.set_bands_and_frequency(bands=bands, frequency=frequency) self.system = system self.data_mode = data_mode self.active_bands = active_bands self.sncosmo_bands = redback.utils.sncosmo_bandname_from_band(self.bands) self.redshift = redshift self.name = name self.use_phase_model = use_phase_model self.optical_data = optical_data self.plotting_order = plotting_order self.detections = detections self.upper_limit_sigma = upper_limit_sigma self.meta_data = None self.photon_index = photon_index self.directory_structure = redback.get_data.directory.DirectoryStructure( directory_path=".", raw_file_path=".", processed_file_path=".")
[docs] @staticmethod def load_data_generic(processed_file_path, data_mode="magnitude"): """Loads data from specified directory and file, and returns it as a tuple. :param processed_file_path: Path to the processed file to load :type processed_file_path: str :param data_mode: Name of the data mode. Must be from ['magnitude', 'flux_density', 'all']. Default is magnitude. :type data_mode: str, optional :return: Six elements when querying magnitude or flux_density data, Eight for 'all'. :rtype: tuple """ DATA_MODES = ['luminosity', 'flux', 'flux_density', 'magnitude', 'counts', 'ttes', 'all'] df = pd.read_csv(processed_file_path) time_days = np.array(df["time (days)"]) time_mjd = np.array(df["time"]) magnitude = np.array(df["magnitude"]) magnitude_err = np.array(df["e_magnitude"]) bands = np.array(df["band"]) flux_density = np.array(df["flux_density(mjy)"]) flux_density_err = np.array(df["flux_density_error"]) if data_mode not in DATA_MODES: raise ValueError(f"Data mode {data_mode} not in {DATA_MODES}") if data_mode == "magnitude": return time_days, time_mjd, magnitude, magnitude_err, bands elif data_mode == "flux_density": return time_days, time_mjd, flux_density, flux_density_err, bands elif data_mode == "all": return time_days, time_mjd, flux_density, flux_density_err, magnitude, magnitude_err, bands
[docs] @classmethod def from_lasair_data( cls, name: str, data_mode: str = "magnitude", active_bands: Union[np.ndarray, str] = 'all', use_phase_model: bool = False, plotting_order: Union[np.ndarray, str] = None) -> Transient: """Constructor method to built object from LASAIR data. :param name: Name of the transient. :type name: str :param data_mode: Data mode used. Must be from `OpticalTransient.DATA_MODES`. Default is magnitude. :type data_mode: str, optional :param active_bands: Sets active bands based on array given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[np.ndarray, str] :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether to use a phase model. :type use_phase_model: bool, optional :return: A class instance. :rtype: OpticalTransient """ if cls.__name__ == "TDE": transient_type = "tidal_disruption_event" else: transient_type = cls.__name__.lower() directory_structure = redback.get_data.directory.lasair_directory_structure( transient=name, transient_type=transient_type) df = pd.read_csv(directory_structure.processed_file_path) time_days = np.array(df["time (days)"]) time_mjd = np.array(df["time"]) magnitude = np.array(df["magnitude"]) magnitude_err = np.array(df["e_magnitude"]) bands = np.array(df["band"]) flux = np.array(df["flux(erg/cm2/s)"]) flux_err = np.array(df["flux_error"]) flux_density = np.array(df["flux_density(mjy)"]) flux_density_err = np.array(df["flux_density_error"]) return cls(name=name, data_mode=data_mode, time=time_days, time_err=None, time_mjd=time_mjd, flux_density=flux_density, flux_density_err=flux_density_err, magnitude=magnitude, magnitude_err=magnitude_err, flux=flux, flux_err=flux_err, bands=bands, active_bands=active_bands, use_phase_model=use_phase_model, optical_data=True, plotting_order=plotting_order)
[docs] @classmethod def from_simulated_optical_data( cls, name: str, data_mode: str = "magnitude", active_bands: Union[np.ndarray, str] = 'all', plotting_order: Union[np.ndarray, str] = None, use_phase_model: bool = False, include_upper_limits: bool = False, upper_limit_sigma: Union[float, np.ndarray] = 3.0) -> Transient: """Constructor method to built object from SimulatedOpticalTransient. :param name: Name of the transient. :type name: str :param data_mode: Data mode used. Must be from `OpticalTransient.DATA_MODES`. Default is magnitude. :type data_mode: str, optional :param active_bands: Sets active bands based on array given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[np.ndarray, str] :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether to use a phase model. :type use_phase_model: bool, optional :param include_upper_limits: Whether to include non-detection data points as upper limits. If False (default), non-detections are filtered out (backward compatible behavior). If True, non-detections are preserved and flagged in the detections array. :type include_upper_limits: bool, optional :param upper_limit_sigma: The sigma level of the upper limits. Default is 3.0 (3-sigma limits). :type upper_limit_sigma: Union[float, np.ndarray], optional :return: A class instance. :rtype: OpticalTransient """ path = "simulated/" + name + ".csv" df = pd.read_csv(path) detections = None if include_upper_limits and "detected" in df.columns: detections = (df["detected"] != 0).to_numpy().astype(bool) n_total = len(df) n_det = int(detections.sum()) redback.utils.logger.info(f"Keeping all {n_total} data points ({n_det} detections, {n_total - n_det} upper limits)") else: n_before = len(df) df = df[df.detected != 0] n_after = len(df) redback.utils.logger.info(f"Filtered {n_before - n_after} non-detections, {n_after} detections remain") time_days = np.array(df["time (days)"]) time_mjd = np.array(df["time"]) magnitude = np.array(df["magnitude"]) # For non-detections, magnitude may be NaN; substitute limiting_magnitude where available. if detections is not None and "limiting_magnitude" in df.columns: limiting_mag = np.array(df["limiting_magnitude"]) nan_mask = ~np.isfinite(magnitude) magnitude[nan_mask] = limiting_mag[nan_mask] magnitude_err = np.array(df["e_magnitude"]) bands = np.array(df["band"]) flux = np.array(df["flux(erg/cm2/s)"]) flux_err = np.array(df["flux_error"]) flux_density = np.array(df["flux_density(mjy)"]) flux_density_err = np.array(df["flux_density_error"]) return cls(name=name, data_mode=data_mode, time=time_days, time_err=None, time_mjd=time_mjd, flux_density=flux_density, flux_density_err=flux_density_err, magnitude=magnitude, magnitude_err=magnitude_err, flux=flux, flux_err=flux_err, bands=bands, active_bands=active_bands, use_phase_model=use_phase_model, optical_data=True, plotting_order=plotting_order, detections=detections, upper_limit_sigma=upper_limit_sigma)
[docs] @classmethod def from_otter( cls, name: str, data_mode: str = 'flux_density', obs_type: str = 'radio', active_bands: Union[np.ndarray, str] = 'all', plotting_order: Union[np.ndarray, str] = None, use_phase_model: bool = False ) -> Transient: """Constructor method to build object from OTTER database (for non-optical data like radio/xray) :param name: Name of the transient in OTTER database. :type name: str :param data_mode: Data mode used. Default is 'flux_density' for non-optical data. :type data_mode: str, optional :param obs_type: Observation type: 'radio' (default) or 'xray'. :type obs_type: str, optional :param active_bands: Sets active bands based on array given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[np.ndarray, str] :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether to use a phase model. :type use_phase_model: bool, optional :return: A class instance :rtype: Transient """ if cls.__name__ == "TDE": transient_type = "tidal_disruption_event" else: transient_type = cls.__name__.lower() # Get data from OTTER with specified obs_type from redback.get_data.otter import OtterDataGetter getter = OtterDataGetter(transient=name, transient_type=transient_type, obs_type=obs_type) data = getter.get_data() # Load the processed data df = pd.read_csv(getter.processed_file_path) time_days = np.array(df["time (days)"]) time_mjd = np.array(df["time"]) flux_density = np.array(df["flux_density(mjy)"]) flux_density_err = np.array(df["flux_density_error"]) bands = np.array(df["band"]) return cls( name=name, data_mode=data_mode, time=time_days, time_err=None, time_mjd=time_mjd, flux_density=flux_density, flux_density_err=flux_density_err, bands=bands, active_bands=active_bands, use_phase_model=use_phase_model, optical_data=False, plotting_order=plotting_order)
[docs] @classmethod def from_lightcurvelynx( cls, name: str, data: pd.DataFrame = None, data_mode: str = "magnitude", active_bands: Union[np.ndarray, str] = 'all', plotting_order: Union[np.ndarray, str] = None, use_phase_model: bool = False, frequency: np.ndarray = None, include_upper_limits: bool = False, upper_limit_sigma: Union[float, np.ndarray] = 3.0) -> Transient: """Constructor method to built object from a LightCurveLynx simulated light curve. https://github.com/lincc-frameworks/lightcurvelynx Only the time, bands, magnitude and magnitude error columns are used. The rest are computed from those. :param name: Name of the transient. :type name: str :param data: DataFrame containing the light curve data. If None, it will try to load from "simulated/{name}.csv". :type data: pd.DataFrame, optional :param data_mode: Data mode used. Must be from `OpticalTransient.DATA_MODES`. Default is magnitude. :type data_mode: str, optional :param active_bands: Sets active bands based on array given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[np.ndarray, str] :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether to use a phase model. :type use_phase_model: bool, optional :param frequency: Array of frequencies corresponding to each observation. If None, will be computed from bands using bands_to_frequency. :type frequency: np.ndarray, optional :param include_upper_limits: Whether to include non-detection data points as upper limits. If False (default), non-detections are filtered out (backward compatible behavior). If True, non-detections are preserved and flagged in the detections array. :type include_upper_limits: bool, optional :param upper_limit_sigma: The sigma level of the upper limits. Default is 3.0 (3-sigma limits). :type upper_limit_sigma: Union[float, np.ndarray], optional :return: A class instance. :rtype: OpticalTransient """ redback.utils.logger.info(f"Loading LightCurveLynx data for {name}") if data is None: path = "simulated/" + name + ".csv" redback.utils.logger.info(f"Reading data from {path}") data = pd.read_csv(path) else: redback.utils.logger.info(f"Using provided DataFrame with {len(data)} rows") # Handle non-detections: either preserve them or filter them out. detections = None if include_upper_limits and "detection" in data.columns: detections = (data["detection"] != 0).to_numpy().astype(bool) n_total = len(data) n_det = int(detections.sum()) redback.utils.logger.info(f"Keeping all {n_total} data points ({n_det} detections, {n_total - n_det} upper limits)") else: if "detection" in data.columns: n_before = len(data) data = data[data.detection != 0] n_after = len(data) redback.utils.logger.info(f"Filtered {n_before - n_after} non-detections, {n_after} detections remain") # Process the time and bands data. bands = data["filter"].to_numpy() unique_bands = np.unique(bands) redback.utils.logger.info(f"Found {len(unique_bands)} unique bands: {unique_bands}") time_mjd = data["mjd"].to_numpy() redback.utils.logger.info(f"Time range: MJD {time_mjd.min():.2f} to {time_mjd.max():.2f}") if "time_rel" in data.columns: time_days = data["time_rel"].to_numpy() elif not use_phase_model: raise ValueError("Relative time column 'time_rel' is required if not using phase model.") else: time_days = None # Process the magnitude data. Checking that we have the values if the data mode is magnitude. if "mag" not in data.columns: raise ValueError("Magnitude values are required for LightCurveLynx input.") magnitude = data["mag"].to_numpy() magnitude_err = data["magerr"].to_numpy() redback.utils.logger.info(f"Converting magnitudes to flux and flux_density for {len(data)} observations") # Compute the other values from the given magnitude and band data. We use the formulation # from SimulateOpticalTransient._make_observation_single(). ref_flux = redback.utils.bands_to_reference_flux(bands) flux = redback.utils.bandpass_magnitude_to_flux(magnitude=magnitude, bands=bands) flux_err = redback.utils.calc_flux_error_from_magnitude(magnitude, magnitude_err, ref_flux) flux_density = redback.utils.calc_flux_density_from_ABmag(magnitude).value flux_density_err = redback.utils.calc_flux_density_error_from_monochromatic_magnitude( magnitude=magnitude, magnitude_error=magnitude_err, reference_flux=3631, magnitude_system='AB', ) # Set frequency array for flux_density mode if frequency is None: frequency = redback.utils.bands_to_frequency(bands) redback.utils.logger.info(f"Computed frequencies from bands using bands_to_frequency") else: redback.utils.logger.info(f"Using provided frequency array") return cls(name=name, data_mode=data_mode, time=time_days, time_err=None, time_mjd=time_mjd, flux_density=flux_density, flux_density_err=flux_density_err, magnitude=magnitude, magnitude_err=magnitude_err, flux=flux, flux_err=flux_err, bands=bands, active_bands=active_bands, use_phase_model=use_phase_model, optical_data=True, plotting_order=plotting_order, frequency=frequency, detections=detections, upper_limit_sigma=upper_limit_sigma)
@property def _time_attribute_name(self) -> str: if self.luminosity_data: return "time_rest_frame" elif self.use_phase_model: return "time_mjd" return "time" @property def _time_err_attribute_name(self) -> str: return self._time_attribute_name + "_err" @property def _y_attribute_name(self) -> str: return self._ATTRIBUTE_NAME_DICT[self.data_mode] @property def _y_err_attribute_name(self) -> str: return self._ATTRIBUTE_NAME_DICT[self.data_mode] + "_err" @property def x(self) -> np.ndarray: """ :return: The time values given the active data mode. :rtype: np.ndarray """ return getattr(self, self._time_attribute_name) @x.setter def x(self, x: np.ndarray) -> None: """Sets the time values for the active data mode. :param x: The desired time values. :type x: np.ndarray """ setattr(self, self._time_attribute_name, x) @property def x_err(self) -> np.ndarray: """ :return: The time error values given the active data mode. :rtype: np.ndarray """ return getattr(self, self._time_err_attribute_name) @x_err.setter def x_err(self, x_err: np.ndarray) -> None: """Sets the time error values for the active data mode. :param x_err: The desired time error values. :type x_err: np.ndarray """ setattr(self, self._time_err_attribute_name, x_err) @property def y(self) -> np.ndarray: """ :return: The y values given the active data mode. :rtype: np.ndarray """ return getattr(self, self._y_attribute_name) @y.setter def y(self, y: np.ndarray) -> None: """Sets the y values for the active data mode. :param y: The desired y values. :type y: np.ndarray """ setattr(self, self._y_attribute_name, y) @property def y_err(self) -> np.ndarray: """ :return: The y error values given the active data mode. :rtype: np.ndarray """ return getattr(self, self._y_err_attribute_name) @y_err.setter def y_err(self, y_err: np.ndarray) -> None: """Sets the y error values for the active data mode. :param y_err: The desired y error values. :type y_err: np.ndarray """ setattr(self, self._y_err_attribute_name, y_err) @property def data_mode(self) -> str: """ :return: The currently active data mode (one in `Transient.DATA_MODES`). :rtype: str """ return self._data_mode @data_mode.setter def data_mode(self, data_mode: str) -> None: """ :param data_mode: One of the data modes in `Transient.DATA_MODES`. :type data_mode: str """ if data_mode in self.DATA_MODES or data_mode is None: self._data_mode = data_mode else: raise ValueError("Unknown data mode.") @property def xlabel(self) -> str: """ :return: xlabel used in plotting functions :rtype: str """ if self.use_phase_model: return r"Time [MJD]" else: return r"Time since explosion [days]" @property def ylabel(self) -> str: """ :return: ylabel used in plotting functions :rtype: str """ try: return self.ylabel_dict[self.data_mode] except KeyError: raise ValueError("No data mode specified")
[docs] def set_bands_and_frequency( self, bands: Union[None, list, np.ndarray], frequency: Union[None, list, np.ndarray]): """Sets bands and frequencies at the same time to keep the logic consistent. If both are given use those values. If only frequencies are given, use them also as band names. If only bands are given, try to convert them to frequencies. :param bands: The bands, e.g. ['g', 'i']. :type bands: Union[None, list, np.ndarray] :param frequency: The frequencies associated with the bands i.e., the effective frequency. :type frequency: Union[None, list, np.ndarray] """ if (bands is None and frequency is None) or (bands is not None and frequency is not None): self._bands = bands self._frequency = frequency elif bands is None and frequency is not None: self._frequency = frequency self._bands = self.frequency elif bands is not None and frequency is None: self._bands = bands self._frequency = self.bands_to_frequency(self.bands)
@property def frequency(self) -> np.ndarray: """ :return: Used band frequencies :rtype: np.ndarray """ return self._frequency @frequency.setter def frequency(self, frequency: np.ndarray) -> None: """ :param frequency: Set band frequencies if an array is given. Otherwise, convert bands to frequencies. :type frequency: np.ndarray """ self.set_bands_and_frequency(bands=self.bands, frequency=frequency) @property def bands(self) -> Union[list, None, np.ndarray]: return self._bands @bands.setter def bands(self, bands: Union[list, None, np.ndarray]): self.set_bands_and_frequency(bands=bands, frequency=self.frequency) @property def filtered_frequencies(self) -> np.array: """ :return: The frequencies only associated with the active bands. :rtype: np.ndarray """ return self.frequency[self.filtered_indices] @property def filtered_sncosmo_bands(self) -> np.array: """ :return: The sncosmo bands only associated with the active bands. :rtype: np.ndarray """ return self.sncosmo_bands[self.filtered_indices] @property def filtered_bands(self) -> np.array: """ :return: The band names only associated with the active bands. :rtype: np.ndarray """ return self.bands[self.filtered_indices] @property def active_bands(self) -> list: """ :return: List of active bands used. :rtype list: """ return self._active_bands @active_bands.setter def active_bands(self, active_bands: Union[list, str, None]) -> None: """ :param active_bands: Sets active bands based on list given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[list, str] """ if str(active_bands) == 'all': self._active_bands = list(np.unique(self.bands)) else: self._active_bands = active_bands @property def filtered_indices(self) -> Union[list, None]: """ :return: The list indices in `bands` associated with the active bands. :rtype: Union[list, None] """ if self.bands is None: return list(np.arange(len(self.x))) return [b in self.active_bands for b in self.bands]
[docs] def get_filtered_data(self) -> tuple: """Used to filter flux density, photometry or integrated flux data, so we only use data that is using the active bands. :return: A tuple with the filtered data. Format is (x, x_err, y, y_err) :rtype: tuple """ if any([self.flux_data, self.magnitude_data, self.flux_density_data]): filtered_x = self.x[self.filtered_indices] try: if self.x_err.ndim == 2: filtered_x_err = self.x_err[:, self.filtered_indices] else: filtered_x_err = self.x_err[self.filtered_indices] except (IndexError, TypeError, AttributeError): filtered_x_err = None filtered_y = self.y[self.filtered_indices] try: if self.y_err.ndim == 2: filtered_y_err = self.y_err[:, self.filtered_indices] else: filtered_y_err = self.y_err[self.filtered_indices] except (IndexError, TypeError, AttributeError): filtered_y_err = None return filtered_x, filtered_x_err, filtered_y, filtered_y_err else: raise ValueError(f"Transient needs to be in flux density, magnitude or flux data mode, " f"but is in {self.data_mode} instead.")
[docs] def get_filtered_data_with_limits(self) -> tuple: """Used to filter data by active bands, returning detection information alongside the usual data. :return: A tuple with the filtered data. Format is (x, x_err, y, y_err, detections) where detections is a boolean array (True=detection, False=upper limit) or None if all points are detections. :rtype: tuple """ filtered_x, filtered_x_err, filtered_y, filtered_y_err = self.get_filtered_data() if self._detections is not None: filtered_detections = self._detections[self.filtered_indices] else: filtered_detections = None return filtered_x, filtered_x_err, filtered_y, filtered_y_err, filtered_detections
@property def detections(self) -> Union[np.ndarray, None]: """Boolean array: True=detection, False=upper limit. None if all points are detections.""" return self._detections @detections.setter def detections(self, detections: Union[np.ndarray, None]) -> None: if detections is None: self._detections = None else: detections = np.asarray(detections, dtype=bool) if self.time is not None and len(detections) != len(self.time): raise ValueError( f"detections array (length {len(detections)}) must have the same length " f"as time (length {len(self.time)})." ) self._detections = detections self.validate_upper_limits() @property def upper_limits(self) -> np.ndarray: """Boolean array: True=upper limit, False=detection. All False if no upper limits.""" if self._detections is None: return np.zeros(len(self.time), dtype=bool) if self.time is not None else np.array([], dtype=bool) return ~self._detections @property def has_upper_limits(self) -> bool: """Returns True if any data points are upper limits.""" if self._detections is None: return False return bool(np.any(~self._detections)) @property def upper_limit_sigma(self) -> Union[float, np.ndarray]: """The sigma level of the upper limits (e.g. 3.0 for 3-sigma limits).""" return self._upper_limit_sigma @upper_limit_sigma.setter def upper_limit_sigma(self, upper_limit_sigma: Union[float, np.ndarray]) -> None: self._upper_limit_sigma = upper_limit_sigma
[docs] def validate_upper_limits(self) -> None: """Check that upper limit data points have finite y-values. Upper limits require a finite y-value (the limit value) for both plotting and likelihood computation. NaN y-values for upper limits are not usable because there is no position to draw the marker at, and the likelihood cannot evaluate the CDF without a numerical limit. Logs a warning for each band with NaN upper limits. """ if self._detections is None or not self.has_upper_limits: return ul_mask = self.upper_limits ul_y = self.y[ul_mask] nan_count = np.sum(np.isnan(ul_y)) if nan_count > 0: redback.utils.logger.warning( f"{int(nan_count)} upper limit(s) have NaN y-values. These cannot be " f"plotted or used in likelihood fitting. Replace NaN values with the " f"upper limit value (e.g. the limiting magnitude or flux), or remove " f"those data points." )
@property def unique_bands(self) -> np.ndarray: """ :return: All bands that we get from the data, eliminating all duplicates. :rtype: np.ndarray """ if self.plotting_order is not None: return self.plotting_order else: return np.unique(self.bands) @property def unique_frequencies(self) -> np.ndarray: """ :return: All frequencies that we get from the data, eliminating all duplicates. :rtype: np.ndarray """ try: if isinstance(self.unique_bands[0], (float, int)): return self.unique_bands except (TypeError, IndexError): pass return self.bands_to_frequency(self.unique_bands) @property def list_of_band_indices(self) -> list: """ :return: Indices that map between bands in the data and the unique bands we obtain. :rtype: list """ return [np.where(self.bands == np.array(b))[0] for b in self.unique_bands] @property def default_filters(self) -> list: """ :return: Default list of filters to use. :rtype: list """ return ["g", "r", "i", "z", "y", "J", "H", "K"]
[docs] @staticmethod def get_colors(filters: Union[np.ndarray, list]) -> matplotlib.colors.Colormap: """ :param filters: Array of list of filters to use in the plot. :type filters: Union[np.ndarray, list] :return: Colormap with one color for each filter. :rtype: matplotlib.colors.Colormap """ return matplotlib.cm.rainbow(np.linspace(0, 1, len(filters)))
[docs] def plot_data(self, axes: matplotlib.axes.Axes = None, filename: str = None, outdir: str = None, save: bool = True, show: bool = True, plot_others: bool = True, color: str = 'k', **kwargs) -> matplotlib.axes.Axes: """Plots the Transient data and returns Axes. :param axes: Matplotlib axes to plot the lightcurve into. Useful for user specific modifications to the plot. :param filename: Name of the file to be plotted in. :param outdir: The directory in which to save the file in. :param save: Whether to save the plot. (Default value = True) :param show: Whether to show the plot. (Default value = True) :param plot_others: Whether to plot inactive bands. (Default value = True) :param color: Color of the data. :param kwargs: Additional keyword arguments passed to the Plotter. All KwargsAccessorWithDefault attributes on redback.plotting.Plotter are accepted. Run ``redback.plotting.get_plotter_kwargs_docs()`` to see all options and defaults. :return: The axes with the plot. """ if self.flux_data: if self.optical_data: plotter = IntegratedFluxOpticalPlotter(transient=self, color=color, filename=filename, outdir=outdir, plot_others=plot_others, **kwargs) else: plotter = IntegratedFluxPlotter(transient=self, color=color, filename=filename, outdir=outdir, **kwargs) elif self.luminosity_data: if self.optical_data: plotter = LuminosityOpticalPlotter(transient=self, color=color, filename=filename, outdir=outdir, **kwargs) else: plotter = LuminosityPlotter(transient=self, color=color, filename=filename, outdir=outdir, **kwargs) elif self.flux_density_data: plotter = FluxDensityPlotter(transient=self, color=color, filename=filename, outdir=outdir, plot_others=plot_others, **kwargs) elif self.magnitude_data: plotter = MagnitudePlotter(transient=self, color=color, filename=filename, outdir=outdir, plot_others=plot_others, **kwargs) else: return axes return plotter.plot_data(axes=axes, save=save, show=show)
[docs] def plot_multiband( self, figure: matplotlib.figure.Figure = None, axes: matplotlib.axes.Axes = None, filename: str = None, outdir: str = None, ncols: int = 2, save: bool = True, show: bool = True, nrows: int = None, figsize: tuple = None, filters: list = None, **kwargs: None) \ -> matplotlib.axes.Axes: """ :param figure: Figure can be given if defaults are not satisfying. :param axes: Axes can be given if defaults are not satisfying. :param filename: Name of the file to be plotted in. :param outdir: The directory in which to save the file in. :param save: Whether to save the plot. (Default value = True) :param show: Whether to show the plot. (Default value = True) :param ncols: Number of columns to use on the plot. Default is 2. :param nrows: Number of rows to use on the plot. If None are given this will be inferred from ncols and the number of filters. :param figsize: Size of the figure. A default based on ncols and nrows will be used if None is given. :param filters: Which bands to plot. Will use default filters if None is given. :param kwargs: Additional keyword arguments passed to the Plotter. All KwargsAccessorWithDefault attributes on redback.plotting.Plotter are accepted. Run ``redback.plotting.get_plotter_kwargs_docs()`` to see all options and defaults. :return: The axes. """ if self.data_mode not in ['flux_density', 'magnitude', 'flux']: raise ValueError( f'You cannot plot multiband data with {self.data_mode} data mode . Why are you doing this?') if self.magnitude_data: plotter = MagnitudePlotter(transient=self, filters=filters, filename=filename, outdir=outdir, nrows=nrows, ncols=ncols, figsize=figsize, **kwargs) elif self.flux_density_data: plotter = FluxDensityPlotter(transient=self, filters=filters, filename=filename, outdir=outdir, nrows=nrows, ncols=ncols, figsize=figsize, **kwargs) elif self.flux_data: plotter = IntegratedFluxOpticalPlotter(transient=self, filters=filters, filename=filename, outdir=outdir, nrows=nrows, ncols=ncols, figsize=figsize, **kwargs) else: return return plotter.plot_multiband(figure=figure, axes=axes, save=save, show=show)
[docs] def plot_lightcurve( self, model: callable, filename: str = None, outdir: str = None, axes: matplotlib.axes.Axes = None, save: bool = True, show: bool = True, random_models: int = 100, posterior: pd.DataFrame = None, model_kwargs: dict = None, **kwargs: None) -> matplotlib.axes.Axes: """ :param model: The model used to plot the lightcurve. :param filename: The output filename. Otherwise, use default which starts with the name attribute and ends with *lightcurve.png. :param axes: Axes to plot in if given. :param save: Whether to save the plot. :param show: Whether to show the plot. :param random_models: Number of random posterior samples plotted faintly. (Default value = 100) :param posterior: Posterior distribution to which to draw samples from. Is optional but must be given. :param outdir: Out directory in which to save the plot. Default is the current working directory. :param model_kwargs: Additional keyword arguments to be passed into the model. :param kwargs: Additional keyword arguments to pass in the Plotter methods. Available in the online documentation under at `redback.plotting.Plotter`. `print(Transient.plot_lightcurve.__doc__)` to see all options! :return: The axes. """ if self.flux_data: if self.optical_data: plotter = IntegratedFluxOpticalPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) else: plotter = IntegratedFluxPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) elif self.luminosity_data: if self.optical_data: plotter = LuminosityOpticalPlotter(transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) else: plotter = LuminosityPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) elif self.flux_density_data: plotter = FluxDensityPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) elif self.magnitude_data: plotter = MagnitudePlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) else: return axes return plotter.plot_lightcurve(axes=axes, save=save, show=show)
[docs] def plot_residual(self, model: callable, filename: str = None, outdir: str = None, axes: matplotlib.axes.Axes = None, save: bool = True, show: bool = True, posterior: pd.DataFrame = None, model_kwargs: dict = None, **kwargs: None) -> matplotlib.axes.Axes: """ :param model: The model used to plot the lightcurve. :param filename: The output filename. Otherwise, use default which starts with the name attribute and ends with *lightcurve.png. :param axes: Axes to plot in if given. :param save: Whether to save the plot. :param show: Whether to show the plot. :param posterior: Posterior distribution to which to draw samples from. Is optional but must be given. :param outdir: Out directory in which to save the plot. Default is the current working directory. :param model_kwargs: Additional keyword arguments to be passed into the model. :param kwargs: Additional keyword arguments to pass in the Plotter methods. Available in the online documentation under at `redback.plotting.Plotter`. `print(Transient.plot_residual.__doc__)` to see all options! :return: The axes. """ if self.flux_data: plotter = IntegratedFluxPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, **kwargs) elif self.luminosity_data: if self.optical_data: plotter = LuminosityOpticalPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, **kwargs) else: plotter = LuminosityPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, **kwargs) else: raise ValueError("Residual plotting not implemented for this data mode") return plotter.plot_residuals(axes=axes, save=save, show=show)
[docs] def fit_gp(self, mean_model, kernel, prior=None, use_frequency=True): """ Fit a GP to the data using george and scipy minimization. :param mean_model: Mean model to use in the GP fit. Can be a string to refer to a redback model, a callable, or None :param kernel: George GP to use. User must ensure this is set up correctly. :param prior: Prior to use when fitting with a mean model. :param use_frequency: Whether to use the effective frequency in a 2D GP fit. Cannot be used with most mean models. :return: Named tuple with George GP object and additional useful data. """ try: import george import george.kernels as kernels except ImportError: redback.utils.logger.warning("George must be installed to use GP fitting.") import scipy.optimize as op from bilby.core.likelihood import function_to_george_mean_model output = namedtuple("gp_out", ["gp", "scaled_y", "y_scaler", 'use_frequency', 'mean_model']) output.use_frequency = use_frequency output.mean_model = mean_model if self.data_mode == 'luminosity': x = self.time_rest_frame y = self.y try: y_err = np.max(self.y_err, axis=0) except IndexError: y_err = self.y_err else: x, x_err, y, y_err = self.get_filtered_data() redback.utils.logger.info("Rescaling data for GP fitting.") gp_y_err = y_err / np.max(y) gp_y = y / np.max(y) output.scaled_y = gp_y output.y_scaler = np.max(y) def nll(p): gp.set_parameter_vector(p) ll = gp.log_likelihood(gp_y, quiet=True) return -ll if np.isfinite(ll) else 1e25 def grad_nll(p): gp.set_parameter_vector(p) return -gp.grad_log_likelihood(gp_y, quiet=True) if use_frequency: redback.utils.logger.info("Using frequencies and time in the GP fit.") redback.utils.logger.info("Kernel used: " + str(kernel)) redback.utils.logger.info("Ensure that the kernel is set up correctly for 2D GP.") redback.utils.logger.info("You will be returned a single GP object with frequency as a parameter") freqs = self.filtered_frequencies X = np.column_stack((freqs, x)) else: redback.utils.logger.info("Using time in GP fit.") redback.utils.logger.info("Kernel used: " + str(kernel)) redback.utils.logger.info("Ensure that the kernel is set up correctly for 1D GP.") redback.utils.logger.info("You will be returned a GP object unique to a band/frequency" " in the data if working with multiband data") X = x if mean_model is None: redback.utils.logger.info("Mean model not given, fitting GP with no mean model.") gp = george.GP(kernel) gp.compute(X, gp_y_err + 1e-8) p0 = gp.get_parameter_vector() results = op.minimize(nll, p0, jac=grad_nll) gp.set_parameter_vector(results.x) redback.utils.logger.info(f"GP final loglikelihood: {gp.log_likelihood(gp_y)}") redback.utils.logger.info(f"GP final parameters: {gp.get_parameter_dict()}") output.gp = gp else: if isinstance(mean_model, str): mean_model_func = all_models_dict[mean_model] redback.utils.logger.info("Using inbuilt redback function {} as a mean model.".format(mean_model)) if prior is None: redback.utils.logger.warning("No prior given for mean model. Using default prior.") prior = redback.priors.get_priors(mean_model) else: mean_model_func = mean_model redback.utils.logger.info("Using user-defined python function as a mean model.") if prior is None: redback.utils.logger.warning("Prior must be specified for GP fit with a mean model") raise ValueError("No prior specified") if self.data_mode in ['flux_density', 'magnitude', 'flux']: redback.utils.logger.info("Setting up GP version of mean model.") gp_dict = {} scaled_y_dict = {} for ii in range(len(self.unique_bands)): scaled_y_dict[self.unique_bands[ii]] = gp_y[self.list_of_band_indices[ii]] redback.utils.logger.info("Fitting for band {}".format(self.unique_bands[ii])) gp_x = X[self.list_of_band_indices[ii]] def nll(p): gp.set_parameter_vector(p) ll = gp.log_likelihood(gp_y[self.list_of_band_indices[ii]], quiet=True) return -ll if np.isfinite(ll) else 1e25 mean_model_class = function_to_george_mean_model(mean_model_func) mm = mean_model_class(**prior.sample()) gp = george.GP(kernel, mean=mm, fit_mean=True) gp.compute(gp_x, gp_y_err[self.list_of_band_indices[ii]] + 1e-8) p0 = gp.get_parameter_vector() results = op.minimize(nll, p0) gp.set_parameter_vector(results.x) redback.utils.logger.info(f"GP final loglikelihood: {gp.log_likelihood(gp_y[self.list_of_band_indices[ii]])}") redback.utils.logger.info(f"GP final parameters: {gp.get_parameter_dict()}") gp_dict[self.unique_bands[ii]] = gp del gp output.gp = gp_dict output.scaled_y = scaled_y_dict else: mean_model_class = function_to_george_mean_model(mean_model_func) mm = mean_model_class(**prior.sample()) gp = george.GP(kernel, mean=mm, fit_mean=True) gp.compute(X, gp_y_err + 1e-8) p0 = gp.get_parameter_vector() results = op.minimize(nll, p0) gp.set_parameter_vector(results.x) redback.utils.logger.info(f"GP final loglikelihood: {gp.log_likelihood(gp_y)}") redback.utils.logger.info(f"GP final parameters: {gp.get_parameter_dict()}") output.gp = gp return output
[docs] def plot_multiband_lightcurve( self, model: callable, filename: str = None, outdir: str = None, figure: matplotlib.figure.Figure = None, axes: matplotlib.axes.Axes = None, save: bool = True, show: bool = True, random_models: int = 100, posterior: pd.DataFrame = None, model_kwargs: dict = None, **kwargs: object) -> matplotlib.axes.Axes: """ :param model: The model used to plot the lightcurve. :param filename: The output filename. Otherwise, use default which starts with the name attribute and ends with *lightcurve.png. :param figure: Figure can be given if defaults are not satisfying. :param axes: Axes to plot in if given. :param save: Whether to save the plot. :param show: Whether to show the plot. :param random_models: Number of random posterior samples plotted faintly. (Default value = 100) :param posterior: Posterior distribution to which to draw samples from. Is optional but must be given. :param outdir: Out directory in which to save the plot. Default is the current working directory. :param model_kwargs: Additional keyword arguments to be passed into the model. :param kwargs: Additional keyword arguments to pass in the Plotter methods. Available in the online documentation under at `redback.plotting.Plotter`. `print(Transient.plot_multiband_lightcurve.__doc__)` to see all options! :return: The axes. """ if self.data_mode not in ['flux_density', 'magnitude', 'flux']: raise ValueError( f'You cannot plot multiband data with {self.data_mode} data mode . Why are you doing this?') if self.magnitude_data: plotter = MagnitudePlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) elif self.flux_data: plotter = IntegratedFluxOpticalPlotter(transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) elif self.flux_density_data: plotter = FluxDensityPlotter( transient=self, model=model, filename=filename, outdir=outdir, posterior=posterior, model_kwargs=model_kwargs, random_models=random_models, **kwargs) else: return return plotter.plot_multiband_lightcurve(figure=figure, axes=axes, save=save, show=show)
_formatted_kwargs_options = redback.plotting.Plotter.keyword_docstring plot_data.__doc__ = plot_data.__doc__.replace( "`print(Transient.plot_data.__doc__)` to see all options!", _formatted_kwargs_options) plot_multiband.__doc__ = plot_multiband.__doc__.replace( "`print(Transient.plot_multiband.__doc__)` to see all options!", _formatted_kwargs_options) plot_lightcurve.__doc__ = plot_lightcurve.__doc__.replace( "`print(Transient.plot_lightcurve.__doc__)` to see all options!", _formatted_kwargs_options) plot_multiband_lightcurve.__doc__ = plot_multiband_lightcurve.__doc__.replace( "`print(Transient.plot_multiband_lightcurve.__doc__)` to see all options!", _formatted_kwargs_options) plot_residual.__doc__ = plot_residual.__doc__.replace( "`print(Transient.plot_residual.__doc__)` to see all options!", _formatted_kwargs_options)
[docs] class OpticalTransient(Transient): DATA_MODES = ['flux', 'flux_density', 'magnitude', 'luminosity']
[docs] @staticmethod def load_data(processed_file_path, data_mode="magnitude"): """Loads data from specified directory and file, and returns it as a tuple. :param processed_file_path: Path to the processed file to load :type processed_file_path: str :param data_mode: Name of the data mode. Must be from ['magnitude', 'flux_density', 'all']. Default is magnitude. :type data_mode: str, optional :return: Six elements when querying magnitude or flux_density data, Eight for 'all' :rtype: tuple """ df = pd.read_csv(processed_file_path) time_days = np.array(df["time (days)"]) time_mjd = np.array(df["time"]) magnitude = np.array(df["magnitude"]) magnitude_err = np.array(df["e_magnitude"]) bands = np.array(df["band"]) system = np.array(df["system"]) flux_density = np.array(df["flux_density(mjy)"]) flux_density_err = np.array(df["flux_density_error"]) flux = np.array(df["flux(erg/cm2/s)"]) flux_err = np.array(df['flux_error']) if data_mode == "magnitude": return time_days, time_mjd, magnitude, magnitude_err, bands, system elif data_mode == "flux_density": return time_days, time_mjd, flux_density, flux_density_err, bands, system elif data_mode == "flux": return time_days, time_mjd, flux, flux_err, bands, system elif data_mode == "all": return time_days, time_mjd, flux_density, flux_density_err, \ magnitude, magnitude_err, flux, flux_err, bands, system
[docs] def __init__( self, name: str, data_mode: str = 'magnitude', time: np.ndarray = None, time_err: np.ndarray = None, time_mjd: np.ndarray = None, time_mjd_err: np.ndarray = None, time_rest_frame: np.ndarray = None, time_rest_frame_err: np.ndarray = None, Lum50: np.ndarray = None, Lum50_err: np.ndarray = None, flux: np.ndarray = None, flux_err: np.ndarray = None, flux_density: np.ndarray = None, flux_density_err: np.ndarray = None, magnitude: np.ndarray = None, magnitude_err: np.ndarray = None, redshift: float = np.nan, photon_index: float = np.nan, frequency: np.ndarray = None, bands: np.ndarray = None, system: np.ndarray = None, active_bands: Union[np.ndarray, str] = 'all', plotting_order: Union[np.ndarray, str] = None, use_phase_model: bool = False, optical_data:bool = True, **kwargs: None) -> None: """This is a general constructor for the Transient class. Note that you only need to give data corresponding to the data mode you are using. For luminosity data provide times in the rest frame, if using a phase model provide time in MJD, else use the default time (observer frame). :param name: Name of the transient. :type name: str :param data_mode: Data mode. Must be one from `OpticalTransient.DATA_MODES`. :type data_mode: str, optional :param time: Times in the observer frame. :type time: np.ndarray, optional :param time_err: Time errors in the observer frame. :type time_err: np.ndarray, optional :param time_mjd: Times in MJD. Used if using phase model. :type time_mjd: np.ndarray, optional :param time_mjd_err: Time errors in MJD. Used if using phase model. :type time_mjd_err: np.ndarray, optional :param time_rest_frame: Times in the rest frame. Used for luminosity data. :type time_rest_frame: np.ndarray, optional :param time_rest_frame_err: Time errors in the rest frame. Used for luminosity data. :type time_rest_frame_err: np.ndarray, optional :param Lum50: Luminosity values. :type Lum50: np.ndarray, optional :param Lum50_err: Luminosity error values. :type Lum50_err: np.ndarray, optional :param flux: Flux values. :type flux: np.ndarray, optional :param flux_err: Flux error values. :type flux_err: np.ndarray, optional :param flux_density: Flux density values. :type flux_density: np.ndarray, optional :param flux_density_err: Flux density error values. :type flux_density_err: np.ndarray, optional :param magnitude: Magnitude values for photometry data. :type magnitude: np.ndarray, optional :param magnitude_err: Magnitude error values for photometry data. :type magnitude_err: np.ndarray, optional :param redshift: Redshift value. :type redshift: float, optional :param photon_index: Photon index value. :type photon_index: float, optional :param frequency: Array of band frequencies in photometry data. :type frequency: np.ndarray, optional :param bands: Band values. :type bands: np.ndarray, optional :param system: System values. :type system: np.ndarray, optional :param active_bands: List or array of active bands to be used in the analysis. Use all available bands if 'all' is given. :type active_bands: Union[list, np.ndarray], optional :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether we are using a phase model. :type use_phase_model: bool, optional :param optical_data: Whether we are fitting optical data, useful for plotting. :type optical_data: bool, optional :param kwargs: Additional callables: bands_to_frequency: Conversion function to convert a list of bands to frequencies. Use redback.utils.bands_to_frequency if not given. :type kwargs: dict, optional """ super().__init__(time=time, time_err=time_err, time_rest_frame=time_rest_frame, time_mjd=time_mjd, time_mjd_err=time_mjd_err, frequency=frequency, time_rest_frame_err=time_rest_frame_err, Lum50=Lum50, Lum50_err=Lum50_err, flux=flux, flux_err=flux_err, redshift=redshift, photon_index=photon_index, flux_density=flux_density, flux_density_err=flux_density_err, magnitude=magnitude, magnitude_err=magnitude_err, data_mode=data_mode, name=name, use_phase_model=use_phase_model, optical_data=optical_data, system=system, bands=bands, plotting_order=plotting_order, active_bands=active_bands, **kwargs) self.directory_structure = redback.get_data.directory.DirectoryStructure( directory_path=".", raw_file_path=".", processed_file_path=".")
[docs] @classmethod def from_open_access_catalogue( cls, name: str, data_mode: str = "magnitude", active_bands: Union[np.ndarray, str] = 'all', plotting_order: Union[np.ndarray, str] = None, use_phase_model: bool = False) -> OpticalTransient: """Constructor method to built object from Open Access Catalogue :param name: Name of the transient. :type name: str :param data_mode: Data mode used. Must be from `OpticalTransient.DATA_MODES`. Default is magnitude. :type data_mode: str, optional :param active_bands: Sets active bands based on array given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[np.ndarray, str] :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether to use a phase model. :type use_phase_model: bool, optional :return: A class instance :rtype: OpticalTransient """ if cls.__name__ == "TDE": transient_type = "tidal_disruption_event" else: transient_type = cls.__name__.lower() directory_structure = redback.get_data.directory.open_access_directory_structure( transient=name, transient_type=transient_type) time_days, time_mjd, flux_density, flux_density_err, magnitude, magnitude_err, flux, flux_err, bands, system = \ cls.load_data(processed_file_path=directory_structure.processed_file_path, data_mode="all") return cls(name=name, data_mode=data_mode, time=time_days, time_err=None, time_mjd=time_mjd, flux_density=flux_density, flux_density_err=flux_density_err, magnitude=magnitude, magnitude_err=magnitude_err, bands=bands, system=system, active_bands=active_bands, use_phase_model=use_phase_model, optical_data=True, flux=flux, flux_err=flux_err, plotting_order=plotting_order)
[docs] @classmethod def from_otter( cls, name: str, data_mode: str = 'magnitude', obs_type: str = 'uvoir', active_bands: Union[np.ndarray, str] = 'all', plotting_order: Union[np.ndarray, str] = None, use_phase_model: bool = False ) -> OpticalTransient: """Constructor method to build object from OTTER database :param name: Name of the transient in OTTER database. :type name: str :param data_mode: Data mode used. Must be from `OpticalTransient.DATA_MODES`. Default is magnitude. :type data_mode: str, optional :param obs_type: Observation type: 'uvoir' (default), 'radio', or 'xray'. :type obs_type: str, optional :param active_bands: Sets active bands based on array given. If argument is 'all', all unique bands in `self.bands` will be used. :type active_bands: Union[np.ndarray, str] :param plotting_order: Order in which to plot the bands/and how unique bands are stored. :type plotting_order: Union[np.ndarray, str], optional :param use_phase_model: Whether to use a phase model. :type use_phase_model: bool, optional :return: A class instance :rtype: OpticalTransient """ if cls.__name__ == "TDE": transient_type = "tidal_disruption_event" else: transient_type = cls.__name__.lower() # Get data from OTTER with specified obs_type from redback.get_data.otter import OtterDataGetter getter = OtterDataGetter(transient=name, transient_type=transient_type, obs_type=obs_type) data = getter.get_data() # Load data using standard load_data method (handles data_mode selection) time_days, time_mjd, flux_density, flux_density_err, magnitude, magnitude_err, flux, flux_err, bands, system = \ cls.load_data(processed_file_path=getter.processed_file_path, data_mode="all") return cls( name=name, data_mode=data_mode, time=time_days, time_err=None, time_mjd=time_mjd, flux_density=flux_density, flux_density_err=flux_density_err, magnitude=magnitude, magnitude_err=magnitude_err, bands=bands, system=system, active_bands=active_bands, use_phase_model=use_phase_model, optical_data=True, flux=flux, flux_err=flux_err, plotting_order=plotting_order)
@property def event_table(self) -> str: """ :return: Path to the metadata table. :rtype: str """ return f"{self.directory_structure.directory_path}/{self.name}_metadata.csv" def _set_data(self) -> None: """Sets the metadata from the event table.""" try: meta_data = pd.read_csv(self.event_table, on_bad_lines='skip', delimiter=',', dtype='str') except FileNotFoundError as e: redback.utils.logger.info(e) redback.utils.logger.info("Setting metadata to None. This is not an error, but a warning that no metadata could be found online.") meta_data = None self.meta_data = meta_data @property def transient_dir(self) -> str: """ :return: The transient directory given the name of the transient. :rtype: str """ return self._get_transient_dir() def _get_transient_dir(self) -> str: """ :return: The transient directory path :rtype: str """ transient_dir, _, _ = redback.get_data.directory.open_access_directory_structure( transient=self.name, transient_type=self.__class__.__name__.lower()) return transient_dir
[docs] def estimate_bb_params(self, distance: float = 1e27, bin_width: float = 1.0, min_filters: int = 3, **kwargs): """ Estimate the blackbody temperature and photospheric radius as functions of time by fitting a blackbody SED to the multi‑band photometry. The method groups the photometric data into time bins (epochs) of width bin_width (in the same units as self.x, typically days). For each epoch with at least min_filters measurements (from distinct filters), it fits a blackbody model to the data. When working with photometry provided in an effective flux density format (data_mode == "flux_density") the effective–wavelength approximation is used. When the data_mode is "flux" (or "magnitude") users have the option (via use_eff_wavelength=True) to instead use the effective wavelength approximation by converting AB magnitudes to flux density (using redback.utils.calc_flux_density_from_ABmag). If this flag is not provided (or is False) then the full bandpass integration is applied. Parameters ---------- distance : float, optional Distance to the transient in centimeters. Default is 1e27 cm. bin_width : float, optional Width of the time bins (in days) used to group the photometric data. Default is 1.0. min_filters : int, optional Minimum number of measurements (from distinct filters) required in a bin to perform the fit. Default is 3. kwargs : Additional keyword arguments maxfev : int, optional, default is 1000 T_init : float, optional, default is 1e4, used as the initial guess for the fit. R_init : float, optional, default is 1e15, used as the initial guess for the fit. use_eff_wavelength : bool, optional, default is False. If True, then even for photometry provided as magnitudes (or bandpass fluxes), the effective wavelength approximation is used. In that case the AB magnitudes are converted to flux densities via redback.utils.calc_flux_density_from_ABmag. If False, full bandpass integration is used. Returns ------- df_bb : pandas.DataFrame or None A DataFrame containing columns: - epoch_times : binned epoch times, - temperature : best-fit blackbody temperatures (Kelvin), - radius : best-fit photospheric radii (cm), - temp_err : 1σ uncertainties on the temperatures, - radius_err : 1σ uncertainties on the radii. Returns None if insufficient data are available. """ from scipy.optimize import curve_fit import astropy.units as uu import numpy as np import pandas as pd # Get the filtered photometry. # Assumes self.get_filtered_data() returns (time, time_err, y, y_err) time_data, _, flux_data, flux_err_data = self.get_filtered_data() redback.utils.logger.info("Estimating blackbody parameters for {}.".format(self.name)) redback.utils.logger.info("Using data mode = {}".format(self.data_mode)) # Determine whether we are in bandpass mode. use_bandpass = False if hasattr(self, "data_mode") and self.data_mode in ['flux', 'magnitude']: use_bandpass = True # Assume self.filtered_sncosmo_bands contains the (string) band names. band_data = self.filtered_sncosmo_bands else: # Otherwise the flux data and frequencies are assumed to be given. redback.utils.logger.info("Using effective wavelength approximation for {}".format(self.data_mode)) freq_data = self.filtered_frequencies # Option: force effective wavelength approximation even if data_mode is bandpass. force_eff = kwargs.get('use_eff_wavelength', False) if use_bandpass and force_eff: redback.utils.logger.warning("Using effective wavelength approximation for {}".format(self.data_mode)) if self.data_mode == 'magnitude': # Convert the AB magnitudes to flux density using the redback function. from redback.utils import abmag_to_flux_density_and_error_inmjy flux_data, flux_err_data = abmag_to_flux_density_and_error_inmjy(flux_data, flux_err_data) freq_data = redback.utils.bands_to_frequency(band_data) else: # Convert the bandpass fluxes to flux density using the redback function. from redback.utils import bandpass_flux_to_flux_density, bands_to_effective_width redback.utils.logger.warning("Ensure filters.csv has the correct bandpass effective widths for your filter.") effective_widths = bands_to_effective_width(band_data) freq_data = redback.utils.bands_to_frequency(band_data) flux_data, flux_err_data = bandpass_flux_to_flux_density(flux_data, flux_err_data, effective_widths) # Use the effective frequency approach. use_bandpass = False # Get initial guesses. T_init = kwargs.get('T_init', 1e4) R_init = kwargs.get('R_init', 1e15) maxfev = kwargs.get('maxfev', 1000) # Sort photometric data by time. sort_idx = np.argsort(time_data) time_data = time_data[sort_idx] flux_data = flux_data[sort_idx] flux_err_data = flux_err_data[sort_idx] if use_bandpass: band_data = np.array(band_data)[sort_idx] else: freq_data = np.array(freq_data)[sort_idx] # Retrieve redshift. redshift = np.nan_to_num(self.redshift) if redshift <= 0.: raise ValueError("Redshift must be provided to perform K-correction.") # For effective frequency mode, K-correct frequencies. if not use_bandpass: freq_data, _ = redback.utils.calc_kcorrected_properties(frequency=freq_data, redshift=redshift, time=0.) # Define the model functions. if not use_bandpass: # --- Effective-wavelength model --- def bb_model(freq, logT, logR): T = 10 ** logT R = 10 ** logR # Compute the model flux density in erg/s/cm^2/Hz. # freq is in rest frame, so we need to apply cosmological dimming factor model_flux_cgs = redback.sed.blackbody_to_flux_density(T, R, distance, freq) * (1 + redshift) # Convert to mJy. (1 Jy = 1e-23 erg/s/cm^2/Hz; 1 mJy = 1e-3 Jy = 1e-26 erg/s/cm^2/Hz) model_flux_mjy = (model_flux_cgs / (1e-26 * uu.erg / uu.s / uu.cm**2 / uu.Hz)).value return model_flux_mjy model_func = bb_model else: # --- Full bandpass integration model --- # In this branch we do NOT want to pass strings to curve_fit. # Instead, we will dummy-encode the independent variable as indices. # We also capture the band names in a closure variable. def bb_model_bandpass_from_index(x, logT, logR): # Ensure x is a numpy array and convert indices to integers. i_idx = np.round(x).astype(int) # Retrieve all corresponding band names in one step. bands = np.array(epoch_bands)[i_idx] # Call bb_model_bandpass with the entire array of bands. return bb_model_bandpass(bands, logT, logR, redshift, distance, output_format=self.data_mode) def bb_model_bandpass(band, logT, logR, redshift, distance, output_format='magnitude'): from redback.utils import calc_kcorrected_properties, lambda_to_nu, bandpass_magnitude_to_flux # Create a wavelength grid (in Å) from 100 to 80,000 Å. lambda_obs = np.geomspace(100, 80000, 300) # Convert to frequency (Hz) and apply K-correction. frequency, _ = calc_kcorrected_properties(frequency=lambda_to_nu(lambda_obs), redshift=redshift, time=0.) T = 10 ** logT R = 10 ** logR # Compute the model SED (flux density in erg/s/cm^2/Hz). # Apply cosmological dimming factor for observer-frame flux model_flux = redback.sed.blackbody_to_flux_density(T, R, distance, frequency) * (1 + redshift) # Convert the SED to per-Å units _spectra = model_flux.to(uu.erg / uu.cm ** 2 / uu.s / uu.Angstrom, equivalencies=uu.spectral_density(wav=lambda_obs * uu.Angstrom)) spectra = np.zeros((5, 300)) spectra[:, :] = _spectra.value # Create a source object from the spectrum. source = redback.sed.RedbackTimeSeriesSource(phase=np.array([0, 1, 2, 3, 4]), wave=lambda_obs, flux=spectra) if output_format == 'flux': # Convert bandpass magnitude to flux. mag = source.bandmag(phase=0, band=band, magsys='ab') return bandpass_magnitude_to_flux(magnitude=mag, bands=band) elif output_format == 'magnitude': mag = source.bandmag(phase=0, band=band, magsys='ab') return mag else: raise ValueError("Unknown output_format in bb_model_bandpass.") # Our wrapper for curve_fit uses dummy x-values. model_func = bb_model_bandpass_from_index # Initialize lists to store fit results. epoch_times = [] temperatures = [] radii = [] temp_errs = [] radius_errs = [] t_min = np.min(time_data) t_max = np.max(time_data) bins = np.arange(t_min, t_max + bin_width, bin_width) redback.utils.logger.info("Number of bins: {}".format(len(bins))) # Ensure at least one bin has enough points. bins_with_enough = [i for i in range(len(bins) - 1) if np.sum((time_data >= bins[i]) & (time_data < bins[i + 1])) >= min_filters] if len(bins_with_enough) == 0: redback.utils.logger.warning("No time bins have at least {} measurements. Fitting cannot proceed.".format(min_filters)) redback.utils.logger.warning("Try generating more data through GPs, increasing bin widths, or using fewer filters.") return None # Loop over bins (epochs): for each with enough data perform the fit. for i in range(len(bins) - 1): mask = (time_data >= bins[i]) & (time_data < bins[i + 1]) if np.sum(mask) < min_filters: continue t_epoch = np.mean(time_data[mask]) try: if not use_bandpass: # Use effective frequency array (numeric). xdata = freq_data[mask] else: # For full bandpass integration mode, we dummy encode xdata. # We ignore the value and simply use indices [0, 1, 2, ...]. epoch_bands = list(band_data[mask]) # capture the list of bands for this epoch xdata = np.arange(len(epoch_bands)) popt, pcov = curve_fit( model_func, xdata, flux_data[mask], sigma=flux_err_data[mask], p0=[np.log10(T_init), np.log10(R_init)], absolute_sigma=True, maxfev=maxfev ) except Exception as e: redback.utils.logger.warning(f"Fit failed for epoch {i}: {e}") redback.utils.logger.warning(f"Skipping epoch {i} with time {t_epoch:.2f} days.") continue logT_fit, logR_fit = popt T_fit = 10 ** logT_fit R_fit = 10 ** logR_fit perr = np.sqrt(np.diag(pcov)) T_err = np.log(10) * T_fit * perr[0] R_err = np.log(10) * R_fit * perr[1] epoch_times.append(t_epoch) temperatures.append(T_fit) radii.append(R_fit) temp_errs.append(T_err) radius_errs.append(R_err) if len(epoch_times) == 0: redback.utils.logger.warning("No epochs with sufficient data yielded a successful fit.") return None df_bb = pd.DataFrame({ 'epoch_times': epoch_times, 'temperature': temperatures, 'radius': radii, 'temp_err': temp_errs, 'radius_err': radius_errs }) redback.utils.logger.info('Masking epochs with likely wrong extractions') df_bb = df_bb[df_bb['temp_err'] / df_bb['temperature'] < 1] df_bb = df_bb[df_bb['radius_err'] / df_bb['radius'] < 1] return df_bb
[docs] def estimate_bolometric_luminosity(self, distance: float = 1e27, bin_width: float = 1.0, min_filters: int = 3, **kwargs): """ Estimate the bolometric luminosity as a function of time by fitting the blackbody SED to the multi‑band photometry and then integrating that spectrum. For each epoch the bolometric luminosity is computed using the Stefan–Boltzmann law evaluated at the source: L_bol = 4 π R² σ_SB T⁴ Uncertainties in T and R are propagated assuming (ΔL_bol / L_bol)² = (2 ΔR / R)² + (4 ΔT / T)². Optionally, two corrections can be applied: 1. A boost–factor to “restore” missing blue flux. If a cutoff wavelength is provided via the keyword 'lambda_cut' (in angstroms), it is converted to centimeters and a boost factor is calculated as: Boost = (F_tot / F_red) where F_tot = σ_SB T⁴ and F_red is computed by numerically integrating π * B_λ(T) from the cutoff wavelength (in cm) to infinity. The final (boosted) luminosity becomes: L_boosted = Boost × (4π R² σ_SB T⁴). 2. An extinction correction. If the bolometric extinction (A_ext, in magnitudes) is supplied via the keyword 'A_ext', the luminosity will be reduced by a factor of 10^(–0.4·A_ext) to account for dust extinction. (A_ext defaults to 0.) Parameters ---------- distance : float, optional Distance to the transient in centimeters. (Default is 1e27 cm.) bin_width : float, optional Width of the time bins (in days) used for grouping photometry. (Default is 1.0.) min_filters : int, optional Minimum number of independent filters required in a bin to perform a fit. (Default is 3.) kwargs : dict, optional Additional keyword arguments to pass to `estimate_bb_params` (e.g., maxfev, T_init, R_init, use_eff_wavelength, etc.). Additionally: - 'lambda_cut': If provided (in angstroms), the bolometric luminosity will be “boosted” to account for missing blue flux. - 'A_ext': Bolometric extinction in magnitudes. The observed luminosity is increased by a factor 10^(+0.4·A_ext). (Default is 0.) Returns ------- df_bol : pandas.DataFrame or None A DataFrame containing columns: - epoch_times: Mean time of the bin (days). - temperature: Fitted blackbody temperature (K). - radius: Fitted photospheric radius (cm). - lum_bol: Derived bolometric luminosity (1e50 erg/s) computed as 4π R² σ_SB T⁴ (boosted and extinction-corrected if requested). - lum_bol_bb: Derived bolometric blackbody luminosity (1e50 erg/s) computed as 4π R² σ_SB T⁴, before applying either the boost or extinction correction. - lum_bol_err: 1σ uncertainty on L_bol (1e50 erg/s) from error propagation. - time_rest_frame: Epoch time divided by (1+redshift), i.e., the rest-frame time in days. Returns None if no valid blackbody fits were obtained. """ from redback.sed import boosted_bolometric_luminosity # Retrieve optional lambda_cut (in angstroms) for the boost correction. lambda_cut_angstrom = kwargs.pop('lambda_cut', None) if lambda_cut_angstrom is not None: redback.utils.logger.info("Including effects of missing flux due to line blanketing.") redback.utils.logger.info( "Using lambda_cut = {} Å for bolometric luminosity boost.".format(lambda_cut_angstrom)) # Convert lambda_cut from angstroms to centimeters (1 Å = 1e-8 cm) lambda_cut = lambda_cut_angstrom * 1e-8 else: redback.utils.logger.info("No lambda_cut provided; no correction applied. Assuming a pure blackbody SED.") lambda_cut = None # Retrieve optional extinction in magnitudes. A_ext = kwargs.pop('A_ext', 0.0) if A_ext != 0.0: redback.utils.logger.info("Applying extinction correction with A_ext = {} mag.".format(A_ext)) extinction_factor = 10 ** (0.4 * A_ext) # Retrieve blackbody parameters via your existing method. df_bb = self.estimate_bb_params(distance=distance, bin_width=bin_width, min_filters=min_filters, **kwargs) if df_bb is None or len(df_bb) == 0: redback.utils.logger.warning("No valid blackbody fits were obtained; cannot estimate bolometric luminosity.") return None # Compute L_bol (or L_boosted) for each epoch and propagate uncertainties. L_bol = [] L_bol_err = [] L_bol_bb = [] L_bol_bb_err = [] for index, row in df_bb.iterrows(): temp = row['temperature'] radius = row['radius'] T_err = row['temp_err'] R_err = row['radius_err'] # Use boosted luminosity if lambda_cut is provided. if lambda_cut is not None: lum, lum_bb = boosted_bolometric_luminosity(temp, radius, lambda_cut) else: lum = 4 * np.pi * (radius ** 2) * redback.constants.sigma_sb * (temp ** 4) lum_bb = lum # Apply extinction correction to both luminosities. lum *= extinction_factor lum_bb *= extinction_factor # Propagate uncertainties using: # (ΔL/L)² = (2 ΔR / R)² + (4 ΔT / T)². rel_err = np.sqrt((2 * R_err / radius) ** 2 + (4 * T_err / temp) ** 2) L_err = lum * rel_err L_err_bb = lum_bb * rel_err L_bol.append(lum) L_bol_bb.append(lum_bb) L_bol_err.append(L_err) L_bol_bb_err.append(L_err_bb) df_bol = df_bb.copy() df_bol['lum_bol'] = np.array(L_bol) / 1e50 df_bol['lum_bol_err'] = np.array(L_bol_err) / 1e50 df_bol['lum_bol_bb'] = np.array(L_bol_bb) / 1e50 df_bol['lum_bol_bb_err'] = np.array(L_bol_bb_err) / 1e50 df_bol['time_rest_frame'] = df_bol['epoch_times'] / (1 + self.redshift) redback.utils.logger.info('Masking bolometric estimates with likely wrong extractions') df_bol = df_bol[df_bol['lum_bol_err'] / df_bol['lum_bol'] < 1] redback.utils.logger.info( "Estimated bolometric luminosity using blackbody integration (with boost and extinction corrections if specified).") return df_bol