"""
This module contains the transfer function analysis class and related functions.
In data analysis, you should have already used :class:`meer21cm.power.PowerSpectrum`
to calculate the power spectrum of the data,
or :class:`meer21cm.mock.MockSimulation` to generate the mock data for power spectrum estimation.
In that case, you can pass the :class:`meer21cm.power.PowerSpectrum` or :class:`meer21cm.mock.MockSimulation`
instance as an input to :class:`meer21cm.transfer.TransferFunction`.
The transfer function class then takes into account of the settings of the power spectrum instance,
and perform mock simulations to estimate the numerical transfer function,
keeping the consistency of the gridding, beam, and other settings.
Finally note that, the transfer function calculation supports parallelisation using MPI,
by using :class:`mpi4py.futures.MPIPoolExecutor`.
The default installation of meer21cm does not include ``mpi4py``,
and you need to configure MPI first (usually with openmpi or mpich), and then install ``mpi4py`` manually.
"""
import numpy as np
from .util import pca_clean, dft_matrix, inv_dft_matrix
from .mock import MockSimulation
from multiprocessing import Pool
from .power import PowerSpectrum
from typing import Callable
from scipy.interpolate import interp1d
[docs]
def fft_matrix(mat, norm="backward"):
r"""
Perform the Fourier transform of a matrix.
.. math::
\tilde{M} = \mathcal{F} M \mathcal{F}^{-1}
where :math:`\mathcal{F}` is the Fourier transform matrix.
See also :func:`meer21cm.util.dft_matrix`.
Parameters
----------
mat: np.ndarray
The matrix to be transformed.
norm: str, default "backward"
The normalization of the Fourier transform.
Returns
-------
mat_fft: np.ndarray
The Fourier transformed matrix.
"""
return (
dft_matrix(mat.shape[0], norm=norm)
[docs]
@ mat
@ inv_dft_matrix(mat.shape[0], norm=norm)
)
def get_pca_matrix(map, N_fg, weights, mean_center_map):
r"""
Get the PCA matrix that operates on the map data.
The eigendecomposition of the map data gives the eigenvectors :math:`v_{i}`,
which can be used to form the source mixing matrix A so that
.. math::
A = \{v_1, v_2, ..., v_{N_fg}\}
The PCA matrix is then
.. math::
R = I - A A^T
The residual data vector after cleaning is then
.. math::
r_{ija} = \sum_{b} R_{ab} m_{ijb}
where :math:`R_{ab}` is the PCA matrix, and :math:`m_{ijb}` is the uncleaned map data.
Parameters
----------
map: np.ndarray
The map data.
N_fg: int
The number of foreground components.
weights: np.ndarray
The weights of the map data.
mean_center_map: bool
Whether to mean center the map data.
Returns
-------
"""
_, A_mat = pca_clean(
map,
N_fg,
weights=weights,
mean_center=mean_center_map,
return_A=True,
ignore_nan=True,
)
R_mat = np.eye(map.shape[-1]) - A_mat @ A_mat.T
return np.nan_to_num(R_mat)
[docs]
def analytic_transfer_function(clean_mat_1, clean_mat_2=None):
r"""
Calculate the analytic transfer function of a clean matrix.
See Chen 2025 [1] for derivations.
For a foreground cleaning matrix :math:`R_{ab}`,
the residual data vector for power spectrum estimation is :math:`r_{ija} = \sum_b R_{ab} m_{ijb}`,
where i,j are the pixel indices, and a,b are the frequency indices.
Under the flat sky approximation, the signal loss, as well as the mode-mixing,
is along the line-of-sight (k_para) direction.
The unnormalised window function matrix is
.. math::
H_{ab} = |\tilde{R}^1_{ab} (\tilde{R}^2_{ab})^*|_{\rm Re}
The corresponding signal loss is :math:`\sum_b H_{ab}`, and the analytical
transfer function is the inverse of the signal loss.
After normalisation, the window function matrix is
.. math::
W = {\rm diag}\Big(\sum_b H_{ab}\Big)^{-1} H
Parameters
----------
clean_mat_1: np.ndarray
The clean matrix that applies to the data vector.
clean_mat_2: np.ndarray, optional
The clean matrix that applies to the second data vector for cross-correlation.
If not provided, it is assumed to be the same as clean_mat_1 for auto-correlation.
Returns
-------
transfer_func: np.ndarray
The analytical transfer function.
Wab: np.ndarray
The normalised window function matrix.
References
----------
.. [1] Chen, Z.,, "A quadratic estimator view of the transfer function correction in intensity mapping surveys", https://ui.adsabs.harvard.edu/abs/2025MNRAS.542L...1C/abstract.
"""
assert (clean_mat_1.ndim == 2) and (clean_mat_1.shape[0] == clean_mat_1.shape[1])
if clean_mat_2 is None:
clean_mat_2 = clean_mat_1
assert np.allclose(clean_mat_1.shape, clean_mat_2.shape)
num_k = clean_mat_1.shape[0] // 2 + 1
R_mat_fourier_1 = fft_matrix(clean_mat_1)
R_mat_fourier_2 = fft_matrix(clean_mat_2)
Hab = (np.conj(R_mat_fourier_1) * R_mat_fourier_2).real
Hab = Hab[:num_k, :num_k]
signal_loss = Hab.sum(1)
renorm_mat = np.diag(1 / signal_loss)
Wab = renorm_mat @ Hab
return 1 / signal_loss, Wab
required_attrs = [
"wproj",
"num_pix_x",
"num_pix_y",
"nu",
"data",
"map_has_sampling",
"weights_map_pixel",
"true_cosmology",
"fiducial_cosmology",
"sigma_beam_ch",
"ps_type",
"cold",
"backend",
"omega_hi",
"tracer_bias_1",
"tracer_bias_2",
"sigma_v_1",
"sigma_v_2",
"include_beam",
"fog_profile",
"weights_field_1",
"weights_field_2",
"weights_grid_1",
"weights_grid_2",
"mean_amp_1",
"mean_amp_2",
"kaiser_rsd",
"sigma_z_1",
"sigma_z_2",
"sampling_resol",
"box_buffkick",
"num_particle_per_pixel",
"mean_center_1",
"mean_center_2",
"unitless_1",
"unitless_2",
"compensate",
"taper_func",
"grid_scheme",
"interlace_shift",
"flat_sky",
"flat_sky_padding",
"kperpbins",
"kparabins",
"k1dbins",
"include_sky_sampling",
"k1dweights",
"batch_number",
]
[docs]
class TransferFunction:
"""
A class to perform transfer function analysis.
By default, the transfer function is calculated by generating mock HI signal,
and perform PCA cleaning by injecting the mock HI signal into the data (unless
``R_mat`` is provided, in which case no PCA is performed and R_mat is used to clean the injected data).
The transfer function is then calculated by dividing the 1D power spectrum of the cleaned data
by the 1D power spectrum of the mock data. A minimum example running 10 realizations:
.. code-block:: python
>>> tf = TransferFunction(ps, N_fg=3)
>>> tf1d_arr = tf.run(range(10), type="auto") # gives you the 1D transfer function with 10 realizations
If ``type`` is ``"auto"``, the numerator is the **cross-correlation between cleaned HI mock and
the original HI mock** (and therefore the result is based on HI "auto" power),
and the denominator is the auto power of the original HI mock.
If ``type`` is ``"cross"``, the numerator is the cross-correlation between cleaned HI mock
and galaxy mock, and the denominator is the cross power of the original HI mock and galaxy mock.
Additionally, if ``type`` is ``"null"``, instead of running transfer function calculation,
no HI mock is generated, and only mock galaxy data is used to cross-correlate with the map
data as a null test.
Parameters
----------
ps: :class:`meer21cm.power.PowerSpectrum`
The power spectrum instance.
N_fg: int
The number of eigenmodes for PCA cleaning.
R_mat: np.ndarray, default None
The PCA matrix. If not provided, it will be calculated from the data.
If provided, no map injection is performed and the cleaning uses this matrix.
uncleaned_data: np.ndarray, default None
The uncleaned data, which the mock HI signal is to be injected into.
If not provided, the mock HI signal is injected into ``ps.data``.
Note that this can be **wrong** if ``ps.data`` is already overridden by the residual map.
In that case, you should provide the original map.
highres_sim: int, default Nones
The mock field will be gridded to a high resolution sky map,
with the resolution specified by this integer times the pixel resolution, and then
down-sampled to the pixel resolution.
upres_transverse: float, default 4
The mock field is first generated in a rectangular box.
This is the up-sampling factor of the box comparing to the pixel resolution in the transverse direction.
upres_radial: float, default 4
The up-sampling factor in the radial direction comparing to the frequency resolution.
mean_center_map: bool, default True
Whether to mean center the map data for PCA cleaning.
pca_map_weights: np.ndarray, default None
If mean centering is performed, this is the weights for the mean calculation.
Also used to weight the data for covariance calculation for PCA eigendecomposition.
parallel_plane: bool, default True
Whether to simulate the mock field in the parallel-plane limit (i.e. k_z = k_parallel).
Only used if ``rsd_from_field`` is True, otherwise parallel-plane is hard-coded to True.
rsd_from_field: bool, default False
Whether to generate the RSD effect at field level. If False, the RSD effect is generated at power spectrum level.
discrete_source_dndz: list or function, default np.ones_like
The redshift distribution of the discrete tracer sources.
Must be a function of redshift.
Note that the overall number of discrete sources is set to ``ps.ra_gal.size``
or ``ps.num_discrete_source``, so
only the shape of the dndz is used, not the normalization.
If a list is provided, the first element is the redshift array, and the second element is the dndz array,
and a 1D linear interpolation is used to get the dndz function.
pool: str, default "multiprocessing"
The pool to use for parallelisation. Can be "multiprocessing" or "mpi".
num_process: int, default None
The number of processes to use for parallelisation.
If not provided, the number of processes is set to the number of cores available.
unmask_during_mock: bool, default False
Whether to unmask the wcs during the mock simulation.
If True, the survey area will be unmasked during the mock simulation, and then masked again after mock HI is generated.
This is useful for mitigating edge effects and when box buffer is small.
For a large patch with a wide z-range, it is recommended to set this to True,
otherwise the mock HI map may contain nan and cause errors.
"""
def __init__(
self,
ps: PowerSpectrum,
N_fg: int,
R_mat: np.ndarray | None = None,
uncleaned_data: np.ndarray | None = None,
highres_sim: int | None = None,
upres_transverse: float = 4,
upres_radial: float = 4,
mean_center_map: bool = True,
pca_map_weights: np.ndarray | None = None,
parallel_plane: bool = True,
rsd_from_field: bool = False,
discrete_source_dndz: list | Callable = np.ones_like,
pool: str = "multiprocessing",
num_process: int | None = None,
unmask_during_mock: bool = False,
):
self.ps = ps
self.N_fg = N_fg
self.highres_sim = highres_sim
self.upres_transverse = upres_transverse
self.upres_radial = upres_radial
self.mean_center_map = mean_center_map
if pca_map_weights is None:
pca_map_weights = self.ps.weights_map_pixel
self.pca_map_weights = pca_map_weights
self.parallel_plane = parallel_plane
self.rsd_from_field = rsd_from_field
self.discrete_source_dndz = discrete_source_dndz
if uncleaned_data is None:
uncleaned_data = np.zeros_like(ps.data)
self.uncleaned_data = uncleaned_data
self.pool = pool
self.num_process = num_process
self.R_mat = R_mat
self.unmask_during_mock = unmask_during_mock
[docs]
def get_mock_instance_attr_dict(self, seed):
"""
Generate the attribute dictionary for the mock instance.
It reads the attributes from the input power spectrum instance,
and then sets a few mock-specific attributes specified in the input parameters
of :class:`meer21cm.transfer.TransferFunction`.
The seed attribute is given at the input of this function.
Parameters
----------
seed: int
The seed for the mock instance.
Returns
-------
attr_dict: dict
The attribute dictionary for the mock instance.
"""
attr_dict = {}
for attr in required_attrs:
attr_dict[attr] = getattr(self.ps, attr)
attr_dict["seed"] = seed
if hasattr(self.ps, "ra_gal"):
attr_dict["num_discrete_source"] = self.ps.ra_gal.size
# if it is a mock itself then overrides with its own attribute
if hasattr(self.ps, "num_discrete_source"):
attr_dict["num_discrete_source"] = self.ps.num_discrete_source
attr_dict["highres_sim"] = self.highres_sim
attr_dict["parallel_plane"] = self.parallel_plane
attr_dict["rsd_from_field"] = self.rsd_from_field
# attr_dict["discrete_source_dndz"] = self.discrete_source_dndz
attr_dict["downres_factor_radial"] = 1 / self.upres_radial
attr_dict["downres_factor_transverse"] = 1 / self.upres_transverse
highres_sim = self.highres_sim if self.highres_sim is not None else 1
attr_dict["kmax"] = self.ps.kmax * highres_sim
attr_dict["num_kpoints"] = self.ps.num_kpoints * highres_sim
return attr_dict
[docs]
def get_arg_list_for_parallel_null(
self,
seed_list,
return_power_3d=False,
return_power_1d=False,
):
"""
Generate a list of arguments for parallelisation of the null test runs.
This list is then used for ``pool.starmap``.
Parameters
----------
seed_list: list
The list of seeds for the mock instances.
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the null test.
return_power_1d: bool, default False
Whether to return the 1D power spectrum of the null test.
Note that for null test, the result itself is the 1D power, so this
is a dummy argument to keep consistency with
``cross`` and ``auto`` runs.
Returns
-------
arg_list: list
The list of arguments for the parallelisation.
"""
arg_list = []
for seed in seed_list:
mock_attr_dict = self.get_mock_instance_attr_dict(seed)
arg_list.append(
(
mock_attr_dict,
self.ps.field_1,
self.ps.downres_factor_radial,
self.ps.downres_factor_transverse,
self.ps.weights_field_1,
self.ps.weights_grid_1,
self.ps.weights_field_2,
self.ps.weights_grid_2,
self.ps.k1dweights,
return_power_3d,
self.unmask_during_mock,
self.ps.ra_range,
self.ps.dec_range,
self.discrete_source_dndz,
)
)
return arg_list
[docs]
def get_arg_list_for_parallel_cross(
self, seed_list, return_power_3d=False, return_power_1d=False
):
"""
Generate a list of arguments for parallelisation of the cross-correlation runs
(i.e. the transfer function is calculated between mock HI and mock galaxy).
This list is then used for ``pool.starmap``.
Parameters
----------
seed_list: list
The list of seeds for the mock instances.
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the cross-correlation.
return_power_1d: bool, default False
Whether to return the 1D power spectrum of the cross-correlation.
Returns
-------
arg_list: list
The list of arguments for the parallelisation.
"""
arg_list = []
for seed in seed_list:
mock_attr_dict = self.get_mock_instance_attr_dict(seed)
arg_list.append(
(
mock_attr_dict,
self.ps.downres_factor_radial,
self.ps.downres_factor_transverse,
self.ps.weights_field_1,
self.ps.weights_grid_1,
self.ps.weights_field_2,
self.ps.weights_grid_2,
self.ps.k1dweights,
self.N_fg,
self.pca_map_weights,
self.mean_center_map,
self.R_mat,
self.uncleaned_data,
return_power_3d,
return_power_1d,
self.unmask_during_mock,
self.ps.ra_range,
self.ps.dec_range,
self.discrete_source_dndz,
)
)
return arg_list
[docs]
def get_arg_list_for_parallel_auto(
self, seed_list, return_power_3d=False, return_power_1d=False
):
"""
Generate a list of arguments for parallelisation of the auto-correlation runs
(i.e. the transfer function is calculated between cleaned mock HI and original mock HI).
This list is then used for ``pool.starmap``.
Parameters
----------
seed_list: list
The list of seeds for the mock instances.
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the auto-correlation.
return_power_1d: bool, default False
Whether to return the 1D power spectrum of the auto-correlation.
Returns
-------
arg_list: list
The list of arguments for the parallelisation.
"""
arg_list = []
for seed in seed_list:
mock_attr_dict = self.get_mock_instance_attr_dict(seed)
arg_list.append(
(
mock_attr_dict,
self.ps.downres_factor_radial,
self.ps.downres_factor_transverse,
self.ps.weights_field_1,
self.ps.weights_grid_1,
self.ps.k1dweights,
self.N_fg,
self.pca_map_weights,
self.mean_center_map,
self.R_mat,
self.uncleaned_data,
return_power_3d,
return_power_1d,
self.unmask_during_mock,
self.ps.ra_range,
self.ps.dec_range,
)
)
return arg_list
[docs]
def run(
self, seed_list, type="cross", return_power_3d=False, return_power_1d=False
):
"""
Run the transfer function calculation.
Note that, ``run`` automatically uses a parallel pool to loop over the ``seed_list``.
If you believe the parallel behaviour is not as expected,
you can manually extract the argument list and map the function yourself.
For example:
.. code-block:: python
>>> tf = TransferFunction(ps, N_fg=3)
>>> tf1d_arr = tf.run(range(10), type="auto")
is the same as:
.. code-block:: python
>>> tf = TransferFunction(ps, N_fg=3)
>>> arg_list = tf.get_arg_list_for_parallel_auto(range(10))
>>> tf1d_arr = []
>>> with Pool(tf.num_process) as pool:
>>> for result_i in pool.starmap(run_tf_calculation_auto, arg_list):
>>> tf1d_arr.append(result_i)
Parameters
----------
seed_list: list
The list of seeds for the mock instances.
type: str, default "cross"
The type of transfer function calculation to run.
Can be "cross", "auto", or "null".
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the transfer function.
return_power_1d: bool, default False
Whether to return the 1D power spectrum of the transfer function.
Returns
-------
results_arr: list
The list of results for the transfer function calculation, with
each element being a sublist for the result of one seed.
The first element of each sublist is the transfer function.
If ``return_power_3d`` is True, the second element is the 3D power spectrum of the original mock data,
and the third element is the 3D power spectrum of the cleaned mock data.
If ``return_power_1d`` is True, the next element is the 1D power spectrum of the original mock data,
and the next element after that is the 1D power spectrum of the cleaned mock data.
"""
if type == "cross":
run_func = run_tf_calculation_cross
elif type == "auto":
run_func = run_tf_calculation_auto
elif type == "null":
run_func = run_null_test
else:
raise ValueError(f"Invalid type: {type}")
arg_func = getattr(self, f"get_arg_list_for_parallel_{type}")
arg_list = arg_func(seed_list, return_power_3d, return_power_1d)
if self.pool == "multiprocessing":
pool_func = Pool
elif self.pool == "mpi":
from mpi4py.futures import MPIPoolExecutor
pool_func = MPIPoolExecutor
else:
raise ValueError(f"Invalid pool: {self.pool}")
results_arr = []
with pool_func(self.num_process) as pool:
for result_i in pool.starmap(run_func, arg_list):
results_arr.append(result_i)
return results_arr
# this must be pickleable inputs for multiprocessing
[docs]
def run_tf_calculation_cross(
mock_attr_dict,
downres_factor_radial,
downres_factor_transverse,
weights_field_1,
weights_grid_1,
weights_field_2,
weights_grid_2,
k_sel_3d_to_1d,
N_fg,
pca_map_weights,
mean_center_map,
R_mat=None,
uncleaned_data=0.0,
return_power_3d=False,
return_power_1d=False,
unmask_during_mock=False,
ra_range=None,
dec_range=None,
discrete_source_dndz=None,
):
"""
Run the transfer function calculation by calculating the ratio of the 1D cross-power spectrum of
the cleaned mock HI data x mock galaxy data to the 1D cross-power spectrum of the original mock HI data x mock galaxy data.
Parameters
----------
mock_attr_dict: dict
The attribute dictionary to initialize the mock instance.
downres_factor_radial: float
The downres factor for the radial direction for gridding the map data.
downres_factor_transverse: float
The downres factor for the transverse direction for gridding the map data.
weights_field_1: np.ndarray
The field-level weights for the field 1.
weights_grid_1: np.ndarray
The grid-level weights for the field 1.
weights_field_2: np.ndarray
The field-level weights for the field 2.
weights_grid_2: np.ndarray
The grid-level weights for the field 2.
k_sel_3d_to_1d: np.ndarray
The weights for averaging the 3D power spectrum k-modes to 1D power spectrum.
N_fg: int
The number of foreground components to clean.
pca_map_weights: np.ndarray
The weights for the mean centering and covariance matrix calculation during PCA.
mean_center_map: bool
Whether to mean-center the map before PCA cleaning.
R_mat: np.ndarray, default None
The PCA matrix to clean the map.
If not provided, it will be calculated by injecting the original mock HI data into the data map.
If provided, it will be used to clean the map, and no injection + PCA is performed.
uncleaned_data: np.ndarray, default 0.0
The uncleaned data map.
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the mock results, including the uncleaned and cleaned data.
return_power_1d: bool, default False
Whether to return the 1D power spectrum of the mock results, including the uncleaned and cleaned data.
unmask_during_mock: bool, default False
Whether to unmask the wcs during the mock simulation.
If True, the survey area will be unmasked during the mock simulation, and then masked again after mock HI is generated.
This is useful for mitigating edge effects and when box buffer is small.
For a large patch with a wide z-range, it is recommended to set this to True,
otherwise the mock HI map may contain nan and cause errors.
ra_range: tuple, default None
The range of the RA in the sky map. Map and galaxy will be trimmed to this range.
dec_range: tuple, default None
The range of the Dec in the sky map. Map and galaxy will be trimmed to this range.
discrete_source_dndz: list or function, default None
The redshift distribution of the discrete tracer sources.
Must be a function of redshift.
Note that the overall number of discrete sources is set to ``ps.ra_gal.size``
or ``ps.num_discrete_source``, so
only the shape of the dndz is used, not the normalization.
If a list is provided, the first element is the redshift array, and the second element is the dndz array,
and a 1D linear interpolation is used to get the dndz function.
Returns
-------
result: list
The list of results, including the transfer function, the 3D power spectrum of the uncleaned and cleaned data, and the 1D power spectrum of the uncleaned and cleaned data.
"""
mock = MockSimulation(**mock_attr_dict)
if not isinstance(discrete_source_dndz, Callable):
discrete_source_dndz = interp1d(
discrete_source_dndz[0],
discrete_source_dndz[1],
kind="linear",
bounds_error=False,
fill_value=0,
)
mock.discrete_source_dndz = discrete_source_dndz
if unmask_during_mock:
W_HI_mock = mock.W_HI.copy()
w_HI_mock = mock.w_HI.copy()
mock.W_HI = np.ones_like(mock.W_HI)
mock.w_HI = np.ones_like(mock.w_HI)
mock.data = mock.propagate_mock_field_to_data(mock.mock_tracer_field_1)
mock.propagate_mock_tracer_to_gal_cat()
if unmask_during_mock:
mock.W_HI = W_HI_mock
mock.w_HI = w_HI_mock
mock.data[mock.W_HI == 0] = 0
mock.ra_range = ra_range
mock.dec_range = dec_range
mock.trim_map_to_range()
mock.trim_gal_to_range()
mock.downres_factor_radial = downres_factor_radial
mock.downres_factor_transverse = downres_factor_transverse
mock.get_enclosing_box()
mock_map_rg, _, _ = mock.grid_data_to_field()
mockgal_map_rg, _, _ = mock.grid_gal_to_field()
mock.field_1 = mock_map_rg
mock.weights_field_1 = weights_field_1
mock.weights_grid_1 = weights_grid_1
mock.field_2 = mockgal_map_rg
mock.weights_field_2 = weights_field_2
mock.weights_grid_2 = weights_grid_2
# get 1d power
pmock_1d_cross, _, _ = mock.get_1d_power(
mock.cross_power_3d, k1dweights=k_sel_3d_to_1d
)
if return_power_3d:
power_3d_uncleaned = mock.cross_power_3d
# perform PCA cleaning
map_to_clean = uncleaned_data + mock.data
R_mat_mock = R_mat
if R_mat is None:
R_mat_mock = get_pca_matrix(
map_to_clean, N_fg, pca_map_weights, mean_center_map
)
if mean_center_map:
data_mean = np.sum(pca_map_weights * mock.data, axis=(0, 1)) / np.sum(
pca_map_weights, axis=(0, 1)
)
mock.data -= data_mean[None, None, :]
mock_map_cleaned = np.einsum("ij,abj->abi", R_mat_mock, mock.data)
mock.data = mock_map_cleaned
# regrid the cleaned data
mock_map_rg, _, _ = mock.grid_data_to_field()
mock.field_1 = mock_map_rg
mock.weights_field_1 = weights_field_1
mock.weights_grid_1 = weights_grid_1
# get 1d power again
pmock_1d_cross_cleaned, _, _ = mock.get_1d_power(
mock.cross_power_3d, k1dweights=k_sel_3d_to_1d
)
tf_1d_i = pmock_1d_cross_cleaned / pmock_1d_cross
result = [tf_1d_i]
if return_power_3d:
result.append(power_3d_uncleaned)
result.append(mock.cross_power_3d)
if return_power_1d:
result.append(pmock_1d_cross)
result.append(pmock_1d_cross_cleaned)
return result
[docs]
def run_tf_calculation_auto(
mock_attr_dict,
downres_factor_radial,
downres_factor_transverse,
weights_field_1,
weights_grid_1,
k_sel_3d_to_1d,
N_fg,
pca_map_weights,
mean_center_map,
R_mat=None,
uncleaned_data=0.0,
return_power_3d=False,
return_power_1d=False,
unmask_during_mock=False,
ra_range=None,
dec_range=None,
):
"""
Run the transfer function calculation by calculating the ratio between the 1D power spectrum of
cleaned mock HI data x original mock HI data to the 1D power spectrum of the original mock HI data x original mock HI data.
Parameters
----------
mock_attr_dict: dict
The attribute dictionary to initialize the mock instance.
downres_factor_radial: float
The downres factor for the radial direction for gridding the map data.
downres_factor_transverse: float
The downres factor for the transverse direction for gridding the map data.
weights_field_1: np.ndarray
The field-level weights for the field 1.
weights_grid_1: np.ndarray
The grid-level weights for the field 1.
k_sel_3d_to_1d: np.ndarray
The weights for averaging the 3D power spectrum k-modes to 1D power spectrum.
N_fg: int
The number of foreground components to clean.
pca_map_weights: np.ndarray
The weights for the mean centering and covariance matrix calculation during PCA.
mean_center_map: bool
Whether to mean-center the map before PCA cleaning.
R_mat: np.ndarray, default None
The PCA matrix to clean the map.
If not provided, it will be calculated by injecting the original mock HI data into the data map.
If provided, it will be used to clean the map, and no injection + PCA is performed.
uncleaned_data: np.ndarray, default 0.0
The uncleaned data map.
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the mock results, including the uncleaned and cleaned data.
return_power_1d: bool, default False
Whether to return the 1D power spectrum of the mock results, including the uncleaned and cleaned data.
unmask_during_mock: bool, default False
Whether to unmask the wcs during the mock simulation.
If True, the survey area will be unmasked during the mock simulation, and then masked again after mock HI is generated.
This is useful for mitigating edge effects and when box buffer is small.
For a large patch with a wide z-range, it is recommended to set this to True,
otherwise the mock HI map may contain nan and cause errors.
ra_range: tuple, default None
The range of the RA in the sky map. Map and galaxy will be trimmed to this range.
dec_range: tuple, default None
The range of the Dec in the sky map. Map and galaxy will be trimmed to this range.
Returns
-------
result: list
The list of results, including the transfer function, the 3D power spectrum of the uncleaned and cleaned data, and the 1D power spectrum of the uncleaned and cleaned data.
"""
mock = MockSimulation(**mock_attr_dict)
if unmask_during_mock:
W_HI_mock = mock.W_HI.copy()
w_HI_mock = mock.w_HI.copy()
mock.W_HI = np.ones_like(mock.W_HI)
mock.w_HI = np.ones_like(mock.w_HI)
mock.data = mock.propagate_mock_field_to_data(mock.mock_tracer_field_1)
if unmask_during_mock:
mock.W_HI = W_HI_mock
mock.w_HI = w_HI_mock
mock.data[mock.W_HI == 0] = 0
mock.ra_range = ra_range
mock.dec_range = dec_range
mock.trim_map_to_range()
mock.downres_factor_radial = downres_factor_radial
mock.downres_factor_transverse = downres_factor_transverse
mock.get_enclosing_box()
mock_map_rg_before, _, _ = mock.grid_data_to_field()
mock.field_1 = mock_map_rg_before
mock.weights_field_1 = weights_field_1
mock.weights_grid_1 = weights_grid_1
# get 1d power
pmock_1d_before, _, _ = mock.get_1d_power(
mock.auto_power_3d_1, k1dweights=k_sel_3d_to_1d
)
if return_power_3d:
power_3d_uncleaned = mock.auto_power_3d_1
# perform PCA cleaning
map_to_clean = uncleaned_data + mock.data
R_mat_mock = R_mat
if R_mat is None:
R_mat_mock = get_pca_matrix(
map_to_clean, N_fg, pca_map_weights, mean_center_map
)
if mean_center_map:
data_mean = np.sum(pca_map_weights * mock.data, axis=(0, 1)) / np.sum(
pca_map_weights, axis=(0, 1)
)
mock.data -= data_mean[None, None, :]
mock_map_cleaned = np.einsum("ij,abj->abi", R_mat_mock, mock.data)
mock.data = mock_map_cleaned
# regrid the cleaned data
mock_map_rg_cleaned, _, _ = mock.grid_data_to_field()
mock.field_1 = mock_map_rg_before
mock.weights_field_1 = weights_field_1
mock.weights_grid_1 = weights_grid_1
mock.field_2 = mock_map_rg_cleaned
mock.weights_field_2 = weights_field_1
mock.weights_grid_2 = weights_grid_1
mock.unitless_2 = False
mock.mean_center_2 = False
# get 1d power again
pmock_1d_cross_cleaned, _, _ = mock.get_1d_power(
mock.cross_power_3d, k1dweights=k_sel_3d_to_1d
)
tf_1d_i = pmock_1d_cross_cleaned / pmock_1d_before
result = [tf_1d_i]
if return_power_3d:
result.append(power_3d_uncleaned)
result.append(mock.cross_power_3d)
if return_power_1d:
result.append(pmock_1d_before)
result.append(pmock_1d_cross_cleaned)
return result
[docs]
def run_null_test(
mock_attr_dict,
hi_map_rg,
downres_factor_radial,
downres_factor_transverse,
weights_field_1,
weights_grid_1,
weights_field_2,
weights_grid_2,
k_sel_3d_to_1d,
return_power_3d=False,
unmask_during_mock=False,
ra_range=None,
dec_range=None,
discrete_source_dndz=None,
):
"""
Run null test realisations by calculating the 1D cross-power spectrum of the mock galaxy x data map.
Parameters
----------
mock_attr_dict: dict
The attribute dictionary to initialize the mock instance.
hi_map_rg: np.ndarray
The HI data map.
downres_factor_radial: float
The downres factor for the radial direction for gridding the map data.
downres_factor_transverse: float
The downres factor for the transverse direction for gridding the map data.
weights_field_1: np.ndarray
The field-level weights for the field 1.
weights_grid_1: np.ndarray
The grid-level weights for the field 1.
weights_field_2: np.ndarray
The field-level weights for the field 2.
weights_grid_2: np.ndarray
The grid-level weights for the field 2.
k_sel_3d_to_1d: np.ndarray
The weights for averaging the 3D power spectrum k-modes to 1D power spectrum.
return_power_3d: bool, default False
Whether to return the 3D power spectrum of the mock results.
unmask_during_mock: bool, default False
Whether to unmask the wcs during the mock simulation.
If True, the survey area will be unmasked during the mock simulation, and then masked again after mock HI is generated.
This is useful for mitigating edge effects and when box buffer is small.
For a large patch with a wide z-range, it is recommended to set this to True,
otherwise the mock HI map may contain nan and cause errors.
ra_range: tuple, default None
The range of the RA in the sky map. Map and galaxy will be trimmed to this range.
dec_range: tuple, default None
The range of the Dec in the sky map. Map and galaxy will be trimmed to this range.
discrete_source_dndz: list or function, default None
The redshift distribution of the discrete tracer sources.
Must be a function of redshift.
Note that the overall number of discrete sources is set to ``ps.ra_gal.size``
or ``ps.num_discrete_source``, so
only the shape of the dndz is used, not the normalization.
If a list is provided, the first element is the redshift array, and the second element is the dndz array,
and a 1D linear interpolation is used to get the dndz function.
Returns
-------
result: list
The list of results, including the 1D cross-power spectrum of the mock galaxy x data map.
If ``return_power_3d`` is True, the second element is the 3D power spectrum of the mock results.
"""
mock = MockSimulation(**mock_attr_dict)
if not isinstance(discrete_source_dndz, Callable):
discrete_source_dndz = interp1d(
discrete_source_dndz[0],
discrete_source_dndz[1],
kind="linear",
bounds_error=False,
fill_value=0,
)
mock.discrete_source_dndz = discrete_source_dndz
if unmask_during_mock:
W_HI_mock = mock.W_HI.copy()
w_HI_mock = mock.w_HI.copy()
mock.W_HI = np.ones_like(mock.W_HI)
mock.w_HI = np.ones_like(mock.w_HI)
mock.propagate_mock_tracer_to_gal_cat()
if unmask_during_mock:
mock.W_HI = W_HI_mock
mock.w_HI = w_HI_mock
mock.ra_range = ra_range
mock.dec_range = dec_range
mock.trim_map_to_range()
mock.trim_gal_to_range()
mock.downres_factor_radial = downres_factor_radial
mock.downres_factor_transverse = downres_factor_transverse
mock.get_enclosing_box()
mockgal_map_rg, _, _ = mock.grid_gal_to_field()
mock.field_1 = hi_map_rg
mock.weights_field_1 = weights_field_1
mock.weights_grid_1 = weights_grid_1
mock.field_2 = mockgal_map_rg
mock.weights_field_2 = weights_field_2
mock.weights_grid_2 = weights_grid_2
pmock_1d_cross, _, _ = mock.get_1d_power(
mock.cross_power_3d, k1dweights=k_sel_3d_to_1d
)
result = [pmock_1d_cross]
if return_power_3d:
result.append(mock.cross_power_3d)
return result