Source code for meer21cm.power

"""
This module handles computation of power spectrum from gridded fields and its corresponding model power spectrum from theory.

The class :py:class:`ModelPowerSpectrum` is the class for computing the model power spectrum of an LSS tracer field.

The class :py:class:`FieldPowerSpectrum` is the class for computing the power spectrum of a gridded field from LSS data.

The class :py:class:`PowerSpectrum` coherently combines the two classes above,
and provides an interface for gridding the intensity mapping data as well as the galaxy catalogue to perform
power spectrum estimation and for auto-correlation and cross-correlation.
"""
import numpy as np
from .cosmology import CosmologyCalculator
from .grid import (
    minimum_enclosing_box_of_lightcone,
    project_particle_to_regular_grid,
    interlace_two_fields,
    fourier_window_for_assignment,
)
from scipy.signal import windows
from .util import (
    tagging,
    radec_to_indx,
    redshift_to_freq,
    freq_to_redshift,
    find_ch_id,
    get_nd_slicer,
    omega_hi_to_average_temp,
    legendre_polynomial_with_factor,
    real_dtype_from_array,
)
from .dataanalysis import Specification
import healpy as hp
from collections.abc import Iterable
import inspect
import logging

logger = logging.getLogger(__name__)


[docs] class ModelPowerSpectrum(CosmologyCalculator): r""" The class for computing the model power spectrum of an LSS tracer field. Parameters ---------- kmode: np.ndarray, default None The **true** mode of k in Mpc-1. mumode: np.ndarray, default None The **true** mu values of each k-mode so that :math:`k_\parallel = k \times \mu`. tracer_bias_1: float, default 1.0 The linear bias of the first tracer. sigma_v_1: float, default 0.0 The velocity dispersion of the first tracer in km/s. tracer_bias_2: float, default None The linear bias of the second tracer. sigma_v_2: float, default 0.0 The velocity dispersion of the second tracer in km/s. include_beam: list, default [True, False] Whether to include the beam attenuation in the model calculation. Must be a list of two booleans, the first for the first tracer and the second for the second tracer. fog_profile: str, default "lorentz" The shape of the finger-of-god profile to be used in the model calculation. Either "lorentz" or "gaussian". cross_coeff: float, default 1.0 The cross-correlation coefficient between the two tracers. weights_field_1: np.ndarray, default None The field-level weights of the first tracer in the density field. weights_field_2: np.ndarray, default None The field-level weights of the second tracer in the density field. weights_grid_1: np.ndarray, default None The grid-level weights of the first tracer in the density field. weights_grid_2: np.ndarray, default None The grid-level weights of the second tracer in the density field. mean_amp_1: float, default 1.0 The mean amplitude of the first tracer. Can be used to rescale the power spectrum, for example by the average brightness temperature. mean_amp_2: float, default 1.0 The mean amplitude of the second tracer. Can be used to rescale the power spectrum, for example by the average brightness temperature. sampling_resol: list, default None The sampling resolution of the field in Mpc. If ``sampling_resol`` is "auto", the sampling resolution will be set to the pixel size of the map. include_sky_sampling: list, default [True, False] Whether to include the sky sampling in the model calculation. If just a boolean is provided, it will be used for both tracers. compensate: list, default [True, True] Whether the gridded fields are compensated according to the mass assignment scheme. Note that the compensation is applied to the model power spectrum, and **not** to the gridded data fields. kaiser_rsd: bool, default True Whether to include the RSD effect in the model calculation and mock simulation. sigma_z_1: float, default 0.0 The redshift error of the first tracer. sigma_z_2: float, default 0.0 The redshift error of the second tracer. **params: dict Additional parameters to be passed to the base class :class:`meer21cm.cosmology.CosmologyCalculator`. """ def __init__( self, kmode=None, mumode=None, tracer_bias_1=1.0, sigma_v_1=0.0, tracer_bias_2=None, sigma_v_2=0.0, include_beam=[True, False], fog_profile="lorentz", cross_coeff=1.0, weights_field_1=None, weights_field_2=None, weights_grid_1=None, weights_grid_2=None, mean_amp_1=1.0, mean_amp_2=1.0, sampling_resol=None, include_sky_sampling=[True, False], compensate=[True, True], kaiser_rsd=True, sigma_z_1=0.0, sigma_z_2=0.0, **params, ): super().__init__(**params) # for compatibility with FieldPowerSpectrum if not hasattr(self, "field_1_dep_attr"): self.field_1_dep_attr = [] if not hasattr(self, "field_2_dep_attr"): self.field_2_dep_attr = [] self.tracer_bias_1 = tracer_bias_1 self.sigma_v_1 = sigma_v_1 self.tracer_bias_2 = tracer_bias_2 self.sigma_v_2 = sigma_v_2 self.kmode = kmode self.mumode = mumode if kmode is None: self.kmode = np.geomspace(self.kmin, self.kmax, 600).reshape((10, 10, 6)) if mumode is None: self.mumode = np.zeros_like(self.kmode) self._include_beam = [None, None] # for initialization self.include_beam = include_beam self.cross_coeff = cross_coeff self._auto_power_matter_model_r = None self._auto_power_matter_model = None self._auto_power_tracer_1_model_noobs = None self._auto_power_tracer_2_model_noobs = None self._cross_power_tracer_model_noobs = None self._auto_power_tracer_1_model = None self._auto_power_tracer_2_model = None self._cross_power_tracer_model = None self.weights_field_1 = weights_field_1 self.weights_field_2 = weights_field_2 self.weights_grid_1 = weights_grid_1 self.weights_grid_2 = weights_grid_2 self.mean_amp_1 = mean_amp_1 self.mean_amp_2 = mean_amp_2 self.include_sky_sampling = include_sky_sampling self.sampling_resol = sampling_resol self.has_resol = True if self.sampling_resol is None: self.has_resol = False # avoid ambiguity problem of == auto if isinstance(self.sampling_resol, str): if self.sampling_resol == "auto": self.sampling_resol = [ self.pix_resol_in_mpc, self.pix_resol_in_mpc, self.los_resol_in_mpc, ] self.fog_profile = fog_profile self.kaiser_rsd = kaiser_rsd self._compensate = [None, None] # for initialization self.compensate = compensate self.sigma_z_1 = sigma_z_1 self.sigma_z_2 = sigma_z_2 @property def weights_field_1(self): """ The weights of the first tracer in the density field. """ return self._weights_field_1 @weights_field_1.setter def weights_field_1(self, value): self._weights_field_1 = value if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting weights_field_1" ) self.clean_cache(self.tracer_1_dep_attr) @property def weights_grid_1(self): """ The weights of the first tracer in the rectangular grid. """ return self._weights_grid_1 @weights_grid_1.setter def weights_grid_1(self, value): self._weights_grid_1 = value if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} and {self.field_1_dep_attr} due to resetting weights_grid_1" ) self.clean_cache(self.tracer_1_dep_attr) self.clean_cache(self.field_1_dep_attr) @property def weights_field_2(self): """ The weights of the second tracer in the density field. """ return self._weights_field_2 @weights_field_2.setter def weights_field_2(self, value): self._weights_field_2 = value if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting weights_field_2" ) self.clean_cache(self.tracer_2_dep_attr) @property def weights_grid_2(self): """ The weights of the second tracer in the rectangular grid. """ return self._weights_grid_2 @weights_grid_2.setter def weights_grid_2(self, value): self._weights_grid_2 = value if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} and {self.field_2_dep_attr} due to resetting weights_grid_2" ) self.clean_cache(self.tracer_2_dep_attr) self.clean_cache(self.field_2_dep_attr) # for compatibility with FieldPowerSpectrum weights_1 = weights_grid_1 weights_2 = weights_grid_2 @property def kaiser_rsd(self): """ Whether RSD is included in the simulation and model calculation. If True, uses the linear Kaiser effect and the FoG profile to compute the model power spectrum. """ return self._kaiser_rsd @kaiser_rsd.setter def kaiser_rsd(self, value): self._kaiser_rsd = value logger.debug( f"cleaning cache of {self.rsd_dep_attr} due to resetting kaiser_rsd" ) self.clean_cache(self.rsd_dep_attr) @property def fog_profile(self): """ The shape of the finger-of-god profile to be used in the model calculation. Either "lorentz" or "gaussian". """ return self._fog_profile @fog_profile.setter def fog_profile(self, value): self._fog_profile = value if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting fog_profile" ) self.clean_cache(self.tracer_1_dep_attr) if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting fog_profile" ) self.clean_cache(self.tracer_2_dep_attr) @property def sigma_v_1(self): """ The velocity dispersion of the first tracer in km/s. """ return self._sigma_v_1 @sigma_v_1.setter def sigma_v_1(self, value): self._sigma_v_1 = value if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting sigma_v_1" ) self.clean_cache(self.tracer_1_dep_attr) @property def sigma_z_1(self): """ The redshift error of the first tracer. """ return self._sigma_z_1 @sigma_z_1.setter def sigma_z_1(self, value): self._sigma_z_1 = value if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting sigma_z_1" ) self.clean_cache(self.tracer_1_dep_attr) @property def sigma_v_2(self): """ The velocity dispersion of the second tracer in km/s. """ return self._sigma_v_2 @sigma_v_2.setter def sigma_v_2(self, value): self._sigma_v_2 = value if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting sigma_v_2" ) self.clean_cache(self.tracer_2_dep_attr) @property def sigma_z_2(self): """ The redshift error of the second tracer. """ return self._sigma_z_2 @sigma_z_2.setter def sigma_z_2(self, value): self._sigma_z_2 = value if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting sigma_z_2" ) self.clean_cache(self.tracer_2_dep_attr) @property def include_beam(self): """ Whether the beam attenuation is included in the model calculation. Must be a list of two booleans, the first for the first tracer and the second for the second tracer. If just a boolean is provided, it will be used for both tracers. """ return self._include_beam @include_beam.setter def include_beam(self, value): value_before = self._include_beam self._include_beam = value if self.sigma_beam_ch is None and (np.array(self.include_beam).sum() > 0): logger.debug("no input beam found, setting include_beam to False") self._include_beam = [False, False] if value_before[0] != value[0]: if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting include_beam" ) self.clean_cache(self.tracer_1_dep_attr) if value_before[1] != value[1]: if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting include_beam" ) self.clean_cache(self.tracer_2_dep_attr) @property def compensate(self): """ Whether the gridded fields are compensated according to the mass assignment scheme. Note that the compensation is applied to the model power spectrum, and **not** to the gridded data fields. """ return self._compensate @compensate.setter def compensate(self, value): value_before = self._compensate if isinstance(value, bool): value = (value, value) self._compensate = value if value_before[0] != value[0]: if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting compensate" ) self.clean_cache(self.tracer_1_dep_attr) if value_before[1] != value[1]: if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting compensate" ) self.clean_cache(self.tracer_2_dep_attr)
[docs] def fog_gaussian(self, sigma_r, kmode=None, mumode=None): r""" The Gaussian finger-of-god profile. .. math:: {\rm FoG} = {\rm exp}(-(\sigma_r k_\parallel/H)^2/2) Note the power spectrum has FoG squared with the two FoG terms that can be different for two tracers. Parameters ---------- sigma_r: float. The velocity dispersion in terms of the comoving distance in Mpc. kmode: float, None. The mode of 3D k in Mpc-1. If None, self.kmode will be used. mumode: float, None. The mu values of each 3D k-mode. In None, self.mumode will be used. Returns ------- fog: float. The FoG term. """ if mumode is None: mumode = self.mumode if kmode is None: kmode = self.kmode k_parallel = kmode * mumode fog = np.exp(-((sigma_r * k_parallel) ** 2 / 2)) return fog
[docs] def fog_lorentz(self, sigma_r, kmode=None, mumode=None): r""" The Lorentzian finger-of-god profile. .. math:: {\rm FoG} = \sqrt{1/(1+(\sigma_r k_\parallel/H)^2)} Note the power spectrum has FoG squared with the two FoG terms that can be different for two tracers. Parameters ---------- sigma_r: float. The velocity dispersion in terms of the comoving distance in Mpc. kmode: float, None. The mode of 3D k in Mpc-1. If None, self.kmode will be used. mumode: float, None. The mu values of each 3D k-mode. In None, self.mumode will be used. Returns ------- fog: float. The FoG term. """ if mumode is None: mumode = self.mumode if kmode is None: kmode = self.kmode k_parallel = kmode * mumode fog = np.sqrt(1 / (1 + (sigma_r * k_parallel) ** 2)) return fog
[docs] def fog_term(self, sigma_r, kmode=None, mumode=None): """ The FoG term for the model calculation. It reads the profile type from the attribute ``fog_profile``. Parameters ---------- sigma_r: float. The velocity dispersion in terms of the comoving distance in Mpc. kmode: float, None. The mode of 3D k in Mpc-1. If None, self.kmode will be used. mumode: float, None. The mu values of each 3D k-mode. In None, self.mumode will be used. Returns ------- fog: float. The FoG term. """ return getattr(self, "fog_" + self.fog_profile)(sigma_r, kmode, mumode)
@property def tracer_bias_1(self): """ The linear bias of the first tracer. """ return self._tracer_bias_1 @tracer_bias_1.setter def tracer_bias_1(self, value): self._tracer_bias_1 = value if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting tracer_bias_1" ) self.clean_cache(self.tracer_1_dep_attr) @property def tracer_bias_2(self): """ The linear bias of the second tracer. """ return self._tracer_bias_2 @tracer_bias_2.setter def tracer_bias_2(self, value): self._tracer_bias_2 = value if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting tracer_bias_2" ) self.clean_cache(self.tracer_2_dep_attr) @property def cross_coeff(self): """ The cross-correlation coefficient between the two tracers. """ return self._cross_coeff @cross_coeff.setter def cross_coeff(self, value): self._cross_coeff = value if "cross_coeff_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.cross_coeff_dep_attr} due to resetting cross_coeff" ) self.clean_cache(self.cross_coeff_dep_attr) @property def kmode(self): """ The input kmode for the model calculation. """ return self._kmode @kmode.setter def kmode(self, value): self._kmode = np.asarray(value, dtype=self.real_dtype) if "kmode_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.kmode_dep_attr} due to resetting kmode" ) self.clean_cache(self.kmode_dep_attr) @property def mumode(self): """ The mu values of each 3D k-mode. """ return self._mumode @mumode.setter def mumode(self, value): self._mumode = value if "mumode_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.mumode_dep_attr} due to resetting mumode" ) self.clean_cache(self.mumode_dep_attr) @property def sampling_resol(self): """ The sampling resolution corresponding to the map-making/gridding of the density field. """ return self._sampling_resol @sampling_resol.setter def sampling_resol(self, value): self._sampling_resol = value self.has_resol = True if self.include_sky_sampling[0]: if "tracer_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_1_dep_attr} due to resetting sampling_resol" ) self.clean_cache(self.tracer_1_dep_attr) if self.include_sky_sampling[1]: if "tracer_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.tracer_2_dep_attr} due to resetting sampling_resol" ) self.clean_cache(self.tracer_2_dep_attr) @property @tagging("cosmo_model", "nu", "kmode") def auto_power_matter_model_r(self): """ The model matter power spectrum in real space (without RSD). """ if self._auto_power_matter_model_r is None: self.get_model_matter_power_r() return self._auto_power_matter_model_r
[docs] def get_model_matter_power_r(self): """ Calculate the model matter power spectrum in real space (without RSD). The attribute f"_auto_power_matter_model_r" will be set by the output. """ self._auto_power_matter_model_r = self.matter_power_spectrum_fnc(self.kmode)
@property @tagging("cosmo_model", "nu", "kmode", "mumode", "rsd") def auto_power_matter_model(self): """ The model matter power spectrum with RSD effects. The 3D k-modes corrospond to the input ``kmode`` and ``mumode``. """ if self._auto_power_matter_model is None: self.get_model_matter_power() return self._auto_power_matter_model @property @tagging("cosmo_model", "nu", "kmode", "mumode", "tracer_1", "rsd") def auto_power_tracer_1_model_noobs(self): """ The model power spectrum for the first tracer without observational effects. *Note that the power is in units of volume, so the mean amplitude is not applied.* """ if self._auto_power_tracer_1_model_noobs is None: self.get_model_power_noobs_i(1) return self._auto_power_tracer_1_model_noobs @property @tagging("cosmo_model", "nu", "kmode", "mumode", "tracer_2", "rsd") def auto_power_tracer_2_model_noobs(self): """ The model power spectrum for the second tracer without observational effects. *Note that the power is in units of volume, so the mean amplitude is not applied.* """ if self._auto_power_tracer_2_model_noobs is None: self.get_model_power_noobs_i(2) return self._auto_power_tracer_2_model_noobs @property @tagging( "cosmo_model", "nu", "kmode", "mumode", "tracer_1", "tracer_2", "rsd", "cross_coeff", ) def cross_power_tracer_model_noobs(self): """ The model power spectrum for the cross correlation between the two tracers without observational effects. *Note that the power is in units of volume, so the mean amplitude is not applied.* """ if self._cross_power_tracer_model_noobs is None: self.get_model_power_noobs_cross() return self._cross_power_tracer_model_noobs @property @tagging("cosmo_model", "nu", "kmode", "mumode", "tracer_1", "beam", "rsd") def auto_power_tracer_1_model(self): """ The 3D model power spectrum for the first tracer. The 3D k-modes corrospond to the input ``kmode`` and ``mumode``. Unlike ``noobs`` power, the mean amplitude is applied. """ if self._auto_power_tracer_1_model is None: self.get_model_power_i(1) mean_amp = self.mean_amp_1 if isinstance(mean_amp, str): logger.info(f"getting mean_amp_1 from self.{mean_amp}") mean_amp = getattr(self, mean_amp) logger.info( f"multiplying _auto_power_tracer_1_model with mean_amp_1**2: {mean_amp}**2" " to get auto_power_tracer_1_model", ) return self._auto_power_tracer_1_model * mean_amp**2 @property @tagging("cosmo_model", "nu", "kmode", "mumode", "tracer_2", "beam", "rsd") def auto_power_tracer_2_model(self): """ The 3D model power spectrum for the second tracer. The 3D k-modes corrospond to the input ``kmode`` and ``mumode``. Unlike ``noobs`` power, the mean amplitude is applied. """ if self._auto_power_tracer_2_model is None: self.get_model_power_i(2) mean_amp = self.mean_amp_2 if isinstance(mean_amp, str): logger.info(f"getting mean_amp_2 from self.{mean_amp}") mean_amp = getattr(self, mean_amp) logger.info( f"multiplying _auto_power_tracer_2_model with mean_amp_2**2: {mean_amp}**2" " to get auto_power_tracer_2_model", ) return self._auto_power_tracer_2_model * mean_amp**2 @property @tagging( "cosmo_model", "nu", "kmode", "mumode", "tracer_2", "beam", "tracer_1", "rsd", "cross_coeff", ) def cross_power_tracer_model(self): """ The 3D model cross power spectrum between the two tracers. The 3D k-modes corrospond to the input ``kmode`` and ``mumode``. Unlike ``noobs`` power, the mean amplitude is applied. """ if self._cross_power_tracer_model is None: self.get_model_power_cross() mean_amp2 = self.mean_amp_2 if isinstance(mean_amp2, str): mean_amp2 = getattr(self, mean_amp2) mean_amp = self.mean_amp_1 if isinstance(mean_amp, str): mean_amp = getattr(self, mean_amp) logger.info( f"multiplying _cross_power_tracer_model with mean_amp: {mean_amp} and mean_amp2: {mean_amp2} " " to get cross_power_tracer_model", ) return self._cross_power_tracer_model * mean_amp * mean_amp2
[docs] def map_sampling(self): """ The sampling window function from the map cube to be convolved with data. Note that the window can only be calculated in Cartesian grids, so it is not used in ``ModelPowerSpectrum`` and only in ``PowerSpectrum``. """ return 1.0
[docs] def gridding_compensation(self): """ The sampling window function to be compensated for the gridding mass assignment scheme. Note that the window can only be calculated in Cartesian grids, so it is not used in ``ModelPowerSpectrum`` and only in ``PowerSpectrum``. """ return 1.0
# calculate on the fly, no need for tagging
[docs] def beam_attenuation(self): """ The beam attenuation factor. """ if self.sigma_beam_ch is None: return 1.0 # in the future for asymmetric beam this way # of writing may be probelmatic # Numerical roundoff (more visible in lower-precision runs) can push # |mu| slightly above 1 and make 1 - mu^2 marginally negative. mu_sq = np.square(self.mumode) one_minus_mu_sq = np.clip(1 - mu_sq, 0.0, None) k_perp = self.kmode * np.sqrt(one_minus_mu_sq) sigma_beam_mpc = self.sigma_beam_in_mpc B_beam = gaussian_beam_attenuation(k_perp, sigma_beam_mpc) return B_beam
[docs] def cal_rsd_power( self, power_in_real_space, beta1, sigma_v_1, sigma_z_1, beta2=None, sigma_v_2=None, sigma_z_2=None, r=1.0, mumode=None, ): """ Calculate the redshift space power spectrum. If properties of the second tracer are not set, they will be set to the same as the first tracer so the result will be the auto power spectrum. Parameters ---------- power_in_real_space: np.ndarray The power spectrum in real space. beta1: float The growth rate over bias of the first tracer. sigma_v_1: float The velocity dispersion of the first tracer. sigma_z_1: float The redshift dispersion of the first tracer. beta2: float, default None The growth rate over bias of the second tracer. sigma_v_2: float, default None The velocity dispersion of the second tracer. sigma_z_2: float, default None The redshift dispersion of the second tracer. r: float, default 1.0 The correlation coefficient between the two tracers. mumode: np.ndarray, default None The mu values of each 3D k-mode. Returns ------- power_in_redshift_space: np.ndarray The power spectrum in redshift space. """ if mumode is None: mumode = self.mumode if beta2 is None: beta2 = beta1 if sigma_v_2 is None: sigma_v_2 = sigma_v_1 if sigma_z_2 is None: sigma_z_2 = sigma_z_1 power_in_redshift_space = ( power_in_real_space * (r + (beta1 + beta2) * mumode**2 + beta1 * beta2 * mumode**4) * self.fog_term(self.deltav_to_deltar(sigma_v_1), mumode=mumode) * self.fog_term(self.deltav_to_deltar(sigma_v_2), mumode=mumode) # fog is exp(-k^2 sigma^2 / 2), whereas redshift error # is exp(-k^2 deltar^2) * self.fog_gaussian(self.deltaz_to_deltar(sigma_z_1), mumode=mumode) * self.fog_gaussian(self.deltaz_to_deltar(sigma_z_2), mumode=mumode) ) return power_in_redshift_space
[docs] def get_model_matter_power(self): """ Calculate the model matter power spectrum. The attribute f"_auto_power_matter_model" will be set by the output. """ pk3d_mm_r = self.auto_power_matter_model_r if self.kaiser_rsd: beta_m = self.f_growth_true self._auto_power_matter_model = self.cal_rsd_power( pk3d_mm_r, beta_m, 0.0, 0.0, ) else: self._auto_power_matter_model = pk3d_mm_r logger.debug( "calculated model matter power spectrum, kaiser rsd: %s", self.kaiser_rsd ) logger.debug("model matter power spectrum: %s", self._auto_power_matter_model)
[docs] def get_model_power_noobs_i(self, i): """ Calculate the model power spectrum for the i-th tracer without observational effects. The attribute f"_auto_power_tracer_{i}_model_noobs" will be set by the output. """ tracer_bias_i = getattr(self, "tracer_bias_" + str(i)) if tracer_bias_i is None: # For the second tracer, many higher-level APIs (e.g. PowerSpectrum, MockSimulation) # rely on the existence of a well-defined theoretical model. If ``tracer_bias_2`` # is not set, calculating these model power spectra would fail in a non-obvious way # later on. Raise an explicit error here instead. if i == 2: raise ValueError( "tracer_bias_2 is not set, so the theoretical power spectrum for " "tracer_2 (and any cross-correlation involving tracer_2) cannot be " "computed. Please pass tracer_bias_2 when initialising the object " "or set ``obj.tracer_bias_2`` before accessing tracer_2 model power." ) pk3d_mm_r = self.auto_power_matter_model_r # tracer in real space is just the matter ps times the bias pk3d_tt_r = tracer_bias_i**2 * pk3d_mm_r # apply the RSD if self.kaiser_rsd: beta_i = self.f_growth_true / tracer_bias_i power_noobs_i = self.cal_rsd_power( pk3d_tt_r, beta_i, getattr(self, "sigma_v_" + str(i)), getattr(self, "sigma_z_" + str(i)), ) else: power_noobs_i = pk3d_tt_r logger.info( f"{inspect.currentframe().f_code.co_name}: setting self._auto_power_tracer_{i}_model_noobs" ) setattr(self, f"_auto_power_tracer_{i}_model_noobs", power_noobs_i)
[docs] def get_model_power_i(self, i): """ Calculate the model power spectrum for the i-th tracer. The attribute f"_auto_power_tracer_{i}_model" will be set by the output. Parameters ---------- i: int The index of the tracer. Returns ------- auto_power_model: np.ndarray The model power spectrum for the i-th tracer. """ logger.debug( "calculating model power for tracer %s with bias %s", i, getattr(self, "tracer_bias_" + str(i)), ) B_beam = self.beam_attenuation() B_sampling = self.map_sampling() B_comp = self.gridding_compensation() tracer_beam_indx = np.array(self.include_beam).astype("int")[i - 1] tracer_samp_indx = np.array(self.include_sky_sampling).astype("int")[i - 1] tracer_comp_indx = np.array(self.compensate).astype("int")[i - 1] auto_power_model = getattr(self, f"auto_power_tracer_{i}_model_noobs").copy() # first apply the beam logger.debug("applying beam attenuation?: %s", tracer_beam_indx) auto_power_model *= B_beam ** (tracer_beam_indx * 2) # then apply the sky-map sampling and gridding compensation logger.debug("applying sky-map sampling?: %s", tracer_samp_indx) auto_power_model *= B_sampling ** (tracer_samp_indx * 2) logger.debug("applying gridding compensation?: %s", tracer_comp_indx) auto_power_model *= B_comp ** (tracer_comp_indx * 2) # then the weights in the grid space before FFT # assume map-making, gridding and field-level weights are commutable weights_grid = self.get_weights_none_to_one("weights_grid_" + str(i)) weights_field = self.get_weights_none_to_one("weights_field_" + str(i)) weights_tot = weights_field * weights_grid logger.debug("applying weights convolution: %s", weights_tot) auto_power_model = get_modelpk_conv( auto_power_model, weights1_in_real=weights_tot, weights2=weights_tot, renorm=True, ) logger.info( f"{inspect.currentframe().f_code.co_name}: " f"setting self._auto_power_tracer_{i}_model" ) setattr(self, "_auto_power_tracer_" + str(i) + "_model", auto_power_model) return auto_power_model
[docs] def get_model_power_noobs_cross(self): """ Calculate the model cross power spectrum between the two tracers without observational effects. The attribute f"_cross_power_tracer_model_noobs" will be set by the output. """ if self.tracer_bias_2 is None: raise ValueError( "tracer_bias_2 is not set, so the theoretical cross power spectrum for " "cross-correlation cannot be computed. " "Please pass tracer_bias_2 when initialising the object " "or set ``obj.tracer_bias_2`` before accessing cross-power model quantities." ) pk3d_mm_r = self.auto_power_matter_model_r pk3d_tt_r = self.tracer_bias_1 * self.tracer_bias_2 * pk3d_mm_r if self.kaiser_rsd: beta_1 = self.f_growth_true / self.tracer_bias_1 beta_2 = self.f_growth_true / self.tracer_bias_2 result = self.cal_rsd_power( pk3d_tt_r, beta1=beta_1, sigma_v_1=self.sigma_v_1, sigma_z_1=self.sigma_z_1, beta2=beta_2, sigma_v_2=self.sigma_v_2, sigma_z_2=self.sigma_z_2, r=self.cross_coeff, ) else: result = pk3d_tt_r * self.cross_coeff self._cross_power_tracer_model_noobs = result
[docs] def get_model_power_cross(self): """ Calculate the model cross power spectrum between the two tracers. The attribute f"_cross_power_tracer_model" will be set by the output. """ if getattr(self, "tracer_bias_" + str(2)) is None: raise ValueError( "tracer_bias_2 is not set, so the theoretical cross power spectrum " "between tracer_1 and tracer_2 cannot be computed. Please pass " "tracer_bias_2 when initialising the object or set " "``obj.tracer_bias_2`` before accessing cross-power model quantities." ) B_beam = self.beam_attenuation() B_sampling = self.map_sampling() B_comp = self.gridding_compensation() tracer_beam_indx = np.array(self.include_beam).astype("int") tracer_samp_indx = np.array(self.include_sky_sampling).astype("int") tracer_comp_indx = np.array(self.compensate).astype("int") self._cross_power_tracer_model = self.cross_power_tracer_model_noobs.copy() # then apply the beam, sky-map sampling, and gridding compensation logger.debug( "applying beam attenuation for tracer 1 and/or 2?: %s", tracer_beam_indx ) self._cross_power_tracer_model *= B_beam ** ( tracer_beam_indx[0] + tracer_beam_indx[1] ) logger.debug( "applying sky-map sampling for tracer 1 and/or 2?: %s", tracer_samp_indx ) self._cross_power_tracer_model *= B_sampling ** ( tracer_samp_indx[0] + tracer_samp_indx[1] ) logger.debug( "applying gridding compensation for tracer 1 and/or 2?: %s", tracer_comp_indx, ) self._cross_power_tracer_model *= B_comp ** ( tracer_comp_indx[0] + tracer_comp_indx[1] ) # then the weights in the grid space before FFT weights_grid_1 = self.get_weights_none_to_one("weights_grid_1") weights_field_1 = self.get_weights_none_to_one("weights_field_1") weights_grid_2 = self.get_weights_none_to_one("weights_grid_2") weights_field_2 = self.get_weights_none_to_one("weights_field_2") weights_tot_1 = weights_field_1 * weights_grid_1 weights_tot_2 = weights_field_2 * weights_grid_2 logger.debug( "applying weights convolution: %s and %s", weights_tot_1, weights_tot_2 ) logger.info( f"{inspect.currentframe().f_code.co_name}: " f"setting self._cross_power_tracer_model" ) self._cross_power_tracer_model = get_modelpk_conv( self._cross_power_tracer_model, weights1_in_real=weights_tot_1, weights2=weights_tot_2, renorm=True, ) return self._cross_power_tracer_model
[docs] class FieldPowerSpectrum(Specification): """ The class for computing the power spectrum of a gridded field from LSS data. Parameters ---------- field_1: np.ndarray. The density field of the first tracer. field_2: np.ndarray, default None The density field of the second tracer. If None, calculation of the second tracer and the cross-correlation will be skipped. box_len: list of 3 floats. The length of the box along each axis. weights_1: np.ndarray, default None The weights of the first tracer. Default is uniform weights. mean_center_1: bool, default False Whether to mean-center the first tracer field. unitless_1: bool, default False Whether to divide the first tracer field by its mean. weights_2: np.ndarray, default None The weights of the second tracer. Default is uniform weights. mean_center_2: bool, default False Whether to mean-center the second tracer field. unitless_2: bool, default False Whether to divide the second tracer field by its mean. **params: dict Additional parameters to be passed to the base class :class:`meer21cm.dataanalysis.Specification`. """ def __init__( self, field_1, box_len, weights_1=None, mean_center_1=False, unitless_1=False, field_2=None, weights_2=None, mean_center_2=False, unitless_2=False, **params, ): Specification.__init__(self, **params) self.field_1 = field_1 self.field_2 = field_2 self.weights_1 = weights_1 self.weights_2 = weights_2 self.box_len = np.array(box_len) self.box_ndim = np.array(field_1.shape) self.mean_center_1 = mean_center_1 self.unitless_1 = unitless_1 self.mean_center_2 = mean_center_2 self.unitless_2 = unitless_2 if field_2 is not None: error_message = "field_1 and field_2 must have same dimensions" assert np.allclose(field_2.shape, field_1.shape), error_message self._fourier_field_1 = None self._fourier_field_2 = None @property def box_len(self): """ The length of all sides of the box in Mpc. """ return self._box_len @box_len.setter def box_len(self, value): self._box_len = value if "box_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.box_dep_attr} due to resetting box_len" ) self.clean_cache(self.box_dep_attr) @property def box_resol(self): """ The grid length of each side of the enclosing box in Mpc. """ return self.box_len / self.box_ndim @property def box_ndim(self): """ The number of grids along each side of the enclosing box. To ensure even sampling of +k and -k modes, the number of grids along every axis needs to be odd. """ return self._box_ndim @box_ndim.setter def box_ndim(self, value): self._box_ndim = value if "box_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.box_dep_attr} due to resetting box_ndim" ) self.clean_cache(self.box_dep_attr)
[docs] def set_corr_type(self, corr_type, tracer_indx): """ A utility function to help decide whether a tracer field needs to be mean centred, renormalised by its mean, and shot noise removed. Currently only two types are supported, "Gal" and "HI" (case-insensitive). If the tracer is galaxy (number counts), the auto power spectrum is mean centred, renormalised, and then shot noise removed. If HI, none of the above will be performed. Parameters ---------- corr_type: str The tracer type. tracer_indx: int Either 1 or 2. """ logger.debug("setting corr_type: %s for tracer %s", corr_type, tracer_indx) if corr_type[:3].lower() == "gal": mean_center = True unitless = True mean_amp = 1.0 elif corr_type[:2].lower() == "hi": mean_center = False unitless = False mean_amp = "average_hi_temp" else: raise ValueError("unknown corr_type") if not tracer_indx in [1, 2]: raise ValueError("tracer_indx should be either 1 or 2") logger.debug("setting mean_center_%s: %s", tracer_indx, mean_center) logger.debug("setting unitless_%s: %s", tracer_indx, unitless) logger.debug("setting mean_amp_%s: %s", tracer_indx, mean_amp) setattr(self, "mean_center_" + str(tracer_indx), mean_center) setattr(self, "unitless_" + str(tracer_indx), unitless) setattr(self, "mean_amp_" + str(tracer_indx), mean_amp)
@property def x_vec(self): """ The 3D x-vector of the box. """ return get_x_vector( self.box_ndim, self.box_resol, ) @property def x_mode(self): """ The mode of the 3D x-vector. """ return get_vec_mode(self.x_vec) @property def k_vec(self): """ The 3D k-vector of the box. """ return get_k_vector( self.box_ndim, self.box_resol, ) @property def k_nyquist(self): """ The Nyquist frequency of the 3D box along each axis. """ k_max = np.array([np.abs(self.k_vec[i]).max() for i in range(len(self.k_vec))]) return k_max @property def k_perp(self): """ The **fiducial** perpendicular k-vector of the 3D box. """ return get_vec_mode(self.k_vec[:-1]) @property def k_para(self): """ The **fiducial** parallel k-mode of the 3D box. """ return self.k_vec[-1] @property def k_mode(self): """ The **fiducial** (observed) mode of the 3D k-vector. """ return get_vec_mode(self.k_vec) @property def mu_mode(self): """ The **fiducial** (observed) mu values of each k-mode so that :math:`k_\parallel = k \times \mu`. """ with np.errstate(divide="ignore", invalid="ignore"): return np.nan_to_num(self.k_para[None, None, :] / self.k_mode) @property def field_1(self): """ The density field of the first tracer. """ return self._field_1 @property def field_2(self): """ The density field of the second tracer. """ return self._field_2 @field_1.setter def field_1(self, value): # if field is updated, clear fourier field self._field_1 = value if "field_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.field_1_dep_attr} due to resetting field_1" ) self.clean_cache(self.field_1_dep_attr) @field_2.setter def field_2(self, value): # if field is updated, clear fourier field self._field_2 = value if "field_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.field_2_dep_attr} due to resetting field_2" ) self.clean_cache(self.field_2_dep_attr) @property def mean_center_1(self): """ Whether field_1 needs to be mean centered """ return self._mean_center_1 @property def mean_center_2(self): """ Whether field_2 needs to be mean centered """ return self._mean_center_2 @mean_center_1.setter def mean_center_1(self, value): # if weight is updated, clear fourier field self._mean_center_1 = value if "field_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.field_1_dep_attr} due to resetting mean_center_1" ) self.clean_cache(self.field_1_dep_attr) @mean_center_2.setter def mean_center_2(self, value): # if weight is updated, clear fourier field self._mean_center_2 = value if "field_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.field_2_dep_attr} due to resetting mean_center_2" ) self.clean_cache(self.field_2_dep_attr) @property def unitless_1(self): """ Whether field_1 needs to be divided by its mean """ return self._unitless_1 @property def unitless_2(self): """ Whether field_2 needs to be divided by its mean """ return self._unitless_2 @unitless_1.setter def unitless_1(self, value): # if weight is updated, clear fourier field self._unitless_1 = value if "field_1_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.field_1_dep_attr} due to resetting unitless_1" ) self.clean_cache(self.field_1_dep_attr) @unitless_2.setter def unitless_2(self, value): # if weight is updated, clear fourier field self._unitless_2 = value if "field_2_dep_attr" in dir(self): logger.debug( f"cleaning cache of {self.field_2_dep_attr} due to resetting unitless_2" ) self.clean_cache(self.field_2_dep_attr) @property @tagging("box", "field_1") def fourier_field_1(self): """ The Fourier transform of the density field of the first tracer. """ if self._fourier_field_1 is None: self.get_fourier_field_1() return self._fourier_field_1
[docs] def get_fourier_field_1(self): """ Calculate the Fourier transform of the density field of the first tracer. """ result = get_fourier_density( self.field_1, weights=self.weights_1, mean_center=self.mean_center_1, unitless=self.unitless_1, ) logger.info( f"{inspect.currentframe().f_code.co_name}: " f"setting self._fourier_field_1" ) self._fourier_field_1 = result
@property @tagging("box", "field_2") def fourier_field_2(self): """ The Fourier transform of the density field of the second tracer. """ if self._fourier_field_2 is None: self.get_fourier_field_2() return self._fourier_field_2
[docs] def get_fourier_field_2(self): """ Calculate the Fourier transform of the density field of the second tracer. """ if self.field_2 is None: logger.info("field_2 is None, returning None") return None result = get_fourier_density( self.field_2, weights=self.weights_2, mean_center=self.mean_center_2, unitless=self.unitless_2, ) logger.info( f"{inspect.currentframe().f_code.co_name}: " f"setting self._fourier_field_2" ) self._fourier_field_2 = result
# the calculation of this is not heavy, simply on the fly @property def auto_power_3d_1(self): """ The 3D power spectrum of the first tracer. """ power_spectrum = get_power_spectrum( self.fourier_field_1, self.box_len, weights=self.weights_1, renorm=False, ) return power_spectrum * self.renorm_ps_1 @property def auto_power_3d_2(self): """ The 3D power spectrum of the second tracer. """ if self.field_2 is None: return None power_spectrum = get_power_spectrum( self.fourier_field_2, self.box_len, weights=self.weights_2, renorm=False, ) return power_spectrum * self.renorm_ps_2 @property def cross_power_3d(self): """ The 3D cross power spectrum between the two tracers. """ if self.field_2 is None: return None weights_2 = self.weights_2 # if none, the default for get_power_spectrum is # to use weights_1, here we want separate weights_2 if weights_2 is None: weights_2 = np.ones(self.field_2.shape) power_spectrum = get_power_spectrum( self.fourier_field_1, self.box_len, weights=self.weights_1, field_2=self.fourier_field_2, weights_2=weights_2, renorm=False, ) return power_spectrum * self.renorm_ps_cross @property def renorm_ps_1(self): """ The renormalization factor of the power spectrum of the first tracer. """ grid_w = self.get_weights_none_to_one("weights_1") field_w = 1.0 mean_renorm = 1.0 if hasattr(self, "weights_field_1"): field_w = self.get_weights_none_to_one("weights_field_1") if self.unitless_1: mean_renorm = (field_w * grid_w).sum() / (grid_w).sum() return ( power_weights_renorm(grid_w * field_w, grid_w * field_w) * mean_renorm**2 ) @property def renorm_ps_2(self): """ The renormalization factor of the power spectrum of the second tracer. """ grid_w = self.get_weights_none_to_one("weights_2") field_w = 1.0 mean_renorm = 1.0 if hasattr(self, "weights_field_2"): field_w = self.get_weights_none_to_one("weights_field_2") if self.unitless_2: mean_renorm = (field_w * grid_w).sum() / (grid_w).sum() return ( power_weights_renorm(grid_w * field_w, grid_w * field_w) * mean_renorm**2 ) @property def renorm_ps_cross(self): """ The renormalization factor of the cross power spectrum. """ grid_w_1 = self.get_weights_none_to_one("weights_1") field_w_1 = 1.0 mean_renorm_1 = 1.0 if hasattr(self, "weights_field_1"): field_w_1 = self.get_weights_none_to_one("weights_field_1") if self.unitless_1: mean_renorm_1 = (field_w_1 * grid_w_1).sum() / (grid_w_1).sum() grid_w_2 = self.get_weights_none_to_one("weights_2") field_w_2 = 1.0 mean_renorm_2 = 1.0 if hasattr(self, "weights_field_2"): field_w_2 = self.get_weights_none_to_one("weights_field_2") if self.unitless_2: mean_renorm_2 = (field_w_2 * grid_w_2).sum() / (grid_w_2).sum() return ( power_weights_renorm(grid_w_1 * field_w_1, grid_w_2 * field_w_2) * mean_renorm_1 * mean_renorm_2 )
[docs] def get_renormed_field( real_field, weights=None, mean_center=False, unitless=False, ): """ Mean center the field and renormalise it by dividing the mean. Parameters ---------- real_field: np.ndarray The real-space field. weights: np.ndarray, default None The weights of the field. mean_center: bool, default False Whether to mean center the field. unitless: bool, default False Whether to make the field unitless. Returns ------- field: np.ndarray The renormalized field. """ field = np.asarray(real_field) real_dtype = real_dtype_from_array(field) field = field.astype(real_dtype, copy=True) if weights is None: weights = np.ones_like(field, dtype=real_dtype) weights = np.asarray(weights, dtype=real_dtype) if mean_center or unitless: field_mean = np.sum(weights * field) / np.sum(weights) else: return real_field if mean_center: field -= field_mean if unitless: field /= field_mean return field
[docs] def get_fourier_density( real_field, weights=None, mean_center=False, unitless=False, norm="forward", ): """ Perform Fourier transform of a density field in real space. Note that this is deliberately written in a way that is not dimension specific. It can be used to calculate power spectrum of arbitrary dimension. Note that, the field is multiplied by the weights and then Fourier-transformed, and is **not weight normalised**. Parameters ---------- real_field: np.ndarray The real-space field. weights: np.ndarray, default None The weights of the field. mean_center: bool, default False Whether to mean center the field. unitless: bool, default False Whether to make the field unitless. norm: str, default "forward" The normalization of the Fourier transform. Naming is the same as np.fft. Returns ------- fourier_field: np.ndarray The Fourier transform of the field. """ field = get_renormed_field( real_field, weights=weights, mean_center=mean_center, unitless=unitless, ) if weights is None: weights = np.ones_like(field, dtype=real_dtype_from_array(field)) weights = np.asarray(weights, dtype=real_dtype_from_array(field)) fourier_field = np.fft.rfftn(field * weights, norm=norm) return fourier_field
[docs] def get_x_vector(box_ndim, box_resol): """ Get the position vector along each direction for a given box. Parameters ---------- box_ndim: int The number of dimensions of the box. box_resol: float The resolution of the box. Returns ------- xvecarr: tuple The position vector along each direction. """ xvecarr = tuple( box_resol[i] * (np.arange(box_ndim[i]) + 0.5) for i in range(len(box_ndim)) ) return xvecarr
[docs] def get_k_vector(box_ndim, box_resol): """ Get the wavenumber vector along each direction for a given box. Parameters ---------- box_ndim: int The number of dimensions of the box. box_resol: float The resolution of the box. Returns ------- kvecarr: tuple The wavenumber vector along each direction. """ kvecarr = [ 2 * np.pi * np.fft.fftfreq( box_ndim[i], d=box_resol[i], ) for i in range(len(box_ndim)) ] kvecarr[-1] = np.abs(kvecarr[-1][: box_ndim[-1] // 2 + 1]) return kvecarr
[docs] def get_vec_mode(vecarr): """ Calculate the mode of the n-dimensional vectors on the grids Parameters ---------- vecarr: tuple The vectors. Returns ------- mode: np.ndarray The mode of the vectors. """ result = np.sqrt( np.sum( (np.meshgrid(*([(vec) ** 2 for vec in vecarr]), indexing="ij")), 0, ) ) return result
[docs] def get_shot_noise_galaxy( gal_count, box_len, weights_grid=None, weights_field=None, ): """ Calculate the shot noise of a galaxy number count field. """ gal_count = np.asarray(gal_count) real_dtype = real_dtype_from_array(gal_count) if weights_grid is None: weights_grid = np.ones(gal_count.shape, dtype=real_dtype) if weights_field is None: weights_field = np.ones(gal_count.shape, dtype=real_dtype) weights_grid = np.asarray(weights_grid, dtype=real_dtype) weights_field = np.asarray(weights_field, dtype=real_dtype) w_g_n = (weights_grid * gal_count).sum() / gal_count.sum() w_2_g_n = (weights_grid**2 * gal_count).sum() / gal_count.sum() wfwg_2_v = ((weights_field * weights_grid) ** 2).mean() wfwg_v = (weights_field * weights_grid).mean() shot_noise = ( np.prod(box_len) / gal_count.sum() * w_2_g_n / w_g_n**2 * (wfwg_v**2 / wfwg_2_v) ) return shot_noise
[docs] def get_shot_noise( real_field, box_len, weights=None, ): """ Calculate the shot noise of a field. Parameters ---------- real_field: np.ndarray The real-space field. box_len: tuple The length of the box along each direction. weights: np.ndarray, default None The weights of the field. Returns ------- shot_noise: float The shot noise of the field. """ real_dtype = real_dtype_from_array(real_field) box_len = np.asarray(box_len, dtype=real_dtype) box_volume = np.prod(box_len) if weights is None: weights = np.ones(real_field.shape, dtype=real_dtype) weights = np.asarray(weights, dtype=real_dtype) weights_renorm = power_weights_renorm(weights, weights) shot_noise = ( box_volume * np.sum((weights * real_field) ** 2) / np.sum(weights * real_field) ** 2 * weights_renorm * np.mean(weights) ** 2 ) return shot_noise
[docs] def get_modelpk_conv(psmod, weights1_in_real=None, weights2=None, renorm=True): """ Convolve a model power spectrum with real-space weights. Parameters ---------- psmod: np.ndarray The model power spectrum. weights1_in_real: np.ndarray, default None The real-space weights for the first field. Default is None, which means no weights. weights2: np.ndarray, default None The real-space weights for the second field. Default is None, which means no weights. renorm: bool, default True Whether to renormalize the power spectrum. Returns ------- power_conv: np.ndarray The convolved power spectrum. """ if weights1_in_real is None and weights2 is None: return psmod if weights1_in_real is None: weights1_in_real = np.ones_like(weights2) if weights2 is None: weights2 = np.ones_like(weights1_in_real) assert np.allclose(weights1_in_real.shape, weights2.shape) if np.allclose(weights1_in_real, 1) and np.allclose(weights2, 1): return psmod weights_fourier = np.fft.rfftn(weights1_in_real) weights_fourier *= np.conj(np.fft.rfftn(weights2)) # using fft instead of rfft somehow is wrong, I have no idea why # the behaviour of convolving with uniform weights is incorrect at k_xyz=0 due to rfft power_conv = ( np.fft.rfftn( np.fft.irfftn(psmod, axes=[0, 1, 2], s=weights2.shape) * np.fft.irfftn(weights_fourier, axes=[0, 1, 2], s=weights2.shape) ) / weights1_in_real.size ) * ( (weights2.shape[-1] // 2 * 2) / weights2.shape[-1] ) # rfft correction for odd number? if renorm: weights_renorm = power_weights_renorm(weights1_in_real, weights2=weights2) power_conv *= weights_renorm return power_conv.real
[docs] def power_weights_renorm(weights1_in_real=None, weights2=None): r""" Calculate the renormalization coefficient based on the weights on the density field when calculating power spectrum. The renormalization is defined as .. math:: \frac{{N_{\rm grid}}} {\sum_{i} w_1(x_i) w_2(x_i)}, where :math:`N_{\rm grid}` is the number of grids in the box and :math:`i` loops over all the grids. Note that this renormaliszation corresponds to the diagonal renormalisation matrix that does not change the window function convolution, but only renormalises the sum of each row of the window function matrix. See Chen (2025) [1] for more details. Parameters ---------- weights1_in_real: array, default None. The weights of the density field in real space. Must be in the shape of the regular grid field. weights2: array, default None. If cross-correlation, the weights for the second field. Returns ------- weights_norm: float. The renormalization coefficient. References ---------- [1] Chen, Z., 2025, "A quadratic estimator view of the transfer function correction in intensity mapping surveys", https://ui.adsabs.harvard.edu/abs/2025MNRAS.542L...1C/abstract. """ if weights1_in_real is None and weights2 is None: return 1.0 if weights1_in_real is None: weights1_in_real = np.ones_like(weights2) if weights2 is None: weights2 = np.ones_like(weights1_in_real) weights_norm = weights1_in_real.size / np.sum(weights1_in_real * weights2) return weights_norm
[docs] def get_power_spectrum( fourier_field, box_len, weights=None, field_2=None, weights_2=None, renorm=True, ): """ Calculate the power spectrum for one/two given Fourier fields. Parameters ---------- fourier_field: np.ndarray The Fourier field of the first tracer. box_len: tuple The length of the box along each direction. weights: np.ndarray, default None The weights of the first tracer **in real space**. field_2: np.ndarray, default None The Fourier field of the second tracer. If None, it is set to be the same as the first field. weights_2: np.ndarray, default None The weights of the second tracer **in real space**. **Must be provided if field_2 is provided.** renorm: bool, default True Whether to renormalize the power spectrum by the weights. Returns ------- power: np.ndarray The power spectrum. """ box_len = np.array(box_len) box_volume = np.prod(box_len) if field_2 is None: field_2 = fourier_field fourier_field = np.array(fourier_field) field_2 = np.array(field_2) power = np.real(fourier_field * np.conj(field_2)) if weights is None and weights_2 is None: return power * box_volume if weights is None: weights = np.ones(weights_2.shape) if weights_2 is None: weights_2 = weights # if weights_2 is None, the renormalisation sets it to weights weights_norm = power_weights_renorm(weights, weights_2) if renorm: power *= weights_norm return power * box_volume
[docs] def get_gaussian_noise_floor( sigma_n, box_ndim, box_volume=1.0, counts=None, ): """ Calculate the Gaussian noise floor for a given field. Parameters ---------- sigma_n: float The standard deviation of the noise before being averaged down by the sampling. box_ndim: tuple The number of grids of the box along each direction. box_volume: float, default 1.0 The volume of the box. counts: np.ndarray, default None The number of sampling in the box. If None, it is set to be 1.0. Returns ------- noise_floor: float The noise floor. """ box_ndim = np.array(box_ndim) if counts is None: counts = np.ones(box_ndim.tolist()) counts = np.array(counts) box_std = sigma_n / np.sqrt(counts) fourier_var = np.sum(box_std**2) / np.prod(box_ndim) ** 2 return fourier_var * box_volume
[docs] def bin_3d_to_1d( ps3d, kfield, k1dedges, weights=None, error=False, vectorize=False, ): r""" Bin a 3d distribution, e.g. power spectrum :math:`P_{3D}(\vec{k})`, into 1D average. Note that, the distribution is unraveled to a 1D array, so essentially an array of any dimension would do, as long as ``ps3d``, ``kfield``, and ``weights`` have the same size. The mean of the 1D average is calculated as .. math:: \hat{P}_{\rm 1D}^{i} = \big(\sum_j P_{\rm 3D}^{ j} w_{ j} \big)/\big(\sum_j w_{ j}\big), where j loops over all the modess that fall into the :math:`i^{\rm th}` bin and :math:`w_{ j}` is the weights. If ``error`` is set to ``True``, a sampling error is calculated and returned so that .. math:: (\Delta P_{\rm 1D}^{\rm i})^2 = \big(\sum_j (P_{\rm 3D}^{\rm j}-\hat{P}_{\rm 1D}^{\rm i})^2 w_{\rm j}^2 \big) \Big/ \big(\sum_j w_{\rm j}\big)^2. Parameters ---------- ps3d: np.ndarray The 3D distribution to be binned. kfield: np.ndarray The k-field of the 3D distribution. k1dedges: np.ndarray The bin edges for the 1D power spectrum. weights: np.ndarray, default None The weights for each 3D k-mode of the power spectrum. error: bool, default False Whether to calculate the sampling error. vectorize: bool, default False Whether to vectorize the calculation, assuming the first axis is independent realisations. Returns ------- ps1d: np.ndarray The 1D power spectrum. ps1derr: np.ndarray The sampling error for the 1D power spectrum. Returned only if ``error`` is ``True``. k1deff: np.ndarray The effective k-mode for each bin. nmodes: np.ndarray The number of modes in each bin. """ if not vectorize: ps3d = np.array(ps3d)[None, ...] if weights is None: weights = np.ones_like(ps3d[0]) ps3d = np.array(ps3d).reshape(len(ps3d), -1) kfield = np.array(kfield).ravel() weights = np.array(weights).ravel() indx = (kfield[:, None] >= k1dedges[None, :-1]) * ( kfield[:, None] < k1dedges[None, 1:] ) with np.errstate(divide="ignore", invalid="ignore"): ps1d = np.sum( ps3d[:, :, None] * indx[None, :, :] * weights[None, :, None], 1 ) / np.sum(indx[None, :, :] * weights[None, :, None], 1) k1deff = np.sum(kfield[:, None] * indx * weights[:, None], 0) / np.sum( indx * weights[:, None], 0 ) if error is True: with np.errstate(divide="ignore", invalid="ignore"): ps1derr = np.sqrt( np.sum( (ps3d[:, :, None] - ps1d[:, None, :]) ** 2 * (indx[None, :, :] * weights[None, :, None]) ** 2, 1, ) / np.sum((indx[None, :, :] * weights[None, :, None]), 1) ** 2 ) if not vectorize: ps1derr = ps1derr[0] if not vectorize: ps1d = ps1d[0] nmodes = np.sum(indx * (weights[:, None] > 0), 0) if error is True: return ps1d, ps1derr, k1deff, nmodes else: return ps1d, k1deff, nmodes
[docs] def bin_3d_to_cy( ps3d, kperp_i, kperpedges, weights=None, average=True, vectorize=False, ): """ Function to bin a 3D distribution (e.g. power spectrum) into cylindrical average. The arrays are unravelled to 2D before binning by keeping the last axis. The 2D array is then binned along the first axis. The output is flipped so that the first axis is the original last axis. Therefore, to bin a 3D power spectrum to a cylindrical average, one can simply run ``bin_3d_to_cy`` twice (see ``PowerSpectrum.get_cy_power``). Parameters ---------- ps3d: array. The 3D distribution to be binned. kperp_i: array. The perpendicular k-mode corresponding to the first axis. kperpedges: array. The bin edges for the perpendicular k-mode. weights: array, None. The weights for each 3D k-mode of the power spectrum. average: bool, default True. If ``True``, calculate the weighted average of the power spectrum in each bin. Else, calculate the weighted sum. vectorize: bool, default False Whether to vectorize the calculation, assuming the first axis is independent realisations. Returns ------- pscy: np.ndarray The cylindrical average of the 3D distribution. """ ps3d = np.array(ps3d) if not vectorize: ps3d = ps3d[None, ...] kperpedges = np.array(kperpedges) kperp_i = np.array(kperp_i).ravel() ps3d = ps3d.reshape((len(ps3d), len(kperp_i), -1)) if weights is None: weights = np.ones_like(ps3d[0]) weights = np.array(weights).reshape((len(kperp_i), -1)) indx = (kperp_i[:, None] >= kperpedges[None, :-1]) * ( kperp_i[:, None] < kperpedges[None, 1:] ) weights = indx[:, None, :] * weights[:, :, None] pscy = np.sum(ps3d[:, :, :, None] * weights[None], 1) if average: pscy = pscy / np.sum(weights, 0)[None] if not vectorize: pscy = pscy[0] return pscy
[docs] def gaussian_beam_attenuation(k_perp, beam_sigma_in_mpc): """ The beam attenuation term to be multiplied to model power spectrum assuming a Gaussian beam. Parameter --------- k_perp: np.ndarray. The transverse k-scale in Mpc^-1 beam_sigma_in_mpc: float. The sigma of the Gaussian beam in Mpc. Returns ------- beam_attenuation: np.ndarray. The beam attenuation factor. """ return np.exp(-(k_perp**2) * beam_sigma_in_mpc**2 / 2)
[docs] def step_window_attenuation(k_dir, step_size_in_mpc, p=1): """ The beam attenuation term to be multiplied to model power spectrum assuming a Gaussian beam. Parameter --------- k_perp: float. The transverse k-scale in Mpc^-1 beam_sigma_in_mpc: float. The sigma of the Gaussian beam in Mpc. p: int, default 1 The index of assignment scheme. Returns ------- window_attenuation: np.ndarray. The window attenuation factor. """ # note np.sinc is sin(pi x)/(pi x) return np.sinc(k_dir * step_size_in_mpc / np.pi / 2) ** p
[docs] class PowerSpectrum(FieldPowerSpectrum, ModelPowerSpectrum): """ The class for coherently combining the :class:`FieldPowerSpectrum` and :class:`ModelPowerSpectrum` classes, and providing an interface for gridding the intensity mapping data as well as the galaxy catalogue to perform power spectrum estimation and for auto-correlation and cross-correlation. Note that, while you can manually set most of the attributes such as the box length, the density field, the weights, etc., the class is mainly used for first gridding the intensity mapping data and the galaxy catalogue to a rectangular grid field, which then set these attributes automatically. The usual input parameters are those of :class:`meer21cm.dataanalysis.Specification`. For usage, check the tutorial notebooks in the examples and cookbook sections. Parameters ---------- field_1: np.ndarray, default None The density field of the first tracer. box_len: list of 3 floats. The length of the box along each axis. weights_field_1: np.ndarray, default None The field-level weights of the first tracer. Default is uniform weights. weights_grid_1: np.ndarray, default None The grid-level weights of the first tracer. Default is uniform weights. mean_center_1: bool, default False Whether to mean-center the first tracer field. unitless_1: bool, default False Whether to divide the first tracer field by its mean. field_2: np.ndarray, default None The density field of the second tracer. If None, calculation of the second tracer and the cross-correlation will be skipped. weights_field_2: np.ndarray, default None The field-level weights of the second tracer. Default is uniform weights. weights_grid_2: np.ndarray, default None The grid-level weights of the second tracer. Default is uniform weights. mean_center_2: bool, default False Whether to mean-center the second tracer field. unitless_2: bool, default False Whether to divide the second tracer field by its mean. k1dbins: list of floats, default None The bin edges of k in Mpc-1 for the 1D power spectrum. kmode: float, default None The mode of 3D k in Mpc-1 for the model calculation. mumode: float, default None The mu mode of each 3D k-mode for the model calculation. tracer_bias_1: float, default 1.0 The linear bias of the first tracer. sigma_v_1: float, default 0.0 The velocity dispersion of the first tracer in km/s. tracer_bias_2: float, default None The linear bias of the second tracer. sigma_v_2: float, default 0.0 The velocity dispersion of the second tracer in km/s. include_beam: list, default [True, False] Whether to include the beam attenuation in the model calculation. fog_profile: str, default "lorentz" The shape of the finger-of-god profile to be used in the model calculation. cross_coeff: float, default 1.0 The cross-correlation coefficient between the two tracers. model_k_from_field: bool, default True Whether to calculate the model k-mode ``self.kmode`` from the field k-mode ``self.k_mode``. mean_amp_1: float, default 1.0 The mean amplitude of the first tracer. mean_amp_2: float, default 1.0 The mean amplitude of the second tracer. sampling_resol: list, default None The sampling resolution of the field in Mpc. If ``sampling_resol`` is "auto", the sampling resolution will be set to the pixel size of the map. include_sky_sampling: list, default [True, False] Whether to include the sky sampling in the model calculation. If just a boolean is provided, it will be used for both tracers. downres_factor_transverse: float, default None The down-sampling factor for the transverse direction of the rectangular box for gridding. downres_factor_radial: float, default None The down-sampling factor for the radial direction of the rectangular box for gridding. init_box_from_map_data: bool, default False If True, the box dimensions will be calculated from the input data cube during initialization. You can always manually set the box dimensions later by ``self.get_enclosing_box()``. box_buffkick: float, default 5 The buffer kick for the box on each side when gridding. In the unit of Mpc. compensate: list, default [False, False] Whether the gridded fields are compensated according to the mass assignment scheme. Note that the compensation is applied to the model power spectrum, and **not** to the gridded data fields. taper_func: function, default windows.blackmanharris The taper function to be applied to the gridded field. Note that the taper function is not automatically applied, and is only used when calling :meth:`PowerSpectrum.apply_taper_to_field`. kaiser_rsd: bool, default True Whether to include the RSD effect in the model calculation and mock simulation. grid_scheme: str, default "nnb" The grid scheme to be used for gridding. Can be "nnb", "cic", "tsc" or "pcs". interlace_shift: float, default 0.0 The shift in the unit of grid cell size for interlacing. num_particle_per_pixel: int, default 1 The number of random sampling particles for each sky map pixel. seed: int, default None The seed for the random number generator. kperpbins: list of floats, default None The bin edges of k_perp in Mpc-1 for the cylindrical average power spectrum. kparabins: list of floats, default None The bin edges of k_para in Mpc-1 for the cylindrical average power spectrum. flat_sky: bool, default False Whether to use the flat sky approximation. flat_sky_padding: list, default [0, 0, 0] The padding for the flat sky box. **params: dict Additional parameters to be passed to the base class :class:`meer21cm.cosmology.CosmologyCalculator`. """ def __init__( self, field_1=None, box_len=None, weights_field_1=None, weights_grid_1=None, mean_center_1=False, unitless_1=False, field_2=None, weights_field_2=None, weights_grid_2=None, mean_center_2=False, unitless_2=False, k1dbins=None, kmode=None, mumode=None, tracer_bias_1=1.0, sigma_v_1=0.0, tracer_bias_2=None, sigma_v_2=0.0, include_beam=[True, False], fog_profile="lorentz", cross_coeff=1.0, model_k_from_field=True, mean_amp_1=1.0, mean_amp_2=1.0, sampling_resol=None, include_sky_sampling=[True, False], downres_factor_transverse=1.2, downres_factor_radial=2.0, init_box_from_map_data=False, box_buffkick=5, compensate=[False, False], taper_func=windows.blackmanharris, kaiser_rsd=True, grid_scheme="nnb", interlace_shift=0.0, num_particle_per_pixel=1, seed=None, kperpbins=None, kparabins=None, flat_sky=False, flat_sky_padding=[0, 0, 0], k1dweights=None, **params, ): if seed is None: seed = np.random.randint(0, 2**32) self.seed = seed self.num_particle_per_pixel = num_particle_per_pixel if field_1 is None: if "box_ndim" in params.keys(): field_1 = np.ones(params["box_ndim"]) else: field_1 = np.ones([1, 1, 1]) if box_len is None: box_len = np.array([1, 1, 1]) FieldPowerSpectrum.__init__( self, field_1, box_len, weights_1=weights_grid_1, mean_center_1=mean_center_1, unitless_1=unitless_1, field_2=field_2, weights_2=weights_grid_2, mean_center_2=mean_center_2, unitless_2=unitless_2, ) self.kmode = kmode self.mumode = mumode ModelPowerSpectrum.__init__( self, kmode=self.kmode, mumode=self.mumode, tracer_bias_1=tracer_bias_1, sigma_v_1=sigma_v_1, tracer_bias_2=tracer_bias_2, sigma_v_2=sigma_v_2, include_beam=include_beam, fog_profile=fog_profile, cross_coeff=cross_coeff, weights_field_1=weights_field_1, weights_field_2=weights_field_2, weights_grid_1=weights_grid_1, weights_grid_2=weights_grid_2, mean_amp_1=mean_amp_1, mean_amp_2=mean_amp_2, sampling_resol=sampling_resol, include_sky_sampling=include_sky_sampling, kaiser_rsd=kaiser_rsd, compensate=compensate, **params, ) if model_k_from_field: self.propagate_field_k_to_model() self.model_k_from_field = model_k_from_field self.k1dbins = k1dbins self.kperpbins = kperpbins self.kparabins = kparabins self.downres_factor_transverse = downres_factor_transverse self.downres_factor_radial = downres_factor_radial init_attr = [ "_rot_mat_sky_to_box", "_pix_coor_in_cartesian", "_counts_in_box", "_flat_sky", "_box_origin", "_box_voxel_redshift", ] for attr in init_attr: setattr(self, attr, None) self.upgrade_sampling_from_gridding = False self.box_buffkick = box_buffkick self.taper_func = taper_func if init_box_from_map_data: self.get_enclosing_box() self.grid_scheme = grid_scheme self.interlace_shift = interlace_shift self.flat_sky = flat_sky self.flat_sky_padding = flat_sky_padding self.k1dweights = k1dweights @property def seed(self): """ Seed value for RNG calls throughout the instance. """ return self._seed @seed.setter def seed(self, pseed): self._seed = pseed if "seed_dep_attr" in dir(self): self.clean_cache(self.seed_dep_attr) @property def box_buffkick(self): """ The buffer kick for the box on each side when gridding. In the unit of Mpc. """ return self._box_buffkick @box_buffkick.setter def box_buffkick(self, value): if not isinstance(value, Iterable): self._box_buffkick = np.array([value, value, value]) else: self._box_buffkick = np.array(value) init_attr = [ "_box_origin", "_counts_in_box", ] logger.debug(f"cleaning cache of {init_attr} due to resetting box_buffkick") for attr in init_attr: setattr(self, attr, None) @property def num_particle_per_pixel(self): """ The number of random sampling particles for each sky map pixel. """ return self._num_particle_per_pixel @num_particle_per_pixel.setter def num_particle_per_pixel(self, value): self._num_particle_per_pixel = int(value) init_attr = [ "_box_origin", "_counts_in_box", ] logger.debug( f"cleaning cache of {init_attr} due to resetting num_particle_per_pixel" ) for attr in init_attr: setattr(self, attr, None) @property def interlace_shift(self): """ The length in the unit of grid cell size for shifting the gridded field for interlacing. 0 corresponds to no interlacing. """ return self._interlace_shift @interlace_shift.setter def interlace_shift(self, value): self._interlace_shift = value @property def downres_factor_transverse(self): """ The down-sampling factor for the transverse direction of the rectangular box for gridding. The box resolution is then multiplied by this factor from the resolution of the sky map pixel specified by ``pix_resol_in_mpc``. For example, if ``pix_resol_in_mpc`` is 0.1 Mpc, and ``downres_factor_transverse`` is 2.0, the box resolution will be 0.2 Mpc. """ return self._downres_factor_transverse @downres_factor_transverse.setter def downres_factor_transverse(self, value): self._downres_factor_transverse = value # clean cache init_attr = [ "_box_origin", "_counts_in_box", ] logger.debug( f"cleaning cache of {init_attr} due to resetting downres_factor_transverse" ) for attr in init_attr: setattr(self, attr, None) @property def downres_factor_radial(self): """ The down-sampling factor for the radial direction of the rectangular box for gridding. The box resolution is then multiplied by this factor from the resolution of the frequency channel specified by ``los_resol_in_mpc``. For example, if ``los_resol_in_mpc`` is 0.1 Mpc, and ``downres_factor_radial`` is 2.0, the box resolution will be 0.2 Mpc. """ return self._downres_factor_radial @downres_factor_radial.setter def downres_factor_radial(self, value): self._downres_factor_radial = value # clean cache init_attr = [ "_box_origin", "_counts_in_box", ] logger.debug( f"cleaning cache of {init_attr} due to resetting downres_factor_radial" ) for attr in init_attr: setattr(self, attr, None) @property def counts_in_box(self): """ The counts of the map cube voxels in the rectangular box. """ if self._counts_in_box is None: self._counts_in_box = self.get_counts_in_box() return self._counts_in_box @property def flat_sky(self): """ Whether to use flat sky approximation. If True, no proper projection and sky rotation is considered. Instead, the sky map cube is assumed to be a rectangular grid with equal voxel size specified by ``pix_resol_in_mpc`` and ``los_resol_in_mpc``. """ return self._flat_sky @flat_sky.setter def flat_sky(self, value): self._flat_sky = bool(value) # clean cache init_attr = [ "_box_origin", "_counts_in_box", ] logger.debug(f"cleaning cache of {init_attr} due to resetting flat_sky") for attr in init_attr: setattr(self, attr, None) @property def flat_sky_padding(self): """ Pad the rectangular box in the flat sky approximation. The input should be a list of 3 integers, corresponding to number of padding cells along each dimension in both directions. For example, [1,1,1] will pad 2x2x2 cells. """ return self._flat_sky_padding @flat_sky_padding.setter def flat_sky_padding(self, value): self._flat_sky_padding = value # clean cache init_attr = [ "_box_origin", "_counts_in_box", ] logger.debug(f"cleaning cache of {init_attr} due to resetting flat_sky_padding") for attr in init_attr: setattr(self, attr, None)
[docs] def propagate_field_k_to_model(self): r""" Use field k-modes for the model, taking into account the Alcock–Paczynski effect. .. math:: k_\perp^\text{fiducial} = k_\perp^\text{true} \times \alpha_\perp k_\parallel^\text{fiducial} = k_\parallel^\text{true} \times \alpha_\parallel """ # use field kmode to propagate into model kperp = self.k_perp / self.alpha_perp kpara = self.k_para / self.alpha_parallel self.kmode = np.sqrt(kperp[:, :, None] ** 2 + kpara[None, None, :] ** 2) with np.errstate(divide="ignore", invalid="ignore"): mu = np.nan_to_num(kpara[None, None, :] / self.kmode) self.mumode = np.clip(mu, -1.0, 1.0)
[docs] def get_1d_power( self, power3d, k1dbins=None, k1dweights=None, k_xyz_min=None, k_xyz_max=None, k_perppara_min=None, k_perppara_max=None, multipole_ell=0, mu_model=None, ): """ Bin the 3D power spectrum into 1D power spectrum. If the input ``power3d`` is a string, it is assumed to be an attribute of the class, for example ``auto_power_3d_1``. Also see :meth:`meer21cm.power.bin_3d_to_1d` for more details. By default the 1D power spectrum is calculated for the monopole. Passing ``multipole_ell`` will calculate the 1D power spectrum for the given multipole. Parameters ---------- power3d: np.ndarray or str The 3D power spectrum. k1dbins: np.ndarray, default None The bins for the 1D power spectrum. Default is the same as the class attribute. k1dweights: np.ndarray, default None The weights for the 3D power spectrum. Default is equal weights for every k-mode. k_xyz_min: list of size 3, default None The minimum k-mode for the 1D power spectrum in x, y, z directions. k_xyz_max: list of size 3, default None The maximum k-mode for the 1D power spectrum in x, y, z directions. k_perppara_min: list of size 2, default None The minimum k_perp and k_para for the 1D power spectrum. k_perppara_max: list of size 2, default None The maximum k_perp and k_para for the 1D power spectrum. multipole_ell: int, default 0 The multipole order for the 1D power spectrum. By default the 1D power spectrum is calculated for the monopole. mu_model: np.ndarray, default None The mu-modes for the legendre polynomial. If None, use the class attribute ``mumode``. Returns ------- power1d: np.ndarray The 1D power spectrum. k1deff: np.ndarray The effective k-mode for each bin. nmodes: np.ndarray The number of modes in each bin. """ if k1dbins is None: k1dbins = self.k1dbins if k1dweights is None: k1dweights = self.k1dweights # if still None, use equal weights if k1dweights is None: k1dweights = np.ones_like(self.k_mode) if isinstance(power3d, str): power3d = getattr(self, power3d) # filter k-modes slicer = get_nd_slicer() k_3d_sel_min = 1.0 if k_xyz_min is not None: k_3d_sel_min = [ ((np.abs(self.k_vec[i]) >= k_xyz_min[i]))[slicer[i]] for i in range(len(self.k_vec)) ] k_3d_sel_min = k_3d_sel_min[0] * k_3d_sel_min[1] * k_3d_sel_min[2] k_3d_sel_max = 1.0 if k_xyz_max is not None: k_3d_sel_max = [ ((np.abs(self.k_vec[i]) <= k_xyz_max[i]))[slicer[i]] for i in range(len(self.k_vec)) ] k_3d_sel_max = k_3d_sel_max[0] * k_3d_sel_max[1] * k_3d_sel_max[2] k_cy_sel_min = 1.0 if k_perppara_min is not None: k_cy_sel_min = ((np.abs(self.k_perp) >= k_perppara_min[0]))[:, :, None] * ( (np.abs(self.k_para) >= k_perppara_min[1]) )[None, None, :] k_cy_sel_max = 1.0 if k_perppara_max is not None: k_cy_sel_max = ((np.abs(self.k_perp) <= k_perppara_max[0]))[:, :, None] * ( (np.abs(self.k_para) <= k_perppara_max[1]) )[None, None, :] k1dweights = ( k1dweights * k_3d_sel_min * k_3d_sel_max * k_cy_sel_min * k_cy_sel_max ) k1dweights[0, 0, 0] = 0.0 if mu_model is None: mu_model = self.mumode multipole_factor = np.poly1d(legendre_polynomial_with_factor(multipole_ell))( mu_model ) power1d, k1deff, nmodes = bin_3d_to_1d( power3d * multipole_factor, self.k_mode, k1dbins, weights=k1dweights, ) return power1d, k1deff, nmodes
[docs] def get_cy_power( self, power3d, kperpbins=None, kparabins=None, kcyweights=None, multipole_ell=0, mu_model=None, ): """ Bin the 3D power spectrum into cylindrical k_perp-k_para power spectrum. If the input ``power3d`` is a string, it is assumed to be an attribute of the class, for example ``auto_power_3d_1``. Also see :meth:`meer21cm.power.bin_3d_to_cy` for more details. Passing ``multipole_ell`` will calculate the cylindrical power spectrum multiplied by the Legendre polynomial. Parameters ---------- power3d: np.ndarray or str The 3D power spectrum. kperpbins: np.ndarray, default None The k_perp bins for the cylindrical ps. Default is the same as the class attribute. kparabins: np.ndarray, default None The k_para bins for the cylindrical ps. Default is the same as the class attribute. kcyweights: np.ndarray, default None The weights for the 3D power spectrum. Default is equal weights for every k-mode. multipole_ell: int, default 0 The multipole order for the cylindrical power spectrum. By default the cylindrical power spectrum is calculated for the monopole. mu_model: np.ndarray, default None The mu-modes for the legendre polynomial. If None, use the class attribute ``mumode``. Returns ------- powercy: np.ndarray The cylindrical power spectrum. weightscy: np.ndarray The weights for the cylindrical k-modes. """ if kperpbins is None: kperpbins = self.kperpbins if kparabins is None: kparabins = self.kparabins if kcyweights is None: kcyweights = np.ones_like(self.k_mode) if isinstance(power3d, str): power3d = getattr(self, power3d) kcyweights[0, 0, 0] = 0.0 if mu_model is None: mu_model = self.mumode multipole_factor = np.poly1d(legendre_polynomial_with_factor(multipole_ell))( mu_model ) powercy = bin_3d_to_cy( power3d * multipole_factor, self.k_perp, kperpbins, weights=kcyweights, ) weightscy = bin_3d_to_cy( kcyweights, self.k_perp, kperpbins, weights=kcyweights, average=False, ) powercy = bin_3d_to_cy( powercy, np.abs(self.k_para), kparabins, weights=weightscy, ) weightscy = bin_3d_to_cy( weightscy, np.abs(self.k_para), kparabins, weights=weightscy, average=False, ) return powercy, weightscy
# calculate on-the-fly, no cache
[docs] def map_sampling(self, sampling_resol=None, p=1): """ The sampling window function from the map cube to be convolved with model power spectrum. This should correspond to the resolution of map-making on the sky and the frequency channel, **not** the resolution of the gridded field. Parameters ---------- sampling_resol: list, default None The sampling resolution of the field in Mpc. Default is the class attribute ``sampling_resol``. p: int, default 1 The index of assignment scheme. Returns ------- B_sampling: np.ndarray. The sampling window function in 3D k-space. """ if not self.has_resol: return 1.0 k_x = self.k_vec[0][:, None, None] k_y = self.k_vec[1][None, :, None] k_para = self.k_mode * self.mumode if sampling_resol is None: sampling_resol = self.sampling_resol B_sampling = np.nan_to_num( step_window_attenuation(k_x, sampling_resol[0], p) * step_window_attenuation(k_y, sampling_resol[1], p) * step_window_attenuation(k_para, sampling_resol[2], p) ) return B_sampling
[docs] def gridding_compensation(self): """ The sampling window function to be compensated for the gridding mass assignment scheme. """ return fourier_window_for_assignment(self.box_ndim, self.grid_scheme)
@property def box_origin(self): """ The coordinate of the origin of the box in Mpc. See :func:`meer21cm.grid.minimum_enclosing_box_of_lightcone` for definition. """ return self._box_origin @box_origin.setter def box_origin(self, value): self._box_origin = np.array(value) @property def rot_mat_sky_to_box(self): """ The rotational matrix from spheircal cooridnate to regular box. See :func:`meer21cm.grid.minimum_enclosing_box_of_lightcone` for definition. """ return self._rot_mat_sky_to_box @property def pix_coor_in_cartesian(self): """ The cartesian coordinate of the pixels in Mpc. """ return self._pix_coor_in_cartesian @property def pix_coor_in_box(self): """ The cartesian coordinate of the pixels in Mpc, shifted so that the origin is the origin of the enclosing box. """ return self.pix_coor_in_cartesian - self.box_origin[None, :]
[docs] def use_flat_sky_box(self, flat_sky_padding=None): """ Use flat sky approximation to calculate the box dimensions. Parameters ---------- flat_sky_padding: list, default None The padding for the flat sky box. If None, use the class attribute ``flat_sky_padding``. """ if flat_sky_padding is None: flat_sky_padding = self.flat_sky_padding logger.debug(f"using flat sky box with padding {flat_sky_padding}") logger.info( f"{inspect.currentframe().f_code.co_name}: setting self.box_ndim, self.box_len, self.box_origin" ) self.box_ndim = np.array(self.data.shape) + 2 * np.array(flat_sky_padding) self.box_len = np.array(self.box_ndim) * np.array( [ self.pix_resol_in_mpc, self.pix_resol_in_mpc, self.los_resol_in_mpc, ] ) # flat sky does not have rotation so there is no box_origin self.box_origin = np.array([0, 0, 0]) if self.model_k_from_field: logger.info( f"{inspect.currentframe().f_code.co_name}: " "setting the model self.kmode and self.mumode to correspond to the field k-modes" ) self.propagate_field_k_to_model() self._counts_in_box = None nu_ext = np.linspace( self.nu.min() - self.freq_resol * flat_sky_padding[2], self.nu.max() + self.freq_resol * flat_sky_padding[2], len(self.nu) + 2 * flat_sky_padding[2], ) self._box_voxel_redshift = ( np.ones(self.box_ndim) * freq_to_redshift(nu_ext)[None, None, :] )
[docs] def get_enclosing_box(self, rot_mat=None): """ invoke to calculate the box dimensions for enclosing all the map pixels. Parameters ---------- rot_mat: np.ndarray, default None The rotational matrix from the sky to the box. If None, calculates the suitable rotation matrix automatically. """ if self.flat_sky: self.use_flat_sky_box() if self.model_k_from_field: logger.info( f"{inspect.currentframe().f_code.co_name}: " "setting the model self.kmode and self.mumode to correspond to the field k-modes" ) self.propagate_field_k_to_model() return 1 ra = self.ra_map.copy()[self.W_HI.sum(-1) > 0] dec = self.dec_map.copy()[self.W_HI.sum(-1) > 0] logger.debug(f"calculating enclosing box for {len(ra)} particles") ( _x_start, _y_start, _z_start, _x_len, _y_len, _z_len, rot_back, pos_arr, ) = minimum_enclosing_box_of_lightcone( ra, dec, self.nu, cosmo=self.astropy_cosmo_fiducial, return_coord=True, buffkick=self.box_buffkick, rot_mat=rot_mat, ) logger.debug( f"{inspect.currentframe().f_code.co_name}: calculated enclosing box with size {_x_len} x {_y_len} x {_z_len}" ) logger.info( f"{inspect.currentframe().f_code.co_name}: setting self.box_len, self.box_origin, self.box_ndim" ) self._box_origin = np.array([_x_start, _y_start, _z_start]) self._box_len = np.array( [ _x_len, _y_len, _z_len, ] ) self._rot_mat_sky_to_box = np.linalg.inv(rot_back) # random sample num_p = self.num_particle_per_pixel ra_sample = [ ra, ] * num_p dec_sample = [ dec, ] * num_p nu_sample = [ self.nu, ] * num_p ra_sample = np.array(ra_sample) dec_sample = np.array(dec_sample) nu_sample = np.array(nu_sample) logger.debug(f"randomly sampled {num_p} particles in each pixel") rng = np.random.default_rng(seed=self.seed) rand_angle = rng.uniform( -self.pix_resol / 2, self.pix_resol / 2, size=(2,) + ra_sample[1:].shape ) rand_nu = rng.uniform( -self.freq_resol / 2, self.freq_resol / 2, size=(1,) + nu_sample[1:].shape ) ra_sample[1:] += rand_angle[0] dec_sample[1:] += rand_angle[1] nu_sample[1:] += rand_nu[0] pos_arr = [ pos_arr, ] for i in range(1, num_p): (_, _, _, _, _, _, _, pos_arr_i) = minimum_enclosing_box_of_lightcone( ra_sample[i], dec_sample[i], nu_sample[i], cosmo=self.astropy_cosmo_fiducial, return_coord=True, buffkick=self.box_buffkick, rot_mat=self.rot_mat_sky_to_box, ) pos_arr.append(pos_arr_i) pos_arr = np.array(pos_arr) pos_arr = pos_arr.reshape((-1, 3)) self._pix_coor_in_cartesian = pos_arr downres = np.array( [ self.downres_factor_transverse, self.downres_factor_transverse, self.downres_factor_radial, ] ) pix_resol_in_mpc = self.pix_resol_in_mpc los_resol_in_mpc = self.los_resol_in_mpc box_resol = ( np.array([pix_resol_in_mpc, pix_resol_in_mpc, los_resol_in_mpc]) * downres ) ndim_rg = self.box_len / box_resol ndim_rg = ndim_rg.astype("int") for i in range(3): if ndim_rg[i] % 2 == 0: ndim_rg[i] += 1 box_resol = self.box_len / ndim_rg self.box_ndim = ndim_rg logger.debug( f"calculated box resolution due to downres factor: {box_resol}, {downres}" ) self._counts_in_box = None slicer = get_nd_slicer() vec = [(self.x_vec[i] + self.box_origin[i])[slicer[i]] for i in range(3)] vec_len = np.sqrt(vec[0] ** 2 + vec[1] ** 2 + vec[2] ** 2) self._box_voxel_redshift = self.z_as_func_of_comov_dist(vec_len) if self.model_k_from_field: logger.info( f"{inspect.currentframe().f_code.co_name}: " "setting the model self.kmode and self.mumode to correspond to the field k-modes" ) self.propagate_field_k_to_model()
@property def average_model_hi_temp(self): """ Calculate the average HI brightness temperature in the map cube, taking care of redshift evolution and map sampling. Calculation is based on the true (fitted) cosmology. """ t_bar = omega_hi_to_average_temp( self.omega_hi, z=self.z_ch, cosmo=self.astropy_cosmo_true ) t_bar = (t_bar * self.w_HI.sum((0, 1))).sum() / self.w_HI.sum() return t_bar @property def model_hi_temp_in_box(self): """ Based on the redshift dependence of Omega_HI, calculate the average HI brightness temperature for each grid in the rectangular box. This can be used in a way that the weighted average of the ``model_hi_temp_in_box`` is used as the average t_bar in the model power spectrum (by passing it to ``mean_amp_1``), whereas the 3D ``model_hi_temp_in_box`` is used as the field weight to account for the effect of the redshift evolution of Omega_HI in the power spectrum. Calculation is based on the true (fitted) cosmology. """ z_grid = self._box_voxel_redshift omega_hi_grid = self.omega_hi_z_func(z_grid) t_bar_grid = omega_hi_to_average_temp( omega_hi_grid, z=z_grid, cosmo=self.astropy_cosmo_true ) return t_bar_grid
[docs] def get_counts_in_box(self, partial_sel=None): """ Grid the counts of the map cube voxels into the rectangular box, and return the effective counts per rectangular grid. Parameters ---------- partial_sel: array, default None An additional selection function of the data on top of W_HI. Allows hacking for batch processing. Returns ------- counts_in_grids: array. The counts of the map cube voxels in the rectangular box. """ if self.flat_sky: counts_in_grids = self.w_HI else: pix_coor_orig = self.pix_coor_in_box.reshape( (self.num_particle_per_pixel, -1) )[0].reshape((-1, 3)) num_pix = (self.W_HI.sum(-1) > 0).sum() if partial_sel is None: partial_sel = slice(None) pix_coor_orig = pix_coor_orig.reshape((num_pix, self.nu.size, 3)) pix_coor_orig = pix_coor_orig[partial_sel].reshape((-1, 3)) counts_in_grids, _, _ = project_particle_to_regular_grid( pix_coor_orig, self.box_len, self.box_ndim, grid_scheme=self.grid_scheme, particle_mass=self.w_HI[self.W_HI.sum(-1) > 0][partial_sel].ravel(), compensate=False, # compensate should be at model level average=False, ) return counts_in_grids
[docs] def grid_data_to_field(self, flat_sky=None, partial_sel=None): """ Grid the stored data map to a rectangular grid field. If flat_sky is True, no gridding is performed. Instead, the map cube dimensions are taken to be a rectangular grid, with the grid length corresponding to the pixel resolution on x/y and los frequency resolution as z. If flat_sky is False, the data is gridded onto a regular grid using the input grid scheme and performing the proper curved sky projection. The gridded field is stored as field_1 and the weights are stored as weights_1. Parameters ---------- flat_sky: bool, default None If True, use flat sky approximation. partial_sel: array, default None An additional selection function of the data on top of W_HI. Allows hacking for batch processing. """ if flat_sky is None: flat_sky = self.flat_sky if flat_sky: self.field_1 = self.data self.weights_1 = self.w_HI.astype(float) self.use_flat_sky_box(flat_sky_padding=[0, 0, 0]) self.mean_center_1 = False self.unitless_1 = False self.include_sky_sampling = [True, False] self.compensate = False self.include_beam = [True, False] return self.field_1, self.weights_1, (self.weights_1 > 0).astype(float) if self.box_origin is None: self.get_enclosing_box() num_pix = (self.W_HI.sum(-1) > 0).sum() all_sel = np.arange(num_pix) if partial_sel is None: selected_pix = all_sel else: selected_pix = all_sel[partial_sel] batch_sel = np.array_split(selected_pix, self.batch_number) batch_sel = [sel for sel in batch_sel if sel.size > 0] if len(batch_sel) == 0: shape = tuple(self.box_ndim.tolist()) hi_map_rg = np.zeros(shape, dtype=self.real_dtype) hi_weights_sum = np.zeros(shape, dtype=self.real_dtype) pixel_counts_sum = np.zeros(shape, dtype=self.real_dtype) self.field_1 = hi_map_rg self.weights_1 = pixel_counts_sum.astype(float) self.unitless_1 = False include_beam = np.array(self.include_beam) include_beam[0] = True self.include_beam = include_beam include_sky_sampling = np.array(self.include_sky_sampling) include_sky_sampling[0] = True self.include_sky_sampling = include_sky_sampling return hi_map_rg, hi_weights_sum, pixel_counts_sum map_weighted_sum = None hi_weights_sum = None pixel_counts_sum = None for sel in batch_sel: hi_map_i, hi_weights_i, pixel_counts_i = self._grid_data_to_field(sel) if map_weighted_sum is None: map_weighted_sum = np.zeros_like(hi_map_i) hi_weights_sum = np.zeros_like(hi_weights_i) pixel_counts_sum = np.zeros_like(pixel_counts_i) map_weighted_sum += hi_map_i * hi_weights_i hi_weights_sum += hi_weights_i pixel_counts_sum += pixel_counts_i with np.errstate(divide="ignore", invalid="ignore"): hi_map_rg = np.nan_to_num(map_weighted_sum / hi_weights_sum) self.field_1 = hi_map_rg self.weights_1 = pixel_counts_sum.astype(float) self.unitless_1 = False include_beam = np.array(self.include_beam) include_beam[0] = True self.include_beam = include_beam include_sky_sampling = np.array(self.include_sky_sampling) include_sky_sampling[0] = True self.include_sky_sampling = include_sky_sampling return hi_map_rg, hi_weights_sum, pixel_counts_sum
def _grid_data_to_field(self, partial_sel): num_pix = (self.W_HI.sum(-1) > 0).sum() data_particle = self.data[self.W_HI.sum(-1) > 0] weights_particle = self.w_HI[self.W_HI.sum(-1) > 0] num_p = self.num_particle_per_pixel data_particle = [data_particle] * num_p weights_particle = [weights_particle] * num_p real_dtype = real_dtype_from_array(self.data) data_particle = np.asarray(data_particle, dtype=real_dtype)[ :, partial_sel ].ravel() weights_particle = np.asarray(weights_particle, dtype=real_dtype)[ :, partial_sel ].ravel() pix_coor_in_box = self.pix_coor_in_box.reshape( (num_p, num_pix, self.nu.size, 3) ) pix_coor_in_box = pix_coor_in_box[:, partial_sel].reshape((-1, 3)) hi_map_rg, hi_weights_rg, pixel_counts_hi_rg = project_particle_to_regular_grid( pix_coor_in_box, self.box_len, self.box_ndim, grid_scheme=self.grid_scheme, particle_mass=data_particle, particle_weights=weights_particle, compensate=False, # compensate should be at model level ) hi_map_rg2, _, _ = project_particle_to_regular_grid( pix_coor_in_box, self.box_len, self.box_ndim, grid_scheme=self.grid_scheme, particle_mass=data_particle, particle_weights=weights_particle, compensate=False, # compensate should be at model level shift=self.interlace_shift, ) hi_map_rg = interlace_two_fields(hi_map_rg, hi_map_rg2, self.interlace_shift) hi_map_rg = np.asarray(hi_map_rg, dtype=real_dtype) hi_weights_rg = np.asarray(hi_weights_rg, dtype=real_dtype) return hi_map_rg, hi_weights_rg, pixel_counts_hi_rg
[docs] def grid_gal_to_field(self, radecfreq=None, flat_sky=None): """ Grid the galaxy catalogue to a rectangular grid field. If flat_sky is True, no gridding is performed. Instead, the map cube dimensions are taken to be a rectangular grid, with the grid length corresponding to the pixel resolution on x/y and los frequency resolution as z. """ if self.box_origin is None: self.get_enclosing_box() if flat_sky is None: flat_sky = self.flat_sky if radecfreq is None: ra_gal = self.ra_gal dec_gal = self.dec_gal freq_gal = self.freq_gal else: ra_gal, dec_gal, freq_gal = radecfreq real_dtype = self.real_dtype if ra_gal.size == 0: gal_pos_in_box = np.zeros((0, 3), dtype=real_dtype) if flat_sky: self.compensate = False z_gal = freq_to_redshift(freq_gal) self.use_flat_sky_box(flat_sky_padding=[0, 0, 0]) pos_indx_1, pos_indx_2 = radec_to_indx( ra_gal, dec_gal, self.wproj, to_int=False ) if ra_gal.size > 0: gal_pos_in_box = np.zeros((ra_gal.size, 3), dtype=real_dtype) gal_pos_in_box[:, 0] = pos_indx_1 / self.num_pix_x * self.box_len[0] gal_pos_in_box[:, 1] = pos_indx_2 / self.num_pix_y * self.box_len[1] gal_pos_in_box[:, 2] = ( self.astropy_cosmo_fiducial.comoving_distance(z_gal).value - self.astropy_cosmo_fiducial.comoving_distance( self.z_ch.min() ).value ) else: if ra_gal.size > 0: (_, _, _, _, _, _, _, gal_pos_arr) = minimum_enclosing_box_of_lightcone( ra_gal, dec_gal, freq_gal, cosmo=self.astropy_cosmo_fiducial, return_coord=True, tile=False, rot_mat=self.rot_mat_sky_to_box, ) gal_pos_in_box = (gal_pos_arr - self.box_origin[None, :]).astype( real_dtype, copy=False ) all_sel = np.arange(gal_pos_in_box.shape[0]) gal_sel_batches = np.array_split(all_sel, self.batch_number) gal_sel_batches = [sel for sel in gal_sel_batches if sel.size > 0] if len(gal_sel_batches) == 0: shape = tuple(self.box_ndim.tolist()) gal_map_rg = np.zeros(shape, dtype=real_dtype) gal_weights_rg = np.zeros(shape, dtype=real_dtype) pixel_counts_gal_rg = np.zeros(shape, dtype=real_dtype) else: gal_map_rg = None gal_weights_rg = None pixel_counts_gal_rg = None for sel in gal_sel_batches: gal_map_i, gal_weights_i, pixel_counts_i = self._grid_gal_to_field( gal_pos_in_box, sel ) if gal_map_rg is None: gal_map_rg = np.zeros_like(gal_map_i) gal_weights_rg = np.zeros_like(gal_weights_i) pixel_counts_gal_rg = np.zeros_like(pixel_counts_i) gal_map_rg += gal_map_i gal_weights_rg += gal_weights_i pixel_counts_gal_rg += pixel_counts_i gal_map_rg = np.asarray(gal_map_rg, dtype=real_dtype) gal_weights_rg = np.asarray(gal_weights_rg, dtype=real_dtype) pixel_counts_gal_rg = np.asarray(pixel_counts_gal_rg, dtype=real_dtype) self.field_2 = gal_map_rg # only pixels sampled by the lightcone is used weights_g = (self.counts_in_box > 0).astype(real_dtype) self.weights_field_2 = weights_g self.weights_grid_2 = np.ones_like(self.field_2, dtype=real_dtype) self.mean_center_2 = True self.unitless_2 = True include_beam = np.array(self.include_beam) include_beam[1] = False self.include_beam = include_beam include_sky_sampling = np.array(self.include_sky_sampling) include_sky_sampling[1] = False self.include_sky_sampling = include_sky_sampling return gal_map_rg, gal_weights_rg, pixel_counts_gal_rg
def _grid_gal_to_field(self, gal_pos_in_box, partial_sel): gal_pos_i = gal_pos_in_box[partial_sel] ( gal_map_rg, gal_weights_rg, pixel_counts_gal_rg, ) = project_particle_to_regular_grid( gal_pos_i, self.box_len, self.box_ndim, grid_scheme=self.grid_scheme, compensate=False, # compensate should be at model level average=False, ) gal_map_rg2, _, _ = project_particle_to_regular_grid( gal_pos_i, self.box_len, self.box_ndim, grid_scheme=self.grid_scheme, compensate=False, # compensate should be at model level average=False, shift=self.interlace_shift, ) gal_map_rg = interlace_two_fields(gal_map_rg, gal_map_rg2, self.interlace_shift) real_dtype = self.real_dtype gal_map_rg = np.asarray(gal_map_rg, dtype=real_dtype) gal_weights_rg = np.asarray(gal_weights_rg, dtype=real_dtype) pixel_counts_gal_rg = np.asarray(pixel_counts_gal_rg, dtype=real_dtype) return gal_map_rg, gal_weights_rg, pixel_counts_gal_rg
[docs] def get_n_bar_correction(self): """ Calculate the number density correction for the galaxy catalogue. """ n_bar = self.ra_gal.size / self.survey_volume n_bar2 = ( (self.field_2 * self.weights_2).sum() / self.weights_2.sum() / np.prod(self.box_resol) ) return n_bar2 / n_bar
[docs] def ra_dec_z_for_coord_in_box(self, pos_in_box): """ Convert the coordinates in the box to ra, dec, z, and also return the comoving distance to the observer for each point. Parameters ---------- pos_in_box: array. The coordinates in the box. Returns ------- pos_ra: array. The ra of the points. pos_dec: array. The dec of the points. pos_z: array. The redshift of the points. pos_comov_dist: array. The comoving distance to the observer for each point. """ pos_arr = pos_in_box + self.box_origin rot_back = np.linalg.inv(self.rot_mat_sky_to_box) pos_arr = np.einsum("ij,aj->ai", rot_back, pos_arr) pos_comov_dist = np.sqrt(np.sum(pos_arr**2, axis=-1)) pos_z = self.z_as_func_of_comov_dist(pos_comov_dist) pos_ra, pos_dec = hp.vec2ang(pos_arr / pos_comov_dist[:, None], lonlat=True) return pos_ra, pos_dec, pos_z, pos_comov_dist
def _grid_field_to_sky_map_wcs( self, field, average=True, mask=True, wproj=None, num_pix_x=None, num_pix_y=None, los_sel=None, ): """ Grid a box field onto the WCS map cube (raster ``(num_pix_x, num_pix_y, n_ch)``). """ if wproj is None: wproj = self.wproj if num_pix_x is None: num_pix_x = self.num_pix_x if num_pix_y is None: num_pix_y = self.num_pix_y los_sel = ( np.arange(self.box_ndim[2], dtype=int) if los_sel is None else np.asarray(los_sel, dtype=int) ) expected_shape = (self.box_ndim[0], self.box_ndim[1], los_sel.size) if field.shape != expected_shape: raise ValueError( f"field shape {field.shape} does not match expected shape " f"{expected_shape} for los_sel size {los_sel.size}" ) x_vec = self.x_vec[0] y_vec = self.x_vec[1] z_vec = self.x_vec[2][los_sel] nx = x_vec.size ny = y_vec.size nz = z_vec.size nxyz = nx * ny * nz pos_xyz = np.empty((nxyz, 3), dtype=self.real_dtype) pos_xyz[:, 0] = np.repeat(x_vec, ny * nz) pos_xyz[:, 1] = np.tile(np.repeat(y_vec, nz), nx) pos_xyz[:, 2] = np.tile(z_vec, nx * ny) pos_ra, pos_dec, pos_z, _ = self.ra_dec_z_for_coord_in_box(pos_xyz) pos_indx_1, pos_indx_2 = radec_to_indx(pos_ra, pos_dec, wproj, to_int=False) pos_indx_z = redshift_to_freq(pos_z) - self.nu.min() pos_indx_array = np.empty((nxyz, 3), dtype=self.real_dtype) pos_indx_array[:, 0] = pos_indx_1 pos_indx_array[:, 1] = pos_indx_2 pos_indx_array[:, 2] = pos_indx_z map_bin, _, count_bin = project_particle_to_regular_grid( pos_indx_array, np.array([num_pix_x, num_pix_y, self.nu.max() - self.nu.min()]), np.array([num_pix_x, num_pix_y, self.nu.size]), particle_mass=field.ravel(), average=average, compensate=False, grid_scheme="nnb", ) if mask: map_bin *= self.W_HI return map_bin, count_bin def _grid_field_to_sky_map_healpix( self, field, average=True, mask=True, los_sel=None, ): """ Grid a box field onto the sparse HEALPix map ``(len(pixel_id), n_ch)``. Voxel centres are assigned to HEALPix pixels at ``self.hp_nside`` and to frequency channels via :func:`~meer21cm.util.find_ch_id`. Only voxels whose pixel lies in ``self.pixel_id`` contribute. """ los_sel = ( np.arange(self.box_ndim[2], dtype=int) if los_sel is None else np.asarray(los_sel, dtype=int) ) expected_shape = (self.box_ndim[0], self.box_ndim[1], los_sel.size) if field.shape != expected_shape: raise ValueError( f"field shape {field.shape} does not match expected shape " f"{expected_shape} for los_sel size {los_sel.size}" ) nside = int(self.hp_nside) pixel_id = np.asarray(self.pixel_id, dtype=np.int64) n_out = pixel_id.size n_ch = int(self.nu.size) order = np.argsort(pixel_id, kind="mergesort") pix_sorted = pixel_id[order] x_vec = self.x_vec[0] y_vec = self.x_vec[1] z_vec = self.x_vec[2][los_sel] nx = x_vec.size ny = y_vec.size nz = z_vec.size nxyz = nx * ny * nz pos_xyz = np.empty((nxyz, 3), dtype=self.real_dtype) pos_xyz[:, 0] = np.repeat(x_vec, ny * nz) pos_xyz[:, 1] = np.tile(np.repeat(y_vec, nz), nx) pos_xyz[:, 2] = np.tile(z_vec, nx * ny) pos_ra, pos_dec, pos_z, _ = self.ra_dec_z_for_coord_in_box(pos_xyz) hpix = hp.ang2pix(nside, pos_ra, pos_dec, lonlat=True).astype(np.int64) pos_nu = np.asarray(redshift_to_freq(pos_z), dtype=np.float64) ch_idx = find_ch_id(pos_nu, self.nu) valid_ch = (ch_idx >= 0) & (ch_idx < n_ch) hpix = hpix[valid_ch] ch_idx = ch_idx[valid_ch] mass = np.asarray(field, dtype=self.real_dtype).ravel()[valid_ch] row_s = np.searchsorted(pix_sorted, hpix) # Do not index pix_sorted[row_s] when row_s == n_out (past end of searchsorted). in_bounds = row_s < n_out in_survey = np.zeros(hpix.shape, dtype=bool) in_survey[in_bounds] = pix_sorted[row_s[in_bounds]] == hpix[in_bounds] row = order[row_s[in_survey]] ch_idx = ch_idx[in_survey] mass = mass[in_survey] map_sum = np.zeros((n_out, n_ch), dtype=self.real_dtype) cnt = np.zeros((n_out, n_ch), dtype=self.real_dtype) np.add.at(map_sum, (row, ch_idx), mass) np.add.at(cnt, (row, ch_idx), 1.0) if average: with np.errstate(divide="ignore", invalid="ignore"): map_bin = np.where(cnt > 0, map_sum / cnt, 0.0) else: map_bin = map_sum count_bin = cnt if mask: map_bin *= self.W_HI return map_bin, count_bin
[docs] def grid_field_to_sky_map( self, field, average=True, mask=True, wproj=None, num_pix_x=None, num_pix_y=None, los_sel=None, ): """ Grid a field in the rectangular box onto the sky. Parameters ---------- field: array. The field in the box to be gridded. average: bool, default True. Whether the field grids are averaged or summed into sky pixels. mask: bool, default True. If True, the sky map is then masked by the survey selection function. wproj: :class:`astropy.wcs.WCS` object, default None. **WCS only.** The WCS for the output sky map. Default uses ``self.wproj``. num_pix_x: int, default None. **WCS only.** Number of pixels along the first map axis. Default ``self.num_pix_x``. num_pix_y: int, default None. **WCS only.** Number of pixels along the second map axis. Default ``self.num_pix_y``. los_sel: array-like, default None. Optional selector of line-of-sight (last-axis) indices represented by ``field``. If None, ``field`` is assumed to contain all LOS slices with shape ``self.box_ndim``. If provided, ``field`` must have shape ``(self.box_ndim[0], self.box_ndim[1], len(los_sel))`` and the projected output still uses the full LOS map axis (``self.nu.size``), so multiple chunked calls can be merged by accumulating mass/count outputs. Returns ------- map_bin: array. The output sky map (WCS ``(nx, ny, n_ch)`` or HEALPix ``(n_pix, n_ch)``). count_bin: array. Per-cell accumulation used for averaging (WCS) or voxel counts (HEALPix). """ fmt = self.skymap.format if fmt == "wcs": return self._grid_field_to_sky_map_wcs( field, average=average, mask=mask, wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, los_sel=los_sel, ) if fmt == "healpix": return self._grid_field_to_sky_map_healpix( field, average=average, mask=mask, los_sel=los_sel, )
[docs] def gen_random_poisson_galaxy(self, sel=None, num_g_rand=None, seed=None): """ Generate a random galaxy catalogue from the map cube following the Poisson distribution. The generation of the sample does not use the instance seed if not explicitly passed and will use a random one otherwise. If you want to generate multiple random catalogues, you need to set a different seed manually for each catalogue. Parameters ---------- sel: array, default None The selection function of the galaxy catalogue. If None, use the class attribute ``self.W_HI``. num_g_rand: int, default None The number of galaxies to generate. Default uses the number of galaxies stored in the data in `self.ra_gal`. seed: int, default None The seed for the random number generator. Returns ------- ra_rand: np.ndarray. The ra of the random galaxies. dec_rand: np.ndarray. The dec of the random galaxies. freq_rand: np.ndarray. The ``frequency`` of the random galaxies. The redshift of the random galaxies can be calculated by ``meer21cm.util.redshift_to_freq(z_rand)``. """ if sel is None: sel = self.W_HI[:, :, 0] if num_g_rand is None: num_g_rand = self.ra_gal.size rng = np.random.default_rng(seed=seed) ra_rand = self.ra_map[sel] dec_rand = self.dec_map[sel] ra_rand = rng.choice(ra_rand, size=num_g_rand, replace=True) dec_rand = rng.choice(dec_rand, size=num_g_rand, replace=True) rand_disp = rng.uniform( -self.pix_resol / 2, self.pix_resol / 2, size=num_g_rand * 2 ) ra_rand += rand_disp[:num_g_rand] dec_rand += rand_disp[num_g_rand:] # in future this should be a dNdz cov_dist_limit = [ self.astropy_cosmo_fiducial.comoving_distance(self.z_ch.min()) .to("Mpc") .value, self.astropy_cosmo_fiducial.comoving_distance(self.z_ch.max()) .to("Mpc") .value, ] cov_dist_rand = rng.uniform( cov_dist_limit[0], cov_dist_limit[1], size=num_g_rand ) z_rand = self.z_as_func_of_comov_dist(cov_dist_rand) return ra_rand, dec_rand, redshift_to_freq(z_rand)
[docs] def apply_taper_to_field( self, field, taper_func=None, axis=[ 2, ], ): """ Apply a taper to the field, by multiplying the taper function to the corresponding weights of the field. Parameters ---------- field: int. The index of the field to be tapered, either 1 or 2. taper_func: function, default None. The taper function. Default uses the stored ``self.taper_func``. axis: list, default [2,]. The axis to apply the taper to. Default is the z-axis which is approximately the los. """ if taper_func is None: taper_func = self.taper_func taper_i = [taper_func(self.box_ndim[i]) for i in range(3)] taper = 1 for i in axis: slice_list_i = [None, None, None] slice_list_i[i] = slice(None, None, None) slice_list_i = tuple(slice_list_i) taper = taper * taper_i[i][slice_list_i] setattr(self, f"weights_{field}", getattr(self, f"weights_{field}") * taper)
@property @tagging("cosmo_fiducial", "nu", "mock", "box") def box_voxel_redshift(self): """ The redshift of each voxel in the rectangular box. """ if self._box_voxel_redshift is None: return np.ones(self.box_ndim) * self.z return self._box_voxel_redshift