Source code for cosmicfishpie.LSSsurvey.photo_obs

# -*- coding: utf-8 -*-
"""CLS

This module contains cls calculations (only LSS atm).

"""
import os
from itertools import product
from time import time

import numpy as np
from joblib import Memory
from scipy import integrate
from scipy.interpolate import interp1d

import cosmicfishpie.configs.config as cfg
import cosmicfishpie.cosmology.cosmology as cosmology
import cosmicfishpie.cosmology.nuisance as nuisance
import cosmicfishpie.LSSsurvey.photo_window as photo_window
from cosmicfishpie.utilities.utils import printing as upt

cachedir = "./cache"
memory = Memory(cachedir, verbose=0)


# Experimental optimization toggles (default: enabled). Users can disable via env.
def _env_flag(name: str, default: bool = True) -> bool:
    val = os.environ.get(name)
    if val is None:
        return default
    return val not in ("0", "false", "False")


_USE_FAST_EFF = _env_flag("COSMICFISH_FAST_EFF", True)
_USE_FAST_P = _env_flag("COSMICFISH_FAST_P", True)
_USE_FAST_KERNEL = _env_flag("COSMICFISH_FAST_KERNEL", True)


# @memory.cache


[docs] def memo_integral_efficiency(i, ngal_func, comoving_func, z, zint_mat, diffz): """Legacy O(N^2) implementation of the efficiency integral. Notes ----- - Deprecated: prefer `faster_integral_efficiency` (vectorized) or `much_faster_integral_efficiency` (O(N)). - This version performs a per-row trapezoidal integration on the original redshift grid to ensure numerically correct spacing. The `zint_mat` and `diffz` arguments are ignored for correctness (they were prone to inconsistent spacing per row). Parameters ---------- i : int Bin index. ngal_func : callable Function returning n(z') given (z', i, kind). comoving_func : callable Function returning r(z). z : numpy.ndarray 1D redshift grid. zint_mat : numpy.ndarray Ignored (kept for backward compatibility). diffz : numpy.ndarray or float Ignored (kept for backward compatibility). Returns ------- callable Interpolating function over z for the efficiency. """ chi = comoving_func(z) integral = np.empty_like(z) # For each z_k integrate from z_k to z_max on the native grid for k in range(z.size): z_row = z[k:] # integrand: n(z') * (1 - r(z_k)/r(z')) y_row = ngal_func(z_row, i, "WL") * (1.0 - chi[k] / comoving_func(z_row)) # integrate with correct x per row integral[k] = integrate.trapezoid(y_row, x=z_row) return interp1d(z, integral, kind="cubic")
[docs] def faster_integral_efficiency(i, ngal_func, comoving_func, zarr): """function to do the integration over redshift that shows up in the lensing kernel Parameters ---------- i : int index of the redshift bin for which the lensing kernel should be computed ngal_func : callable callable function that returns the number density distribution of galaxies. Should be a function of the redshift bin index i as an int and the redshift z as a numpy.ndarray comoving_func : callable callable function that returns the comoving distance. Should beb a function of the redshift z as a numpy.ndarray zarr : numpy.ndarray 1d array of all redshifts z that should be used as integration points. Returns ------- callable callable function that receives a numpy.ndarray of requested redshifts and returns the lensing efficiency for the i-th bin as a numpy.ndarray """ zprime = zarr[:, None] wintgd = ngal_func(zprime, i, "WL") * (1.0 - comoving_func(zarr) / comoving_func(zprime)) witri = np.tril(wintgd) wint = integrate.trapezoid(witri, zarr, axis=0) intp = interp1d(zarr, wint, kind="cubic") return intp
[docs] def much_faster_integral_efficiency(i, ngal_func, comoving_func, zarr): """O(N) algorithm for the lensing efficiency integral using backward cumulative trapezoid integrals. Parameters ---------- i : int index of the redshift bin for which the lensing kernel should be computed ngal_func : callable callable function that returns the number density distribution of galaxies. Should be a function of the redshift bin index i as an int and the redshift z as a numpy.ndarray comoving_func : callable callable function that returns the comoving distance. Should be a function of the redshift z as a numpy.ndarray zarr : numpy.ndarray 1d array of all redshifts z that should be used as integration points. Returns ------- callable callable function that receives a numpy.ndarray of requested redshifts and returns the lensing efficiency for the i-th bin as a numpy.ndarray """ # Grid and step z = zarr if z.size < 2: return interp1d(z, np.zeros_like(z), kind="cubic", fill_value="extrapolate") dz = np.mean(np.diff(z)) # Required functions on grid chi = comoving_func(z) ngal = ngal_func(z, i, "WL") ngal_over_chi = ngal / chi # Backward trapezoid cumulative integral: I[k] = ∫_{z_k}^{z_max} f(z') dz' def backward_trapz(y, dx): if y.size < 2: return np.zeros_like(y) yr = y[::-1] seg = 0.5 * (yr[:-1] + yr[1:]) * dx cumsum = np.concatenate([[0.0], np.cumsum(seg)]) return cumsum[::-1] I1 = backward_trapz(ngal, dz) I2 = backward_trapz(ngal_over_chi, dz) eff = I1 - chi * I2 return interp1d(z, eff, kind="cubic")
[docs] class ComputeCls: def __init__( self, cosmopars, photopars, IApars, biaspars, print_info_specs=False, fiducial_cosmo=None ): """Main class to obtain the angular power spectrum of the photometric probe. Parameters ---------- cosmopars : dict a dictionary containing the cosmological parameters photopars : dict a dictionary containing specifications for the window function's galaxy distribution IApars : dict a dictionary containing the specifications for the intrinsic alignment effect in cosmic shear biaspars : dict a dictionary containing the specifications for the galaxy biases print_info_specs : bool If True will print the numerical specifications of the computation. Defaults to False fiducial_cosmo : cosmicfishpie.cosmology.cosmology.cosmo_functions, optional An instance of `cosmo_functions` of the fiducial cosmology. Attributes ---------- cosmopars : dict a dictionary containing the cosmological parameters photopars : dict a dictionary containing specifications for the window function's galaxy distribution IApars : dict a dictionary containing the specifications for the intrinsic alignment effect in cosmic shear biaspars : dict a dictionary containing the specifications for the galaxy biases cosmo : cosmicfishpie.cosmology.cosmo_functions An instance of `cosmo_functions`. Will either contain the fiducial cosmology if passed or compute from the cosmopars dict observables : list a list of the observables that the angular power spectrum is computed for nuisance : cosmicfishpie.cosmology.nuisance.Nuisance An instance of `Nuisance` that contains the relevant modeling of nuisance parameters window : cosmicfishpie.LSSsurvey.photo_window.GalaxyPhotoDist An instance of `GalaxyPhotoDist` containing the galaxy distribution needed to compute the window functions tracer : str What Power spectrum should be used when calculating the angular power spectrum of galaxy clustering. Either "matter" or "clustering" binrange : range a range with all the bins asked for in the survey specifications zsamp : int how many individual values of the redshifts are used in the internal calculations ellsamp : int how many individual values of the multipoles are used in the internal calculations ell : numpy.ndarray array containing the multipoles for which the angular power spectrum is computed. z_min : float minimum redshift of the probes z_max : float maximum redshift of the probes z : numpy.ndarray array containing the redshifts used in the internal calculations dz : numpy.ndarray array containing the numerical distance of the redshifts in z """ self.feed_lvl = cfg.settings["feedback"] upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="-> Started Cls calculation", instance=self, ) tcosmo1 = time() self.cosmopars = cosmopars if fiducial_cosmo is None: self.cosmo = cosmology.cosmo_functions(cosmopars, cfg.input_type) else: self.cosmo = fiducial_cosmo tcosmo2 = time() upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="---> Cosmological functions obtained in ", instance=self, time_ini=tcosmo1, time_fin=tcosmo2, ) self.observables = [] self.binrange = {} for key in cfg.obs: if key in ["GCph", "WL"]: self.observables.append(key) if key == "GCph": self.binrange[key] = cfg.specs["binrange_GCph"] elif key == "WL": self.binrange[key] = cfg.specs["binrange_WL"] self.binrange_WL = cfg.specs["binrange_WL"] self.binrange_GCph = cfg.specs["binrange_GCph"] tnuis1 = time() self.biaspars = biaspars self.IApars = IApars self.nuisance = nuisance.Nuisance() # if 'GCph' in self.observables: self.bfunction = self.nuisance.bias(self.biaspars) if "WL" in self.observables: self.IAvalue = self.nuisance.IA(self.IApars, self.cosmo) tnuis2 = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="---> Nuisance functions obtained in ", instance=self, time_ini=tnuis1, time_fin=tnuis2, ) tngal1 = time() self.photopars = photopars self.window = photo_window.GalaxyPhotoDist(self.photopars) tngal2 = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="---> Galaxy Photometric distributions obtained in ", instance=self, time_ini=tngal1, time_fin=tngal2, ) if "GCph" in self.observables and "WL" in self.observables: cfg.specs["ellmax"] = max(cfg.specs["lmax_GCph"], cfg.specs["lmax_WL"]) cfg.specs["ellmin"] = min(cfg.specs["lmin_GCph"], cfg.specs["lmin_WL"]) elif "GCph" in self.observables: cfg.specs["ellmax"] = cfg.specs["lmax_GCph"] cfg.specs["ellmin"] = cfg.specs["lmin_GCph"] elif "WL" in self.observables: cfg.specs["ellmax"] = cfg.specs["lmax_WL"] cfg.specs["ellmin"] = cfg.specs["lmin_WL"] else: raise ValueError("Observables not specified correctly") self.tracer = cfg.settings["GCph_Tracer"] # self.binrange = cfg.specs["binrange"] self.zsamp = int(round(200 * cfg.settings["accuracy"])) if cfg.settings["ell_sampling"] == "accuracy": self.ellsamp = int(round(100 * cfg.settings["accuracy"])) else: if isinstance(cfg.settings["ell_sampling"], int): self.ellsamp = cfg.settings["ell_sampling"] else: raise ValueError("ell_sampling should be an integer or 'accuracy'") self.ell = np.logspace( np.log10(cfg.specs["ellmin"]), np.log10(cfg.specs["ellmax"] + 10), num=self.ellsamp ) self.z_min = np.min([cfg.specs["z_bins_GCph"][0], cfg.specs["z_bins_WL"][0]]) self.z_max = np.max([cfg.specs["z_bins_GCph"][-1], cfg.specs["z_bins_WL"][-1]]) self.z = np.linspace(self.z_min, self.z_max, self.zsamp) self.dz = np.mean(np.diff(self.z)) # Precompute comoving distance (major hotspot) once; reuse everywhere self._chi_z = self.cosmo.comoving(self.z) # Lazy caches for other frequently used cosmological functions (filled on demand) self._hubble_z = None self._IA_z = None if print_info_specs: self.print_numerical_specs()
[docs] def compute_all(self): """Main function to compute the angular power spectrum. Will first compute the limber approximated angular power spectrum and the window functions for both probes. From that it will obtain the angular power spectrum. Note ----- This function does not return the calculated quantities. Rather they are found in new attributes of the object. The result of the Limber approximated power spectra are found in the new attributes `Pell` and `sqrtPell` The window functions are found in the new attributes `GC` if galaxy clustering is asked for, and `WL` if cosmic sheer is asked for The angular power spectrum is found in the new attribute `result` """ tini = time() upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="-> Computing power spectra and kernels ", instance=self, ) tplim1 = time() self.P_limber() self.sqrtP_limber() tplim2 = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="---> Computed P_limber in ", time_ini=tplim1, time_fin=tplim2, instance=self, ) tkern1 = time() self.compute_kernels() tkern2 = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="---> Computed Kernels in: ", time_ini=tkern1, time_fin=tkern2, instance=self, ) tcls1 = time() self.result = self.computecls_vectorized() tcls2 = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="---> Computed Cls in: ", time_ini=tcls1, time_fin=tcls2, instance=self, ) tend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="--> Total Cls computation performed in : ", time_ini=tini, time_fin=tend, instance=self, )
[docs] def print_numerical_specs(self): """prints the numerical specifications of the internal computations""" if self.feed_lvl >= 2: print("***") print("Numerical specifications: ") print("WL ell max = ", str(cfg.specs["lmax_WL"])) print("GCph ell max = ", str(cfg.specs["lmax_GCph"])) print("ell min = ", str(cfg.specs["ellmin"])) print("ell max = ", str(cfg.specs["ellmax"])) print("ell sampling: ", str(self.ellsamp)) print("z sampling: ", str(self.zsamp)) print("z_min : ", str(self.z_min)) print("z_max : ", str(self.z_max)) print("z_max : ", str(self.z_max)) print("delta_z : ", str(self.dz)) print("***")
[docs] def P_limber(self): """ Calculates the Limber-approximated power spectrum. This is done for a range of redshift values and multipoles. Will add MG effects to WL and(!) GCph if asked for. Note ----- This does not return the power spectra. The results are stored in the attribute `Pell` """ self.Pell = np.zeros((self.zsamp, self.ellsamp)) if _USE_FAST_P: chi = self._chi_z # reuse precomputed ell_grid = self.ell[None, :] + 0.5 kappa = ell_grid / chi[:, None] mask = kappa <= cfg.specs["kmax"] if np.any(mask): for iz, chi_val in enumerate(chi): row_mask = mask[iz] if not np.any(row_mask): continue kappas_row = kappa[iz, row_mask] zval = self.z[iz] self.Pell[iz, row_mask] = ( (self.cosmo.SigmaMG(zval, kappas_row) ** 2) * self.cosmo.matpow( zval, kappas_row, nonlinear=cfg.settings["nonlinear_photo"] ) / (chi_val**2.0) ) else: chi = self.cosmo.comoving(self.z) for ell, zin in product(range(self.ellsamp), range(self.zsamp)): kappa = (self.ell[ell] + 0.5) / chi[zin] if kappa <= cfg.specs["kmax"]: self.Pell[zin, ell] = ( (self.cosmo.SigmaMG(self.z[zin], kappa) ** 2) * self.cosmo.matpow( self.z[zin], kappa, nonlinear=cfg.settings["nonlinear_photo"] ) / (chi[zin] ** 2.0) ) else: self.Pell[zin, ell] = 0.0 return None
[docs] def sqrtP_limber(self): """ Calculates the square root of the Limber-approximated power spectrum for weak lensing (WL) and galaxy clustering photometric (GCph) observables. This is done for a range of redshift values and multipoles. Depending on the tracer asked for in 'GCph_Tracer' will use 'matter' or 'clustering' observable GCph. Will add MG effects to WL if asked for. Note ----- This does not return the power spectra. The results are stored in the attribute `sqrtPell` """ # Sakr Fix June 2023 # self.sqrtPell = {'WL' :np.zeros((self.zsamp,self.ellsamp)), # 'WL_IA' :np.zeros((self.zsamp,self.ellsamp)), # 'GCph':np.zeros((self.zsamp,self.ellsamp)), # 'GCph_IA':np.zeros((self.zsamp,self.ellsamp))} self.sqrtPell = { "WL": np.zeros((self.zsamp, self.ellsamp)), "WL_IA": np.zeros((self.zsamp, self.ellsamp)), "GCph": np.zeros((self.zsamp, self.ellsamp)), "GCph_IA": np.zeros((self.zsamp, self.ellsamp)), } chi = self._chi_z # reuse cached kappa = (self.ell[:, None] + 1 / 2) / chi index_pknn = np.array(np.where(kappa < cfg.specs["kmax"])).T for ell, zin in index_pknn: self.sqrtPell["WL"][zin, ell] = ( self.cosmo.SigmaMG(self.z[zin], kappa[ell, zin]) * np.sqrt( self.cosmo.matpow( self.z[zin], kappa[ell, zin], nonlinear=cfg.settings["nonlinear"] ) ) ) / chi[zin] self.sqrtPell["WL_IA"][zin, ell] = ( np.sqrt( self.cosmo.matpow( self.z[zin], kappa[ell, zin], nonlinear=cfg.settings["nonlinear"] ) ) / chi[zin] ) self.sqrtPell["GCph"][zin, ell] = ( np.sqrt( self.cosmo.matpow( self.z[zin], kappa[ell, zin], nonlinear=cfg.settings["nonlinear"], tracer=self.tracer, ) ) / chi[zin] ) return None
[docs] def galaxy_kernel(self, z, i): """Calculates the GCph kernel function Parameters ---------- z : numpy.ndarray array containing the redshifts for which the kernel should be computed for i : int index of the redshift bin Returns ------- numpy.ndarray Value of the galaxy clustering kernel at redshift z for bin i Note ----- Implements the following equation: .. math:: W_i^{GCph} = b(z) \\frac{n_i(z)}{\\bar{n}(z)} H(z) """ tgcstart = time() # Wgc = self.window.norm_ngal_photoz(z,i) * np.array([self.nuisance.bias(self.biaspars, i)(z) * \ # Wgc = self.window.norm_ngal_photoz(z,i) * # np.array([self.biaspars['b{:d}'.format(i)] * \ Wgc = self.window.norm_ngal_photoz(z, i, "GCph") * self.cosmo.Hubble(z) # Wgc = self.window.norm_ngal_photoz(z,i) * self.nuisance.bias(self.biaspars, i)(z) * \ # self.cosmo.Hubble(z) tgcend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text=" ...done bin {} for GCph in: ", instance=self, time_ini=tgcstart, time_fin=tgcend, ) return Wgc
[docs] def lensing_kernel(self, z, i): """Computes the photometric cosmic shear kernel function Parameters ---------- z : numpy.ndarray array containing the redshifts for which the kernel should be computed for i : int index of the redshift bin Returns ------- numpy.ndarray Value of the cosmic shearing kernel at redshift z for bin i Note ----- Implements the following equation: .. math:: W_i^{WL} = W_i^{IA}+\\frac{3}{2}\\left(\\frac{H_0}{c}\\right)^2\\Omega_{m,0}(1+z)r(z) \\int_z^{z_{\\rm max}}{dz' \\frac{n_i(z')}{\\bar{n}(z)}\\left[1-\\frac{r(z)}{r(z')}\\right]} """ twlstart = time() if _USE_FAST_KERNEL: prefac = (3.0 / 2.0) * self.cosmo.Hubble(0.0) ** 2.0 * self.cosmo.Omegam_of_z(0.0) chi_z = self.cosmo.comoving(z) if z is not self.z else self._chi_z eff_z = self.efficiency[i](z) Wwl = prefac * (1.0 + z) * chi_z * eff_z IA_val = self.IAvalue(z) hub_z = self.cosmo.Hubble(z) WIA = self.window.norm_ngal_photoz(z, i, "WL") * IA_val * hub_z else: prefac = (3.0 / 2.0) * self.cosmo.Hubble(0.0) ** 2.0 * self.cosmo.Omegam_of_z(0.0) Wwl = np.array( [prefac * (1 + zi) * self.cosmo.comoving(zi) * self.efficiency[i](zi) for zi in z] ) WIA = self.window.norm_ngal_photoz(z, i, "WL") * np.array( [self.IAvalue(zi) * self.cosmo.Hubble(zi) for zi in z] ) # Sakr Fix June 2023 # kernel = Wwl+WIA twlend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text=" ...done bin {} for WL in:", instance=self, time_ini=twlstart, time_fin=twlend, ) # Sakr Fix June 2023 # return Wwl return Wwl, WIA
[docs] def integral_efficiency(self, i): """computes the integral that enters the lensing kernel for a given redshift bin Parameters ---------- i : int index of the redshift bin for which the lensing efficiency should be calculated Returns ------- callable callable function that receives a numpy.ndarray of requested redshifts and returns the lensing efficiency for the i-th bin as a numpy.ndarray """ # expensive calculation doesn't need to # be performed if cosmopars and photopars are the same z = self.z ngal_func = self.window.norm_ngal_photoz comoving_func = self.cosmo.comoving if not _USE_FAST_EFF: # Fallback to vectorized O(N^2) implementation return faster_integral_efficiency(i, ngal_func, comoving_func, z) # New O(N) implementation outsourced to helper return much_faster_integral_efficiency(i, ngal_func, comoving_func, z)
[docs] def lensing_efficiency(self): """computes the integral that enters the lensing kernel for all redshift bins Returns ------- list list of callable functions that give the lensing efficiency for each bin """ teffstart = time() efficiency = [self.integral_efficiency(i) for i in self.binrange_WL] efficiency.insert(0, None) teffend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text=" ...lensing efficiency computation done in:", instance=self, time_ini=teffstart, time_fin=teffend, ) return efficiency
[docs] def compute_kernels(self): """function to compute all the needed window functions. Note ----- This does not return the window functions. They are found in the new attributes `GC` if galaxy clustering is asked for, and `WL` if cosmic sheer is asked for """ if "GCph" in self.observables: self.GC = [ interp1d(self.z, self.galaxy_kernel(self.z, ind), kind="cubic") for ind in self.binrange_GCph ] self.GC.insert(0, None) if "WL" in self.observables: self.efficiency = self.lensing_efficiency() # Sakr Fix June 2023 # self.WL = [interp1d(self.z,self.lensing_kernel(self.z,ind), kind='cubic') for ind in self.binrange] self.WL = [ interp1d(self.z, self.lensing_kernel(self.z, ind)[0], kind="cubic") for ind in self.binrange_WL ] self.WL.insert(0, None) # Sakr Fix June 2023 self.WL_IA = [ interp1d(self.z, self.lensing_kernel(self.z, ind)[1], kind="cubic") for ind in self.binrange_WL ] self.WL_IA.insert(0, None) return None
[docs] def genwindow(self, z, obs, i): """generic function to obtain the window functions for the different observables. Parameters ---------- z : numpy.ndarray array of redshifts for which the window function should be computed obs : str name of the observable (GC or WL) i : int integer of the redshift bin the window should be computed for Returns ------- numpy.ndarray Values of the window function for the observable obs, for redshift bin i, and at redshifts z. """ win = [] win_IA = [] if obs == "GCph": bz = self.nuisance.gcph_bias(self.biaspars, i)(z) win = self.GC[i](z) * bz win_IA = self.GC[i](z) * 0.0 elif obs == "WL": win = self.WL[i](z) win_IA = self.WL_IA[i](z) return win, win_IA
[docs] def computecls(self): """Function to compute the angular power spectrum for all observables, redshift bins and multipoles. Returns ------- dict a dictionary containing all auto and cross correlation angular power spectra. Its keys are formatted to have an array of the power spectra in 'X ixY j'. Also the multipoles for which the angular power spectra were computed for are found in 'ells'. Note ----- Implements the following equation: .. math:: C_{i,j}^{X,Y}(\\ell) = c \\int \\mathrm{d}z \\frac{W_i^X (z)W_j^Y (z)}{ H(z) r^2(z)} P_{\\delta \\delta} \\big[\\frac{\\ell + 1/2}{r(z)} , z \\big] """ upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="Computing Cls integral for {}".format(self.observables), instance=self, ) # full_ell = np.linspace(cfg.specs['ellmin'],cfg.specs['ellmax'],cfg.specs['ellmax']-cfg.specs['ellmin']) # full_ell = np.round(full_ell, 0) full_ell = self.ell cls = {"ells": full_ell} hub = self.cosmo.Hubble(self.z) tstart = time() tcell = time() # PYTHONIZE THIS HORRIBLE THING # for obs1, obs2, bin1, bin2 in product( # self.observables, self.observables, self.binrange[0], self.binrange[1] #MMmod: BEWARE! THIS IS UGLY! # ): for obs1, obs2 in product(self.observables, self.observables): for bin1, bin2 in product(self.binrange[obs1], self.binrange[obs2]): clinterp = self.clsintegral(obs1, obs2, bin1, bin2, hub) finalcls = np.zeros((len(full_ell))) for ind, lval in enumerate(full_ell): if (cfg.specs["lmin_" + obs1] <= lval <= cfg.specs["lmax_" + obs1]) and ( cfg.specs["lmin_" + obs2] <= lval <= cfg.specs["lmax_" + obs2] ): finalcls[ind] = clinterp(lval) cls[obs1 + " " + str(bin1) + "x" + obs2 + " " + str(bin2)] = finalcls tbin = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text=" ...{} {} x {} {} done in: ".format(obs1, bin1, obs2, bin2), instance=self, time_ini=tcell, time_fin=tbin, ) tcell = tbin tend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="Cls integral computation done in: ", instance=self, time_ini=tstart, time_fin=tend, ) return cls
[docs] def computecls_vectorized(self): """Vectorized function to compute the angular power spectrum for all observables, redshift bins and multipoles. Returns ------- dict a dictionary containing all auto and cross correlation angular power spectra. Its keys are formatted to have an array of the power spectra in 'X ixY j'. Also the multipoles for which the angular power spectra were computed for are found in 'ells'. Note ----- Implements the following equation: .. math:: C_{i,j}^{X,Y}(\\ell) = c \\int \\mathrm{d}z \\frac{W_i^X (z)W_j^Y (z)}{ H(z) r^2(z)} P_{\\delta \\delta} \\big[\\frac{\\ell + 1/2}{r(z)} , z \\big] """ upt.time_print( feedback_level=self.feed_lvl, min_level=2, text="Computing Cls integral for {}".format(self.observables), instance=self, ) full_ell = self.ell cls = {"ells": full_ell} hub = self.cosmo.Hubble(self.z) tstart = time() # Pre-compute all clsintegrals clinterps = {} for obs1, obs2 in product(self.observables, self.observables): # Get the correct binrange for each observable bins1 = self.binrange[obs1] bins2 = self.binrange[obs2] for bin1, bin2 in product(bins1, bins2): clinterps[(obs1, obs2, bin1, bin2)] = self.clsintegral(obs1, obs2, bin1, bin2, hub) # Vectorized computation of finalcls for (obs1, obs2, bin1, bin2), clinterp in clinterps.items(): lmin1, lmax1 = cfg.specs[f"lmin_{obs1}"], cfg.specs[f"lmax_{obs1}"] lmin2, lmax2 = cfg.specs[f"lmin_{obs2}"], cfg.specs[f"lmax_{obs2}"] mask = ( (full_ell >= lmin1) & (full_ell <= lmax1) & (full_ell >= lmin2) & (full_ell <= lmax2) ) finalcls = np.zeros(len(full_ell)) finalcls[mask] = clinterp(full_ell[mask]) key = f"{obs1} {bin1}x{obs2} {bin2}" cls[key] = finalcls tbin = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text=f" ...{obs1} {bin1} x {obs2} {bin2} done in: ", instance=self, time_ini=tstart, time_fin=tbin, ) tstart = tbin tend = time() upt.time_print( feedback_level=self.feed_lvl, min_level=3, text="Cls integral computation done in: ", instance=self, time_ini=tstart, time_fin=tend, ) return cls
[docs] def clsintegral(self, obs1, obs2, bin1, bin2, hub): """function to obtain the angular power spectrum as an function of the multipole. Parameters ---------- obs1 : str name of the observable of the redshift bin bin1 (GC or WL) obs2 : str name of the observable of the redshift bin bin2 (GC or WL) bin1 : int index of the redshift bin at which the observable obs1 is measured bin2 : int index of the redshift bin at which the observable obs2 is measured hub : callable function that should be passed a numpy.ndarray of redshifts returns the hubble expansion rate at these redshifts. Returns ------- callable Function that receives an array multipole and returns the angular power spectrum for these multipoles. """ mask1 = (self.ell >= cfg.specs["lmin_" + obs1]) & (self.ell <= cfg.specs["lmax_" + obs1]) mask2 = (self.ell >= cfg.specs["lmin_" + obs2]) & (self.ell <= cfg.specs["lmax_" + obs2]) # Sakr Fix June 2023 # pz_arr = self.genwindow(self.z,obs1,bin1)*self.genwindow(self.z,obs2,bin2)/hub # intgn = self.sqrtPell[obs1]* self.sqrtPell[obs2] * pz_arr[:,np.newaxis] intgn = ( self.sqrtPell[obs1] * self.genwindow(self.z, obs1, bin1)[0][:, np.newaxis] / np.sqrt(hub)[:, np.newaxis] + self.sqrtPell[obs1 + "_IA"] * self.genwindow(self.z, obs1, bin1)[1][:, np.newaxis] / np.sqrt(hub)[:, np.newaxis] ) * ( self.sqrtPell[obs2] * self.genwindow(self.z, obs2, bin2)[0][:, np.newaxis] / np.sqrt(hub)[:, np.newaxis] + self.sqrtPell[obs2 + "_IA"] * self.genwindow(self.z, obs2, bin2)[1][:, np.newaxis] / np.sqrt(hub)[:, np.newaxis] ) clint = integrate.trapezoid(intgn, dx=self.dz, axis=0) clint[~mask1] = 0 clint[~mask2] = 0 clinterp = interp1d(self.ell, clint, kind="linear") # 'cubic') return clinterp