Source code for meer21cm.grid

import numpy as np
import healpy as hp
from astropy.cosmology import Planck18
from astropy import units
from .util import f_21, angle_in_range

allowed_window_scheme = ("nnb", "cic", "tsc", "pcs")


[docs] def minimum_enclosing_box_of_lightcone( ra_arr, dec_arr, freq, cosmo=Planck18, ang_unit="deg", tile=True, return_coord=False, buffkick=0.0, rot_mat=None, ): """ This functions finds a rotational axis to rotate the sky vectors of input coordinates so that the (crude) mean of the coordinates is at (0,0,1), and then finds the enclosing cuboid box for the coordinates. The box is not really minimum but should be quite optimal. The function also returns a rotational matrix for rotating the coordinates in the cuboid back to the sky positions. For any point in the box ``pos = np.array([x,y,z])``, you can find its RA and Dec by performing the rotation .. highlight:: python .. code-block:: python vec = inv_rot @ pos vec /= np.sqrt(np.sum(vec**2)) ra_pos, dec_pos = hp.vec2ang(vec,lonlat=True) Parameters ---------- ra_arr: ``numpy`` array. The RA of the coordinates dec_arr: ``numpy`` array. The Dec of the coordinates freq: ``numpy`` array. The frequencies of the coordinates. cosmo: :class:`astropy.cosmology.Cosmology` object, default `astropy.cosmology.Planck18`. The input cosmology for converting frequencies to los length. ang_unit: str or :class:`astropy.units.Unit` The unit of the input angular coordinates. tile: bool, default True. Whether to tile the input cooridnates so that the output is a meshgrid of input angular coordinates and frequencies. return_coord: bool, default True. If True, also returns the corrosponding (x,y,z) coordinate of the input coordinates. buffkick: float, default 0.0. The box is extended by ``buffkick`` on each end of each dimension. rot_mat: ``numpy`` array, default None. If specified, override the rotation matrix calculated from the mean cooridnate. Returns ------- x_min: float. The origin of the box along x-axis. y_min: float. The origin of the box along y-axis. z_min: float. The origin of the box along z-axis. x_len: float. The length of the box along x-axis. y_len: float. The length of the box along y-axis. z_len: float. The length of the box along z-axis. inv_rot: ``numpy`` array. The rotational matrix to rotate the box back to the sky positions. pos_arr: ``numpy'' array. Only returns if ``return_coord = True''. The Cartesian coordinates of the input ra and dec. """ ra_arr = (ra_arr.ravel() * units.Unit(ang_unit)).to("deg").value dec_arr = (dec_arr.ravel() * units.Unit(ang_unit)).to("deg").value vec_arr = hp.ang2vec(ra_arr, dec_arr, lonlat=True) mean_vec = vec_arr.mean(axis=0) if rot_mat is None: rot_mat = find_rotation_matrix(mean_vec) z_arr = f_21 / freq.ravel() - 1 # rotate so that centre of field is the line-of-sight [0,0,1] vec_arr = np.einsum("ab,ib->ia", rot_mat, vec_arr) comov_dist_arr = cosmo.comoving_distance(z_arr).value if tile: pos_arr = vec_arr[:, None, :] * comov_dist_arr[None, :, None] else: pos_arr = vec_arr * comov_dist_arr[:, None] pos_arr = pos_arr.reshape((-1, 3)) x_min, y_min, z_min = pos_arr.min(axis=0) - buffkick x_max, y_max, z_max = pos_arr.max(axis=0) + buffkick inv_rot = np.linalg.inv(rot_mat) result = (x_min, y_min, z_min, x_max - x_min, y_max - y_min, z_max - z_min, inv_rot) if return_coord: result += (pos_arr,) return result
[docs] def find_rotation_matrix(vec): r""" find the rotation matrix to rotate the input vector to (0,0,1). Note that in 3D space, the rotation is not unique. For simplicity, this function first finds the rotational matrix so that the vector (x,y,z) is first rotated to :math:`(\sqrt{x^2+y^2},0,z)`, and then find another matrix to rotate the vector to (0,0,1). Parameters ---------- vec: ``numpy`` array. The input unit vector Returns ------- rot_mat: ``numpy`` array. The rotational matrix so that ``rot_mat @ vec`` is ``np.array([0,0,1])``. """ theta_rot = np.arctan2(vec[1], vec[0]) rot_mat_1 = np.array( [ [np.cos(-theta_rot), -np.sin(-theta_rot), 0], [np.sin(-theta_rot), np.cos(-theta_rot), 0], [0, 0, 1], ] ) inter_vec = rot_mat_1 @ vec phi_rot = -np.arctan2(inter_vec[0], inter_vec[2]) rot_mat_2 = np.array( [ [np.cos(-phi_rot), 0, -np.sin(-phi_rot)], [0, 1, 0], [np.sin(-phi_rot), 0, np.cos(-phi_rot)], ] ) return rot_mat_2 @ rot_mat_1
[docs] def fourier_window_for_assignment( num_mesh, window="nnb", ): r""" Calculate the effective window function in Fourier space from mass assignment scheme that sample continueous fields to discrete grids. The window function can be written as [1] .. math:: W(k_x,k_y,k_z) = \Bigg({\rm sinc}\bigg(\frac{k_x H_x}{2}\bigg) {\rm sinc}\bigg(\frac{k_y H_y}{2}\bigg) {\rm sinc}\bigg(\frac{k_z H_z}{2}\bigg)\Bigg)^p, where :math:`k_{x,y,z}` is the wavenumber of the grid in Fourier space and :math:`H_{x,y,z}` is the length of the grid in real space. :math:`p` is the power index related to the mass assignment scheme, and is equal to [1,2,3,4] for [nnb,cic,tsc,pcs] Parameters ---------- num_mesh: list The number of grids on each side window: str, default "nnb". The mass assignment scheme Returns ------- window_in_fourier: ``numpy`` array The window function in Fourier space References ---------- .. [1] Sefusatti, E. et al., "Accurate Estimators of Correlation Functions in Fourier Space", https://ui.adsabs.harvard.edu/abs/2016MNRAS.460.3624S. """ p = float(allowed_window_scheme.index(window) + 1) wx, wy, wz = [np.sinc(np.fft.fftfreq(num_mesh[i])) for i in range(3)] window_in_fourier = ( wx[:, None, None] * wy[None, :, None] * wz[None, None, : num_mesh[-1] // 2 + 1] ) ** p return window_in_fourier
[docs] def compensate_grid_window_effects( field_in_real_space, grid_scheme="nnb", ): """ Apply correction to cancel the windowing effects from discretization of fields into grids. """ num_mesh = field_in_real_space.shape window = fourier_window_for_assignment( num_mesh, grid_scheme, ) field_in_fourier_space = np.fft.rfftn(field_in_real_space) field_in_fourier_space /= window field_compensated = np.fft.irfftn( field_in_fourier_space, axes=(0, 1, 2), s=field_in_real_space.shape ) return field_compensated
[docs] def interlace_two_fields( real_field_1, real_field_2, shift, ): """ Interlacing two fields, where one is the shifted version of the other. Parameters ---------- real_field_1: array-like. The first field for interlacing real_field_2: array-like. The second field for interlacing, which should be a shifted version of the first. shift: float. The shift of the field when performing Fourier transform, in the unit of cell size. Returns ------- interlaced_field: array-like. The interlaced field. """ box_ndim = real_field_1.shape fourier_field_1 = np.fft.fftn(real_field_1) fourier_field_2 = np.fft.fftn(real_field_2) kH_2 = [np.fft.fftfreq(box_ndim[i]) for i in range(3)] kH_2 = np.array(np.meshgrid(*kH_2, indexing="ij")) exp_term = np.prod(np.exp(-2 * 1j * shift * kH_2), axis=0) fourier_field_1 = (fourier_field_1 + exp_term * fourier_field_2) / 2 return np.fft.ifftn(fourier_field_1).real
[docs] def project_function( s_arr, grid_scheme="nnb", ): """ Return the weighting function for the given mass assignment scheme and input 1D distance. The distance is in the unit of the cell size. Parameters ---------- s_arr: float array. The distance in the unit of the cell size. grid_scheme: str, default 'nnb' The mass assignment scheme. Returns ------- weight_arr: float array. The weighting function. """ s_arr = np.abs(s_arr) p = allowed_window_scheme.index(grid_scheme) if p == 0: return (s_arr <= 0.5).astype("float") elif p == 1: return (1 - s_arr) * (s_arr <= 1) elif p == 2: result = (3 / 4 - s_arr**2) * (s_arr <= 0.5) + (0.5 * (1.5 - s_arr) ** 2) * ( s_arr < 1.5 ) * (s_arr > 0.5) return result elif p == 3: result = (4 - 6 * s_arr**2 + 3 * s_arr**3) / 6 * (s_arr <= 1) + ( 2 - s_arr ) ** 3 / 6 * (s_arr < 2) * (s_arr > 1) return result
[docs] def particle_to_mesh_distance( particle_pos, box_len, box_ndim, ): """ Calculate the distance between particles and the nearest mesh center. The distance is in the unit of the cell size. For particles outside the box, the nearest mesh center is the one on the boundary. Parameters ---------- particle_pos: array. The coordinates of the particles. box_len: array. The length of the box on each side box_ndim: array. The number of grids on each side Returns ------- dist: array. The distance between the particles and the nearest mesh center. indx_grid: array. The index of the nearest mesh center. """ box_resol = box_len / box_ndim mesh_edges = [np.linspace(0, box_len[i], box_ndim[i] + 1) for i in range(3)] mesh_cen = [(mesh_edges[i][:-1] + mesh_edges[i][1:]) / 2 for i in range(3)] indx_grid = [] for i in range(3): indx_i = np.digitize(particle_pos[:, i], mesh_edges[i]) - 1 # if particle is outside the box, set to 0 or box_ndim-1 as appropriate indx_i[indx_i < 0] = 0 indx_i[indx_i >= box_ndim[i]] = box_ndim[i] - 1 indx_grid += [ indx_i, ] mesh_cen = [ (np.linspace(0, box_ndim[i] - 1, box_ndim[i]) + 0.5) * box_resol[i] for i in range(3) ] particle_pos_mesh = [mesh_cen[i][indx_grid[i]] for i in range(3)] particle_pos_mesh = np.array(particle_pos_mesh).T return (particle_pos - particle_pos_mesh) / box_resol[None, :], indx_grid
[docs] def project_particle_to_regular_grid( particle_pos, box_len, box_ndim, grid_scheme="nnb", particle_mass=None, particle_weights=None, average=True, shift=0.0, compensate=False, ): """ Project particles into a regular grid with a certain mass assignment scheme. Parameters ---------- particle_pos: array. The coordinates of the particles. box_len: array. The length of the box on each side box_ndim: array. The number of grids on each side grid_scheme: str, default 'nnb' The mass assignment scheme. particle_mass: array, default None. The mass of each particle. particle_weights: array, default None. The weights of each particle. average: bool, default True. The grid values are weighted averages of the particles if True and weighted sums of the particles if False. shift: float, default 0.0. Shift the position of the particles by the same amount in all directions, in the unit of cell size. Returns ------- mesh_mass: array. The mass of each grid. mesh_weights: array. The weights of each grid. mesh_counts: array. The effective number of particles in each grid. """ p = allowed_window_scheme.index(grid_scheme) if particle_mass is None: particle_mass = np.ones(len(particle_pos)) if particle_weights is None: particle_weights = np.ones(len(particle_pos)) box_resol = box_len / box_ndim mesh_mass = np.zeros(box_ndim) mesh_weights = np.zeros(box_ndim) mesh_counts = np.zeros(box_ndim) par_pos = particle_pos + shift * box_resol[None, :] particle_s, indx_grid = particle_to_mesh_distance(par_pos, box_len, box_ndim) indx_grid = np.array(indx_grid).T shift_limit = np.floor(p / 2 + 0.5) shift_mat = np.meshgrid( np.arange(-shift_limit, shift_limit + 1), np.arange(-shift_limit, shift_limit + 1), np.arange(-shift_limit, shift_limit + 1), indexing="ij", ) shift_mat = np.array([shift_mat[i].ravel() for i in range(3)]).T for shift in shift_mat: s_shift = particle_s + shift[None, :] grid_func_shift = project_function(s_shift, grid_scheme) indx_shift = (indx_grid - shift[None, :]).astype("int") indx_sel = np.prod(indx_shift >= 0, axis=1) indx_sel *= np.prod(indx_shift < box_ndim[None, :], axis=1) indx_sel = indx_sel.astype("bool") np.add.at( mesh_mass, tuple(indx_shift[indx_sel].T), (particle_mass * particle_weights * np.prod(grid_func_shift, axis=1))[ indx_sel ], ) np.add.at( mesh_weights, tuple(indx_shift[indx_sel].T), (particle_weights * np.prod(grid_func_shift, axis=1))[indx_sel], ) np.add.at( mesh_counts, tuple(indx_shift[indx_sel].T), np.prod(grid_func_shift, axis=1)[indx_sel], ) if average: with np.errstate(divide="ignore", invalid="ignore"): mesh_mass = np.where(mesh_weights > 0, mesh_mass / mesh_weights, 0) if compensate: mesh_mass = compensate_grid_window_effects( mesh_mass, grid_scheme, ) return mesh_mass, mesh_weights, mesh_counts
[docs] def rotation_matrix_to_radec0(ra, dec): """ Find the rotation matrix to rotate the input point at (ra, dec) to (0, 0), by first rotating to (0, dec) and then to (0, 0). """ # step 1: rotate to RA=0 rot_mat_1 = np.array( [ [np.cos(np.deg2rad(ra)), np.sin(np.deg2rad(ra)), 0], [-np.sin(np.deg2rad(ra)), np.cos(np.deg2rad(ra)), 0], [0, 0, 1], ] ) # step 2: rotate to dec=0 rot_mat_2 = np.array( [ [np.cos(np.deg2rad(dec)), 0, np.sin(np.deg2rad(dec))], [0, 1, 0], [-np.sin(np.deg2rad(dec)), 0, np.cos(np.deg2rad(dec))], ] ) return rot_mat_2 @ rot_mat_1
[docs] def sky_partition_for_radecrange( ra_range, dec_range, nside_out=128, nside_in=1024, dec_pad=0 ): """ Find a partition of the sky, so that each patch can be rotated to cover the specified RA and Dec range. Parameters ---------- ra_range: array_like The range of RA to cover. dec_range: array_like The range of Dec to cover. nside_out: int, default 128 The HEALPix NSIDE of the output map pixel id. nside_in: int, default 1024 The HEALPix NSIDE of the map pixel id for inner calculation. dec_pad: int, default 0 The number of extra rows to pad in Dec. Increasing this number will result in patches overlapping with each other. Returns ------- pix_id_for_patch_i: list The list of pixel id for each patch. rot_mat_for_patch_i: list The list of rotation matrix for each patch, to rotate the patch back to cover the range. """ npix = hp.nside2npix(nside_in) ra_grid, dec_grid = hp.pix2ang(nside_in, np.arange(npix), lonlat=True) selection_grid = angle_in_range(ra_grid, ra_range[0], ra_range[1]) * angle_in_range( dec_grid, dec_range[0], dec_range[1] ) ra_region = ra_grid[selection_grid] dec_region = dec_grid[selection_grid] vec_region = hp.ang2vec(ra_region, dec_region, lonlat=True) vec_mean = vec_region.mean(axis=0) ra_mean, dec_mean = hp.vec2ang(vec_mean, lonlat=True) ra_mean = ra_mean[0] dec_mean = dec_mean[0] # rotate range to ra=0, dec=0 rot_mat_0 = rotation_matrix_to_radec0(ra_mean, dec_mean) vec_region_rot = np.dot(rot_mat_0, vec_region.T) pix_region_rot = hp.vec2pix( nside_in, vec_region_rot[0], vec_region_rot[1], vec_region_rot[2] ) ra_region_rot, dec_region_rot = hp.pix2ang(nside_in, pix_region_rot, lonlat=True) # find the enclosing rectangle ra_temp = ra_region_rot.copy() ra_temp[ra_temp > 180] -= 360 ra_range_0 = [-np.abs(ra_temp).max(), np.abs(ra_temp).max()] dec_range_0 = [-np.abs(dec_region_rot).max(), np.abs(dec_region_rot).max()] delta_dec = dec_range_0[1] - dec_range_0[0] delta_ra = ra_range_0[1] - ra_range_0[0] ra_range_0 = [-np.abs(ra_temp).max(), np.abs(ra_temp).max()] dec_range_0 = [-np.abs(dec_region_rot).max(), np.abs(dec_region_rot).max()] selection_grid_0 = angle_in_range( ra_grid, ra_range_0[0], ra_range_0[1] ) * angle_in_range(dec_grid, dec_range_0[0], dec_range_0[1]) ra_region_0 = ra_grid[selection_grid_0] dec_region_0 = dec_grid[selection_grid_0] vec_region_0 = hp.ang2vec(ra_region_0, dec_region_0, lonlat=True) dec_loop_num = int(90 * np.cos(np.deg2rad(ra_range_0[0])) // delta_dec) + dec_pad delta_dec_loop = 90 / max(dec_loop_num, 1) pix_id_for_patch_i = [] rot_mat_for_patch_i = [] for j in range(-dec_loop_num, dec_loop_num + 1): delta_dec_j = delta_dec_loop * j ra_loop_num_j = max( int( 360 * np.cos(np.deg2rad(np.abs(delta_dec_j) + delta_dec_loop / 2)) // delta_ra ), 1, ) if ra_loop_num_j == 1 and np.abs(j) != dec_loop_num: ra_loop_num_j = 2 for i in range(0, ra_loop_num_j): delta_ra_i = 360 / (ra_loop_num_j) * i rot_mat = np.linalg.inv(rotation_matrix_to_radec0(delta_ra_i, delta_dec_j)) vec_region_rot = np.dot(rot_mat, vec_region_0.T) pix_region_rot = hp.vec2pix( nside_out, vec_region_rot[0], vec_region_rot[1], vec_region_rot[2] ) pix_id_for_patch_i.append(pix_region_rot) rot_mat_for_patch_i.append( np.linalg.inv(rot_mat_0) @ np.linalg.inv(rot_mat) ) return pix_id_for_patch_i, rot_mat_for_patch_i
[docs] def shot_noise_correction_from_gridding( box_ndim, grid_scheme, ): """ Calculate the multiplicative correction from gridding to the shot noise. Support 'nnb', 'cic' and 'tsc'. The correction is taken from Jing (2005), astro-ph/0409240. Parameters ---------- box_ndim: array. The number of grids on each side. grid_scheme: str. The mass assignment scheme. Returns ------- shot_noise_correction: array. The multiplicative correction from gridding to the shot noise. """ p = allowed_window_scheme.index(grid_scheme) if p == 0: return np.ones([box_ndim[0], box_ndim[1], box_ndim[2] // 2 + 1]) sinpikiHover2 = [np.sin(np.fft.fftfreq(box_ndim[i]) * np.pi) for i in range(2)] sinpikiHover2.append(np.sin(np.fft.rfftfreq(box_ndim[2]) * np.pi)) if p == 1: ci = [1 - 2 / 3 * sinpikiHover2[i] ** 2 for i in range(3)] ci = ci[0][:, None, None] * ci[1][None, :, None] * ci[2][None, None, :] if p == 2: ci = [ 1 - sinpikiHover2[i] ** 2 + 2 / 15 * sinpikiHover2[i] ** 4 for i in range(3) ] ci = ci[0][:, None, None] * ci[1][None, :, None] * ci[2][None, None, :] return ci