# This file is part of MICMAC.
# Copyright (C) 2024 CNRS / SciPol developers
#
# MICMAC is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MICMAC is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with MICMAC. If not, see <https://www.gnu.org/licenses/>.
"""
Module to create customized fgs models starting from the PySM ones.
"""
import healpy as hp
import numpy as np
from pysm3 import Sky
from scipy import constants
__all__ = [
'get_spectral_params_true_values',
'parametric_sky_customized',
'get_observation_customized',
'fgs_freq_maps_from_customized_model_nonparam',
]
h_over_k = constants.h * 1e9 / constants.k
[docs]
def get_spectral_params_true_values(nside, model=['s0', 'd0']):
"""
For a given PySM model returns the true values
of the spectral parameters.
The first fgs must have attribute pl_index
and the second mbb_index and mbb_temperature.
Parameters
----------
nside: int
Healpix nside
model: list of str
PySM model to consider
Returns
-------
beta_pl: float
Synchrotron spectral index
beta_mbb: float
Dust spectral index
temp_mbb: float
Dust temperature
"""
sky = Sky(nside=nside, preset_strings=model)
# Synchrotron
synch = sky.components[0]
beta_pl = synch.pl_index.value
# Dust
dust = sky.components[1]
beta_mbb = dust.mbb_index.value
temp_mbb = dust.mbb_temperature.value
return beta_pl, beta_mbb, temp_mbb
[docs]
def parametric_sky_customized(fgs_models, nside_map, nside_spv):
"""
Gives d1s1-like model with less spv of the spectral parameters
(still parametric)
Returns a PySM-like Sky object
Parameters
----------
fgs_models: list of str
PySM model to consider
nside_map: int
Healpix nside of the final maps
nside_spv: list
Healpix nside of the spectral parameters
Returns
-------
sky: pysm3.Sky
PySM-like Sky object with the modified spectral parameters
new_spectral_params: list of np.array
List of the new spectral parameters
"""
new_spectral_params = []
### Get the sky
sky = Sky(nside=nside_map, preset_strings=fgs_models)
for f, model in enumerate(fgs_models):
### Modify the spectral parameter values
if model == 'd1':
# beta dust
beta_mbb = sky.components[f].mbb_index.value
beta_mbb_dowgraded = hp.ud_grade(beta_mbb, nside_spv[f])
# hp.mollview(beta_mbb_dowgraded, title='Downgraded beta dust')
beta_mbb_new = hp.ud_grade(beta_mbb_dowgraded, nside_map)
# hp.mollview(beta_mbb_new, title='New beta dust')
new_spectral_params.append(beta_mbb_new)
for i, item in enumerate(beta_mbb_new):
sky.components[f].mbb_index.value[i] = item
# temp dust
temp_mbb = sky.components[f].mbb_temperature.value
temp_mbb_dowgraded = hp.ud_grade(temp_mbb, nside_spv[f])
# hp.mollview(temp_mbb_dowgraded, title='Downgraded temp dust')
temp_mbb_new = hp.ud_grade(temp_mbb_dowgraded, nside_map)
# hp.mollview(temp_mbb_new, title='New temp dust')
new_spectral_params.append(temp_mbb_new)
for i, item in enumerate(temp_mbb_new):
sky.components[f].mbb_temperature.value[i] = item
elif model == 's1':
# beta synch
beta_pl = sky.components[f].pl_index.value
beta_pl_dowgraded = hp.ud_grade(beta_pl, nside_spv[f])
# hp.mollview(beta_pl_dowgraded, title='Downgraded beta synch')
beta_pl_new = hp.ud_grade(beta_pl_dowgraded, nside_map)
# hp.mollview(beta_pl_new, title='New beta synch')
new_spectral_params.append(beta_pl_new)
for i, item in enumerate(beta_pl_new):
sky.components[f].pl_index[i] = item
# plt.show()
else:
raise ValueError('Model not recognized (only d1 and s1 supported as of now)')
return sky, new_spectral_params
[docs]
def get_observation_customized(instrument='', sky=None, noise=False, nside=None, unit='uK_CMB'):
"""
NOTE: This is a customized version of the FGBuster function
it takes the PySm Sky directly instead of a string tag
Get a pre-defined instrumental configuration
Parameters
----------
instrument:
It can be either a `str` (see :func:`get_instrument`) or an
object that provides the following as a key or an attribute.
- **frequency** (required)
- **depth_p** (required if ``noise=True``)
- **depth_i** (required if ``noise=True``)
They can be anything that is convertible to a float numpy array.
If only one of ``depth_p`` or ``depth_i`` is provided, the other is
inferred assuming that the former is sqrt(2) higher than the latter.
sky: str of pysm3.Sky
Sky to observe. It can be a `pysm3.Sky` or a tag to create one.
noise: bool
If true, add Gaussian, uncorrelated, isotropic noise.
nside: int
Desired output healpix nside. It is optional if `sky` is a `pysm3.Sky`,
and required if it is a `str` or ``None``.
unit: str
Unit of the output. Only K_CMB and K_RJ (and multiples) are supported.
Returns
-------
observation: array
Shape is ``(n_freq, 3, n_pix)``
"""
import pysm3.units as u
from micmac.external.fgbuster import (
get_instrument,
get_noise_realization,
standardize_instrument,
)
if isinstance(instrument, str):
instrument = get_instrument(instrument)
else:
instrument = standardize_instrument(instrument)
if nside is None:
nside = sky.nside
elif not isinstance(sky, str):
try:
assert nside == sky.nside, (
'Mismatch between the value of the nside of the pysm3.Sky '
'argument and the one passed in the nside argument.'
)
except AttributeError:
raise ValueError('Either provide a pysm3.Sky as sky argument ' ' or specify the nside argument.')
if noise:
res = get_noise_realization(nside, instrument, unit)
else:
res = np.zeros((len(instrument.frequency), 3, hp.nside2npix(nside)))
for res_freq, freq in zip(res, instrument.frequency):
emission = sky.get_emission(freq * u.GHz).to(getattr(u, unit), equivalencies=u.cmb_equivalencies(freq * u.GHz))
res_freq += emission.value
return res
[docs]
def fgs_freq_maps_from_customized_model_nonparam(
nside_map, nside_spv, instrument, fgs_models, idx_ref_freq, return_mixing_mat=False
):
"""
Gives non-parametric model built from input (parametric) fgs_model,
with a different level of spv of the scaling laws given by nside_spv.
The scaling laws (A) come from the freq maps of the input fgs_model
downgraded to nside_spv.
Returns the final freq maps elements built as d=As.
Parameters
----------
nside_map: int
nside at which the final maps are built
nside_spv: list
nside at which the spv in the scaling laws is downgraded
instrument: dictionary
dictionary containing the instrument configuration,
must have key "frequency" containing a list of the instr freqs
fgs_models: list[str]
list of strings refferring to the PySM fgs models to start from
idx_ref_freq: int
index of the reference frequency to use for normalization in A
return_mixing_mat: bool
if True, return also the mixing matrix
Returns
-------
freq_maps_final: array
final frequency maps
mixing_mat: array
mixing matrix, returned only if return_mixing_mat is True
"""
from micmac.external.fgbuster import get_observation
from micmac.external.fgbuster import _my_ud_grade
n_fgs_comp = len(fgs_models)
mixing_mat = np.zeros((len(instrument.frequency), n_fgs_comp, hp.nside2npix(nside_map)))
ref_maps_QU = []
# Loop over fgs components (typically synch and dust)
for i, model_i in enumerate(fgs_models):
# Take initial freq maps
if nside_spv[i] == 0:
freq_maps_fgs_i = get_observation(instrument, model_i, nside=1, noise=False)[:, 1, :]
freq_maps_fgs_i = np.average(freq_maps_fgs_i, axis=1)
else:
freq_maps_fgs_i = get_observation(instrument, model_i, nside=nside_spv[i], noise=False)[
:, 1, :
] # keep only Q (assumed same mixing mat elements for Q and U)
# Build new mixing matrix elements
mixing_mat[:, i, :] = np.array(
[
_my_ud_grade(freq_maps_fgs_i[f] / freq_maps_fgs_i[idx_ref_freq], nside_map)
for f in range(len(instrument.frequency))
]
)
# Take reference freq map
ref_maps_QU.append(get_observation(instrument, model_i, nside=nside_map, noise=False)[idx_ref_freq, 1:, :])
# Build final frequency maps
freq_maps_final = np.einsum('fcp,csp->fsp', mixing_mat, np.array(ref_maps_QU))
if return_mixing_mat:
return freq_maps_final, mixing_mat
return freq_maps_final