Source code for trainingpoints

# Author: Arrykrishna Mootoovaloo
# Collaborators: Prof. Alan Heavens, Prof. Andrew Jaffe, Dr. Florent Leclercq
# Email : arrykrish@gmail.com/a.mootoovaloo17@imperial.ac.uk
# Affiliation : Imperial Centre for Inference and Cosmology
# Status : Under Development

"""
Routine to scale the Latin Hypercube samples according to the prior and evaluate the power spctrum at these points.
"""

from typing import Tuple
import timeit
import numpy as np
import pandas as pd

# our Python scripts
import priors as pr
import utils.helpers as hp
import utils.common as uc
import settings as st
import cosmology.spectrumclass as sp
import cosmology.cosmofuncs as cf

logger = uc.get_logger('trainingpoints', 'class_runs_neutrino', 'logs')


[docs]def CLASS_RUN(module: object, parameter: np.ndarray, index: int) -> Tuple[bool, dict]: ''' Run CLASS given an input parameter to generate the training points (outputs) :param: module (object) - the CLASS module :param: parameter (np.ndarray) - the input cosmology, either 5 dimensions or 6 dimensions :index: i*th cosmology from the LHS file :return: state (bool), quantities (dict) - state indicates if the run is successful, quantities contain the important quantities generated ''' # input to CLASS is a dictionary param = cf.mk_dict(st.cosmology, parameter) # for recording the parameter information (to 4 decimal places) p_info = cf.mk_dict(st.cosmology, np.around(parameter, 4)) info1 = 'Cosmology {0:4d} : {1}'.format(index, p_info) logger.info(info1) # generate all the quantities # previously: quantities = cosmo_module.pk_nonlinear_components(param) start_time = timeit.default_timer() state, quantities = cf.runTime(cf.timeOutComponents, module, param, st.timeout) elapsed = timeit.default_timer() - start_time info2 = 'Cosmology {0:4d} : Time taken is {1:.2f} seconds'.format(index, elapsed) logger.info(info2) return state, quantities
[docs]class trainingset(object): ''' Runs CLASS at the LHS points generated using the maximin procedure. If we want to sample the neutrino mass, then, please use maximin_1000_6D as input (assuming it has already been generated), otherwise please use maximin_1000_5D. ''' def __init__(self, lhs: str = 'maximin_1000_6D'): ''' Generates the training set (inputs and outputs) but we need to scale the LHS samples first before evaluating the matter power spectrum using CLASS :param: lhs (str) - the reference to the file containing the generated LHS points ''' self.lhs = lhs # load the LHS points self.inputs = pd.read_csv('lhs/' + self.lhs, index_col=0).values # number of cosmologies self.ncosmo = self.inputs.shape[0] # number of dimensions self.ndim = self.inputs.shape[1] # dimensions should agree according to the setting file assert self.ndim == len(st.cosmology), 'Dimension mis-match!'
[docs] def scale(self, save: bool = True) -> np.ndarray: ''' Scale the LHS according to the prior range. See setting file to set up the priors for the LHS samples. :param: save (bool) - if True, the scaled inputs (cosmologies) will be written to a file :return: cosmologies (np.ndarray) - the scaled inputs ''' # generate the list of distributions all_priors = pr.all_entities(st.priors) # create an empty array to store the scaled parameters cosmologies = np.zeros_like(self.inputs) for i, p in enumerate(st.cosmology): cosmologies[:, i] = all_priors[p].ppf(self.inputs[:, i]) if save: if st.neutrino: hp.store_arrays(cosmologies, 'trainingset', 'cosmologies_neutrino') else: hp.store_arrays(cosmologies, 'trainingset', 'cosmologies') return cosmologies
[docs] def targets(self, cosmologies: np.ndarray, save: bool = False) -> np.ndarray: ''' Generate the power spectrum at the specfic cosmologies :param: save (bool) - if True, the generated power spectrum will be saved in a directory. Note that the power spectrum is of shape (nk x nz), for example, 40 x 20. So the final shape will be of size (ncosmo x nk x nz). The power spectrum is flattened in this case, so we save a file of size 1000 x 800 (ncosmo = 1000, nk = 40, nz = 20). Therefore, we will have 800 separate GPs in this example. :param: cosmologies (np.ndarray) - set of cosmologies where we want to run CLASS :param: save (bool) - if True, the generated targets (training points/ power spectrum) will be saved in a directory :return: components (dict) - a list of the different quantities (growth factor, linear matter power spectrum, q function) evaluated at different cosmologies or :return: pk_non (np.ndarray) - the power spectrum evaluated at each cosmology ''' # get the class module cosmo_module = sp.powerclass() cosmo_module.configurations() if st.components: # empty list for different quantities # the growth factor growth_factor = [] # the linear matter power spectrum pk_linear = [] # the q(k,z) function q_function = [] # set of cosmologies successfully calculated class_cosmo = [] for i in range(self.ncosmo): state, quantities = CLASS_RUN(cosmo_module, cosmologies[i], i) if quantities is None: info3 = 'CLASS cannot compute Pk at this point - adding small perturbation to inputs' logger.warning(info3) ntrial = 0 while state is False: # update cosmology by a jitter term cosmologies[i] += 1E-4 * np.random.randn(self.ndim) state, quantities = CLASS_RUN(cosmo_module, cosmologies[i], i) ntrial += 1 logger.info('Number of attempts is {}'.format(ntrial)) # record all quantities growth_factor.append(quantities['gf'].flatten()) pk_linear.append(quantities['pl'].flatten()) q_function.append(quantities['qf'].flatten()) class_cosmo.append(cosmologies[i]) if save: if st.neutrino: hp.store_arrays(growth_factor, 'trainingset/components', 'growth_factor_neutrino') hp.store_arrays(pk_linear, 'trainingset/components', 'pk_linear_neutrino') hp.store_arrays(q_function, 'trainingset/components', 'q_function_neutrino') hp.store_arrays(class_cosmo, 'trainingset/components', 'cosmologies_neutrino') else: hp.store_arrays(growth_factor, 'trainingset/components', 'growth_factor') hp.store_arrays(pk_linear, 'trainingset/components', 'pk_linear') hp.store_arrays(q_function, 'trainingset/components', 'q_function') hp.store_arrays(class_cosmo, 'trainingset/components', 'cosmologies') components = {'growth_factor': growth_factor, 'pk_linear': pk_linear, 'q_function': q_function} return components else: # create an empty list to record the power spectrum pk_non = [] # generate the targets for i in range(self.ncosmo): # input to CLASS is a dictionary param = cf.mk_dict(st.cosmology, cosmologies[i]) pk = cosmo_module.pk_nonlinear(param).flatten() pk_non.append(pk) # save the power spectrum if save: if st.neutrino: hp.store_arrays(pk_non, 'trainingset/pk_nonlinear', 'pk_nl_neutrino') else: hp.store_arrays(pk_non, 'trainingset/pk_nonlinear', 'pk_nl') return pk_non
if __name__ == "__main__": training_points = trainingset(lhs='maximin_1000_5D') cosmologies = training_points.scale(save=False) outputs = training_points.targets(cosmologies, save=True)