import configparser
import logging
import os
import random
from abc import ABC, abstractmethod
from typing import Dict, Tuple, List, Optional
import numpy as np
from autofit import exc
from autofit.non_linear.paths.abstract import AbstractPaths
from autofit.mapper.prior.abstract import Prior
from autofit.mapper.prior_model.abstract import AbstractPriorModel
from autofit.non_linear.parallel import SneakyPool
logger = logging.getLogger(__name__)
class AbstractInitializer(ABC):
    """
    Family of classes used to provide initial points for non-linear search.
    """

    @abstractmethod
    def _generate_unit_parameter_list(self, model):
        """
        Generate a single unit-hypercube vector for the model.

        Implemented by each concrete initializer subclass.
        """

    @staticmethod
    def figure_of_metric(args) -> Optional[float]:
        """
        Evaluate the figure of merit (e.g. the log likelihood) of one sample.

        Parameters
        ----------
        args
            A tuple ``(fitness, parameter_list)``, where ``fitness`` is called
            with the keyword argument ``parameters=parameter_list``.

        Returns
        -------
        The figure of merit, or ``None`` if the fit raised a ``FitException``,
        returned ``nan`` or returned a value below -1.0e98 (values that low
        mark an invalid sample which must be redrawn).
        """
        fitness, parameter_list = args
        try:
            figure_of_merit = fitness(parameters=parameter_list)

            # nan or extremely negative values indicate an invalid sample.
            if np.isnan(figure_of_merit) or figure_of_merit < -1e98:
                return None

            return figure_of_merit
        except exc.FitException:
            return None

    def samples_from_model(
        self,
        total_points: int,
        model: AbstractPriorModel,
        fitness,
        paths: AbstractPaths,
        use_prior_medians: bool = False,
        test_mode_samples: bool = True,
        n_cores: int = 1,
    ):
        """
        Generate the initial points of the non-linear search, by randomly drawing unit values from a uniform
        distribution between the ball_lower_limit and ball_upper_limit values.

        Parameters
        ----------
        total_points
            The number of points in non-linear parameter space which initial points are created for.
        model
            An object that represents possible instances of some model with a given dimensionality which is the number
            of free dimensions of the model.
        fitness
            The function evaluated for every sample, whose return value (e.g. the log likelihood) is the sample's
            figure of merit.
        paths
            The paths object passed to the parallel pool.
        use_prior_medians
            If True, every unit value is set to 0.5 (the prior median) instead of being drawn randomly.
        test_mode_samples
            If True and the PYAUTOFIT_TEST_MODE environment variable is "1", the fitness function is bypassed
            via `samples_in_test_mode`.
        n_cores
            The number of CPU cores over which sample evaluation is parallelized.

        Returns
        -------
        A 3-tuple of the unit parameter lists, physical parameter lists and figures of merit of all
        accepted samples.

        Raises
        ------
        InitializerException
            If all accepted samples share an identical figure of merit, meaning the non-linear search
            could not progress.
        """
        if os.environ.get("PYAUTOFIT_TEST_MODE") == "1" and test_mode_samples:
            return self.samples_in_test_mode(total_points=total_points, model=model)

        logger.info(
            f"Generating initial samples of model, which are subject to prior limits and other constraints. "
            f"Using {n_cores} cores."
        )

        unit_parameter_lists = []
        parameter_lists = []
        figures_of_merit_list = []

        sneaky_pool = SneakyPool(n_cores, fitness, paths)

        # Draw batches of up to n_cores samples until enough valid samples have
        # been accepted; samples whose figure of merit is None are discarded
        # and redrawn.
        while len(figures_of_merit_list) < total_points:
            remaining_points = total_points - len(figures_of_merit_list)
            batch_size = min(remaining_points, n_cores)

            parameter_lists_ = []
            unit_parameter_lists_ = []

            for _ in range(batch_size):
                if not use_prior_medians:
                    unit_parameter_list = self._generate_unit_parameter_list(model)
                else:
                    unit_parameter_list = [0.5] * model.prior_count

                parameter_list = model.vector_from_unit_vector(
                    unit_vector=unit_parameter_list
                )

                parameter_lists_.append(parameter_list)
                unit_parameter_lists_.append(unit_parameter_list)

            for figure_of_merit, unit_parameter_list, parameter_list in zip(
                sneaky_pool.map(
                    self.figure_of_metric,
                    [(fitness, parameter_list) for parameter_list in parameter_lists_],
                ),
                unit_parameter_lists_,
                parameter_lists_,
            ):
                if figure_of_merit is not None:
                    unit_parameter_lists.append(unit_parameter_list)
                    parameter_lists.append(parameter_list)
                    figures_of_merit_list.append(figure_of_merit)

        # If every sample has the same figure of merit the search cannot
        # distinguish between points and will not progress correctly.
        if total_points > 1 and np.allclose(
            a=figures_of_merit_list[0], b=figures_of_merit_list[1:]
        ):
            raise exc.InitializerException(
                """
                The initial samples all have the same figure of merit (e.g. log likelihood values).

                The non-linear search will therefore not progress correctly.

                Possible causes for this behaviour are:

                - The `log_likelihood_function` of the analysis class is defined incorrectly.
                - The model parameterization creates numerically inaccurate log likelihoods.
                - The`log_likelihood_function` is always returning `nan` values.
                """
            )

        return unit_parameter_lists, parameter_lists, figures_of_merit_list

    def samples_in_test_mode(self, total_points: int, model: AbstractPriorModel):
        """
        Generate the initial points of the non-linear search in test mode. Like normal, test model draws points, by
        randomly drawing unit values from a uniform distribution between the ball_lower_limit and ball_upper_limit
        values.

        However, the log likelihood function is bypassed and all likelihoods are returned with a value -1.0e99. This
        is so that integration testing of large-scale model-fitting projects can be performed efficiently by bypassing
        sampling of points using the `log_likelihood_function`.

        Parameters
        ----------
        total_points
            The number of points in non-linear parameter space which initial points are created for.
        model
            An object that represents possible instances of some model with a given dimensionality which is the number
            of free dimensions of the model.
        """
        logger.warning(
            "TEST MODE ON: SAMPLES BEING ASSIGNED ARBITRARY LARGE LIKELIHOODS"
        )

        unit_parameter_lists = []
        parameter_lists = []
        figure_of_merit_list = []

        point_index = 0

        # Each successive sample's likelihood is multiplied by 10 so every
        # sample is distinct (avoiding the all-equal figure-of-merit check).
        figure_of_merit = -1.0e99

        while point_index < total_points:
            try:
                unit_parameter_list = self._generate_unit_parameter_list(model)
                parameter_list = model.vector_from_unit_vector(
                    unit_vector=unit_parameter_list
                )
                # Instantiating the model validates the drawn vector; invalid
                # draws raise FitException and are retried.
                model.instance_from_vector(vector=parameter_list)
                unit_parameter_lists.append(unit_parameter_list)
                parameter_lists.append(parameter_list)
                figure_of_merit_list.append(figure_of_merit)
                figure_of_merit *= 10.0
                point_index += 1
            except exc.FitException:
                pass

        return unit_parameter_lists, parameter_lists, figure_of_merit_list
class SpecificRangeInitializer(AbstractInitializer):
    def __init__(
        self,
        parameter_dict: Dict[Prior, Tuple[float, float]],
        lower_limit: float = 0.0,
        upper_limit: float = 1.0,
    ):
        """
        Initializer that allows the range of possible starting points for each prior
        to be specified explicitly.

        Parameters
        ----------
        parameter_dict
            A dictionary mapping priors to inclusive ranges of physical values that
            the initial values for that dimension in the search may take
        lower_limit
            A default, unit lower limit used when a prior is not specified
        upper_limit
            A default, unit upper limit used when a prior is not specified
        """
        self.parameter_dict = parameter_dict
        self.lower_limit = lower_limit
        self.upper_limit = upper_limit

    def _generate_unit_parameter_list(self, model: AbstractPriorModel) -> List[float]:
        """
        Generate a unit vector for the model. The default limits are used for any
        priors which the model has but are not found in the parameter dict.

        Parameters
        ----------
        model
            A model for which initial points are required

        Returns
        -------
        A unit vector
        """
        unit_parameter_list = []
        for prior in model.priors_ordered_by_id:
            try:
                # An explicit physical range is given for this prior: convert
                # its bounds to unit values and draw uniformly between them.
                lower, upper = map(prior.unit_value_for, self.parameter_dict[prior])
                value = random.uniform(lower, upper)
            except KeyError:
                logger.warning(
                    f"Range for {'.'.join(model.path_for_prior(prior))} not set in the SpecificRangeInitializer. "
                    f"Using defaults."
                )
                lower = self.lower_limit
                upper = self.upper_limit
                # Fall back to drawing from the prior itself within the default
                # unit limits, then map the physical draw back to unit space.
                value = prior.unit_value_for(prior.random(lower, upper))
            unit_parameter_list.append(value)
        return unit_parameter_list
class Initializer(AbstractInitializer):
    def __init__(self, lower_limit: float, upper_limit: float):
        """
        The Initializer creates the initial set of samples in non-linear parameter space that can be passed into a
        `NonLinearSearch` to define where to begin sampling.

        Although most non-linear searches have in-built functionality to do this, some do not cope well with parameter
        resamples that are raised as FitException's. Thus, PyAutoFit uses its own initializer to bypass these problems.

        Parameters
        ----------
        lower_limit
            The unit lower limit that random unit vectors are drawn between.
        upper_limit
            The unit upper limit that random unit vectors are drawn between.
        """
        self.lower_limit = lower_limit
        self.upper_limit = upper_limit

    @classmethod
    def from_config(cls, config):
        """
        Load the Initializer from a non_linear config file.

        Returns
        -------
        An `InitializerPrior` or `InitializerBall` depending on the configured method, or
        None if the config has no initialize section or an unrecognised method.
        """
        try:
            initializer = config("initialize", "method")
        except configparser.NoSectionError:
            return None

        # Use exact comparison: the previous `initializer in "prior"` was a
        # substring test that also matched values such as "" or "rio".
        if initializer == "prior":
            return InitializerPrior()

        elif initializer == "ball":
            ball_lower_limit = config("initialize", "ball_lower_limit")
            ball_upper_limit = config("initialize", "ball_upper_limit")

            return InitializerBall(
                lower_limit=ball_lower_limit, upper_limit=ball_upper_limit
            )

    def _generate_unit_parameter_list(self, model):
        """
        Draw a random unit vector from the model, with every unit value inside the
        initializer's [lower_limit, upper_limit] range.
        """
        return model.random_unit_vector_within_limits(
            lower_limit=self.lower_limit, upper_limit=self.upper_limit
        )
class InitializerPrior(Initializer):
    def __init__(self):
        """
        The Initializer creates the initial set of samples in non-linear parameter space that can be passed into a
        `NonLinearSearch` to define where to begin sampling.

        Although most non-linear searches have in-built functionality to do this, some do not cope well with parameter
        resamples that are raised as FitException's. Thus, PyAutoFit uses its own initializer to bypass these problems.

        The InitializerPrior class generates from the priors, by drawing all values as unit values between 0.0 and 1.0
        and mapping them to physical values via the prior.
        """
        super().__init__(lower_limit=0.0, upper_limit=1.0)
class InitializerBall(Initializer):
    def __init__(self, lower_limit: float, upper_limit: float):
        """
        The Initializer creates the initial set of samples in non-linear parameter space that can be passed into a
        `NonLinearSearch` to define where to begin sampling.

        Although most non-linear searches have in-built functionality to do this, some do not cope well with parameter
        resamples that are raised as FitException's. Thus, PyAutoFit uses its own initializer to bypass these problems.

        The InitializerBall class generates the samples in a small compact volume or 'ball' in parameter space, which is
        the recommended initialization strategy for the MCMC `NonLinearSearch` Emcee.

        Parameters
        ----------
        lower_limit
            The lower limit of the uniform distribution unit values are drawn from when initializing walkers in a small
            compact ball.
        upper_limit
            The upper limit of the uniform distribution unit values are drawn from when initializing walkers in a small
            compact ball.
        """
        super().__init__(lower_limit=lower_limit, upper_limit=upper_limit)