Source code for meer21cm.mock

"""
This module contains the classes for generating mock intensity mapping data, galaxy catalogues for cross-correlation,
and HI galaxy catalogues for HI emission line observations.
"""

import numpy as np
from scipy.interpolate import interp1d
from astropy.cosmology import Planck18
from numpy.random import default_rng
from astropy import constants
from .util import (
    get_wcs_coor,
    radec_to_indx,
    freq_to_redshift,
    tagging,
    busy_function_simple,
    himf_pars_jones18,
    sample_from_dist,
    center_to_edges,
    tully_fisher,
    redshift_to_freq,
    find_ch_id,
    mass_intflux_coeff,
    Obuljen18,
    create_udres_wproj,
    sample_map_from_highres,
    angle_in_range,
    get_nd_slicer,
    real_dtype_from_array,
)
from .power import PowerSpectrum, Specification, power_weights_renorm
from .telescope import weighted_convolution
from halomod import TracerHaloModel as THM
import warnings
import inspect
import logging

logger = logging.getLogger(__name__)


[docs] class MockSimulation(PowerSpectrum): """ The class for generating mock intensity mapping data cube and galaxy catalogues for cross-correlation. Parameters ---------- density: str, default "lognormal" The density distribution of the mock field. Can be "lognormal" or "gaussian". num_discrete_source: int, default 100 The number of discrete tracer sources. discrete_base_field: int, default 2 The tracer (1 or 2) field to sample the discrete tracer positions. highres_sim: int, default None For WCS maps, this factor is **deprecated** in :meth:`propagate_mock_field_to_data` (a warning is issued and native resolution is used). It still controls the optional high-resolution path in :meth:`HIGalaxySimulation.propagate_hi_profile_to_map`. parallel_plane: bool, default True Whether to simulate the mock field in the parallel-plane limit (i.e. k_z = k_parallel). Only used if ``rsd_from_field`` is True, otherwise parallel-plane is hard-coded to True. rsd_from_field: bool, default False Whether to generate the RSD effect at field level. If False, the RSD effect is generated at power spectrum level. discrete_source_dndz: function, default np.ones_like The redshift distribution of the discrete tracer sources. Must be a function of redshift. Note that the overall number of discrete sources is given by ``self.num_discrete_source``, so only the shape of the dndz is used, not the normalization. **params: dict Additional parameters to be passed to the base class :class:`meer21cm.power.PowerSpectrum`. """ def __init__( self, density="lognormal", num_discrete_source=100, discrete_base_field=2, highres_sim=None, parallel_plane=True, rsd_from_field=False, discrete_source_dndz=np.ones_like, mock_amp_1=None, mock_amp_2=None, **params, ): super().__init__(**params) self.density = density.lower() init_attr = [ "_x_start", "_y_start", "_z_start", "_x_len", "_y_len", "_z_len", "_rot_mat_sky_to_box", "_pix_coor_in_cartesian", "_box_len", "_box_resol", "_box_ndim", "_mock_matter_field_r", "_mock_matter_field", "_mock_tracer_position_in_box", "_mock_tracer_position_in_radecz", "_mock_tracer_field_1", "_mock_tracer_field_2", "_mock_tracer_field_1_r", "_mock_tracer_field_2_r", "_mock_kaiser_field_k_matter", "_mock_kaiser_field_k_tracer_1", "_mock_kaiser_field_k_tracer_2", "_mock_velocity_u_matter", "_mock_velocity_u_tracer_1", "_mock_velocity_u_tracer_2", "_mock_amp_1", "_mock_amp_2", ] for attr in init_attr: setattr(self, attr, None) self.num_discrete_source = num_discrete_source self.discrete_base_field = discrete_base_field self.highres_sim = highres_sim self.parallel_plane = parallel_plane self.rsd_from_field = rsd_from_field if not rsd_from_field and not parallel_plane: warnings.warn("rsd_from_field is False, parallel_plane will be ignored") self.discrete_source_dndz = discrete_source_dndz self.mock_amp_1 = mock_amp_1 self.mock_amp_2 = mock_amp_2 if self.true_cosmology != self.fiducial_cosmology: warnings.warn( "true_cosmology != fiducial_cosmology, " "box dimensions and model power will be inconsistent" ) def _iter_last_axis_batches(self, axis_size): """ Return stable last-axis batches for chunked processing. Always returns at least one non-empty batch so `batch_number=1` exercises the same loop path as `batch_number>1`. """ all_indx = np.arange(axis_size) return [ sel for sel in np.array_split(all_indx, self.batch_number) if sel.size > 0 ] def _iter_field_los_chunks(self, field): """ Iterate over LOS chunks of a 3D field. """ for sel in self._iter_last_axis_batches(field.shape[-1]): yield sel, field[..., sel] def _apply_weighted_convolution_in_batches( self, map_data, map_counts, beam_image=None, wproj=None, num_pix_x=None, num_pix_y=None, ): """ Apply beam convolution in channel batches. This keeps the same behavior for `batch_number=1` while allowing conservative channel-wise execution for larger batch counts. """ map_out = np.zeros_like(map_data) lazy_beam = beam_image is None for ch_sel in self._iter_last_axis_batches(map_data.shape[-1]): if lazy_beam: beam_chunk = self.get_beam_image( wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, cache=False, ch_sel=ch_sel, ) else: beam_chunk = beam_image[..., ch_sel] map_i, _ = weighted_convolution( map_data[..., ch_sel], beam_chunk, (map_counts[..., ch_sel] > 0).astype(float), ) map_out[..., ch_sel] = map_i return map_out def _project_field_chunk_to_sky_map( self, field_chunk, los_sel, average=True, wproj=None, num_pix_x=None, num_pix_y=None, ): """ Project one LOS chunk of a box field to the sky map grid. This helper provides a chunk-level boundary for external orchestration. """ return self.grid_field_to_sky_map( field_chunk, average=average, mask=False, wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, los_sel=los_sel, ) def _sample_tracer_positions_from_density_chunk( self, density_chunk, dndz_prob_chunk, z_sel, rng ): """ Sample tracer positions from one LOS chunk of density and dN/dz weights. """ weighted_density = density_chunk * dndz_prob_chunk n_per_cell = rng.poisson(weighted_density) x_chunk = np.meshgrid( self.x_vec[0], self.x_vec[1], self.x_vec[2][z_sel], indexing="ij" ) tracer_positions = np.array([x.flatten() for x in x_chunk]).T tracer_positions = tracer_positions.repeat(n_per_cell.flatten(), axis=0) if tracer_positions.size == 0: return tracer_positions tracer_positions += ( rng.uniform(-0.5, 0.5, size=(tracer_positions.shape[0], len(self.box_ndim))) * self.box_resol[None, :] ) if self.flat_sky: tracer_positions -= ( np.array(self.flat_sky_padding) * np.array(self.box_resol) )[None, :] return tracer_positions @property def tot_num_source_in_box(self): """ The total number of mock sources in the box needed to achieve ``self.num_discrete_source`` number of sources in the survey volume. Only used internally, no physical meaning. Note that if you change the simulation settings such as ``self.num_discrete_source``, ``self.discrete_source_dndz``, ``self.z_ch``, ``self.W_HI``, etc, this property will be automatically updated but the mock catalog is not. """ if self.flat_sky: dndz_arr = self.discrete_source_dndz(self.box_voxel_redshift) z_sel = (self.box_voxel_redshift >= self.z_ch.min()) & ( self.box_voxel_redshift <= self.z_ch.max() ) dndz_arr = dndz_arr[z_sel] dndz_arr /= dndz_arr.max() ratio_dndz = 1 / dndz_arr.mean() self._dndz_renorm = ( lambda z: self.discrete_source_dndz(z) / dndz_arr.max() * ratio_dndz ) ratio = ( np.prod(np.array(self.data.shape) + 2 * np.array(self.flat_sky_padding)) / self.W_HI.sum() ) return self.num_discrete_source * ratio else: nu_ext = center_to_edges(self.nu) z_ext = freq_to_redshift(nu_ext) volume_per_channel = ( ( self.W_HI[..., self.maximum_sampling_channel].sum() * self.pixel_area * (np.pi / 180) ** 2 ) / 3 * ( self.astropy_cosmo_true.comoving_distance(z_ext[:-1]) ** 3 - self.astropy_cosmo_true.comoving_distance(z_ext[1:]) ** 3 ).value ) dn_channel = self.discrete_source_dndz(self.z_ch) renorm = self.num_discrete_source / (dn_channel * volume_per_channel).sum() ratio = np.prod(self.box_len) / self.survey_volume renorm *= np.prod(self.box_len) / (self.num_discrete_source * ratio) self._dndz_renorm = lambda z: self.discrete_source_dndz(z) * renorm return self.num_discrete_source * ratio @property def highres_sim(self): """ Optional integer upsampling factor for :meth:`HIGalaxySimulation.propagate_hi_profile_to_map`. For WCS, :meth:`propagate_mock_field_to_data` ignores this value and emits a ``DeprecationWarning`` when it is not ``None``. """ return self._highres_sim @highres_sim.setter def highres_sim(self, value): self._highres_sim = value @property def parallel_plane(self): """ Whether the mock field is generated in the parallel-plane limit. """ return self._parallel_plane @parallel_plane.setter def parallel_plane(self, value): self._parallel_plane = value logger.debug( f"cleaning cache of {self.rsd_dep_attr} due to resetting parallel_plane" ) self.clean_cache(self.rsd_dep_attr) @property def rsd_from_field(self): """ If True, the kaiser rsd effect is generated at field level by calculating the corresponding peculiar velocity field. This allows the lognormal mock to go beyond the parallel-plane limit. If False, the kaiser rsd effect is generated at power spectrum level assuming parallel-plane, and then the field is generated from the anistropic power spectrum. """ return self._rsd_from_field @rsd_from_field.setter def rsd_from_field(self, value): self._rsd_from_field = value logger.debug( f"cleaning cache of {self.rsd_dep_attr} due to resetting rsd_from_field" ) self.clean_cache(self.rsd_dep_attr) @property def num_discrete_source(self): """ The total number of discrete tracer sources. Note that the final mock catalogue is not exactly this number, due to Poisson sampling errors. """ return self._num_discrete_source @num_discrete_source.setter def num_discrete_source(self, value): self._num_discrete_source = value logger.debug( f"cleaning cache of {self.discrete_dep_attr} due to resetting num_discrete_source" ) self.clean_cache(self.discrete_dep_attr) @property def discrete_source_dndz(self): """ The redshift kernel of the discrete tracer sources. Must be a function of redshift. ** Only the shape of the dndz is used, not the normalization. ** The overall number of discrete sources is given by ``self.num_discrete_source``. Must be in the unit of per volume instead of unitless , i.e. the integral of the dndz over redshift must be a number density instead of number of sources. """ return self._discrete_source_dndz @discrete_source_dndz.setter def discrete_source_dndz(self, value): self._discrete_source_dndz = value logger.debug( f"cleaning cache of {self.discrete_dep_attr} due to resetting discrete_source_dndz" ) self.clean_cache(self.discrete_dep_attr) @property def discrete_base_field(self): """ Which tracer (1 or 2) field to sample the discrete tracer positions. """ return self._discrete_base_field @discrete_base_field.setter def discrete_base_field(self, value): if isinstance(value, str): value = int(value) if not value in [1, 2]: raise ValueError("discrete_base_field must be 1 or 2") self._discrete_base_field = value logger.debug( f"cleaning cache of {self.discrete_dep_attr} due to resetting discrete_base_field" ) self.clean_cache(self.discrete_dep_attr) @property @tagging("cosmo_model", "nu", "mock", "box", "seed") def mock_matter_field_r(self): """ The simulated dark matter density field in real space. """ if self._mock_matter_field_r is None: self.get_mock_matter_field_r() return self._mock_matter_field_r @property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "seed") def mock_matter_field(self): """ The simulated dark matter density field in redshift space. If ``self.kaiser_rsd`` is False, it is simply set to the real space field. """ if self._mock_matter_field is None: self.get_mock_matter_field() return self._mock_matter_field
[docs] def get_mock_matter_field_r(self): """ Generate the mock matter field in real space. """ logger.info( f"invoking {inspect.currentframe().f_code.co_name} to set __mock_matter_field_r" ) self._mock_matter_field_r = self.get_mock_field_r(bias=1)
[docs] def get_mock_field_r(self, bias=1): """ Generate a mock field in real space that follows the input matter power spectrum ``self.matter_power_spectrum_fnc`` * bias**2. """ logger.info( f"invoking {inspect.currentframe().f_code.co_name} with bias={bias}" ) if self.box_ndim is None: self.get_enclosing_box() self.propagate_field_k_to_model() power_array = self.auto_power_matter_model_r * bias**2 power_array = np.asarray(power_array, dtype=self.real_dtype) power_array[0, 0, 0] = 0.0 delta_x = self.get_mock_field_from_power(power_array) return delta_x
[docs] def get_mock_field_from_power(self, power_array): """ Generate a mock field that follows the input matter power spectrum ``power_array``. """ logger.info( f"invoking {inspect.currentframe().f_code.co_name} assuming {self.density} distribution" ) if self.density == "lognormal": backend = generate_lognormal_field elif self.density == "gaussian": backend = generate_gaussian_field else: raise ValueError( f"density must be 'lognormal' or 'gaussian', got {self.density}" ) power_array = np.asarray(power_array, dtype=self.real_dtype) delta_x = backend(self.box_ndim, self.box_len, power_array, self.seed) return delta_x
@property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "seed") def mock_velocity_u_matter(self): r""" The normalised peculiar velocity field in real space, defined as .. math:: u_i = -\frac{v_i}{\mathcal{H} f} where :math:`v_i` is the peculiar velocity, :math:`\mathcal{H}` is the conformal Hubble parameter, and :math:`f` is the growth rate. """ if self._mock_velocity_u_matter is None: self._mock_velocity_u_matter = self.get_mock_velocity_u_field( mock_field=self.mock_matter_field_r ) return self._mock_velocity_u_matter @property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "tracer_1", "seed") def mock_velocity_u_tracer_1(self): """ The normalised peculiar velocity field used for the first tracer. While the peculiar velocity field is only dependent on the matter field, note that if you simulate lognormal tracer fields, the two fields are not exactly correlated. One simple way to think about it is that field_1/bias_1 is not equal to field_2/bias_2, because both fields range from -1 to +max, and similarly the mock matter field is also not field_1/bias_1 or field_2/bias_2. This is an intrinsic weakness of the lognormal mock that you should be aware of. As a result, for each tracer the velocity field should be recalculated based on field_i/tracer_bias_i. """ if self._mock_velocity_u_tracer_1 is None: self._mock_velocity_u_tracer_1 = self.get_mock_velocity_u_field( mock_field=self.mock_tracer_field_1_r / self.tracer_bias_1 ) return self._mock_velocity_u_tracer_1 @property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "tracer_2", "seed") def mock_velocity_u_tracer_2(self): """ The normalised peculiar velocity field used for the second tracer. See the docstring of :meth:`mock_velocity_u_tracer_1` for more details. """ if self._mock_velocity_u_tracer_2 is None: self._mock_velocity_u_tracer_2 = self.get_mock_velocity_u_field( mock_field=self.mock_tracer_field_2_r / self.tracer_bias_2 ) return self._mock_velocity_u_tracer_2
[docs] def get_mock_velocity_u_field(self, mock_field): r""" Generate the normalised peculiar velocity field in real space Parameters ---------- mock_field: np.ndarray The mock field in real space. Returns ------- u_r: np.ndarray The normalised peculiar velocity field in real space. """ logger.info(f"invoking {inspect.currentframe().f_code.co_name}") real_dtype = real_dtype_from_array(mock_field) delta_k = np.fft.rfftn(mock_field, norm="forward") complex_dtype = delta_k.dtype slicer = get_nd_slicer() with np.errstate(divide="ignore", invalid="ignore"): kvecoverk2 = np.array( [self.k_vec[i][slicer[i]] / self.kmode**2 for i in range(3)] ).astype(real_dtype, copy=False) kvecoverk2[:, self.kmode == 0] = 0 u_k = np.asarray(-1j, dtype=complex_dtype) * kvecoverk2 * delta_k[None] u_r = np.array( [ np.fft.irfftn(u_k[i], s=mock_field.shape, norm="forward") for i in range(3) ] ).astype(real_dtype, copy=False) return u_r
@property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "seed") def mock_kaiser_field_k_matter(self): """ The Kaiser rsd effect correction for the mock matter field in k-space. """ if self._mock_kaiser_field_k_matter is None: self.get_mock_kaiser_field_k(field="matter") return self._mock_kaiser_field_k_matter @property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "tracer_1", "seed") def mock_kaiser_field_k_tracer_1(self): """ The Kaiser rsd effect correction for the mock tracer field 1 in k-space. See the docstring of :meth:`get_mock_kaiser_field_k` for more details. """ if self._mock_kaiser_field_k_tracer_1 is None: self.get_mock_kaiser_field_k(field="tracer_1") return self._mock_kaiser_field_k_tracer_1 @property @tagging("cosmo_model", "nu", "mock", "box", "rsd", "tracer_2", "seed") def mock_kaiser_field_k_tracer_2(self): """ The Kaiser rsd effect correction for the mock tracer field 2 in k-space. See the docstring of :meth:`get_mock_kaiser_field_k` for more details. """ if self._mock_kaiser_field_k_tracer_2 is None: self.get_mock_kaiser_field_k(field="tracer_2") return self._mock_kaiser_field_k_tracer_2
[docs] def get_mock_kaiser_field_k(self, field): r""" Generate the Kaiser rsd effect for the mock matter field in k-space. In the parrallel-plane limit, the Kaiser rsd effect is given by: .. math:: \delta_{\rm rsd} = f \mu^2 \delta_k where :math:`f` is the growth rate, :math:`\mu` is the cosine of the angle between the wave vector and the line of sight, and :math:`\delta_k` is the Fourier transform of the real-space matter density field. This function returns :math:`\delta_{\rm rsd}` in k-space so that for any mock tracer field, the Kaiser effect can be applied by adding :math:`\delta_{\rm rsd}` to the real-space tracer field in k-space. """ u_r = getattr(self, f"mock_velocity_u_{field}") slicer = get_nd_slicer() real_dtype = real_dtype_from_array(u_r) if self.parallel_plane: box_coord_l = np.zeros((3,) + tuple(self.box_ndim), dtype=real_dtype) box_coord_l[-1] = np.asarray(1.0, dtype=real_dtype) else: box_coord = np.meshgrid(*self.x_vec, indexing="ij") box_coord = np.array(box_coord, dtype=real_dtype) + np.asarray( self.box_origin[:, None, None, None], dtype=real_dtype ) box_coord_l = box_coord / np.sqrt((box_coord**2).sum(axis=0))[None] self._box_coord_l = box_coord_l u_in_xhat = (u_r * box_coord_l).sum(axis=0) y_k = np.array( [np.fft.rfftn(u_in_xhat * box_coord_l[i], norm="forward") for i in range(3)] ) y_k_dot_k = np.array( [(y_k[i] * self.k_vec[i][slicer[i]]) for i in range(3)] ).sum(axis=0) delta_rsd_k = ( np.asarray(1j * self.f_growth_true, dtype=y_k_dot_k.dtype) * y_k_dot_k ) logger.info( f"{inspect.currentframe().f_code.co_name}: " f"setting _mock_kaiser_field_k_{field} " f"with parallel_plane={self.parallel_plane}" ) setattr(self, f"_mock_kaiser_field_k_{field}", delta_rsd_k) return delta_rsd_k
[docs] def get_mock_matter_field(self): """ Generate the mock matter field in redshift space. """ logger.info(f"invoking {inspect.currentframe().f_code.co_name}, ") self._mock_matter_field = self.get_mock_field_in_redshift_space( delta_x=self.mock_matter_field_r, field="matter", sigma_v=0 )
[docs] def get_mock_field_in_redshift_space(self, delta_x, field, sigma_v): """ Generate the mock field in redshift space. Parameters ---------- delta_x: np.ndarray The mock field in real space. field: str The field to be simulated. Can be "matter" or "tracer_1" or "tracer_2". sigma_v: float The velocity dispersion of the tracer in km/s. Returns ------- delta_x: np.ndarray The mock field in redshift space. """ if self.kaiser_rsd: logger.info(f"invoking {inspect.currentframe().f_code.co_name}") delta_k = np.fft.rfftn(delta_x, norm="forward") delta_k += getattr(self, f"mock_kaiser_field_k_{field}") mumode = self.mumode fog = self.fog_term( self.deltav_to_deltar(sigma_v), kmode=self.kmode, mumode=mumode ) delta_k *= fog delta_x = np.fft.irfftn(delta_k, s=delta_x.shape, norm="forward") return np.asarray(delta_x, dtype=self.real_dtype)
@property def mock_amp_1(self): """ The overall amplitude of the mock tracer field 1 in each grid. This can be T_HI(z) at each grid for example, which is multiplied to the simulated density field to get the final mock field 1 ``self.mock_tracer_field_1``. If not given, default is to use ``self.mean_amp_1`` as the amplitude. """ if self._mock_amp_1 is None: return self.mean_amp_1 return self._mock_amp_1 @mock_amp_1.setter def mock_amp_1(self, value): self._mock_amp_1 = value return self._mock_amp_1 @property def mock_amp_2(self): """ The overall amplitude of the mock tracer field 2 in each grid. This can be T_HI(z) at each grid for example, which is multiplied to the simulated density field to get the final mock field 2 ``self.mock_tracer_field_2``. If not given, default is to use ``self.mean_amp_2`` as the amplitude. """ if self._mock_amp_2 is None: return self.mean_amp_2 return self._mock_amp_2 @mock_amp_2.setter def mock_amp_2(self, value): self._mock_amp_2 = value return self._mock_amp_2 @property @tagging("cosmo_model", "nu", "mock", "box", "tracer_1", "rsd", "seed") def mock_tracer_field_1(self): """ The simulated tracer field 1 in redshift space with unit if ``mock_amp_1`` or ``mean_amp_1`` is given. """ if self._mock_tracer_field_1 is None: self.get_mock_tracer_field(1) mean_amp = self.mock_amp_1 if isinstance(mean_amp, str): mean_amp = getattr(self, mean_amp) amp = np.asarray(mean_amp, dtype=self._mock_tracer_field_1.dtype) return self._mock_tracer_field_1 * amp @property @tagging("cosmo_model", "nu", "mock", "box", "tracer_2", "rsd", "seed") def mock_tracer_field_2(self): """ The simulated tracer field 2 in redshift space with unit if ``mock_amp_2`` or ``mean_amp_2`` is given. """ if self._mock_tracer_field_2 is None: self.get_mock_tracer_field(2) mean_amp = self.mock_amp_2 if isinstance(mean_amp, str): mean_amp = getattr(self, mean_amp) amp = np.asarray(mean_amp, dtype=self._mock_tracer_field_2.dtype) return self._mock_tracer_field_2 * amp @property @tagging("cosmo_model", "nu", "mock", "box", "tracer_1", "seed") def mock_tracer_field_1_r(self): """ The simulated tracer field 1 **unitsless density contrast** in real space. """ if self._mock_tracer_field_1_r is None: self.get_mock_tracer_field_r(1) return self._mock_tracer_field_1_r @property @tagging("cosmo_model", "nu", "mock", "box", "tracer_2", "seed") def mock_tracer_field_2_r(self): """ The simulated tracer field 2 **unitsless density contrast** in real space. """ if self._mock_tracer_field_2_r is None: self.get_mock_tracer_field_r(2) return self._mock_tracer_field_2_r
[docs] def get_mock_tracer_field_r(self, tracer_i): """ Generate the mock tracer field in real space. Parameters ---------- tracer_i: int The index of the tracer. Can be 1 or 2. Returns ------- delta_x: np.ndarray The mock tracer field in real space. """ delta_x = self.get_mock_field_r(bias=getattr(self, f"tracer_bias_{tracer_i}")) logger.info( f"{inspect.currentframe().f_code.co_name}: " f"seeting _mock_tracer_field_{tracer_i}_r" ) setattr(self, f"_mock_tracer_field_{tracer_i}_r", delta_x) return delta_x
[docs] def get_mock_tracer_field(self, tracer_i): """ Generate the mock tracer field in redshift space. Parameters ---------- tracer_i: int The index of the tracer. Can be 1 or 2. Returns ------- delta_x: np.ndarray The mock tracer field in redshift space. """ if self.rsd_from_field and self.kaiser_rsd: delta_x = self.get_mock_field_in_redshift_space( delta_x=getattr(self, f"mock_tracer_field_{tracer_i}_r"), field=f"tracer_{tracer_i}", sigma_v=getattr(self, f"sigma_v_{tracer_i}"), ) elif self.kaiser_rsd: if self.box_ndim is None: self.get_enclosing_box() power_array = getattr(self, f"auto_power_tracer_{tracer_i}_model_noobs") delta_x = self.get_mock_field_from_power(power_array) else: delta_x = getattr(self, f"mock_tracer_field_{tracer_i}_r") logger.info( f"{inspect.currentframe().f_code.co_name}: setting _mock_tracer_field_{tracer_i}" ) setattr(self, f"_mock_tracer_field_{tracer_i}", delta_x) return delta_x
@property @tagging( "cosmo_model", "nu", "mock", "box", "tracer_1", "tracer_2", "discrete", "rsd", "seed", ) def mock_tracer_position_in_box(self): """ The simulated tracer positions in the box in real space. """ if self._mock_tracer_position_in_box is None: self.get_mock_tracer_position_in_box(self.discrete_base_field) return self._mock_tracer_position_in_box
[docs] def get_mock_tracer_position_in_box(self, tracer_i, density_field=None): """ Function to retrieve the tracer positions in redshift space. Modified from ``powerbox``. Parameters ---------- tracer_i: int The index of the tracer. Can be 1 or 2. density_field: np.ndarray, default None The density field of the tracer. If None, the simulated mock tracer field will be used. Returns ------- tracer_positions: np.ndarray The tracer positions in the rectangular box. """ rng = default_rng(self.seed) # note that `_mock...` does not have mean amplitude so this is what # we should be using instead of `mock...`, this is just to invoke # the simulation getattr(self, "mock_tracer_field_" + str(tracer_i)) # now actually getting the underlying overdensity # if self.rsd_from_field: # density_field = getattr(self, "_mock_tracer_field_" + str(tracer_i) + "_r") + 1 # else: # density_field = getattr(self, "_mock_tracer_field_" + str(tracer_i)) + 1 if density_field is None: density_field = getattr(self, "_mock_tracer_field_" + str(tracer_i)) density_field = np.array(density_field, copy=True) density_field += np.asarray(1.0, dtype=density_field.dtype) num_g = self.tot_num_source_in_box norm_before = density_field.sum() mask = (density_field > 0).astype(density_field.dtype) mask_renorm = np.sqrt(power_weights_renorm(mask, mask)) density_field[density_field < 0] = 0 # some cheats to renorm the density field density_field_new = density_field ** (mask_renorm) norm = density_field_new.sum() density_field = density_field ** (norm / norm_before) norm = density_field.sum() if norm > 0: density_field *= np.asarray(num_g / norm, dtype=density_field.dtype) else: density_field[:] = 0 tracer_positions_list = [] for z_sel, density_chunk in self._iter_field_los_chunks(density_field): dndz_prob_chunk = self._dndz_renorm(self.box_voxel_redshift[..., z_sel]) tracer_positions_i = self._sample_tracer_positions_from_density_chunk( density_chunk=density_chunk, dndz_prob_chunk=dndz_prob_chunk, z_sel=z_sel, rng=rng, ) if tracer_positions_i.size > 0: tracer_positions_list.append(tracer_positions_i) if len(tracer_positions_list) == 0: tracer_positions = np.zeros((0, len(self.box_ndim)), dtype=self.real_dtype) else: tracer_positions = np.concatenate(tracer_positions_list, axis=0) tracer_positions = np.asarray(tracer_positions, dtype=self.real_dtype) # for some reason I can not get this to work # if self.rsd_from_field and self.kaiser_rsd: # box_coord_l = self._box_coord_l # distance_shift = ( # -self.f_growth * # box_coord_l * # (box_coord_l * # getattr(self, f"mock_velocity_u_tracer_{tracer_i}") # ).sum(axis=0)[None] # ).reshape((3,-1)).T # tracer_positions += distance_shift[tracer_which_cell] logger.info( f"{inspect.currentframe().f_code.co_name}: " "setting _mock_tracer_position_in_box" ) self._mock_tracer_position_in_box = tracer_positions
@property @tagging( "cosmo_model", "nu", "mock", "box", "tracer_1", "tracer_2", "discrete", "rsd", "seed", ) def mock_tracer_position_in_radecz(self): """ The simulated tracer positions projected onto the grid. The tracers outside the binary selection window ``map_has_sampling`` or outside the frequency range can be trimmed off. Furthermore, excess tracers are also excluded. The selection function is stored at ``inside_range``. Returns a list in the order of (ra,dec,z,inside_range). """ if self._mock_tracer_position_in_radecz is None: self.get_mock_tracer_position_in_radecz() return self._mock_tracer_position_in_radecz @property def ra_mock_tracer(self): """ The RA coordinate of mock galaxies on the sky """ return self.mock_tracer_position_in_radecz[0] @property def dec_mock_tracer(self): """ The Dec coordinate of mock galaxies on the sky """ return self.mock_tracer_position_in_radecz[1] @property def z_mock_tracer(self): """ The redshift of mock galaxies """ return self.mock_tracer_position_in_radecz[2] @property def mock_inside_range(self): """ Whether the mock galaxies are inside the survey area and frequency range """ return self.mock_tracer_position_in_radecz[3]
[docs] def get_mock_tracer_position_in_radecz(self): """ Project the mock tracer positions in the rectangular box to the sky coordinates and redshifts. """ if self.flat_sky: self._mock_tracer_comov_dist = ( self.mock_tracer_position_in_box[:, -1] + self.astropy_cosmo_true.comoving_distance(self.z_ch.min()).value ) z_mock_tracer = self.z_as_func_of_comov_dist(self._mock_tracer_comov_dist) pos_indx_1 = ( self.mock_tracer_position_in_box[:, 0] / self.box_len[0] * self.num_pix_x ) pos_indx_2 = ( self.mock_tracer_position_in_box[:, 1] / self.box_len[1] * self.num_pix_y ) ra_mock_tracer, dec_mock_tracer = get_wcs_coor( self.wproj, pos_indx_1, pos_indx_2 ) else: ( ra_mock_tracer, dec_mock_tracer, z_mock_tracer, tracer_comov_dist, ) = self.ra_dec_z_for_coord_in_box(self.mock_tracer_position_in_box) self._mock_tracer_comov_dist = tracer_comov_dist freq_tracer = redshift_to_freq(z_mock_tracer) tracer_ch_id = find_ch_id(freq_tracer, self.nu) # num_ch id is for tracer outside the frequency range z_sel = tracer_ch_id < len(self.nu) radec_sel = ( angle_in_range(ra_mock_tracer, self.ra_range[0], self.ra_range[1]) * (dec_mock_tracer > self.dec_range[0]) * (dec_mock_tracer < self.dec_range[1]) ) inside_range = z_sel * radec_sel mock_inside_range = inside_range logger.info( f"invoking {inspect.currentframe().f_code.co_name} with flat_sky={self.flat_sky}: " "setting _mock_tracer_position_in_radecz" ) self._mock_tracer_position_in_radecz = ( ra_mock_tracer, dec_mock_tracer, z_mock_tracer, mock_inside_range, )
[docs] def propagate_mock_tracer_to_gal_cat(self, trim=True): """ Propagate the mock tracer positions to the galaxy data catalogue. If trim, only tracers inside the ra-dec-z range will be propagated. Parameters ---------- trim: bool, default True If True, only the mock tracer positions inside the ra-dec-z range will be propagated. """ ra, dec, z, inside_range = self.mock_tracer_position_in_radecz inside_range = inside_range.copy() # if False, inside_range is all True inside_range = (inside_range + 1 - trim) > 0 self._ra_gal = ra[inside_range] self._dec_gal = dec[inside_range] self._z_gal = z[inside_range] logger.info( f"{inspect.currentframe().f_code.co_name}: " f"setting ra_gal, dec_gal, z_gal with trim={trim}" )
def _propagate_mock_field_to_data_wcs(self, field, beam=True, average=True): """ Project a box mock field onto the native WCS map grid (optionally beam-smooth). """ if self.sigma_beam_ch is None: beam = False wproj = self.wproj num_pix_x = self.num_pix_x num_pix_y = self.num_pix_y if self.flat_sky: field_dtype = real_dtype_from_array(field) map_out = np.zeros( (num_pix_x, num_pix_y, self.nu.size), dtype=field_dtype, ) for ch_sel, field_chunk in self._iter_field_los_chunks(field): map_out[..., ch_sel] = field_chunk map_counts = np.ones_like(map_out, dtype=field_dtype) else: map_out = None map_counts = None for ch_sel, field_chunk in self._iter_field_los_chunks(field): map_i, counts_i = self._project_field_chunk_to_sky_map( field_chunk, ch_sel, average=average, wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, ) if map_out is None: map_out = np.zeros_like(map_i) map_counts = np.zeros_like(counts_i) if average: map_out += map_i * counts_i else: map_out += map_i map_counts += counts_i del map_i, counts_i if average: with np.errstate(divide="ignore", invalid="ignore"): map_out = np.nan_to_num(map_out / map_counts) if beam: map_out = self._apply_weighted_convolution_in_batches( map_out, map_counts, beam_image=None, wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, ) return map_out def _propagate_mock_field_to_data_healpix(self, field, beam=True, average=True): """ Project a box mock field onto the sparse HEALPix map (optional harmonic beam smoothing). Gridding uses :meth:`~meer21cm.power.PowerSpectrum.grid_field_to_sky_map` on LOS chunks; sums and voxel counts are merged like the curved-sky WCS path. Beam convolution uses :meth:`~meer21cm.dataanalysis.Specification.convolve_data` with ``kernel=None`` (harmonic smoothing). """ if self.sigma_beam_ch is None: beam = False map_out = None map_counts = None for ch_sel, field_chunk in self._iter_field_los_chunks(field): map_i, counts_i = self.grid_field_to_sky_map( field_chunk, average=False, mask=False, los_sel=ch_sel, ) if map_out is None: map_out = np.zeros_like(map_i) map_counts = np.zeros_like(counts_i) map_out += map_i map_counts += counts_i if average: with np.errstate(divide="ignore", invalid="ignore"): map_out = np.nan_to_num(map_out / map_counts) if beam: wt = (map_counts > 0).astype(self.real_dtype) map_out, _ = self.convolve_data( kernel=None, data=map_out, weights=wt, assign_to_self=False, ) return map_out
[docs] def propagate_mock_field_to_data( self, field, beam=True, highres_sim=None, average=True ): """ Grid the mock tracer field onto the sky map cube. Parameters ---------- field: np.ndarray The mock tracer field to be gridded. beam: bool, default True Whether to convolve the beam model to the sky map. highres_sim: int, default None **Deprecated for WCS.** If this argument or ``self.highres_sim`` is not ``None`` while the sky map uses WCS, a ``DeprecationWarning`` is emitted and the factor is ignored; gridding always uses the native map resolution. average: bool, default True If True, then the map pixel value will be the average of the rectangular box values (i.e. the mock field is in temperature units). If False, then the map pixel value will be the sum of the rectangular box values (i.e. the mock field is in flux density units). Returns ------- np.ndarray The output sky map cube. """ fmt = self.skymap.format if fmt == "wcs": resolved = highres_sim if highres_sim is not None else self.highres_sim if resolved is not None: warnings.warn( "highres_sim is deprecated for WCS sky maps and is ignored in " "propagate_mock_field_to_data; use native-resolution gridding.", DeprecationWarning, stacklevel=2, ) return self._propagate_mock_field_to_data_wcs( field, beam=beam, average=average ) if fmt == "healpix": return self._propagate_mock_field_to_data_healpix( field, beam=beam, average=average )
[docs] class HIGalaxySimulation(MockSimulation): """ The class for generating mock HI galaxy catalogues for HI emission line observations. Parameters ---------- no_vel: bool, default True If True, HI sources will have no velocity width, and the total flux will be allocated to the central frequency channel. tf_slope: float, default None The slope of the Tully-Fisher relation when ``no_vel`` is False. tf_zero: float, default None The intercept of the Tully-Fisher relation when ``no_vel`` is False. halo_model: :class:`halomod.TracerHaloModel`, default None The halo model to be used. Only used if ``hi_mass_from`` is "hod". hi_mass_from: str, default "hod" The method for calculating the HI mass of the mock tracers. Can be "hod" or "himf". himf_pars: dict, default None The parameters for the HI mass function. Only used if ``hi_mass_from`` is "himf". num_ch_ext_on_each_side: int, default 5 The number of frequency channels to be extended on each side of the central frequency channel when generating the HI profile. A good rule of thumb is 200km/s over ``self.vel_resol``. **params: dict Additional parameters to be passed to the base class :class:`meer21cm.mock.MockSimulation`. """ def __init__( self, no_vel=True, tf_slope=None, tf_zero=None, halo_model=None, hi_mass_from="hod", himf_pars=None, num_ch_ext_on_each_side=5, **params, ): super().__init__(**params) # has to have tracer 2 for sim if self.tracer_bias_2 is None: self.tracer_bias_2 = 1.0 self.no_vel = no_vel self.tf_slope = tf_slope self.tf_zero = tf_zero if halo_model is None: halo_model = THM( cosmo_model=self.astropy_cosmo_true, z=self.z, hod_model=Obuljen18, ) self.halo_model = halo_model self.hi_mass_from = hi_mass_from.lower() if himf_pars is None: himf_pars = himf_pars_jones18(self.h / 0.7) self.himf_pars = himf_pars self.num_ch_ext_on_each_side = num_ch_ext_on_each_side init_attr = [ "_halo_mass_mock_tracer", "_hi_mass_mock_tracer", "_hi_profile_mock_tracer", ] for attr in init_attr: setattr(self, attr, None) @property def hi_mass_from(self): """ Methods for calculating HI mass. Can either be 'hod' or 'himf'. """ return self._hi_mass_from @hi_mass_from.setter def hi_mass_from(self, value): self._hi_mass_from = value if "himass_dep_attr" in dir(self): self.clean_cache(self.himass_dep_attr) @property def no_vel(self): """ If True, HI sources will have no velocity width. """ return self._no_vel @no_vel.setter def no_vel(self, value): self._no_vel = value if "hivel_dep_attr" in dir(self): self.clean_cache(self.hivel_dep_attr) @property def tf_slope(self): """ Slope of Tully-Fisher relation. See :func:`meer21cm.util.tully_fisher`. """ return self._tf_slope @tf_slope.setter def tf_slope(self, value): self._tf_slope = value if "hivel_dep_attr" in dir(self): self.clean_cache(self.hivel_dep_attr) @property def tf_zero(self): """ The intercept of the T-F relation. See :func:`meer21cm.util.tully_fisher`. """ return self._tf_zero @tf_zero.setter def tf_zero(self, value): self._tf_zero = value if "hivel_dep_attr" in dir(self): self.clean_cache(self.hivel_dep_attr) @property def halo_model(self): """ A :class:`halomod.TracerHaloModel` object for storing the halo model. """ return self._halo_model @halo_model.setter def halo_model(self, value): self._halo_model = value if "hm_dep_attr" in dir(self): self.clean_cache(self.hm_dep_attr) @property @tagging( "hm", "cosmo_model", "nu", "mock", "box", "tracer_1", "tracer_2", "discrete", "rsd", "seed", ) def halo_mass_mock_tracer(self): """ The halo mass of the mock tracers in log10 M_sun/h. """ if self._halo_mass_mock_tracer is None: self.get_halo_mass_mock_tracer() return self._halo_mass_mock_tracer
[docs] def get_halo_mass_mock_tracer(self): """ Calculate the halo mass of the mock tracers. In this case, the discrete tracer is considered to be the halos instead of galaxies, so corroespondingly you should set ``self.num_discrete_source`` to the number of halos, and ``self.tracer_bias_2`` to the effective bias of the halo number density field. The sampling is then done by using the halo mass function stored in ``self.halo_model``. See ``hmf`` package for more details about the halo mass function. """ # propagate mock catalogue to galaxy catalogue self.propagate_mock_tracer_to_gal_cat() num_g_tot_in_mockrange = self.ra_gal.size num_g_tot_in_map = self.ra_mock_tracer.size hm = self.halo_model dlog10m = hm.dlog10m m_arr_inv = hm.m[::-1] # in (Mpc/h)^-3 n_halo = np.cumsum((hm.dndlog10m * dlog10m)[::-1]) # in Mpc^-3 n_halo *= self.h**3 dV_pix = self.pix_resol_in_mpc**2 * self.los_resol_in_mpc m_indx_mock = np.where( (n_halo * dV_pix * self.W_HI.sum()) < num_g_tot_in_mockrange )[0].max() logm_halo_min = np.log10(m_arr_inv)[m_indx_mock] dndlog10_fnc = interp1d(np.log10(hm.m), hm.dndlog10m) self._halo_mass_mock_tracer = sample_from_dist( dndlog10_fnc, logm_halo_min, np.log10(hm.m.max()), size=num_g_tot_in_map, seed=self.seed, )
@property @tagging( "hm", "cosmo_model", "nu", "mock", "box", "tracer_1", "tracer_2", "discrete", "rsd", "himass", "seed", ) def hi_mass_mock_tracer(self): """ The HI mass of the mock tracers in log10 M_sun. """ if self._hi_mass_mock_tracer is None: getattr(self, "get_hi_mass_mock_tracer" + "_" + self.hi_mass_from)() return self._hi_mass_mock_tracer
[docs] def get_hi_mass_mock_tracer_hod(self): """ Calculate the HI mass of the mock tracers using the halo occupation distribution based on the HOD model stored in ``self.halo_model``. See ``halomod`` package for more details about the HOD model. """ himass_g = ( self.halo_model.hod.total_occupation(10**self.halo_mass_mock_tracer) / self.h ) # no h self._hi_mass_mock_tracer = np.log10(himass_g)
@property @tagging( "hm", "cosmo_model", "nu", "mock", "box", "tracer_1", "tracer_2", "discrete", "rsd", "himass", "hivel", "seed", ) def hi_profile_mock_tracer(self): """ The emission line profiles of the mock tracers in Jansky """ if self._hi_profile_mock_tracer is None: self.get_hi_profile_mock_tracer() return self._hi_profile_mock_tracer
[docs] def get_hi_profile_mock_tracer(self): """ Calculate the emission line profiles of the mock tracers. The profiles are calculated using the HI mass of the mock tracers, and the Tully-Fisher relation stored in ``self.tf_slope`` and ``self.tf_zero``. """ hifluxd_ch = hi_mass_to_flux_profile( self.hi_mass_mock_tracer, self.z_mock_tracer, self.nu, self.tf_slope, self.tf_zero, cosmo=self.astropy_cosmo_true, seed=self.seed, no_vel=self.no_vel, num_ch_ext_on_each_side=self.num_ch_ext_on_each_side, ) self._hi_profile_mock_tracer = hifluxd_ch
[docs] def propagate_hi_profile_to_map( self, return_highres=False, beam=True, beam_image=None ): """ Project the ``hi_profile_mock_tracer`` onto sky maps and convolve with the beam (if ``beam``). If ``return_highres``, the returned map is the higher resolution map specified by ``highres_sim``. If not, the map will be downsampled to the original resolution. """ if self.sigma_beam_ch is None and beam_image is None: beam = False highres = self.highres_sim num_pix_x = self.num_pix_x num_pix_y = self.num_pix_y hifluxd_ch = self.hi_profile_mock_tracer if highres is None: # no need for downsampling return_highres = True wproj = self.wproj else: wproj = create_udres_wproj(self.wproj, highres) num_pix_x *= highres num_pix_y *= highres indx_0, indx_1 = radec_to_indx( self.ra_mock_tracer, self.dec_mock_tracer, wproj, to_int=False ) nu_ext = self.nu.copy() for i in range(self.num_ch_ext_on_each_side * 2): nu_ext = center_to_edges(nu_ext) indx_z = find_ch_id(redshift_to_freq(self.z_mock_tracer), nu_ext) num_ch_vel = hifluxd_ch.shape[0] // 2 hi_map_ext_in_jy = np.zeros( (num_pix_x, num_pix_y, len(nu_ext)), dtype=hifluxd_ch.dtype ) for i, indx_diff in enumerate( np.linspace(-num_ch_vel, num_ch_vel, 2 * num_ch_vel + 1).astype("int") ): hiflux_i = hifluxd_ch[i] indx_2 = indx_z + indx_diff indx_bins = [ np.arange(hi_map_ext_in_jy.shape[i] + 1) - 0.5 for i in range(3) ] map_i, _ = np.histogramdd( np.array([indx_0, indx_1, indx_2]).T, bins=indx_bins, weights=hiflux_i, ) hi_map_ext_in_jy += map_i # remove the excess channels used for taking into account of galaxies # whose centres are outside the frequency range but the profile tails are inside hi_map_in_jy = hi_map_ext_in_jy[ :, :, self.num_ch_ext_on_each_side : -self.num_ch_ext_on_each_side ] if beam: if beam_image is None: map_counts = np.ones_like(hi_map_in_jy, dtype=hi_map_in_jy.dtype) hi_map_in_jy = self._apply_weighted_convolution_in_batches( hi_map_in_jy, map_counts, beam_image=None, wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, ) else: map_counts = np.ones_like(hi_map_in_jy, dtype=hi_map_in_jy.dtype) hi_map_in_jy = self._apply_weighted_convolution_in_batches( hi_map_in_jy, map_counts, beam_image=beam_image, ) if return_highres: return hi_map_in_jy spec = Specification( wproj=wproj, num_pix_x=num_pix_x, num_pix_y=num_pix_y, ) ra_map = spec.ra_map dec_map = spec.dec_map hi_map_in_jy = sample_map_from_highres( hi_map_in_jy, ra_map, dec_map, self.wproj, self.num_pix_x, self.num_pix_y, average=False, ) return hi_map_in_jy
[docs] def hi_mass_to_flux_profile( loghimass, z_g, nu, tf_slope=None, tf_zero=None, cosmo=Planck18, seed=None, num_ch_ext_on_each_side=5, internal_step=1001, no_vel=True, ): r""" Convert HI mass to emission line profile. The relation between mass and flux (flux density integrated) of a HI source can be written as [1] .. math:: M_{\rm HI} = \frac{16\pi m_H}{3 h f_{21} A_{10}}\, D_L^2\, S, where :math:`M_{\rm HI}` is the HI mass, :math:`m_H` is the mass of neutral hydrogen atom, :math:`h` is the Planck constant, :math:`f_{21}` is the rest frequency of 21cm line, :math:`A_{10}` is the spontaneous emission rate of 21cm line, :math:`D_L` is the luminosity distance of the source, and :math:`S` is the flux. If ``no_vel`` is set to ``False``, the w50 parameters of the HI sources are calculated using a Tully-Fisher relation. Random inclinations and busy function parameters are assigned to the sources. The busy functions are then used as the emission profiles. The profiles are gridded into frequency channels. The gridded profile, ``hifluxd_ch`` has the shape of (ch_offset,num_source), with the zeroth axis corresponding to the channel offset, and varies from (-N,-N+1,...,0,...,N-1,N). Parameters ---------- loghimass: array The HI mass of sources in log10 solmar mass (**no h**). z_g: array The redshifts of the sources nu: array The frequency channels in Hz. tf_slope: float, default None. The slope of the T-F relation. See :func:`meer21cm.util.tully_fisher`. tf_zero: float, default None. The intercept of the T-F relation. See :func:`meer21cm.util.tully_fisher`. cosmo: optional, default Planck18. The cosmology used. seed: optional, default None. The seed number for rng. num_ch_ext_on_each_side: optional, default 5. Internal parameter, no need to change. internal_step: optional, default 1001. Internal parameter, decrease for less accuracy. Can be lower when velocity resolution is low. no_vel: optional, default True. If True, source will have no emission line profile and just a delta peak. Returns ------- hifluxd_ch: array The flux density of each source in Jy. References ---------- .. [1] Meyer et al., "Tracing HI Beyond the Local Universe", https://arxiv.org/abs/1705.04210 """ rng = np.random.default_rng(seed=seed) # internally allowing some extension so galaxies # outside the frequency range can be accurately calculated # so the tail of profile can be correctly distributed into the range nu_ext = nu.copy() for i in range(num_ch_ext_on_each_side * 2): nu_ext = center_to_edges(nu_ext) num_g = loghimass.size lumi_dist_g = cosmo.luminosity_distance(z_g).to("Mpc").value # in Jy Hz hiintflux_g = 10**loghimass / mass_intflux_coeff / lumi_dist_g**2 freq_resol = np.diff(nu).mean() # no velocity, just one channel if no_vel: hifluxd_ch = hiintflux_g / freq_resol return hifluxd_ch[None, :] # get w_50 hivel_g = tully_fisher(10**loghimass / 1.4, tf_slope, tf_zero, inv=True) incli_g = np.abs(np.sin(rng.uniform(0, 2 * np.pi, size=num_g))) # get width hiwidth_g = incli_g * hivel_g # in km/s/freq dvdf = (constants.c / nu).to("km/s").value.mean() vel_resol = dvdf * np.diff(nu).mean() # get the number of channels needed to plot the profile num_ch_vel = (int(hiwidth_g.max() / vel_resol)) // 2 + 2 # in terms of frequency offset freq_ch_arr = np.linspace(-num_ch_vel, num_ch_vel, 2 * num_ch_vel + 1) * freq_resol # busy function parameters busy_c = 10 ** (rng.uniform(-3, -2, size=num_g)) busy_b = 10 ** (rng.uniform(-2, 0, size=num_g)) # fine resolution velocity offset points for calculating the profile vel_int_arr = np.linspace(-hiwidth_g.max(), hiwidth_g.max(), num=internal_step) hiprofile_g = busy_function_simple( vel_int_arr[:, None], 1, busy_b, (busy_c / hiwidth_g)[None, :] * 2, hiwidth_g[None, :] / 2, ) # the sum over profile should give the integrated flux hiprofile_g = ( hiprofile_g / (np.sum(hiprofile_g, axis=0))[None, :] * hiintflux_g[None, :] ) gal_freq = redshift_to_freq(z_g) gal_which_ch = find_ch_id(gal_freq, nu_ext) # the fine resolution point in Hz freq_int_arr = ( vel_int_arr[None, :] * gal_freq[:, None] / constants.c.to("km/s").value ) hicumflux_g = np.cumsum(hiprofile_g, axis=0) # central freq of a galaxy relative to the frequency channel freq_start_pos = np.zeros_like(gal_freq) inrange_sel = gal_which_ch < nu_ext.size freq_start_pos[inrange_sel] = ( nu_ext[gal_which_ch[inrange_sel]] - gal_freq[inrange_sel] - freq_resol / 2 ) freq_gal_arr = freq_ch_arr[:, None] - freq_start_pos[None, :] freq_indx = np.argmin( np.abs(freq_gal_arr[:, :, None] - freq_int_arr[None, :, :]).reshape( (-1, freq_int_arr.shape[-1]) ), axis=1, ) freq_indx = freq_indx.reshape(freq_gal_arr.shape) hifluxd_ch = np.zeros(freq_indx.shape) for i in range(num_g): hifluxd_ch[:, i] = hicumflux_g[:, i][freq_indx[:, i]] hifluxd_ch = np.diff(hifluxd_ch, axis=0) hifluxd_ch = np.concatenate((np.zeros(num_g)[None, :], hifluxd_ch), axis=0) # from Jy Hz to Jy hifluxd_ch /= freq_resol return hifluxd_ch
[docs] def generate_gaussian_field( box_ndim, box_len, power_spectrum, seed, ps_has_volume=True ): r""" Generate a Gaussian field with the given power spectrum. If ``ps_has_volume`` is ``True``, the power spectrum is assumed to have the volume unit, usually in the unit of :math:`(Mpc)^{-3}`. The length unit of the box is arbitrary, as long as the unit in ``box_len`` and ``power_spectrum`` are consistent. Parameters ---------- box_ndim: array The number of grid points in each dimension. box_len: array The length of the box in each dimension. power_spectrum: array The input power spectrum. seed: int The seed for the random number generator. ps_has_volume: bool, default True If ``True``, the power spectrum is assumed to have the volume unit. Returns ------- delta_x: array The generated Gaussian field. """ ps = np.asarray(power_spectrum) real_dtype = real_dtype_from_array(ps) rng = np.random.default_rng(seed) noise_real = rng.normal(0, 1, box_ndim).astype(real_dtype, copy=False) noise_k = np.fft.rfftn(noise_real) ps = ps.astype(real_dtype, copy=True) if ps_has_volume: scale = np.asarray(np.prod(box_ndim) / np.prod(box_len), dtype=real_dtype) ps *= scale scaling = np.sqrt(ps) delta_k = noise_k * scaling # Inverse FFT to get real-valued Gaussian field delta_x = np.fft.irfftn(delta_k, axes=range(len(box_ndim)), s=box_ndim) return delta_x.astype(real_dtype, copy=False)
[docs] def generate_lognormal_field( box_ndim, box_len, power_spectrum, seed, ps_has_volume=True ): r""" Generate a lognormal field with the given power spectrum. If ``ps_has_volume`` is ``True``, the power spectrum is assumed to have the volume unit, usually in the unit of :math:`(Mpc)^{-3}`. The length unit of the box is arbitrary, as long as the unit in ``box_len`` and ``power_spectrum`` are consistent. Parameters ---------- box_ndim: array The number of grid points in each dimension. box_len: array The length of the box in each dimension. power_spectrum: array The input power spectrum. seed: int The seed for the random number generator. ps_has_volume: bool, default True If ``True``, the power spectrum is assumed to have the volume unit. Returns ------- delta_x: array The generated lognormal field. """ ps = np.asarray(power_spectrum) real_dtype = real_dtype_from_array(ps) ps = ps.astype(real_dtype, copy=True) if ps_has_volume: scale = np.asarray(np.prod(box_ndim) / np.prod(box_len), dtype=real_dtype) ps *= scale # Compute correlation function ξ_δ(r) via inverse FFT xi_delta = np.fft.irfftn(ps, axes=(0, 1, 2), s=box_ndim).real.astype( real_dtype, copy=False ) # Compute Gaussian correlation function ξ_G(r) = ln(1 + ξ_δ(r)) eps = np.asarray(1e-10, dtype=real_dtype) xi_G = np.log(np.asarray(1.0, dtype=real_dtype) + xi_delta + eps) # Avoid log(0) # Compute Gaussian power spectrum Delta_G(k) Delta_G = np.abs(np.fft.rfftn(xi_G)) delta_x_g = generate_gaussian_field( box_ndim, box_len, Delta_G, seed, ps_has_volume=False ) # Apply lognormal transformation sigma_sq = np.asarray(np.mean(Delta_G.ravel()[1:]), dtype=real_dtype) half = np.asarray(0.5, dtype=real_dtype) one = np.asarray(1.0, dtype=real_dtype) delta_x = np.exp(delta_x_g - sigma_sq * half) - one return delta_x.astype(real_dtype, copy=False)
[docs] def generate_colored_noise(x_size, x_len, power_spectrum, seed=None): """ Generate random 1D gaussian fluctuations following a specific spectrum. This is similar to ``colorednoise`` package for generating colored noise. It is simply wrapping the ``generate_gaussian_field`` function under the hood. Note that the Fourier convention used should be consistent with :py:mod:`np.fft`, and the power spectrum is dimensionless. Parameters ---------- x_size: int The number of sampling. x_len: float The **total length** of the sampling. power_spectrum: array The power spectrum of the random noise in Fourier space. seed: int, default None The seed number for random generator for sampling. If None, a random seed is used. Returns ------- rand_arr: float array. The random noise. """ rand_arr = generate_gaussian_field( x_size, x_len, power_spectrum, seed, ps_has_volume=False ) rand_arr -= rand_arr.mean() return rand_arr