# -*- coding: utf-8 -*-
"""CMB MAIN
This is the main engine for CMB Fisher Matrix.
"""
import copy
import datetime
import os
import warnings
from itertools import product
from time import time
import numpy as np
import pandas as pd
import cosmicfishpie.CMBsurvey.CMB_obs as CMB_obs
import cosmicfishpie.configs.config as cfg
import cosmicfishpie.fishermatrix.derivatives as fishderiv
from cosmicfishpie.utilities.utils import printing as upt
pd.set_option("display.float_format", "{:.9E}".format)
[docs]
class CMBCov:
"""CMB covariance and derivatives for Fisher forecasts.
This class:
- computes fiducial CMB spectra (via `CMB_obs.ComputeCls`)
- adds instrumental noise given `cfg.specs` beam/noise settings
- constructs a per-ell covariance matrix
- computes numerical derivatives w.r.t. `cfg.freeparams`
Notes
-----
The CMB implementation currently supports the observables `CMB_T`, `CMB_E`,
`CMB_B`. Lensing spectra (e.g. phi-phi) are not implemented yet.
"""
def __init__(self, cosmopars, print_info_specs=False):
self.cosmopars = cosmopars
self.observables = []
for key in cfg.obs: # LENSING TO BE ADDED
if key in ["CMB_T", "CMB_E", "CMB_B"]:
self.observables.append(key)
self.print_info = print_info_specs
self.feed_lvl = cfg.settings["feedback"]
[docs]
def getcls(self, allpars):
"""Compute (noise-free) CMB C_ell for a given parameter dictionary."""
# Splitting the dictionary of full parameters
pars = dict((k, allpars[k]) for k in self.cosmopars)
cls = CMB_obs.ComputeCls(pars, print_info_specs=self.print_info)
cls.compute_all()
CMBcls = cls.result
return CMBcls
[docs]
def getclsnoise(self, cls):
"""Noise value
Parameters
----------
cls : dict
cls dictionary
Returns
-------
array
Noise value[ell,bin]
Note
-----
Implements the following equation:
.. math::
N_\\ell^X = ...
"""
noisy_cls = copy.deepcopy(cls)
# MM: kind of copy pasting the original cosmicfish stuff
# temp1 = self.cosmopars['TCMB']*np.pi/180./60.
# temp2 = 180.*60.*np.sqrt(8.*np.log(2.))/np.pi
# noisevar_temp = (np.array(cfg.specs['CMB_temp_sens'])*np.array(cfg.specs['CMB_fwhm'])*temp1)**2.
# noisevar_pol = (np.array(cfg.specs['CMB_pol_sens'])*np.array(cfg.specs['CMB_fwhm'])*temp1)**2.
# sigma2 = -1.*(np.array(cfg.specs['CMB_fwhm'])/temp2 )**2
# TT_Noise = np.zeros((len(noisy_cls['ells'])))
# pol_Noise = np.zeros((len(noisy_cls['ells'])))
# for ind,sig2 in enumerate(sigma2):
# TT_Noise += np.exp(noisy_cls['ells']*(noisy_cls['ells']+1)*sig2)/noisevar_temp[ind]
# pol_Noise += np.exp(noisy_cls['ells']*(noisy_cls['ells']+1)*sig2)/noisevar_pol[ind]
# for obs in self.observables:
# if obs == 'CMB_T':
# noisy_cls[obs+'x'+obs] += 1/TT_Noise
# elif obs == 'CMB_E' or obs == 'CMB_B':
# noisy_cls[obs+'x'+obs] += 1/pol_Noise
noise_model = str(cfg.specs.get("CMB_noise_model", "legacy")).strip().lower()
if noise_model not in {"legacy", "knox"}:
warnings.warn(
f"Unknown CMB_noise_model={noise_model!r}; falling back to 'legacy'.",
RuntimeWarning,
stacklevel=2,
)
noise_model = "legacy"
arcmin_to_rad = np.pi / (180.0 * 60.0)
# norm = ls*(ls+1)/(2.*np.pi)
thetab = [
arcmin_to_rad * beam / np.sqrt(8.0 * np.log(2.0)) for beam in cfg.specs["CMB_fwhm"]
]
Bell = [
np.exp(ang**2.0 * noisy_cls["ells"] * (noisy_cls["ells"] + 1) / 2.0) for ang in thetab
]
TTnoise_chan = np.zeros((len(cfg.specs["CMB_fwhm"]), len(noisy_cls["ells"])))
polnoise_chan = np.zeros((len(cfg.specs["CMB_fwhm"]), len(noisy_cls["ells"])))
for ind in range(len(cfg.specs["CMB_fwhm"])):
if noise_model == "knox":
delta_t_rad = arcmin_to_rad * cfg.specs["CMB_temp_sens"][ind]
delta_p_rad = arcmin_to_rad * cfg.specs["CMB_pol_sens"][ind]
# Eq. N_ell = w^{-1} b_ell^{-2}, with w^{-1} = Delta^2.
TTnoise_chan[ind, :] = (delta_t_rad**2.0) * (Bell[ind][:] ** 2.0)
polnoise_chan[ind, :] = (delta_p_rad**2.0) * (Bell[ind][:] ** 2.0)
else:
wtemp = (
arcmin_to_rad * cfg.specs["CMB_fwhm"][ind] * cfg.specs["CMB_temp_sens"][ind]
) ** (-2.0)
wpol = (
arcmin_to_rad * cfg.specs["CMB_fwhm"][ind] * cfg.specs["CMB_pol_sens"][ind]
) ** (-2.0)
TTnoise_chan[ind, :] = Bell[ind][:] / wtemp
polnoise_chan[ind, :] = Bell[ind][:] / wpol
if len(cfg.specs["CMB_fwhm"]) > 1:
if noise_model == "knox":
TTnoise = np.array(
[
self.sum_inv_linear(TTnoise_chan[:, ind])
for ind in range(len(noisy_cls["ells"]))
]
)
polnoise = np.array(
[
self.sum_inv_linear(polnoise_chan[:, ind])
for ind in range(len(noisy_cls["ells"]))
]
)
else:
TTnoise = np.array(
[
self.sum_inv_squares(TTnoise_chan[:, ind])
for ind in range(len(noisy_cls["ells"]))
]
)
polnoise = np.array(
[
self.sum_inv_squares(polnoise_chan[:, ind])
for ind in range(len(noisy_cls["ells"]))
]
)
else:
TTnoise = TTnoise_chan[0, :]
polnoise = polnoise_chan[0, :]
polnoise_E = polnoise
if noise_model == "knox":
# Preferred naming ("boost") for low-ell EE noise scaling.
lowell_factor = cfg.specs.get("CMB_EE_noise_boost_lowell")
lowell_lmax = cfg.specs.get("CMB_EE_noise_boost_lmax")
# Backward compatibility with deprecated "inflation" keys.
if lowell_factor is None and "CMB_EE_noise_inflation_lowell" in cfg.specs:
lowell_factor = cfg.specs.get("CMB_EE_noise_inflation_lowell")
warnings.warn(
"CMB_EE_noise_inflation_lowell is deprecated; use CMB_EE_noise_boost_lowell instead.",
DeprecationWarning,
stacklevel=2,
)
if lowell_lmax is None and "CMB_EE_noise_lmax_inflation" in cfg.specs:
lowell_lmax = cfg.specs.get("CMB_EE_noise_lmax_inflation")
warnings.warn(
"CMB_EE_noise_lmax_inflation is deprecated; use CMB_EE_noise_boost_lmax instead.",
DeprecationWarning,
stacklevel=2,
)
lowell_factor = float(1.0 if lowell_factor is None else lowell_factor)
lowell_lmax = int(29 if lowell_lmax is None else lowell_lmax)
if lowell_factor > 0.0 and lowell_factor != 1.0:
polnoise_E = polnoise.copy()
polnoise_E[noisy_cls["ells"] <= lowell_lmax] *= lowell_factor
for obs in self.observables:
if obs == "CMB_T":
noisy_cls[obs + "x" + obs] += TTnoise
elif obs == "CMB_E":
noisy_cls[obs + "x" + obs] += polnoise_E
elif obs == "CMB_B":
noisy_cls[obs + "x" + obs] += polnoise
return noisy_cls
[docs]
def sum_inv_squares(self, arr):
"""Combine multiple channels by inverse-variance weighting."""
res = np.sqrt(sum([i ** (-2) for i in arr]))
return 1 / res
[docs]
def sum_inv_linear(self, arr):
"""Combine channels as inverse-noise weighted sum."""
arr = np.asarray(arr, dtype=float)
inv = np.where(arr > 0.0, 1.0 / arr, 0.0)
inv_sum = np.sum(inv)
if inv_sum <= 0.0:
return np.inf
return 1.0 / inv_sum
[docs]
def get_covmat(self, noisy_cls):
"""Data covariance
Parameters
----------
noisy_cls : dict
dictionary containing cls with noise
Returns
-------
list
data covariance matrix
"""
pd.set_option("display.float_format", "{:.9E}".format)
covvec = []
# Create indexes for data frame
cols = []
for o in self.observables:
cols.append(o)
for ind, ell in enumerate(noisy_cls["ells"]):
covdf = pd.DataFrame(index=cols, columns=cols)
covdf = covdf.fillna(0.0)
for obs1, obs2 in product(self.observables, self.observables):
covdf.at[obs1, obs2] = noisy_cls[obs1 + "x" + obs2][ind] / np.sqrt(
np.sqrt(cfg.specs["fsky_" + obs1] * cfg.specs["fsky_" + obs2])
)
covvec.append(covdf)
return covvec
[docs]
def compute_covmat(self):
"""Compute fiducial CMB spectra, add noise, and build covariance matrices.
Returns
-------
tuple
`(noisy_cls, covvec)` where `noisy_cls` is a dict of spectra arrays and
`covvec` is a list of per-ell pandas DataFrames.
"""
tini = datetime.datetime.now().timestamp()
allpars = {}
allpars.update(self.cosmopars)
obstring = ""
for obs in self.observables:
obstring = obstring + obs
# Check free pars are in the fiducial
for key in cfg.freeparams:
if key not in allpars:
print("ERROR: free param " + key + " does not have a fiducial value!")
return None
if not os.path.exists("./raw_results"):
os.makedirs("./raw_results")
# compute fiducial
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Computing fiducial")
t1 = datetime.datetime.now().timestamp()
# fiducial_csv = './raw_results/'+cfg.settings['outroot']+'_'+obstring+'_fiducial.csv'
# if os.path.isfile(fiducial_csv):
# cls = pd.read_csv(fiducial_csv, index_col=0)
# if True: ## TODO: needs to be fixed to properly handle Pandas!
cls = self.getcls(allpars)
# fiducial = pd.DataFrame(cls, columns=cls.keys())
# fiducial.to_csv(fiducial_csv, index=False)
t2 = datetime.datetime.now().timestamp()
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Fiducial generated in {:.2f} s".format(t2 - t1))
# with open('./raw_results/'+cfg.settings['outroot']+'_'+obstring+'_fiducial.txt', 'w') as f:
# f.write(fiducial.to_string(header = True, index = False))
# if cfg.settings['derivatives'] == '3PT': numcomp = 2
# if cfg.settings['derivatives'] == 'STEM': numcomp = 11
# estimate = datetime.timedelta(seconds=numcomp*(t2-t1)*len(cfg.freeparams))
# if cfg.settings['feedback'] > 0: print('')
# if cfg.settings['feedback'] > 0: print('Estimated time to finish {}'.format(estimate))
t1 = datetime.datetime.now().timestamp()
# add noise
noisy_cls = self.getclsnoise(cls)
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Noise added to fiducial")
t2 = datetime.datetime.now().timestamp()
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Noisy Cls generated in {:.2f} s".format(t2 - t1))
# TODO: fix pandas stuff
# noisy_fiducial = pd.DataFrame(noisy_cls, columns=noisy_cls.keys())
# noisy_fiducial_csv = './raw_results/'+cfg.settings['outroot']+'_'+obstring+'_fiducial_noisy.csv'
# noisy_fiducial.to_csv(noisy_fiducial_csv, index=False)
# with open('./raw_results/'+cfg.settings['outroot']+'_'+obstring+'_fiducial_noisy.txt', 'w') as f:
# f.write(noisy_fiducial.to_string(header = True, index = False))
# build covmat
self.cls = cls
self.noisy_cls = noisy_cls
t1 = datetime.datetime.now().timestamp()
self.covmat = self.get_covmat(noisy_cls)
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Computed covariance matrix")
t2 = datetime.datetime.now().timestamp()
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Covmat of Cls generated in {:.2f} s".format(t2 - t1))
tfin = datetime.datetime.now().timestamp()
if cfg.settings["feedback"] > 0:
print("")
if cfg.settings["feedback"] > 0:
print("Total calculation in {:.2f} s".format(tfin - tini))
return self.noisy_cls, self.covmat
[docs]
def compute_derivs(self, print_info_specs=False):
"""Compute numerical derivatives of CMB spectra w.r.t. free parameters."""
# compute and save derivatives-----------------------------------------
allpars = {}
allpars.update(self.cosmopars)
derivs = dict()
# needs to be fixed: TODO: not treating things properly as Pandas!
# for par in cfg.freeparams:
# deriv_csv = './raw_results/'+cfg.settings['outroot']+'_'+obstring+'_derivative_'+par+'.csv'
# if os.path.isfile(deriv_csv):
# derivs[par] = pd.read_csv(deriv_csv, index_col=0)
# compute_derivs=False
# else:
# compute_derivs=True
# break
self.print_info = print_info_specs # Avoid printing info at each step
compute_derivs = True
if compute_derivs:
tder1 = time()
print(">> computing derivs >>")
deriv_engine = fishderiv.derivatives(self.getcls, allpars)
derivs = deriv_engine.result
tder2 = time()
# TODO: fix pandas stuff!!
# for par in cfg.freeparams:
# deriv_csv = './raw_results/'+cfg.settings['outroot']+'_'+obstring+'_derivative_'+par+'.csv'
# derivative = pd.DataFrame(derivs[par], columns=derivs[par].keys())
# derivative.to_csv(deriv_csv, index=False)
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="-->> Derivatives computed in ",
time_ini=tder1,
time_fin=tder2,
instance=self,
)
# with open('./raw_results/'+cfg.settings['outroot']+'_'+obstring+'_derivative_'+par+'.txt', 'w') as f:
# f.write(derivative.to_string(header = True, index = False))
self.derivs = derivs
return self.derivs