Source code for meer21cm.telescope

import numpy as np
from .util import get_wcs_coor, get_ang_between_coord, freq_to_redshift, tagging
from astropy import units, constants
from scipy.signal import convolve
from astropy.cosmology import Planck18
import healpy as hp
from astropy.wcs import WCS
import meer21cm

default_data_dir = meer21cm.__file__.rsplit("/", 1)[0] + "/data/"

meerkat_L_band_nu_min = 856.0 * 1e6  # in Hz
meerkat_L_band_nu_max = 1712.0 * 1e6  # in Hz
meerkat_L_4k_delta_nu = 0.208984375 * 1e6  # in Hz

meerklass_L_deep_nu_min = 971 * 1e6
meerklass_L_deep_nu_max = 1023.8 * 1e6

meerklass_L_pilot_nu_min = 971 * 1e6
meerklass_L_pilot_nu_max = 1023.2 * 1e6

meerkat_UHF_band_nu_min = 544.0 * 1e6  # in Hz
meerkat_UHF_band_nu_max = 1088.0 * 1e6  # in Hz
meerkat_UHF_4k_delta_nu = 0.1328125 * 1e6  # in Hz

meerklass_UHF_deep_nu_min = 610.0 * 1e6
meerklass_UHF_deep_nu_max = 929.2 * 1e6

default_nu_min = {
    "meerkat_L": meerkat_L_band_nu_min,
    "meerkat_UHF": meerkat_UHF_band_nu_min,
    "meerklass_2021_L": meerklass_L_deep_nu_min,
    "meerklass_2019_L": meerklass_L_pilot_nu_min,
    "meerklass_UHF": meerklass_UHF_deep_nu_min,
}

default_nu_max = {
    "meerkat_L": meerkat_L_band_nu_max,
    "meerkat_UHF": meerkat_UHF_band_nu_max,
    "meerklass_2021_L": meerklass_L_deep_nu_max,
    "meerklass_2019_L": meerklass_L_pilot_nu_max,
    "meerklass_UHF": meerklass_UHF_deep_nu_max,
}

default_num_pix_x = {
    "meerkat_L": None,
    "meerkat_UHF": None,
    "meerklass_2021_L": 133,
    "meerklass_2019_L": None,
    "meerklass_UHF": None,
}

default_num_pix_y = {
    "meerkat_L": None,
    "meerkat_UHF": None,
    "meerklass_2021_L": 73,
    "meerklass_2019_L": None,
    "meerklass_UHF": None,
}

default_wproj = {
    "meerkat_L": None,
    "meerkat_UHF": None,
    "meerklass_2021_L": WCS(default_data_dir + "test_fits.fits").dropaxis(-1),
    "meerklass_2019_L": None,
    "meerklass_UHF": None,
}


[docs] def weighted_convolution( signal, kernel, weights, kernel_renorm=True, los_axis=-1, ): r""" Perform weighted convolution of signal. The weighted convolution of the signal is defined as .. math:: \tilde{s} = [(s \cdot w) * b]/[w * b], where :math:`s` is the signal, :math:`w` is the weight, :math:`b` is the convolution kernel and :math:`w` denotes convolution. The convolution also creates new weights for the output signal so that .. math:: \tilde{w} = [w * b]^2 / [w * b^2] Parameters ---------- signal: float. The input signal to be convolved kernel: float. The convolution kernel weights: float. The weights for the signal kernel_renorm: boolean, default True. Whether to renormalise the kernel so that the sum of the kernel is one. Should be set to ``True`` for temperature and ``False`` for flux density. los_axis: int, default -1. which axis is the los. Returns ------- conv_signal: float. The convolved signal. conv_weights: float. The convolved weights. """ if los_axis < 0: los_axis += 3 # make sure los is the last axis axes = [0, 1, 2] axes.remove(los_axis) axes = axes + [ los_axis, ] signal = np.transpose(signal * weights, axes=axes) kernel = np.transpose(kernel, axes=axes) weights = np.transpose(weights, axes=axes) if kernel_renorm: kernel /= kernel.sum(axis=(0, 1))[None, None, :] kernel_square = kernel**2 kernel_square /= kernel_square.sum(axis=(0, 1))[None, None, :] conv_signal = np.zeros_like(signal) conv_variance = np.zeros_like(signal) for ch_i in range(signal.shape[-1]): weight_renorm = convolve( weights[:, :, ch_i], kernel[:, :, ch_i], mode="same", ) weight_renorm[weight_renorm == 0] = np.inf conv_signal[:, :, ch_i] = ( convolve( signal[:, :, ch_i], kernel[:, :, ch_i], mode="same", ) / weight_renorm ) conv_variance[:, :, ch_i] = ( convolve( weights[:, :, ch_i], kernel_square[:, :, ch_i], mode="same", ) / weight_renorm**2 ) conv_variance[conv_variance == 0] = np.inf conv_weights = 1 / conv_variance conv_weights = np.transpose(conv_weights, axes=axes) conv_signal = np.transpose(conv_signal, axes=axes) return conv_signal, conv_weights
[docs] def get_beam_xy(wproj, xdim, ydim): """ Get the x and y angular coordinates of the given wcs. """ x_cen, y_cen = xdim // 2, ydim // 2 ra_cen, dec_cen = get_wcs_coor( wproj, x_cen, y_cen, ) xx, yy = np.meshgrid(np.arange(xdim), np.arange(ydim), indexing="ij") ra, dec = get_wcs_coor(wproj, xx, yy) vec = hp.ang2vec(ra, dec, lonlat=True) vec_cen = hp.ang2vec(ra_cen, dec_cen, lonlat=True) xx = np.arcsin(vec - vec_cen[None, None, :])[:, :, 0].T * 180 / np.pi yy = np.arcsin(vec - vec_cen[None, None, :])[:, :, 1].T * 180 / np.pi return xx, yy
[docs] @tagging("anisotropic") def kat_beam(nu, wproj, xdim, ydim, band="L"): r""" Returns a beam model from the ``katbeam`` model, which is a simplification of the model reported in Asad et al. [1]. The katbeam implementation here still needs validation. Use it with caution, especially if you want correct orientation of the beam. References ---------- .. [1] Asad et al., "Primary beam effects of radio astronomy antennas -- II. Modelling the MeerKAT L-band beam", https://arxiv.org/abs/1904.07155 """ from katbeam import JimBeam xx, yy = get_beam_xy( wproj, xdim, ydim, ) beam = JimBeam(f"MKAT-AA-{band}-JIM-2020") freqMHz = nu / 1e6 beam_image = np.zeros((xdim, ydim, len(nu))) for i, freq in enumerate(freqMHz): beam_image[:, :, i] = beam.I(xx, yy, freq) ** 2 return beam_image
[docs] @tagging("isotropic") def gaussian_beam(sigma): r""" Returns a Gaussian beam function .. math:: B(\theta) = {\rm exp}[-\frac{\theta^2}{2\sigma^2}] when the beam width :math:`\sigma` is specified. Parameters ---------- sigma: float. The width of the gaussian beam profile. Returns ------- beam_func: function. The beam function. """ return lambda x: np.exp(-(x**2) / 2 / sigma**2)
[docs] @tagging("isotropic") def cos_beam(sigma): r""" Returns a cosine-tapered beam function [1] .. math:: B(\theta) = \bigg[ \frac{\cos \big( 1.189 \pi \theta / \theta_b \big)} {1-4\big( 1.189 \theta / \theta_b \big)} \bigg]^2 for given input parameter :math:`\sigma`, the FWHM is set to :math:`\theta_b = 2\sqrt{2{\rm log}2 \sigma}`. Parameters ---------- sigma: float. The width of the beam profile. Returns ------- beam_func: function. The beam function. References ---------- .. [1] Mauch et al., "The 1.28 GHz MeerKAT DEEP2 Image", https://arxiv.org/abs/1912.06212 """ theta_b = 2 * np.sqrt(2 * np.log(2)) * sigma def beam_func(ang_dist): beam = ( np.cos(1.189 * ang_dist * np.pi / theta_b) / (1 - 4 * (1.189 * ang_dist / theta_b) ** 2) ) ** 2 return beam return beam_func
[docs] def isotropic_beam_profile( xdim, ydim, wproj, beam_func, ang_unit=units.deg, ): """ Generate an isotropic image of the beam for given ``wproj`` and ``beam_func``. The image can later be used to convolve or deconvolve with intensity maps. Parameters ---------- xdim: int. The number of pixels in the first axis. ydim: int. The number of pixels in the second axis. wproj: :class:`astropy.wcs.WCS` object. The two-dimensional wcs object for the map. beam_func: function. The beam function. ang_unit: str or :class:`astropy.units.Unit`. The unit of input values for ``beam_func``. Returns ------- beam_image: float array. The image of the beam. """ xx, yy = np.meshgrid(np.arange(xdim), np.arange(ydim), indexing="ij") ra, dec = get_wcs_coor(wproj, xx, yy) ra_cen, dec_cen = get_wcs_coor(wproj, xdim // 2, ydim // 2) ang_dist = ( (get_ang_between_coord(ra, dec, ra_cen, dec_cen) * units.deg).to(ang_unit).value ) beam_image = beam_func(ang_dist) return beam_image
[docs] def gaussian_beam_window(sigma_rad, lmax): r""" Closed-form Gaussian beam window function in spherical-harmonic space. .. math:: B_\ell = \exp\left(-\frac{\ell(\ell+1)\sigma^2}{2}\right), where :math:`\sigma` is the Gaussian beam dispersion in radians. Parameters ---------- sigma_rad : float Gaussian beam dispersion in radians. lmax : int Maximum multipole; the returned array has length ``lmax + 1``. Returns ------- b_ell : ndarray of shape ``(lmax + 1,)`` """ lmax_i = int(lmax) ell = np.arange(lmax_i + 1, dtype=np.float64) sigma = float(sigma_rad) return np.exp(-0.5 * ell * (ell + 1.0) * sigma**2)
[docs] def isotropic_beam_window(beam_func, sigma, lmax, theta_grid=None): r""" Numerical beam window function for an isotropic beam via ``healpy.sphtfunc.beam2bl``. The beam profile ``beam_func(theta)`` is sampled on a fine ``theta`` grid covering :math:`[0, \theta_{\rm max}]` with :math:`\theta_{\rm max} \approx 8\sigma` (truncated at :math:`\pi`), and the resulting :math:`B(\theta)` is converted to :math:`B_\ell` with ``hp.sphtfunc.beam2bl``. Parameters ---------- beam_func : callable Maps ``theta`` (radians or the ``ang_unit`` chosen by the caller's beam factory) to beam amplitude. Must accept vector inputs. sigma : float Beam dispersion in the same angular units as ``beam_func`` expects. Used to pick the default ``theta_grid`` when none is provided. lmax : int Maximum multipole. theta_grid : ndarray, optional Explicit ``theta`` sample points (same units as expected by ``beam_func``). Must start at 0 and be monotonically increasing. Returns ------- b_ell : ndarray of shape ``(lmax + 1,)`` """ lmax_i = int(lmax) if theta_grid is None: theta_max = min(8.0 * float(sigma), np.pi) theta_grid = np.linspace(0.0, theta_max, 1024) theta_grid = np.asarray(theta_grid, dtype=np.float64) if theta_grid[0] != 0: raise ValueError("theta_grid must start at 0.") profile = np.asarray(beam_func(theta_grid), dtype=np.float64) return hp.sphtfunc.beam2bl(profile, theta_grid, lmax_i)
[docs] def weighted_smoothing_healpix( data_pix, weights_pix, beam_window_ch, hp_nside, pixel_id, kernel_renorm=True, nside_out=None, pixel_id_out=None, ): r""" Weighted HEALPix smoothing, mirroring :func:`weighted_convolution` semantics. For each channel :math:`c` with beam window :math:`B^{(c)}_\ell`, this returns .. math:: \tilde s^{(c)} = \frac{\mathcal{S}[s^{(c)} w^{(c)} \, B^{(c)}_\ell]} {\mathcal{S}[w^{(c)} \, B^{(c)}_\ell]},\qquad \tilde w^{(c)} = \frac{\bigl(\mathcal{S}[w^{(c)} B^{(c)}_\ell]\bigr)^2} {\mathcal{S}[w^{(c)} (B^{(c)}_\ell)^2]}, where :math:`\mathcal{S}` denotes harmonic-space smoothing via ``hp.smoothing(..., beam_window=B_\ell)`` on a full-sphere scratch buffer (allocated per channel, not kept). Parameters ---------- data_pix : (n_pix, n_ch) ndarray weights_pix : (n_pix, n_ch) ndarray beam_window_ch : (n_ch, lmax+1) ndarray or (lmax+1,) ndarray Per-channel beam window. A 1D array is broadcast over all channels. hp_nside : int HEALPix :math:`N_{\rm side}` that ``pixel_id`` refers to. pixel_id : ndarray of int HEALPix indices at ``hp_nside``, matching rows of ``data_pix`` / ``weights_pix``, used to paint the high-resolution sphere before smoothing. pixel_id_out : ndarray of int, optional If ``nside_out`` is below ``hp_nside``, indices at ``nside_out`` where outputs are sampled (survey footprint). Required in that case. When no downgrade, defaults to ``pixel_id``. kernel_renorm : bool, default True Kept for API symmetry with :func:`weighted_convolution`. ``B_\ell`` is already normalised by ``hp.smoothing`` (``B_0 = 1`` for Gaussian windows), so this flag is informational only. nside_out : int, optional If set to a HEALPix :math:`N_{\\rm side}` strictly less than ``hp_nside``, harmonic smoothing is carried out at ``hp_nside`` on full spheres, maps are ``hp.ud_grade`` down to ``nside_out``, then values are sampled at ``pixel_id_out``. Returns ------- conv_data_pix : (n_pix_out, n_ch) ndarray conv_weights_pix : (n_pix_out, n_ch) ndarray """ del kernel_renorm nside_i = int(hp_nside) if nside_out is None: nside_o = nside_i else: nside_o = int(nside_out) if nside_o >= nside_i: raise ValueError("nside_out must be no greater than hp_nside.") if nside_i % nside_o != 0: raise ValueError( f"hp_nside={nside_i} must be divisible by nside_out={nside_o} " "(standard HEALPix downgrade)." ) npix_full = hp.nside2npix(nside_i) pid = np.asarray(pixel_id, dtype=np.int64).ravel() data_pix = np.asarray(data_pix) weights_pix = np.asarray(weights_pix) if data_pix.ndim != 2 or weights_pix.ndim != 2: raise ValueError( "data_pix and weights_pix must be 2D arrays of shape (n_pix, n_ch); " f"got {data_pix.shape} and {weights_pix.shape}." ) if data_pix.shape != weights_pix.shape: raise ValueError( f"data/weights shape mismatch: {data_pix.shape} vs {weights_pix.shape}." ) n_pix, n_ch = data_pix.shape if n_pix != pid.size: raise ValueError(f"pixel_id length {pid.size} does not match n_pix={n_pix}.") if nside_o == nside_i: pid_out = pid else: if pixel_id_out is None: raise ValueError( "pixel_id_out is required when nside_out is less than hp_nside." ) pid_out = np.asarray(pixel_id_out, dtype=np.int64).ravel() n_pix_out = int(pid_out.size) bwin = np.asarray(beam_window_ch, dtype=np.float64) if bwin.ndim == 1: bwin = np.broadcast_to(bwin, (n_ch, bwin.size)) elif bwin.ndim == 2: if bwin.shape[0] != n_ch: raise ValueError( f"beam_window_ch first axis must equal n_ch={n_ch}; got {bwin.shape[0]}." ) else: raise ValueError("beam_window_ch must be 1D or 2D.") real_dtype = np.result_type(data_pix.dtype, weights_pix.dtype, np.float64) conv_data = np.zeros((n_pix_out, n_ch), dtype=real_dtype) conv_weights = np.zeros((n_pix_out, n_ch), dtype=real_dtype) for ci in range(n_ch): s = np.asarray(data_pix[:, ci], dtype=np.float64) w = np.asarray(weights_pix[:, ci], dtype=np.float64) b_ell = np.asarray(bwin[ci], dtype=np.float64) b_ell_sq = b_ell**2 sw_full = np.zeros(npix_full, dtype=np.float64) w_full = np.zeros(npix_full, dtype=np.float64) np.add.at(sw_full, pid, s * w) np.add.at(w_full, pid, w) smoothed_sw = hp.smoothing( sw_full, beam_window=b_ell, iter=0, pol=False, use_weights=False, ) smoothed_w = hp.smoothing( w_full, beam_window=b_ell, iter=0, pol=False, use_weights=False, ) smoothed_w_sq = hp.smoothing( w_full, beam_window=b_ell_sq, iter=0, pol=False, use_weights=False, ) if nside_o != nside_i: smoothed_sw = hp.ud_grade(smoothed_sw, nside_o, order_in="RING") smoothed_w = hp.ud_grade(smoothed_w, nside_o, order_in="RING") smoothed_w_sq = hp.ud_grade(smoothed_w_sq, nside_o, order_in="RING") denom = smoothed_w[pid_out] num = smoothed_sw[pid_out] with np.errstate(divide="ignore", invalid="ignore"): cdata = np.where(np.abs(denom) > 0, num / denom, 0.0) denom_sq = smoothed_w_sq[pid_out] with np.errstate(divide="ignore", invalid="ignore"): cvar = np.where(np.abs(denom_sq) > 0, denom_sq / denom**2, np.inf) cvar = np.where(denom != 0, cvar, np.inf) cw = np.where(np.isfinite(cvar) & (cvar > 0), 1.0 / cvar, 0.0) conv_data[:, ci] = cdata conv_weights[:, ci] = cw return conv_data, conv_weights
[docs] def dish_beam_sigma(dish_diameter, nu, gamma=1.0, ang_unit=units.deg): r""" Calculate the beam size of a dish telescope assuming .. math:: \theta_{\rm FWHM} = \gamma \frac{\lambda}{D}, where :math:`\theta_{\rm FWHM}` is the FWHM of the beam, :math:`\gamma` is the aperture efficiency, :math:`\lambda` is the observing wavelength, and D is the dish diameter. The sigma of the Gaussian beam is then :math:`\sigma = \theta_{\rm FWHM}/ 2\sqrt{2 {\rm ln}2}`. Parameters ---------- dish_diameter: float. The diameter of the dish in metre. nu: float. The observing frequency in Hz. gamma: float, default 1.0. The aperture efficiency. ang_unit: str or :class:`astropy.units.Unit`, default ``deg``. The unit of the output. Returns ------- beam_sigma: float. The sigma of the beam. """ beam_fwhm = ( constants.c / (nu * units.Hz * dish_diameter * units.m) * units.rad ).to(ang_unit).value * gamma beam_sigma = beam_fwhm / (2 * np.sqrt(2 * np.log(2))) return beam_sigma
[docs] def cmb_temperature(nu, tcmb0=Planck18.Tcmb0.value): """ Calculate the background CMB temperature at given frequencies. Parameters ---------- nu: float. The observing frequency in Hz. tcmb0: float, default ``Planck18.Tcmb0.value``. The background CMB temperature at z=0. Returns ------- tcmb: float. The CMB temperature at given frequencies in Kelvin. """ redshift = freq_to_redshift(nu) return tcmb0 * (1 + redshift)
[docs] def receiver_temperature_meerkat(nu): """ The receiver temperature of MeerKAT. Parameters ---------- nu: float. The observing frequency in Hz. Returns ------- Trx: float. The receiver temperature at given frequencies in Kelvin. """ Trx = 7.5 + 10 * (nu / 1e9 - 0.75) ** 2 return Trx
[docs] def galaxy_temperature(nu, tgal_408MHz=25, sp_indx=-2.75): """ The temperature template of the Milky Way. Note that, for an accurate T_sky, you can instead use :class:`meer21cm.fg.ForegroundSimulation` to generate the foregrounds. Parameters ---------- nu: float. The observing frequency in Hz. tgal_408MHz: float. The average galaxy temperature at 408MHz in Kelvin. sp_indx: float. The spectral index to extrapolate it to input frequencies. Returns ------- Tgal: float. The galaxy temperature at given frequencies in Kelvin. """ Tgal = tgal_408MHz * (nu / 408 / 1e6) ** sp_indx return Tgal