import os
from os import path
from typing import Optional
import emcee
import numpy as np
from autoconf import conf
from autofit.database.sqlalchemy_ import sa
from autofit.mapper.model_mapper import ModelMapper
from autofit.mapper.prior_model.abstract import AbstractPriorModel
from autofit.non_linear.abstract_search import PriorPasser
from autofit.non_linear.initializer import Initializer
from autofit.non_linear.mcmc.abstract_mcmc import AbstractMCMC
from autofit.non_linear.mcmc.auto_correlations import AutoCorrelationsSettings
from autofit.non_linear.mcmc.emcee.samples import SamplesEmcee
from autofit.plot import EmceePlotter
from autofit.plot.output import Output
[docs]class Emcee(AbstractMCMC):
__identifier_fields__ = ("nwalkers",)
def __init__(
self,
name: Optional[str] = None,
path_prefix: Optional[str] = None,
unique_tag: Optional[str] = None,
prior_passer: Optional[PriorPasser] = None,
initializer: Optional[Initializer] = None,
auto_correlations_settings=AutoCorrelationsSettings(),
iterations_per_update: int = None,
number_of_cores: int = None,
session: Optional[sa.orm.Session] = None,
**kwargs
):
"""
An Emcee non-linear search.
For a full description of Emcee, checkout its Github and readthedocs webpages:
https://github.com/dfm/emcee
https://emcee.readthedocs.io/en/stable/
If you use `Emcee` as part of a published work, please cite the package following the instructions under the
*Attribution* section of the GitHub page.
Parameters
----------
name
The name of the search, controlling the last folder results are output.
path_prefix
The path of folders prefixing the name folder where results are output.
unique_tag
The name of a unique tag for this model-fit, which will be given a unique entry in the sqlite database
and also acts as the folder after the path prefix and before the search name.
prior_passer
Controls how priors are passed from the results of this `NonLinearSearch` to a subsequent non-linear search.
initializer
Generates the initialize samples of non-linear parameter space (see autofit.non_linear.initializer).
auto_correlations_settings
Customizes and performs auto correlation calculations performed during and after the search.
number_of_cores
The number of cores Emcee sampling is performed using a Python multiprocessing Pool instance. If 1, a
pool instance is not created and the job runs in serial.
session
An SQLalchemy session instance so the results of the model-fit are written to an SQLite database.
"""
number_of_cores = (
self._config("parallel", "number_of_cores")
if number_of_cores is None
else number_of_cores
)
super().__init__(
name=name,
path_prefix=path_prefix,
unique_tag=unique_tag,
prior_passer=prior_passer,
initializer=initializer,
auto_correlations_settings=auto_correlations_settings,
iterations_per_update=iterations_per_update,
number_of_cores=number_of_cores,
session=session,
**kwargs
)
self.logger.debug("Creating Emcee Search")
class Fitness(AbstractMCMC.Fitness):
def figure_of_merit_from(self, parameter_list):
"""
The figure of merit is the value that the `NonLinearSearch` uses to sample parameter space. `Emcee`
uses the log posterior.
"""
return self.log_posterior_from(parameter_list=parameter_list)
def _fit(self, model: AbstractPriorModel, analysis, log_likelihood_cap=None):
"""
Fit a model using Emcee and the Analysis class which contains the data and returns the log likelihood from
instances of the model, which the `NonLinearSearch` seeks to maximize.
Parameters
----------
model : ModelMapper
The model which generates instances for different points in parameter space.
analysis : Analysis
Contains the data and the log likelihood function which fits an instance of the model to the data, returning
the log likelihood the `NonLinearSearch` maximizes.
Returns
-------
A result object comprising the Samples object that inclues the maximum log likelihood instance and full
chains used by the fit.
"""
fitness_function = self.fitness_function_from_model_and_analysis(
model=model, analysis=analysis
)
pool = self.make_sneaky_pool(fitness_function)
emcee_sampler = emcee.EnsembleSampler(
nwalkers=self.config_dict_search["nwalkers"],
ndim=model.prior_count,
log_prob_fn=fitness_function.__call__,
backend=emcee.backends.HDFBackend(filename=self.backend_filename),
pool=pool,
)
try:
emcee_state = emcee_sampler.get_last_sample()
samples = self.samples_from(model=model)
total_iterations = emcee_sampler.iteration
if samples.converged:
iterations_remaining = 0
else:
iterations_remaining = self.config_dict_run["nsteps"] - total_iterations
self.logger.info(
"Existing Emcee samples found, resuming non-linear search."
)
except AttributeError:
(
unit_parameter_lists,
parameter_lists,
log_posterior_list,
) = self.initializer.samples_from_model(
total_points=emcee_sampler.nwalkers,
model=model,
fitness_function=fitness_function,
)
emcee_state = np.zeros(shape=(emcee_sampler.nwalkers, model.prior_count))
self.logger.info("No Emcee samples found, beginning new non-linear search.")
for index, parameters in enumerate(parameter_lists):
emcee_state[index, :] = np.asarray(parameters)
total_iterations = 0
iterations_remaining = self.config_dict_run["nsteps"]
while iterations_remaining > 0:
if self.iterations_per_update > iterations_remaining:
iterations = iterations_remaining
else:
iterations = self.iterations_per_update
for sample in emcee_sampler.sample(
initial_state=emcee_state,
iterations=iterations,
progress=True,
skip_initial_state_check=True,
store=True,
):
pass
emcee_state = emcee_sampler.get_last_sample()
total_iterations += iterations
iterations_remaining = self.config_dict_run["nsteps"] - total_iterations
samples = self.perform_update(
model=model, analysis=analysis, during_analysis=True
)
if self.auto_correlations_settings.check_for_convergence:
if emcee_sampler.iteration > self.auto_correlations_settings.check_size:
if samples.converged:
iterations_remaining = 0
self.logger.info("Emcee sampling complete.")
def config_dict_with_test_mode_settings_from(self, config_dict):
return {
**config_dict,
"nwalkers": 20,
"nsteps": 10,
}
def fitness_function_from_model_and_analysis(
self, model, analysis, log_likelihood_cap=None
):
return Emcee.Fitness(
paths=self.paths,
model=model,
analysis=analysis,
samples_from_model=self.samples_from,
log_likelihood_cap=log_likelihood_cap,
)
@property
def backend_filename(self):
return path.join(self.paths.samples_path, "emcee.hdf")
@property
def backend(self) -> emcee.backends.HDFBackend:
"""
The `Emcee` hdf5 backend, which provides access to all samples, likelihoods, etc. of the non-linear search.
The sampler is described in the "Results" section at https://dynesty.readthedocs.io/en/latest/quickstart.html
"""
if os.path.isfile(self.backend_filename):
return emcee.backends.HDFBackend(filename=self.backend_filename)
else:
raise FileNotFoundError(
"The file emcee.hdf does not exist at the path "
+ self.paths.samples_path
)
def samples_from(self, model):
return SamplesEmcee.from_results_internal(
model=model,
results_internal=self.backend,
auto_correlation_settings=self.auto_correlations_settings,
time=self.timer.time,
)
def plot_results(self, samples):
def should_plot(name):
return conf.instance["visualize"]["plots_search"]["emcee"][name]
plotter = EmceePlotter(
samples=samples,
output=Output(
path=path.join(self.paths.image_path, "search"), format="png"
),
)
if should_plot("corner"):
plotter.corner()
if should_plot("trajectories"):
plotter.trajectories()
if should_plot("likelihood_series"):
plotter.likelihood_series()
if should_plot("time_series"):
plotter.time_series()