# -*- coding: utf-8 -*-
"""DERIVATIVES
This is the derivatives engine of CosmicFish.
"""
import copy
from time import time
import numpy as np
import cosmicfishpie.configs.config as cfg
from cosmicfishpie.utilities.utils import printing as upt
[docs]
class derivatives:
def __init__(
self,
observable,
fiducial,
special_deriv_function=None,
derivatives_type="3PT",
freeparams=dict(),
observables_type=None,
external_settings=None,
feed_lvl=None,
):
"""This class is the main derivative engine for the different observables. It gives access to different derivative methods. After the constructor of this class is called the resulting dictionary with the derivatives are is found in it's `results` attribute.
Arguments
---------
observable : callable
A callable function that when passed a dictionary of all cosmological and nuisance parameters will return the observable of a probe
fiducial : dict
A dictionary containing the fiducial values of all parameters
special_deriv_function : callable, optional
callable function that receives a parameter name and calculates the exact derivative of the observable for that parameter
freeparams : dict, optional
A dictionary for the parameters that should be varied. Will vary all parameters if not passed
Attributes
----------
observable : callable
A callable function that when passed a dictionary of all cosmological and nuisance parameters will return the observable of a probe
fiducial : dict
A dictionary containing the fiducial values of all parameters
special : callable, None
If it exists, it is a callable function that receives a parameter name and calculates the exact derivative of the observable for that parameter
feed_lvl : int
Number indicating the verbosity of the output. Higher numbers generally mean more output. Defaults to 2
observables : callable
A callable that is passed all cosmological and nuisance parameters in a dict that is returning the computed observable of a given probe
external_settings : dict
A dictionary containing all paths to the external files, how all the names of the files in the folder correspond to the cosmological quantities, the units etc. Will be none if code runs in internal mode
result : dict
A dictionary with all derivatives of the observable with respect to the parameter corresponding to the key name
"""
if freeparams != dict():
self.freeparams = freeparams
else:
self.freeparams = cfg.freeparams
self.observable = observable
self.fiducial = fiducial
self.special = special_deriv_function
if feed_lvl is None:
self.feed_lvl = cfg.settings["feedback"]
else:
self.feed_lvl = feed_lvl
if observables_type is None:
self.observables_type = cfg.obs
else:
self.observables_type = observables_type
if external_settings is None:
self.external_settings = cfg.external
else:
self.external_settings = external_settings
if derivatives_type is None:
self.derivatives_type = cfg.settings["derivatives"]
else:
self.derivatives_type = derivatives_type
if derivatives_type == "3PT":
self.result = self.derivative_3pt()
elif derivatives_type == "STEM":
self.result = self.derivative_stem()
elif derivatives_type == "POLY":
self.result = self.derivative_poly()
elif derivatives_type == "4PT_FWD":
self.result = self.derivative_forward_4pt()
else:
raise ValueError("ERROR: I don't know this derivative type!!!")
[docs]
def der_3pt_stencil(self, fwd, bwd, step):
"""Helper function to compute the 3PT symmetrical finite step size derivative
Arguments
---------
fwd : float, numpy.ndarray
Observable computed at the forward step
fwd : float, numpy.ndarray
Observable computed at the backwards step
step : float
Absolute step size of the numerical derivative
Returns
-------
float, numpy.ndarray
Numerical derivative using the a 3 point stencil
"""
der = (fwd - bwd) / (2 * step)
return der
[docs]
def derivative_3pt(self):
"""One of the possible derivative methods. Computes the numerical derivative using a finite differences 3 point symmetrical derivative
Returns
-------
dict
A dictionary containing the derivative of the observable for each varied parameter.
Note
-----
Implements the following equation:
.. math::
\\frac{\\mathrm{d} \\mathcal{O}}{\\mathrm{d} \\theta} = \\frac{\\mathcal{O}(\\theta+h)-\\mathcal{O}(\\theta-h)}{2\\,h}
"""
deriv_dict = {}
for par in self.freeparams:
if self.special is not None:
special_deriv = self.special(par)
if special_deriv is not None:
print("ððð Obtaining analytical derivative for parameter: ", str(par))
deriv_dict[par] = special_deriv
continue
elif special_deriv is None:
pass
if self.fiducial[par] != 0.0:
stepsize = self.fiducial[par] * self.freeparams[par]
else:
stepsize = self.freeparams[par]
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="+++ Computing derivative on {}".format(par),
)
tini = time()
# doing forward step
fwd = copy.deepcopy(self.fiducial)
fwd[par] = fwd[par] + stepsize
obs_fwd = self.observable(fwd)
# Doing backward step
bwd = copy.deepcopy(self.fiducial)
bwd[par] = bwd[par] - stepsize
obs_bwd = self.observable(bwd)
observables_type = self.observables_type or []
if "GCph" in observables_type or "WL" in observables_type:
dpar = {}
for key in obs_fwd:
if key == "ells":
dpar[key] = obs_fwd[key]
else:
dpar[key] = self.der_3pt_stencil(obs_fwd[key], obs_bwd[key], stepsize)
elif "GCsp" in observables_type or "IM" in observables_type:
dpar = {}
for key in obs_fwd:
if key == "z_bins":
dpar[key] = obs_fwd[key]
else:
dpar[key] = self.der_3pt_stencil(obs_fwd[key], obs_bwd[key], stepsize)
elif any(o in observables_type for o in ("CMB_T", "CMB_E", "CMB_B")):
dpar = {}
for key in obs_fwd:
if key == "ells":
dpar[key] = obs_fwd[key]
else:
dpar[key] = self.der_3pt_stencil(obs_fwd[key], obs_bwd[key], stepsize)
elif "plain" in observables_type:
dpar = self.der_3pt_stencil(obs_fwd, obs_bwd, stepsize)
else:
raise ValueError(
f"Unable to compute derivatives: unsupported observables_type={observables_type!r}."
)
tend = time()
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="Derivative on {} done! in :".format(par),
time_ini=tini,
time_fin=tend,
instance=self,
)
deriv_dict[par] = dpar
return deriv_dict
[docs]
def der_fwd_4pt(self, fwdi, step):
"""Helper function to compute the 4PT forward finite step size derivative
Arguments
---------
fwdi : list, numpy.ndarray
Observable computed at the fiducial and at equally spaced points in the forward direction
step : float
Absolute distance between the size of the numerical derivative
Returns
-------
float, numpy.ndarray
Numerical derivative using the a 4 point forward stencil
"""
der = (-11 * fwdi[0] + 18 * fwdi[1] - 9 * fwdi[2] + 2 * fwdi[3]) / (6 * step**1)
return der
[docs]
def derivative_forward_4pt(self):
r"""One of the possible derivative methods. Computes the numerical derivative using a finite differences one-sided 4 point forward derivative.
Taken from:
https://web.media.mit.edu/~crtaylor/calculator.html
@misc{fdcc,
title={Finite Difference Coefficients Calculator},
author={Taylor, Cameron R.},
year={2016},
howpublished="\url{https://web.media.mit.edu/~crtaylor/calculator.html}"
}
Returns
-------
dict
A dictionary containing the derivative of the observable for each varied parameter.
Note
-----
Implements the following equation:
.. math::
\\frac{\\mathrm{d} \\mathcal{O}}{\\mathrm{d} \\theta} = \\frac{-11\\,\\mathcal{O}(\\theta)+18\\,\\mathcal{O}(\\theta+h)-9\\,\\mathcal{O}(\\theta+2\\,)+2\\,\\mathcal{O}(\\theta+3\\,h)}{6\\,h}
"""
deriv_dict = {}
for par in self.freeparams:
if self.special is not None:
special_deriv = self.special(par)
if special_deriv is not None:
upt.time_print(
feedback_level=self.feed_lvl,
min_level=2,
text='ððð "Obtaining analytical derivative for parameter: {:s}'.format(par),
)
deriv_dict[par] = special_deriv
continue
elif special_deriv is None:
pass
if self.fiducial[par] != 0.0:
stepsize = self.fiducial[par] * self.freeparams[par]
else:
stepsize = self.freeparams[par]
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="+++ Computing 4pt forward derivative on {}".format(par),
)
tini = time()
# doing forward step
fwd_0 = copy.deepcopy(self.fiducial)
fwd_1 = copy.deepcopy(self.fiducial)
fwd_2 = copy.deepcopy(self.fiducial)
fwd_3 = copy.deepcopy(self.fiducial)
fwd_0[par] = fwd_0[par]
fwd_1[par] = fwd_1[par] + 1 * stepsize
fwd_2[par] = fwd_2[par] + 2 * stepsize
fwd_3[par] = fwd_3[par] + 3 * stepsize
fwdlist = [fwd_0, fwd_1, fwd_2, fwd_3]
Nsteps_fwd = len(fwdlist)
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="++++ Computing observables at 4 steps",
)
obs_fwd_list = []
for ffstep in fwdlist:
upt.time_print(
feedback_level=self.feed_lvl,
min_level=2,
text="^^^ Computing observable at parameter {:s} with value: {:.6f} and stepsize: {:.4f}".format(
par, ffstep[par], (ffstep[par] - fwd_0[par])
),
)
obs_at_step = self.observable(ffstep)
obs_fwd_list.append(obs_at_step)
upt.time_print(
feedback_level=self.feed_lvl,
min_level=2,
text="++^^++ Size of obs_fwd_list : {:d}".format(len(obs_fwd_list)),
)
if "GCph" in self.observables_type or "WL" in self.observables_type:
dpar = {}
for key in obs_fwd_list[0]:
if key == "ells":
dpar[key] = obs_fwd_list[0][key]
else:
obs_fwd_list_at_key = [obs_fwd_list[sti][key] for sti in range(Nsteps_fwd)]
dpar[key] = self.der_fwd_4pt(obs_fwd_list_at_key, stepsize)
if "GCsp" in self.observables_type or "IM" in self.observables_type:
dpar = {}
for key in obs_fwd_list[0]:
if key == "z_bins":
dpar[key] = obs_fwd_list[0][key]
else:
obs_fwd_list_at_key = [obs_fwd_list[sti][key] for sti in range(Nsteps_fwd)]
dpar[key] = self.der_fwd_4pt(obs_fwd_list_at_key, stepsize)
if "plain" in self.observables_type:
dpar = self.der_fwd_4pt(obs_fwd_list, stepsize)
tend = time()
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="Derivative on {} done! in :".format(par),
time_ini=tini,
time_fin=tend,
instance=self,
)
deriv_dict[par] = dpar
return deriv_dict
[docs]
def derivative_stem(self):
"""One of the possible derivative methods. Computes the numerical derivative using the SteM derivative method
Returns
-------
dict
A dictionary containing the derivative of the observable for each varied parameter.
"""
numstem = 11
mult_eps_factor = 5
def adaptive_eps(param_eps):
if self.external_settings is not None:
eps_v = np.array(self.external_settings["eps_values"])
d_eps = np.concatenate([-eps_v[::-1], eps_v])
else:
d_eps = np.linspace(
-param_eps * mult_eps_factor, param_eps * mult_eps_factor, numstem
)
# eps_arr = np.linspace(-self.freeparams[par], self.freeparams[par], numstem)
return d_eps
threshold = 1.0e-3
deriv_dict = {}
for par in self.freeparams:
if self.fiducial[par] != 0.0:
stepsize = self.fiducial[par] * adaptive_eps(self.freeparams[par])
else:
stepsize = adaptive_eps(self.freeparams[par])
dpar = {}
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="+++ Computing STEM derivative on {}".format(par),
)
tini = time()
obs_mod = []
for step in stepsize:
modpars = copy.deepcopy(self.fiducial)
modpars[par] = modpars[par] + step
obs_mod.append(self.observable(modpars))
if "GCph" in self.observables_type or "WL" in self.observables_type:
for key in obs_mod[0]:
# WARNING: THIS WORKS FOR NOW, BUT OTHER THINGS HAVE TO BE
# ADDED TO THE IF FOR OTHER OBS
if key == "ells":
dpar[key] = obs_mod[0][key]
else:
temp = []
for ind in range(len(dpar["ells"])):
residuals = 1000
counter = 0
tempstep = stepsize
while residuals > threshold:
fit = np.polyfit(
tempstep,
[obs_mod[step][key][ind] for step in range(len(tempstep))],
1,
full=True,
)
residuals = fit[1]
if residuals > threshold:
tempstep = tempstep[1:-1]
counter += 1
if numstem - counter < 3:
print("ERROR: {} derivative could not converge!".format(par))
exit()
temp.append(fit[0][0])
dpar[key] = np.array(temp)
else:
raise ValueError("STEM derivative not availabe for spectropscopic probes yet!")
tend = time()
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="Derivative on {} done! in :".format(par),
time_ini=tini,
time_fin=tend,
instance=self,
)
deriv_dict[par] = dpar
return deriv_dict
[docs]
def derivative_poly(self):
"""One of the possible derivative methods. Computes the numerical derivative using a polynomial derivative method
Returns
-------
dict
A dictionary containing the derivative of the observable for each varied parameter."""
numpoints = 10 # HARD CODED?
deriv_dict = {}
for par in self.freeparams:
if self.fiducial[par] != 0.0:
stepsize = np.linspace(
-self.fiducial[par] * self.freeparams[par],
self.fiducial[par] * self.freeparams[par],
numpoints,
)
else:
stepsize = np.linspace(-self.freeparams[par], self.freeparams[par], numpoints)
dpar = {}
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="+++ Computing poly derivative on {}".format(par),
)
tini = time()
fidpar = self.fiducial[par]
obs_mod = []
for step in stepsize:
modpars = copy.deepcopy(self.fiducial)
modpars[par] = modpars[par] + step
obs_mod.append(self.observable(modpars))
for key in obs_mod[0]:
# WARNING: THIS WORKS FOR NOW, BUT OTHER THINGS HAVE TO BE
# ADDED TO THE IF FOR OTHER OBS
if key == "ells":
dpar[key] = obs_mod[0][key]
else:
temp = []
for ind in range(len(dpar["ells"])):
fit = np.polyfit(
stepsize, [obs_mod[step][key][ind] for step in range(len(stepsize))], 4
)
temp.append(
4 * fit[0] * fidpar**3 * +3 * fit[2] * fidpar**2
+ 2 * fit[3] * fidpar
+ fit[4]
)
dpar[key] = np.array(temp)
tend = time()
upt.time_print(
feedback_level=self.feed_lvl,
min_level=1,
text="Derivative on {} done! in :".format(par),
time_ini=tini,
time_fin=tend,
instance=self,
)
deriv_dict[par] = dpar
return deriv_dict