"""
The `star' class is the core of PBjam and refers to a single target that is to
be peakbagged. Each `star' instance is assigned an ID and physical input
parameters, as well as a time series or power spectrum.
The different steps in the peakbagging process are then passed the `star'
instance, updating it with the results of each step. The outputs of each step
are stored in a dedicated directory created with the star ID.
The `session' class wraps one or more star class instances and peakbags them all
sequentially. The recommended use of PBjam is the use the `session' class, and
only use the `star' class for more granular control of the peakbagging process.
"""
import os, warnings, re, time
from .asy_peakbag import asymptotic_fit
from .priors import kde
from .peakbag import peakbag
from .jar import get_priorpath, to_log10, references, isvalid
from .plotting import plotting
import pandas as pd
import numpy as np
from astroquery.mast import ObservationsClass as AsqMastObsCl
from astroquery.mast import Catalogs
from astroquery.simbad import Simbad
import astropy.units as units
[docs]class star(plotting):
""" Class for each star to be peakbagged
Additional attributes are added for each step of the peakbagging process
Note spectrum is flattened (background divided out.)
Examples
--------
Peakbag using the star class. Note that the star class only takes Lightkurve
periodograms, pg, as spectrum input.
>>> st = pbjam.star(ID='KIC4448777', pg=pg, numax=[220.0, 3.0],
dnu=[16.97, 0.01], teff=[4750, 100],
bp_rp = [1.34, 0.01])
>>> st(make_plots=True)
Parameters
----------
ID : string, int
Target identifier. If custom timeseries/periodogram is provided, it
must be resolvable by LightKurve (KIC, TIC, EPIC, HD, etc.).
pg : lightkurve.periodogram.Periodogram object
A lightkurve periodogram object containing frequencies in units of
microhertz and power (in arbitrary units).
numax : list
List of the form [numax, numax_error].
dnu : list
List of the form [dnu, dnu_error].
teff : list
List of the form [teff, teff_error].
bp_rp : list
List of the form [bp_rp, bp_rp_error].
path : str, optional
The path at which to store output. If no path is set but make_plots is
True, output will be saved in the current working directory. Default is
the current working directory.
prior_file : str, optional
Path to the csv file containing the prior data. Default is
pbjam/data/prior_data.csv
Attributes
----------
f : array
Array of power spectrum frequencies
s : array
power spectrum
"""
def __init__(self, ID, pg, numax, dnu, teff=[None,None], bp_rp=[None,None],
path=None, prior_file=None):
self.ID = ID
if numax[0] < 25:
warnings.warn('The input numax is less than 25. The prior is not well defined here, so be careful with the result.')
self.numax = numax
self.dnu = dnu
self.references = references()
self.references._addRef(['numpy', 'python', 'lightkurve', 'astropy'])
teff, bp_rp = self._checkTeffBpRp(teff, bp_rp)
self.teff = teff
self.bp_rp = bp_rp
self.pg = pg.flatten() # in case user supplies unormalized spectrum
self.f = self.pg.frequency.value
self.s = self.pg.power.value
self._obs = {'dnu': self.dnu, 'numax': self.numax, 'teff': self.teff,
'bp_rp': self.bp_rp}
self._log_obs = {x: to_log10(*self._obs[x]) for x in self._obs.keys() if x != 'bp_rp'}
self._set_outpath(path)
if prior_file is None:
self.prior_file = get_priorpath()
else:
self.prior_file = prior_file
def _checkTeffBpRp(self, teff, bp_rp):
""" Set the Teff and/or bp_rp values
Checks the input Teff and Gbp-Grp values to see if any are missing.
If Gbp-Grp is missing it will be looked up online either from the TIC or
the Gaia archive.
Teff and Gbp-Grp provide a lot of the same information, so only one of
them need to be provided to start with. If one is not provided, PBjam
will assume a wide prior on it.
Parameters
----------
teff : list
List of the form [teff, teff_error]. For multiple targets, use a list
of lists.
bp_rp : list
List of the form [bp_rp, bp_rp_error]. For multiple targets, use a list
of lists.
Returns
-------
teff : list
The checked teff value. List of the form [teff, teff_error].
bp_rp : list
The checked bp_rp value. List of the form [bp_rp, bp_rp_error].
"""
if isvalid(bp_rp[0]) is False:
try:
bp_rp = [get_bp_rp(self.ID), 0.1]
except:
bp_rp = [np.nan, np.nan]
if not isvalid(teff[0]) and not isvalid(bp_rp[0]):
raise ValueError('Must provide either teff or bp_rp arguments when initializing the star class.')
elif not isvalid(teff[0]):
teff = [4889, 1500] # these are rough esimates from the prior
elif not np.isfinite(bp_rp[0]):
bp_rp = [1.2927, 0.5] # these are rough esimates from the prior
self.references._addRef(['Evans2018'])
return teff, bp_rp
def _get_outpath(self, fname):
""" Get basepath or make full file path name.
Convenience function for either setting the base path name for the star,
or if given fname as input, will append this to the basepath name to
create a full path to the file in question.
Parameters
----------
fname : str, optional
If not None, will append this to the pathname of the star. Use this
to store files such as plots or tables.
Returns
-------
path : str
If fname is None, path is the path name of the star. Otherwise it is
the full file path name for the file in question.
"""
if fname is None:
return self.path
elif isinstance(fname, str):
path = os.path.join(*[self.path, fname])
else:
raise ValueError(f'Unrecognized input {fname}.')
if not os.path.isdir(self.path):
raise IOError(f'You are trying to access {self.path} which is a directory that does not exist.')
else:
return path
def _set_outpath(self, path):
""" Sets the path attribute for star
If path is a string it is assumed to be a path name, if not the
current working directory will be used.
Attempts to create an output directory for all the results that PBjam
produces. A directory is created when a star class instance is
initialized, so a session might create multiple directories.
Parameters
----------
path : str
Directory to place the star subdirectory.
"""
if isinstance(path, str):
# If path is str, presume user wants to put stuff somewhere specific.
self.path = os.path.join(*[path, f'{self.ID}'])
else:
# Otherwise just create a subdir in cwd.
self.path = os.path.join(*[os.getcwd(), f'{self.ID}'])
# Check if self.path exists, if not try to create it
if not os.path.isdir(self.path):
try:
os.makedirs(self.path)
except Exception as ex:
message = "Could not create directory for Star {0} because an exception of type {1} occurred. Arguments:\n{2!r}".format(self.ID, type(ex).__name__, ex.args)
print(message)
[docs] def run_kde(self, bw_fac=1.0, make_plots=False, store_chains=False):
""" Run all steps involving KDE.
Starts by creating a KDE based on the prior data sample. Then samples
this KDE for initial starting positions for asy_peakbag.
Parameters
----------
bw_fac : float
Scaling factor for the KDE bandwidth. By default the bandwidth is
automatically set, but may be scaled to adjust for sparsity of the
prior sample.
make_plots : bool, optional
Whether or not to produce plots of the results. Default is False.
store_chains : bool, optional
Whether or not to store posterior samples on disk. Default is False.
"""
print('Starting KDE estimation')
# Init
kde(self)
# Call
self.kde(dnu=self.dnu, numax=self.numax, teff=self.teff,
bp_rp=self.bp_rp, bw_fac=bw_fac)
# Store
if make_plots:
self.kde.plot_corner(path=self.path, ID=self.ID,
savefig=make_plots)
self.kde.plot_spectrum(pg=self.pg, path=self.path, ID=self.ID,
savefig=make_plots)
self.kde.plot_echelle(path=self.path, ID=self.ID,
savefig=make_plots)
self.references._addRef('matplotlib')
if store_chains:
kde_samps = pd.DataFrame(self.kde.samples, columns=self.kde.par_names)
kde_samps.to_csv(self._get_outpath(f'kde_chains_{self.ID}.csv'), index=False)
[docs] def run_asy_peakbag(self, norders, make_plots=False,
store_chains=False, method='emcee',
developer_mode=False):
""" Run all steps involving asy_peakbag.
Performs a fit of the asymptotic relation to the spectrum (l=2,0 only),
using initial guesses and prior for the fit parameters from KDE.
Parameters
----------
norders : int
Number of orders to include in the fits.
make_plots : bool, optional
Whether or not to produce plots of the results. Default is False.
store_chains : bool, optional
Whether or not to store posterior samples on disk. Default is False.
method : string
Method to be used for sampling the posterior. Options are 'emcee' or
'cpnest. Default method is 'emcee' that will call emcee, alternative
is 'cpnest' to call nested sampling with CPnest.
developer_mode : bool
Run asy_peakbag in developer mode. Currently just retains the input
value of dnu and numax as priors, for the purposes of expanding
the prior sample. Important: This is not good practice for getting
science results!
"""
print('Starting asymptotic peakbagging')
# Init
asymptotic_fit(self, norders=norders)
# Call
self.asy_fit(method, developer_mode)
self.references._addRef(method)
# Store
self.asy_fit.summary.to_csv(self._get_outpath(f'asymptotic_fit_summary_{self.ID}.csv'),
index=True, index_label='name')
self.asy_fit.modeID.to_csv(self._get_outpath(f'asymptotic_fit_modeID_{self.ID}.csv'),
index=False)
if make_plots:
self.asy_fit.plot_spectrum(path=self.path, ID=self.ID,
savefig=make_plots)
self.asy_fit.plot_corner(path=self.path, ID=self.ID,
savefig=make_plots)
self.asy_fit.plot_echelle(path=self.path, ID=self.ID,
savefig=make_plots)
self.references._addRef('matplotlib')
if store_chains:
asy_samps = pd.DataFrame(self.asy_fit.samples, columns=self.asy_fit.par_names)
asy_samps.to_csv(self._get_outpath(f'asymptotic_fit_chains_{self.ID}.csv'), index=False)
[docs] def run_peakbag(self, model_type='simple', tune=1500, nthreads=1,
make_plots=False, store_chains=False):
""" Run all steps involving peakbag.
Performs fit using simple Lorentzian profile pairs to subsections of
the power spectrum, based on results from asy_peakbag.
Parameters
----------
model_type : str
Can be either 'simple' or 'model_gp' which sets the type of mode
width model. Defaults is 'simple'.
tune : int, optional
Numer of tuning steps passed to pm.sample. Default is 1500.
nthreads : int, optional.
Number of processes to spin up in pymc3. Default is 1.
make_plots : bool, optional.
Whether or not to produce plots of the results. Default is False.
store_chains : bool, optional
Whether or not to store posterior samples on disk. Default is False.
"""
print('Starting peakbagging')
# Init
peakbag(self)
# Call
self.peakbag(model_type=model_type, tune=tune, nthreads=nthreads)
# Store
self.peakbag.summary.to_csv(self._get_outpath(f'peakbag_summary_{self.ID}.csv'),
index_label='name')
if make_plots:
self.peakbag.plot_spectrum(path=self.path, ID=self.ID,
savefig=make_plots)
self.peakbag.plot_echelle(path=self.path, ID=self.ID,
savefig=make_plots)
self.references._addRef('matplotlib')
if store_chains:
peakbag_samps = pd.DataFrame(self.peakbag.samples, columns=self.peakbag.par_names)
peakbag_samps.to_csv(self._get_outpath(f'peakbag_chains_{self.ID}.csv'), index=False)
def __call__(self, bw_fac=1.0, norders=8, model_type='simple', tune=1500,
nthreads=1, make_plots=True, store_chains=False,
asy_sampling='emcee', developer_mode=False):
""" Perform all the PBjam steps
Starts by running KDE, followed by Asy_peakbag and then finally peakbag.
Parameters
----------
bw_fac : float, optional.
Scaling factor for the KDE bandwidth. By default the bandwidth is
automatically set, but may be scaled to adjust for sparsity of the
prior sample. Default is 1.
norders : int, optional.
Number of orders to include in the fits. Default is 8.
model_type : str, optional.
Can be either 'simple' or 'model_gp' which sets the type of mode
width model. Defaults is 'simple'.
tune : int, optional
Numer of tuning steps passed to pm.sample. Default is 1500.
nthreads : int, optional.
Number of processes to spin up in pymc3. Default is 1.
make_plots : bool, optional.
Whether or not to produce plots of the results. Default is False.
store_chains : bool, optional
Whether or not to store posterior samples on disk. Default is False.
asy_sampling : string
Method to be used for sampling the posterior in asy_peakbag. Options
are 'emcee' or 'cpnest. Default method is 'emcee' that will call
emcee, alternative is 'cpnest' to call nested sampling with CPnest.
developer_mode : bool
Run asy_peakbag in developer mode. Currently just retains the input
value of dnu and numax as priors, for the purposes of expanding
the prior sample. Important: This is not good practice for getting
science results!
"""
self.run_kde(bw_fac=bw_fac, make_plots=make_plots, store_chains=store_chains)
self.run_asy_peakbag(norders=norders, make_plots=make_plots,
store_chains=store_chains, method=asy_sampling,
developer_mode=developer_mode)
self.run_peakbag(model_type=model_type, tune=tune, nthreads=nthreads,
make_plots=make_plots, store_chains=store_chains)
self.references._addRef('pandas')
def _querySimbad(ID):
""" Query any ID at Simbad for Gaia DR2 source ID.
Looks up the target ID on Simbad to check if it has a Gaia DR2 ID.
The input ID can be any commonly used identifier, such as a Bayer
designation, HD number or KIC.
Returns None if there is not a valid cross-match with GDR2 on Simbad.
Notes
-----
TIC numbers are note currently listed on Simbad. Do a separate MAST quiry
for this.
Parameters
----------
ID : str
Target identifier. If Simbad can resolve the name then it should work.
Returns
-------
gaiaID : str
Gaia DR2 source ID. Returns None if no Gaia ID is found.
"""
print('Querying Simbad for Gaia ID')
try:
job = Simbad.query_objectids(ID)
except:
print(f'Unable to resolve {ID} with Simbad')
return None
for line in job['ID']:
if 'Gaia DR2' in line:
return line.replace('Gaia DR2 ', '')
return None
def _queryTIC(ID, radius = 20):
""" Query TIC for bp-rp value
Queries the TIC at MAST to search for a target ID to return bp-rp value. The
TIC is already cross-matched with the Gaia catalog, so it contains a bp-rp
value for many targets (not all though).
For some reason it does a cone search, which may return more than one
target. In which case the target matching the ID is found in the returned
list.
Returns None if the target does not have a GDR3 ID.
Parameters
----------
ID : str
The TIC identifier to search for.
radius : float, optional
Radius in arcseconds to use for the sky cone search. Default is 20".
Returns
-------
bp_rp : float
Gaia bp-rp value from the TIC.
"""
print('Querying TIC for Gaia bp-rp values.')
job = Catalogs.query_object(objectname=ID, catalog='TIC', objType='STAR',
radius = radius*units.arcsec)
if len(job) > 0:
idx = job['ID'] == str(ID.replace('TIC','').replace(' ', ''))
return float(job['gaiabp'][idx] - job['gaiarp'][idx]) #This should crash if len(result) > 1.
else:
return None
def _queryMAST(ID):
""" Query any ID at MAST
Sends a query for a target ID to MAST which returns an Astropy Skycoords
object with the target coordinates.
ID can be any commonly used identifier such as a Bayer designation, HD, KIC,
2MASS or other name.
Parameters
----------
ID : str
Target identifier
Returns
-------
job : astropy.Skycoords
An Astropy Skycoords object with the target coordinates.
"""
print(f'Querying MAST for the {ID} coordinates.')
mastobs = AsqMastObsCl()
try:
return mastobs.resolve_object(objectname=ID)
except:
return None
def _queryGaia(ID=None, coords=None, radius=2):
""" Query Gaia archive for bp-rp
Sends an ADQL query to the Gaia archive to look up a requested target ID or
set of coordinates.
If the query is based on coordinates a cone search will be performed and the
closest target is returned. Provided coordinates must be astropy.Skycoords.
Parameters
----------
ID : str
Gaia source ID to search for.
coord : astropy.Skycoords
An Astropy Skycoords object with the target coordinates. Must only
contain one target.
radius : float, optional
Radius in arcseconds to use for the sky cone search. Default is 20".
Returns
-------
bp_rp : float
Gaia bp-rp value of the requested target from the Gaia archive.
"""
from astroquery.gaia import Gaia
if ID is not None:
print('Querying Gaia archive for bp-rp values by target ID.')
adql_query = "select * from gaiadr2.gaia_source where source_id=%s" % (ID)
try:
job = Gaia.launch_job(adql_query).get_results()
except:
return None
return float(job['bp_rp'][0])
elif coords is not None:
print('Querying Gaia archive for bp-rp values by target coordinates.')
ra = coords.ra.value
dec = coords.dec.value
adql_query = f"SELECT DISTANCE(POINT('ICRS', {ra}, {dec}), POINT('ICRS', ra, dec)) AS dist, * FROM gaiaedr3.gaia_source WHERE 1=CONTAINS( POINT('ICRS', {ra}, {dec}), CIRCLE('ICRS', ra, dec,{radius})) ORDER BY dist ASC"
try:
job = Gaia.launch_job(adql_query).get_results()
except:
return None
return float(job['bp_rp'][0])
else:
raise ValueError('No ID or coordinates provided when querying the Gaia archive.')
def _format_name(ID):
""" Format input ID
Users tend to be inconsistent in naming targets, which is an issue for
looking stuff up on, e.g., Simbad. This function formats the ID so that
Simbad doesn't throw a fit.
If the name doesn't look like anything in the variant list it will only be
changed to a lower-case string.
Parameters
----------
ID : str
Name to be formatted.
Returns
-------
ID : str
Formatted name
"""
ID = str(ID)
ID = ID.lower()
# Add naming exceptions here
variants = {'KIC': ['kic', 'kplr', 'KIC'],
'Gaia EDR3': ['gaia edr3', 'gedr3', 'edr3', 'Gaia EDR3'],
'Gaia DR2': ['gaia dr2', 'gdr2', 'dr2', 'Gaia DR2'],
'Gaia DR1': ['gaia dr1', 'gdr1', 'dr1', 'Gaia DR1'],
'EPIC': ['epic', 'ktwo', 'EPIC'],
'TIC': ['tic', 'tess', 'TIC']
}
fname = None
for key in variants:
for x in variants[key]:
if x in ID:
fname = ID.replace(x,'')
fname = re.sub(r"\s+", "", fname, flags=re.UNICODE)
fname = key+' '+str(int(fname))
return fname
return ID
[docs]def get_bp_rp(ID):
""" Search online for bp_rp values based on ID.
First a check is made to see if the target is a TIC number, in which case
the TIC will be queried, since this is already cross-matched with Gaia DR2.
If it is not a TIC number, Simbad is queried to identify a possible Gaia
source ID.
As a last resort MAST is queried to provide the target coordinates, after
which a Gaia query is launched to find the closest target. The default
search radius is 20" around the provided coordinates.
Parameters
----------
ID : str
Target identifier to search for.
Returns
-------
bp_rp : float
Gaia bp-rp value for the target. Is nan if no result is found or the
queries failed.
"""
time.sleep(1) # Sleep timer to prevent temporary blacklisting by CDS servers.
ID = _format_name(ID)
if 'TIC' in ID:
bp_rp = _queryTIC(ID)
else:
try:
gaiaID = _querySimbad(ID)
bp_rp = _queryGaia(ID=gaiaID)
except:
try:
coords = _queryMAST(ID)
bp_rp = _queryGaia(coords=coords)
except:
print(f'Unable to retrieve a bp_rp value for {ID}.')
bp_rp = np.nan
return bp_rp