# -*- coding: utf-8 -*-
"""
COSMOLOGY.
This module contains useful cosmological functions.
"""
import os
import sys
import types
from copy import deepcopy
from glob import glob
from time import time
from warnings import warn
import numpy as np
import scipy.constants as sconst
from joblib import Memory
from scipy import integrate
from scipy.interpolate import (
InterpolatedUnivariateSpline,
RectBivariateSpline,
UnivariateSpline,
)
from scipy.signal import savgol_filter
import cosmicfishpie.configs.config as cfg
from cosmicfishpie.utilities.utils import numerics as unu
from cosmicfishpie.utilities.utils import printing as upr
cachedir = "memory_cache"
memory = Memory(cachedir, verbose=0)
def _dcom_func_trapz(zi, interpolfunc):
"""
Calculate the comoving distance using the trapezoidal rule.
Parameters
----------
zi : float
Redshift value
interpolfunc : callable
Interpolation function for H(z)
Returns
-------
float
Comoving distance
"""
zt = np.linspace(0.0, zi, 100)
Hzt = interpolfunc(zt)
dcom = integrate.trapezoid(1 / Hzt, zt)
return dcom
@memory.cache
def memorize_external_input(cosmopars, fiducialcosmopars, external, extra_settings):
"""
Memorize external input for caching purposes.
Parameters
----------
cosmopars : dict
Cosmological parameters
fiducialcosmopars : dict
Fiducial cosmological parameters
external : dict
External input settings
extra_settings : dict
Additional settings
Returns
-------
external_input
Memorized external input
"""
externalinput = external_input(
cosmopars,
fiducialcosmopars=fiducialcosmopars,
external=external,
extra_settings=extra_settings,
)
return externalinput
[docs]
class boltzmann_code:
hardcoded_Neff = 3.044
hardcoded_neutrino_mass_fac = 94.07
hardcoded_mnu_massive_min = 0.001
def __init__(self, cosmopars, code="camb"):
"""
Initialize the boltzmann_code class.
Parameters
----------
cosmopars : dict
The cosmological parameters object to be copied.
code : str, optional
The Boltzmann code to be used (default is 'camb').
Raises
------
ValueError
If an unsupported Boltzmann code is specified.
"""
self.cosmopars = deepcopy(cosmopars)
self.feed_lvl = cfg.settings["feedback"]
self.settings = cfg.settings
upr.SUPPRESS_WARNINGS = cfg.settings["SUPPRESS_WARNINGS"]
self.set_cosmicfish_defaults()
if code == "camb":
camb_path = os.path.realpath(os.path.join(os.getcwd(), self.settings["camb_path"]))
sys.path.insert(0, camb_path)
import camb as camb
self.boltzmann_cambpars = cfg.boltzmann_cambpars
self.camb_setparams(self.cosmopars, camb)
self.camb_results(camb)
elif code == "class":
self.boltzmann_classpars = cfg.boltzmann_classpars
from classy import Class
self.class_setparams(self.cosmopars)
self.class_results(Class)
elif code == "symbolic":
try:
import colossus.cosmology as colmo
import colossus.settings as colossus_settings
import symbolic_pofk.linear as symblin
import symbolic_pofk.syrenhalofit as symbfit
self.colmo = colmo
self.symblin = symblin
self.symbfit = symbfit
colossus_base_dir = cfg.settings.get("colossus_base_dir")
if colossus_base_dir is not None:
colossus_settings.BASE_DIR = colossus_base_dir
colossus_persistence = cfg.settings.get("colossus_persistence")
if colossus_persistence is not None:
colossus_settings.PERSISTENCE = colossus_persistence
if not hasattr(colmo.cosmology, "_cfp_wrapped_setCosmology"):
orig_setCosmology = colmo.cosmology.setCosmology
def _setCosmology_with_persistence(cosmo_name, params=None, **kwargs):
if "persistence" not in kwargs and colossus_persistence is not None:
kwargs["persistence"] = colossus_persistence
return orig_setCosmology(cosmo_name, params=params, **kwargs)
colmo.cosmology.setCosmology = _setCosmology_with_persistence
colmo.cosmology._cfp_wrapped_setCosmology = True
except ImportError:
print("Module symbolic_pofk not properly installed. Aborting")
sys.exit()
self.boltzmann_symbolicpars = cfg.boltzmann_symbolicpars
self.halofit_version = self.boltzmann_symbolicpars["COSMO_SETTINGS"][
"halofit_version"
] # 'syren' or 'halofit+' or 'takahashi'
if self.halofit_version == "takahashi":
self.which_params = "Takahashi"
self.add_correction = False
elif self.halofit_version == "halofit+":
self.which_params = "Bartlett"
self.add_correction = False
elif self.halofit_version == "syren":
self.which_params = "Bartlett"
self.add_correction = True
self.extrapolate = self.boltzmann_symbolicpars["NUMERICS"]["extrapolate"]
self.emulator_precision = self.boltzmann_symbolicpars["ACCURACY"][
"emulator_precision"
] # 'max_precision' or 'fiducial'
self.symbolic_setparams()
self.symbolic_results()
else:
print("other Boltzmann code not implemented yet")
exit()
[docs]
def set_cosmicfish_defaults(self):
"""
Fill up default values in the cosmopars dictionary if the values are not found.
This method sets default values for various cosmological parameters if they
are not already present in the cosmopars dictionary.
"""
# Set default value for Omegam if neither Omegam or omch2 (camb) or omega_cdm (class) are passed
if not any(par in self.cosmopars for par in ["Omegam", "omch2", "omega_cdm", "Omega_cdm"]):
self.cosmopars["Omegam"] = 0.32
# Set default value for Omegab if neither Omegab or ombh2 or omega_b or Omega_b or 100omega_b are passed
if not any(
par in self.cosmopars for par in ["Omegab", "ombh2", "omega_b", "Omega_b", "100omega_b"]
):
self.cosmopars["Omegab"] = 0.05
# Set default value for h only if no h/H0/theta-like parameter is passed
if not any(
par in self.cosmopars for par in ["H0", "h", "theta", "cosmomc_theta", "thetastar"]
):
self.cosmopars["h"] = 0.67
# Set default value for ns if it is not found in cosmopars
if not any(par in self.cosmopars for par in ["ns", "n_s"]):
self.cosmopars["ns"] = self.cosmopars.get("ns", 0.96)
# Set default value for sigma8 if neither sigma8 or As or logAs or 10^9As are passed
if not any(
par in self.cosmopars for par in ["sigma8", "As", "logAs", "10^9As", "ln_A_s_1e10"]
):
self.cosmopars["sigma8"] = 0.815583
# Set default values for w0 and wa if cosmo_model is 'w0waCDM'
if self.settings["cosmo_model"] == "w0waCDM":
if not any(par in self.cosmopars for par in ["w", "w0_fld"]):
self.cosmopars["w0"] = self.cosmopars.get("w0", -1.0)
if not any(par in self.cosmopars for par in ["wa", "wa_fld"]):
self.cosmopars["wa"] = self.cosmopars.get("wa", 0.0)
# Set default value for mnu if Omeganu or omnuh2 or mnu is not found in cosmopars
if not any(par in self.cosmopars for par in ["Omeganu", "omnuh2", "mnu"]):
self.cosmopars["mnu"] = self.cosmopars.get(
"mnu", self.cosmopars.get("m_nu", self.cosmopars.get("M_nu", 0.06))
)
# Set default value for Neff if it is not found in cosmopars
# self.cosmopars["Neff"] = self.cosmopars.get("Neff", self.cosmopars.get("N_eff", 3.046))
# Set default value for gamma, if it is not found in cosmopars
# gamma is not used in many places, therefore not needed to add back in cosmopars
self.gamma = self.cosmopars.get("gamma", 0.545)
[docs]
@staticmethod
def print_cosmo_params(cosmopars, feedback=1, text="---Cosmo pars---"):
"""
Print cosmological parameters.
Parameters
----------
cosmopars : dict
Dictionary of cosmological parameters to print.
feedback : int, optional
Feedback level determining whether to print (default is 1).
text : str, optional
Header text for the parameter list (default is "---Cosmo pars---").
"""
if feedback > 2:
print("")
print(text)
for key in cosmopars:
print(key + "=" + str(cosmopars[key]))
[docs]
@staticmethod
def f_deriv(D_growth_zk, z_array, k_array, k_fix=False, fixed_k=1e-3):
"""
Calculate the growth rate f(z,k).
Parameters
----------
D_growth_zk : callable
Function that returns the growth factor D(z,k).
z_array : array_like
Array of redshift values.
k_array : array_like
Array of wavenumbers.
k_fix : bool, optional
If True, use a fixed k value (default is False).
fixed_k : float, optional
Fixed k value to use if k_fix is True (default is 1e-3).
Returns
-------
tuple
Growth rate f(z,k) and the corresponding z array.
"""
if k_fix:
k_array = np.full((len(k_array)), fixed_k)
## Generates interpolators D(z) for varying k values
D_z = np.array([UnivariateSpline(z_array, D_growth_zk(z_array, kk), s=0) for kk in k_array])
## Generates arrays f(z) for varying k values
f_z = np.array(
[-(1 + z_array) / D_zk(z_array) * (D_zk.derivative())(z_array) for D_zk in D_z]
)
return f_z, z_array
[docs]
@staticmethod
def compute_sigma8(z_range, pk_interpolator, h_value, k_range):
"""
Calculate sigma8 over a range of redshifts using a given power spectrum interpolator.
This function computes sigma8, which is the RMS matter fluctuation in spheres of 8 h^-1 Mpc radius,
for a range of redshifts. It uses a provided power spectrum interpolator to perform the calculation.
Parameters
----------
z_range : numpy.ndarray
Array of redshift values at which to calculate sigma8.
pk_interpolator : callable
A function that takes (z, k) as arguments and returns
the power spectrum P(k,z). It should be able to handle
array inputs for both z and k.
Returns
-------
scipy.interpolate.UnivariateSpline
A 1-D interpolation function sigma8(z) that can be
used to obtain sigma8 values for any redshift within
the input range.
"""
R = 8.0 / (h_value)
kmin = np.min(k_range)
kmax = np.max(k_range)
# resampling the k range to 10000 points
k = np.linspace(kmin, kmax, 10000)
sigma_z = np.empty_like(z_range)
pk_z = pk_interpolator(z_range, k)
for i in range(len(sigma_z)):
integrand = (
9
* (k * R * np.cos(k * R) - np.sin(k * R)) ** 2
* pk_z[i]
/ k**4
/ R**6
/ 2
/ np.pi**2
)
sigma_z[i] = np.sqrt(integrate.trapezoid(integrand, k))
sigma8_z_interp = UnivariateSpline(z_range, sigma_z, s=0)
return sigma8_z_interp
[docs]
def camb_setparams(self, cosmopars, camb):
"""
Set the parameters for CAMB computation.
Parameters
----------
cosmopars : dict
Dictionary containing the cosmological parameters.
camb : module
The CAMB module.
Notes
-----
This method sets up CAMB parameters and prepares for power spectrum computation.
"""
# Adding hard coded CAMB options
self.cosmopars = deepcopy(cosmopars)
self.gamma = self.cosmopars.get(
"gamma", 0.545
) # default gamma value 0.545, set if gamma not in self.cosmopars
## gamma not used in many places, therefore not needed to add back in self.cosmopars
tini_basis = time()
self.cambcosmopars = {
**self.boltzmann_cambpars["ACCURACY"],
**self.boltzmann_cambpars["COSMO_SETTINGS"],
**self.boltzmann_cambpars[self.settings["cosmo_model"]],
}.copy()
self.kmax_pk = self.cambcosmopars["kmax"]
self.kmin_pk = 1e-4
self.zmax_pk = self.boltzmann_cambpars["NUMERICS"]["zmax"]
self.z_samples = self.boltzmann_cambpars["NUMERICS"]["zsamples"]
upr.debug_print(self.cambcosmopars)
self.cambcosmopars.update(self.cosmopars)
self.cambcosmopars = self.changebasis_camb(self.cambcosmopars, camb)
upr.debug_print(self.cambcosmopars)
tend_basis = time()
if self.feed_lvl > 2:
print("Basis change took {:.2f} s".format(tend_basis - tini_basis))
self.print_cosmo_params(
self.cambcosmopars, feedback=self.feed_lvl, text="---CAMB parameters---"
)
self.cambclasspars = camb.set_params(**self.cambcosmopars)
if hasattr(self.cambclasspars, "H0"):
self.h_now = self.cambclasspars.H0 / 100.0
self.camb_zarray = np.linspace(0.0, self.zmax_pk, self.z_samples)[::-1]
self.cambclasspars.set_matter_power(
redshifts=self.camb_zarray,
k_per_logint=self.cambcosmopars["k_per_logint"],
kmax=self.cambcosmopars["kmax"],
accurate_massive_neutrino_transfers=self.cambcosmopars[
"accurate_massive_neutrino_transfers"
],
)
# TODO: nonlinear options to be selectable
self.cambclasspars.NonLinear = camb.model.NonLinear_both
self.cambclasspars.set_for_lmax(4000, lens_potential_accuracy=1)
[docs]
def changebasis_camb(self, cosmopars, camb):
"""
Convert cosmological parameters to CAMB format.
Parameters
----------
cosmopars : dict
Dictionary of cosmological parameters.
camb : module
The CAMB module.
Returns
-------
dict
Dictionary of CAMB-formatted cosmological parameters.
"""
cambpars = deepcopy(cosmopars)
# Support Planck-style theta parameterization (theta = 100 * theta_MC).
if "theta" in cambpars and "cosmomc_theta" not in cambpars:
cambpars["cosmomc_theta"] = cambpars.pop("theta") / 100.0
if "h" in cambpars:
cambpars["H0"] = cambpars.pop("h") * 100
# CAMB requires H0 to be unset when cosmomc_theta is used.
if "cosmomc_theta" in cambpars and "H0" in cambpars:
cambpars.pop("H0")
h_now = None
if "H0" in cambpars:
h_now = cambpars["H0"] / 100.0
self.h_now = h_now
if "Omegab" in cambpars:
if "H0" not in cambpars:
raise ValueError("Cannot convert Omegab without H0/h. Pass ombh2 or provide h/H0.")
cambpars["ombh2"] = cambpars.pop("Omegab") * (cambpars["H0"] / 100) ** 2
if "Omegak" in cambpars:
cambpars["omk"] = cambpars.pop("Omegak")
if "w0" in cambpars:
cambpars["w"] = cambpars.pop("w0")
if "logAs" in cambpars:
cambpars["As"] = np.exp(cambpars.pop("logAs")) * 1.0e-10
upr.debug_print("DEBUG: --> ", cosmopars)
shareDeltaNeff = cfg.settings["ShareDeltaNeff"]
cambpars["share_delta_neff"] = shareDeltaNeff
fidNeff = boltzmann_code.hardcoded_Neff
minmassmnu = boltzmann_code.hardcoded_mnu_massive_min
if "mnu" in cambpars:
if cambpars["mnu"] < minmassmnu and cambpars["num_nu_massive"] > 0:
raise ValueError(
f"mnu is less than {minmassmnu} and num_nu_massive is greater than 0. Check your yaml file."
)
if "Neff" in cambpars:
Neff = cambpars.pop("Neff")
if shareDeltaNeff:
cambpars["num_nu_massless"] = Neff - cambpars["num_nu_massive"]
else:
cambpars["num_nu_massless"] = Neff - fidNeff / 3
else:
Neff = cambpars["num_nu_massive"] + cambpars["num_nu_massless"]
if shareDeltaNeff:
g_factor = Neff / 3
else:
g_factor = fidNeff / 3
neutrino_mass_fac = boltzmann_code.hardcoded_neutrino_mass_fac
h2 = h_now**2 if h_now is not None else None
onuh2 = cambpars.get("omnuh2", 0.0)
if "mnu" in cambpars:
# If h/H0 is not explicit (e.g. theta parameterization), keep mnu as-is.
if h2 is not None:
Onu = cambpars["mnu"] / neutrino_mass_fac * (g_factor) ** 0.75 / h2
onuh2 = Onu * h2
cambpars["omnuh2"] = onuh2
elif "Omeganu" in cambpars:
if h2 is None:
raise ValueError("Cannot convert Omeganu without h/H0. Pass mnu or omnuh2.")
cambpars["omnuh2"] = cambpars.pop("Omeganu") * h2
onuh2 = cambpars["omnuh2"]
elif "omnuh2" in cambpars:
onuh2 = cambpars["omnuh2"]
if "Omegam" in cambpars: # TO BE GENERALIZED
if h2 is None:
raise ValueError(
"Cannot convert Omegam to omch2 without h/H0. Pass omch2 directly or provide h/H0."
)
cambpars["omch2"] = cambpars.pop("Omegam") * h2 - cambpars["ombh2"] - onuh2
rescaleAs = False
if "sigma8" in cambpars:
insigma8 = cambpars["sigma8"]
cambpars["As"] = self.settings.get("rescale_ini_As", 2.1e-9)
cambpars.pop("sigma8")
rescaleAs = True
try:
camb.set_params(**cambpars) # to see which methods are being called: verbose=True
except camb.CAMBUnknownArgumentError as argument:
upr.debug_print("Remove parameter from cambparams: ", str(argument))
# pars= camb.set_params(redshifts=[0.], kmax=50.0,accurate_massive_neutrino_transfers=True,lmax=1000, lens_potential_accuracy=1,**cambpars)
self.extrap_kmax = cambpars.pop("extrap_kmax", 100)
if rescaleAs is True:
cambpars["As"] = self.rescale_LP(cambpars, camb, insigma8)
return cambpars
[docs]
def rescale_LP(self, cambpars, camb, insigma8):
"""
Rescale As to match a target sigma8 value.
Parameters
----------
cambpars : dict
CAMB parameters.
camb : module
The CAMB module.
insigma8 : float
Target sigma8 value.
Returns
-------
float
Rescaled As value.
"""
cambpars_LP = cambpars.copy()
ini_As = self.settings.get("rescale_ini_As", 2.1e-9)
boost = self.settings.get("rescale_boost", 1)
cambpars_LP["AccuracyBoost"] = boost
cambpars_LP["lAccuracyBoost"] = boost
cambpars_LP["lSampleBoost"] = boost
cambpars_LP["kmax"] = 20
pars = camb.set_params(redshifts=[0.0], **cambpars_LP)
results = camb.get_results(pars)
test_sig8 = np.array(results.get_sigma8())
final_As = ini_As * (insigma8 / test_sig8[-1]) ** 2.0
get_rescaled_s8 = self.settings.get("get_rescaled_s8", False)
if get_rescaled_s8:
cambpars_rs = cambpars_LP.copy()
cambpars_rs["As"] = final_As
pars2 = camb.set_params(redshifts=[0.0], **cambpars_rs)
results2 = camb.get_results(pars2)
final_sig8 = np.array(results2.get_sigma8())[-1]
if self.feed_lvl > 2:
print("AccuracyBoost input = ", cambpars["AccuracyBoost"])
print("AccuracyBoost rescaling = ", cambpars_LP["lAccuracyBoost"])
print("Goal sig8 = ", insigma8)
print("Reference As = ", ini_As)
print("Reference sig8 = ", test_sig8)
print("Rescaled As = ", final_As)
if get_rescaled_s8:
print("Rescaled sig8 = ", final_sig8)
return final_As
[docs]
def camb_results(self, camb):
"""
Compute and store CAMB results.
Parameters
----------
camb : module
The CAMB module.
Notes
-----
This method computes various cosmological quantities using CAMB and stores
them in the results attribute.
"""
tini_camb = time()
self.results = types.SimpleNamespace()
cambres = camb.get_results(self.cambclasspars)
if self.feed_lvl > 2:
tres = time()
print("Time for Results = ", tres - tini_camb)
Pk_l, self.results.zgrid, self.results.kgrid = cambres.get_matter_power_interpolator(
hubble_units=False,
k_hunit=False,
var1="delta_tot",
var2="delta_tot",
nonlinear=False,
extrap_kmax=self.extrap_kmax,
return_z_k=True,
)
Pk_nl, zgrid, kgrid = cambres.get_matter_power_interpolator(
hubble_units=False,
k_hunit=False,
var1="delta_tot",
var2="delta_tot",
nonlinear=True,
extrap_kmax=self.extrap_kmax,
return_z_k=True,
)
Pk_cb_l, zgrid, kgrid = cambres.get_matter_power_interpolator(
hubble_units=False,
k_hunit=False,
var1="delta_nonu",
var2="delta_nonu",
nonlinear=False,
extrap_kmax=self.extrap_kmax,
return_z_k=True,
)
self.results.Pk_l = RectBivariateSpline(
self.results.zgrid, self.results.kgrid, Pk_l.P(self.results.zgrid, self.results.kgrid)
)
self.results.Pk_nl = RectBivariateSpline(
self.results.zgrid, self.results.kgrid, Pk_nl.P(self.results.zgrid, self.results.kgrid)
)
self.results.Pk_cb_l = RectBivariateSpline(
self.results.zgrid,
self.results.kgrid,
Pk_cb_l.P(self.results.zgrid, self.results.kgrid),
)
self.results.h_of_z = InterpolatedUnivariateSpline(
self.results.zgrid, cambres.h_of_z(self.results.zgrid)
)
self.results.ang_dist = InterpolatedUnivariateSpline(
self.results.zgrid, cambres.angular_diameter_distance(self.results.zgrid)
)
self.results.com_dist = InterpolatedUnivariateSpline(
self.results.zgrid, cambres.comoving_radial_distance(self.results.zgrid)
)
self.results.Om_m = InterpolatedUnivariateSpline(
self.results.zgrid,
(
cambres.get_Omega("cdm", z=self.results.zgrid)
+ cambres.get_Omega("baryon", z=self.results.zgrid)
+ cambres.get_Omega("nu", z=self.results.zgrid)
),
)
# Calculate the Non linear cb power spectrum using Gabrieles Approximation
f_cdm = cambres.get_Omega("cdm", z=0) / self.results.Om_m(0)
f_b = cambres.get_Omega("baryon", z=0) / self.results.Om_m(0)
f_cb = f_cdm + f_b
f_nu = 1 - f_cb
Pk_cross_l = cambres.get_matter_power_interpolator(
hubble_units=False,
k_hunit=False,
var1="delta_nonu",
var2="delta_nu",
nonlinear=False,
extrap_kmax=self.extrap_kmax,
return_z_k=False,
)
Pk_nunu_l = cambres.get_matter_power_interpolator(
hubble_units=False,
k_hunit=False,
var1="delta_nu",
var2="delta_nu",
nonlinear=False,
extrap_kmax=self.extrap_kmax,
return_z_k=False,
)
Pk_cb_nl = (
1
/ f_cb**2
* (
Pk_nl.P(self.results.zgrid, self.results.kgrid)
- 2 * Pk_cross_l.P(self.results.zgrid, self.results.kgrid) * f_cb * f_nu
- Pk_nunu_l.P(self.results.zgrid, self.results.kgrid) * f_nu**2
)
)
self.results.Pk_cb_nl = RectBivariateSpline(
self.results.zgrid, self.results.kgrid, Pk_cb_nl
)
if self.feed_lvl > 2:
tPk = time()
print("Time for lin+nonlin Pk = ", tPk - tres)
P_kz_0 = self.results.Pk_l(0.0, self.results.kgrid)
D_g_norm_kz = np.sqrt(self.results.Pk_l(self.results.zgrid, self.results.kgrid) / P_kz_0)
self.results.D_growth_zk = RectBivariateSpline(
self.results.zgrid, self.results.kgrid, (D_g_norm_kz), kx=3, ky=3
)
P_cb_kz_0 = self.results.Pk_cb_l(0.0, self.results.kgrid)
D_g_cb_norm_kz = np.sqrt(
self.results.Pk_cb_l(self.results.zgrid, self.results.kgrid) / P_cb_kz_0
)
self.results.D_growth_cb_zk = RectBivariateSpline(
self.results.zgrid, self.results.kgrid, (D_g_cb_norm_kz), kx=3, ky=3
)
if self.feed_lvl > 2:
tDzk = time()
print("Time for Growth factor = ", tDzk - tPk)
f_z_k_array, z_array = self.f_deriv(
self.results.D_growth_zk, self.results.zgrid, self.results.kgrid
)
self.results.f_growthrate_zk = RectBivariateSpline(
z_array, self.results.kgrid, f_z_k_array.T
)
f_cb_z_k_array, z_array = self.f_deriv(
self.results.D_growth_cb_zk, self.results.zgrid, self.results.kgrid
)
self.results.f_growthrate_cb_zk = RectBivariateSpline(
z_array, self.results.kgrid, f_cb_z_k_array.T
)
if self.feed_lvl > 2:
tfzk = time()
print("Time for Growth factor = ", tfzk - tDzk)
self.results.s8_cb_of_z = self.compute_sigma8(
self.results.zgrid, self.results.Pk_cb_l, self.h_now, self.results.kgrid
)
self.results.s8_of_z = self.compute_sigma8(
self.results.zgrid, self.results.Pk_l, self.h_now, self.results.kgrid
)
if self.feed_lvl > 2:
ts8 = time()
print("Time for Growth factor = ", ts8 - tfzk)
if self.cambcosmopars["Want_CMB"]:
powers = cambres.get_cmb_power_spectra(CMB_unit="muK")
self.results.camb_cmb = powers["total"]
tend_camb = time()
if self.feed_lvl > 2:
print("Time for CMB = ", tend_camb - ts8)
if self.feed_lvl > 1:
print("Cosmology computation took {:.2f} s".format(tend_camb - tini_camb))
[docs]
def class_setparams(self, cosmopars):
"""
Set the parameters for CLASS computation.
Parameters
----------
cosmopars : dict
Dictionary containing the cosmological parameters.
Notes
-----
This method sets up CLASS parameters and prepares for power spectrum computation.
"""
tini_basis = time()
self.classcosmopars = {
**self.boltzmann_classpars["ACCURACY"],
**self.boltzmann_classpars["COSMO_SETTINGS"],
**self.boltzmann_classpars[self.settings["cosmo_model"]],
}.copy()
upr.debug_print(self.classcosmopars)
upr.debug_print(cosmopars)
self.classcosmopars.update(self.changebasis_class(self.cosmopars))
upr.debug_print(self.classcosmopars)
self.kmax_pk = self.classcosmopars["P_k_max_1/Mpc"]
self.kmin_pk = 1e-4
self.zmax_pk = self.classcosmopars["z_max_pk"]
tend_basis = time()
if self.feed_lvl > 2:
print("Basis change took {:.2f} s".format(tend_basis - tini_basis))
self.print_cosmo_params(
self.classcosmopars, feedback=self.feed_lvl, text="---CLASS parameters---"
)
[docs]
def changebasis_class(self, cosmopars):
"""
Convert cosmological parameters to CLASS format.
Parameters
----------
cosmopars : dict
Dictionary of cosmological parameters.
Returns
-------
dict
Dictionary of CLASS-formatted cosmological parameters.
"""
classpars = deepcopy(cosmopars)
if "h" in classpars:
classpars["h"] = classpars.pop("h")
h = classpars["h"]
if "H0" in classpars:
classpars["H0"] = classpars.pop("H0")
h = classpars["H0"] / 100.0
shareDeltaNeff = cfg.settings["ShareDeltaNeff"]
fidNeff = boltzmann_code.hardcoded_Neff
Neff = classpars.pop("Neff", fidNeff)
if shareDeltaNeff:
classpars["N_ur"] = (
2.0 / 3.0 * Neff
) # This version does not have the discontinuity at Nur = 1.99
g_factor = Neff / 3.0
else:
classpars["N_ur"] = Neff - fidNeff / 3.0
g_factor = fidNeff / 3.0
neutrino_mass_fac = boltzmann_code.hardcoded_neutrino_mass_fac
if "mnu" in classpars:
classpars["T_ncdm"] = (4.0 / 11.0) ** (1.0 / 3.0) * g_factor ** (1.0 / 4.0)
classpars["Omega_ncdm"] = (
classpars["mnu"] * g_factor ** (0.75) / neutrino_mass_fac / h**2
)
classpars.pop("mnu")
# classpars['m_ncdm'] = classpars.pop('mnu')
# Om_ncdm = classpars['m_ncdm'] / 93.13858 /h/h
elif "Omeganu" in classpars:
classpars["Omega_ncdm"] = classpars.pop("Omeganu")
if "100omega_b" in classpars:
classpars["omega_b"] = (1 / 100) * classpars.pop("100omega_b")
if "Omegab" in classpars:
classpars["Omega_b"] = classpars.pop("Omegab")
if "Omegam" in classpars:
classpars["Omega_cdm"] = (
classpars.pop("Omegam") - classpars["Omega_b"] - classpars["Omega_ncdm"]
)
if "w0" in classpars:
classpars["w0_fld"] = classpars.pop("w0")
if "wa" in classpars:
classpars["wa_fld"] = classpars.pop("wa")
if "logAs" in classpars:
classpars["A_s"] = np.exp(classpars.pop("logAs")) * 1.0e-10
if "10^9As" in classpars:
classpars["A_s"] = classpars.pop("10^9As") * 1.0e-9
if "ns" in classpars:
classpars["n_s"] = classpars.pop("ns")
return classpars
[docs]
def class_results(self, Class):
"""
Compute and store CLASS results.
Parameters
----------
Class : class
The CLASS class.
Notes
-----
This method computes various cosmological quantities using CLASS and stores
them in the results attribute.
"""
self.results = types.SimpleNamespace()
classres = Class()
classres.set(self.classcosmopars)
classres.compute()
self.Classres = classres
self.results.h_of_z = np.vectorize(classres.Hubble)
self.results.ang_dist = np.vectorize(classres.angular_distance)
self.results.com_dist = np.vectorize(classres.comoving_distance)
h = classres.h()
self.results.s8_of_z = np.vectorize(lambda zz: classres.sigma(R=8 / h, z=zz))
self.results.s8_cb_of_z = np.vectorize(lambda zz: classres.sigma_cb(R=8 / h, z=zz))
self.results.Om_m = np.vectorize(classres.Om_m)
# Calculate the Matter fractions for CB Powerspectrum
f_cdm = classres.Omega0_cdm() / classres.Omega_m()
f_b = classres.Omega_b() / classres.Omega_m()
f_cb = f_cdm + f_b
f_nu = 1 - f_cb
## rows are k, and columns are z
## interpolating function Pk_l (k,z)
Pk_l, k, z = classres.get_pk_and_k_and_z(nonlinear=False)
Pk_cb_l, k, z = classres.get_pk_and_k_and_z(only_clustering_species=True, nonlinear=False)
self.results.Pk_l = RectBivariateSpline(z[::-1], k, (np.flip(Pk_l, axis=1)).transpose())
# self.results.Pk_l = lambda z,k: [np.array([classres.pk_lin(kval,z) for kval in k])]
self.results.Pk_cb_l = RectBivariateSpline(
z[::-1], k, (np.flip(Pk_cb_l, axis=1)).transpose()
)
# self.results.Pk_cb_l = lambda z,k: [np.array([classres.pk_cb_lin(kval,z) for kval in k])]
self.results.kgrid = k
self.results.zgrid = z[::-1]
## interpolating function Pk_nl (k,z)
Pk_nl, k, z = classres.get_pk_and_k_and_z(nonlinear=cfg.settings["nonlinear"])
self.results.Pk_nl = RectBivariateSpline(z[::-1], k, (np.flip(Pk_nl, axis=1)).transpose())
tk, k, z = classres.get_transfer_and_k_and_z()
T_cb = (f_b * tk["d_b"] + f_cdm * tk["d_cdm"]) / f_cb
T_nu = tk["d_ncdm[0]"]
pm = classres.get_primordial()
pk_prim = (
UnivariateSpline(pm["k [1/Mpc]"], pm["P_scalar(k)"])(k)
* (2.0 * np.pi**2)
/ np.power(k, 3)
)
pk_cnu = T_nu * T_cb * pk_prim[:, None]
pk_nunu = T_nu * T_nu * pk_prim[:, None]
Pk_cb_nl = 1.0 / f_cb**2 * (Pk_nl - 2 * pk_cnu * f_nu * f_cb - pk_nunu * f_nu * f_nu)
self.results.Pk_cb_nl = RectBivariateSpline(
z[::-1], k, (np.flip(Pk_cb_nl, axis=1)).transpose()
)
def create_growth():
z_ = self.results.zgrid
pk_flipped = np.flip(Pk_l, axis=1).T
D_growth_zk = RectBivariateSpline(z_, k, np.sqrt(pk_flipped / pk_flipped[0, :]))
return D_growth_zk
self.results.D_growth_zk = create_growth()
f_z_k_array, z_array = self.f_deriv(
self.results.D_growth_zk, self.results.zgrid, self.results.kgrid
)
f_g_kz = RectBivariateSpline(z_array, self.results.kgrid, f_z_k_array.T)
self.results.f_growthrate_zk = f_g_kz
def create_growth_cb():
z_ = self.results.zgrid
pk_flipped = np.flip(Pk_cb_l, axis=1).T
D_growth_zk = RectBivariateSpline(z_, k, np.sqrt(pk_flipped / pk_flipped[0, :]))
return D_growth_zk
self.results.D_growth_cb_zk = create_growth_cb()
def f_cb_deriv(k_array, k_fix=False, fixed_k=1e-3):
z_array = np.linspace(0, classres.pars["z_max_pk"], 100)
if k_fix:
k_array = np.full((len(k_array)), fixed_k)
## Generates interpolaters D(z) for varying k values
D_cb_z = np.array(
[
UnivariateSpline(z_array, self.results.D_growth_cb_zk(z_array, kk), s=0)
for kk in k_array
]
)
## Generates arrays f(z) for varying k values
f_cb_z = np.array(
[
-(1 + z_array) / D_cb_zk(z_array) * (D_cb_zk.derivative())(z_array)
for D_cb_zk in D_cb_z
]
)
return f_cb_z, z_array
f_cb_z_k_array, z_array = f_cb_deriv(self.results.kgrid)
f_g_cb_kz = RectBivariateSpline(z_array, self.results.kgrid, f_cb_z_k_array.T)
self.results.f_growthrate_cb_zk = f_g_cb_kz
[docs]
def changebasis_symb(self, cosmopars):
"""
Convert and adjust cosmological parameters for symbolic computation.
This method takes the input cosmological parameters and converts them
to the format required for symbolic computation. It handles unit
conversions, derives some parameters, and ensures consistency between
different parameterizations (e.g., sigma8 and As).
Parameters
----------
cosmopars : dict
Dictionary containing the input cosmological parameters.
Returns
-------
dict
A new dictionary with the converted and adjusted parameters suitable
for symbolic computation.
Raises
------
ValueError
If there's an issue with the sigma8 to As conversion or vice versa.
KeyError
If required parameters are missing from the input.
Notes
-----
- The method performs conversions between h and H0, ombh2 and Omegab,
and handles various representations of the primordial power spectrum
amplitude (As, sigma8).
- Debug information is printed if the upr.debug flag is set.
"""
upr.debug_print("DEBUG: cosmopars = ", cosmopars)
symbpars = deepcopy(cosmopars)
if "h" in symbpars:
symbpars["h"] = symbpars.pop("h")
if "H0" in symbpars:
H0 = symbpars.pop("H0")
symbpars["h"] = H0 / 100.0
if "ombh2" in symbpars:
symbpars["Omegab"] = symbpars.pop("ombh2") / (symbpars["h"] ** 2)
if "omch2" in symbpars:
symbpars["Omegam"] = (symbpars.pop("omch2") / symbpars["h"] ** 2) + symbpars["Omegab"]
# Omegam = Omegac + Omegab
# ["sigma8", "As", "logAs", "10^9As", "ln_A_s_1e10"]
try:
if "As" in symbpars:
symbpars["10^9As"] = 10**9 * symbpars.pop("As")
if "logAs" in symbpars:
symbpars["10^9As"] = 10**9 * (np.exp(symbpars.pop("logAs")) * 1.0e-10)
try:
As_value = symbpars.get("10^9As", -np.inf)
upr.debug_print("DEBUG: symbpars['10^9As'] = ", As_value)
if As_value != -np.inf:
try:
symbpars["sigma8"] = self.symblin.As_to_sigma8(
symbpars["10^9As"],
symbpars["Omegam"],
symbpars["Omegab"],
symbpars["h"],
symbpars["ns"],
)
upr.debug_print("DEBUG: derived sigma8 = ", symbpars["sigma8"])
except Exception as e:
print(f"An error occurred: {e}")
print("As to sigma8 conversion failed")
raise ValueError
else:
upr.debug_print("DEBUG: 10^9As value not found in passed parameters")
upr.debug_print("DEBUG: symbpars = ", symbpars)
except KeyError:
pass
try:
sigma8_value = symbpars.get("sigma8", -np.inf)
# if np.isscalar(sigma8_value) or (isinstance(sigma8_value, np.ndarray) and sigma8_value.ndim == 0):
if sigma8_value != -np.inf:
try:
As_n = self.symblin.sigma8_to_As(
symbpars["sigma8"],
symbpars["Omegam"],
symbpars["Omegab"],
symbpars["h"],
symbpars["ns"],
)
upr.debug_print("DEBUG: derived 10^9As = ", As_n)
symbpars["10^9As"] = As_n
except Exception as e:
print(f"An error occurred: {e}")
print("sigma8 to As conversion failed")
raise ValueError
else:
upr.debug_print("DEBUG: sigma8 value not found in passed parameters")
upr.debug_print("DEBUG: symbpars = ", symbpars)
except KeyError:
print("sigma8 or 10^9As not in symbpars")
raise KeyError
except Exception as e:
print(f"An error occurred: {e}")
print("sigma8<->As conversion failed for other reasons")
upr.debug_print("DEBUG: symbpars = ", symbpars)
raise
return symbpars
[docs]
def symbolic_setparams(self):
"""
Set up parameters for symbolic computation.
Notes
-----
This method prepares the cosmological parameters and numerical settings
for symbolic power spectrum computation.
Raises
------
ValueError
If the cosmological model is not supported by the symbolic code.
"""
tini_basis = time()
if cfg.settings["cosmo_model"] != "LCDM":
print("Symbolic_pofk only supports LCDM at the moment")
raise ValueError("Cosmo model not supported by cosmo code")
self.symbcosmopars = dict()
self.symbcosmopars.update(self.changebasis_symb(self.cosmopars))
self.h_now = self.symbcosmopars["h"]
self.kmax_pk = self.boltzmann_symbolicpars["NUMERICS"]["kmax_pk"]
self.kmin_pk = self.boltzmann_symbolicpars["NUMERICS"]["kmin_pk"]
self.zmax_pk = self.boltzmann_symbolicpars["NUMERICS"]["zmax_pk"]
self.zmin_pk = self.boltzmann_symbolicpars["NUMERICS"]["zmin_pk"]
self.z_samples = self.boltzmann_symbolicpars["NUMERICS"]["z_samples"]
self.zgrid = np.linspace(self.zmin_pk, self.zmax_pk, self.z_samples)
self.k_samples = self.boltzmann_symbolicpars["NUMERICS"]["k_samples"]
self.kgrid_1Mpc = np.logspace(
np.log10(self.kmin_pk * self.h_now), np.log10(self.kmax_pk * self.h_now), self.k_samples
)
tend_basis = time()
if self.feed_lvl > 2:
print("Basis change took {:.2f} s".format(tend_basis - tini_basis))
self.print_cosmo_params(
self.symbcosmopars, feedback=self.feed_lvl, text="--- Symbolic Cosmo parameters ---"
)
[docs]
@upr.suppress_warnings
def symbolic_results(self):
"""
Compute and store results using symbolic computation.
Returns
-------
types.SimpleNamespace
Namespace containing the computed cosmological quantities.
Notes
-----
This method computes various cosmological quantities using symbolic
computation and stores them in the results attribute.
"""
tini_results = time()
self.results = types.SimpleNamespace()
symb_colmo_pars = {
"flat": True,
"sigma8": self.symbcosmopars["sigma8"],
"Om0": self.symbcosmopars["Omegam"],
"Ob0": self.symbcosmopars["Omegab"],
"H0": self.symbcosmopars["h"] * 100,
"ns": self.symbcosmopars["ns"],
}
colossus_persistence = cfg.settings.get("colossus_persistence")
if colossus_persistence is not None:
symb_colmo_pars["persistence"] = colossus_persistence
self.results.zgrid = self.zgrid
self.h_now = self.symbcosmopars["h"]
self.kgrid_hMpc = self.kgrid_1Mpc / self.h_now
self.results.kgrid = self.kgrid_1Mpc # results kgrid is in units of 1/Mpc
self.symbcosmo = self.colmo.cosmology.setCosmology("colmo", **symb_colmo_pars)
self.results.h_of_z = np.vectorize(
lambda zz: self.symbcosmo.Hz(zz) / cosmo_functions.c
) # H(z) in 1/Mpc
self.results.ang_dist = np.vectorize(
lambda zz: self.symbcosmo.angularDiameterDistance(zz) / self.h_now
)
self.results.com_dist = np.vectorize(
lambda zz: self.symbcosmo.comovingDistance(z_min=0.0, z_max=zz, transverse=True)
/ self.h_now
)
self.results.s8_of_z = np.vectorize(lambda zz: self.symbcosmo.sigma(8, z=zz))
self.results.Om_m = self.symbcosmo.Om
D_kz = np.array([self.symbcosmo.growthFactor(self.zgrid) for kk in self.kgrid_1Mpc]).T
self.results.D_growth_zk = RectBivariateSpline(self.zgrid, self.kgrid_1Mpc, D_kz)
self.results.Pk_l_0 = ((1 / self.h_now) ** 3) * self.symblin.plin_emulated(
self.kgrid_hMpc,
self.symbcosmopars["sigma8"],
self.symbcosmopars["Omegam"],
self.symbcosmopars["Omegab"],
self.h_now,
self.symbcosmopars["ns"],
emulator=self.emulator_precision,
extrapolate=self.extrapolate,
kmin=self.kmin_pk,
kmax=self.kmax_pk,
)
# symbfit plin_emulated returns P_l(k,z=0) in 1/Mpc^3, requests kgrid in h/Mpc
Pk_at_z = (D_kz**2) * self.results.Pk_l_0
self.results.Pk_l = RectBivariateSpline(
self.zgrid, self.kgrid_1Mpc, Pk_at_z
) # P_l(k,z) in 1/Mpc^3
f_z_k_array, z_array = self.f_deriv(
self.results.D_growth_zk, self.results.zgrid, self.results.kgrid
)
self.results.f_growthrate_zk = RectBivariateSpline(z_array, self.kgrid_1Mpc, f_z_k_array.T)
def vectorized_halofit(z):
return ((1 / self.h_now) ** 3) * self.symbfit.run_halofit(
self.kgrid_hMpc,
self.symbcosmopars["sigma8"],
self.symbcosmopars["Omegam"],
self.symbcosmopars["Omegab"],
self.h_now,
self.symbcosmopars["ns"],
a=cosmo_functions.scale_factor(z),
emulator=self.emulator_precision,
extrapolate=self.extrapolate,
which_params=self.which_params,
add_correction=self.add_correction,
)
# symbfit run_halofit returns P_nl(k,z) in Mpc^3/h^3, requests kgrid in h/Mpc
vectorized_run_halofit = np.vectorize(vectorized_halofit, signature="()->(n)")
Pk_nl = vectorized_run_halofit(self.zgrid)
self.results.Pk_nl = RectBivariateSpline(self.zgrid, self.kgrid_1Mpc, Pk_nl)
# Pk_nl in 1/Mpc^3
if self.feed_lvl > 2:
print("Symbolic results took {:.2f} s".format(time() - tini_results))
return self.results
### Reset the zgrid and kgrid to the fiducial ones, in case one parameter variation had a different one
# self.results.zgrid = self.input_arrays[('z_grid','fiducial_eps_0')].flatten()
# self.results.kgrid = self.input_arrays[('k_grid','fiducial_eps_0')].flatten()
[docs]
class cosmo_functions:
c = sconst.speed_of_light / 1000 ##speed of light in km/s
def __init__(self, cosmopars, input=None):
"""
Initialize the cosmo_functions class.
Parameters
----------
cosmopars : dict
Dictionary containing the cosmological parameters.
input : str, optional
Input type for the cosmological code (default is None).
Raises
------
ValueError
If an unsupported input type is specified.
"""
self.settings = cfg.settings
self.fiducialcosmopars = cfg.fiducialparams
self.input = input
if input is None:
input = cfg.input_type
if input == "camb":
cambresults = boltzmann_code(cosmopars, code="camb")
self.code = "camb"
self.results = cambresults.results
self.kgrid = cambresults.results.kgrid
self.cosmopars = cambresults.cosmopars
self.cambcosmopars = cambresults.cambclasspars
elif input == "external":
self.external = cfg.external
## filter settings which contain the word 'external_'
# extra_settings = dict([[kk, self.settings[kk]] for kk in self.settings.keys() if 'external_' in kk])
# externalinput = external_input(cosmopars, fiducialcosmopars=self.fiducialcosmopars,
# external=self.external, extra_settings=extra_settings)
if self.settings["memorize_cosmo"]:
externalinput = memorize_external_input(
cosmopars, self.fiducialcosmopars, self.external, self.settings
)
else:
externalinput = external_input(
cosmopars,
fiducialcosmopars=self.fiducialcosmopars,
external=self.external,
extra_settings=self.settings,
)
self.code = "external"
self.results = externalinput.results
self.kgrid = externalinput.results.kgrid
self.cosmopars = externalinput.cosmopars
elif input == "class":
classresults = boltzmann_code(cosmopars, code="class")
self.code = "class"
self.results = classresults.results
self.Classres = classresults.Classres
self.kgrid = classresults.results.kgrid
self.cosmopars = classresults.cosmopars
self.classcosmopars = classresults.classcosmopars
elif input == "symbolic":
symbresults = boltzmann_code(cosmopars, code="symbolic")
self.code = "symbolic"
self.results = symbresults.results
self.kgrid = symbresults.results.kgrid
self.cosmopars = symbresults.cosmopars
self.symbcosmopars = symbresults.symbcosmopars
else:
print(input, ": This input type is not implemented yet")
[docs]
def Hubble(self, z, physical=False):
"""
Hubble function
Parameters
----------
z : float
redshift
physical: bool
Default False, if True, return H(z) in (km/s/Mpc).
Returns
-------
float
Hubble function values (Mpc^-1) at the redshifts of the input redshift
"""
prefactor = 1
if physical:
prefactor = self.c
hubble = prefactor * self.results.h_of_z(z)
return hubble
[docs]
def E_hubble(self, z):
"""
E(z) dimensionless Hubble function
Parameters
----------
z : float
redshift
Returns
-------
float
Dimensionless E(z) Hubble function values at the redshifts of the input redshift
"""
H0 = self.Hubble(0.0)
Eofz = self.Hubble(z) / H0
return Eofz
[docs]
def angdist(self, z):
"""
Angular diameter distance
Parameters
----------
z : float
redshift
Returns
-------
float
Angular diameter distance values at the redshifts of the input redshift
"""
dA = self.results.ang_dist(z)
return dA
[docs]
def matpow(self, z, k, nonlinear=False, tracer="matter"):
"""
Calculates the power spectrum of a given tracer quantity at a specific redshift and wavenumber.
Parameters
----------
z : float
The redshift of interest.
k : array_like
An array of wavenumbers at which to compute the power spectrum. These must be in units of 1/Mpc and
should be sorted in increasing order.
nonlinear : bool, optional
A boolean indicating whether or not to include nonlinear corrections to the matter power spectrum. The default
value is False.
tracer : str, optional
A string indicating which trace quantity to use for computing the power spectrum. If this argument is "matter"
or anything other than "clustering", the power spectrum functions `Pmm` will be used to compute the power
spectrum. If the argument is "clustering", the power spectrum function `Pcb` will be used instead. The default
value is "matter".
Returns
-------
np.ndarray:
Array containing the calculated power spectrum values.
Warnings
--------
If `tracer` is not "matter" or "clustering", a warning message is printed to the console saying the provided tracer was not
recognized and the function defaults to using `Pmm` to calculate the power spectrum of matter.
"""
if tracer == "clustering":
return self.Pcb(z, k, nonlinear=nonlinear)
if tracer != "matter":
warn("Did not recognize tracer: reverted to matter")
return self.Pmm(z, k, nonlinear=nonlinear)
[docs]
def Pmm(self, z, k, nonlinear=False):
"""
Compute the power spectrum of the total matter species (MM) at a given redshift and wavenumber.
Args:
self: An instance of the current class.
z: The redshift at which to compute the MM power spectrum.
k: The wavenumber at which to compute the MM power spectrum in 1/Mpc.
nonlinear (bool, optional): If True, include nonlinear effects in the computation. Default is False.
Returns:
float: The value of the MM power spectrum at the given redshift and wavenumber.
"""
if nonlinear is True:
power = self.results.Pk_nl(z, k, grid=False)
elif nonlinear is False:
power = self.results.Pk_l(z, k, grid=False)
return power # type: ignore
[docs]
def Pcb(self, z, k, nonlinear=False):
"""
Compute the power spectrum of the clustering matter species (CB) at a given redshift and wavenumber.
Args:
self: An instance of the current class.
z: The redshift at which to compute the CB power spectrum.
k: The wavenumber at which to compute the CB power spectrum in 1/Mpc.
nonlinear (bool, optional): If True, include nonlinear effects in the computation. Default is False.
Returns:
The value of the CB power spectrum at the given redshift and wavenumber.
"""
if nonlinear is True:
power = self.results.Pk_cb_nl(z, k, grid=False)
elif nonlinear is False:
power = self.results.Pk_cb_l(z, k, grid=False)
return power
[docs]
def nonwiggle_pow(self, z, k, nonlinear=False, tracer="matter"):
"""
Calculate the power spectrum at a specific redshift and wavenumber,
after smoothing to remove baryonic acoustic oscillations (BAO).
Args:
z: The redshift of interest.
k: An array of wavenumbers at which to compute the power
spectrum. Must be in units of Mpc^-1/h. Should be sorted in
increasing order.
nonlinear: Whether to include nonlinear corrections
to the matter power spectrum. Default is False.
tracer: Which perturbations to use for computing
the power spectrum. Options are 'matter' or 'clustering'.
Default is 'matter'.
Returns:
An array of power spectrum values corresponding to the
input wavenumbers. Units are (Mpc/h)^3.
Note:
This function computes the power spectrum of a given tracer quantity
at a specific redshift, using the matter power spectrum function
`matpow`. It then applies a Savitzky-Golay filter to smooth out the
BAO features in the power spectrum. This is done by first taking the
natural logarithm of the power spectrum values at a set of logarithmic
wavenumbers spanning from `kmin_loc` to `kmax_loc`. The smoothed power
spectrum is then returned on a linear (not logarithmic) grid of
wavenumbers given by the input array `k`.
"""
unitsf = self.cosmopars["h"]
kmin_loc = unitsf * self.settings["savgol_internalkmin"]
kmax_loc = unitsf * np.max(self.kgrid)
loc_samples = self.settings["savgol_internalsamples"]
log_kgrid_loc = np.linspace(np.log(kmin_loc), np.log(kmax_loc), loc_samples)
poly_order = self.settings["savgol_polyorder"]
dlnk_loc = np.mean(log_kgrid_loc[1:] - log_kgrid_loc[0:-1])
savgol_width = self.settings["savgol_width"]
n_savgol = int(np.round(savgol_width / np.log(1 + dlnk_loc)))
upr.debug_print(n_savgol)
upr.debug_print(savgol_width)
upr.debug_print(dlnk_loc)
upr.debug_print(kmin_loc)
upr.debug_print(kmax_loc)
intp_p = InterpolatedUnivariateSpline(
log_kgrid_loc,
np.log(
self.matpow(z, np.exp(log_kgrid_loc), nonlinear=nonlinear, tracer=tracer).flatten()
),
k=1,
)
pow_sg = savgol_filter(intp_p(log_kgrid_loc), n_savgol, poly_order)
intp_pnw = InterpolatedUnivariateSpline(np.exp(log_kgrid_loc), np.exp(pow_sg), k=1)
return intp_pnw(k)
[docs]
def comoving(self, z):
"""
Comoving distance
Parameters
----------
z : float
redshift
Returns
-------
float
Comoving distance values at the redshifts of the input redshift
"""
chi = self.results.com_dist(z)
return chi
[docs]
def sigma8_of_z(self, z, tracer="matter"):
"""
sigma_8
Parameters
----------
z : float
redshift
tracer: String
either 'matter' if you want sigma_8 calculated from the total matter power spectrum or 'clustering' if you want it from the Powerspectrum with massive neutrinos substracted
Returns
-------
float
The Variance of the matter perturbation smoothed over a scale of 8 Mpc/h
"""
if tracer == "clustering":
return self.results.s8_cb_of_z(z)
elif tracer == "matter":
return self.results.s8_of_z(z)
else:
warn("Did not recognize tracer: reverted to matter")
return self.results.s8_of_z(z)
[docs]
def growth(self, z, k=None):
"""
Growth factor
Parameters
----------
z : float
redshift
Returns
-------
float
Growth factor values at the redshifts of the input redshift
"""
if k is None:
k = 0.0001
Dg = self.results.D_growth_zk(z, k, grid=False)
return Dg
[docs]
def Omegam_of_z(self, z):
"""
Omega matter fraction as a function of redshift
Parameters
----------
z : float
redshift
Returns
-------
float
Omega matter (total) at the redshifts of the input redshift `z`
Note
-----
Assumes standard matter evolution
Implements the following equation:
.. math::
Omega_m(z) = Omega_{m,0}*(1+z)^3 / E^2(z)
"""
omz = 0
if self.input == "external":
omz = (self.cosmopars["Omegam"] * (1 + z) ** 3) / self.E_hubble(z) ** 2
else:
omz = self.results.Om_m(z)
return omz
[docs]
def f_growthrate(self, z, k=None, gamma=False, tracer="matter"):
r"""
Growth rate in LCDM gamma approximation
Parameters
----------
z : float
redshift
Returns
-------
float
Growth rate values at the redshifts of the input redshift,
using self.gamma as gamma value.
Note
-----
Implements the following equation:
.. math::
f(z) = Omega_m(z)^{\gamma}
"""
if k is None:
k = 0.0001
if tracer == "clustering":
fg = self.results.f_growthrate_cb_zk(z, k, grid=False)
return fg
if tracer != "matter":
warn("Did not recognize tracer: reverted to matter")
if gamma is False:
fg = self.results.f_growthrate_zk(z, k, grid=False)
else:
# Assumes standard Omega_matter evolution in z
fg = np.power(self.Omegam_of_z(z), self.gamma)
return fg
[docs]
def fsigma8_of_z(self, z, k=None, gamma=False, tracer="matter"):
r"""
Growth rate in LCDM gamma approximation
Parameters
----------
z : float
redshift
Returns
-------
float
Growth rate values at the redshifts of the input redshift,
using self.gamma as gamma value.
Note
-----
Implements the following equation:
.. math::
f(z) = Omega_m(z)^{\gamma}
"""
# Assumes standard Omega_matter evolution in z
fs8 = self.f_growthrate(z, k, gamma, tracer=tracer) * self.sigma8_of_z(z, tracer=tracer)
return fs8
[docs]
def SigmaMG(self, z, k):
"""
Compute the modified growth rate Sigma for modified gravity models.
Parameters
----------
z : float
Redshift
k : float
Wavenumber
Returns
-------
float
Modified growth rate Sigma
"""
Sigma = np.array(1)
if self.settings["activateMG"] == "late-time":
E11 = self.cosmopars["E11"]
E22 = self.cosmopars["E22"]
# TODO: Fix for non-flat models
Omega_DE = 1 - self.Omegam_of_z(z)
mu = 1 + E11 * Omega_DE
eta = 1 + E22 * Omega_DE
Sigma = (mu / 2) * (1 + eta)
elif (
self.settings["external_activateMG"] is True
or self.settings["activateMG"] == "external"
):
Sigma = self.results.SigWL_zk(z, k, grid=False)
return Sigma
[docs]
def cmb_power(self, lmin, lmax, obs1, obs2):
"""
Compute the CMB power spectrum.
Parameters
----------
lmin : int
Minimum multipole moment
lmax : int
Maximum multipole moment
obs1 : str
First observable (e.g., 'CMB_TCMB_T')
obs2 : str
Second observable (e.g., 'CMB_ECMB_E')
Returns
-------
np.ndarray
Array of CMB power spectrum values
"""
def _get_flag(container, key: str, default: bool = False) -> bool:
# CAMB params objects are not plain dicts; handle both styles.
if container is None:
return default
try:
val = container[key]
except Exception:
val = getattr(container, key, default)
try:
return bool(val)
except Exception:
return default
ells = np.arange(lmin, lmax)
norm_fac = 2 * np.pi / (ells * (ells + 1))
if self.code == "camb":
want = _get_flag(self.cambcosmopars, "Want_CMB", False) or _get_flag(
self.cambcosmopars, "WantCls", False
)
if not want and not hasattr(self.results, "camb_cmb"):
raise ValueError(
"CMB spectrum not computed for CAMB. Enable WantCls/Want_CMB in the CAMB YAML."
)
if not hasattr(self.results, "camb_cmb"):
raise AttributeError(
"CMB spectrum missing from CAMB results. Expected self.results.camb_cmb."
)
if obs1 + obs2 == "CMB_TCMB_T":
cls = norm_fac * self.results.camb_cmb[lmin:lmax, 0]
elif obs1 + obs2 == "CMB_ECMB_E":
cls = norm_fac * self.results.camb_cmb[lmin:lmax, 1]
elif obs1 + obs2 == "CMB_BCMB_B":
cls = norm_fac * self.results.camb_cmb[lmin:lmax, 2]
elif (obs1 + obs2 == "CMB_TCMB_E") or (obs1 + obs2 == "CMB_ECMB_T"):
cls = norm_fac * self.results.camb_cmb[lmin:lmax, 3]
else:
cls = np.zeros_like(ells, dtype=float)
return cls
if self.code == "class":
# CLASS CMB spectra plumbing is not yet implemented consistently.
# Fail fast with a clear message instead of returning None.
raise NotImplementedError(
"CMB spectra for CLASS backend are not supported yet. Use code='camb' for now."
)
raise NotImplementedError(
f"CMB spectra not supported for backend code={self.code!r}. Use code='camb'."
)
[docs]
@staticmethod
def scale_factor(z):
"""
Compute the scale factor a from redshift z.
Parameters
----------
z : float or array-like
Redshift
Returns
-------
float or array-like
Scale factor a
"""
return 1.0 / (1.0 + z)