Source code for pbjam.peakbagging

"""
The peakbagging module contains the classes used for the peakbag stage of PBjam. The 
main class to interact with is the peakbag class, which will handle potential slicing
of the spectrum or not and combine results for each case.
"""

from functools import partial
import jax, emcee, warnings, time
import jax.numpy as jnp
from pbjam.plotting import plotting 
import pbjam.distributions as dist
from pbjam import jar, samplers
import numpy as np
import statsmodels.api as sm
from tqdm import tqdm
import scipy.stats as st

[docs] class peakbag(plotting): """ A class for handling the peakbagging run(s). The spectrum can be divided into a desired number of slices which can be peakbagged independently, which tends to be faster than fitting the whole spectrum at once. The default setting is to divide the spectrum into roughly the number of radial orders provided in the input frequency list. However, checks are performed to make sure closely spaced modes aren't split so any potential correlation is correctly accounted for. The number of slices may therefore be less than the requested if it's not possible to separate the modes. The slicing is done by a K-means clustering algorithm, where clusters are then merged if they split up for example an l=2,0 mode pair. Parameters ---------- f : array-like The frequency array of the spectrum. s : array-like The values of the power density spectrum. ell : array-like Angular degree of the modes. freq : array-like Frequencies of the modes corresponding to the angular degrees. height : array-like, optional Heights of the modes corresponding to the angular degrees. Default is None, in which case an SNR of 1 is assumed. Providing betters estimates from modeID is however strongly recommended. width : array-like, optional Widths of the modes corresponding to the angular degrees. Default is None, in which case an width of 0.1 is assumed. Providing betters estimates from modeID is however strongly recommended. zeta : array-like, optional Mixed-mode coupling factors of the modes corresponding to the angular degrees. Default is None, in which case 0 (no mixing) is assumed for all modes. dnu : float, optional Large frequency separation. Default is None, in which case it's estimated from the provided `ell` and `freq` arrays. d02 : float, optional Small frequency separation between l=0 and l=2 modes. Default is None, in which case it's estimated from the provided `ell` and `freq` arrays. freqLimits : list, optional Frequency limits for the peakbagging. Default is an empty list, in which case the lowest and highest radial mode frequencies -/+ 1 radial order are used to set the limits. rotAsym : array-like, optional Rotational asymmetry parameters. Default is None. RV : array-like, optional Radial velocity and associated error of the star in km/s. Default is None. slices : int, optional Number of slices for mode fitting. Default is -1, in which case the number of radial orders determined from `freq` is used. snrInput : bool, optional Flag indicating if the input is signal-to-noise ratio (SNR) spectrum. Default is False. **kwargs : dict Additional keyword arguments. Attributes ---------- N_p : int Number of radial (l=0) modes. Nmodes : int Total number of modes. """ def __init__(self, f, s, ell, freq, height=None, width=None, zeta=None, dnu=None, d02=None, freqLimits=[], rotAsym=None, RV=None, slice=False, dipoleMask=None, snrInput=False, samplerType='emcee', **kwargs): self.__dict__.update((k, v) for k, v in locals().items() if k not in ['self']) # Assign some default parameters if none are given. self._checkDefaults() self.width[0, :] = self.width[0, :] / (1-self.zeta) # If is not the SNR spectrum, remove the packground first and turn the input heights into SNR. if not self.snrInput: self.bkg = self.getBkg() self.snr = self.s / self.bkg(self.f) self.height[0, :] = self.height[0, :] / self.bkg(self.freq[0, :]) else: self.snr = self.s # Pick sampler if self.samplerType.lower()=='emcee': print('Using emcee to sample.') sampler = EmceePeakbag elif self.samplerType.lower()=='dynesty': print('Using Dynesty to sample.') sampler = DynestyPeakbag else: raise ValueError('Sampler choice must be emcee or dynesty') self.spectrumMask = self._buildSpectrumMask() # Select modes to include based on freqLimits self._pickModes() self.N_p = len(self.ell[self.ell==0]) self.Nmodes = len(self.freq[0, :]) # Create list of peakbag instances. self.createPeakbagInstances(sampler) def _checkDefaults(self): """ Checks and sets default values for attributes if they are None. Just to tidy up the init function. """ self.dnu = self._setDnu() self.d02 = self._setd02() if self.height is None: self.height = np.ones_like(self.freq) if self.width is None: self.width = 0.1 * np.ones_like(self.freq) if self.zeta is None: self.zeta = np.zeros(self.freq.shape[-1]) if self.rotAsym is None: self.rotAsym = np.zeros_like(self.freq) if self.RV is None: self.RV = (0, 0) def _pickModes(self,): """ Selects the modes to include in the model based on frequency limits. Checks if `freqLimits` is empty. If so, it sets default frequency limits based on the minimum and maximum frequencies of the l=0 modes +/- fac*dnu. The method updates the attributes `ell`, `freq`, `height`, `width`, `rotAsym`, and `zeta` to include only the selected modes. Parameters ---------- fac : float, optional A factor used to adjust the frequency limits. Default is 1. Returns ------- idx : array-like A boolean array indicating which modes are selected based on the frequency limits. """ globalMinFreq = min([min(x) for x in self.freqLimits]) globalMaxFreq = max([max(x) for x in self.freqLimits]) idx = (globalMinFreq< self.freq[0, :]) & (self.freq[0,:] < globalMaxFreq) self.ell = self.ell[idx] self.freq = self.freq[:, idx] self.height = self.height[:, idx] self.width = self.width[:, idx] self.rotAsym = self.rotAsym[:, idx] self.zeta = self.zeta[idx] return idx def _setd02(self): """ Set d02 if not provided as input. This is used to define a wide PDF which is used to penalize l=0 and l=2 frequencies that drift away from each other too much, and also ensure they are unlikely to be negative. A precise values is not strictly necessary. Returns ------- d02 : array-like Estimate of the l=2,0 small separation. """ if self.d02 is None and ((2 in self.ell) and (0 in self.ell)): try: d02 = np.array([np.median(self.freq[0, self.ell==0] - self.freq[0, self.ell==2]), jnp.nan]) except: warnings.warn("Estimating d02 as 0.1*dnu") d02 = np.array([0.1 * self.dnu[0], jnp.nan]) elif isinstance(self.d02, (float, int)): d02 = np.array([self.d02, jnp.nan]) else: d02 = np.array(self.d02) assert (d02.dtype==float) or (d02.dtype==int) return d02 def _setDnu(self): """ Set dnu if not provided as input. This is used in defining the slicing so a precise values is not critical. Returns ------- dnu : array-like Estimated large separation of the modes. """ if self.dnu is None: if 0 in self.ell: ref_l = 0 elif 2 in self.ell: ref_l = 2 elif 1 in self.ell: ref_l = 1 else: raise ValueError('ells must contain either l=0,1, or 2.') dnu = np.array([jnp.median(jnp.diff(self.freq[0, self.ell==ref_l])), jnp.nan]) elif isinstance(self.dnu, (float, int)): dnu = np.array([self.dnu, jnp.nan]) else: dnu = np.array(self.dnu) assert (dnu.dtype==float) or (dnu.dtype==int) return dnu
[docs] def getBkg(self, a=0.66, b=0.88, skips=100): """ Estimate the background Takes an average of the power at linearly spaced points along the log(frequency) axis, where the width of the averaging window increases as a power law. The mean power values are interpolated back onto the full linear frequency axis to estimate the background noise level at all frequencies. Returns ------- b : array Array of psd values approximating the background. """ freq_skips = np.exp(np.linspace(np.log(self.f[0]), np.log(self.f[-1]), skips)) m = np.array([np.median(self.s[np.abs(self.f-fi) < a*fi**b]) for fi in freq_skips]) bkgModel = jar.jaxInterp1D(freq_skips, m/np.log(2)) return bkgModel
def _buildSpectrumMask(self, fac=1): if len(self.freqLimits) == 0: self.freqLimits = [(min(self.freq[0, self.ell==0]) - fac * self.dnu[0], max(self.freq[0, self.ell==0]) + fac * self.dnu[0])] idx = np.zeros(len(self.snr), dtype=bool) # If no input (True or False) is given regarding applying the dipole masking, default to automatic if self.dipoleMask is None: if not (1 in self.ell): self.dipoleMask = True else: self.dipoleMask = False typicalWidth = np.median(self.width[0, self.ell==0]) if self.dipoleMask: for llim, ulim in self.freqLimits: for nu in self.freq[0, self.ell != 1]: delta = self.d02[0] + 10*typicalWidth + 0.01 * self.dnu[0] #pair_llim = nu - 0.16 * self.dnu[0] #pair_ulim = nu + 0.06 * self.dnu[0] insideFreqLims = (llim <= nu - delta <= ulim) & (llim <= nu + delta <= ulim) if insideFreqLims: idx += (nu - delta <= self.f) & (self.f <= nu + delta) else: raise ValueError('The range around a mode is outside the specified frequency range. Try increasing the range by supplying a wider range using freqLimits.') else: for llim, ulim in self.freqLimits: idx += (llim <= self.f) & (self.f <= ulim) return idx def _checkSmallDiffs(self, cuts, nu, Gamma): """ Check for any slices that split closely spaced modes defined in termes of a provided width `Gamma`. The frequency differences simply have to be larger than the given width. Parameters ---------- cuts : array-like Frequencies at which slices are made. nu : array-like Mode frequencies. Gamma : float The width criterion. Returns ------- goodCutIdx : bool Array of boolean values corresponding to the valid cuts. """ distances = np.abs(nu[:, np.newaxis] - cuts) diffs = np.array([distances[m, i] for i, m in enumerate(np.argmin(distances, axis=0))]) goodCutIdx = diffs > Gamma return goodCutIdx def _checkNoSmallSep(self, C, nu, ells): """ Check for any slices that split l=2,0 mode pairs, if they do the slice is rejected. Parameters ---------- C : array-like Frequencies at which slices are made. nu : array-like Mode frequencies. ells : float Angular degrees of the modes in `nu`. Returns ------- goodCutIdx : bool Array of boolean values corresponding to the valid cuts. """ goodCutIdx = np.ones(len(C),dtype=bool) for i, c in enumerate(C): if (i==0) or (i==len(C)-1): continue bidx = nu < c nub_idx = np.argmax(nu[bidx] - c) lb = ells[bidx][nub_idx] aidx = nu > c nua_idx = np.argmin(nu[aidx] - c) la = ells[aidx][nua_idx] if (int(lb)==2) and (int(la)==0): goodCut = False elif (int(lb)==3) and (int(la)==1): goodCut = False else: goodCut = True goodCutIdx[i] *= goodCut return goodCutIdx def _kmeans(self, nu, ells, centroids=None, max_iters=100): """ Assign a mode to a cluster of modes using K-means clustering. The initial list of cluster centroids are taken as l=0 modes by default. Parameters ---------- nu : array-like Mode frequencies. ells : float Angular degrees of the modes in `nu`. centroids : array-like, optional Array of centroids to start at, by default None in which case the l=0 modes frequencies are used. max_iters : int, optional Maximum number of iterations to use to establish the clusters, by default 100 Returns ------- centroids : array-like Locations of the new centroids after clustering. labels : list Labels of the clusters. k : int Number of clusters. """ nu = np.sort(nu) if not centroids: _k = int(np.ceil(len(ells[ells==0]) / self.slices)) centroids = nu[ells==0][::_k] k = len(centroids) # should be the same as _k for _ in range(max_iters): # Assign each data point to the closest centroid distances = np.abs(nu[:, np.newaxis] - centroids) # distance of each point to all the centroids labels = np.argmin(distances, axis=1) # assign to closest centroid # Update centroids based on the mean of data points assigned to each cluster new_centroids = np.array([nu[labels == i].mean() for i in range(k)]) # Merge clusters with only one point into the nearest cluster for i in range(k): if len(labels[labels == i]) == 1: closest_centroid_idx = np.argmin(np.abs(centroids - centroids[i])) labels[labels == i] = closest_centroid_idx # If centroids have not changed significantly, stop if np.allclose(centroids, new_centroids): break centroids = new_centroids return centroids, labels, k def _determineCuts(self, dnu, nu, labels): """ Determine initial location of the slice locations based on the K-means clustering, where the slice locations are initially placed half-way between two adjacent clusters. Appends an upper and lower limits are also appended to the list. Parameters ---------- dnu : float Large separation, used to define the upper and lower limits that are appended. nu : array-like Mode frequencies. labels : list Labels of the clusters. Returns ------- cuts : array-like Array of frequencies at which to place the slices. """ x = np.where(np.diff(labels) == 1)[0] cuts = np.append(nu.min() - dnu / 3, # append lower limit np.append(nu[x] + (nu[x+1] - nu[x])/2, # append list of cuts based on clustering nu.max() + dnu / 3)) # append upper limit return cuts def _slc(self, x, low, high): """ Select indices of x based on provided low and high limits. Parameters ---------- x : array-like Array to base selection on. low : float Low limit to base the selection on. high : float Low limit to base the selection on. Returns ------- idx : bool Boolean array of shape equal to x with the high/low selection applied. """ return (low <= x) & (x <= high)
[docs] def sliceSpectrum(self): """ Slice up the envelope. Sets a series of frequency limits which divide the detected modes into roughly equal number of modes per slice. Parameters ---------- result : dict Dictionary of results from the mode ID stage. fac : int, optional Factor scale dnu to include modes outside envelope, usually very mixed l=1 modes, by default 1. Returns ------- limits : np.array Frequencies delimiting the slices. """ # Pick slices if len(self.freqLimits) == 1: limits = self.getSliceLimits() elif len(self.freqLimits) > 1: # Assume manual freqLimits correspond to cuts, and return these assert all([np.diff(pair) > 0 for i, pair in enumerate(self.freqLimits)]), 'freqLimits must be of the form [(lower, upper), (lower, upper), ...]' limits = self.freqLimits else: raise ValueError('Something is off. len(freqLimits) should not be 0.') return limits
def getSliceLimits(self): sortidx = np.argsort(self.freq[0, :]) freqs = self.freq[0, sortidx] ells = self.ell[sortidx] _, labels, nclusters = self._kmeans(freqs, ells) # find cuts cuts = self._determineCuts(self.dnu[0], freqs, labels) assert len(cuts) == nclusters + 1 # weed out small distances goodCutIdx0 = self._checkSmallDiffs(cuts, freqs, 3*np.median(self.width[0, sortidx])) # weed out 2, 0 and 3, 1 splitting cuts goodCutIdx1 = self._checkNoSmallSep(cuts, freqs, ells) goodCutIdx = goodCutIdx0 * goodCutIdx1 limits = cuts[goodCutIdx] return limits
[docs] def createPeakbagInstances(self, sampler): """ Create a list of class instances which do the modeling in each spectrum slice. Only the relevant part of the spectrum and mode parameters are passed to each instance. Unless `self.slices` is set to 0, attempts to find the best number of slices to use. Starts at the number of l=0 modes, but may adjust down if there are closely spaced sets of modes. Similarly if a choice of self.slices > 0 means a slice has to be placed between two closely spaced modes, the number of slices is also adjusted downward to avoid slicing between closely spaced peaks. Notes ----- - If `self.slices` is 0, no slicing occurs. - The method stores the created instances in the `pbInstances` attribute. """ self.pbInstances = [] if self.slice: sliceLimits = self.sliceSpectrum() # TODO slicing of the spectrum must respect manually applied freqLimits else: sliceLimits = self.freqLimits for i in range(len(sliceLimits)): #slcIdx = self._slc(self.freq[0, :], min(sliceLimits[i]), max(sliceLimits[i])) idx = np.zeros(len(self.freq[0, :]), dtype=bool) for j,nu in enumerate(self.freq[0, :]): sr = np.argsort(abs(self.f-nu))[:2] idx[j] = all(self.spectrumMask[sr]) _ell = self.ell[idx] _zeta = self.zeta[idx] _freq = self.freq[:, idx] _height = self.height[:, idx] _width = self.width[:, idx] _rotAsym = self.rotAsym[:, idx] self.pbInstances.append(sampler(self.f[self.spectrumMask], self.snr[self.spectrumMask], _ell, _freq, _height, _width, _zeta, self.dnu, self.d02, [min(sliceLimits[i]), max(sliceLimits[i])], _rotAsym, self.RV))
# else: # self.pbInstances.append(sampler(self.f, self.snr, self.ell, self.freq, self.height, self.width, self.zeta, self.dnu, self.d02, self.freqLimits, self.rotAsym, self.RV)) def __call__(self, sampler_kwargs={}, Nsamples=10000, **kwargs): """ Run the sampler for the slice(s) and parse the results. Parameters ---------- dynamic : bool, optional Whether or not to use dynamic sampling in Dynesty, by default False. progress : bool, optional Whether or not to show the progress, by default False. Setting this to True doesn't play well with tqdm for the slice loop. sampler_kwargs : dict, optional Additional kwargs to be passed to the sampler. Nsamples : int, optional Samples to retain in the parsed sample, by default 10000 Returns ------- result : dict Dictionary containing the results of the sampling. """ # if self.Nslices > 0: # print('Peakbagging envelope slices') # else: # print('Peakbagging the whole envelope') t0 = time.time() for i, inst in enumerate(self.pbInstances): print(f'Peakbagging slice {i+1}/{len(self.pbInstances)}') inst(sampler_kwargs); print(f'Time taken {np.round((time.time() - t0)/60, 1)} minutes') self.result = self.parseSamples(Nsamples) return self.result
[docs] def parseSamples(self, Nsamples): """ Parses the samples to extract and organize the model parameters. Attempts to include at most N samples from the model, but will default to the actual number of samples of the model parameters if it's less than N. The resulting dictionary contains some global parameters, ell, enn, emm etc. and two dictionaries, one containing the samples drawn and one with their summary statistics. Parameters ---------- smp : dict A dictionary of sampled parameters. Nmax : int, optional Maximum number of samples to include. Default is 5000. Returns ------- result : dict A dictionary containing parsed and organized model parameters. """ N = min([Nsamples, min([inst.nsamples for inst in self.pbInstances])]) result = {'ell': np.array([]), 'enn': np.array([]), 'emm': np.array([]), 'zeta': np.array([]), 'summary': {'freq' : np.empty(shape=(2, 0), dtype=float), 'height': np.empty(shape=(2, 0), dtype=float), 'width' : np.empty(shape=(2, 0), dtype=float),}, 'samples': {'freq' : np.empty(shape=(N, 0), dtype=float), 'height': np.empty(shape=(N, 0), dtype=float), 'width' : np.empty(shape=(N, 0), dtype=float),}, 'kstest': {'significant': np.array([], dtype=bool), 'pvalue': np.array([]), 'statistic': np.array([]),} } for inst in self.pbInstances: # Merge top level keys. for key in ['ell', 'enn', 'emm', 'zeta']: result[key] = np.append(result[key], inst.result[key]) # Merge lower level keys into summary and samples. n = result['samples']['freq'].shape[0] m = inst.result['samples']['freq'].shape[0] randInt = np.random.choice(np.arange(m), size=n, replace=False) for key in ['freq', 'height', 'width']: result['summary'][key] = np.append(result['summary'][key], inst.result['summary'][key], axis=1) smpl = inst.result['samples'][key][randInt, :] result['samples'][key] = np.append(result['samples'][key], smpl, axis=1) # Compute KS-statistic for mode frequencies for key in ['significant', 'pvalue', 'statistic']: result['kstest'][key] = np.append(result['kstest'][key], inst.result['kstest'][key]) return result
[docs] def getRotationInclination(self,): """ Computes the joint posterior distribution for rotation and inclination given the samples from the peakbag instances. Kernel density estimates are computed for all the marginalized posterior distributions. These are then sampled using an MCMC sampler, where the resulting posterior is given by the joint probability of getting the individual posteriors. Summary statistic and samples are added to the pb.result dictionary. """ R = jointRotInc(self) samples = R() samplesU = R.unpackSamples(samples) self.result['samples'].update(samplesU) for key in samplesU.keys(): self.result['summary'][key] = jar.smryStats(samplesU[key])
[docs] class jointRotInc(samplers.DynestySampling): """ A class to perform joint posterior sampling for rotation and inclination parameters using emcee. Takes the posterior distributions from the slices in the peakbag object and samples the joint probability of all the slices. Notes ----- This is a coarse estimate of the rotation rate and inclination. A better estimate can be achieved by modeling the entire envelope at once instead of using the slices. To do this set slices=0 when initializing peakbag. Parameters ---------- pb : object Peakbagging object containing the instances for analysis. NKDE : int, optional Number of samples for KDE estimation. Default is 2500. bw : float, optional Bandwidth for KDE. Default is 0.03. """ def __init__(self, pb, NKDE=2500, bw=0.03): self.insts = pb.pbInstances self.labels = ['nurot_e', 'nurot_c', 'inc'] self.priors = {lbl: self.insts[0].priors[lbl] for lbl in self.labels} self.ndims = len(self.priors) self.Nslices = len(pb.pbInstances) self.makeKDEs(NKDE, bw)
[docs] def makeKDEs(self, N, bw): """ Creates KDEs of the posterior distributions from the spectrum slices. Parameters ---------- N : int Number of samples for KDE estimation. bw : float Bandwidth for KDE. """ self.kdes = [] sampleSizes = np.array([self.insts[i].samples.shape[0] for i in range(self.Nslices)]) Nc = np.append(N, sampleSizes).min() for i in range(self.Nslices): indx = np.random.choice(np.arange(self.insts[i].samples.shape[0], dtype=int), Nc) samplesU = self.insts[i].unpackSamples(self.insts[i].samples[indx, :]) kde = sm.nonparametric.KDEMultivariate(data=[samplesU[lbl] for lbl in self.labels], var_type='c'*self.ndims, bw=[bw]*self.ndims) self.kdes.append(kde)
[docs] @partial(jax.jit, static_argnums=(0,)) def unpackParams(self, theta): """ Cast the parameters in a dictionary Parameters ---------- theta : array Array of parameters drawn from the posterior distribution. Returns ------- thetaU : dict The unpacked parameters. """ thetaU = {lbl: theta[i] for i, lbl in enumerate(self.labels)} return thetaU
[docs] def unpackSamples(self, samples): """ Unpacks the sample matrix into a dictionary. Parameters ---------- samples : array-like Sample matrix. Returns ------- S : dict Dictionary of samples. """ S = {} for i, lbl in enumerate(self.labels): S[f'{lbl}'] = samples[:, i] return S
[docs] @partial(jax.jit, static_argnums=(0,)) def priorLnProb(self, thetaU): """ Computes the log-prior probability for the given parameters. Parameters ---------- thetaU : dict Dictionary of parameters. Returns ------- lnP : float Log-prior probability. """ lnP = jnp.sum(jnp.array([self.priors[lbl].logpdf(thetaU[lbl]) for lbl in self.labels])) return lnP
[docs] def lnJointPost(self, theta): """ Computes the joint log-posterior probability for the given parameters. Parameters ---------- theta : array-like Parameter vector. Returns ------- lnP float Joint log-posterior probability. """ thetaU = self.unpackParams(theta) lnPrior = self.priorLnProb(thetaU) lnL = 0 for kde in self.kdes: lnL += jnp.log(kde.pdf(theta)) - lnPrior lnP = lnL + lnPrior # Some values return nan since the KDE is only defined on a linear scale. if np.isnan(lnP) or np.isinf(lnP): lnP = -jnp.inf return lnP
def __call__(self, nwalkers=500, nsteps=500, burnFraction=0.1, accept=0.1, progress=True): """ Runs the MCMC sampling to obtain the posterior samples. Parameters ---------- nwalkers : int, optional Number of walkers for MCMC. Default is 500. nsteps : int, optional Number of steps for MCMC. Default is 500. burnFraction : float, optional Fraction of steps to discard as burn-in. Default is 0.1. accept : float, optional Acceptance threshold for MCMC samples. Default is 0.1. progress : bool, optional Whether to display progress during MCMC sampling. Default is True. Returns ------- ndarray Posterior samples. """ itr = 0 p0 = np.empty(shape=(0, self.ndims), dtype=float) while p0.shape[0] < nwalkers: itr += 1 u = np.random.uniform(0, 1, size=self.ndims) _p0 = self.ptform(u) lnP = self.lnJointPost(_p0) if np.isfinite(lnP): p0 = np.vstack((p0, _p0)) if itr>10000: p0 = np.vstack((p0, _p0)) sampler = emcee.EnsembleSampler(nwalkers, self.ndims, self.lnJointPost) sampler.run_mcmc(p0, nsteps=nsteps, progress=True); idx = sampler.acceptance_fraction > accept samples = sampler.get_chain()[int(burnFraction*nsteps):, idx, :].reshape((-1, self.ndims)) return samples
[docs] class basePeakbag(plotting): """ Base model class for peakbagging a section of the power spectrum. Parameters ---------- f : array-like The frequency array of the spectrum. s : array-like The values of the power density spectrum. ell : array-like Angular degree of the modes. freq : array-like Frequencies of the modes corresponding to the angular degrees. height : array-like Heights of the modes corresponding to the angular degrees. Default is None, in which case an SNR of 1 is assumed. Providing betters estimates from modeID is however strongly recommended. width : array-like Widths of the modes corresponding to the angular degrees. Default is None, in which case an width of 0.1 is assumed. Providing betters estimates from modeID is however strongly recommended. zeta : array-like Mixed-mode coupling factors of the modes corresponding to the angular degrees. dnu : float Large frequency separation. d02 : float Small frequency separation between l=0 and l=2 modes. freqLimits : list Frequency limits for the peakbagging. rotAsym : array-like Rotational asymmetry parameters. RV : array-like Radial velocity of the star in km/s. addPriors : dict, optional Additional priors to be added. Default is an empty dictionary. **kwargs : dict Additional keyword arguments. """ def __init__(self, f, s, ell, freq, height, width, zeta, dnu, d02, freqLimits, rotAsym, RV, addPriors={}, sampling='emcee', **kwargs): self.__dict__.update((k, v) for k, v in locals().items() if k not in ['self']) # Convert everything to jax array for key, val in self.__dict__.items(): if type(val) == np.ndarray: self.__dict__.update({key: jnp.array(val)}) # Except ell. For some reason it's no longer jit-able? self.ell = list(np.array(self.ell)) self.Nmodes = len(self.ell) self.setLabels() self.setPriors() self.sel = self.setFreqRange() self.ndims = len(self.priors.keys()) self.setAddLikeTerms() self.zeros = jnp.zeros_like(self.f[self.sel])
[docs] def setPriors(self, freq_err=0.03): """ Sets the prior distributions for model parameters. Parameters ---------- freq_err : float, optional The error in frequency, used to set the scale in percent of dnu of the normal distribution for frequency priors. Default is 0.03. Notes ----- - Initializes the `priors` dictionary and updates it with additional priors specified in `self.addPriors`. - For each mode, sets normal priors for 'freq', 'height', and 'width' if not already specified in `self.priors`. - Sets uniform priors for 'nurot_e' (envelope rotation) and 'nurot_c' (core rotation) if not already specified. - Sets a truncated sine prior for 'inc' (inclination) if not already specified. - Sets a normal prior for 'shot' noise if not already specified. Raises ------ ValueError if the length of labels does not match the length of priors. """ self.priors = {} self.priors.update((k, v) for k, v in self.addPriors.items()) for i in range(self.Nmodes): _key = f'freq{i}' if _key not in self.priors: self.priors[_key] = dist.normal(loc=self.freq[0, i], scale=freq_err * self.dnu[0]) for i in range(self.Nmodes): _key = f'height{i}' if _key not in self.priors: self.priors[_key] = dist.normal(loc=jnp.log10(self.height[0, i]), scale=0.5) for i in range(self.Nmodes): _key = f'width{i}' if _key not in self.priors: self.priors[_key] = dist.normal(loc=jnp.log10(self.width[0, i]), scale=0.5) # Envelope rotation prior if 'nurot_e' not in self.priors.keys(): self.priors['nurot_e'] = dist.uniform(loc=1e-9, scale=2.) # Core rotation prior if 'nurot_c' not in self.priors.keys(): self.priors['nurot_c'] = dist.uniform(loc=1e-9, scale=2.) # The inclination prior is a cos(i)~U(0,1) if 'inc' not in self.priors.keys(): self.priors['inc'] = dist.truncsine() #self.priors['inc'] = dist.uniform(loc=0., scale=1) if 'shot' not in self.priors.keys(): self.priors['shot'] = dist.normal(loc=0., scale=0.01) if not all([key in self.labels for key in self.priors.keys()]): raise ValueError('Length of labels doesnt match length of priors.')
[docs] def setLabels(self): """ Sets labels for the model parameters. For each key in `self.variables`, if the key is 'freq', 'height', or 'width', it appends numbered labels for each mode. """ # Default additional parameters self.labels = [] # If key appears in priors dict, override default and move it to add. for key in self.variables.keys(): if key in ['freq', 'height', 'width']: self.labels += [f'{key}{i}' for i in range(self.Nmodes)] else: self.labels.append(key) # Parameters that are in log10 self.logpars = [key for key in self.variables.keys() if self.variables[key]['log10']]
[docs] def setFreqRange(self,): """ Get frequency range around numax for model Returns ------- idx : jax device array Array of boolean values defining the interval of the frequency axis where the oscillation modes present. """ if len(self.freqLimits) != 2: raise ValueError('freqLimits should be an iterable of length 2.') return (self.freqLimits[0] < self.f) & (self.f < self.freqLimits[1])
[docs] def chi_sqr(self, mod): """ Chi^2 2 dof likelihood Evaluates the likelihood of observing the data given the model. Parameters ---------- mod : jax device array Spectrum model. Returns ------- L : float Likelihood of the data given the model """ lnp = -jnp.sum(jnp.log(mod) + self.s[self.sel] / mod) return jax.lax.cond(jnp.isfinite(lnp), lambda : lnp, lambda : -jnp.inf)
variables = {'freq' : {'info': 'mode frequency list' , 'log10': False}, 'height' : {'info': 'mode height list' , 'log10': True}, 'width' : {'info': 'mode width list' , 'log10': True}, 'nurot_e': {'info': 'envelope rotation rate' , 'log10': False}, 'nurot_c': {'info': 'core rotation rate' , 'log10': False}, 'inc' : {'info': 'stellar inclination axis' , 'log10': False}, 'shot' : {'info': 'Shot noise level' , 'log10': True }}
[docs] @partial(jax.jit, static_argnums=(0,)) def lnlikelihood(self, theta): """ Likelihood function for set of model parameters Evaluates the likelihood function for a set of model parameters given the data. This includes the constraint from the observed variables. The samples l are drawn from the latent parameter priors and are first projected into the model space before the model is constructed and the likelihood is constructed. Parameters ---------- l : list Array of latent parameters Returns ------- lnlike : float The log likelihood evaluated at the model parameters p. """ thetaU = self.unpackParams(theta) # Constraint from the periodogram mod = self.model(thetaU) lnlike = self.chi_sqr(mod) lnlike += self.AddLikeTerms(theta, thetaU) return lnlike
[docs] def unpackParams(self, theta): """ Cast the parameters in a dictionary Parameters ---------- theta : array Array of parameters drawn from the posterior distribution. Returns ------- thetaU : dict The unpacked parameters. """ thetaU = {'freq' : theta[0: self.Nmodes], 'height' : theta[self.Nmodes: 2 * self.Nmodes], 'width' : theta[2 * self.Nmodes: 3 * self.Nmodes], 'nurot_e': theta[self.labels.index('nurot_e')], 'nurot_c': theta[self.labels.index('nurot_c')], 'inc' : theta[self.labels.index('inc')], 'shot' : theta[self.labels.index('shot')], } thetaU['nurot_e'] = thetaU['nurot_e'] / jnp.sin(thetaU['inc']) thetaU['nurot_c'] = thetaU['nurot_c'] / jnp.sin(thetaU['inc']) for key in self.logpars: thetaU[key] = 10**thetaU[key] return thetaU
[docs] def model(self, thetaU): """ Computes the model spectrum using a sum of Lorentzian profiles with a multiplet of 2l+1 modes per mode nl. The rotational splitting is a weighted sum of the core and envelope rotation rates, where the weighting is the degree of mixing zeta. The splitting is symmetric. Parameters ---------- thetaU : dict A dictionary of model parameters. Returns ------- modes : array-like The computed model spectrum. """ modes = self.zeros + thetaU['shot'] omega = self.zeta * thetaU['nurot_c'] + (1 - self.zeta) * thetaU['nurot_e'] for i, l in enumerate(self.ell): for m in jnp.arange(-l, l+1): E = jar.visibility(l, m, thetaU['inc']) nu = thetaU['freq'][i] + omega[i] * m H = E * thetaU['height'][i] modes += jar.lor(self.f[self.sel], nu, H, thetaU['width'][i]) return modes
def __call__(self, sampler_kwargs): """ Run the sampler for the slice(s) and parse the results. Parameters ---------- dynamic : bool, optional Whether or not to use dynamic sampling in Dynesty, by default False. progress : bool, optional Whether or not to show the progress, by default False. Setting this to True doesn't play well with tqdm for the slice loop. sampler_kwargs : dict, optional Additional kwargs to be passed to the sampler. Returns ------- result : dict Dictionary containing the results of the sampling. """ self.runSampler(**sampler_kwargs) self.result = self.parseSamples(self.samples) return self.result def dopplerRVCorrection(self, N): if self.RV[1] > 0: RVs = np.random.normal(loc=self.RV[0], scale=self.RV[1], size=N) else: RVs = np.zeros(N) beta = RVs / jar.constants.c doppler = np.sqrt((1 + beta) / (1 - beta)) return doppler def testFreqs(self, thetaU, pvalueLim=0.05): testResult = {'pvalue': np.zeros(self.Nmodes), 'statistic': np.zeros(self.Nmodes), 'significant': np.zeros(self.Nmodes, dtype=bool)} for k in range(self.Nmodes): kstestResult = st.kstest(thetaU[f'freq{k}'], self.priors[f'freq{k}'].cdf) testResult['pvalue'][k] = kstestResult.pvalue testResult['significant'][k] = kstestResult.pvalue <= pvalueLim testResult['statistic'][k] = kstestResult.statistic return testResult
[docs] def parseSamples(self, samples, N=10000): """ Parses the samples to extract and organize the model parameters. Attempts to include at most N samples from the model, but will default to the actual number of samples of the model parameters if it's less than N. The resulting dictionary contains some global parameters, ell, enn, emm etc. and two dictionaries, one containing the samples drawn and one with their summary statistics. Parameters ---------- smp : dict A dictionary of sampled parameters. Nmax : int, optional Maximum number of samples to include. Default is 5000. Returns ------- result : dict A dictionary containing parsed and organized model parameters. """ thetaU = self.unpackSamples(samples) kstestResult = self.testFreqs(thetaU) D = self.dopplerRVCorrection(samples.shape[0]) for key in thetaU: if 'freq' in key: thetaU[key] = D*thetaU[key] result = {'ell': np.array([self.ell]), 'enn': np.array([]), 'emm': np.array([]), 'zeta': np.array([self.zeta]), 'summary': {}, 'samples': {}, 'kstest': kstestResult } for key in self.variables: arr = np.array([thetaU[_key] for _key in thetaU.keys() if key in _key]) result['summary'][key] = np.array([np.mean(arr, axis=1), np.std(arr, axis=1)]) result['samples'][key] = arr[:, :N].T return result
[docs] def unpackSamples(self, samples): """ Unpack a set of parameter samples into a dictionary of arrays. Parameters ---------- samples : array-like A 2D array of shape (n, m), where n is the number of samples and m is the number of parameters. Returns ------- S : dict A dictionary containing the parameter values for each parameter label. """ S = {key: np.zeros(samples.shape[0]) for key in self.labels} for i, key in enumerate(self.labels): S[f'{key}'] = samples[:, i] for key in self.labels: if any([key.startswith(logkey) for logkey in self.logpars]): S[key] = 10**S[key] return S
[docs] def setAddLikeTerms(self): """ Set attribute containing additional observational data Additional observational data other than the power spectrum goes here. Can be Teff or bp_rp color, but may also be additional constraints on e.g., numax, dnu. """ self.addObs = {} self.addObs['freq diff'] = dist.beta(a=1.00, b=1.00, loc=0., scale=100.)
# def setAddLikeTerms(self): # """ Set attribute containing additional observational data # Additional observational data other than the power spectrum goes here. # Can be Teff or bp_rp color, but may also be additional constraints on # e.g., numax, dnu. # """ # self.addObs = {} # # Correlated Noise Regularisation for width # wGPtheta={'amp': 1, 'scale': self.dnu[0]} # wGPmuFunc = jar.jaxInterp1D(self.freq[0, :], jnp.log10(self.width[0, :]/(1-self.zeta))) # wGP = self.build_gp(wGPtheta, self.freq[0, :], wGPmuFunc) # self.addObs['widthGP'] = wGP.log_probability # # Correlated Noise Regularisation for amplitude # hGPtheta={'amp': 1, 'scale': self.dnu[0]} # hGPmuFunc = jar.jaxInterp1D(self.freq[0, :], jnp.log10(self.height[0, :])) # hGP = self.build_gp(hGPtheta, self.freq[0, :], hGPmuFunc) # self.addObs['heightGP'] = hGP.log_probability # def build_gp(self, theta, X, muFunc, muKwargs={}): # from tinygp import GaussianProcess, kernels # kernel = theta["amp"] * kernels.ExpSquared(theta["scale"]) # GP = GaussianProcess(kernel, X, diag=1e-6, mean=partial(muFunc, **muKwargs)) # return GP
[docs] def AddLikeTerms(self, theta, thetaU): """ Add the additional probabilities to likelihood Adds the additional observational data likelihoods to the PSD likelihood. Parameters ---------- p : list Sampling parameters. Returns ------- lnp : float The likelihood of a sample given the parameter PDFs. """ delta = jnp.diff(jnp.sort(thetaU['freq'])) lnp = 0 for d in delta: lnp += self.addObs['freq diff'].logpdf(d) #lnp += self.addObs['heightGP'](theta[self.Nmodes: 2 * self.Nmodes]) #lnp += self.addObs['widthGP'](theta[2 * self.Nmodes: 3 * self.Nmodes]) return lnp
[docs] class DynestyPeakbag(basePeakbag, samplers.DynestySampling): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] class EmceePeakbag(basePeakbag, samplers.EmceeSampling): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)