import logging
from typing import Dict, Optional
import numpy as np
import os
from autofit.database.sqlalchemy_ import sa
from autofit.mapper.model_mapper import ModelMapper
from autofit.mapper.prior_model.abstract import AbstractPriorModel
from autofit.non_linear.fitness import Fitness
from autofit.non_linear.initializer import Initializer
from autofit.non_linear.search.mcmc.abstract_mcmc import AbstractMCMC
from autofit.non_linear.search.mcmc.auto_correlations import AutoCorrelationsSettings
from autofit.non_linear.search.mcmc.auto_correlations import AutoCorrelations
from autofit.non_linear.samples.sample import Sample
from autofit.non_linear.test_mode import is_test_mode
from autofit.non_linear.samples.mcmc import SamplesMCMC
logger = logging.getLogger(__name__)
[docs]
class Zeus(AbstractMCMC):
__identifier_fields__ = (
"nwalkers",
"tune",
"tolerance",
"patience",
"mu",
"light_mode",
)
def __init__(
self,
name: Optional[str] = None,
path_prefix: Optional[str] = None,
unique_tag: Optional[str] = None,
nwalkers: int = 50,
nsteps: int = 2000,
tune: bool = True,
tolerance: float = 0.05,
patience: int = 5,
mu: float = 1.0,
light_mode: bool = False,
maxsteps: int = 10000,
maxiter: int = 10000,
vectorize: bool = False,
shuffle_ensemble: bool = True,
check_walkers: bool = True,
maxcall: Optional[int] = None,
initializer: Optional[Initializer] = None,
auto_correlation_settings=AutoCorrelationsSettings(),
iterations_per_quick_update: int = None,
iterations_per_full_update: int = None,
number_of_cores: int = 1,
silence: bool = False,
session: Optional[sa.orm.Session] = None,
**kwargs
):
"""
A Zeus non-linear search.
For a full description of Zeus, checkout its Github and readthedocs webpages:
https://github.com/minaskar/zeus
https://zeus-mcmc.readthedocs.io/en/latest/
If you use `Zeus` as part of a published work, please cite the package following the instructions under the
*Attribution* section of the GitHub page.
Parameters
----------
name
The name of the search, controlling the last folder results are output.
path_prefix
The path of folders prefixing the name folder where results are output.
unique_tag
The name of a unique tag for this model-fit, which will be given a unique entry in the sqlite database
and also acts as the folder after the path prefix and before the search name.
nwalkers
The number of walkers in the ensemble used to sample parameter space.
nsteps
The number of steps that must be taken by every walker.
initializer
Generates the initialize samples of non-linear parameter space (see autofit.non_linear.initializer).
auto_correlation_settings
Customizes and performs auto correlation calculations performed during and after the search.
number_of_cores
The number of cores Zeus sampling is performed using a Python multiprocessing Pool instance.
silence
If True, the default print output of the non-linear search is silenced.
session
An SQLalchemy session instance so the results of the model-fit are written to an SQLite database.
"""
super().__init__(
name=name,
path_prefix=path_prefix,
unique_tag=unique_tag,
initializer=initializer,
auto_correlation_settings=auto_correlation_settings,
iterations_per_quick_update=iterations_per_quick_update,
iterations_per_full_update=iterations_per_full_update,
number_of_cores=number_of_cores,
silence=silence,
session=session,
**kwargs
)
self.nwalkers = nwalkers
self.nsteps = nsteps
self.tune = tune
self.tolerance = tolerance
self.patience = patience
self.mu = mu
self.light_mode = light_mode
self.maxsteps = maxsteps
self.maxiter = maxiter
self.vectorize = vectorize
self.shuffle_ensemble = shuffle_ensemble
self.check_walkers = check_walkers
self.maxcall = maxcall
if is_test_mode():
self.apply_test_mode()
self.logger.debug("Creating Zeus Search")
[docs]
def apply_test_mode(self):
logger.warning(
"TEST MODE 1 (reduced iterations): Sampler will run with "
"minimal iterations for faster completion."
)
self.nwalkers = 20
self.nsteps = 10
def _fit(self, model: AbstractPriorModel, analysis):
"""
Fit a model using Zeus and the Analysis class which contains the data and returns the log likelihood from
instances of the model, which the `NonLinearSearch` seeks to maximize.
Parameters
----------
model : ModelMapper
The model which generates instances for different points in parameter space.
analysis : Analysis
Contains the data and the log likelihood function which fits an instance of the model to the data, returning
the log likelihood the `NonLinearSearch` maximizes.
Returns
-------
A result object comprising the Samples object that inclues the maximum log likelihood instance and full
chains used by the fit.
"""
try:
import zeus
except ModuleNotFoundError:
raise ModuleNotFoundError(
"\n--------------------\n"
"You are attempting to perform a model-fit using Zeus. \n\n"
"However, the optional library Zeus (https://zeus-mcmc.readthedocs.io/en/latest/) is "
"not installed.\n\n"
"Install it via the command `pip install zeus-mcmc==2.5.4`.\n\n"
"----------------------"
)
pool = self.make_pool()
fitness = Fitness(
model=model,
analysis=analysis,
paths=self.paths,
fom_is_log_likelihood=False,
resample_figure_of_merit=-np.inf,
)
try:
search_internal = self.paths.load_search_internal()
state = search_internal.get_last_sample()
log_posterior_list = search_internal.get_last_log_prob()
samples = self.samples_from(model=model, search_internal=search_internal)
total_iterations = search_internal.iteration
if samples.converged:
iterations_remaining = 0
else:
iterations_remaining = self.nsteps - total_iterations
self.logger.info(
"Resuming Zeus non-linear search (previous samples found)."
)
except (FileNotFoundError, AttributeError):
search_internal = zeus.EnsembleSampler(
nwalkers=self.nwalkers,
ndim=model.prior_count,
logprob_fn=fitness.call_wrap,
pool=pool,
)
search_internal.ncall_total = 0
(
unit_parameter_lists,
parameter_lists,
log_posterior_list,
) = self.initializer.samples_from_model(
total_points=search_internal.nwalkers,
model=model,
fitness=fitness,
test_mode_samples=False,
paths=self.paths,
n_cores=self.number_of_cores,
)
self.plot_start_point(
parameter_vector=parameter_lists[0],
model=model,
analysis=analysis,
)
state = np.zeros(shape=(search_internal.nwalkers, model.prior_count))
self.logger.info(
"Starting new Zeus non-linear search (no previous samples found)."
)
for index, parameters in enumerate(parameter_lists):
state[index, :] = np.asarray(parameters)
total_iterations = 0
iterations_remaining = self.nsteps
while iterations_remaining > 0:
if self.iterations_per_full_update > iterations_remaining:
iterations = iterations_remaining
else:
iterations = self.iterations_per_full_update
for sample in search_internal.sample(
start=state,
log_prob0=log_posterior_list,
iterations=iterations,
progress=True,
):
pass
search_internal.ncall_total += search_internal.ncall
self.paths.save_search_internal(
obj=search_internal,
)
state = search_internal.get_last_sample()
log_posterior_list = search_internal.get_last_log_prob()
total_iterations += iterations
iterations_remaining = self.nsteps - total_iterations
samples = self.samples_from(model=model, search_internal=search_internal)
if self.auto_correlation_settings.check_for_convergence:
if (
search_internal.iteration
> self.auto_correlation_settings.check_size
):
if samples.converged:
iterations_remaining = 0
auto_correlation_time = zeus.AutoCorrTime(
samples=search_internal.get_chain()
)
discard = int(3.0 * np.max(auto_correlation_time))
thin = int(np.max(auto_correlation_time) / 2.0)
chain = search_internal.get_chain(discard=discard, thin=thin, flat=True)
if self.maxcall is not None:
if search_internal.ncall_total > self.maxcall:
iterations_remaining = 0
if iterations_remaining > 0:
self.perform_update(
model=model,
analysis=analysis,
search_internal=search_internal,
fitness=fitness,
during_analysis=True,
)
return search_internal, fitness
def samples_info_from(self, search_internal=None):
search_internal = search_internal or self.paths.load_search_internal()
auto_correlations = self.auto_correlations_from(search_internal=search_internal)
return {
"check_size": auto_correlations.check_size,
"required_length": auto_correlations.required_length,
"change_threshold": auto_correlations.change_threshold,
"total_walkers": len(search_internal.get_chain()[0, :, 0]),
"total_steps": int(search_internal.ncall_total),
"time": self.timer.time if self.timer else None,
}
[docs]
def samples_via_internal_from(self, model, search_internal=None):
"""
Returns a `Samples` object from the zeus internal results.
The samples contain all information on the parameter space sampling (e.g. the parameters,
log likelihoods, etc.).
The internal search results are converted from the native format used by the search to lists of values
(e.g. `parameter_lists`, `log_likelihood_list`).
Parameters
----------
model
Maps input vectors of unit parameter values to physical values and model instances via priors.
"""
search_internal = search_internal or self.paths.load_search_internal()
if is_test_mode():
samples_after_burn_in = search_internal.get_chain(
discard=5, thin=5, flat=True
)
else:
auto_correlations = self.auto_correlations_from(
search_internal=search_internal
)
discard = int(3.0 * np.max(auto_correlations.times))
thin = int(np.max(auto_correlations.times) / 2.0)
samples_after_burn_in = search_internal.get_chain(
discard=discard, thin=thin, flat=True
)
if len(samples_after_burn_in) == 0:
logging.info(
"""
After thinnng the Zeus samples in order to remove burn-in, no samples were left.
To create a samples object containing samples, so that the code can continue and results
can be inspected, the full list of samples before removing burn-in has been used. This may
indicate that the sampler has not converged and therefore your results may not be reliable.
To fix this, run Zeus with more steps to ensure convergence is achieved or change the auto
correlation settings to be less aggressive in thinning samples.
"""
)
samples_after_burn_in = search_internal.get_chain(flat=True)
parameter_lists = samples_after_burn_in.tolist()
log_posterior_list = search_internal.get_log_prob(flat=True).tolist()
log_prior_list = model.log_prior_list_from(parameter_lists=parameter_lists)
log_likelihood_list = [
log_posterior - log_prior
for log_posterior, log_prior in zip(log_posterior_list, log_prior_list)
]
weight_list = len(log_likelihood_list) * [1.0]
sample_list = Sample.from_lists(
model=model,
parameter_lists=parameter_lists,
log_likelihood_list=log_likelihood_list,
log_prior_list=log_prior_list,
weight_list=weight_list,
)
return SamplesMCMC(
model=model,
sample_list=sample_list,
samples_info=self.samples_info_from(search_internal=search_internal),
auto_correlation_settings=self.auto_correlation_settings,
auto_correlations=self.auto_correlations_from(
search_internal=search_internal
),
)
def auto_correlations_from(self, search_internal=None):
import zeus
search_internal = search_internal or self.paths.load_search_internal()
times = zeus.AutoCorrTime(samples=search_internal.get_chain())
try:
previous_auto_correlation_times = zeus.AutoCorrTime(
samples=search_internal.get_chain()[
: -self.auto_correlation_settings.check_size, :, :
],
)
except IndexError:
self.logger.debug("Unable to compute previous auto correlation times.")
previous_auto_correlation_times = None
return AutoCorrelations(
check_size=self.auto_correlation_settings.check_size,
required_length=self.auto_correlation_settings.required_length,
change_threshold=self.auto_correlation_settings.change_threshold,
times=times,
previous_times=previous_auto_correlation_times,
)