"""
This module contains the mode identification class for PBjam.
The general mode ID strategy is to model the background + l=2,0 components
separately from the l=1 modes, since the latter are often more computationally
difficult.
This provides the inputs for the detailed peakbagging stage which is provided as
a separate module.
Several plotting options are available to display the results, including `echelle`,
`spectrum` and `corner`.
"""
import warnings, os, pickle
import jax.numpy as jnp
import numpy as np
from pbjam.l1models import Asyl1model, Mixl1model, RGBl1model
from pbjam.l20models import Asyl20model
from pbjam.plotting import plotting
from pbjam import IO
import pandas as pd
[docs]
class modeID(plotting, ):
"""
Class for identifying modes in solar-like oscillators.
Parameters
----------
f : array-like
The frequency array of the spectrum.
s : array-like
The values of the power density spectrum.
obs : dict
Dictionary of observational inputs.
addPriors : dict, optional
Additional priors to be added. Default is an empty dictionary.
N_p : int, optional
Number of radial orders to use for mode identification. Default is 7.
freqLimits : list, optional
Frequency limits for mode identification. If None, it is calculated based on 'numax' and 'dnu'.
priorPath : str, optional
Path to prior sample csv. If None, a default path is used.
"""
def __init__(self, f, s, obs, addPriors={}, N_p=7, freqLimits=None, priorPath=None, **kwargs):
self.__dict__.update((k, v) for k, v in locals().items() if k not in ['self'])
self.f = jnp.array(self.f)
self.s = jnp.array(self.s)
self.Nyquist = self.f[-1]
# Set frequency range to compute the model on. Default is one radial order above/below the requested number.
if self.freqLimits is None:
self.freqLimits = [self.obs['numax'][0] - self.obs['dnu'][0]*(self.N_p//2+1),
self.obs['numax'][0] + self.obs['dnu'][0]*(self.N_p//2+1),]
self.sel = (np.array(self.freqLimits).min() < self.f) & (self.f < np.array(self.freqLimits).max())
if self.priorPath is None:
self.priorPath = IO._getPriorPath()
[docs]
def runl20model(self, progress=True, dynamic=False, minSamples=5000, sampler_kwargs={}, logl_kwargs={}, PCAsamples=50, PCAdims=6, **kwargs):
"""
Runs the l20 model on the selected spectrum.
Parameters
----------
progress : bool, optional
Whether to show progress during the model run. Default is True.
dynamic : bool, optional
Whether to use dynamic nested sampling. Default is False (static nested sampling).
minSamples : int, optional
The minimum number of samples to generate. Default is 5000.
logl_kwargs : dict, optional
Additional keyword arguments for the log-likelihood function. Default is an empty dictionary.
PCAsamples : int, optional
Number of samples for PCA. Default is 50.
PCAdims : int, optional
Number of dimensions for PCA. Default is 6.
Returns
-------
result : dict
Parsed results from the l20 model.
"""
f = self.f[self.sel]
s = self.s[self.sel]
self.l20model = Asyl20model(f, s,
self.obs,
self.addPriors,
self.N_p,
PCAsamples,
PCAdims,
priorPath=self.priorPath)
self.l20Samples = self.l20model.runSampler(progress=progress,
dynamic=dynamic,
minSamples=minSamples,
logl_kwargs=logl_kwargs,
sampler_kwargs=sampler_kwargs)
l20samples_u = self.l20model.unpackSamples(self.l20Samples)
self.l20result = self.l20model.parseSamples(l20samples_u)
self.result = self.mergeResults(l20result=self.l20result)
return self.l20result
[docs]
def runl1model(self, progress=True, dynamic=False, minSamples=5000, sampler_kwargs={}, logl_kwargs={}, model='auto', PCAsamples=500, PCAdims=7, **kwargs):
"""
Runs the l1 model on the selected spectrum.
Should follow the l20 model run.
Parameters
----------
progress : bool, optional
Whether to show progress during the model run. Default is True.
dynamic : bool, optional
Whether to use dynamic nested sampling. Default is False (static nested sampling).
minSamples : int, optional
The minimum number of samples to generate. Default is 5000.
sampler_kwargs : dict, optional
Additional keyword arguments for the sampler. Default is an empty dictionary.
logl_kwargs : dict, optional
Additional keyword arguments for the log-likelihood function. Default is an empty dictionary.
model : str
Choice of which model to use for estimating the l=1 mode locations. Choices are MS, SG, RGB models.
PCAsamples : int, optional
Number of samples for PCA. Default is 100.
PCAdims : int, optional
Number of dimensions for PCA. Default is 5.
Returns
-------
result : dict
Parsed results from the l1 model.
"""
# Compute the l=2,0 model residual.
self.l20residual = self.s[self.sel] / self.l20model.getMedianModel()
f = self.f[self.sel]
s = self.l20residual
summary = {'n_p': self.l20result['enn'][self.l20result['ell']==0],
'nu0_p': self.l20result['summary']['freq'][0, self.l20result['ell']==0]}
for key in ['numax', 'dnu', 'env_height', 'env_width', 'mode_width', 'teff', 'bp_rp']:
summary[key] = self.l20result['summary'][key]
if model.lower() =='auto':
model = self.selectModel()
print(f'Input Teff={self.obs["teff"][0]}K and dnu={self.obs["dnu"][0]}muHz suggests the appropriate l=1 model is: {model}')
if model.lower() == 'ms':
self.l1model = Asyl1model(f, s,
summary,
self.addPriors,
PCAsamples,
priorPath=self.priorPath)
elif model.lower() == 'sg':
self.l1model = Mixl1model(f, s,
summary,
self.addPriors,
PCAsamples,
PCAdims,
priorPath=self.priorPath)
elif model.lower() == 'rgb':
self.l1model = RGBl1model(f, s,
summary,
self.addPriors,
PCAsamples,
rootiter=15,
priorPath=self.priorPath,
modelChoice='simple')
else:
raise ValueError(f'Model {model} is invalid. Please use either MS, SG or RGB.')
self.l1Samples = self.l1model.runSampler(progress=progress,
dynamic=dynamic,
minSamples=minSamples,
logl_kwargs=logl_kwargs,
sampler_kwargs=sampler_kwargs)
l1SamplesU = self.l1model.unpackSamples(self.l1Samples)
self.l1result = self.l1model.parseSamples(l1SamplesU)
self.result = self.mergeResults(l20result=self.l20result, l1result=self.l1result)
return self.l1result
[docs]
def __call__(self, model='auto', progress=True, dynamic=False, sampler_kwargs={}, logl_kwargs={}, **kwargs):
"""
Run both the l20 and l1 models.
The results are stored in the modeID.result dictionary after each step is
completed.
Parameters
----------
progress : bool, optional
Whether to show progress during the model run. Default is True.
sampler_kwargs : dict, optional
Additional keyword arguments for the sampler. Default is an empty dictionary.
logl_kwargs : dict, optional
Additional keyword arguments for the log-likelihood function. Default is an empty dictionary.
"""
self.runl20model(progress, dynamic, sampler_kwargs=sampler_kwargs, logl_kwargs=logl_kwargs)
self.runl1model(progress, dynamic, model=model, sampler_kwargs=sampler_kwargs, logl_kwargs=logl_kwargs)
[docs]
def mergeResults(self, l20result=None, l1result=None, N=5000):
"""
Merges results from l20 and l1 models into a single result dictionary.
Attempts to include N samples from both models, but will use the lowest common
value in case one of the models returns less than N samples.
Note that if l20 and l1 share any parameters (like numax and dnu) the results
from the l20 model are used since they are probably a bit more reliable.
Parameters
----------
l20result : dict, optional
The result dictionary from the l20 model.
l1result : dict, optional
The result dictionary from the L1 model.
N : int, optional
The number of samples to include in the merged results. Default is 5000.
Returns
-------
R : dict
A dictionary containing merged results from the l20 and l1 models.
"""
# Initialize an empty result dictionary
R = {'ell': np.array([]),
'enn': np.array([]),
'emm': np.array([]),
'zeta': np.array([]),
'summary': {'freq' : np.array([]).reshape((2, 0)),
'height': np.array([]).reshape((2, 0)),
'width' : np.array([]).reshape((2, 0)),
'rotAsym': np.array([]).reshape((2, 0)),
},
'samples': {'freq' : np.array([]).reshape((N, 0)),
'height': np.array([]).reshape((N, 0)),
'width' : np.array([]).reshape((N, 0)),
'rotAsym' : np.array([]).reshape((N, 0))
},
}
# Use self.l20result if l20result is not provided
if l20result is None and hasattr(self, 'l20result'):
l20result = self.l20result
_N = np.append(N, l20result['samples']['freq'].shape[0])
# Use self.l1result if l1result is not provided
if l1result is None and hasattr(self, 'l1model'):
l1result = self.l1result
if l1result is not None:
_N = np.append(_N, l1result['samples']['freq'].shape[0])
# Use the minimum number of samples.
N = np.min(_N)
# This order overrides the numax, dnu and teff from l1result in the output since the l20result is more reliable.
resList = [l1result, l20result]
# Merge top level dictionary keys.
for rootkey in ['ell', 'enn', 'emm', 'zeta']:
for D in resList:
if D is not None:
R[rootkey] = np.append(R[rootkey], D[rootkey])
# Merge summary and samples level keys.
for rootkey in ['summary', 'samples']:
for D in resList:
if D is not None:
for subkey in list(D[rootkey].keys()):
if subkey not in ['freq', 'height', 'width', 'rotAsym']:
R[rootkey][subkey] = D[rootkey][subkey]
for subkey in ['freq', 'height', 'width', 'rotAsym']:
R[rootkey][subkey] = np.hstack((R[rootkey][subkey][:N, :],
D[rootkey][subkey][:N, :]))
else:
continue
return R
[docs]
def storeResult(self, resultDict, path=None, ID=None):
"""
Stores the results in a specified directory with identifier. The results are stored as a
pickled Python dictionary file and a CSV file. The pickle file contains all results,
while the CSV file contains only the summary model parameters.
Parameters
----------
resultDict : dict
The dictionary containing the results to be stored.
path : str, optional
The directory path where the results should be stored. If None, it defaults to the current directory.
ID : str, optional
A unique identifier for the stored results. If None, a random identifier is generated.
"""
# If no path is specified use cwd.
if path is None:
path = os.getcwd()
else:
path = str(path)
# Make the dir path if it doesn't exist
if not os.path.exists(os.path.dirname(path)):
os.makedirs(path)
# Assign random number identifier to the save
if ID is not None:
_ID = str(ID)
else:
_ID = f'unknown_tgt_{np.random.randint(0, 1e10)}'
warnings.warn(f'Output stored under {_ID}. You should probably specify a target ID.')
basefilename = os.path.join(*[path, f'{_ID}_modeIDresult'])
# Store everything in pickled dict
pickle.dump(resultDict, open(basefilename+'.pkl', 'wb'))
# Grab just model parameters and save
_tmp = {key: self.result['summary'][key] for key in self.result['summary'].keys() if key not in ['freq', 'height', 'width']}
df_data = [{'name': key, 'mean': value[0], 'error': value[1]} for key, value in _tmp.items()]
# Create a DataFrame
df = pd.DataFrame(df_data)
df.to_csv(basefilename+'.csv', index=False)
[docs]
def selectModel(self, ):
"""
Select the appropriate stellar model based on observed properties.
The method classifies the star as either 'ms' (main sequence), 'sg' (subgiant), or 'rgb' (red giant branch)
using thresholds based on the large frequency separation (`dnu`) and effective temperature (`Teff`).
Returns
-------
model : str
The selected model as a string: 'ms', 'sg', or 'rgb'.
Notes
-----
The classification uses linear relations:
- Main sequence (MS): `dnu > -0.016 * Teff + 157`
- Subgiant (SG): `dnu > -0.010 * Teff + 74`
- Red giant branch (RGB): Default if neither condition is satisfied.
Examples
--------
>>> obj.obs = {'dnu': 100, 'Teff': 5800}
>>> obj.selectModel()
'sg'
>>> obj.obs = {'dnu': 120, 'Teff': 6000}
>>> obj.selectModel()
'ms'
>>> obj.obs = {'dnu': 50, 'Teff': 5000}
>>> obj.selectModel()
'rgb'
"""
MSclassify = lambda Teff: -0.016 * Teff + 157
SGclassify = lambda Teff: -0.010 * Teff + 74
if self.obs['dnu'][0] > MSclassify(self.obs['teff'][0]):
model = 'ms'
elif self.obs['dnu'][0] > SGclassify(self.obs['teff'][0]):
model = 'sg'
else:
model = 'rgb'
return model