# Source code for pbjam.mcmc

""" 

PBjam uses MC sampling at several points during the peakbagging process. 
Samplers added to PBjam should be called from this module. 

"""

import emcee
import numpy as np
import scipy.stats as st
import cpnest.model
import pandas as pd
import os

class mcmc():
    """ Class for MCMC sampling using `emcee'

    Uses `emcee' to sample the parameterspace of a provided spectrum model.

    Parameters
    ----------
    start : ndarray
        An array of starting position of the parameters.
    likelihood : function
        Function to call that returns the log likelihood when passed the
        parameters.
    prior : function
        Function to call that returns the log prior probability when passed
        the parameters.
    nwalkers : int, optional
        The number of walkers that `emcee' will use.

    Attributes
    ----------
    ndim : int
        Number of model parameters (length of start input).
    sampler : emcee.EnsembleSampler class instance
        A `emcee' sampler class instance initialized with the number of
        walkers, number of parameters, and the posterior comprised of the
        likelihood and prior input functions.
    chain : ndarray
        Sampled locations in parameters space of each walker at each step.
    lnlike : ndarray
        Likelihood at the sampled locations in parameter space.
    flatchain : ndarray
        Flattened chain.
    flatlnlike : ndarray
        Flattened likelihoods.
    acceptance : ndarray
        Acceptance fraction at each step.

    """

    def __init__(self, start, likelihood, prior, nwalkers=50):
        self.start = start
        self.likelihood = likelihood
        self.prior = prior
        self.nwalkers = nwalkers
        self.ndim = len(start)

        self.sampler = emcee.EnsembleSampler(self.nwalkers, self.ndim,
                                             self.logpost)

        # Filled in after the sampler has been run (see __call__).
        self.chain = None
        self.lnlike = None
        self.flatchain = None
        self.flatlnlike = None
        self.acceptance = None

    def logpost(self, p):
        """ Evaluate the likelihood and prior

        Returns the log posterior probability given parameters p. Evaluates
        first the prior function and then the likelihood function. In the
        event that the prior returns -inf, the function exits without
        evaluating the likelihood.

        Parameters
        ----------
        p : list
            Fit parameters.

        Returns
        -------
        log_posterior : float
            Log posterior of the model given parameters p and the observed
            quantities.

        """

        logp = self.prior(p)
        if logp == -np.inf:
            # Outside the prior support; skip the (possibly expensive)
            # likelihood evaluation.
            return -np.inf

        loglike = self.likelihood(p)
        return logp + loglike

    def stationarity(self, nfactor=20):
        """ Tests to see if stationarity metrics are satisfied.

        Uses the autocorrelation timescale to estimate whether the MC chains
        have reached a stationary state.

        Parameters
        ----------
        nfactor : int, optional
            Factor used to test stationarity. If the number of steps in the
            MC chain exceeds nfactor*tau, where tau is the autocorrelation
            timescale of the chain, the sampling is considered stationary.

        Returns
        -------
        converged : bool
            Whether all parameter chains satisfy the nfactor*tau criterion.

        """

        # tol=0 forces an estimate even for short chains.
        tau = self.sampler.get_autocorr_time(tol=0)
        converged = np.all(tau * nfactor < self.sampler.iteration)
        return converged

    def __call__(self, max_iter=20000, spread=1e-4, start_samples=None):
        """ Initialize and run the EMCEE affine invariant sampler

        Parameters
        ----------
        max_iter : int, optional
            Maximum number of steps to take in the sampling. Stationarity is
            tested intermittently, so it might stop before this number is
            reached.
        spread : float, optional
            Percent spread around the intial position of the walkers. Small
            value starts the walkers in a tight ball, large value fills out
            the range set by parameter bounds.
        start_samples : ndarray, optional
            An array that has samples from the distribution that you want to
            start the sampler at. Default (None or empty) starts the walkers
            in a tight ball around `start` instead.

        Returns
        -------
        sampler.flatchain : array
            The chain of (nwalkers, niter, ndim) flattened to
            (nwalkers*niter, ndim).

        """

        nsteps = 1000

        if start_samples is None or len(start_samples) == 0:
            # KDE case: start walkers in a tight random ball around `start`.
            pos = np.array([self.start + (np.random.randn(self.ndim) * spread)
                            for i in range(self.nwalkers)])
        else:
            # Asy_peakbag case: match the mean and spread of the provided
            # samples. Should be replaced with the actual sample.
            pos = np.random.randn(self.nwalkers, self.ndim)
            pos *= start_samples.std(axis=0)
            pos += start_samples.mean(axis=0)

        # Burn in
        pos, prob, state = self.sampler.run_mcmc(initial_state=pos,
                                                 nsteps=nsteps)

        # Fold in low AR chains (fold updates pos in place and returns it).
        pos = self.fold(pos, spread=spread)

        # Reset sampler
        self.sampler.reset()

        # Run with burnt-in positions
        pos, prob, state = self.sampler.run_mcmc(initial_state=pos,
                                                 nsteps=nsteps)

        while not self.stationarity():
            pos, prob, state = self.sampler.run_mcmc(initial_state=pos,
                                                     nsteps=nsteps)
            print(f'Steps taken: {self.sampler.iteration}')
            if self.sampler.iteration == max_iter:
                break

        if self.sampler.iteration < max_iter:
            print(f'Chains reached stationary state after '
                  f'{self.sampler.iteration} iterations.')
        elif self.sampler.iteration == max_iter:
            print(f'Sampler stopped at {max_iter} (maximum). Chains did not '
                  f'necessarily reach a stationary state.')
        else:
            print('Unhandled exception')

        # Fold in low AR chains and run a little bit to update emcee
        self.fold(pos, spread=spread)
        pos, prob, state = self.sampler.run_mcmc(initial_state=pos,
                                                 nsteps=100, store=True)

        # Final acceptance
        self.acceptance = self.sampler.acceptance_fraction

        # Estimate autocorrelation time
        tau = self.sampler.get_autocorr_time(tol=0, discard=nsteps).mean()

        # Guard thin >= 1: int(tau/4) is 0 for tau < 4, which emcee rejects.
        discard = int(tau * 5)
        thin = max(int(tau / 4), 1)

        # 3D chains
        self.chain = self.sampler.get_chain(discard=discard, thin=thin,
                                            flat=False)
        self.lnlike = self.sampler.get_log_prob(discard=discard, thin=thin,
                                                flat=False)

        # 2D chains
        self.flatchain = self.sampler.get_chain(discard=discard, thin=thin,
                                                flat=True)
        self.flatlnlike = self.sampler.get_log_prob(discard=discard, thin=thin,
                                                    flat=True)

        self.sampler.reset()  # This hopefully minimizes emcee memory leak

        return self.flatchain

    def fold(self, pos, accept_lim=0.2, spread=0.1):
        """ Fold low acceptance walkers into main distribution

        At the end of the burn-in, some walkers appear stuck with low
        acceptance fraction. These can be selected using a threshold, and
        folded back into the main distribution, estimated based on the median
        of the walkers with an acceptance fraction above the threshold.

        The stuck walkers are redistributed with multivariate Gaussian, with
        mean equal to the median of the high acceptance walkers, and a
        standard deviation equal to the median absolute deviation of these.

        Parameters
        ----------
        pos : ndarray, optional
            The positions of the walkers after the burn-in phase. Modified
            in place for the stuck walkers.
        accept_lim : float, optional
            The value below which walkers will be labelled as bad and/or
            hence stuck.
        spread : float, optional
            Factor by which to scatter the folded walkers.

        Returns
        -------
        pos : ndarray
            The positions of the walkers after the low acceptance walkers
            have been folded into high acceptance distribution.

        """

        idx = self.sampler.acceptance_fraction < accept_lim
        nbad = np.shape(pos[idx, :])[0]
        if nbad > 0:
            flatchains = self.sampler.chain[~idx, :, :].reshape((-1, self.ndim))
            good_med = np.median(flatchains, axis=0)
            # scipy removed median_absolute_deviation in 1.9;
            # median_abs_deviation with scale='normal' reproduces the old
            # default scaling (factor 1.4826).
            good_mad = st.median_abs_deviation(flatchains, axis=0,
                                               scale='normal') * spread
            pos[idx, :] = np.array([np.random.randn(self.ndim) * good_mad + good_med for n in range(nbad)])
        return pos
class nested(cpnest.model.Model):
    """ Runs CPnest to perform nested sampling from

    log P(theta | D) ~ likelihood + prior

    Note both likelihood and prior are in natural log.

    Attributes
    ----------
    names : list, strings
        A list of names of the model parameters.
    bounds : list of tuples
        The bounds of the model parameters as [(0, 10), (-1, 1), ...].
    likelihood : func
        Function that will return the log likelihood when called as
        likelihood(params).
    prior : func
        Function that will return the log prior when called as prior(params).

    """

    def __init__(self, names, bounds, likelihood, prior, path):
        self.names = names
        self.bounds = bounds
        self.likelihood = likelihood
        self.prior = prior

        self.path = os.path.join(path, 'cpnest')
        # makedirs with exist_ok=True avoids the race between an existence
        # check and the creation, and also creates any missing parents.
        os.makedirs(self.path, exist_ok=True)

    def log_likelihood(self, param):
        """ Wrapper for log likelihood """

        return self.likelihood(param.values)

    def log_prior(self, p):
        """ Wrapper for log prior """

        # in_bounds is provided by the cpnest.model.Model base class.
        if not self.in_bounds(p):
            return -np.inf
        return self.prior(p.values)

    def __call__(self, nlive=100, nthreads=1, maxmcmc=100, poolsize=100):
        """ Runs the nested sampling

        Parameters
        ----------
        nlive : int
            Number of live points to be used for the sampling. This is
            similar to walkers in emcee. Default is 100.
        nthreads : int
            Number of parallel threads to run. More than one is currently
            slower since the likelihood is fairly quick to evaluate.
            Default is 1.
        maxmcmc : int
            Maximum number of mcmc steps taken by the sampler. Default is
            100.
        poolsize : int
            Number of objects for the affine invariant sampling. Default is
            100.

        Returns
        -------
        df : pandas DataFrame
            A dataframe of the samples produced with the nested sampling.

        """

        self.nest = cpnest.CPNest(self, verbose=0, seed=53, nthreads=nthreads,
                                  nlive=nlive, maxmcmc=maxmcmc,
                                  poolsize=poolsize, output=self.path)
        self.nest.run()

        # Keep only the model parameters, in the declared order.
        self.samples = pd.DataFrame(self.nest.get_posterior_samples())[self.names]
        self.flatchain = self.samples.values
        self.acceptance = None
        return self.samples