"""Source code for redback.get_data.fink."""

import os
import io
from typing import Union

import astropy.units as uu
import numpy as np
import pandas as pd
import requests
from astropy.time import Time

import redback
import redback.get_data.directory
import redback.get_data.utils
import redback.redback_errors
from redback.get_data.getter import DataGetter
from redback.utils import logger, calc_flux_density_from_ABmag, \
    calc_flux_density_error_from_monochromatic_magnitude, bandpass_magnitude_to_flux, bands_to_reference_flux, \
    jd_to_mjd, calc_flux_error_from_magnitude

dirname = os.path.dirname(__file__)


class FinkDataGetter(DataGetter):
    """Data getter for transient photometry served by the Fink broker
    (https://fink-portal.org). Downloads raw ZTF alert photometry for a named
    object and converts it into redback's processed-data CSV format."""

    VALID_TRANSIENT_TYPES = ["afterglow", "kilonova", "supernova", "tidal_disruption_event", "unknown"]

    def __init__(self, transient: str, transient_type: str) -> None:
        """Constructor class for a data getter. The instance will be able to
        download the specified Fink data.

        :param transient: Name (object ID) of the transient as known to Fink,
                          e.g. a ZTF object ID such as 'ZTF21abcdefg'.
        :type transient: str
        :param transient_type: Type of the transient. Must be from
            `redback.get_data.fink.FinkDataGetter.VALID_TRANSIENT_TYPES`.
        :type transient_type: str
        """
        super().__init__(transient, transient_type)
        # NOTE(review): reuses the Lasair directory layout for Fink data —
        # presumably intentional since both serve ZTF photometry; confirm.
        self.directory_path, self.raw_file_path, self.processed_file_path = \
            redback.get_data.directory.lasair_directory_structure(
                transient=self.transient, transient_type=self.transient_type)

    @property
    def url(self) -> str:
        """
        :return: The Fink raw data API endpoint.
        :rtype: str
        """
        return "https://fink-portal.org/api/v1/objects"

    @property
    def objectId(self) -> str:
        """
        :return: The object ID, i.e. the transient name.
        :rtype: str
        """
        return self.transient

    def collect_data(self) -> None:
        """Downloads the data from the Fink API and saves it into the raw file path.

        Does nothing if the raw file already exists. Requests CSV output
        including upper limits ('withupperlim').

        :raises ValueError: If the Fink catalog returns no rows for the object ID.
        """
        if os.path.isfile(self.raw_file_path):
            logger.warning('The raw data file already exists.')
            return None
        response = requests.post(
            url=self.url,
            json={'objectId': self.objectId, 'output-format': 'csv', 'withupperlim': 'True'})
        data = pd.read_csv(io.BytesIO(response.content))
        if len(data) == 0:
            raise ValueError(
                f"Transient {self.transient} does not exist in the catalog. "
                f"Are you sure you are using the right alias?")
        data.to_csv(self.raw_file_path, index=False)
        logger.info(f"Retrieved data for {self.transient}.")

    def convert_raw_data_to_csv(self) -> Union[pd.DataFrame, None]:
        """Converts the raw data into processed data and saves it into the
        processed file path. Only rows tagged 'valid' by Fink are kept;
        magnitudes are converted to flux densities and bandpass fluxes, and a
        relative time axis is added.

        :return: The processed data.
        :rtype: pandas.DataFrame
        """
        if os.path.isfile(self.processed_file_path):
            logger.warning('The processed data file already exists. Returning.')
            return pd.read_csv(self.processed_file_path)
        raw_data = pd.read_csv(self.raw_file_path)
        # Drop upper limits and bad-quality detections; keep confirmed photometry only.
        raw_data = raw_data[raw_data['d:tag'] == 'valid']
        # Fink filter IDs -> redback band names (1=g, 2=r, 3=i for ZTF).
        fink_to_general_bands = {1: "ztfg", 2: "ztfr", 3: 'ztfi'}
        processed_data = pd.DataFrame()
        processed_data["time"] = jd_to_mjd(raw_data["i:jd"].values)
        processed_data["magnitude"] = raw_data['i:magap'].values
        processed_data["e_magnitude"] = raw_data['i:sigmagap'].values
        processed_data['system'] = 'AB'
        bands = [fink_to_general_bands[x] for x in raw_data['i:fid']]
        processed_data["band"] = bands
        processed_data["flux_density(mjy)"] = calc_flux_density_from_ABmag(
            processed_data["magnitude"].values).value
        processed_data["flux_density_error"] = calc_flux_density_error_from_monochromatic_magnitude(
            magnitude=processed_data["magnitude"].values,
            magnitude_error=processed_data["e_magnitude"].values,
            reference_flux=3631,  # AB zero-point in Jy
            magnitude_system="AB")
        processed_data['flux(erg/cm2/s)'] = bandpass_magnitude_to_flux(
            processed_data['magnitude'].values, processed_data['band'].values)
        processed_data['flux_error'] = calc_flux_error_from_magnitude(
            magnitude=processed_data['magnitude'].values,
            magnitude_error=processed_data['e_magnitude'].values,
            reference_flux=bands_to_reference_flux(processed_data['band'].values))
        processed_data = processed_data.sort_values(by="time")
        # Reference epoch: 0.1 days before the first detection (arbitrary offset
        # so all relative times are positive).
        time_of_event = min(processed_data["time"]) - 0.1
        time_of_event = Time(time_of_event, format='mjd')
        tt = Time(np.asarray(processed_data["time"], dtype=float), format='mjd')
        processed_data['time (days)'] = ((tt - time_of_event).to(uu.day)).value
        processed_data.to_csv(self.processed_file_path, sep=',',
                              index=False)
        logger.info(f'Congratulations, you now have a nice data file: {self.processed_file_path}')
        return processed_data