# FGBuster
# Copyright (C) 2019 Davide Poletti, Josquin Errard and the FGBuster developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Functions from fgbuster.observation_helpers used in the micmac package.
"""
import types
import re
import numpy as np
import pandas as pd
import healpy as hp
import pysm3
import pysm3.units as u
from cmbdb import cmbdb
__all__ = [
'get_sky',
'get_instrument',
'get_observation',
'get_noise_realization',
]
INSTRUMENT_STD_ATTR = 'frequency depth_i depth_p fwhm'.split()
_NL = '\n'
[docs]
def get_sky(nside, tag='c1d0s0'):
""" Get a pre-defined PySM sky
Parameters
----------
nside: int
healpix nside of the sky templates
tag: string
See the `pysm documentation
<https://pysm3.readthedocs.io/en/latest/models.html#models>`_
for a complete list of available options.
Default is 'c1d0s0', i.e. cmb (c1), dust with constant temperature and
spectral index (d0), and synchrotron with constant spectral index (s0).
Returns
-------
sky: pysm3.Sky
See the `pysm documentation
<https://pysm3.readthedocs.io/en/latest/api/pysm.Sky.html#pysm.Sky>`_
"""
# preset_strings = [tag[i:i+2] for i in range(0, len(tag), 2)]
# preset_strings = re.findall(r'[a-zA-Z]\d+', tag)
preset_strings = re.findall(r'[a-zA-Z]+[0-9]+', tag)
print(preset_strings)
return pysm3.Sky(nside, preset_strings=preset_strings)
[docs]
def get_instrument(tag=''):
""" Get a pre-defined instrumental configuration
Parameters
----------
tag: string
name of the pre-defined experimental configurations.
It can contain the name of multiple experiments separated by a space.
Call the function with a random input to get the available instruments.
Returns
-------
instr: pandas.DataFrame
It contains the experimental configuration of the desired instrument(s).
"""
df = cmbdb.loc[cmbdb['experiment'].isin(tag.split())]
if df.empty:
if tag == 'test':
df = pd.DataFrame()
df['frequency'] = np.arange(10., 300, 30.)
df['depth_p'] = (np.linspace(20, 40, 10) - 30)**2
df['depth_i'] = (np.linspace(20, 40, 10) - 30)**2
else:
from importlib.util import find_spec
exp_file = find_spec('cmbdb').submodule_search_locations[0]
exp_file += '/experiments.yaml'
github = 'https://github.com/dpole/cmbdb'
raise ValueError(
(f"Instrument(s) {tag} not available." if tag else "") +
f"Choose between: {' '.join(cmbdb.experiment.unique())}{_NL}"
f"Add your instrument to your local copy of cmbdb: {exp_file}\n"
f"Beware, you might lose changes when you update: "
f"push your new configuration to {github}")
return df.dropna(axis=1, how='all')
[docs]
def get_observation(instrument='', sky=None,
noise=False, nside=None, unit='uK_CMB'):
""" Get a pre-defined instrumental configuration
Parameters
----------
instrument:
It can be either a `str` (see :func:`get_instrument`) or an
object that provides the following as a key or an attribute.
- **frequency** (required)
- **depth_p** (required if ``noise=True``)
- **depth_i** (required if ``noise=True``)
They can be anything that is convertible to a float numpy array.
If only one of ``depth_p`` or ``depth_i`` is provided, the other is
inferred assuming that the former is sqrt(2) higher than the latter.
sky: str of pysm3.Sky
Sky to observe. It can be a `pysm3.Sky` or a tag to create one.
noise: bool
If true, add Gaussian, uncorrelated, isotropic noise.
nside: int
Desired output healpix nside. It is optional if `sky` is a `pysm3.Sky`,
and required if it is a `str` or ``None``.
unit: str
Unit of the output. Only K_CMB and K_RJ (and multiples) are supported.
Returns
-------
observation: array
Shape is ``(n_freq, 3, n_pix)``
"""
if isinstance(instrument, str):
instrument = get_instrument(instrument)
else:
instrument = standardize_instrument(instrument)
if nside is None:
nside = sky.nside
elif not isinstance(sky, str):
try:
assert nside == sky.nside, (
"Mismatch between the value of the nside of the pysm3.Sky "
"argument and the one passed in the nside argument.")
except AttributeError:
raise ValueError("Either provide a pysm3.Sky as sky argument "
" or specify the nside argument.")
if noise:
res = get_noise_realization(nside, instrument, unit)
else:
res = np.zeros((len(instrument.frequency), 3, hp.nside2npix(nside)))
if sky is None or sky == '':
return res
if isinstance(sky, str):
sky = get_sky(nside, sky)
for res_freq, freq in zip(res, instrument.frequency):
emission = sky.get_emission(freq * u.GHz).to(
getattr(u, unit),
equivalencies=u.cmb_equivalencies(freq * u.GHz))
res_freq += emission.value
return res
[docs]
def get_noise_realization(nside, instrument, unit='uK_CMB'):
""" Generate noise maps for the instrument
Parameters
----------
nside: int
Desired output healpix nside.
instrument:
Object that provides the following as a key or an attribute.
- **frequency** (required)
- **depth_p** (required if ``noise=True``)
- **depth_i** (required if ``noise=True``)
They can be anything that is convertible to a float numpy array.
If only one of ``depth_p`` or ``depth_i`` is provided, the other is
inferred assuming that the former is sqrt(2) higher than the latter.
unit: str
Unit of the output. Only K_CMB and K_RJ (and multiples) are supported.
sky: str of pysm3.Sky
Sky to observe. It can be a `pysm3.Sky` or a tag to create one.
noise: bool
If true, add Gaussian, uncorrelated, isotropic noise.
Returns
-------
observation: array
Shape is ``(n_freq, 3, n_pix)``.
"""
instrument = standardize_instrument(instrument)
n_freq = len(instrument.frequency)
n_pix = hp.nside2npix(nside)
res = np.random.normal(size=(n_pix, 3, n_freq))
depth = np.stack(
(instrument.depth_i, instrument.depth_p, instrument.depth_p))
depth *= u.arcmin * u.uK_CMB
depth = depth.to(
getattr(u, unit) * u.arcmin,
equivalencies=u.cmb_equivalencies(instrument.frequency * u.GHz))
res *= depth.value / hp.nside2resol(nside, True)
return res.T
def standardize_instrument(instrument):
f"""Handle different input instruments
Parameters
----------
instrument:
Anything that has
{_NL.join([' * '+attr for attr in INSTRUMENT_STD_ATTR]) }
as keys or attributes, including `pandas.DataFrame`.
Returns
-------
std_instr: SimpleNamespace
It contains the properties above as attributes. They are converted to a
float array.
"""
std_instr = types.SimpleNamespace()
for attr in INSTRUMENT_STD_ATTR:
try:
try:
value = np.array(getattr(instrument, attr), dtype=np.float64)
except AttributeError:
value = np.array(instrument[attr], dtype=np.float64)
setattr(std_instr, attr, value.copy())
except (TypeError, KeyError): # Not subscriptable or missing key
pass
if attr == 'frequency' and std_instr.frequency.ndim == 3:
std_instr.frequency = [tuple(x) for x in std_instr.frequency]
return std_instr