Source code for spectrumcalc

# Author: Arrykrishna Mootoovaloo
# Collaborators: Prof. Alan Heavens, Prof. Andrew Jaffe, Dr. Florent Leclercq
# Email : arrykrish@gmail.com/a.mootoovaloo17@imperial.ac.uk
# Affiliation : Imperial Centre for Inference and Cosmology
# Status : Under Development

'''
Module to generate the matter power spectrum either from CLASS or using the emulator
'''

import numpy as np

# our scripts
import cosmology.spectrumclass as cl
import cosmology.cosmofuncs as cf
import utils.helpers as hp
import utils.gp as ug
import utils.common as uc
import settings as st


[docs]class matterspectrum(cl.powerclass): ''' Routine to sample the cosmological and nuisance parameters. We have variaous options here. We can either use the emulator to calculate the 3D matter power spectrum or we can use CLASS itself. ''' def __init__(self, emulator=True): ''' :param: emulator (bool) - if True, the emulator is used, else CLASS is used ''' self.emulator = emulator # new redshift range self.z_new = np.linspace(st.zmin, st.zmax, st.nz_new, endpoint=True) # new k range self.k_new = np.geomspace(st.k_min_h_by_Mpc, st.kmax, st.nk_new, endpoint=True) # Call some basic quantities/functions from CLASS script cl.powerclass.__init__(self) cl.powerclass.configurations(self)
[docs] def load_gps(self, directory: str = 'semigps') -> list: ''' Load all the trained Gaussian Processes. :param: directory (str) - the directory where the GPs are stored (depends on our choice: - zero mean GP - semi-GP (DEFAULT: semigps) :return: gps (list) - a list containing all the GPs ''' if st.components and not st.neutrino: # folders where we want to store the GPs folder_gf = directory + '/pknl_components' + st.d_one_plus + '/gf' folder_qf = directory + '/pknl_components' + st.d_one_plus + '/qf' folder_pl = directory + '/pknl_components' + st.d_one_plus + '/pl' self.gps_gf = [hp.load_pkl_file(folder_gf, 'gp_' + str(i)) for i in range(st.nz)] self.gps_qf = [hp.load_pkl_file(folder_qf, 'gp_' + str(i)) for i in range(st.nz * st.nk)] self.gps_pl = [hp.load_pkl_file(folder_pl, 'gp_' + str(i)) for i in range(st.nk)] if st.components and st.neutrino: raise ValueError('Neutrino - not yet implemented') if not st.components and not st.neutrino: folder = directory + '/pknl' + st.d_one_plus self.gps_pknl = [hp.load_pkl_file(folder, 'gp_' + str(i)) for i in range(st.nz * st.nk)] if not st.components and st.neutrino: raise ValueError('Neutrino - not yet implemented')
[docs] def mean_prediction(self, parameters: np.ndarray) -> np.ndarray: ''' Calculate the mean prediction from all the GPs on the grid k x z :param: parameters (np.ndarray) - a vector of the test point in parameter space :return: pred (np.ndarray) - the predictions from all the GPs ''' if st.components: # generate arguments for each component args_gf = list(zip([parameters] * st.nz, self.gps_gf)) args_qf = list(zip([parameters] * st.nz * st.nk, self.gps_qf)) args_pl = list(zip([parameters] * st.nk, self.gps_pl)) # prediction based on whether we have chosen to do the y-transformation # growth function if st.gf_args['y_trans']: pred_gf = np.array(list(map(ug.prediction, args_gf))) else: pred_gf = np.array(list(map(ug.pred_normal, args_gf))) # q function if st.qf_args['y_trans']: pred_qf = np.array(list(map(ug.prediction, args_qf))) else: pred_qf = np.array(list(map(ug.pred_normal, args_qf))) # linear matter power spectrum if st.pl_args['y_trans']: pred_pl = np.array(list(map(ug.prediction, args_pl))) else: pred_pl = np.array(list(map(ug.pred_normal, args_pl))) # return results in a dictionary pred = {'gf': pred_gf, 'qf': pred_qf, 'pl': pred_pl} return pred if not st.components: # generate arguments for predicting the non-linear matter power spectrum args_pknl = list(zip([parameters] * st.nz * st.nk, self.gps_pknl)) # check if we have chosen to transform the power spectrum if st.pknl_args['y_trans']: pred_pknl = np.array(list(map(ug.prediction, args_pknl))) else: pred_pknl = np.array(list(map(ug.pred_normal, args_pknl))) return pred_pknl
[docs] def pk_nl_pred(self, params: dict) -> np.ndarray: ''' Calculate the non linear matter power spectrum at a given test point :param: parameters (np.ndarray) - a vector of the test point in parameter space :return: pk (np.ndarray) - the non linear matter power spectrum (reshaped in nk x nz) ''' # get the parameter values # input to the emulator should be an array parameters = uc.dvalues(params) # use CLASS if we want (but this might cause CLASS to get stuck at large P_k_max) if not self.emulator and st.components: if st.timed: rec = cl.powerclass.pk_nonlinear_timed(self, params) else: rec = cl.powerclass.pk_nonlinear_components(self, params) return rec elif not self.emulator and not st.components: if st.timed: pk = cl.powerclass.pk_nonlinear_timed(self, params) else: pk = cl.powerclass.pk_nonlinear(self, params) return pk # use the emulator elif self.emulator and not st.components: # get the non-linear power spectrum directly pk = self.mean_prediction(parameters).reshape(st.nk, st.nz) return pk else: # get the 3 components of the non-linear power spectrum comp = self.mean_prediction(parameters) # product of the growth factor and the linear matter power spectrum gf_pl = np.dot(comp['pl'].reshape(st.nk, 1), comp['gf'].reshape(1, st.nz)) if st.emu_one_plus_q: # we emulate 1 + q(k,z) pk = comp['qf'].reshape(st.nk, st.nz) * gf_pl else: # we emulate q(k,z) pk = (comp['qf'].reshape(st.nk, st.nz) + 1.0) * gf_pl # we return the growth factor, the non-linear matter power spectrum and the linear matter spectrum rec = {'pk': pk, 'gf': comp['gf'], 'pl': comp['pl']} return rec
[docs] def int_pk_nl(self, params: dict, a_bary: float = 0, int_type: str = 'cubic', **kwargs) -> np.ndarray: ''' Calculate the (interpolated) 3D matter power spectrum at a test point :param: parameters (dict) : a dictionary of the test point in parameter space :param: a_bary (float) : the baryon feedback parameter (DEFAULT = 0) :param: int_type (str) : type of interpolation (linear, cubic, quintic). Default is cubic :return: spectrum (np.ndarray) : the interpolated 3D matter power spectrum ''' # calculate the mean prediction from the GPs or CLASS first rec = self.pk_nl_pred(params) if 'z' in kwargs and 'k' in kwargs: # get the wavenumber k = kwargs.pop('k') # get the redshift z = kwargs.pop('z') else: # stick with the one provided in the setting file k = self.k_new # and the redshift on a fine grid z = self.z_new # baryon feedback bf = cf.bar_fed(k / params['h'], z, a_bary) # grid from the given k and z grid = [k, z] if (not self.emulator and st.components) or (self.emulator and st.components): inputs = [self.k_range, self.redshifts, rec['pk'].flatten(), int_type] # the interpolated power spectrum spectrum = uc.two_dims_interpolate(inputs, grid).T spectrum = bf * spectrum # we can also interpolate the other quantities # growth factor inputs_gf = [self.redshifts, rec['gf'], z] int_gf = uc.interpolate(inputs_gf) # linear matter power spectrum inputs_pl = [self.k_range, rec['pl'], k] int_pl = uc.interpolate(inputs_pl) return int_gf, spectrum, int_pl else: inputs = [self.k_range, self.redshifts, rec.flatten(), int_type] # the interpolated power spectrum spectrum = uc.two_dims_interpolate(inputs, grid).T spectrum = bf * spectrum return spectrum
[docs] def int_grad_pk_nl( self, params: dict, order: int = 1, int_type: str = 'cubic', eps: list = [1E-3], **kwargs) -> dict: ''' Calculates the gradient of the power spectrum at a point in parameter space :param: params (dict) : a point within the prior box :param: eps (float) : epsilon - using central finite difference method to calculate gradient :param: int_type (str) : type of interpolation (linear, cubic, quintic). Default is cubic :return: grad (dict) : a dictionary containing the gradient of each parameter ''' # NOTE : Need to account for interpolation in the case of second derivatives # Different format for CLASS (n x n x 800)and GP (to check) # get the parameter values parameters = uc.dvalues(params) # the new grid grid = [self.k_new, self.z_new] # number of parameters nparams = len(parameters) if order == 1: if self.emulator: gradient = self.gp_gradient(parameters, order) else: gradient = cl.powerclass.derivatives(self, params, order, eps) # create an empty dictionary to record interpolated gradient grad = {} for p in range(nparams): # inputs to the interpolator inputs = [self.k_range, self.redshifts, gradient[:, p], int_type] # the interpolated power spectrum grad[str(p)] = uc.two_dims_interpolate(inputs, grid) return grad else: if self.emulator: gradient, hessian = self.gp_gradient(parameters, order) else: gradient, hessian = cl.powerclass.derivatives(self, params, order, eps) grad = {} hess = {} for p in range(nparams): # inputs to the interpolator inputs = [self.k_range, self.redshifts, gradient[:, p], int_type] # the interpolated power spectrum grad[str(p)] = uc.two_dims_interpolate(inputs, grid) for q in range(nparams): inputs = [self.k_range, self.redshifts, hessian[p, q, :], int_type] hess[str(p) + str(q)] = uc.two_dims_interpolate(inputs, grid) return grad, hess
[docs] def gp_gradient(self, testpoint: np.ndarray, order: int = 1) -> np.ndarray: ''' Calculate the gradient of the power spectrum :param: testpoint (np.ndarray) : a testpoint :param: order (int) : 1 or 2 (1 refers to first derivative and 2 refers to second) :return: the derivatives of the power spectrum (of size (nk x nz) x ndim), for example 800 x 7 ''' # number of parameters nparams = len(testpoint) # number of GPs if we are emulating power spectrum directly ngps = st.nk * st.nz if not st.components and order == 1: arguments = list(zip([testpoint] * ngps, self.gps_pknl, [order] * ngps)) results = np.array(list(map(ug.gradient, arguments))) elif not st.components and order == 2: arguments = list(zip([testpoint] * ngps, self.gps_pknl, [order] * ngps)) dummy = list(map(ug.gradient, arguments)) # the gradient of shape: ngps x nparams grad = np.array([dummy[i][0] for i in range(ngps)]) # the hessian of shape: nparams x nparams x ngps hess = np.array([dummy[i][1] for i in range(ngps)]) results = (grad, hess.T) elif st.components and order == 1: # growth factor args_gf = list(zip([testpoint] * st.nz, self.gps_gf, [order] * st.nz)) # q function args_qf = list(zip([testpoint] * st.nk * st.nz, self.gps_qf, [order] * st.nk * st.nz)) # linear power spectrum args_pl = list(zip([testpoint] * st.nk, self.gps_pl, [order] * st.nk)) # calculate the gradients with the GPs grad_gf = np.array(list(map(ug.gradient, args_gf))) grad_qf = np.array(list(map(ug.gradient, args_qf))) grad_pl = np.array(list(map(ug.gradient, args_pl))) # calculate the gradient (chain rule) # we need the mean prediction from the GPs rec = self.mean_prediction(testpoint) # reshaped arrays for gradient calculation g_gf_r = grad_gf.reshape(1, st.nz, nparams) g_qf_r = grad_qf.reshape(st.nk, st.nz, nparams) g_pl_r = grad_pl.reshape(st.nk, 1, nparams) # reshape the mean prediction gf_r = rec['gf'].reshape(1, st.nz, 1) qf_r = rec['qf'].reshape(st.nk, st.nz, 1) pl_r = rec['pl'].reshape(st.nk, 1, 1) # calculate the 3 terms for the gradients if st.emu_one_plus_q: t1 = g_gf_r * qf_r * pl_r t3 = gf_r * qf_r * g_pl_r else: t1 = g_gf_r * (1. + qf_r) * pl_r t3 = gf_r * (1. + qf_r) * g_pl_r t2 = gf_r * g_qf_r * pl_r # gradient is sum of the 3 terms results = t1 + t2 + t3 # reshape the gradient results = results.reshape(st.nk * st.nz, nparams) elif st.components and order == 2: # growth factor args_gf = list(zip([testpoint] * st.nz, self.gps_gf, [order] * st.nz)) # q function args_qf = list(zip([testpoint] * st.nk * st.nz, self.gps_qf, [order] * st.nk * st.nz)) # linear power spectrum args_pl = list(zip([testpoint] * st.nk, self.gps_pl, [order] * st.nk)) # get the first and second derivatives for # growth factor # q function # linear matter power spectrum der_gf = list(map(ug.gradient, args_gf)) der_qf = list(map(ug.gradient, args_qf)) der_pl = list(map(ug.gradient, args_pl)) # ------------------------------------------------------------------------------------- # calculate the gradient (chain rule) grad_gf = np.array([der_gf[i][0] for i in range(st.nz)]) grad_qf = np.array([der_qf[i][0] for i in range(ngps)]) grad_pl = np.array([der_pl[i][0] for i in range(st.nk)]) # we need the mean prediction from the GPs rec = self.mean_prediction(testpoint) # reshape arrays for gradient calculation g_gf_r = grad_gf.reshape(1, st.nz, nparams) g_qf_r = grad_qf.reshape(st.nk, st.nz, nparams) g_pl_r = grad_pl.reshape(st.nk, 1, nparams) # reshape the mean prediction gf_r = rec['gf'].reshape(1, st.nz, 1) qf_r = rec['qf'].reshape(st.nk, st.nz, 1) pl_r = rec['pl'].reshape(st.nk, 1, 1) # calculate the 3 terms for the gradients if st.emu_one_plus_q: t1 = g_gf_r * qf_r * pl_r t3 = gf_r * qf_r * g_pl_r else: t1 = g_gf_r * (1. + qf_r) * pl_r t3 = gf_r * (1. + qf_r) * g_pl_r t2 = gf_r * g_qf_r * pl_r # gradient is sum of the 3 terms grad = t1 + t2 + t3 # reshape the gradient grad = grad.reshape(st.nk * st.nz, nparams) # ------------------------------------------------------------------------------------- # calculate the hessian (chain rule) hess_gf = np.array([der_gf[i][1] for i in range(st.nz)]) hess_qf = np.array([der_qf[i][1] for i in range(ngps)]) hess_pl = np.array([der_pl[i][1] for i in range(st.nk)]) # reshape arrays for second derivatives calculation h_gf_r = hess_gf.reshape(1, st.nz, nparams, nparams) h_qf_r = hess_qf.reshape(st.nk, st.nz, nparams, nparams) h_pl_r = hess_pl.reshape(st.nk, 1, nparams, nparams) # reshape arrays for the mean prediction gf_r_ = rec['gf'].reshape(1, st.nz, 1, 1) qf_r_ = rec['qf'].reshape(st.nk, st.nz, 1, 1) pl_r_ = rec['pl'].reshape(st.nk, 1, 1, 1) # reshape arrays for first derivatives g_gf_r_ = grad_gf.reshape(1, st.nz, nparams, 1) g_qf_r_ = grad_qf.reshape(st.nk, st.nz, 1, nparams) g_pl_r_ = grad_pl.reshape(st.nk, 1, 1, nparams) # for the hessian parts if st.emu_one_plus_q: t1 = h_gf_r * qf_r_ * pl_r_ t3 = gf_r_ * qf_r_ * h_pl_r else: t1 = h_gf_r * (1.0 + qf_r_) * pl_r_ t3 = gf_r_ * (1.0 + qf_r_) * h_pl_r t2 = gf_r_ * h_qf_r * pl_r_ # need to make sure we get nparams x nparams for every product t4 = 2.0 * g_gf_r_ * g_qf_r_ * pl_r_ if st.emu_one_plus_q: t5 = 2.0 * g_gf_r_ * qf_r_ * g_pl_r_ else: t5 = 2.0 * g_gf_r_ * (1.0 + qf_r_) * g_pl_r_ t6 = 2.0 * gf_r_ * grad_qf.reshape(st.nk, st.nz, nparams, 1) * g_pl_r_ # hessian hess = t1 + t2 + t3 + t4 + t5 + t6 hess = hess.reshape(ngps, nparams, nparams) results = (grad, hess.T) return results