import math
import os
import subprocess
from collections import abc
from configparser import ConfigParser
from functools import wraps
from warnings import catch_warnings, simplefilter
import numpy as np
# Add this global variable after imports
[docs]
class misc:
[docs]
@staticmethod
def deepupdate(original, update):
"""Recursively update a dict.
Subdicts won't be overwritten but also updated.
"""
if not isinstance(original, abc.Mapping):
return update
for key, value in update.items():
if isinstance(value, abc.Mapping):
original[key] = misc.deepupdate(original.get(key, {}), value)
else:
original[key] = value
return original
[docs]
class printing:
SUPPRESS_WARNINGS = False # Can be toggled elsewhere in the codebase
debug = False
[docs]
@staticmethod
def debug_print(*args, **kwargs):
if printing.debug:
print(*args, **kwargs)
return None
[docs]
@staticmethod
def time_print(
feedback_level=0,
min_level=0,
text="Computation done in: ",
time_ini=None,
time_fin=None,
instance=None,
):
if time_ini is not None and time_fin is not None:
elapsed_time = time_fin - time_ini
ela_str = " {:.2f} s".format(elapsed_time)
else:
ela_str = ""
if instance is not None:
instr = "In class: " + instance.__class__.__name__
else:
instr = ""
if feedback_level >= min_level:
print("")
print(instr + " " + text + ela_str)
return None
[docs]
@staticmethod
def suppress_warnings(func):
"""
Decorator to optionally suppress warnings based on global SUPPRESS_WARNINGS flag.
Parameters
----------
func : callable
The function to wrap
Returns
-------
callable
Wrapped function with optional warning suppression
"""
@wraps(func)
def wrapper(*args, **kwargs):
if printing.SUPPRESS_WARNINGS:
with catch_warnings():
simplefilter("ignore")
return func(*args, **kwargs)
return func(*args, **kwargs)
return wrapper
[docs]
class numerics:
old_round_decimals_up = False
[docs]
@staticmethod
def moving_average(data_set, periods=2):
weights = np.ones(periods) / periods
return np.convolve(data_set, weights, mode="valid")
[docs]
@staticmethod
def round_decimals_up(number, decimals: int = 2, precision=5):
"""
Returns a value rounded up to a specific number of decimal places.
"""
if numerics.old_round_decimals_up:
number = np.float16(np.format_float_positional(number, precision=precision))
# this function protects from precision issues with floats
if decimals == 0:
return np.ceil(number)
elif number < 1e-2:
decimals = 4
elif number < 1e-1:
decimals = 3
factor = 10**decimals
rounded_number = np.ceil(number * factor) / factor
else:
printing.debug_print(f"number: {number}")
exponent = math.floor(math.log10(number))
mantissa = number / (10**exponent)
rounded_mantissa = math.ceil(mantissa * 10) / 10
rounded_number = rounded_mantissa * (10**exponent)
return rounded_number
[docs]
@staticmethod
def closest(lst, K):
lst = np.asarray(lst)
idx = (np.abs(lst - K)).argmin()
return lst[idx]
[docs]
@staticmethod
def bisection(array, value):
"""Given an ``array``, and given a ``value``, returns an
index j such that ``value`` is between array[j]
and array[j+1]. ``array`` must be monotonic increasing.
Returns 0 if value is less than or equal to the first element,
and len(array)-1 if value is greater than or equal to the last element.
"""
array = np.asarray(array) # Ensure array is a numpy array
n = len(array)
if np.isclose(value, array[0]) or value < array[0]:
return 0
if np.isclose(value, array[-1]) or value > array[-1]:
return n - 2 # used with bins, which are defined as [z_i, z_i+1]
left, right = 0, n - 1
while left < right:
mid = (left + right) // 2
if np.isclose(array[mid], value):
return mid
elif array[mid] < value:
left = mid + 1
else:
right = mid
return left - 1
[docs]
@staticmethod
def find_nearest(a, a0):
"Element in nd array `a` closest to the scalar value `a0`"
idx = np.abs(a - a0).argmin()
return idx
# -----------------------------------------------------------------------------
# Reproducibility helpers
# -----------------------------------------------------------------------------
[docs]
def load_fisher_from_json(json_path):
"""Load a FisherMatrix configuration snapshot JSON and construct a FisherMatrix.
Parameters
----------
json_path : str
Path to a JSON produced by FisherMatrix.export_fisher (``*_specifications.json``).
Returns
-------
tuple
(fisher_matrix_instance, snapshot_dict)
Notes
-----
- This only reconstructs the object with the same options/specifications/parameters.
You still need to call ``.compute()`` on the returned FisherMatrix to perform the run.
"""
import json as _json
from cosmicfishpie.fishermatrix import cosmicfish as _cff
with open(json_path, "r") as jf:
snap = _json.load(jf)
options = snap.get("options", {})
specifications = snap.get("specifications", {})
fiducialpars = snap.get("fiducialpars", {})
freepars = snap.get("freepars", {})
meta = snap.get("metadata", {})
observables = meta.get("observables", ["GCph", "WL"]) # sane default
fm = _cff.FisherMatrix(
fiducialpars=fiducialpars,
freepars=freepars,
options=options,
specifications=specifications,
observables=observables,
cosmoModel=options.get("cosmo_model", "LCDM"),
surveyName=options.get("survey_name", "Euclid"),
)
return fm, snap
[docs]
class filesystem:
[docs]
@staticmethod
def mkdirp(dirpath):
"""
This function creates the directory dirpath if it is not found
:param dirpath: string with the path of the directory to be created
:return: None
:rtype: NoneType
"""
outdir = os.path.dirname(dirpath)
# create directory if it does not exist
if outdir == "":
print("Output root is on working directory")
elif not os.path.exists(outdir):
try:
os.makedirs(outdir)
print((str(outdir) + " directory created"))
except OSError:
raise
else:
print((str(outdir) + " exists already"))
return None
# Return the git revision as a string
[docs]
@staticmethod
def git_version():
# From: https://stackoverflow.com/a/40170206/1378746
def _minimal_ext_cmd(cmd):
# construct minimal environment
env = {}
for k in ["SYSTEMROOT", "PATH"]:
v = os.environ.get(k)
if v is not None:
env[k] = v
# LANGUAGE is used on win32
env["LANGUAGE"] = "C"
env["LANG"] = "C"
env["LC_ALL"] = "C"
out = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env).communicate()[0]
return out
try:
out = _minimal_ext_cmd(["git", "rev-parse", "HEAD"])
GIT_REVISION = out.strip().decode("ascii")
except OSError:
GIT_REVISION = "Unknown"
except BaseException:
GIT_REVISION = "Not a git repository"
return GIT_REVISION
[docs]
class physmath:
sr = np.power(180 / np.pi, 2)
[docs]
@staticmethod
def areasky():
fullarea = 4 * np.pi * physmath.sr
return fullarea
[docs]
@staticmethod
def radtodeg(rads):
degs = rads * physmath.sr
return degs
[docs]
@staticmethod
def degtorad(degs):
rads = degs / physmath.sr
return rads
[docs]
def sqdegtofsky(sqd):
fsky = sqd / physmath.areasky()
return fsky
[docs]
def fskytosqdeg(fsky):
sqdeg = fsky * physmath.areasky()
return sqdeg