Source code for cosmicfishpie.fishermatrix.cosmicfish

# -*- coding: utf-8 -*-
"""MAIN

This is the main engine of CosmicFish.

"""

import os
import sys
from copy import copy, deepcopy
from time import time

import numpy as np
import pandas as pd
from scipy.integrate import simpson

from cosmicfishpie.analysis import fisher_matrix as fm
from cosmicfishpie.CMBsurvey import CMB_cov
from cosmicfishpie.configs import config as cfg
from cosmicfishpie.LSSsurvey import photo_cov, photo_obs, spectro_cov, spectro_obs
from cosmicfishpie.utilities.utils import filesystem as ufs
from cosmicfishpie.utilities.utils import numerics as unu
from cosmicfishpie.utilities.utils import printing as upt
from cosmicfishpie.version import VERSION


[docs] def ray_session(num_cpus=4, restart=True, shutdown=False): import ray if shutdown: if ray.is_initialized(): ray.shutdown() return None elif restart: if ray.is_initialized(): print("ray is shutting down...") ray.shutdown() if not ray.is_initialized(): ray.init(num_cpus=num_cpus) return None
[docs] class FisherMatrix: # update version at every release cf_version = f"CosmicFish_v{VERSION}" def __init__( self, options=dict(), specifications=dict(), observables=None, freepars=None, extfiles=None, fiducialpars=None, photobiaspars=None, photopars=None, spectrononlinearpars=None, spectrobiaspars=None, IApars=None, IMbiaspars=None, PShotpars=None, surveyName="Euclid", cosmoModel="w0waCDM", latexnames=None, parallel=False, parallel_cpus=4, ): """This is the main class of cosmicfishpie that is used to do a Fisher matrix forecast for current and upcoming experiments in cosmology. If you want to compute the Fisher matrix you can pass all specifications and options here. Parameters ---------- options : dict, optional A dictionary that contains the global options for the calculation of the fishermatrix. A list of all possible keys are found in the documentation of cosmicfishpie.configs.config specifications : dict, optional A dictionary containing the survey specifications. Defaults to the specifications in the `.yaml` of the survey specifications observables : list, optional A list of strings for the different observables freepars : dict, optional A dictionary containing all cosmological parameters to be varied and their corresponding rel. step sizes extfiles : dict, optional A dictionary containing the path to the external files as well as how all the names of the files in the folder correspond to the cosmological quantities, units etc. fiducialpars : dict, optional A dictionary containing the fiducial cosmological parameters photobiaspars : dict, optional A dictionary containing the specifications for the galaxy biases of the photometric probe photopars : dict, optional A dictionary containing specifications for the window function's galaxy distribution spectrononlinearpars : dict, optional A dictionary containing the values of the non linear modeling parameters of the spectroscopic probe spectrobiaspars : dict, optional A dictionary containing the specifications for the galaxy biases of the spectroscopic probe IApars : dict, optional A dictionary containing the specifications for the galaxy biases of the spectroscopic intensity mapping probe surveyName : str, optional String of the name of the survey for which the forecast is done. Defaults to Euclid with optimistic specifications cosmoModel : str, optional A string of the name of the cosmological model used in the calculations. Defaults to flat "w0waCDM" cosmology latexnames : dict, optional A dictionary that contains the Latex names of the cosmological parameters parallel : bool, optional If True will compute the Fisher matrix using ray parallelization. Defaults to False parallel_cpus : int, optional Number of CPUs that should be used when computing the results using ray parallelization. Attributes ---------- fiducialcosmopars : dict A dictionary containing all fiducial values for the cosmological parameters photopars : dict A dictionary containing specifications for the window function's galaxy distribution of the photometric probe IApars : dict A dictionary containing the specifications for the intrinsic alignment effect in cosmic shear of the photometric probe biaspars : dict a dictionary containing the specifications for the galaxy biases of the photometric probe Spectrobiaspars : dict A dictionary containing the specifications for the galaxy biases of the spectroscopic probe Spectrononlinpars : dict A dictionary containing the values of the non linear modeling parameters entering FOG and the dewiggling weight per bin for the spectroscopic probe IMbiaspars : dict A dictionary containing the specifications for the line intensity biases of the spectroscopic probe PShotpars : dict A dictionary containing the values of the additional shot noise per bin dictionary containing the values of the additional shot noise per bin for the spectroscopic probe observables : list A list of strings for the different observables freeparams : dict A dictionary containing all names and the corresponding rel. step size for all parameters allparams_fidus : dict A dictionary that contains all fiducial cosmological and nuisance parameters needed to compute the observable of all probes. """ if options["feedback"] > 0: print("****************************************************************") print(" _____ _ _____ __ ") print(" / ___/__ ___ __ _ (_)___/ __(_)__ / / ") print(" / /__/ _ \\(_-</ ' \\/ / __/ _// (_-</ _ \\ ") print(" \\___/\\___/___/_/_/_/_/\\__/_/ /_/___/_//_/ ") print("") print("****************************************************************") print(" This is the new Python version of the CosmicFish code.") print("****************************************************************") sys.stdout.flush() cfg.init( options=options, specifications=specifications, observables=observables, freepars=freepars, extfiles=extfiles, fiducialpars=fiducialpars, photobiaspars=photobiaspars, photopars=photopars, IApars=IApars, spectrononlinearpars=spectrononlinearpars, spectrobiaspars=spectrobiaspars, IMbiaspars=IMbiaspars, PShotpars=PShotpars, surveyName=surveyName, cosmoModel=cosmoModel, latexnames=latexnames, ) self.settings = deepcopy(cfg.settings) self.specs = deepcopy(cfg.specs) self.fiducialcosmopars = deepcopy(cfg.fiducialparams) self.fiducialparams = self.fiducialcosmopars ## for compatibility self.fiducialcosmo = copy(cfg.fiducialcosmo) self.photopars = deepcopy(cfg.photoparams) self.photobiaspars = deepcopy(cfg.Photobiasparams) self.IApars = deepcopy(cfg.IAparams) self.Spectrobiaspars = deepcopy(cfg.Spectrobiasparams) self.Spectrobiasparams = self.Spectrobiaspars ## for compatibility self.Spectrononlinpars = deepcopy(cfg.Spectrononlinearparams) self.Spectrononlinearparams = self.Spectrononlinpars ## for compatibility self.IMbiaspars = deepcopy(cfg.IMbiasparams) self.IMbiasparams = self.IMbiaspars ## for compatibility self.PShotpars = deepcopy(cfg.PShotparams) self.PShotparams = self.PShotpars ## for compatibility self.observables = deepcopy(cfg.obs) self.obs = self.observables ## for compatibility self.input_type = deepcopy(cfg.input_type) ## for compatibility self.freeparams = deepcopy(cfg.freeparams) self.allparams_fidus = { **self.fiducialcosmopars, **self.photopars, **self.photobiaspars, **self.IApars, **self.Spectrobiaspars, **self.Spectrononlinpars, **self.IMbiaspars, **self.PShotpars, } self.parallel = parallel self.feed_lvl = self.settings["feedback"] self.feed_lvl = cfg.settings["feedback"] allpars = {} allpars.update(self.fiducialcosmopars) allpars.update(self.photobiaspars) allpars.update(self.photopars) allpars.update(self.IApars) allpars.update(self.Spectrobiaspars) allpars.update(self.Spectrononlinpars) allpars.update(self.IMbiaspars) allpars.update(self.PShotpars) self.allparams = allpars # self.parallel=True self.recap_options() if self.parallel: ray_session(num_cpus=parallel_cpus, restart=True, shutdown=False)
[docs] def compute(self, max_z_bins=None): """This function will compute the Fisher information matrix and export using the specified settings. Arguments --------- max_z_bins : int For the spectroscopic probe what the highest redshift bin that should be considered in the computation of the Fisher Returns ------- cosmicfishpie.analysis.fisher_matrix A instance of fisher_matrix containing the calculated Fisher matrix as well as parameter names, settings, etc """ # This switches between the possible Fisher matrices tfishstart = time() if "GCph" in self.observables or "WL" in self.observables: upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="----> Computing photo Fisher matrix", instance=self, ) self.photo_obs_fid = photo_obs.ComputeCls( self.fiducialcosmopars, self.photopars, self.IApars, self.photobiaspars, print_info_specs=True, ) self.photo_LSS = photo_cov.PhotoCov( self.fiducialcosmopars, self.photopars, self.IApars, self.photobiaspars, fiducial_Cls=self.photo_obs_fid, ) noisy_cls, covmat = self.photo_LSS.compute_covmat() self.photo_derivs = self.photo_LSS.compute_derivs() photoFM = self.photo_LSS_fishermatrix_einsum( noisy_cls=noisy_cls, covmat=covmat, derivs=self.photo_derivs ) finalFisher = deepcopy(photoFM) elif "GCsp" in self.observables or "IM" in self.observables: upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="----> Computing Pk-spectro Fisher matrix", instance=self, ) self.set_pk_settings() if "IM" in self.observables and "GCsp" in self.observables: self.obs_spectrum = ["I", "g"] elif "IM" in self.observables: # compute in the case of IM only self.obs_spectrum = ["I", "I"] elif "GCsp" in self.observables: self.obs_spectrum = ["g", "g"] # self.obs_spectrum = ["I", "g"] self.pk_obs_fid = spectro_obs.ComputeGalSpectro( cosmopars=self.fiducialcosmopars, fiducial_cosmopars=self.fiducialcosmopars, spectrobiaspars=self.Spectrobiaspars, spectrononlinearpars=self.Spectrononlinpars, IMbiaspars=self.IMbiaspars, PShotpars=self.PShotpars, # bias_samples=self.obs_spectrum, configuration=self, ) self.pk_cov = spectro_cov.SpectroCov( self.fiducialcosmopars, fiducial_specobs=self.pk_obs_fid, bias_samples=self.obs_spectrum, configuration=self, ) self.zmids = self.pk_cov.inter_z_bin_mids self.zmids = self.pk_cov.inter_z_bin_mids nbins = len(self.zmids) if max_z_bins is not None and isinstance(max_z_bins, int): cutnbins = max_z_bins self.eliminate_zbinned_freepars(cutnbins) self.zmids = self.zmids[0:cutnbins] nbins = len(self.zmids) tini = time() self.derivs_dict = dict() # if self.parallel == False: k_mesh = self.Pk_kmesh mu_mesh = self.Pk_mumesh self.veff_arr = [] tvi = time() self.fish_z_arr = np.array(self.zmids) for zi in self.zmids: if "IM" in self.observables and "GCsp" in self.observables: self.veff_arr.append(self.pk_cov.veff_Ig(zi, k_mesh, mu_mesh)) elif "IM" in self.observables: # compute in the case of IM only self.veff_arr.append(self.pk_cov.veff_II(zi, k_mesh, mu_mesh)) elif "GCsp" in self.observables: self.veff_arr.append(self.pk_cov.veff(zi, k_mesh, mu_mesh)) tvf = time() upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="+++ Volumes computation computed in: ", time_ini=tvi, time_fin=tvf, instance=self, ) self.derivs_dict = self.compute_binned_derivs(self.fish_z_arr) tend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="+++ Derivatives computation done in: ", time_ini=tini, time_fin=tend, instance=self, ) self.allparams = self.pk_deriv.fiducial_allpars pk_Fish = self.pk_LSS_Fisher(nbins=nbins) pk_Fish = self.pk_LSS_Fisher(nbins=nbins) specFM = np.sum(pk_Fish, axis=0) finalFisher = deepcopy(specFM) elif ( "CMB_T" in self.observables or "CMB_E" in self.observables or "CMB_B" in self.observables ): upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="----> Computing CMB Fisher matrix", instance=self, ) CMB = CMB_cov.CMBCov(self.fiducialcosmopars, print_info_specs=True) noisy_cls, covmat = CMB.compute_covmat() derivs = CMB.compute_derivs() CMB_FM = self.CMB_fishermatrix(noisy_cls=noisy_cls, covmat=covmat, derivs=derivs) finalFisher = deepcopy(CMB_FM) # return CMB_FM else: raise AttributeError("Observables list not defined properly") tfishend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="Fisher matrix calculation finished in ", time_ini=tfishstart, time_fin=tfishend, ) fishMat_object = self.export_fisher(finalFisher, totaltime=tfishend - tfishstart) if self.parallel: ray_session(restart=False, shutdown=True) return fishMat_object
[docs] def set_pk_settings(self): """Function to define grids of the internal wavenumber and observation angle Note ---- This is not a setter function. Rather the internal grids are constructed from the passed options and then they are stored in the attributes `Pk_kgrid` and `Pk_mugrid` as well as the combined meshes `Pk_kmesh` and `Pk_mumesh`. """ # kmin and kmax GCsp values are always indicated in h/Mpc k_units_factor = self.fiducialcosmopars["h"] self.kmin_fish = self.specs["kmin_GCsp"] * k_units_factor self.kmax_fish = self.specs["kmax_GCsp"] * k_units_factor self.Pk_ksamp = self.settings["spectro_Pk_k_samples"] self.Pk_musamp = self.settings["spectro_Pk_mu_samples"] self.Pk_kgrid = np.linspace(self.kmin_fish, self.kmax_fish, self.Pk_ksamp) self.Pk_mugrid = np.linspace(0.0, 1.0, self.Pk_musamp) self.Pk_kmesh, self.Pk_mumesh = np.meshgrid(self.Pk_kgrid, self.Pk_mugrid)
[docs] def compute_binned_derivs(self, z_arr): """This computes the derivatives of the observed power spectrum for all redshift bins Parameters ---------- z_arr : list, numpy.ndarray List of the redshift bin centers of the probe Returns ------- dict A dictionary containing lists of derivatives of the observed power spectrum for each redshift bin and varied parameter """ self.pk_deriv = spectro_cov.SpectroDerivs( z_arr, self.Pk_kmesh, self.Pk_mumesh, fiducial_spectro_obj=self.pk_obs_fid, bias_samples=self.obs_spectrum, configuration=self, ) allpars_deriv = self.pk_deriv.compute_derivs(freeparams=self.freeparams) return allpars_deriv
[docs] def pk_LSS_Fisher(self, nbins): """This computes the Fisher matrix of a spectroscopic probe for all redshift bins Parameters ---------- nbins : int number of redshift bins the spectroscopic probe has Returns ------- numpy.ndarray A list of Fisher matrices for each redshift bin """ self.parslist = list(self.freeparams.keys()) fisherMatrix = np.zeros((nbins, len(self.parslist), len(self.parslist))) upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="Computing Pk Fisher matrix, shape: " + str(fisherMatrix.shape), instance=self, ) upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="k_min = {:.3f}Mpc^-1, k_max = {:.3f}Mpc^-1,".format( self.kmin_fish, self.kmax_fish ), instance=self, ) upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="Fisher Matrix shape: {}".format(fisherMatrix.shape), instance=self, ) fisherMatrix = np.array([self.fisher_per_bin(ibin) for ibin in range(nbins)]) return fisherMatrix
[docs] def fisher_per_bin(self, ibin): """This helper function contains the Fisher matrix of a spectroscopic probe for a single redshift bin Arguments --------- ibin : int index of the redshift bin Returns ------- numpy.ndarray Fisher matrix of a spectroscopic probe for a single redshift bin""" fish_bi = np.zeros((len(self.parslist), len(self.parslist))) upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="Fisher Spectro bin number: {:d}".format(ibin + 1), instance=self, ) for i, pi in enumerate(self.parslist): fish_bi[i, i] = self.fisher_calculation(ibin, pi, pi) for j in range(i): pj = self.parslist[j] fish_bi[i, j] = self.fisher_calculation(ibin, pi, pj) fish_bi[j, i] = fish_bi[i, j] return fish_bi
[docs] def fisher_calculation(self, zi, p_i, p_j): """This helper function calculates a singular element of the fisher matrix Parameters ---------- zi : int index of the redshift bin p_i : str name of the first parameter p_j : str name of the second parameter Returns ------- float The Fisher matrix element computed from the derivative with respect to two parameters and a given redshift bin for a spectroscopic probe """ k_mesh = self.Pk_kmesh mu_mesh = self.Pk_mumesh integrand = self.fish_integrand(zi, k_mesh, mu_mesh, p_i, p_j) fish_element = self.fisher_integral(integrand) return fish_element
[docs] def fisher_integral(self, integrand): """Function to calculate the integral that enters the computation of the Fisher elements. Uses a simpson integration on the mu and k grid Arguments --------- integrand : numpy.ndarray Integrant that enters the Fisher matrix element computation. The shape needs to match ( shape of the kmesh ) x ( shape of mumesh) Returns ------- float The Fisher matrix element computed from the derivative with respect to two parameters at a given redshift bin for a spectroscopic probe """ k_mesh = self.Pk_kmesh mu_mesh = self.Pk_mumesh mu_integral = simpson(integrand, x=mu_mesh[:, 0], axis=0) kmu_integral = simpson(mu_integral, x=k_mesh[0, :]) return kmu_integral
[docs] def fish_integrand(self, zi, k, mu, pi, pj): """Helper function to calculate the integrand that enters the computation of the Fisher elements. Arguments --------- zi : int index of the redshift bin k : numpy.ndarray grid of wavenumbers used in the calculation. Has to be the internal kmesh mu : numpy.ndarray grid of observation angles used in the calculation. Has to be the internal mumesh pi : str name of the first parameter pj : str name of the second parameter Returns ------- numpy.ndarray Integrant that enters the Fisher matrix element computation. The shape matches ( shape of the kmesh ) x ( shape of mumesh) """ volsurv = self.pk_cov.volume_survey(zi) # pref = 2/(8*(np.pi**2)) pref = 2 # prefactor due to \mu symmetry # zmid = self.pk_cov.global_z_bin_mids[zi] dPdpi = self.derivs_dict[pi][zi] dPdpj = self.derivs_dict[pj][zi] intg = k**2 * pref * volsurv * self.veff_arr[zi] * dPdpi * dPdpj return intg
[docs] def photo_LSS_fishermatrix(self, noisy_cls=None, covmat=None, derivs=None, lss_obj=None): """Compute the Fisher matrix of a photometric probe. Arguments --------- noisy_cls : dict, optional a dictionary with all the auto and cross correlation fiducial angular power spectra with noise added to it. Will recompute from lss_obj when not passed covmat : list, optional A list of pandas.DataFrame objects that store the covariance matrix for each multipole. Will recompute from lss_obj when not passed derivs : dict, optional A dictionary containing the derivatives of the angular power spectrum at the fiducial for all free parameters. Will recompute from lss_obj when not passed lss_obj : cosmicfishpie.LSSsurvey.photo_cov.PhotoCov, optional This object is used to compute the ingredients of the Fisher matrix if they were not passed Returns ------- numpy.ndarray The full fisher matrix for the photometric probe """ # this numbers reduce to old z_bins_ph in config if specs not found self.z_bins_WL = self.specs["z_bins_WL"] self.num_z_bins_WL = len(self.specs["z_bins_WL"]) - 1 self.binrange_WL = self.specs["binrange_WL"] self.z_bins_GCph = self.specs["z_bins_GCph"] self.num_z_bins_GCph = len(self.specs["z_bins_GCph"]) - 1 self.binrange_GCph = self.specs["binrange_GCph"] if covmat is None and lss_obj is not None: noisy_cls, covmat = lss_obj.compute_covmat() if derivs is None and lss_obj is not None: derivs = lss_obj.compute_derivs() tini = time() upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Computing Fisher matrix", instance=self ) # compute fisher matrix lvec = noisy_cls["ells"] lvec_ave = unu.moving_average(lvec, 2) # computing center of bins delta_ell = np.diff(lvec) # compute delta_ell between bin edges FisherV = np.zeros((len(lvec_ave), len(self.freeparams), len(self.freeparams))) cols = [] for o in self.observables: if o == "GCph": for ind in range(self.num_z_bins_GCph): cols.append(o + " " + str(ind + 1)) elif o == "WL": for ind in range(self.num_z_bins_WL): cols.append(o + " " + str(ind + 1)) covarr = np.zeros(((len(lvec_ave)), len(cols), len(cols))) der1 = np.zeros((len(cols), len(cols))) der2 = np.zeros((len(cols), len(cols))) for i_ell in range(len(lvec_ave)): covarr[i_ell, :, :] = covmat[i_ell] covdf = covarr[i_ell, :, :] invMat = np.linalg.pinv(covdf) if i_ell == 1: tparstart = time() for ind1, par1 in enumerate(self.freeparams): for ind2, par2 in enumerate(self.freeparams): if ind1 > ind2: FisherV[i_ell, ind1, ind2] = FisherV[i_ell, ind2, ind1] continue else: for ia, aa in enumerate(cols): for ib, bb in enumerate(cols): der1[ia, ib] = derivs[par1][aa + "x" + bb][i_ell] der2[ia, ib] = derivs[par2][aa + "x" + bb][i_ell] mat1 = der1.dot(invMat) mat2 = invMat.dot(mat1) mat3 = der2.dot(mat2) trace = np.trace(mat3) FisherV[i_ell, ind1, ind2] = ( trace * (lvec_ave[i_ell] + 0.5) * delta_ell[i_ell] ) if i_ell == 1: tparend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="FisherV entry ({}, {}) for ell index {} done in ".format( par1, par2, i_ell ), time_ini=tparstart, time_fin=tparend, ) FisherVV = np.sum(FisherV, axis=0) tfin = time() upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Finished calculation of Fisher Matrix for {} in: ".format(self.observables), time_ini=tini, time_fin=tfin, ) return FisherVV
[docs] def photo_LSS_fishermatrix_einsum(self, noisy_cls=None, covmat=None, derivs=None, lss_obj=None): """Compute the Fisher matrix of a photometric probe. Arguments --------- noisy_cls : dict, optional a dictionary with all the auto and cross correlation fiducial angular power spectra with noise added to it. Will recompute from lss_obj when not passed covmat : list, optional A list of pandas.DataFrame objects that store the covariance matrix for each multipole. Will recompute from lss_obj when not passed derivs : dict, optional A dictionary containing the derivatives of the angular power spectrum at the fiducial for all free parameters. Will recompute from lss_obj when not passed lss_obj : cosmicfishpie.LSSsurvey.photo_cov.PhotoCov, optional This object is used to compute the ingredients of the Fisher matrix if they were not passed Returns ------- numpy.ndarray The full fisher matrix for the photometric probe """ # Get bin information for both WL and GCph self.z_bins_WL = self.specs["z_bins_WL"] self.num_z_bins_WL = len(self.specs["z_bins_WL"]) - 1 self.binrange_WL = self.specs["binrange_WL"] self.z_bins_GCph = self.specs["z_bins_GCph"] self.num_z_bins_GCph = len(self.specs["z_bins_GCph"]) - 1 self.binrange_GCph = self.specs["binrange_GCph"] if covmat is None and lss_obj is not None: noisy_cls, covmat = lss_obj.compute_covmat() if derivs is None and lss_obj is not None: derivs = lss_obj.compute_derivs() tini = time() upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Computing Fisher matrix", instance=self ) # compute fisher matrix lvec = noisy_cls["ells"] lvec_ave = unu.moving_average(lvec, 2) # computing center of bins delta_ell = np.diff(lvec) # compute delta_ell between bin edges # Create columns list based on observables and their respective bin numbers cols = [] for o in self.observables: if o == "GCph": cols.extend([f"{o} {ind + 1}" for ind in range(self.num_z_bins_GCph)]) elif o == "WL": cols.extend([f"{o} {ind + 1}" for ind in range(self.num_z_bins_WL)]) # Precompute covariance matrices and their inverses covarr = np.array(covmat) inv_covarr = np.linalg.pinv(covarr) # Precompute derivatives der_array = np.array( [[derivs[par][f"{a}x{b}"] for a in cols for b in cols] for par in self.freeparams] ) der_array = der_array.reshape(len(self.freeparams), len(cols), len(cols), -1) # Precompute the factor for FisherV factor = (lvec_ave + 0.5) * delta_ell # Compute FisherV using vectorized operations FisherV = np.zeros((len(lvec_ave), len(self.freeparams), len(self.freeparams))) for l_ind, ell in enumerate(lvec_ave): der = der_array[:, :, :, l_ind] # Shape: (n_freeparams, n_cols, n_cols) # mat1: multiply der by inv_covarr # 'p' is freeparam index, 'i' and 'j' are column indices mat1 = np.einsum("pij,jk->pik", der, inv_covarr[l_ind]) # mat2: multiply inv_covarr by mat1 # 'p' is still freeparam index mat2 = np.einsum("ij,pjk->pik", inv_covarr[l_ind], mat1) # mat3: final matrix multiplication # 'p' and 'q' are indices for the two freeparam dimensions mat3 = np.einsum("pij,qji->pq", der, mat2) FisherV[l_ind] = mat3 * factor[l_ind] if l_ind == 1: upt.time_print( feedback_level=self.feed_lvl, min_level=3, text=f"FisherV entries for ell={ell} at index {l_ind} done", time_ini=tini, time_fin=time(), ) # Sum up FisherV along the ell dimension FisherVV = np.sum(FisherV, axis=0) tfin = time() upt.time_print( feedback_level=self.feed_lvl, min_level=0, text=f"Finished calculation of Fisher Matrix for {self.observables} in: ", time_ini=tini, time_fin=tfin, ) return FisherVV
[docs] def CMB_fishermatrix(self, noisy_cls=None, covmat=None, derivs=None, cmb_obj=None): if covmat is None and cmb_obj is not None: noisy_cls, covmat = cmb_obj.compute_covmat() if derivs is None and cmb_obj is not None: derivs = cmb_obj.compute_derivs() upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Computing Fisher matrix", instance=self ) # compute fisher matrix lvec = noisy_cls["ells"] freeparams = list(self.freeparams) observables = list(self.observables) npar = len(freeparams) nobs = len(observables) nell = len(lvec) tfishstart = time() # Build derivative tensor D[e, p, i, j], with: # e: ell index, p: parameter index, i/j: observable indices der = np.zeros((nell, npar, nobs, nobs), dtype=float) for p_ind, par in enumerate(freeparams): for i_ind, obs1 in enumerate(observables): for j_ind, obs2 in enumerate(observables): key = obs1 + "x" + obs2 der[:, p_ind, i_ind, j_ind] = np.asarray(derivs[par][key], dtype=float) # Covariance tensor C[e, i, j] and inverse per ell. covarr = np.zeros((nell, nobs, nobs), dtype=float) for e_ind in range(nell): cov_e = covmat[e_ind] if hasattr(cov_e, "to_numpy"): covarr[e_ind] = cov_e.to_numpy(dtype=float) else: covarr[e_ind] = np.asarray(cov_e, dtype=float) inv_covarr = np.linalg.pinv(covarr) # Per-ell Fisher contribution: # F_e[p,q] = Tr(D_q C^-1 D_p C^-1) # = D_q[i,j] C^-1[j,k] D_p[k,l] C^-1[l,i] fisher_per_ell = np.einsum("eqij,ejk,epkl,eli->epq", der, inv_covarr, der, inv_covarr) ell_weight = np.asarray(lvec, dtype=float) + 0.5 Fisher = np.einsum("e,epq->pq", ell_weight, fisher_per_ell) Fisher = 0.5 * (Fisher + Fisher.T) tfishend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Fisher matrix computed", instance=self ) upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="Fisher matrix calculation finished in ", time_ini=tfishstart, time_fin=tfishend, ) upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Finished calculation of Fisher Matrix for {}".format(self.observables), ) return Fisher
[docs] def eliminate_zbinned_freepars(self, nmax): """This helper function is there to find any parameters passed that correspond to no zbin. It will then remove them from the varied parameters list Arguments --------- nmax : int Index of the highest redshift bin that should be considered """ freepars = deepcopy(self.freeparams) for key in freepars.keys(): if "_" in key: # convention for z-binned pars: par_ibin keysplit = key.split("_") try: ibin = int(keysplit[1]) except ValueError: continue if isinstance(ibin, int) and ibin > nmax: removedpar = self.freeparams.pop(key) upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="Removed free param: {:s} = {:f}".format(key, removedpar), instance=self, ) else: continue
[docs] def export_fisher(self, fishmat, totaltime=None): """Will print the fisher matrix as well as specifications and the parameter names to files. To change location and filenames for this check the settings dictionary of cosmicfishpie.configs.config Arguments --------- fishmat : numpy.ndarray Computed Fisher matrix totaltime : float, optional Total time needed to compute the fisher. Will print time information if passed Returns ------- cosmicfishpie.analysis.fisher_matrix A instance of fisher_matrix containing the calculated Fisher matrix as well as parameter names, settings, etc """ # If an output root is provided, we write the Fisher matrix on file if self.settings["outroot"] != "": obstring = "".join(self.observables) cols = [key for key in self.freeparams] header = "#" + " ".join(["", *cols]) FM = pd.DataFrame(fishmat, columns=cols, index=cols) if not os.path.exists(self.settings["results_dir"]): os.makedirs(self.settings["results_dir"]) # Root naming (without extension): <results>/<cf_version>_<outroot>_<obstring>_FM root = ( self.settings["results_dir"] + "/" + self.cf_version + "_" + self.settings["outroot"] + "_" + obstring + "_FM" ) extension = self.settings["fishermatrix_file_extension"] # Optional CSV export (disabled by default) if self.settings.get("export_csv_fisher", False): FM.to_csv(root + ".csv") # Primary TXT matrix with open(root + extension, "w") as f: f.write(header + "\n") f.write(FM.to_csv(header=False, index=False, sep="\t")) upt.time_print( feedback_level=self.feed_lvl, min_level=0, text="Fisher matrix exported: {:s}".format(root + extension), ) with open(root + ".paramnames", "w") as f: f.write("#\n") f.write("#\n") f.write("# This file contains the parameter names for a derived Fisher matrix.\n") f.write("#\n") for par in self.freeparams.keys(): f.write( str(par) + " " + str(cfg.latex_names.get(par, str(par))) + " " + "{:.6f}".format(self.allparams[par]) ) f.write("\n") fishMat_obj = fm.fisher_matrix(file_name=root + ".txt") # Write specifications to file (legacy .dat, and JSON below): specs_name = root + "_specs.dat" with open(specs_name, "w") as f: f.write("**Git version commit: %s \n" % (ufs.git_version())) if totaltime is not None: f.write( "**Time needed for computation of Fisher matrix: {:.3f} seconds \n".format( totaltime ) ) f.write("**Observables: %s \n" % (obstring)) f.write("### CosmicFish settings ###\n") for key, value in sorted(self.settings.items()): f.write("%s:%s\n" % (key, value)) f.write("### Fiducial parameters ###\n") for key, value in sorted(self.fiducialcosmopars.items()): f.write("%s:%s\n" % (key, value)) f.write("### Free parameters ###\n") for key, value in sorted(self.freeparams.items()): f.write("%s:%s\n" % (key, value)) f.write("### Survey specifications ###\n") for key, value in sorted(self.specs.items()): f.write("%s:%s\n" % (key, value)) print() upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="CosmicFish settings and Survey specifications exported: {:s}".format( specs_name ), instance=self, ) # Also export a structured JSON snapshot for reproducibility/comparison try: import json from datetime import datetime, timezone def _jsonify(obj): import numpy as _np if isinstance(obj, dict): return {k: _jsonify(v) for k, v in obj.items()} if isinstance(obj, (list, tuple)): return [_jsonify(v) for v in obj] if isinstance(obj, range): return list(obj) if isinstance(obj, _np.ndarray): return obj.tolist() if isinstance(obj, (_np.floating, _np.integer)): return obj.item() return obj json_name = root + "_specs.json" snapshot = { "metadata": { "cf_version": self.cf_version, "git_commit": ufs.git_version(), "timestamp": datetime.now(timezone.utc) .replace(microsecond=0) .isoformat() .replace("+00:00", "Z"), "observables": list(self.observables), "totaltime_sec": float(totaltime) if totaltime is not None else None, "matrix_files": { "txt": root + ".txt", "paramnames": root + ".paramnames", "dat_specs": specs_name, "json_specs": json_name, }, "env_flags": { "COSMICFISH_FAST_EFF": os.environ.get("COSMICFISH_FAST_EFF"), "COSMICFISH_FAST_P": os.environ.get("COSMICFISH_FAST_P"), "COSMICFISH_FAST_KERNEL": os.environ.get("COSMICFISH_FAST_KERNEL"), }, }, # Exact constructor-compatible payload to replay the run "options": _jsonify(self.settings), "specifications": _jsonify(self.specs), "fiducialpars": _jsonify(self.fiducialcosmopars), "freepars": _jsonify(self.freeparams), # Convenience extras "allparams": _jsonify(self.allparams), } with open(json_name, "w") as jf: json.dump(snapshot, jf, indent=2) upt.time_print( feedback_level=self.feed_lvl, min_level=1, text="CosmicFish JSON specifications exported: {:s}".format(json_name), instance=self, ) except Exception as _e: # pragma: no cover (best-effort JSON export) upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="Warning: failed to export JSON specifications", instance=self, ) return fishMat_obj
[docs] def recap_options(self): """This will print all the selected options into the standard output""" if self.settings["feedback"] > 1: print("") print("----------RECAP OF SELECTED OPTIONS--------") print("") print("Settings:") for key in self.settings: print(" " + key + ": {}".format(self.settings[key])) print("") print("Specifications:") for key in self.specs: print(" " + key + ": {}".format(self.specs[key])) if self.feed_lvl > 1: print("Cosmological parameters:") for key in self.fiducialcosmopars: print(" " + key + ": {}".format(self.fiducialcosmopars[key])) print("Photometric parameters:") for key in self.photopars: print(" " + key + ": {}".format(self.photopars[key])) print("IA parameters:") for key in self.IApars: print(" " + key + ": {}".format(self.IApars[key])) print("Bias parameters:") for key in self.photobiaspars: print(" " + key + ": {}".format(self.photobiaspars[key])) print("SpectroBias parameters:") for key in self.Spectrobiaspars: print(" " + key + ": {}".format(self.Spectrobiaspars[key])) print("SpectroNonlinear parameters:") for key in self.Spectrononlinpars: print(" " + key + ": {}".format(self.Spectrononlinpars[key])) print("IMBias parameters:") for key in self.IMbiaspars: print(" " + key + ": {}".format(self.IMbiaspars[key])) print("PShot parameters:") for key in self.PShotpars: print(" " + key + ": {}".format(self.PShotpars[key])) print("Free parameters:") for par in self.freeparams: print(" " + par)