# This file is part of MICMAC.
# Copyright (C) 2024 CNRS / SciPol developers
#
# MICMAC is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MICMAC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MICMAC. If not, see <https://www.gnu.org/licenses/>.
"""
Module to initialize the Mixing Matrix
with parameter values coming from the SEDs.
"""
from collections.abc import Iterable
import healpy as hp
import numpy as np
from astropy.cosmology import Planck15
from scipy import constants
from micmac.foregrounds.templates import get_n_patches_b
__all__ = ['InitMixingMatrix']
h_over_k = constants.h * 1e9 / constants.k
[docs]
class InitMixingMatrix:
[docs]
def __init__(
self,
freqs,
ncomp,
pos_special_freqs,
spv_nodes_b,
nside=None,
non_param_fgs_mixing_matrix=None,
beta_pl=-3.0,
beta_mbb=1.54,
temp_mbb=20.0,
):
"""
The main goal of this class is create the initial parameter values,
with init_params, which returns the params as a flattened array of values ordered
by components, freqs, patches.
Note:
* units are K_CMB.
* you must be always coherent with the order of fgs:
e.g. if you put first synchrotron and then dust in the mixing matrix (A)
that you pass to this class, then also in the pos_special_freqs, etc.
Parameters
----------
freqs: array
all input freq bands
ncomp: int
all comps (also cmb)
pos_special_freqs: array
indices of the freqs that are known
spv_nodes_b: array
tree containing info to build spv_templates
nside: int
nside of the map
non_param_fgs_mixing_matrix: array
only the fgs part of mixing matrix (A)
beta_pl: float or array[float]
spectral idx of the synchrotron power law (pl), if given as array it is expected to correspond to different patches
beta_mbb: float or array[float]
spectral idx of the dust modified black body (mbb), if given as array it is expected to correspond to different patches
temp_mbb: float or array[float]
temperature of the dust modified black body (mbb), if given as array it is expected to correspond to different patches
"""
self.freqs = freqs # all input freq bands
self.ncomp = ncomp # all comps (also cmb)
# make pos_special_freqs only positive
for i, val_i in enumerate(pos_special_freqs):
if val_i < 0:
pos_special_freqs[i] = len(self.freqs) + pos_special_freqs[i]
self.pos_special_freqs = pos_special_freqs
self.spv_nodes_b = spv_nodes_b # tree containing info to build spv_templates
self.nside = nside # nside of the map
self.non_param_fgs_mixing_matrix = non_param_fgs_mixing_matrix # only the fgs part of mixing matrix
self.beta_mbb = np.array(beta_mbb)
self.temp_mbb = np.array(temp_mbb)
self.beta_pl = np.array(beta_pl)
self.known_freqs = np.array(
[
self.freqs[self.pos_special_freqs[0]],
self.freqs[self.pos_special_freqs[1]],
]
)
self.unknown_freqs = np.delete(self.freqs, self.pos_special_freqs)
[docs]
def init_params(self):
"""
Main function to initialize the mixing matrix B.
* if non_param_mixing_matrix (A) is passed that is used to build the init values
* otherwise the init values are build from beta_mbb, temp_mbb, beta_pl
Returns
-------
init_param_values: array
initial parameter values as a flattened array of values ordered as [Bf1_comp1_patch1, Bf1_comp1_patch2, ..., Bf2_comp1_patch1, ..., Bf1_comp2_patch1, ..., Bfn_comp2_patchn, ...]
"""
# Get params of B pixel indep or with pixel dimension
if isinstance(self.non_param_fgs_mixing_matrix, Iterable):
# Get params from fgs mixing matrix (A) passed by the user
print('>>> init params built from fgs mixing matrix (A) passed by the user:')
fgs_SEDs = self.non_param_fgs_mixing_matrix
else:
print('>>> init params built with spectral params:', self.beta_mbb, self.temp_mbb, self.beta_pl)
fgs_SEDs = self.fgs_SEDs_from_spectral_params()
init_param_values_ = self.init_params_from_fgs_SEDs(fgs_SEDs)
# Make one parameter per patch
init_param_values = self.ud_grade_init_params_to_npatches(init_param_values_)
return init_param_values
[docs]
def fgs_SEDs_from_spectral_params(self):
"""
Get the SEDs of the foregrounds (fgs) from the spectral parameters.
Note: In Mixing Matrix we consider first synch then dust
Returns
-------
fgs_SEDs: array
SEDs of the fgs
"""
fgs_SEDs_pl = self.powerlaw(self.known_freqs[0], self.freqs[:, None], self.beta_pl)
fgs_SEDs_mbb = self.modifiedblackbody(self.known_freqs[1], self.freqs[:, None], self.temp_mbb, self.beta_mbb)
fgs_SEDs = np.stack((fgs_SEDs_pl, fgs_SEDs_mbb), axis=1)
return fgs_SEDs
[docs]
def init_params_from_fgs_SEDs(self, fgs_SEDs):
"""
Get params w SEDs for redefined fgs (true B entries)
B = A invM
in A only fgs entries are needed (no CMB column)
in M only the fgs entries are needed (only A_f1)
build M
Parameters
----------
fgs_SEDs: array
SEDs of the fgs
"""
# Get A_f1
A_f1 = fgs_SEDs[[self.pos_special_freqs[0], self.pos_special_freqs[-1]], :, :]
# Get inverse of A_f1
inv_A_f1 = np.linalg.inv(A_f1.swapaxes(0, -1)).swapaxes(0, -1)
return np.einsum('fc...,cg...->fg...', fgs_SEDs, inv_A_f1)
[docs]
def ud_grade_init_params_to_npatches(self, params_):
"""
Take one mixing matrix value on the full sky or one per pixel
and returns one mixing matrix value per patch.
Parameters
----------
params_: array
mixing matrix values
"""
assert self.ncomp - 1 == 2
# Modify to have a value per patch
params_s = []
params_d = []
ind_unknown_f = 0
for ind_f, val_f in enumerate(self.freqs):
if ind_f in self.pos_special_freqs:
pass
else:
# TODO: extend the counting of the patches to adaptive multires
# (maybe easier to extend it by looking at the spv_templates
# or add a function in the templates_spv.py to get the number of patches for each b)
n_patches_b_s = get_n_patches_b(self.spv_nodes_b[ind_unknown_f])
n_patches_b_d = get_n_patches_b(self.spv_nodes_b[ind_unknown_f + len(self.unknown_freqs)])
if params_.shape[2] == 1:
### Synchrotron
for patch in range(n_patches_b_s):
# UPgrade to params spv (one parameter per patch)
params_s.append(params_[ind_f, 0])
### Dust
for patch in range(n_patches_b_d):
# UPgrade to params spv (one parameter per patch)
params_d.append(params_[ind_f, 1])
else:
# DOWNgrade to params spv (one parameter per patch)
nside_b_s = int(np.sqrt(n_patches_b_s / 12))
if nside_b_s == 0:
params_s.append(np.mean(params_[ind_f, 0, :]))
else:
[params_s.append(item) for item in hp.ud_grade(params_[ind_f, 0, :], nside_b_s)]
nside_b_d = int(np.sqrt(n_patches_b_d / 12))
if nside_b_d == 0:
params_d.append(np.mean(params_[ind_f, 1, :]))
else:
[params_d.append(item) for item in hp.ud_grade(params_[ind_f, 1, :], nside_b_d)]
ind_unknown_f += 1
params = np.concatenate((params_s, params_d)).flatten()
return params
### Functions to get the parametric scaling laws
[docs]
def K_rj2K_cmb(self, nu):
"""
Conversion factor from K_rj to K_CMB at frequency nu
Parameters
----------
nu: float or array[float]
frequency in GHz
Returns
-------
K_rj2K_cmb: float or array[float]
conversion factor from K_rj to K_CMB at frequency nu
"""
Tcmb = Planck15.Tcmb(0).value
# Conversion factor at frequency nu
K_rj2K_cmb = np.expm1(h_over_k * nu / Tcmb) ** 2 / (np.exp(h_over_k * nu / Tcmb) * (h_over_k * nu / Tcmb) ** 2)
return K_rj2K_cmb
[docs]
def K_rj2K_cmb_nu0(self, nu, nu0):
"""
Conversion factor at frequency nu divided by the one at frequency nu0
Parameters
----------
nu: float or array[float]
frequency in GHz
nu0: float or array[float]
reference frequency in GHz
Returns
-------
K_rj2K_cmb_nu0: float or array[float]
conversion factor at frequency nu divided by the one at frequency nu0
"""
K_rj2K_cmb_nu0 = self.K_rj2K_cmb(nu) / self.K_rj2K_cmb(nu0)
return K_rj2K_cmb_nu0
[docs]
def powerlaw(self, nu0, nu, beta_pl):
"""
Scaling lawing law for synchrotron power law (pl) in K_CMB
Parameters
----------
nu0: float or array[float]
reference frequency in GHz
nu: float or array[float]
frequency in GHz
beta_pl: array
spectral idx of the pl
Returns
-------
analytic_expr: array[float]
scaling law for pl (in K_CMB)
"""
analytic_expr = (nu / nu0) ** (beta_pl)
# conversion to K_CMB units
analytic_expr *= self.K_rj2K_cmb_nu0(nu, nu0)
return analytic_expr
[docs]
def modifiedblackbody(self, nu0, nu, temp_mbb, beta_mbb):
"""
Scaling law for modified balck body (mbb) in K_CMB
Parameters
----------
nu0: float or array[float]
reference frequency in GHz
nu: float or array[float]
frequency in GHz
temp_mbb: array
temperature of the mbb
beta_mbb: array
spectral idx of the mbb
Returns
-------
analytic_expr: array[float]
scaling law for dust mbb (in K_CMB)
"""
analytic_expr = (
(np.exp(nu0 / temp_mbb * h_over_k) - 1)
/ (np.exp(nu / temp_mbb * h_over_k) - 1)
* (nu / nu0) ** (1 + beta_mbb)
)
# conversion to K_CMB units
analytic_expr *= self.K_rj2K_cmb_nu0(nu, nu0)
return analytic_expr