Source code for acquisition_functions

"""
Base Class
==========
All acquisition functions are derived from this class. If you want to define
your own acquisition functions it needs to inherit from this class. A tutorial
on how to define such a class is given in :class:`.AcquisitionFunction`

.. autosummary::

     AcquisitionFunction

Inbuilt Acquisition Functions
=============================

All inbuilt Acquisition Functions can evaluate the gradient in addition
to the value of the acquisition function at point X.

The recommended acquisition that we derived in order to efficiently sample the parameter
space is called ``LogExp``:

.. autosummary::
     LogExp

Furthermore there are more inbuilt acquisition functions (building blocks) which should
offer a great deal of flexibility. If you want to define your own acquisition function
it needs to inherit from the parent class :class:`AcquisitionFunction`.

.. autosummary::

     ConstantAcqFunc
     Mu
     Std
     ExponentialMu
     ExponentialStd
     ExpectedImprovement

Additional things
=================

The things listed here are tools and similar things which in normal operation
should not be needed.

.. autosummary::

     is_acquisition_function
     Hyperparameter
     AcquisitionFunctionOperator
     Sum
     Product
     Exponentiation

"""

import sys
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from collections.abc import Iterable
from inspect import signature, getmembers
import warnings

import numpy as np
from scipy.stats import norm
from sklearn.base import clone


# UNUSED
def _safe_log_expm1(x):
    """
    Numerically safer ``log(exp(x) - 1)``.
    """
    mask = x < 1
    ret = np.empty_like(x)
    ret[mask] = np.log(np.expm1(x[mask]))
    ret[~mask] = x[~mask] + np.log1p(-np.exp(-x[~mask]))
    return ret


[docs] def builtin_names(): """ Lists all names of all built-in acquisition functions criteria. """ list_names = [name for name, obj in getmembers(sys.modules[__name__]) if (issubclass(obj.__class__, AcquisitionFunction.__class__) and obj is not AcquisitionFunction)] return list_names
[docs] class AcquisitionFunction(metaclass=ABCMeta): """Base class for all Acquisition Functions (AF's). All acquisition functions are derived from this class. Currently several acquisition functions are supported which should be versatile enough for most tasks. If however one wants to specify a custom acquisition function it should be a class which inherits from this abstract class. This class needs to be of the format:: from Acquisition_functions import Acquisition_function Class custom_acq_func(Acquisition_Function): def __init__(self, param_1, ..., fixed=..., dimension=...): # * 'hyperparam_i': The hyperparameters of the custom # acquisition function. # * 'fixed': whether the hyperparameters of the acquisition # function are to be kept fixed. # * 'dimension': the dimensionality of the target function, # which can be used to automatically adapt hyperparameters # function are to be kept fixed. # * 'hasgradient': Whether the acquisition function can return # a gradient. Furthermore the bool hasgradient needs to be # specified to 'True' if the acquisition function can return # gradient(s) or 'False' otherwise. self.param_1 = param_1 ... self.fixed=fixed self.hasgradient = True/False @property def hyperparameter_param_1(self): # Returns the type of hyperparameter and whether it is # fixed or not. This method needs to exist for every hyper- # parameter. return Hyperparameter( "param_1", "numeric", fixed=self.fixed) return Hyperparameter( "param_1", "numeric", fixed=self.fixed) def __call__(self, X, gp, eval_gradient=False): # * 'X': The value(s) at which the acquisition function is # evaluated # * 'GP': The surrogate GP model which shall be used. # * 'eval_gradient': Whether the gradient shall be given or # not. Only required if 'self.hasgradient' is true. .... # Returned are the value(s) of the acquisition function at # point(s) X and optionally their gradient(s) Once the Acquisition function is defined in this way it can be used with the same operators as the inbuilt acquisition functions. .. note:: If one of the operands of a composite acquisiton function does not return a gradient the same applies for all operands. Furthermore some optimizers require gradients, which cannot be used in this case. """
[docs] def get_params(self, deep=True): """Get hyperparameters of this Acquisition function. Parameters ---------- deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators. Returns ------- params : dict Parameter names mapped to their values. """ params = dict() # introspect the constructor arguments to find the model parameters # to represent cls = self.__class__ init = getattr(cls.__init__, 'deprecated_original', cls.__init__) init_sign = signature(init) args, varargs = [], [] for parameter in init_sign.parameters.values(): if parameter.kind != parameter.VAR_KEYWORD and \ parameter.name != 'self': args.append(parameter.name) if parameter.kind == parameter.VAR_POSITIONAL: varargs.append(parameter.name) if len(varargs) != 0: raise RuntimeError("GPry acquisition functions should always " "specify their parameters in the signature" " of their __init__ (no varargs)." " %s doesn't follow this convention." % (cls, )) for arg in args: try: value = getattr(self, arg) except AttributeError: warnings.warn('From version 0.24, get_params will raise an ' 'AttributeError if a parameter cannot be ' 'retrieved as an instance attribute. Previously ' 'it would return None.', FutureWarning) value = None params[arg] = value return params
[docs] def set_params(self, **params): """Set the parameters of this acquisition function. The method works on simple AF's as well as on nested AF's. The latter have parameters of the form ``<component>__<parameter>`` so that it's possible to update each component of a nested object. Parameters ---------- **params : dict Any number of parameters which shall be set. Should be of the form ``{"parameter_1_name" : parameter_1_value, "parameter_2_name" : parameter_2_value, ...}`` Returns ------- self """ if not params: # Simple optimisation to gain speed (inspect is slow) return self valid_params = self.get_params(deep=True) for key, value in params.items(): split = key.split('__', 1) if len(split) > 1: # nested objects case name, sub_name = split if name not in valid_params: raise ValueError('Invalid parameter %s for AF %s. ' 'Check the list of available parameters ' 'with `acquisition_function.get_params().keys()`.' % (name, self)) sub_object = valid_params[name] sub_object.set_params(**{sub_name: value}) else: # simple objects case if key not in valid_params: raise ValueError('Invalid parameter %s for AF %s. ' 'Check the list of available parameters ' 'with `acquisition_function.get_params().keys()`.' % (key, self.__class__.__name__)) setattr(self, key, value) return self
[docs] def clone_with_theta(self, theta): """Returns a clone of self with given hyperparameters theta. Parameters ---------- theta : ndarray of shape (n_dims,) The hyperparameters """ cloned = clone(self) cloned.theta = theta return cloned
[docs] def check_X(self, X): """Internal method to check the dimensionality of any input X provided to an AF when called. Checks the correct type and turns X into a 2d array if a 1d array is provided. .. warning:: This method only checks for the correct type of an input, inappropriate values might still cause problems. Parameters ---------- X : ndarray of shape (n_samples, n_dims) or (ndims,) The input array for any X value passed to an acquisition function Returns ------- X_new: ndarray of shape (n_samples, n_dims) The reshaped array of input data X """ if not isinstance(X, np.ndarray): raise ValueError( "Expected a numpy array for X, instead got %s" % X) if X.ndim == 1: return X.reshape(1, -1) else: return X
@property def n_dims(self): """Returns the number of non-fixed hyperparameters of the acquisition function.""" return self.theta.shape[0] @property def hyperparameters(self): """Returns a list of all hyperparameter specifications.""" r = [getattr(self, attr) for attr in dir(self) if attr.startswith("hyperparameter_")] return r @property def theta(self): """Returns the (flattened, log-transformed) non-fixed hyperparameters. Note that theta are typically the log-transformed values of the acquisition function's hyperparameters as this representation of the search space is more amenable for hyperparameter search, as hyperparameters likelength-scales naturally live on a log-scale. Returns ------- theta : ndarray of shape (n_dims,) The non-fixed, log-transformed hyperparameters of the acquisition function. """ theta = [] params = self.get_params() for hyperparameter in self.hyperparameters: if not hyperparameter.fixed: theta.append(params[hyperparameter.name]) if len(theta) > 0: return np.log(np.hstack(theta)) else: return np.array([]) @theta.setter def theta(self, theta): """Sets the (flattened, log-transformed) non-fixed hyperparameters. Parameters ---------- theta : ndarray of shape (n_dims,) The non-fixed, log-transformed hyperparameters of the acquisition function. """ params = self.get_params() i = 0 for hyperparameter in self.hyperparameters: if hyperparameter.fixed: continue if hyperparameter.n_elements > 1: # vector-valued parameter params[hyperparameter.name] = np.exp( theta[i:i + hyperparameter.n_elements]) i += hyperparameter.n_elements else: params[hyperparameter.name] = np.exp(theta[i]) i += 1 if i != len(theta): raise ValueError("theta has not the correct number of entries." " Should be %d; given are %d" % (i, len(theta))) self.set_params(**params) @property def hasgradient(self): """Specifies whether a certain acquisition function can return gradients or not. """ return self._hasgradient @hasgradient.setter def hasgradient(self, hasgradient): if isinstance(hasgradient, bool): self._hasgradient = hasgradient else: raise TypeError("hasgradient needs to be" "bool, not %s" % hasgradient)
[docs] @abstractmethod def __call__(self, X, gp, eval_gradient=False): """Evaluate the acquisition function."""
def __add__(self, b): if not isinstance(b, AcquisitionFunction): return Sum(self, ConstantAcqFunc(b)) return Sum(self, b) def __radd__(self, b): if not isinstance(b, AcquisitionFunction): return Sum(ConstantAcqFunc(b), self) return Sum(b, self) def __mul__(self, b): if not isinstance(b, AcquisitionFunction): return Product(self, ConstantAcqFunc(b)) return Product(self, b) def __rmul__(self, b): if not isinstance(b, AcquisitionFunction): return Product(ConstantAcqFunc(b), self) return Product(b, self) def __pow__(self, b): return Exponentiation(self, b) def __eq__(self, b): if type(self) != type(b): return False params_a = self.get_params() params_b = b.get_params() for key in set(list(params_a.keys()) + list(params_b.keys())): if np.any(params_a.get(key, None) != params_b.get(key, None)): return False return True def __repr__(self): return "{0}({1})".format(self.__class__.__name__, ", ".join(map("{0:.3g}".format, self.theta)))
[docs] class ConstantAcqFunc(AcquisitionFunction): r"""Constant Acquisition function. Can be used as part of a product-Composition where it scales the magnitude of the other factor or as part of a sum. .. math:: A_f(X) = constant\_value \;\forall\; X Parameters ---------- constant_value : float, default=1.0 The constant value : A_f(X) = constant_value fixed: bool, default=False, whether the constant value shall be fixed or not. """ def __init__(self, constant_value=1.0, fixed=False, dimension=None): self.constant_value = constant_value self.fixed = fixed self.hasgradient = True @property def hyperparameter_constant_value(self): return Hyperparameter( "constant_value", "numeric", fixed=self.fixed)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) if not np.iterable(X): X = np.array([X]) A_f = np.ones(X.shape[0]) * self.constant_value if eval_gradient: return A_f, np.zeros(X.shape) else: return A_f
def __repr__(self): return "{0:.3g}**2".format(np.sqrt(self.constant_value))
# UNUSED
[docs] class Mu(AcquisitionFunction): r""":math:`\mu(X)` of the surrogate model. .. math:: A_f(X) = a\cdot\mu(X) Parameters ---------- a : float, default=1.0 The value with which :math:`\mu` is multiplied . fixed: bool, default=False, whether the constant value shall be fixed or not. """ def __init__(self, a=1.0, fixed=False, dimension=None): self.a = a self.fixed = fixed self.hasgradient = True @property def hyperparameter_a(self): return Hyperparameter( "a", "numeric", fixed=self.fixed)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) with warnings.catch_warnings(): warnings.simplefilter("ignore") if eval_gradient: mu, _, mu_grad = gp.predict( X, return_std=True, return_mean_grad=True, return_std_grad=False) else: mu, std = gp.predict(X, return_std=True) if eval_gradient: return mu, mu_grad else: return mu
def __repr__(self): return "{0:.3g}**2".format(np.sqrt(self.a))
# UNUSED
[docs] class ExponentialMu(AcquisitionFunction): r""":math:`\exp[\mu(X)]` of the surrogate model. .. math:: A_f(X) = \exp(a\cdot\mu(X)) Parameters ---------- a : float, default=1.0 The value with which :math:`\mu` is multiplied before exponentiating. fixed: bool, default=False, whether the constant value shall be fixed or not. """ def __init__(self, a=1.0, fixed=False, dimension=None): self.a = a self.fixed = fixed self.hasgradient = True @property def hyperparameter_a(self): return Hyperparameter( "a", "numeric", fixed=self.fixed)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) if not np.iterable(X): X = np.array([X]) mu, mu_grad = gp.predict(X, return_std=False, return_cov=False, return_mean_grad=True, return_std_grad=False) A_f = np.exp(self.a * mu) if eval_gradient: A_f_grad = self.a * mu_grad * np.exp(self.a * mu) return A_f, A_f_grad else: return A_f
def __repr__(self): return "{0:.3g}**2".format(np.sqrt(self.a))
# UNUSED
[docs] class Std(AcquisitionFunction): r""":math:`\sigma(X)` of the surrogate model. .. math:: A_f(X) = a\cdot\sigma(X) Parameters ---------- a : float, default=1.0 The value with which :math:`\sigma` is multiplied before exponentiating. fixed: bool, default=False, whether the constant value shall be fixed or not. """ def __init__(self, a=1.0, fixed=False, dimension=None): self.a = a self.fixed = fixed self.hasgradient = True @property def hyperparameter_a(self): return Hyperparameter( "a", "numeric", fixed=self.fixed)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) with warnings.catch_warnings(): warnings.simplefilter("ignore") if eval_gradient: _, std, _, std_grad = gp.predict( X, return_std=True, return_mean_grad=True, return_std_grad=True) else: mu, std = gp.predict(X, return_std=True) if eval_gradient: return std, std_grad else: return std
def __repr__(self): return "{0:.3f}".format(self.a)
# UNUSED
[docs] class ExponentialStd(AcquisitionFunction): r""":math:`\exp[\sigma(X)]` of the surrogate model. .. math:: A_f(X) = \exp(a\cdot\sigma(X)) Parameters ---------- a : float, default=1.0 The value with which :math:`\sigma` is multiplied before exponentiating. fixed: bool, default=False, whether the constant value shall be fixed or not. """ def __init__(self, a=1.0, fixed=False, dimension=None): self.a = a self.fixed = fixed self.hasgradient = True @property def hyperparameter_a(self): return Hyperparameter( "a", "numeric", fixed=self.fixed)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) if not np.iterable(X): X = np.array([X]) _, std, _, std_grad = gp.predict(X, return_std=True, return_cov=False, return_mean_grad=True, return_std_grad=True) A_f = np.exp(self.a * std) if eval_gradient: A_f_grad = self.a * std_grad * np.exp(self.a * std) return A_f, A_f_grad else: return A_f
def __repr__(self): return "{0:.3f}".format(self.a)
# UNUSED
[docs] class ExpectedImprovement(AcquisitionFunction): r"""Computes the (negative) Expected improvement function. The conditional probability `P(y=f(x) | x)` form a gaussian with a certain mean and standard deviation approximated by the model. The EI condition is derived by computing :math:`E[u(f(x))]` where :math:`u(f(x)) = 0`, if :math:`f(x) > y_{\mathrm{opt}}` and :math:`u(f(x)) = y_{\mathrm{opt}} - f(x)`, if :math:`f(x) < y_{\mathrm{opt}}`. This solves one of the issues of the PI condition by giving a reward proportional to the amount of improvement got. Parameters ---------- xi : float, default=0.01 Controls how much improvement one wants over the previous best values. Useful only when ``method`` is set to "EI" fixed: bool, default=False, whether the constant value shall be fixed or not. """ def __init__(self, xi=0.01, fixed=False, dimension=None): self.xi = xi self.fixed = fixed self.hasgradient = True @property def hyperparameter_xi(self): return Hyperparameter( "xi", "numeric", fixed=self.fixed)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) with warnings.catch_warnings(): warnings.simplefilter("ignore") if eval_gradient: mu, std, mu_grad, std_grad = gp.predict( X, return_std=True, return_mean_grad=True, return_std_grad=True) else: mu, std = gp.predict(X, return_std=True) y_opt = gp.y_max values = np.zeros_like(mu) mask = std > 0 improve = mu[mask] - y_opt - self.xi scaled = improve / std[mask] cdf = norm.cdf(scaled) pdf = norm.pdf(scaled) exploit = improve * cdf explore = std[mask] * pdf values[mask] = exploit + explore if eval_gradient: if not np.all(mask): return values, np.zeros_like(std_grad) # Substitute (y_opt - xi - mu) / sigma = t and apply chain rule. # improve_grad is the gradient of t wrt x. improve_grad = -mu_grad * std - std_grad * improve improve_grad /= std ** 2 cdf_grad = improve_grad * pdf pdf_grad = -improve * cdf_grad exploit_grad = -mu_grad * cdf - pdf_grad explore_grad = std_grad * pdf + pdf_grad grad = exploit_grad + explore_grad return values, grad return values
def __repr__(self): return "{0:.3f}".format(self.xi)
[docs] class BaseLogExp(AcquisitionFunction, metaclass=ABCMeta): r"""Acquisition function which is designed to efficiently sample log-probability distributions. This is achieved by transforming :math:`\tilde{\mu}\cdot\tilde{\sigma}` (of the true, non-logarithmic probability distribution) to logarithmic space. Parameters ---------- zeta : float, default=1 Controls the exploration-exploitation tradeoff parameter. The value of :math:`\zeta` should not exceed 1 under normal circumstances as a value <1 accounts for the fact that the GP's estimate for :math:`\mu` is not correct at the beginning. A good suggestion for setting zeta which is inspired by simulated annealing is .. math:: \zeta = \exp(-N_0/N) where :math:`N_0\geq 0` is a "decay constant" and :math:`N` the number of training points in the GP. sigma_n : float, default=None The (constant) noise level of the data. If set to ``None`` the square-root of alpha of the training data (or the square root of the mean of alpha if alpha is an array) will be used. fixed: bool, default=False, whether zeta and sigma_n shall be fixed or not. dimension: double, default=None the dimension of the parameter space used for auto-scaling the zeta zeta_scaling: double, default=0.85 the scaling power of the zeta with dimension, if auto-scaled """ def __init__(self, zeta=None, sigma_n=None, fixed=False, dimension=None, zeta_scaling=0.85, linear=True): if zeta is None: if dimension is None: raise ValueError("We need the dimensionality of the problem to " "guess an appropriate zeta value.") self.zeta = self.auto_zeta(dimension, scaling=zeta_scaling) else: self.zeta = zeta self.sigma_n = sigma_n self.fixed = fixed self.hasgradient = True @abstractmethod def f(mu, std, zeta): return @property def hyperparameter_zeta(self): return Hyperparameter( "zeta", "numeric", fixed=self.fixed) @property def hyperparameter_sigma_n(self): return Hyperparameter( "sigma_n", "numeric", fixed=self.fixed) def auto_zeta(self, dimension, scaling=0.85): return dimension**(-scaling)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) with warnings.catch_warnings(): warnings.simplefilter("ignore") if eval_gradient: mu, std, mu_grad, std_grad = gp.predict( X, return_std=True, return_mean_grad=True, return_std_grad=True) else: mu, std = gp.predict(X, return_std=True) if self.sigma_n is None: sigma_n = gp.noise_level if isinstance(sigma_n, Iterable): noise_var = np.mean(sigma_n) else: noise_var = sigma_n else: noise_var = self.sigma_n zeta = self.zeta var = std**2 - noise_var**2. mask = (var > 0) & np.isfinite(mu) values = np.zeros_like(std) baseline = gp.y_max # Alternative option, but found not to work extremely well # baseline = gp.preprocessing_y.inverse_transform([0])[0] if np.any(mask): values[mask] = self.f(mu[mask], std[mask], baseline, noise_var, zeta) if np.any(~mask): values[~mask] = - np.inf if eval_gradient: if np.array(std_grad).ndim > 1: grad = np.zeros_like(std_grad) if np.any(mask): grad[mask] = np.array(std_grad)[mask] / \ (std[mask] - sigma_n) + 2 * zeta * np.array(mu_grad)[mask] if np.any(~mask): grad[~mask] = np.ones_like(std_grad[~mask]) * np.inf else: std = std[0] if std > sigma_n: grad = std_grad / (std - sigma_n) + 2 * zeta * mu_grad else: grad = np.ones_like(std_grad) * np.inf return values, grad else: return values
def __repr__(self): return str(self.__class__) + "with zeta={0:.3f}".format(self.zeta)
[docs] class LogExp(BaseLogExp): r"""Acquisition function which is designed to efficiently sample log-probability distributions. This is achieved by transforming :math:`\tilde{\mu}\cdot\tilde{\sigma}` (of the true, non-logarithmic probability distribution) to logarithmic space which yields .. math:: A_{\mathrm{LE}}(X) = \exp(2\zeta\cdot\mu(X))\cdot (\sigma(X)-\sigma_n) For numerical convenience we take the log of this expression which yields: .. math:: \log(A_{\mathrm{LE}})(X) = 2\zeta\cdot\mu(X) + \log(\sigma(X)-\sigma_n) .. note:: :math:`\mu(x)` and :math:`\sigma(X)` are the mean and sigma of the GP regressor which follows the **log**-probability distribution. Parameters ---------- zeta : float, default=1 Controls the exploration-exploitation tradeoff parameter. The value of :math:`\zeta` should not exceed 1 under normal circumstances as a value <1 accounts for the fact that the GP's estimate for :math:`\mu` is not correct at the beginning. A good suggestion for setting zeta which is inspired by simulated annealing is .. math:: \zeta = \exp(-N_0/N) where :math:`N_0\geq 0` is a "decay constant" and :math:`N` the number of training points in the GP. sigma_n : float, default=None The (constant) noise level of the data. If set to ``None`` the square-root of alpha of the training data (or the square root of the mean of alpha if alpha is an array) will be used. fixed: bool, default=False, whether zeta and sigma_n shall be fixed or not. dimension: double, default=None the dimension of the parameter space used for auto-scaling the zeta zeta_scaling: double, default=0.85 the scaling power of the zeta with dimension, if auto-scaled """
[docs] @staticmethod def f(mu, std, baseline, noise_level, zeta): """Linearized exponentiated log-error bar.""" return ( 2 * zeta * (mu - baseline) + np.log(np.sqrt(np.clip(std**2. - noise_level**2., 0., None))) )
# UNUSED # TODO: gradient assumed by parent class is not correct for this acquisition function
[docs] class NonlinearLogExp(BaseLogExp): r""" .. warning:: The gradients for this acquisition function are not yet implemented correctly. Use with caution! An alternative approach which keeps both scales exponentiated: .. math:: A_{\mathrm{LE}}(X) = \exp(2\zeta\cdot\mu(X))\cdot \exp(\sigma(X)-\sigma_n) Again we take the log of this. Parameters ---------- zeta : float, default=1 Controls the exploration-exploitation tradeoff parameter. The value of :math:`\zeta` should not exceed 1 under normal circumstances as a value <1 accounts for the fact that the GP's estimate for :math:`\mu` is not correct at the beginning. A good suggestion for setting zeta which is inspired by simulated annealing is .. math:: \zeta = \exp(-N_0/N) where :math:`N_0\geq 0` is a "decay constant" and :math:`N` the number of training points in the GP. sigma_n : float, default=None The (constant) noise level of the data. If set to ``None`` the square-root of alpha of the training data (or the square root of the mean of alpha if alpha is an array) will be used. fixed: bool, default=False, whether zeta and sigma_n shall be fixed or not. dimension: double, default=None the dimension of the parameter space used for auto-scaling the zeta zeta_scaling: double, default=0.85 the scaling power of the zeta with dimension, if auto-scaled """
[docs] @staticmethod def f(mu, std, baseline, noise_level, zeta): """Exponentiated log-error bar""" return 2 * zeta * (mu - baseline) + _safe_log_expm1(np.sqrt(np.clip(std**2.-noise_level**2., 0., None)))
# Function for determining whether an object is an acquisition function
[docs] def is_acquisition_function(acq_func): """Determines if a given object is an acquisition function or not. Parameters ---------- acq_func: Any The object which shall be examined Returns ------- is_acquisition_function: bool whether the specified object is an acquisition function. """ return isinstance(acq_func, AcquisitionFunction)
[docs] class Hyperparameter(namedtuple('Hyperparameter', ('name', 'value_type', 'n_elements', 'fixed'))): """An acquisition function hyperparameter's specification in form of a namedtuple. This formalism is copied from the ``kernel`` module of Scikit-Learn. .. note:: The current code does not support optimization of any hyperparameters of the acquisition functions. This might be added in the future (which is why the ``fixed`` parameter exists). Attributes ---------- name : str The name of the hyperparameter. Unlike in the kernels for the GPRegressor the hyperparameters of the Acquisition functions do not have bounds. value_type : str The type of the hyperparameter. Currently, only ``'numeric'`` hyperparameters are supported. n_elements : int, default=1 The number of elements of the hyperparameter value. Defaults to 1, which corresponds to a scalar hyperparameter. n_elements > 1 corresponds to a hyperparameter which is vector-valued, such as, e.g., anisotropic length-scales. fixed : bool, default=False Whether the value of this hyperparameter is fixed, i.e., cannot be changed during hyperparameter tuning. """ # A raw namedtuple is very memory efficient as it packs the attributes # in a struct to get rid of the __dict__ of attributes in particular it # does not copy the string for the keys on each instance. # By deriving a namedtuple class just to introduce the __init__ method we # would also reintroduce the __dict__ on the instance. By telling the # Python interpreter that this subclass uses static __slots__ instead of # dynamic attributes. Furthermore we don't need any additional slot in the # subclass so we set __slots__ to the empty tuple. __slots__ = () def __new__(cls, name, value_type, n_elements=1, fixed=False): return super(Hyperparameter, cls).__new__( cls, name, value_type, n_elements, fixed) # This is mainly a testing utility to check that two hyperparameters # are equal. def __eq__(self, other): return (self.name == other.name and self.value_type == other.value_type and self.n_elements == other.n_elements and self.fixed == other.fixed)
[docs] class AcquisitionFunctionOperator(AcquisitionFunction): """Base class for all AF operators.""" def __init__(self, k1, k2): self.k1 = k1 self.k2 = k2 self.hasgradient = k1.hasgradient and k2.hasgradient
[docs] def get_params(self, deep=True): """Get parameters of this acquisition function. Parameters ---------- deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators. Returns ------- params : dict Parameter names mapped to their values. """ params = dict(k1=self.k1, k2=self.k2) if deep: deep_items = self.k1.get_params().items() params.update(('k1__' + k, val) for k, val in deep_items) deep_items = self.k2.get_params().items() params.update(('k2__' + k, val) for k, val in deep_items) return params
@property def hyperparameters(self): """Returns a list of all hyperparameter.""" r = [Hyperparameter("k1__" + hyperparameter.name, hyperparameter.value_type, hyperparameter.n_elements) for hyperparameter in self.k1.hyperparameters] for hyperparameter in self.k2.hyperparameters: r.append(Hyperparameter("k2__" + hyperparameter.name, hyperparameter.value_type, hyperparameter.n_elements)) return r @property def theta(self): """Returns the (flattened, log-transformed) non-fixed hyperparameters. Note that theta are typically the log-transformed values of the AF's hyperparameters as this representation of the search space is more amenable for hyperparameter search, as hyperparameters like length-scales naturally live on a log-scale. Returns ------- theta : ndarray of shape (n_dims,) The non-fixed, log-transformed hyperparameters of the acquisition function """ return np.append(self.k1.theta, self.k2.theta) @theta.setter def theta(self, theta): """Sets the (flattened, log-transformed) non-fixed hyperparameters. Parameters ---------- theta : ndarray of shape (n_dims,) The non-fixed, log-transformed hyperparameters of the acquisition function """ k1_dims = self.k1.n_dims self.k1.theta = theta[:k1_dims] self.k2.theta = theta[k1_dims:] def __eq__(self, b): if type(self) != type(b): return False return (self.k1 == b.k1 and self.k2 == b.k2) \ or (self.k1 == b.k2 and self.k2 == b.k1)
[docs] class Sum(AcquisitionFunctionOperator): """Overwrites the ``+`` operator for two or more AF's. Additionally gradients are computed and calculated together according to the rules of differentiation. A sum of an AF can be either with another (composite) AF or a real number. """
[docs] def __call__(self, X, gp, eval_gradient=False): if eval_gradient: k1, k1_grad = self.k1(X, gp, eval_gradient) k2, k2_grad = self.k2(X, gp, eval_gradient) return k1 + k2, k1_grad + k2_grad else: return self.k1(X, gp) + self.k2(X, gp)
def __repr__(self): return "{0} + {1}".format(self.k1, self.k2)
[docs] class Product(AcquisitionFunctionOperator): """Overwrites the ``*`` operator for two or more AF's. Additionally gradients are computed and calculated together according to the rules of differentiation. A product of an AF can be either with another (composite) AF or a real number. """
[docs] def __call__(self, X, gp, eval_gradient=False): if eval_gradient: k1, k1_grad = self.k1(X, gp, eval_gradient) k2, k2_grad = self.k2(X, gp, eval_gradient) return k1 * k2, k1_grad * k2 + k2_grad * k1 else: return self.k1(X, gp) * self.k2(X, gp)
def __repr__(self): return "{0} * {1}".format(self.k1, self.k2)
[docs] class Exponentiation(AcquisitionFunction): """Defines the expontentiation of an AF with a real number. Additionally gradients are computed and calculated together according to the rules of differentiation. .. warning:: An AF can only be exponentiated with a number and not with another AF:: new_af = old_af ** number """ def __init__(self, acquisition_function, exponent): self.acquisition_function = acquisition_function self.exponent = exponent self.hasgradient = acquisition_function.hasgradient
[docs] def get_params(self, deep=True): """Get parameters of this Acquisition function. Parameters ---------- deep : bool, default=True If True, will return the parameters for this estimator and contained subobjects that are estimators. Returns ------- params : dict Parameter names mapped to their values. """ params = dict(acquisition_function=self.acquisition_function, exponent=self.exponent) if deep: deep_items = self.acquisition_function.get_params().items() params.update(('acquisition_function__' + k, val) for k, val in deep_items) return params
@property def hyperparameters(self): """Returns a list of all hyperparameter.""" r = [] for hyperparameter in self.acquisition_function.hyperparameters: r.append(Hyperparameter("acquisition_function__" + hyperparameter.name, hyperparameter.value_type, hyperparameter.n_elements)) return r @property def theta(self): """Returns the (flattened, log-transformed) non-fixed hyperparameters. Note that theta are typically the log-transformed values of the acquisition function's hyperparameters as this representation of the search space is more amenable for hyperparameter search, as hyperparameters like length-scales naturally live on a log-scale. Returns ------- theta : ndarray of shape (n_dims,) The non-fixed, log-transformed hyperparameters of the acquisition function """ return self.acquisition_function.theta @theta.setter def theta(self, theta): """Sets the (flattened, log-transformed) non-fixed hyperparameters. Parameters ---------- theta : ndarray of shape (n_dims,) The non-fixed, log-transformed hyperparameters of the acquisition function """ self.acquisition_function.theta = theta def __eq__(self, b): if type(self) != type(b): return False return (self.acquisition_function == b.acquisition_function and self.exponent == b.exponent)
[docs] def __call__(self, X, gp, eval_gradient=False): """Return the Value of the AF at x (``A_f(X, gp)``) and optionally its gradient. Parameters ---------- X : array-like of shape (n_samples_X, n_features) or list of object X-Value at which the Acquisition function shall be evaluated gp : SKLearn GaussianProcessRegressor The GPRegressor (surrogate model) from which to evaluate GP(X) and optionally X_train and Y_train. eval_gradient : bool, default=False Determines whether the gradient with respect to X is calculated. Returns ------- A_f : array of shape (n_samples_X) The value of the acquisition function at point(s) X A_f_gradient : array of shape (n_samples_X, n_dim) The gradient of the Acquisition function with respect to X. Only returned when eval_gradient is True. """ X = self.check_X(X) if eval_gradient: K, K_grad = self.acquisition_function(X, gp, eval_gradient) return K ** self.exponent, K_grad * self.exponent * \ K ** (self.exponent - 1) else: K = self.acquisition_function(X, gp) return K ** self.exponent
def __repr__(self): return "{0} ** {1}".format(self.acquisition_function, self.exponent)