from __future__ import annotations
import os
from typing import Union
import urllib
import urllib.request
import astropy.io.ascii
import numpy as np
import pandas as pd
import requests
import redback.get_data.directory
import redback.get_data.utils
import redback.redback_errors
from redback.get_data.getter import GRBDataGetter
from redback.utils import logger
try:
import swifttools.ukssdc.data.GRB as udg
SWIFTTOOLS_AVAILABLE = True
except ImportError:
SWIFTTOOLS_AVAILABLE = False
logger.warning("swifttools not available. You will not be able to download Swift afterglow data via API.")
dirname = os.path.dirname(__file__)
[docs]
class SwiftDataGetter(GRBDataGetter):
VALID_TRANSIENT_TYPES = ["afterglow", "prompt"]
VALID_DATA_MODES = ['flux', 'flux_density', 'prompt']
VALID_INSTRUMENTS = ['BAT+XRT', 'XRT']
VALID_BAT_SNR = ['SNR4', 'SNR5', 'SNR6', 'SNR7']
XRT_DATA_KEYS = ['Time [s]', "Pos. time err [s]", "Neg. time err [s]", "Flux [erg cm^{-2} s^{-1}]",
"Pos. flux err [erg cm^{-2} s^{-1}]", "Neg. flux err [erg cm^{-2} s^{-1}]"]
INTEGRATED_FLUX_KEYS = ["Time [s]", "Pos. time err [s]", "Neg. time err [s]", "Flux [erg cm^{-2} s^{-1}]",
"Pos. flux err [erg cm^{-2} s^{-1}]", "Neg. flux err [erg cm^{-2} s^{-1}]", "Instrument"]
FLUX_DENSITY_KEYS = ['Time [s]', "Pos. time err [s]", "Neg. time err [s]",
'Flux [mJy]', 'Pos. flux err [mJy]', 'Neg. flux err [mJy]', 'Frequency [Hz]']
PROMPT_DATA_KEYS = ["Time [s]", "flux_15_25 [counts/s/det]", "flux_15_25_err [counts/s/det]",
"flux_25_50 [counts/s/det]",
"flux_25_50_err [counts/s/det]", "flux_50_100 [counts/s/det]", "flux_50_100_err [counts/s/det]",
"flux_100_350 [counts/s/det]", "flux_100_350_err [counts/s/det]", "flux_15_350 [counts/s/det]",
"flux_15_350_err [counts/s/det]"]
SWIFT_PROMPT_BIN_SIZES = ['1s', '2ms', '8ms', '16ms', '64ms', '256ms']
[docs]
def __init__(
self, grb: str, transient_type: str, data_mode: str,
instrument: str = 'BAT+XRT', bin_size: str = None,
snr: Union[int, str] = 4, force_download: bool = False) -> None:
"""Constructor class for a data getter. The instance will be able to download the specified Swift data.
:param grb: Telephone number of GRB, e.g., 'GRB140903A' or '140903A' are valid inputs.
:type grb: str
:param transient_type: Type of the transient. Should be 'prompt' or 'afterglow'.
:type transient_type: str
:param data_mode: Data mode must be from `redback.get_data.swift.SwiftDataGetter.VALID_DATA_MODES`.
:type data_mode: str
:param instrument: Instrument(s) to use.
Must be from `redback.get_data.swift.SwiftDataGetter.VALID_INSTRUMENTS`.
:type instrument: str
:param bin_size: Bin size. Must be from `redback.get_data.swift.SwiftDataGetter.SWIFT_PROMPT_BIN_SIZES`.
:type bin_size: str
:param snr: BAT Burst Analyser SNR choice (e.g., 4, 5, 6, 7).
:type snr: int or str
:param force_download: If True, re-download data from API even if cached files exist.
:type force_download: bool
"""
super().__init__(grb=grb, transient_type=transient_type)
self.grb = grb
self.instrument = instrument
self.data_mode = data_mode
self.bin_size = bin_size
self.snr = snr
self.force_download = force_download
self.directory_path, self.raw_file_path, self.processed_file_path = self.create_directory_structure()
@property
def data_mode(self) -> str:
"""Ensures the data mode to be from `SwiftDataGetter.VALID_DATA_MODES`.
:return: The data mode
:rtype: str
"""
return self._data_mode
@data_mode.setter
def data_mode(self, data_mode: str) -> None:
"""
:param data_mode: The data mode.
:type data_mode: str
"""
if data_mode not in self.VALID_DATA_MODES:
raise ValueError("Swift does not have {} data".format(self.data_mode))
self._data_mode = data_mode
@property
def instrument(self) -> str:
"""
Ensures the data mode to be from `SwiftDataGetter.VALID_INSTRUMENTS`.
:return: The instrument
:rtype: str
"""
return self._instrument
@instrument.setter
def instrument(self, instrument: str) -> None:
"""
:param instrument: The instrument
:type: str
"""
if instrument not in self.VALID_INSTRUMENTS:
raise ValueError("Swift does not have {} instrument mode".format(self.instrument))
self._instrument = instrument
@property
def snr(self) -> str:
"""Returns BAT Burst Analyser SNR selection."""
return self._snr
@snr.setter
def snr(self, snr: Union[int, str]) -> None:
"""Normalizes SNR to 'SNR4' style and validates."""
snr_str = f"SNR{snr}".upper() if isinstance(snr, (int, float)) else str(snr).upper()
if not snr_str.startswith("SNR"):
snr_str = f"SNR{snr_str}"
if snr_str not in self.VALID_BAT_SNR:
raise ValueError(f"Unsupported BAT SNR selection: {snr}. Choose from {self.VALID_BAT_SNR}.")
self._snr = snr_str
@property
def trigger(self) -> str:
"""Gets the trigger number based on the GRB name.
:return: The trigger number.
:rtype: str
"""
logger.info('Getting trigger number')
return redback.get_data.utils.get_trigger_number(self.stripped_grb)
@property
def swifttools_grb_name(self) -> str:
"""Formats the GRB name for swifttools (expects 'GRB 060729' style)."""
if self.grb.startswith("GRB ") or not self.grb.startswith("GRB"):
return self.grb
return f"GRB {self.grb[len('GRB'):]}"
[docs]
def get_swift_id_from_grb(self) -> str:
"""
Gets the Swift ID from the GRB number.
:return: The Swift ID
:rtype: str
"""
data = astropy.io.ascii.read(f'{dirname.rstrip("get_data/")}/tables/summary_general_swift_bat.txt')
triggers = list(data['col2'])
event_names = list(data['col1'])
swift_id = triggers[event_names.index(self.grb)]
if len(swift_id) == 6:
swift_id += "000"
swift_id = swift_id.zfill(11)
return swift_id
@property
def grb_website(self) -> str:
"""
:return: The GRB website depending on the data mode and instrument.
:rtype: str
"""
if self.transient_type == 'prompt':
return f"https://swift.gsfc.nasa.gov/results/batgrbcat/{self.grb}/data_product/" \
f"{self.get_swift_id_from_grb()}-results/lc/{self.bin_size}_lc_ascii.dat"
if self.instrument == 'BAT+XRT':
return f'http://www.swift.ac.uk/burst_analyser/00{self.trigger}/'
elif self.instrument == 'XRT':
return f'https://www.swift.ac.uk/xrt_curves/00{self.trigger}/flux.qdp'
[docs]
def get_data(self) -> pd.DataFrame:
"""
Downloads the raw data and produces a processed .csv file.
:return: The processed data
:rtype: pandas.DataFrame
"""
if self.instrument == "BAT+XRT":
logger.warning(
"You are downloading BAT and XRT data, "
"you will need to truncate the data for some models.")
elif self.instrument == "XRT":
logger.warning(
"You are only downloading XRT data, you may not capture"
" the tail of the prompt emission.")
return super(SwiftDataGetter, self).get_data()
[docs]
def create_directory_structure(self) -> redback.get_data.directory.DirectoryStructure:
"""
:return: A namedtuple with the directory path, raw file path, and processed file path.
:rtype: redback.get_data.directory.DirectoyStructure
"""
if self.transient_type == 'afterglow':
return redback.get_data.directory.afterglow_directory_structure(
grb=self.grb, data_mode=self.data_mode, instrument=self.instrument, snr=self.snr)
elif self.transient_type == 'prompt':
return redback.get_data.directory.swift_prompt_directory_structure(
grb=self.grb, bin_size=self.bin_size)
[docs]
def collect_data(self) -> None:
"""Downloads the data from the Swift website and saves it into the raw file path."""
# For prompt emission, continue using direct download (no API available yet)
if self.transient_type == "prompt":
if os.path.isfile(self.raw_file_path) and not self.force_download:
logger.warning('The raw data file already exists. Returning.')
return
response = requests.get(self.grb_website)
if 'No Light curve available' in response.text:
raise redback.redback_errors.WebsiteExist(
f'Problem loading the website for GRB{self.stripped_grb}. '
f'Are you sure GRB {self.stripped_grb} has Swift data?')
self.download_directly()
return
# For afterglow data, use swifttools API
# Check both raw and processed files - if force_download, we need to regenerate both
if not self.force_download:
if os.path.isfile(self.processed_file_path):
logger.warning('The processed data file already exists. Returning.')
return
if not SWIFTTOOLS_AVAILABLE:
raise ImportError(
"swifttools is required for Swift afterglow data retrieval. "
"Please install it with: pip install swifttools"
)
# Check if raw data already exists and can be loaded (unless force_download is True)
if os.path.isfile(self.raw_file_path) and not self.force_download:
try:
logger.info(f'Raw data file exists, loading from {self.raw_file_path}')
self.load_raw_api_data()
return
except Exception as e:
logger.warning(f'Could not load raw data file: {e}. Re-downloading from API.')
# Download from API if raw data doesn't exist, couldn't be loaded, or force_download is True
if self.force_download:
logger.info('Force download requested, re-downloading from API')
# For BAT+XRT mode, get both instruments
if self.instrument == 'BAT+XRT':
# For flux_density mode, get XRT from Burst Analyser (has Density datasets in Jy)
# For flux mode, get XRT from getLightCurves (has integrated flux in erg/cm²/s)
if self.data_mode == 'flux_density':
ba_data = self.download_burst_analyser_data_via_api()
self._api_data = {'xrt': None, 'bat': ba_data}
else:
xrt_data = self.download_xrt_data_via_api()
ba_data = self.download_burst_analyser_data_via_api()
self._api_data = {'xrt': xrt_data, 'bat': ba_data}
elif self.instrument == 'XRT':
# XRT only
if self.data_mode == 'flux_density':
ba_data = self.download_burst_analyser_data_via_api()
self._api_data = {'xrt': None, 'bat': ba_data}
else:
xrt_data = self.download_xrt_data_via_api()
self._api_data = {'xrt': xrt_data, 'bat': None}
else:
# Shouldn't happen but handle gracefully
if self.data_mode == 'flux_density':
ba_data = self.download_burst_analyser_data_via_api()
self._api_data = {'xrt': None, 'bat': ba_data}
else:
xrt_data = self.download_xrt_data_via_api()
self._api_data = {'xrt': xrt_data, 'bat': None}
# Save raw API data for debugging/reprocessing
self.save_raw_api_data()
[docs]
def download_directly(self) -> None:
"""Downloads prompt or XRT data directly without using PhantomJS if possible."""
try:
urllib.request.urlretrieve(self.grb_website, self.raw_file_path)
logger.info(f'Congratulations, you now have raw {self.instrument} {self.transient_type} '
f'data for {self.grb}')
except Exception as e:
logger.warning(f'Cannot load the website for {self.grb} \n'
f'Failed with exception: \n'
f'{e}')
finally:
urllib.request.urlcleanup()
[docs]
def download_xrt_data_via_api(self) -> pd.DataFrame:
"""Downloads XRT data using the swifttools API.
:return: The XRT lightcurve data
:rtype: pandas.DataFrame
"""
if not SWIFTTOOLS_AVAILABLE:
raise ImportError("swifttools is required for API-based data retrieval. "
"Please install it with: pip install swifttools")
try:
logger.info(f'Downloading XRT data for {self.grb} using swifttools API')
# Get the lightcurve data using swifttools
lc_data = None
last_error = None
grb_names = [self.swifttools_grb_name, self.grb, self.stripped_grb]
for grb_name in grb_names:
if not grb_name:
continue
try:
lc_data = udg.getLightCurves(
GRBName=grb_name,
incbad="both",
nosys="both",
saveData=False,
returnData=True,
silent=True
)
break
except Exception as e:
last_error = e
logger.warning(f'XRT API failed for GRBName={grb_name}: {e}')
if lc_data is None:
raise last_error if last_error else RuntimeError("Failed to retrieve XRT data via API")
# The API returns datasets at the top level (not nested under GRB name)
# We want to combine WT and PC modes with priority: PC_nosys_incbad, WT_nosys_incbad
def select_best_curve(lc_data: dict, mode: str) -> pd.DataFrame | None:
"""Select the best curve for a given mode (PC or WT)."""
preferred_keys = [
f"{mode}_nosys_incbad",
f"{mode}_incbad",
f"{mode}_nosys",
f"{mode}",
]
for key in preferred_keys:
if key in lc_data:
df = lc_data[key]
if isinstance(df, pd.DataFrame) and len(df) > 0:
# Filter out upper limits if present
if 'UL' in df.columns and df['UL'].dtype == bool:
df = df[~df['UL']]
return df
return None
# Collect all XRT data (both WT and PC modes)
all_dfs = []
# Get WT mode data (typically earlier times)
wt_df = select_best_curve(lc_data, "WT")
if wt_df is not None:
all_dfs.append(wt_df)
logger.info(f'Found {len(wt_df)} WT mode data points')
# Get PC mode data (typically later times)
pc_df = select_best_curve(lc_data, "PC")
if pc_df is not None:
all_dfs.append(pc_df)
logger.info(f'Found {len(pc_df)} PC mode data points')
if not all_dfs:
raise redback.redback_errors.WebsiteExist(
f'No suitable XRT lightcurve data found for {self.grb}')
# Combine all dataframes
combined_df = pd.concat(all_dfs, ignore_index=True)
# Sort by time
combined_df = combined_df.sort_values('Time').reset_index(drop=True)
logger.info(f'Successfully downloaded {len(combined_df)} XRT data points for {self.grb}')
return combined_df
except Exception as e:
logger.warning(f'Failed to download XRT data via API for {self.grb}: {e}')
raise
[docs]
def download_burst_analyser_data_via_api(self) -> dict:
"""Downloads BAT+XRT Burst Analyser data using the swifttools API.
:return: The Burst Analyser data dictionary
:rtype: dict
"""
if not SWIFTTOOLS_AVAILABLE:
raise ImportError("swifttools is required for API-based data retrieval. "
"Please install it with: pip install swifttools")
try:
logger.info(f'Downloading Burst Analyser data for {self.grb} using swifttools API')
# Get the Burst Analyser data using swifttools
ba_data = None
last_error = None
grb_names = [self.swifttools_grb_name, self.grb, self.stripped_grb]
for grb_name in grb_names:
if not grb_name:
continue
try:
ba_data = udg.getBurstAnalyser(
GRBName=grb_name,
saveData=False,
returnData=True,
silent=True
)
if not ba_data or len(ba_data) == 0:
raise redback.redback_errors.WebsiteExist(
f'No Burst Analyser data available for {self.grb}')
break
except Exception as e:
last_error = e
logger.warning(f'Burst Analyser API failed for GRBName={grb_name}: {e}')
if ba_data is None:
raise last_error if last_error else RuntimeError("Failed to retrieve Burst Analyser data via API")
logger.info(f'Successfully downloaded Burst Analyser data for {self.grb}')
return ba_data
except Exception as e:
logger.warning(f'Failed to download Burst Analyser data via API for {self.grb}: {e}')
raise
[docs]
def save_raw_api_data(self) -> None:
"""Saves the raw API data to the raw file path for debugging and reprocessing."""
if not hasattr(self, '_api_data') or self._api_data is None:
# Fallback to marker file
with open(self.raw_file_path, 'w') as f:
f.write('# Data retrieved via swifttools API\n')
return
try:
# Create a comprehensive raw data file
with open(self.raw_file_path, 'w') as f:
f.write('# Swift data retrieved via swifttools API\n')
f.write(f'# GRB: {self.grb}\n')
f.write(f'# Instrument: {self.instrument}\n')
f.write(f'# Data mode: {self.data_mode}\n')
f.write('#\n')
if isinstance(self._api_data, dict):
# New format with separate XRT and BAT data
if 'xrt' in self._api_data and self._api_data['xrt'] is not None:
f.write('# XRT DATA (from getLightCurves)\n')
f.write('#\n')
xrt_df = self._api_data['xrt']
xrt_df.to_csv(f, index=False)
f.write('\n')
if 'bat' in self._api_data and self._api_data['bat'] is not None:
f.write('# BAT DATA (from getBurstAnalyser XRTBand)\n')
f.write('#\n')
ba_data = self._api_data['bat']
if 'BAT' in ba_data:
# Save ALL available SNR levels for BAT XRTBand data
for snr_key in ['SNR4', 'SNR5', 'SNR6', 'SNR7']:
if snr_key in ba_data['BAT']:
bat_entry = ba_data['BAT'][snr_key]
if isinstance(bat_entry, dict) and 'XRTBand' in bat_entry:
bat_df = bat_entry['XRTBand']
f.write(f'# BAT_{snr_key}\n')
bat_df.to_csv(f, index=False)
f.write('\n')
else:
# Legacy format: single dataframe
self._api_data.to_csv(f, index=False)
logger.info(f'Saved raw API data to {self.raw_file_path}')
except Exception as e:
logger.warning(f'Could not save raw API data: {e}')
# Fallback to marker file
with open(self.raw_file_path, 'w') as f:
f.write('# Data retrieved via swifttools API\n')
[docs]
def load_raw_api_data(self) -> None:
"""Loads previously saved raw API data from the raw file path."""
if not os.path.isfile(self.raw_file_path):
raise FileNotFoundError(f'Raw data file not found: {self.raw_file_path}')
logger.info(f'Loading raw API data from {self.raw_file_path}')
# Read the file and parse sections
with open(self.raw_file_path, 'r') as f:
lines = f.readlines()
# Check if it's just a marker file
if len(lines) == 1 and lines[0].strip() == '# Data retrieved via swifttools API':
raise ValueError('Raw file is just a marker, no actual data saved')
# Find section boundaries
xrt_start = None
bat_start = None
bat_snr_sections = []
for i, line in enumerate(lines):
if '# XRT DATA' in line:
xrt_start = i
elif '# BAT DATA' in line:
bat_start = i
elif line.startswith('# BAT_SNR'):
# Extract SNR level from header like "# BAT_SNR4"
snr_level = line.strip().replace('# BAT_', '')
bat_snr_sections.append((i, snr_level))
xrt_data = None
bat_data = None
# Parse XRT section
if xrt_start is not None:
# Find where CSV data starts (skip comment lines after section header)
csv_start = xrt_start + 1
while csv_start < len(lines) and (lines[csv_start].startswith('#') or lines[csv_start].strip() == ''):
csv_start += 1
# Find where CSV data ends (next section or end of file)
csv_end = bat_start if bat_start is not None else len(lines)
# Extract CSV data
if csv_start < csv_end:
from io import StringIO
csv_content = ''.join(lines[csv_start:csv_end])
xrt_data = pd.read_csv(StringIO(csv_content))
# Parse BAT sections (all SNR levels)
if bat_snr_sections:
from io import StringIO
bat_data = {'BAT': {}}
for idx, (section_start, snr_level) in enumerate(bat_snr_sections):
# Find where CSV data starts
csv_start = section_start + 1
while csv_start < len(lines) and (lines[csv_start].startswith('#') or lines[csv_start].strip() == ''):
csv_start += 1
# Find where CSV data ends (next SNR section or end of file)
if idx + 1 < len(bat_snr_sections):
csv_end = bat_snr_sections[idx + 1][0]
else:
csv_end = len(lines)
# Extract CSV data for this SNR level
if csv_start < csv_end:
csv_content = ''.join(lines[csv_start:csv_end])
bat_df = pd.read_csv(StringIO(csv_content))
bat_data['BAT'][snr_level] = {'XRTBand': bat_df}
# Store in the expected format
if xrt_data is not None or bat_data is not None:
self._api_data = {'xrt': xrt_data, 'bat': bat_data}
xrt_len = len(xrt_data) if xrt_data is not None else 0
bat_len = 0
if bat_data and 'BAT' in bat_data:
bat_len = sum(len(bat_data['BAT'][snr]['XRTBand']) for snr in bat_data['BAT'])
logger.info(f'Successfully loaded raw data: XRT={xrt_len} points, BAT={bat_len} points')
else:
raise ValueError('Could not parse any data from raw file')
[docs]
def convert_raw_data_to_csv(self) -> Union[pd.DataFrame, None]:
"""Converts the raw data into processed data and saves it into the processed file path.
:return: The processed data
:rtype: pandas.DataFrame
"""
if os.path.isfile(self.processed_file_path):
logger.warning('The processed data file already exists. Returning.')
return pd.read_csv(self.processed_file_path)
# Check if we have API data stored
if hasattr(self, '_api_data') and self._api_data is not None:
if isinstance(self._api_data, dict) and 'xrt' in self._api_data:
# New API format with separate XRT and BAT data
return self.convert_combined_api_data_to_csv()
else:
# Legacy: single dataframe (XRT only from old code)
return self.convert_xrt_api_data_to_csv()
# Fall back to legacy conversion methods
if self.instrument == 'XRT':
return self.convert_xrt_data_to_csv()
elif self.transient_type == 'afterglow':
return self.convert_raw_afterglow_data_to_csv()
elif self.transient_type == 'prompt':
return self.convert_raw_prompt_data_to_csv()
[docs]
def convert_xrt_data_to_csv(self) -> pd.DataFrame:
"""Converts the raw XRT data into processed data and saves it into the processed file path.
The column names are in `SwiftDataGetter.XRT_DATA_KEYS`
:return: The processed data.
:rtype: pandas.DataFrame
"""
data = np.loadtxt(self.raw_file_path, comments=['!', 'READ', 'NO'])
data = {key: data[:, i] for i, key in enumerate(self.XRT_DATA_KEYS)}
data = pd.DataFrame(data)
data = data[data["Pos. flux err [erg cm^{-2} s^{-1}]"] != 0.]
data.to_csv(self.processed_file_path, index=False, sep=',')
return data
[docs]
def convert_raw_afterglow_data_to_csv(self) -> pd.DataFrame:
"""Converts the raw afterglow data into processed data and saves it into the processed file path.
:return: The processed data.
:rtype: pandas.DataFrame
"""
if self.data_mode == 'flux':
return self.convert_integrated_flux_data_to_csv()
if self.data_mode == 'flux_density':
return self.convert_flux_density_data_to_csv()
[docs]
def convert_raw_prompt_data_to_csv(self) -> pd.DataFrame:
"""Converts the raw prompt data into processed data and saves it into the processed file path.
The column names are in `SwiftDataGetter.PROMPT_DATA_KEYS`
:return: The processed data.
:rtype: pandas.DataFrame
"""
data = np.loadtxt(self.raw_file_path)
df = pd.DataFrame(data=data, columns=self.PROMPT_DATA_KEYS)
df.to_csv(self.processed_file_path, index=False, sep=',')
return df
[docs]
def convert_integrated_flux_data_to_csv(self) -> pd.DataFrame:
"""Converts the flux data into processed data and saves it into the processed file path.
The column names are in `SwiftDataGetter.INTEGRATED_FLUX_KEYS`
:return: The processed data.
:rtype: pandas.DataFrame
"""
data = {key: [] for key in self.INTEGRATED_FLUX_KEYS}
with open(self.raw_file_path) as f:
started = False
for num, line in enumerate(f.readlines()):
if line.startswith('NO NO NO'):
started = True
if not started:
continue
if line.startswith('!'):
instrument = line[2:].replace('\n', '')
if line[0].isnumeric() or line[0] == '-':
line_items = line.split('\t')
data['Instrument'] = instrument
for key, item in zip(self.INTEGRATED_FLUX_KEYS, line_items):
data[key].append(item.replace('\n', ''))
df = pd.DataFrame(data=data)
df.to_csv(self.processed_file_path, index=False, sep=',')
return df
[docs]
def convert_flux_density_data_to_csv(self) -> pd.DataFrame:
"""Converts the flux data into processed data and saves it into the processed file path.
The column names are in `SwiftDataGetter.FLUX_DENSITY_KEYS`
:return: The processed data.
:rtype: pandas.DataFrame
"""
data = {key: [] for key in self.FLUX_DENSITY_KEYS}
with open(self.raw_file_path) as f:
started = False
for num, line in enumerate(f.readlines()):
if line.startswith('NO NO NO'):
started = True
if not started:
continue
if line[0].isnumeric() or line[0] == '-':
line_items = line.split('\t')
for key, item in zip(self.FLUX_DENSITY_KEYS, line_items):
data[key].append(item.replace('\n', ''))
data['Flux [mJy]'] = [float(x) * 1000 for x in data['Flux [mJy]']]
data['Pos. flux err [mJy]'] = [float(x) * 1000 for x in data['Pos. flux err [mJy]']]
data['Neg. flux err [mJy]'] = [float(x) * 1000 for x in data['Neg. flux err [mJy]']]
df = pd.DataFrame(data=data)
df.to_csv(self.processed_file_path, index=False, sep=',')
return df
[docs]
def convert_xrt_api_data_to_csv(self) -> pd.DataFrame:
"""Converts XRT data from the swifttools API into the expected CSV format.
:return: The processed data.
:rtype: pandas.DataFrame
"""
if not hasattr(self, '_api_data') or self._api_data is None:
raise ValueError("No API data available to convert")
df = self._api_data
# The API returns data with column names like 'Time', 'TimePos', 'TimeNeg', 'Flux', 'FluxPos', 'FluxNeg'
# We need to convert this to match the expected format
# Expected columns: 'Time [s]', "Pos. time err [s]", "Neg. time err [s]",
# "Flux [erg cm^{-2} s^{-1}]", "Pos. flux err [erg cm^{-2} s^{-1}]",
# "Neg. flux err [erg cm^{-2} s^{-1}]"
# Create mapping from API column names to expected column names
column_mapping = {
'Time': 'Time [s]',
'TimePos': 'Pos. time err [s]',
'TimeNeg': 'Neg. time err [s]',
'Flux': 'Flux [erg cm^{-2} s^{-1}]',
'FluxPos': 'Pos. flux err [erg cm^{-2} s^{-1}]',
'FluxNeg': 'Neg. flux err [erg cm^{-2} s^{-1}]',
}
# Rename columns if they exist in the dataframe
data = {}
for api_col, expected_col in column_mapping.items():
if api_col in df.columns:
data[expected_col] = df[api_col].values
# If we didn't find the expected columns, try alternative names
if 'Time [s]' not in data:
if 'T' in df.columns:
data['Time [s]'] = df['T'].values
elif 'MET' in df.columns:
data['Time [s]'] = df['MET'].values
# Create a new dataframe with the expected format
processed_df = pd.DataFrame(data)
# Filter out rows with zero or invalid flux errors (matching legacy behavior)
if 'Pos. flux err [erg cm^{-2} s^{-1}]' in processed_df.columns:
processed_df = processed_df[processed_df['Pos. flux err [erg cm^{-2} s^{-1}]'] != 0.]
processed_df.to_csv(self.processed_file_path, index=False, sep=',')
logger.info(f'Converted XRT API data to CSV format for {self.grb}')
return processed_df
[docs]
def convert_combined_api_data_to_csv(self) -> pd.DataFrame:
"""Converts combined XRT (from getLightCurves) and BAT (from getBurstAnalyser) data to CSV.
:return: The processed data.
:rtype: pandas.DataFrame
"""
if not hasattr(self, '_api_data') or self._api_data is None:
raise ValueError("No API data available to convert")
xrt_data = self._api_data.get('xrt')
bat_ba_data = self._api_data.get('bat')
# Handle different data modes
if self.data_mode == 'flux':
return self._convert_flux_mode(xrt_data, bat_ba_data)
elif self.data_mode == 'flux_density':
return self._convert_flux_density_mode(xrt_data, bat_ba_data)
else:
raise ValueError(f"Unsupported data mode: {self.data_mode}")
def _convert_flux_mode(self, xrt_data, bat_ba_data) -> pd.DataFrame:
"""Convert flux mode data (integrated flux in erg/cm²/s)."""
all_data = []
# Process XRT data from getLightCurves (already in correct units)
if xrt_data is not None and len(xrt_data) > 0:
xrt_mapping = {
'Time': 'Time [s]',
'TimePos': 'Pos. time err [s]',
'TimeNeg': 'Neg. time err [s]',
'Flux': 'Flux [erg cm^{-2} s^{-1}]',
'FluxPos': 'Pos. flux err [erg cm^{-2} s^{-1}]',
'FluxNeg': 'Neg. flux err [erg cm^{-2} s^{-1}]',
}
xrt_df = xrt_data.rename(columns={k: v for k, v in xrt_mapping.items() if k in xrt_data.columns}).copy()
required_cols = list(xrt_mapping.values())
if all(col in xrt_df.columns for col in required_cols):
xrt_df = xrt_df[required_cols].copy()
xrt_df['Instrument'] = 'XRT'
all_data.append(xrt_df)
logger.info(f'Processed {len(xrt_df)} XRT data points')
# Process BAT data from getBurstAnalyser
if bat_ba_data is not None and 'BAT' in bat_ba_data:
# Get the requested SNR level
snr_keys = [self.snr, 'SNR4', 'SNR5', 'SNR6', 'SNR7']
bat_df = None
for snr_key in snr_keys:
if snr_key in bat_ba_data['BAT']:
bat_entry = bat_ba_data['BAT'][snr_key]
# Use XRTBand which has BAT data in XRT-band flux (erg/cm^2/s)
if isinstance(bat_entry, dict) and 'XRTBand' in bat_entry:
bat_df = bat_entry['XRTBand']
logger.info(f'Using BAT {snr_key} XRTBand data')
break
if bat_df is not None and len(bat_df) > 0:
# Filter out bad bins
if 'BadBin' in bat_df.columns:
bat_df = bat_df[bat_df['BadBin'] == False].copy()
bat_mapping = {
'Time': 'Time [s]',
'TimePos': 'Pos. time err [s]',
'TimeNeg': 'Neg. time err [s]',
'Flux': 'Flux [erg cm^{-2} s^{-1}]',
'FluxPos': 'Pos. flux err [erg cm^{-2} s^{-1}]',
'FluxNeg': 'Neg. flux err [erg cm^{-2} s^{-1}]',
}
bat_processed = bat_df.rename(columns={k: v for k, v in bat_mapping.items() if k in bat_df.columns}).copy()
required_cols = list(bat_mapping.values())
if all(col in bat_processed.columns for col in required_cols):
# Filter for valid data
mask = (
np.isfinite(bat_processed['Time [s]']) &
np.isfinite(bat_processed['Flux [erg cm^{-2} s^{-1}]']) &
(bat_processed['Flux [erg cm^{-2} s^{-1}]'] > 0)
)
bat_processed = bat_processed[mask].copy()
bat_processed = bat_processed[required_cols].copy()
bat_processed['Instrument'] = 'BAT'
all_data.append(bat_processed)
logger.info(f'Processed {len(bat_processed)} BAT data points')
if not all_data:
raise ValueError(f"No valid XRT or BAT data found for {self.grb}")
# Combine and sort by time
combined_df = pd.concat(all_data, ignore_index=True)
combined_df = combined_df.sort_values('Time [s]').reset_index(drop=True)
# Save to CSV
expected_columns = self.INTEGRATED_FLUX_KEYS
final_df = combined_df[expected_columns]
final_df.to_csv(self.processed_file_path, index=False, sep=',')
logger.info(f'Saved combined data: {len(final_df)} total points for {self.grb}')
return final_df
def _convert_flux_density_mode(self, xrt_data, bat_ba_data) -> pd.DataFrame:
"""Convert flux_density mode data (flux density in mJy).
Uses Density datasets from getBurstAnalyser for both XRT and BAT (already in Jy).
"""
if bat_ba_data is None:
raise ValueError(f"No Burst Analyser data available for flux_density mode for {self.grb}")
all_data = []
# Process XRT Density data from getBurstAnalyser
# Density datasets have flux in Jy (not erg/cm²/s)
if 'XRT' in bat_ba_data:
xrt_datasets = bat_ba_data['XRT'].get('Datasets', [])
# Look for Density datasets (WT and PC modes)
for ds_name in ['Density_WT_incbad', 'Density_PC_incbad', 'Density_WT', 'Density_PC']:
if ds_name in xrt_datasets and ds_name in bat_ba_data['XRT']:
xrt_df = bat_ba_data['XRT'][ds_name]
if isinstance(xrt_df, pd.DataFrame) and len(xrt_df) > 0:
xrt_norm = pd.DataFrame()
xrt_norm['Time'] = xrt_df['Time']
xrt_norm['TimePos'] = xrt_df['TimePos']
xrt_norm['TimeNeg'] = xrt_df['TimeNeg']
# Density data is already in Jy, convert to mJy
xrt_norm['Flux'] = xrt_df['Flux'] * 1000
xrt_norm['FluxPos'] = xrt_df['FluxPos'] * 1000
xrt_norm['FluxNeg'] = xrt_df['FluxNeg'] * 1000
xrt_norm['Instrument'] = 'XRT'
all_data.append(xrt_norm)
logger.info(f'Found {len(xrt_norm)} XRT flux density points from {ds_name}')
# Process BAT Density data from getBurstAnalyser
if 'BAT' in bat_ba_data:
# Get the requested SNR level
snr_keys = [self.snr, 'SNR4', 'SNR5', 'SNR6', 'SNR7']
bat_df = None
for snr_key in snr_keys:
if snr_key in bat_ba_data['BAT']:
bat_entry = bat_ba_data['BAT'][snr_key]
# For BAT, the Density key has flux density in Jy
if isinstance(bat_entry, dict) and 'Density' in bat_entry:
bat_df = bat_entry['Density'].copy()
logger.info(f'Using BAT {snr_key} Density data for flux density')
break
if bat_df is not None and len(bat_df) > 0:
# Filter out bad bins
if 'BadBin' in bat_df.columns:
bat_df = bat_df[bat_df['BadBin'] == False].copy()
# Density data is already in Jy, convert to mJy
bat_df['Flux'] = bat_df['Flux'] * 1000
bat_df['FluxPos'] = bat_df['FluxPos'] * 1000
bat_df['FluxNeg'] = bat_df['FluxNeg'] * 1000
# Filter for valid data
mask = (
np.isfinite(bat_df['Time']) &
np.isfinite(bat_df['Flux']) &
(bat_df['Flux'] > 0)
)
bat_df = bat_df[mask].copy()
bat_df['Instrument'] = 'BAT'
all_data.append(bat_df)
logger.info(f'Processed {len(bat_df)} BAT flux density points')
if not all_data:
raise ValueError(f"No flux density data found for {self.grb}")
# Combine datasets
combined_df = pd.concat(all_data, ignore_index=True)
combined_df = combined_df.sort_values('Time').reset_index(drop=True)
# Map to expected column names
column_mapping = {
'Time': 'Time [s]',
'TimePos': 'Pos. time err [s]',
'TimeNeg': 'Neg. time err [s]',
'Flux': 'Flux [mJy]',
'FluxPos': 'Pos. flux err [mJy]',
'FluxNeg': 'Neg. flux err [mJy]'
}
final_df = combined_df.rename(columns={k: v for k, v in column_mapping.items() if k in combined_df.columns}).copy()
# Add frequency column (Swift flux density is at 10 keV = 2.418e18 Hz)
final_df['Frequency [Hz]'] = 2.418e18
# Select only the expected columns
expected_columns = self.FLUX_DENSITY_KEYS
missing = [col for col in expected_columns if col not in final_df.columns]
if missing:
raise ValueError(f"Missing required flux density columns: {missing}")
final_df = final_df[expected_columns]
final_df.to_csv(self.processed_file_path, index=False, sep=',')
logger.info(f'Saved flux density data: {len(final_df)} total points for {self.grb}')
return final_df
[docs]
def convert_burst_analyser_api_data_to_csv(self) -> pd.DataFrame:
"""Converts Burst Analyser data from the swifttools API into the expected CSV format.
:return: The processed data.
:rtype: pandas.DataFrame
"""
if not hasattr(self, '_api_data') or self._api_data is None:
raise ValueError("No API data available to convert")
# Extract Burst Analyser data from the nested structure
# _api_data = {'xrt': xrt_data, 'bat': ba_data}
if isinstance(self._api_data, dict) and 'bat' in self._api_data:
ba_data = self._api_data['bat']
else:
ba_data = self._api_data
def normalize_ba_flux_dataframe(df: pd.DataFrame, source_df: pd.DataFrame) -> pd.DataFrame | None:
"""Map Burst Analyser columns to integrated flux columns."""
if df is None or not isinstance(df, pd.DataFrame) or len(df) == 0:
return None
# Handle 'T' as alternative to 'Time'
df = df.copy()
if 'T' in df.columns and 'Time' not in df.columns:
df['Time'] = df['T']
mapping = {
'Time': 'Time [s]',
'TimePos': 'Pos. time err [s]',
'TimeNeg': 'Neg. time err [s]',
'Flux': 'Flux [erg cm^{-2} s^{-1}]',
'FluxPos': 'Pos. flux err [erg cm^{-2} s^{-1}]',
'FluxNeg': 'Neg. flux err [erg cm^{-2} s^{-1}]',
}
renamed = df.rename(columns={k: v for k, v in mapping.items() if k in df.columns}).copy()
required = [
'Time [s]',
'Pos. time err [s]',
'Neg. time err [s]',
'Flux [erg cm^{-2} s^{-1}]',
'Pos. flux err [erg cm^{-2} s^{-1}]',
'Neg. flux err [erg cm^{-2} s^{-1}]',
]
if not all(col in renamed.columns for col in required):
return None
cleaned = renamed[required].copy()
# Filter bad bins if provided by BA (BadBin column in the source df)
if 'BadBin' in source_df.columns:
mask = (source_df['BadBin'] == False) | (source_df['BadBin'] == 0)
cleaned = cleaned[mask].reset_index(drop=True)
# Require finite, positive flux and finite time
mask = (
np.isfinite(cleaned['Time [s]']) &
np.isfinite(cleaned['Flux [erg cm^{-2} s^{-1}]']) &
(cleaned['Flux [erg cm^{-2} s^{-1}]'] > 0)
)
cleaned = cleaned[mask].reset_index(drop=True)
return cleaned if len(cleaned) > 0 else None
# For flux mode, we want integrated flux data
if self.data_mode == 'flux':
all_data = []
# Extract XRT data - need to handle both WT and PC modes separately
if 'XRT' in ba_data:
# Try PC mode first
for key in ['ObservedFlux_PC_incbad', 'ObservedFlux_PC']:
if key in ba_data['XRT']:
pc_df = ba_data['XRT'][key]
if isinstance(pc_df, pd.DataFrame) and len(pc_df) > 0:
df_norm = normalize_ba_flux_dataframe(pc_df, pc_df)
if df_norm is not None:
df_norm['Instrument'] = 'XRT'
all_data.append(df_norm)
logger.info(f'Found {len(df_norm)} XRT PC mode points')
break
# Then try WT mode
for key in ['ObservedFlux_WT_incbad', 'ObservedFlux_WT']:
if key in ba_data['XRT']:
wt_df = ba_data['XRT'][key]
if isinstance(wt_df, pd.DataFrame) and len(wt_df) > 0:
df_norm = normalize_ba_flux_dataframe(wt_df, wt_df)
if df_norm is not None:
df_norm['Instrument'] = 'XRT'
all_data.append(df_norm)
logger.info(f'Found {len(df_norm)} XRT WT mode points')
break
# Extract BAT data - it's nested as ba_data['BAT'][SNR_key]['ObservedFlux']
if 'BAT' in ba_data:
snr_keys = [self.snr, 'SNR4', 'SNR5', 'SNR6', 'SNR7']
for snr_key in snr_keys:
if snr_key in ba_data['BAT']:
bat_entry = ba_data['BAT'][snr_key]
if isinstance(bat_entry, dict) and 'ObservedFlux' in bat_entry:
bat_df = bat_entry['ObservedFlux']
if isinstance(bat_df, pd.DataFrame) and len(bat_df) > 0:
df_norm = normalize_ba_flux_dataframe(bat_df, bat_df)
if df_norm is not None:
df_norm['Instrument'] = 'BAT'
all_data.append(df_norm)
logger.info(f'Found {len(df_norm)} BAT {snr_key} points')
break
if not all_data:
raise ValueError(
f"No suitable Burst Analyser flux data found for {self.grb}. "
f"Check that the selected BAT SNR ({self.snr}) has Flux columns."
)
# Combine all data
combined_df = pd.concat(all_data, ignore_index=True)
# Sort by time
combined_df = combined_df.sort_values('Time [s]').reset_index(drop=True)
# Select only the expected columns
expected_columns = self.INTEGRATED_FLUX_KEYS
final_df = combined_df[expected_columns]
elif self.data_mode == 'flux_density':
# For flux_density mode, combine XRT lightcurve data with BAT from Burst Analyser
all_data = []
# First, add XRT data from getLightCurves
# XRT data is stored in _api_data['xrt'] after loading
xrt_lc_data = self._api_data.get('xrt') if isinstance(self._api_data, dict) else None
logger.info(f"DEBUG: xrt_lc_data type: {type(xrt_lc_data)}, is None: {xrt_lc_data is None}")
if xrt_lc_data is not None:
logger.info(f"DEBUG: xrt_lc_data is DataFrame: {isinstance(xrt_lc_data, pd.DataFrame)}, len: {len(xrt_lc_data) if isinstance(xrt_lc_data, pd.DataFrame) else 'N/A'}")
if isinstance(xrt_lc_data, pd.DataFrame):
logger.info(f"DEBUG: xrt_lc_data columns: {xrt_lc_data.columns.tolist()}")
if xrt_lc_data is not None and isinstance(xrt_lc_data, pd.DataFrame) and len(xrt_lc_data) > 0:
# The XRT lightcurve data should have Time, Flux, etc.
# Separate by mode if Mode column exists, otherwise treat as single dataset
if 'Mode' in xrt_lc_data.columns:
for mode_name in xrt_lc_data['Mode'].unique():
mode_df = xrt_lc_data[xrt_lc_data['Mode'] == mode_name]
if len(mode_df) > 0:
mode_norm = pd.DataFrame()
mode_norm['Time'] = mode_df['Time']
mode_norm['TimePos'] = mode_df['TimePos']
mode_norm['TimeNeg'] = mode_df['TimeNeg']
mode_norm['Flux'] = mode_df['Flux']
mode_norm['FluxPos'] = mode_df['FluxPos']
mode_norm['FluxNeg'] = mode_df['FluxNeg']
all_data.append(mode_norm)
logger.info(f'Found {len(mode_norm)} XRT {mode_name} mode flux density points')
else:
# No mode column, treat as single dataset
xrt_norm = pd.DataFrame()
xrt_norm['Time'] = xrt_lc_data['Time']
xrt_norm['TimePos'] = xrt_lc_data['TimePos']
xrt_norm['TimeNeg'] = xrt_lc_data['TimeNeg']
xrt_norm['Flux'] = xrt_lc_data['Flux']
xrt_norm['FluxPos'] = xrt_lc_data['FluxPos']
xrt_norm['FluxNeg'] = xrt_lc_data['FluxNeg']
all_data.append(xrt_norm)
logger.info(f'Found {len(xrt_norm)} XRT flux density points')
# Then, add BAT data from Burst Analyser XRTBand
if 'BAT' in ba_data:
snr_key = self.snr
if snr_key in ba_data['BAT']:
bat_entry = ba_data['BAT'][snr_key]
if isinstance(bat_entry, dict) and 'XRTBand' in bat_entry:
bat_df = bat_entry['XRTBand']
if isinstance(bat_df, pd.DataFrame) and len(bat_df) > 0:
# Filter and normalize BAT data
bat_norm = pd.DataFrame()
bat_norm['Time'] = bat_df['Time']
bat_norm['TimePos'] = bat_df['TimePos']
bat_norm['TimeNeg'] = bat_df['TimeNeg']
bat_norm['Flux'] = bat_df['Flux']
bat_norm['FluxPos'] = bat_df['FluxPos']
bat_norm['FluxNeg'] = bat_df['FluxNeg']
# Remove bad bins and negative flux values
if 'BadBin' in bat_df.columns:
bat_norm = bat_norm[bat_df['BadBin'] == 0]
bat_norm = bat_norm[bat_norm['Flux'] > 0]
all_data.append(bat_norm)
logger.info(f'Using BAT {snr_key} XRTBand data for flux density')
logger.info(f'Processed {len(bat_norm)} BAT flux density points')
if not all_data:
raise ValueError(f"No flux density data found for {self.grb}")
# Combine all data
combined_df = pd.concat(all_data, ignore_index=True)
# Map to expected column names for flux density
column_mapping = {
'Time': 'Time [s]',
'T': 'Time [s]',
'TimePos': 'Pos. time err [s]',
'TimeNeg': 'Neg. time err [s]',
'Flux': 'Flux [mJy]',
'FluxPos': 'Pos. flux err [mJy]',
'FluxNeg': 'Neg. flux err [mJy]'
}
# Rename columns
for old_name, new_name in column_mapping.items():
if old_name in combined_df.columns and new_name not in combined_df.columns:
combined_df.rename(columns={old_name: new_name}, inplace=True)
# Convert flux density from Jy to mJy (Swift API returns Jy)
if 'Flux [mJy]' in combined_df.columns:
combined_df['Flux [mJy]'] = combined_df['Flux [mJy]'] * 1000
combined_df['Pos. flux err [mJy]'] = combined_df['Pos. flux err [mJy]'] * 1000
combined_df['Neg. flux err [mJy]'] = combined_df['Neg. flux err [mJy]'] * 1000
expected_columns = self.FLUX_DENSITY_KEYS
final_df = combined_df[[col for col in expected_columns if col in combined_df.columns]]
else:
raise ValueError(f"Unsupported data mode: {self.data_mode}")
final_df.to_csv(self.processed_file_path, index=False, sep=',')
logger.info(f'Converted Burst Analyser API data to CSV format for {self.grb}')
return final_df