"""
This module contains the sampler classes for performing model fitting.
Similar to :mod:`meer21cm.transfer`, sampler class takes in the attributes of a :class:`meer21cm.power.PowerSpectrum` instance,
and perform the model fitting that includes the observational effects, window functions etc defined in the PS instance.
The difference is that, the sampler class takes the dictionary of attributes instead of the PS itself,
so you should always use :func:`meer21cm.inference.extract_model_fitting_inputs` to extract the attributes from the PS instance,
and use the dictionary to initialize the sampler class. For example:
.. code-block:: python
>>> from meer21cm.inference import extract_model_fitting_inputs, SamplerEmcee
>>> from meer21cm.power import PowerSpectrum
>>> # the ps instance is usually something you have already defined and used to read data
>>> # or it can be a MockSimulation instance you have used to generate the mock
>>> ps = PowerSpectrum(band="L", survey="meerklass_2021", sigma_v_1=100, tracer_bias_1=1.5, tracer_bias_2=2.0)
>>> ps_dict = extract_model_fitting_inputs(ps)
>>> sampler = SamplerEmcee(ps_dict, ...)
similarily, you can also use :class:`meer21cm.inference.SamplerNautilus` to perform model fitting.
Caution:
1. ``data_vector``, ``data_covariance`` and ``observables`` must be in the same order. For example, if ``observables`` is ["cross", "hi-auto"], then the first half of ``data_vector`` should be the cross-power and second half should be the hi-auto power. Similarly, the [:half, :half] of ``data_covariance`` should be the cross-power covariance and the [half:, half:] should be the hi-auto power covariance (the rest of the matrix should be the cross-correlation between cross and auto powers).
2. ``params_name`` must be in the same order as the ``params_prior``.
"""
import numpy as np
from meer21cm.power import PowerSpectrum
from meer21cm.transfer import required_attrs
import emcee
import os
import nautilus
from multiprocessing import Pool
from scipy.stats import norm
[docs]
class SamplerBase:
"""
Base class for all samplers.
Parameters
----------
ps_dict : dict
A dictionary of model fitting inputs.
data_vector : np.ndarray
The data vector.
data_covariance : np.ndarray
The data covariance matrix.
params_name : list[str]
The names of the parameters.
Must match the attribute names of the :class:`meer21cm.power.PowerSpectrum` instance.
params_prior : list[tuple[str, float, float]]
The prior distribution of the parameters.
The first element of the tuple is the distribution type, which can be "uniform" or "gaussian".
The second and third elements are the parameters of the distribution.
For uniform distribution, the second and third elements are the lower and upper bounds of the distribution.
For gaussian distribution, the second and third elements are the mean and standard deviation of the distribution.
observables : list[str], default ["cross"]
The observables to fit.
Must be a list of "cross", "hi-auto", or "gg-auto".
save : bool, default False
Whether to save the fitting results while running the sampler.
save_filename : str | None, default None
The filename to save the fitting results when ``save`` is True.
If not provided, the results will not be saved.
save_model_blobs : bool, default False
Whether to save the model blobs.
The model blobs are the model vectors listed in the ``observables`` argument.
do_hartlap_correction : bool, default False
Whether to apply the Hartlap correction to the data covariance matrix.
num_mocks : int, default None
The number of mock data vectors to use for the Hartlap correction.
If not provided, the Hartlap correction will not be applied.
do_percival_correction : bool, default False
Whether to apply the Percival correction to the data covariance matrix.
"""
def __init__(
self,
ps_dict: dict,
data_vector: np.ndarray,
data_covariance: np.ndarray,
params_name: list[str],
params_prior: list[tuple[str, float, float]],
observables: list[str] = ["cross"],
save: bool = False,
save_filename: str | None = None,
save_model_blobs: bool = False,
do_hartlap_correction: bool = False,
num_mocks: int | None = None,
do_percival_correction: bool = False,
):
self.ps_dict = ps_dict
self.params_name = params_name
self.params_prior = params_prior
self.save = save
self.save_filename = save_filename
self.observables = observables
self.data_vector = data_vector
self.data_covariance = data_covariance
self._inverse_covariance = None
self.save_model_blobs = save_model_blobs
self.do_hartlap_correction = do_hartlap_correction
self.num_mocks = num_mocks
self.do_percival_correction = do_percival_correction
self.validate_input()
@property
def do_hartlap_correction(self) -> bool:
"""
Whether to apply the Hartlap correction to the data covariance matrix.
"""
return self._do_hartlap_correction
@do_hartlap_correction.setter
def do_hartlap_correction(self, value: bool):
self._do_hartlap_correction = value
self._inverse_covariance = None
@property
def do_percival_correction(self) -> bool:
"""
Whether to apply the Percival correction to the data covariance matrix.
"""
return self._do_percival_correction
@do_percival_correction.setter
def do_percival_correction(self, value: bool):
self._do_percival_correction = value
self._inverse_covariance = None
@property
def num_mocks(self) -> int | None:
"""
The number of mock data vectors to use for the Hartlap and Percival corrections.
"""
return self._num_mocks
@num_mocks.setter
def num_mocks(self, value: int | None):
self._num_mocks = value
self._inverse_covariance = None
@property
def hartlap_factor(self) -> float:
r"""
The Hartlap factor.
The Hartlap correction is given by [1]_ :
.. math::
\hat{\Sigma^{-1}} = \frac{N_m - N_d - 2}{N_m - 1} \Sigma^{-1}
where :math:`N_m` is the number of mock realisations for calculating the covariance, :math:`N_d` is the number of data points,
and :math:`\Sigma` is the mock-based covariance matrix.
References
----------
.. [1] Hartlap, J., et al. 2007, A&A, astro-ph/0608064
"""
if self.do_hartlap_correction and self.num_mocks is not None:
return (self.num_mocks - self.data_vector.size - 2) / (self.num_mocks - 1)
return 1.0
@property
def percival_factor(self) -> float:
r"""
The Percival factor.
The Percival correction is given by [1]_ :
.. math::
\hat{\Sigma^{-1}} = \frac{1 + B(N_d - N_p)}{1 + A + B(N_p + 1)} \Sigma^{-1}
.. math::
A = \frac{2}{(N_m - N_d - 1) (N_m - N_d - 4)}
.. math::
B = \frac{N_m - N_d - 2}{(N_m - N_d - 1)(N_m - N_d - 4)}
where :math:`N_d` is the number of data points,
:math:`N_p` is the number of parameters,
:math:`N_m` is the number of mock realisations for calculating the covariance,
and :math:`\Sigma` is the mock-based covariance matrix.
References
----------
.. [1] Percival, W. J. et al. 2014, MNRAS, 1312.4841
"""
if self.do_percival_correction and self.num_mocks is not None:
N_d = self.data_vector.size
N_p = len(self.params_name)
N_m = self.num_mocks
A = 2 / ((N_m - N_d - 1) * (N_m - N_d - 4))
B = (N_m - N_d - 2) / ((N_m - N_d - 1) * (N_m - N_d - 4))
return (1 + B * (N_d - N_p)) / (1 + A + B * (N_p + 1))
return 1.0
@property
def inverse_covariance(self) -> np.ndarray:
"""
The matrix inverse of the data covariance.
"""
if self._inverse_covariance is None:
self._inverse_covariance = np.linalg.inv(self.data_covariance)
self._inverse_covariance = (
self.hartlap_factor * self.percival_factor * self._inverse_covariance
)
return self._inverse_covariance
@property
def data_covariance(self) -> np.ndarray:
"""
The data covariance matrix.
"""
return self._data_covariance
@data_covariance.setter
def data_covariance(self, data_covariance: np.ndarray):
self._data_covariance = data_covariance
self._inverse_covariance = None
[docs]
def get_model_instance(self, params: list[float] | np.ndarray) -> PowerSpectrum:
"""
Get the model instance from the input ``ps_dict`` and the updated parameters ``params``.
Parameters
----------
params : list[float] | np.ndarray
The updated values of the parameters defined in the ``params_name`` argument.
Returns
-------
model_instance : :class:`meer21cm.power.PowerSpectrum`
The model PS instance to calculate the model vector.
"""
model_instance = PowerSpectrum(**self.ps_dict)
model_instance._box_len = self.ps_dict["box_len"]
model_instance._box_ndim = self.ps_dict["box_ndim"]
model_instance.propagate_field_k_to_model()
for param_name, param_value in zip(self.params_name, params):
setattr(model_instance, param_name, param_value)
return model_instance
[docs]
def get_model_vector(self, params: list[float] | np.ndarray) -> np.ndarray:
"""
Calculate the model vector from the input parameter values.
Parameters
----------
params : list[float] | np.ndarray
The updated values of the parameters defined in the ``params_name`` argument.
Returns
-------
model_vector : np.ndarray
The model vector.
The shape is (len(observables), len(k1dbins) - 1).
"""
model_instance = self.get_model_instance(params)
model_vector = np.zeros(
(len(self.observables), len(self.ps_dict["k1dbins"]) - 1)
)
i = 0
for obs in self.observables:
if obs == "cross":
model_vector[i], _, _ = model_instance.get_1d_power(
model_instance.cross_power_tracer_model,
k1dweights=model_instance.k1dweights,
)
elif obs == "hi-auto":
model_vector[i], _, _ = model_instance.get_1d_power(
model_instance.auto_power_tracer_1_model,
k1dweights=model_instance.k1dweights,
)
elif obs == "gg-auto":
model_vector[i], _, _ = model_instance.get_1d_power(
model_instance.auto_power_tracer_2_model,
k1dweights=model_instance.k1dweights,
)
i += 1
return model_vector
[docs]
def compute_log_likelihood(
self, params: list[float] | np.ndarray
) -> float | tuple[float, np.ndarray]:
"""
Calculate the log likelihood from the input parameter values.
Parameters
----------
params : list[float] | np.ndarray
The updated values of the parameters defined in the ``params_name`` argument.
Returns
-------
log_likelihood : float
The log likelihood.
"""
model_vector = self.get_model_vector(params)
model_minus_data = model_vector.ravel() - self.data_vector.ravel()
log_likelihood = (
-0.5 * model_minus_data @ self.inverse_covariance @ model_minus_data
)
if self.save_model_blobs:
return log_likelihood, model_vector
return log_likelihood
[docs]
class SamplerEmcee(SamplerBase):
"""
A sampler class to perform model fitting using the ``emcee`` sampler.
Parameters
----------
ps_dict : dict
A dictionary of model fitting inputs.
data_vector : np.ndarray
The data vector.
data_covariance : np.ndarray
The data covariance matrix.
params_name : list[str]
The names of the parameters.
Must match the attribute names of the :class:`meer21cm.power.PowerSpectrum` instance.
params_prior : list[tuple[str, float, float]]
The prior distribution of the parameters.
The first element of the tuple is the distribution type, which can be "uniform" or "gaussian".
The second and third elements are the parameters of the distribution.
For uniform distribution, the second and third elements are the lower and upper bounds of the distribution.
For gaussian distribution, the second and third elements are the mean and standard deviation of the distribution.
nwalkers : int
The number of random walkers.
nsteps : int
The number of sampling steps.
nthreads : int
The number of parallel threads.
observables : list[str], default ["cross"]
The observables to fit.
Must be a list of "cross", "hi-auto", or "gg-auto".
mp_backend : str, default "multiprocessing"
The backend to use for the multiprocessing.
Can be "multiprocessing" or "mpi".
save : bool, default False
Whether to save the fitting results while running the sampler.
save_filename : str | None, default None
The filename to save the fitting results when ``save`` is True.
If not provided, the results will not be saved.
save_model_blobs : bool, default False
Whether to save the model blobs.
The model blobs are the model vectors listed in the ``observables`` argument.
do_hartlap_correction : bool, default False
Whether to apply the Hartlap correction to the data covariance matrix.
num_mocks : int, default None
The number of mock data vectors to use for the Hartlap correction.
If not provided, the Hartlap correction will not be applied.
do_percival_correction : bool, default False
Whether to apply the Percival correction to the data covariance matrix.
"""
def __init__(
self,
ps_dict: dict,
data_vector: np.ndarray,
data_covariance: np.ndarray,
params_name: list[str],
params_prior: list[tuple[str, float, float]],
nwalkers: int,
nsteps: int,
nthreads: int,
observables: list[str] = ["cross"],
mp_backend: str = "multiprocessing",
save: bool = False,
save_filename: str | None = None,
save_model_blobs: bool = False,
do_hartlap_correction: bool = False,
num_mocks: int | None = None,
do_percival_correction: bool = False,
):
self.nwalkers = nwalkers
self.nsteps = nsteps
self.nthreads = nthreads
self.mp_backend = mp_backend
super().__init__(
ps_dict,
data_vector,
data_covariance,
params_name,
params_prior,
observables,
save,
save_filename,
save_model_blobs,
do_hartlap_correction,
num_mocks,
do_percival_correction,
)
[docs]
def log_prior_gaussian(self, value, mean, sigma):
"""
Calculate the log prior for a gaussian distribution.
Parameters
----------
value : float
The value to calculate the log prior for.
mean : float
The mean of the distribution.
sigma : float
The standard deviation of the distribution.
Returns
-------
log_prior : float
The log prior.
"""
return -0.5 * (value - mean) ** 2 / sigma**2
[docs]
def log_prior(self, params_values):
"""
Calculate the log prior from the input parameter values.
Parameters
----------
params_values : list[float] | np.ndarray
The updated values of the parameters defined in the ``params_name`` argument.
Returns
-------
log_prior : float
The log prior.
"""
log_prior = 0.0
for i, param_name in enumerate(self.params_name):
prior_func = getattr(self, f"log_prior_{self.params_prior[i][0]}")
log_prior_i = prior_func(
params_values[i],
self.params_prior[i][1],
self.params_prior[i][2],
)
log_prior += log_prior_i
return log_prior
[docs]
def log_likelihood(self, params_values):
"""
Calculate the log likelihood plus the log prior from the input parameter values.
Parameters
----------
params_values : list[float] | np.ndarray
The updated values of the parameters defined in the ``params_name`` argument.
Returns
-------
log_likelihood : float
The log likelihood plus the log prior.
model_vector : np.ndarray, optional
The model vector.
Only returned when ``save_model_blobs`` is True.
"""
lp = self.log_prior(params_values)
if not np.isfinite(lp):
if self.save_model_blobs:
return lp, np.zeros(
(len(self.observables), (len(self.ps_dict["k1dbins"]) - 1))
)
return lp
else:
if self.save_model_blobs:
ll, model_vector = self.compute_log_likelihood(params_values)
return lp + ll, model_vector
return lp + self.compute_log_likelihood(params_values)
@property
def ndim(self):
"""
The number of parameters to fit.
"""
return len(self.params_name)
[docs]
def run(
self,
resume: bool,
progress: bool = True,
start_coord: np.ndarray | None = None,
run_sampler: bool = True,
):
"""
Run the sampler.
Parameters
----------
resume : bool
Whether to resume the sampler from the last saved state.
If ``resume`` is False and the save file already exists,
**the sampler will reset the save file**.
progress : bool, default True
Whether to show the progress bar.
start_coord : np.ndarray | None, default None
The initial coordinates of the walkers.
If not provided, the sampler will randomly initialize the walkers.
run_sampler : bool, default True
Whether to run the sampler.
If False, the sampler will only initialize the walkers and return the sampler object.
Returns
-------
sampler : emcee.EnsembleSampler
The sampler object.
"""
if resume and self.save:
start_coord = None
else:
# TODO: need smarter auto init position
if start_coord is None:
init_pos = np.array(
[self.ps_dict[param_name] for param_name in self.params_name]
)
# TODO: need smarter auto init position
start_coord = (
1 + np.random.uniform(-1e-2, 1e-2, size=(self.nwalkers, self.ndim))
) * init_pos[None, :] + np.random.uniform(
0, 1e-2, size=(self.nwalkers, self.ndim)
) * (
np.abs(init_pos[None, :]) < 1e-2
)
nsteps = self.nsteps
if self.save:
backend = emcee.backends.HDFBackend(self.save_filename)
if os.path.isfile(self.save_filename):
if not resume and backend.iteration > 0:
backend.reset(self.nwalkers, self.ndim)
nsteps = nsteps - backend.iteration
else:
backend = None
pool = None
if self.mp_backend == "multiprocessing":
pool_func = Pool
elif self.mp_backend == "mpi":
from mpi4py.futures import MPIPoolExecutor
pool_func = MPIPoolExecutor
with pool_func(self.nthreads) as pool:
sampler = emcee.EnsembleSampler(
self.nwalkers,
self.ndim,
self.log_likelihood,
backend=backend,
pool=pool,
)
if run_sampler:
sampler.run_mcmc(start_coord, nsteps, progress=progress)
return sampler
[docs]
def get_backend(self) -> emcee.backends.HDFBackend:
"""
Get the backend of the sampler.
"""
if self.save:
return emcee.backends.HDFBackend(self.save_filename)
else:
raise ValueError("No save filename provided")
[docs]
def get_points(self, sampler: emcee.EnsembleSampler | None = None) -> np.ndarray:
"""
Get the chain from the sampler.
Parameters
----------
sampler : emcee.EnsembleSampler | None, default None
The sampler object.
If not provided, the sampler will use the backend to get the chain
(only works if ``save`` is True).
Returns
-------
chain : np.ndarray
The chain.
The shape is (nwalkers, nsteps, ndim).
"""
if sampler is None:
return self.get_backend().get_chain()
return sampler.get_chain()
[docs]
def get_blobs(self, sampler: emcee.EnsembleSampler | None = None) -> np.ndarray:
"""
Get the blobs from the sampler.
Parameters
----------
sampler : emcee.EnsembleSampler | None, default None
The sampler object.
If not provided, the sampler will use the backend to get the blobs
(only works if ``save`` is True).
Returns
-------
blobs : np.ndarray
The model vectorblobs.
"""
if sampler is None:
return self.get_backend().get_blobs()
return sampler.get_blobs()
[docs]
def get_log_prob(self, sampler: emcee.EnsembleSampler | None = None) -> np.ndarray:
"""
Get the log probability from the sampler.
Parameters
----------
sampler : emcee.EnsembleSampler | None, default None
The sampler object.
If not provided, the sampler will use the backend to get the log probability
(only works if ``save`` is True).
Returns
-------
log_prob : np.ndarray
The log probability.
The shape is (nwalkers, nsteps).
"""
if sampler is None:
return self.get_backend().get_log_prob()
return sampler.get_log_prob()
[docs]
class SamplerNautilus(SamplerBase):
"""
A sampler class to perform model fitting using the ``nautilus`` sampler.
Parameters
----------
ps_dict : dict
A dictionary of model fitting inputs.
data_vector : np.ndarray
The data vector.
data_covariance : np.ndarray
The data covariance matrix.
params_name : list[str]
The names of the parameters.
Must match the attribute names of the :class:`meer21cm.power.PowerSpectrum` instance.
params_prior : list[tuple[str, float, float]]
The prior distribution of the parameters.
The first element of the tuple is the distribution type, which can be "uniform" or "gaussian".
The second and third elements are the parameters of the distribution.
For uniform distribution, the second and third elements are the lower and upper bounds of the distribution.
For gaussian distribution, the second and third elements are the mean and standard deviation of the distribution.
observables : list[str], default ["cross"]
The observables to fit.
Must be a list of "cross", "hi-auto", or "gg-auto".
n_live_points : int, default 2000
The number of live points.
f_live : float, default 0.01
The fraction of live points to keep.
n_shell : int, default 1
The number of shells to use.
n_eff : int, default 10000
The effective number of samples.
save : bool, default False
Whether to save the fitting results while running the sampler.
save_filename : str | None, default None
The filename to save the fitting results when ``save`` is True.
If not provided, the results will not be saved.
mp_backend : str, default "multiprocessing"
The backend to use for the multiprocessing.
Can be "multiprocessing" or "mpi".
nthreads : int, default 1
The number of parallel threads.
timeout : float, default np.inf
The timeout for the sampler.
If the sampler runs longer than the timeout, it will be terminated.
save_model_blobs : bool, default False
Whether to save the model blobs.
The model blobs are the model vectors listed in the ``observables`` argument.
hartlap_correction : bool, default False
Whether to apply the Hartlap correction to the data covariance matrix.
num_mocks : int, default None
The number of mock data vectors to use for the Hartlap correction.
If not provided, the Hartlap correction will not be applied.
percival_correction : bool, default False
Whether to apply the Percival correction to the data covariance matrix.
"""
def __init__(
self,
ps_dict: dict,
data_vector: np.ndarray,
data_covariance: np.ndarray,
params_name: list[str],
params_prior: list[tuple[str, float, float]],
observables: list[str] = ["cross"],
n_live_points: int = 2000,
f_live: float = 0.01,
n_shell: int = 1,
n_eff: int = 10000,
save: bool = False,
save_filename: str | None = None,
mp_backend: str = "multiprocessing",
nthreads: int = 1,
timeout: float = np.inf,
save_model_blobs: bool = False,
do_hartlap_correction: bool = False,
num_mocks: int | None = None,
do_percival_correction: bool = False,
):
self.n_live_points = n_live_points
self.f_live = f_live
self.n_shell = n_shell
self.n_eff = n_eff
self.mp_backend = mp_backend
self.nthreads = nthreads
self.timeout = timeout
super().__init__(
ps_dict,
data_vector,
data_covariance,
params_name,
params_prior,
observables,
save,
save_filename,
save_model_blobs,
do_hartlap_correction,
num_mocks,
do_percival_correction,
)
[docs]
def get_nautilus_prior(self):
"""
Generate the ``nautilus.Prior`` objects from the input parameters.
"""
prior = nautilus.Prior()
for i, param_name in enumerate(self.params_name):
if self.params_prior[i][0] == "uniform":
dist = (self.params_prior[i][1], self.params_prior[i][2])
elif self.params_prior[i][0] == "gaussian":
dist = norm(loc=self.params_prior[i][1], scale=self.params_prior[i][2])
prior.add_parameter(param_name, dist=dist)
return prior
[docs]
def run(self, resume: bool, progress: bool = True, run_sampler: bool = True):
"""
Run the sampler.
Parameters
----------
resume : bool
Whether to resume the sampler from the last saved state.
progress : bool, default True
Whether to show the progress bar.
run_sampler : bool, default True
Whether to run the sampler.
If False, the sampler will only initialize the walkers and return the sampler object.
Returns
-------
sampler : nautilus.Sampler
The sampler object.
"""
if not self.save:
resume = False
if self.mp_backend == "multiprocessing":
pool_func = Pool
elif self.mp_backend == "mpi":
from mpi4py.futures import MPIPoolExecutor
pool_func = MPIPoolExecutor
with pool_func(self.nthreads) as pool:
sampler = nautilus.Sampler(
self.get_nautilus_prior(),
self.compute_log_likelihood,
pass_dict=False,
pool=pool,
n_live=self.n_live_points,
resume=resume,
filepath=self.save_filename,
)
# nautilus does not timeout properly, so no test coverage for this
if run_sampler: # pragma: no cover
sampler.run(
f_live=self.f_live,
n_shell=self.n_shell,
n_eff=self.n_eff,
discard_exploration=True,
verbose=progress,
timeout=self.timeout,
)
return sampler
[docs]
def get_posterior(self, sampler: nautilus.Sampler | None = None):
"""
Get the posterior from the sampler.
Parameters
----------
sampler : nautilus.Sampler | None, default None
The sampler object.
If not provided, the sampler will use the backend to get the posterior
(only works if ``save`` is True).
Returns
-------
posterior : tuple
The posterior.
The first element is the points.
The second element is the log weights.
The third element is the log likelihoods.
If ``save_model_blobs`` is True, the fourth element is the model blobs.
"""
if sampler is None:
sampler = nautilus.Sampler(
self.get_nautilus_prior(),
self.compute_log_likelihood,
pass_dict=False,
filepath=self.save_filename,
)
return sampler.posterior(return_blobs=self.save_model_blobs)