Source code for pbjam.IO

"""
The IO module contains the primary methods for PBjam to process either a time
series or power density spectrum from the user or an automatically downloaded 
data set from the Mikulski Archive for Spact Telescopes (https://archive.stsci.edu/home) 
via the Lightkurve package.
"""

from . import PACKAGEDIR
import jax.numpy as jnp
import numpy as np
from scipy.integrate import simpson
import os, time
import lightkurve as lk
from lightkurve.periodogram import Periodogram
from astropy.timeseries import LombScargle
from astropy import units
 
 
[docs] class psd(): """Asteroseismology wrapper for Astropy Lomb-Scargle Uses the Astropy.LombScargle class to compute the power spectrum of a given time series. A variety of choices for computing the spectrum are available. The recommended methods are either fast or Cython. Notes ----- The Cython implementation is very slow for time series longer than about 1 month (array size of 1e5). The Fast implementation is similar to the an FFT, but at a very slight loss of accuracy. There appears to be a slight increasing slope with frequency toward the Nyquist frequency. The adjustments to the frequency resolution, due to gaps, performed in the KASOC filter may not be beneficial the statistics we use in the detection algorithm. This has not been thuroughly tested yet though. So recommend leaving it in, but with a switch to turn it off for testing. Parameters ---------- time : array Time stamps of the time series. flux : array Flux values of the time series. flux_error : array Flux value errors of the time series. fit_mean : bool, optional Keyword for Astropy.LombScargle. If True, uses the generalized Lomb-Scargle approach and fits with a floating mean. Default is False. timeConversion : float Factor to convert the time series such that it is in seconds. Note, all stored time values, e.g. cadence or duration, are kept in the input units. Default is 86400 to convert from days to seconds. Attributes ---------- dt : float Cadence of the time series. dT : float Total length of the time series. NT : int Number of data points in the time series. dutyCycle : float Duty cycle of the time series. Nyquist : float Nyquist frequency in Hz. df : float Fundamental frequency spacing in Hz. ls : astropy.timeseries.LombScargle object: Astropy Lomb-Scargle class instance used in computing the power spectrum. indx : array, bool Mask array for removing nan and/or -inf values from the time series. freqHz : array, float Frequency range in Hz. freq : array, float Freqeuency range in muHz. normfactor : float Normalization factor to ensure the power conforms with Parseval. power : array, float Power spectrum of the time series in ppm^2. powerdensity : array float Power density spectrum of the time series in ppm^2/muHz amplitude : array, float Amplitude spectrum of the time series in ppm. """ def __init__(self, ID, lk_kwargs={}, time=None, flux=None, flux_err=None, useWeighted=False, downloadDir=None, fit_mean=False, timeConversion=86400, badIdx=None, numax=None): self.__dict__.update((k, v) for k, v in locals().items() if k not in ['self']) self.TS = timeSeries(ID, lk_kwargs, time, flux, flux_err, self.badIdx, self.downloadDir, numax=self.numax) if useWeighted: # Init Astropy LS class with weights assert flux_err is not None, 'To compute the weighted spectrum, provide a flux_err argument.' self.ls = LombScargle(self.TS.time * self.timeConversion, self.TS.flux, center_data=True, fit_mean=self.fit_mean, dy=self.TS.flux_err,) else: # Init Astropy LS class without weights self.ls = LombScargle(self.TS.time * self.timeConversion, self.TS.flux, center_data=True, fit_mean=self.fit_mean) self.Nyquist = 1/(2 * self.timeConversion * self.TS.dt) # Hz self.df = self._fundamental_spacing_integral()
[docs] def __call__(self, oversampling=1, nyquist_factor=1.0, method='fast'): """ Compute power spectrum Computes the power spectrum and normalizes it to conform with Parseval's theorem. The output is available as the power in ppm^2, powerdensity in ppm^2/muHz and the amplitude spectrum in ppm. The frequency range is transformed to muHz as this is customarily used in asteroseismology of main sequence stars. Parameters ---------- oversampling : int The number of times the frequency range should be oversampled. This equates to zero-padding when using the FFT. nyquist_factor : float Factor by which to extend the spectrum past the Nyquist frequency. The default is 10% greater than the true Nyquist frequency. We use this to get a better handle on the background level at high frequency. method : str The recommended methods are either `fast' or `Cython'. Cython is a bit more accurate, but significantly slower. """ self.freqHz = np.arange(self.df/oversampling, nyquist_factor*self.Nyquist, self.df/oversampling, dtype='float64') self.freq = jnp.array(self.freqHz*1e6) # muHz is usually used in seismology # Calculate power at frequencies using fast Lomb-Scargle periodogram: power = self.ls.power(self.freqHz, normalization='psd', method=method, assume_regular_frequency=True) # Due to numerical errors, the "fast implementation" can return power < 0. # Replace with random exponential values instead of 0? power = np.clip(power, 0, None) self._getNorm(power) self.power = jnp.array(power * self.normfactor * 2) self.powerdensity = jnp.array(power * self.normfactor / (self.df * 1e6)) self.amplitude = jnp.array(power * np.sqrt(power * self.normfactor * 2)) pg = Periodogram(self.freq * units.uHz, units.Quantity(self.powerdensity)) self.pg = pg
[docs] def getTSWindowFunction(self, tmin=None, tmax=None, cadenceMargin=1.01): """ Generates a time series window function. Parameters ---------- tmin : float, optional Minimum time value for padding. If None, uses the minimum of self.time. Default is None. tmax : float, optional Maximum time value for padding. If None, uses the maximum of self.time. Default is None. cadenceMargin : float, optional Margin factor to identify gaps in the time series. Default is 1.01. Returns ------- tuple A tuple containing the adjusted time array and the corresponding window function array. Notes ----- - The method first initializes the time (`t`) and window function (`w`) arrays. - It then identifies gaps in the time series larger than `cadenceMargin * self.dt` and fills them with zeros in the window function. - The method ensures the length of the time series does not exceed a break counter of 100 to avoid infinite loops. - Padding is added at the start and end of the time series if `tmin` or `tmax` are specified and exceed the current bounds of `t`. """ if tmin is None: tmin = min(self.TS.time) if tmax is None: tmax = max(self.TS.time) t = self.TS.time.copy() w = np.ones_like(t) break_counter = 0 epsilon = 0.0001 # this is a tiny scaling of dt to avoid numerical issues while any(np.diff(t) > cadenceMargin * self.TS.dt): idx = np.where(np.diff(t)>cadenceMargin*self.TS.dt)[0][0] t_gap_fill = np.arange(t[idx], t[idx+1]-epsilon*self.TS.dt, self.TS.dt) w_gap_fill = np.zeros(len(t_gap_fill)) w_gap_fill[0] = 1 t = np.concatenate((t[:idx], t_gap_fill, t[idx+1:])) w = np.concatenate((w[:idx], w_gap_fill, w[idx+1:])) break_counter +=1 if break_counter == 100: break if (tmin is not None) and (tmin < t[0]): padLow = np.arange(tmin, t[0], self.TS.dt) t = np.append(padLow, t) w = np.append(np.zeros_like(padLow), w) if (tmax is not None) and (t[0] < tmax): padHi = np.arange(t[-1], tmax, self.TS.dt) t = np.append(t, padHi) w = np.append(w, np.zeros_like(padHi)) return t, w
def _getNorm(self, power): """ Parseval normalization Computes the normalization factor for the power spectrum such that it conforms with Parseval's theorem. power : array Unnormalized array of power. """ N = len(self.ls.t) if self.ls.dy is None: tot_MS = np.sum((self.ls.y - np.nanmean(self.ls.y))**2)/N else: tot_MS = np.sum(((self.ls.y - np.nanmean(self.ls.y))/self.ls.dy)**2)/np.sum((1/self.ls.dy)**2) self.normfactor = tot_MS/np.sum(power) def _fundamental_spacing_integral(self): """ Estimate fundamental frequency bin spacing Computes the frequency bin spacing using the integral of the spectral window function. For uniformly sampled data this is given by df=1/T. Which under ideal circumstances ensures that power in neighbouring frequency bins is independent. However, this fails when there are gaps in the time series. The integral of the spectral window function is a better approximation for ensuring the bins are less correlated. """ # The nominal frequency resolution df = 1/(self.timeConversion*(np.nanmax(self.TS.time) - np.nanmin(self.TS.time))) # Hz # Compute the window function freq, window = self.windowfunction(df, width=100*df, oversampling=5) # oversampling for integral accuracy # Integrate the windowfunction to get the corrected frequency resolution df = simpson(window, x=freq) return df*1e-6
[docs] def windowfunction(self, df, width=None, oversampling=10): """Spectral window function. Parameters ---------- width : float, optional The width in Hz on either side of zero to calculate spectral window. Default is None. oversampling : float, optional Oversampling factor. Default is 10. """ if width is None: width = 100*df # freq_cen = 0.5*self.Nyquist freq_cen = max([0.5*self.Nyquist, width + df]) Nfreq = int(oversampling*width/df) freq = freq_cen + (df/oversampling) * np.arange(-Nfreq, Nfreq, 1) x = 0.5*np.sin(2*np.pi*freq_cen*self.ls.t) + 0.5*np.cos(2*np.pi*freq_cen*self.ls.t) # Calculate power spectrum for the given frequency range: ls = LombScargle(self.ls.t, x, center_data=True, fit_mean=self.fit_mean) power = ls.power(freq, method='fast', normalization='psd', assume_regular_frequency=True) power /= power[int(len(power)/2)] # Normalize to have maximum of one freq -= freq_cen freq *= 1e6 return freq, power
class timeSeries(): def __init__(self, ID, lk_kwargs, time, flux, flux_err=None, badIdx=None, downloadDir=None, numax=None): self.__dict__.update((k, v) for k, v in locals().items() if k not in ['self']) if (self.time is None) and (self.flux is None): self.time, self.flux, self.flux_err = self._getTS() self.flux = (self.flux/np.nanmedian(self.flux) - 1) * 1e6 self._getBadIndex(self.time, self.flux, self.flux_err, self.badIdx) self.goodIdx = np.invert(self.badIdx) self.time, self.flux = self.time[self.goodIdx], self.flux[self.goodIdx] if self.flux_err is not None: self.flux_err = self.flux_err[self.goodIdx] self.dt = self._getSampling() self.dT = self.time.max() - self.time.min() self.NT = len(self.time) self.dutyCycle = self._getDutyCycle() def _getDutyCycle(self, cadence=None): """ Compute the duty cycle If cadence is not provided, it is assumed to be the median difference of the time stamps in the time series. Parameters ---------- cadence : float Nominal cadence of the time series. Units should be the same as t. Returns ------- dutyCycle : float Duty cycle of the time series """ if cadence is None: cadence = self._getSampling() nomLen = np.ceil((np.nanmax(self.time) - np.nanmin(self.time)) / cadence) dutyCycle = len(self.time) / nomLen return dutyCycle def _getBadIndex(self, time, flux, flux_err, badIdx): """ Identify indices with nan/inf values Flags array indices where either the timestamps, flux values, or flux errors are nan or inf. """ if badIdx is None: badIdx = np.zeros(len(time), dtype=bool) if flux_err is None: flux_err = np.zeros(len(time), dtype=bool) self.badIdx = np.invert(np.isfinite(time) & np.isfinite(flux) & np.isfinite(flux_err) & np.invert(badIdx)) def _getSampling(self): """ Compute sampling rate Computes the average sampling rate in the time series. This should approximate the nominal sampling rate, even with gaps in the time series. Returns ---------- dt : float Cadence of the time stamps. """ dt = np.median(np.diff(self.time)) return dt def _getTS(self, outlierRejection=5): """Get time series with lightkurve Parameters ---------- outlierRejection : float, optional Number of sigma to use for sigma clipping in the time series cleaning. Used as input the the lightkurve remove_outliers method. Returns ------- time : DeviceArray The timestamps of the time series. flux : DeviceArray The flux values of the time series. """ # Force a short wait time to prevent multiple rapid requests from # being rejected. time.sleep(np.random.uniform(1, 5)) search = lk.search_lightcurve(self.ID, **self.lk_kwargs) LCcol = search.download_all(download_dir=self.downloadDir) lc = LCcol.stitch() lc = self.cleanLC(lc, outlierRejection) _time, _flux, _flux_err = jnp.array(lc.time.value), jnp.array(lc.flux.value), jnp.array(lc.flux_err.value) return _time, _flux, _flux_err def cleanLC(self, lc, outlierRejection): """ Perform Lightkurve operations on object. Performs basic cleaning of a light curve, removing nans, outliers, median filtering etc. Parameters ---------- lc : Lightkurve.LightCurve instance Lightkurve object to be cleaned outlierRejection : float, optional Number of sigma to use for sigma clipping in the time series cleaning. Used as input the the lightkurve remove_outliers method. Returns ------- lc : Lightkurve.LightCurve instance The cleaned Lightkurve object """ if self.numax is None: wlen = int(4e6 / self.lk_kwargs['exptime']) else: wlen = int(1/(self.numax * 1e-3)*1e6 / self.lk_kwargs['exptime']) if wlen % 2 == 0: wlen += 1 lc = lc.normalize().remove_nans().remove_outliers(outlierRejection).flatten(window_length=wlen) #remove_nans().flatten(window_length=4001).remove_outliers() return lc def _getOutpath(self, fname): """ Get basepath or make full file path name. Convenience function for either setting the base path name for the star, or if given fname as input, will append this to the basepath name to create a full path to the file in question. Parameters ---------- fname : str, optional If not None, will append this to the pathname of the star. Use this to store files such as plots or tables. Returns ------- path : str If fname is None, path is the path name of the star. Otherwise it is the full file path name for the file in question. """ if fname is None: return self.path elif isinstance(fname, str): path = os.path.join(*[self.path, fname]) else: raise ValueError(f'Unrecognized input {fname}.') if not os.path.isdir(self.path): raise IOError(f'You are trying to access {self.path} which is a directory that does not exist.') else: return path def _setOutpath(name, rootPath): """ Sets the path attribute for star If path is a string it is assumed to be a path name, if not the current working directory will be used. Attempts to create an output directory for all the results that PBjam produces. A directory is created when a star class instance is initialized, so a session might create multiple directories. Parameters ---------- rootPath : str Directory to place the star subdirectory. """ if rootPath is None: rootPath = os.getcwd() if not os.path.basename == name: path = os.path.join(*[rootPath, f'{name}']) else: path = rootPath # Check if self.path exists, if not try to create it if not os.path.isdir(path): try: os.makedirs(path) except Exception as ex: message = "Could not create directory for Star {0} because an exception of type {1} occurred. Arguments:\n{2!r}".format(name, type(ex).__name__, ex.args) print(message) return path def _getPriorPath(): """ Get default prior path name Returns ------- prior_file : str Default path to the prior in the package directory structure. """ return os.path.join(*[PACKAGEDIR, 'data', 'prior_data.csv'])