Source code for autofit.non_linear.initializer

import configparser
import logging
import os
import random
from abc import ABC, abstractmethod
from typing import Dict, Tuple, List, Optional

import numpy as np

from autofit import exc
from autofit.non_linear.test_mode import is_test_mode
from autofit.non_linear.paths.abstract import AbstractPaths
from autofit.mapper.prior.abstract import Prior
from autofit.mapper.prior_model.abstract import AbstractPriorModel
from autofit.non_linear.parallel import SneakyPool

logger = logging.getLogger(__name__)


class AbstractInitializer(ABC):

    @abstractmethod
    def _generate_unit_parameter_list(self, model):
        pass

    def info_from_model(self, model : AbstractPriorModel) -> str:
        raise NotImplementedError

    @staticmethod
    def figure_of_metric(args) -> Optional[float]:
        fitness, parameter_list = args
        try:
            figure_of_merit = fitness(parameters=parameter_list)

            if np.isnan(figure_of_merit) or figure_of_merit < -1e98:
                return None

            return figure_of_merit
        except exc.FitException:
            return None

    def samples_from_model(
            self,
            total_points: int,
            model: AbstractPriorModel,
            fitness,
            paths: AbstractPaths,
            use_prior_medians: bool = False,
            test_mode_samples: bool = True,
            n_cores: int = 1,
    ):
        """
        Generate the initial points of the non-linear search, by randomly drawing unit values from a uniform
        distribution between the ball_lower_limit and ball_upper_limit values.

        Parameters
        ----------
        total_points
            The number of points in non-linear paramemter space which initial points are created for.
        model
            An object that represents possible instances of some model with a given dimensionality which is the number
            of free dimensions of the model.
        """

        if is_test_mode() and test_mode_samples:
            return self.samples_in_test_mode(total_points=total_points, model=model)

        if n_cores == 1:
            return self.samples_jax(
                total_points=total_points,
                model=model,
                fitness=fitness,
                use_prior_medians=use_prior_medians
            )

        unit_parameter_lists = []
        parameter_lists = []
        figures_of_merit_list = []

        sneaky_pool = SneakyPool(n_cores, fitness, paths)

        logger.info(f"Generating initial samples of model using {n_cores} cores")

        while len(figures_of_merit_list) < total_points:
            remaining_points = total_points - len(figures_of_merit_list)
            batch_size = min(remaining_points, n_cores)
            parameter_lists_ = []
            unit_parameter_lists_ = []

            for _ in range(batch_size):
                if not use_prior_medians:
                    unit_parameter_list = self._generate_unit_parameter_list(model)
                else:
                    unit_parameter_list = [0.5] * model.prior_count

                parameter_list = model.vector_from_unit_vector(
                    unit_vector=unit_parameter_list
                )

                parameter_lists_.append(parameter_list)
                unit_parameter_lists_.append(unit_parameter_list)

            for figure_of_merit, unit_parameter_list, parameter_list in zip(
                    sneaky_pool.map(
                        function=self.figure_of_metric,
                        args_list=[(fitness, parameter_list) for parameter_list in parameter_lists_],
                        log_info=False
                    ),
                    unit_parameter_lists_,
                    parameter_lists_,
            ):
                if figure_of_merit is not None:
                    unit_parameter_lists.append(unit_parameter_list)
                    parameter_lists.append(parameter_list)
                    figures_of_merit_list.append(figure_of_merit)

        if total_points > 1 and np.allclose(
                a=figures_of_merit_list[0], b=figures_of_merit_list[1:]
        ):
            raise exc.InitializerException(
                """
                The initial samples all have the same figure of merit (e.g. log likelihood values).

                The non-linear search will therefore not progress correctly.

                Possible causes for this behaviour are:

                - The `log_likelihood_function` of the analysis class is defined incorrectly.
                - The model parameterization creates numerically inaccurate log likelihoods.
                - The`log_likelihood_function`  is always returning `nan` values.            
                """
            )

        logger.info(f"Initial samples generated, starting non-linear search")

        return unit_parameter_lists, parameter_lists, figures_of_merit_list

    def samples_jax(
            self,
            total_points: int,
            model: AbstractPriorModel,
            fitness,
            use_prior_medians: bool = False,
    ):
        """
        Generate the initial points of the non-linear search, by randomly drawing unit values from a uniform
        distribution between the ball_lower_limit and ball_upper_limit values.

        Parameters
        ----------
        total_points
            The number of points in non-linear paramemter space which initial points are created for.
        model
            An object that represents possible instances of some model with a given dimensionality which is the number
            of free dimensions of the model.
        """

        unit_parameter_lists = []
        parameter_lists = []
        figures_of_merit_list = []

        logger.info(f"Generating initial samples of model using JAX LH Function cores")

        while len(figures_of_merit_list) < total_points:

            if not use_prior_medians:
                unit_parameter_list = self._generate_unit_parameter_list(model)
            else:
                unit_parameter_list = [0.5] * model.prior_count

            parameter_list = model.vector_from_unit_vector(
                unit_vector=unit_parameter_list
            )

            figure_of_merit = self.figure_of_metric((fitness, parameter_list))

            if figure_of_merit is not None:
                unit_parameter_lists.append(unit_parameter_list)
                parameter_lists.append(parameter_list)
                figures_of_merit_list.append(figure_of_merit)

        if total_points > 1 and np.allclose(
                a=figures_of_merit_list[0], b=figures_of_merit_list[1:]
        ):
            raise exc.InitializerException(
                """
                The initial samples all have the same figure of merit (e.g. log likelihood values).

                The non-linear search will therefore not progress correctly.

                Possible causes for this behaviour are:

                - The `log_likelihood_function` of the analysis class is defined incorrectly.
                - The model parameterization creates numerically inaccurate log likelihoods.
                - The`log_likelihood_function`  is always returning `nan` values.            
                """
            )

        logger.info(f"Initial samples generated, starting non-linear search")

        return unit_parameter_lists, parameter_lists, figures_of_merit_list

    def samples_in_test_mode(self, total_points: int, model: AbstractPriorModel):
        """
        Generate the initial points of the non-linear search in test mode. Like normal, test model draws points, by
        randomly drawing unit values from a uniform distribution between the ball_lower_limit and ball_upper_limit
        values.

        However, the log likelihood function is bypassed and all likelihoods are returned with a value -1.0e99. This
        is so that integration testing of large-scale model-fitting projects can be performed efficiently by bypassing
        sampling of points using the `log_likelihood_function`.

        Parameters
        ----------
        total_points
            The number of points in non-linear paramemter space which initial points are created for.
        model
            An object that represents possible instances of some model with a given dimensionality which is the number
            of free dimensions of the model.
        """

        logger.warning(
            "TEST MODE 1 (reduced iterations): Initial samples assigned "
            "arbitrary large likelihoods to accelerate sampler convergence."
        )

        unit_parameter_lists = []
        parameter_lists = []
        figure_of_merit_list = []

        point_index = 0

        figure_of_merit = -1.0e99

        while point_index < total_points:
            try:
                unit_parameter_list = self._generate_unit_parameter_list(model)
                parameter_list = model.vector_from_unit_vector(
                    unit_vector=unit_parameter_list
                )
                model.instance_from_vector(vector=parameter_list)
                unit_parameter_lists.append(unit_parameter_list)
                parameter_lists.append(parameter_list)
                figure_of_merit_list.append(figure_of_merit)
                figure_of_merit *= 10.0
                point_index += 1
            except exc.FitException:
                pass

        return unit_parameter_lists, parameter_lists, figure_of_merit_list


class InitializerParamBounds(AbstractInitializer):
    def __init__(
        self,
        parameter_dict: Dict[Prior, Tuple[float, float]],
        lower_limit=0.0,
        upper_limit=1.0,
    ):
        """
        Initializer which uses the bounds on input parameters as the starting point for the search (e.g. where
        an MLE optimization starts or MCMC walkers are initialized).

        Parameters
        ----------
        parameter_dict
            A dictionary mapping each parameter path to bounded ranges of physical values that
            are where the search begins.
        lower_limit
            A default, unit lower limit used when a prior is not specified
        upper_limit
            A default, unit upper limit used when a prior is not specified
        """
        
        self.parameter_dict = parameter_dict
        self.lower_limit = lower_limit
        self.upper_limit = upper_limit

        self._generated_warnings = set()

    def _generate_unit_parameter_list(self, model: AbstractPriorModel) -> List[float]:
        """
        Generate a unit vector for the model. The default limits are used for any
        priors which the model has but are not found in the parameter dict.

        Parameters
        ----------
        model
            A model for which initial points are required

        Returns
        -------
        A unit vector
        """

        unit_parameter_list = []
        for prior in model.priors_ordered_by_id:

            try:
                lower, upper = map(prior.unit_value_for, self.parameter_dict[prior])
                value = random.uniform(lower, upper)
            except KeyError:
                key = ".".join(model.path_for_prior(prior))
                if key not in self._generated_warnings:
                    logger.warning(
                        f"Range for {key} not set in the InitializerParamBounds. "
                        f"Using defaults."
                    )
                    self._generated_warnings.add(key)

                lower = self.lower_limit
                upper = self.upper_limit

                value = prior.unit_value_for(prior.random(lower, upper))

            unit_parameter_list.append(value)

        return unit_parameter_list

    def info_from_model(self, model : AbstractPriorModel) -> str:
        """
        Returns a string showing the bounds of the parameters in the initializer.
        """
        info = "Total Free Parameters = " + str(model.prior_count) + "\n"
        info += "Total Starting Points = " + str(len(self.parameter_dict)) + "\n\n"
        for prior in model.priors_ordered_by_id:

            key = ".".join(model.path_for_prior(prior))

            try:

                value = self.info_value_from(self.parameter_dict[prior])

                info += f"{key}: Start[{value}]\n"

            except KeyError:

                info += f"{key}: {prior})\n"

        return info

    def info_value_from(self, value : Tuple[float, float]) -> Tuple[float, float]:
        """
        Returns the value that is used to display the bounds of the parameters in the initializer.

        This function simply returns the input value, but it can be overridden in subclasses for diffferent
        initializers.

        Parameters
        ----------
        value
            The value to be displayed in the initializer info which is a tuple of the lower and upper bounds of the
            parameter.
        """
        return value


class InitializerParamStartPoints(InitializerParamBounds):
    def __init__(
        self,
        parameter_dict: Dict[Prior, float],
    ):
        """
        Initializer which input values of the parameters as the starting point for the search (e.g. where
        an MLE optimization starts or MCMC walkers are initialized).

        Parameters
        ----------
        parameter_dict
            A dictionary mapping each parameter path to the starting point physical values that
            are where the search begins.
        lower_limit
            A default, unit lower limit used when a prior is not specified
        upper_limit
            A default, unit upper limit used when a prior is not specified
        """
        parameter_dict_new = {}

        for key, value in parameter_dict.items():
            parameter_dict_new[key] = (value - 1.0e-8, value + 1.0e-8)

        super().__init__(parameter_dict=parameter_dict_new)

    def info_value_from(self, value : Tuple[float, float]) -> float:
        """
        Returns the value that is used to display the starting point of the parameters in the initializer.

        This function returns the mean of the input value, as the starting point is a single value in the center of the
        bounds.

        Parameters
        ----------
        value
            The value to be displayed in the initializer info which is a tuple of the lower and upper bounds of the
            parameter.
        """
        return (value[1] + value[0]) / 2.0


class Initializer(AbstractInitializer):
    def __init__(self, lower_limit: float, upper_limit: float):
        """
        The Initializer creates the initial set of samples in non-linear parameter space that can be passed into a
        `NonLinearSearch` to define where to begin sampling.

        Although most non-linear searches have in-built functionality to do this, some do not cope well with parameter
        resamples that are raised as FitException's. Thus, PyAutoFit uses its own initializer to bypass these problems.
        """
        self.lower_limit = lower_limit
        self.upper_limit = upper_limit

    @classmethod
    def from_config(cls, config):
        """
        Load the Initializer from a non_linear config file.
        """

        try:
            initializer = config("initialize", "method")

        except configparser.NoSectionError:
            return None

        if initializer in "prior":
            return InitializerPrior()

        elif initializer in "ball":
            ball_lower_limit = config("initialize", "ball_lower_limit")
            ball_upper_limit = config("initialize", "ball_upper_limit")

            return InitializerBall(
                lower_limit=ball_lower_limit, upper_limit=ball_upper_limit
            )

    def _generate_unit_parameter_list(self, model):
        return model.random_unit_vector_within_limits(
            lower_limit=self.lower_limit, upper_limit=self.upper_limit
        )


[docs] class InitializerPrior(Initializer): def __init__(self): """ The Initializer creates the initial set of samples in non-linear parameter space that can be passed into a `NonLinearSearch` to define where to begin sampling. Although most non-linear searches have in-built functionality to do this, some do not cope well with parameter resamples that are raised as FitException's. Thus, PyAutoFit uses its own initializer to bypass these problems. The InitializerPrior class generates from the priors, by drawing all values as unit values between 0.0 and 1.0 and mapping them to physical values via the prior. """ super().__init__(lower_limit=0.0, upper_limit=1.0)
[docs] class InitializerBall(Initializer): def __init__(self, lower_limit: float, upper_limit: float): """ The Initializer creates the initial set of samples in non-linear parameter space that can be passed into a `NonLinearSearch` to define where to begin sampling. Although most non-linear searches have in-built functionality to do this, some do not cope well with parameter resamples that are raised as FitException's. Thus, PyAutoFit uses its own initializer to bypass these problems. The InitializerBall class generates the samples in a small compact volume or 'ball' in parameter space, which is the recommended initialization strategy for the MCMC `NonLinearSearch` Emcee. Parameters ---------- lower_limit The lower limit of the uniform distribution unit values are drawn from when initializing walkers in a small compact ball. upper_limit The upper limit of the uniform distribution unit values are drawn from when initializing walkers in a small compact ball. """ super().__init__(lower_limit=lower_limit, upper_limit=upper_limit)