"""
GPAcquisition classes, which take care of proposing new locations where to evaluate the
true function.
"""
import os
import sys
import warnings
import inspect
import tempfile
from time import time
from copy import deepcopy
from typing import Mapping
from functools import partial
import numpy as np
import scipy.optimize
from sklearn.base import is_regressor
import gpry.acquisition_functions as gpryacqfuncs
from gpry.proposal import PartialProposer, CentroidsProposer, Proposer, UniformProposer
from gpry import mpi
from gpry.tools import NumpyErrorHandling, get_Xnumber, remove_0_weight_samples, \
is_in_bounds
import gpry.ns_interfaces as nsint
from gpry.mc import samples_dict_to_getdist, _name_logp
[docs]
def builtin_names():
"""
Lists all names of all built-in acquisition functions criteria.
"""
list_names = [name for name, obj in inspect.getmembers(sys.modules[__name__])
if (issubclass(obj.__class__, GenericGPAcquisition.__class__) and
obj is not GenericGPAcquisition)]
return list_names
[docs]
class GenericGPAcquisition():
"""Generic class for acquisition objects."""
def __init__(self,
bounds,
preprocessing_X=None,
verbose=1,
acq_func="LogExp",
):
self.bounds_ = np.array(bounds).copy()
self.n_d = bounds.shape[0]
self.preprocessing_X = preprocessing_X
self.verbose = verbose
if gpryacqfuncs.is_acquisition_function(acq_func):
self.acq_func = acq_func
elif isinstance(acq_func, (Mapping, str)):
if isinstance(acq_func, str):
acq_func = {acq_func: {}}
acq_func_name = list(acq_func)[0]
acq_func_args = acq_func[acq_func_name] or {}
acq_func_args["dimension"] = self.n_d
try:
acq_func_class = getattr(gpryacqfuncs, acq_func_name)
except AttributeError as excpt:
raise ValueError(
f"Unknown AcquisitionFunction class {acq_func_name}. "
f"Available convergence criteria: {gpryacqfuncs.builtin_names()}"
) from excpt
try:
self.acq_func = acq_func_class(**acq_func_args)
except Exception as excpt:
raise ValueError(
"Error when initialising the AcquisitionFunction object "
f"{acq_func_name} with arguments {acq_func_args}: "
f"{str(excpt)}"
) from excpt
else:
raise TypeError(
"acq_func should be an AcquisitionFunction or a str or dict "
f"specification. Got {acq_func}"
)
def __call__(self, X, gpr, eval_gradient=False):
"""Returns the value of the acquision function at ``X`` given a ``gpr``."""
return self.acq_func(X, gpr, eval_gradient=eval_gradient)
[docs]
def multi_add(self, gpr, n_points=1, bounds=None, rng=None):
r"""Method to query multiple points where the objective function
shall be evaluated.
The strategy differs depending on the acquisition class.
When run in parallel (MPI), it must return the same values for all processes.
Parameters
----------
gpr : GaussianProcessRegressor
The GP Regressor which is used as surrogate model.
n_points : int, optional (default=1)
Number of points to be returned. A value large than 1 is useful if you can
evaluate your objective in parallel, and thus obtain more objective function
evaluations per unit of time.
bounds : np.array, optional
Bounds inside which to look for the next proposals, e.g. the GPR trust region.
If not defined, the prior bounds are used.
rng : int or numpy.random.Generator, optional
The generator used to perform the acquisition process. If an integer is given,
it is used as a seed for the default global numpy random number generator.
Returns
-------
X : numpy.ndarray, shape = (X_dim, n_points)
The X values of the found optima
y_lies : numpy.ndarray, shape = (n_points,)
The predicted values of the GP at the proposed sampling locations
fval : numpy.ndarray, shape = (n_points,)
The values of the acquisition function at X_opt
"""
[docs]
class BatchOptimizer(GenericGPAcquisition):
"""
Run Gaussian Process acquisition.
Works similarly to a GPRegressor but instead of optimizing the kernel's
hyperparameters it optimizes the Acquisition function in order to find one
or multiple points at which the likelihood/posterior should be evaluated
next.
Furthermore contains a framework for different lying strategies in order to
improve the performance if multiple processors are available
Use this class directly if you want to control the iterations of your
bayesian quadrature loop.
Parameters
----------
bounds : array
Bounds in which to optimize the acquisition function,
assumed to be of shape (d,2) for d dimensional prior
proposer : Proposer object, optional (default: "ParialProposer", producing a mixture
of points drawn from an "UniformProposer" and from a "CentroidsProposer")
Proposes points from which the acquisition function should be optimized.
acq_func : GPry Acquisition Function, dict, optional (default: "LogExp")
Acquisition function to maximize/minimize. If none is given the
`LogExp` acquisition function will be used. Can also be a dictionary with the name
of the acquisition function as the single key, and as value a dict of its
arguments.
acq_optimizer : string or callable, optional (default: "auto")
Can either be one of the internally supported optimizers for optimizing
the acquisition function, specified by a string, or an externally
defined optimizer passed as a callable. If a callable is passed, it
must have the signature::
def optimizer(obj_func, initial_guess, bounds):
# * 'obj_func' is the objective function to be maximized, which
# takes the hyperparameters theta as parameter and an
# optional flag eval_gradient, which determines if the
# gradient is returned additionally to the function value
# * 'initial_guess': the initial value for X, which can be
# used by local optimizers
# * 'bounds': the bounds on the values of X
....
# Returned are the best found X and
# the corresponding value of the target function.
return X_opt, func_min
if set to 'auto' either the 'fmin_l_bfgs_b' or 'sampling' algorithm
from scipy.optimize is used depending on whether gradient information
is available or not.
.. note::
The default optimizers are designed to **maximize** the acquisition
function.
preprocessing_X : X-preprocessor, Pipeline_X, optional (default: None)
Single preprocessor or pipeline of preprocessors for X. Preprocessing
makes sense if the scales along the different dimensions are vastly
different which means that the optimizer struggles to find the maximum
of the acquisition function. If None is passed the data is not
preprocessed.
n_restarts_optimizer : int, default=0
The number of restarts of the optimizer for finding the maximum of the
acquisition function. The first run of the optimizer is performed from
the last X fit to the model if available, otherwise it is drawn at
random.
The remaining ones (if any) from X's sampled uniform randomly
from the space of allowed X-values. Note that n_restarts_optimizer == 0
implies that one run is performed.
verbose : 1, 2, 3, optional (default: 1)
Level of verbosity. 3 prints Infos, Warnings and Errors, 2
Warnings and Errors, and 1 only Errors. Should be set to 2 or 3 if
problems arise.
Attributes
----------
gpr_ : GaussianProcessRegressor
The GP Regressor which is currently used for optimization.
"""
def __init__(self,
bounds,
preprocessing_X=None,
verbose=1,
acq_func="LogExp",
# Class-specific:
proposer=None,
acq_optimizer="fmin_l_bfgs_b",
n_restarts_optimizer="5d",
n_repeats_propose=10,
):
super().__init__(
bounds=bounds, preprocessing_X=preprocessing_X,
verbose=verbose, acq_func=acq_func,
)
self.proposer = proposer
self.obj_func = None
# If nothing is provided for the proposal, we use a centroids proposer with
# a fraction of uniform samples.
if self.proposer is None:
self.proposer = PartialProposer(self.bounds_, CentroidsProposer(self.bounds_))
else:
if not isinstance(proposer, Proposer):
raise TypeError(
"'proposer' must be a Proposer instance. "
f"Got {proposer} of type {type(proposer)}."
)
self.proposer = proposer
self.proposer.update_bounds(self.bounds_)
# Configure optimizer
# decide optimizer based on gradient information
if acq_optimizer == "auto":
if self.acq_func.hasgradient:
self.acq_optimizer = "fmin_l_bfgs_b"
else:
self.acq_optimizer = "sampling"
elif isinstance(acq_optimizer, str):
if acq_optimizer == "fmin_l_bfgs_b":
if not self.acq_func.hasgradient:
raise ValueError(
"In order to use the 'fmin_l_bfgs_b' "
"optimizer the acquisition function needs to be able "
"to return gradients. Got %s" % self.acq_func)
self.acq_optimizer = "fmin_l_bfgs_b"
elif acq_optimizer == "sampling":
self.acq_optimizer = "sampling"
else:
raise ValueError("Supported internal optimizers are 'auto', "
"'lbfgs' or 'sampling', "
"got {0}".format(acq_optimizer))
else:
self.acq_optimizer = acq_optimizer
self.n_restarts_optimizer = get_Xnumber(
n_restarts_optimizer, "d", self.n_d, int, "n_restarts_optimizer"
)
self.n_repeats_propose = n_repeats_propose
self.mean_ = None
self.cov = None
[docs]
def optimize_acquisition_function(self, gpr, i, bounds=None, rng=None):
"""Exposes the optimization method for the acquisition function. When
called it proposes a single point where for where to evaluate the true
model next. It is internally called in the :meth:`multi_add` method.
Parameters
----------
gpr : GaussianProcessRegressor
The GP Regressor which is used as surrogate model.
i : int
Internal counter which is used to enable MPI support. If you want
to optimize from a single location and rerun the optimizer from
multiple starting locations loop over this parameter.
rng : numpy.random.Generator, optional
The generator used for the optimization process.
Returns
-------
X_opt : numpy.ndarray, shape = (X_dim,)
The X value of the found optimum
func : float
The value of the acquisition function at X_opt
"""
# Update proposer with new gpr and new bounds
self.proposer.update(gpr)
use_bounds = self.bounds_ if bounds is None else bounds
self.proposer.update_bounds(use_bounds)
# If we do a first-time run, use this
if not self.obj_func:
# Check whether gpr is a GP regressor
if not is_regressor(gpr):
raise ValueError("surrogate model has to be a GP Regressor. "
"Got %s instead." % gpr)
# Check whether the GP has been fit to data before
if not hasattr(gpr, "X_train_"):
raise AttributeError(
"The model which is given has not been fed "
"any points. Please make sure, that the model already "
"contains data when trying to optimize an acquisition "
"function on it as optimizing priors is not supported yet.")
def obj_func(X, eval_gradient=False):
# TODO: optionally suppress this checks if called by optimiser
# Check inputs
X = np.asarray(X)
X = np.expand_dims(X, axis=0)
if X.ndim != 2:
raise ValueError("X is {}-dimensional, however, "
"it must be 2-dimensional.".format(X.ndim))
if self.preprocessing_X is not None:
X = self.preprocessing_X.inverse_transform(X)
if eval_gradient:
acq, grad = self.acq_func(X, gpr, eval_gradient=True)
return -1 * acq, -1 * grad
else:
return -1 * self.acq_func(X, gpr, eval_gradient=False)
self.obj_func = obj_func
# Preprocessing
if self.preprocessing_X is not None:
transformed_bounds = self.preprocessing_X.transform_bounds(use_bounds)
else:
transformed_bounds = use_bounds
if i == 0:
# Perform first run from last (in-bounds) training point.
# Cannot raise StopIteration if trust_region contains at least one point!
x0 = next(
X for X in gpr.X_train[::-1]
if np.all(is_in_bounds(X, bounds, check_shape=False))
)
if self.preprocessing_X is not None:
x0 = self.preprocessing_X.transform(x0)
return self._constrained_optimization(self.obj_func, x0, transformed_bounds)
else:
d = self.bounds_.shape[0]
n_tries = 10 * d * self.n_restarts_optimizer
x0s = np.empty((self.n_repeats_propose + 1, d))
values = np.empty(self.n_repeats_propose + 1)
ifull = 0
for n_try in range(n_tries):
x0 = self.proposer.get(rng=rng)
value = self.acq_func(x0, gpr)
if not np.isfinite(value):
continue
x0s[ifull] = x0
values[ifull] = value
ifull += 1
if ifull > self.n_repeats_propose:
x0 = x0s[np.argmax(values)]
if self.preprocessing_X is not None:
x0 = self.preprocessing_X.transform(x0)
return self._constrained_optimization(
self.obj_func, x0, transformed_bounds
)
# if there's at least one finite value try optimizing from
# there, otherwise take the last x0 and add that to the GP
if ifull > 0:
x0 = x0s[np.argmax(values[:ifull])]
if self.preprocessing_X is not None:
x0 = self.preprocessing_X.transform(x0)
return self._constrained_optimization(
self.obj_func, x0, transformed_bounds
)
else:
if self.verbose > 1:
print(f"of {n_tries} initial samples for the "
"acquisition optimizer none returned a "
"finite value")
if self.preprocessing_X is not None:
x0 = self.preprocessing_X.transform(x0)
return x0, -1 * value
[docs]
def multi_add(
self, gpr, n_points=1, bounds=None, rng=None, force_resample=False
):
r"""Method to query multiple points where the objective function
shall be evaluated. The strategy which is used to query multiple
points is by using the :math:`f(x)\sim \mu(x)` strategy and and not
changing the hyperparameters of the model.
This is done to increase speed since then the blockwise matrix
inversion lemma can be used to invert the K matrix. The optimization
for a single point is done using the :meth:`optimize_acquisition_func` method.
When run in parallel (MPI), returns the same values for all processes.
Parameters
----------
gpr : GaussianProcessRegressor
The GP Regressor which is used as surrogate model.
n_points : int, optional (default=1)
Number of points to be returned. A value large than 1 is useful if you can
evaluate your objective in parallel, and thus obtain more objective function
evaluations per unit of time.
bounds : np.array, optional
Bounds inside which to look for the next proposals, e.g. the GPR trust region.
If not defined, the prior bounds are used.
rng : int or numpy.random.Generator, optional
The generator used to perform the acquisition process. If an integer is given,
it is used as a seed for the default global numpy random number generator.
Returns
-------
X : numpy.ndarray, shape = (X_dim, n_points)
The X values of the found optima
y_lies : numpy.ndarray, shape = (n_points,)
The predicted values of the GP at the proposed sampling locations
fval : numpy.ndarray, shape = (n_points,)
The values of the acquisition function at X_opt
"""
# Check if n_points is positive and an integer
if not (isinstance(n_points, int) and n_points > 0):
raise ValueError(f"n_points should be int > 0, got {n_points}")
# Create (parallel) generator(s) if int passed as rng
rng = mpi.get_random_generator(rng)
use_bounds = self.bounds_ if bounds is None else bounds
if mpi.is_main_process:
# Initialize arrays for storing the optimized points
X_opts = np.empty((n_points, gpr.d))
y_lies = np.empty(n_points)
acq_vals = np.empty(n_points)
# Copy the GP instance as it is modified during
# the optimization. The GP will be reset after the
# Acquisition is done.
gpr_ = deepcopy(gpr)
gpr_ = mpi.bcast(gpr_ if mpi.is_main_process else None)
n_acq_per_process = \
mpi.split_number_for_parallel_processes(self.n_restarts_optimizer)
n_acq_this_process = n_acq_per_process[mpi.RANK]
i_acq_this_process = sum(n_acq_per_process[:mpi.RANK])
proposal_X = np.empty((n_acq_this_process, gpr_.d))
acq_X = np.empty((n_acq_this_process,))
for ipoint in range(n_points):
# Optimize the acquisition function to get a few possible next proposal points
# (done in parallel)
for i in range(n_acq_this_process):
proposal_X[i], acq_X[i] = self.optimize_acquisition_function(
gpr_, i + i_acq_this_process, bounds=use_bounds, rng=rng
)
proposal_X_main, acq_X_main = mpi.multi_gather_array(
[proposal_X, acq_X])
# Reset the objective function, such that afterwards the correct one is used
self.obj_func = None
# Now take the best and add it to the gpr (done in sequence)
if mpi.is_main_process:
# Find out which one of these is the best
max_pos = np.argmin(acq_X_main) if np.any(
np.isfinite(acq_X_main)) else len(acq_X_main) - 1
X_opt = proposal_X_main[max_pos]
# Transform X and clip to bounds
if self.preprocessing_X is not None:
X_opt = self.preprocessing_X.inverse_transform(X_opt)
# Get the value of the acquisition function at the optimum value
acq_val = -1 * acq_X_main[max_pos]
X_opt = np.array([X_opt])
# Get the "lie" (prediction of the GP at X)
y_lie = gpr_.predict(X_opt)
# Try to append the lie to change uncertainties (and thus acq func)
# (no need to append if it's the last iteration)
if ipoint < n_points - 1:
# Take the mean of errors as supposed measurement error
lie_noise_level = (
np.array([np.mean(gpr_.noise_level)])
if np.iterable(gpr_.noise_level) else None
)
# Add lie to GP
gpr_.append_to_data(
X_opt, y_lie, noise_level=lie_noise_level,
fit_gpr=False, fit_classifier=False,
)
# Append the points found to the array
X_opts[ipoint] = X_opt[0]
y_lies[ipoint] = y_lie[0]
acq_vals[ipoint] = acq_val
# Send this new gpr_ instance to all mpi
gpr_ = mpi.bcast(gpr_ if mpi.is_main_process else None)
gpr.n_eval = gpr_.n_eval # gather #evals of the GP, for cost monitoring
return mpi.bcast(
(X_opts, y_lies, acq_vals) if mpi.is_main_process else (None, None, None))
def _constrained_optimization(self, obj_func, initial_X, bounds):
if self.acq_optimizer == "fmin_l_bfgs_b":
# with warnings.catch_warnings():
# warnings.simplefilter("ignore")
opt_res = scipy.optimize.fmin_l_bfgs_b(
obj_func, initial_X, args={"eval_gradient": True},
bounds=bounds, approx_grad=False)
theta_opt, func_min = opt_res[0], opt_res[1]
elif self.acq_optimizer == "sampling":
opt_res = scipy.optimize.minimize(
obj_func, initial_X, args=(False), method="Powell",
bounds=bounds)
theta_opt, func_min = opt_res.x, opt_res.fun
elif callable(self.acq_optimizer):
theta_opt, func_min = \
self.acq_optimizer(obj_func, initial_X, bounds=bounds)
else:
raise ValueError("Unknown optimizer %s." % self.acq_optimizer)
return theta_opt, func_min
[docs]
class NORA(GenericGPAcquisition):
"""
Run Gaussian Process acquisition with NORA (Nested sampling Optimization for Ranked
Acquistion).
Uses kriging believer while it samples the acquisition function using nested
sampling (with PolyChord or UltraNest).
Parameters
----------
bounds : array
Bounds in which to optimize the acquisition function,
assumed to be of shape (d,2) for d dimensional prior
acq_func : GPry Acquisition Function, dict, optional (default: "LogExp")
Acquisition function to maximize/minimize. If none is given the
`LogExp` acquisition function will be used. Can also be a dictionary with the name
of the acquisition function as the single key, and as value a dict of its
arguments.
mc_every : int
If >1, only calls the MC sampler every `mc_steps`, and reuses previous X
otherwise, recomputing y and sigma with the new GPR.
nlive_per_training: int
live points per sample in the current training set.
Not recommended to decrease it.
nlive_max: int
live points max cap
num_repeats: int
length of slice-chains
precision_criterion_target: float
Cap on precision criterion of Nested Sampling
nprior_per_nlive: int
Number of initial samples times dimension.
preprocessing_X : X-preprocessor, Pipeline_X, optional (default: None)
Single preprocessor or pipeline of preprocessors for X. Preprocessing
makes sense if the scales along the different dimensions are vastly
different which means that the optimizer struggles to find the maximum
of the acquisition function. If None is passed the data is not
preprocessed.
verbose : 1, 2, 3, optional (default: 1)
Level of verbosity. 3 prints Infos, Warnings and Errors, 2
Warnings and Errors, and 1 only Errors. Should be set to 2 or 3 if
problems arise.
Attributes
----------
gpr_ : GaussianProcessRegressor
The GP Regressor which is currently used for optimization.
"""
def __init__(self,
bounds,
preprocessing_X=None,
verbose=1,
acq_func="LogExp",
# Class-specific:
sampler=None,
mc_every="1d",
nlive_per_training=3,
nlive_max="25d",
nlive_per_dim_max=None, # deprecated
num_repeats="5d",
num_repeats_per_dim=None, # deprecated
precision_criterion_target=0.01,
nprior_per_nlive=10,
max_ncalls=None,
tmpdir=None,
):
super().__init__(
bounds=bounds, preprocessing_X=preprocessing_X,
verbose=verbose, acq_func=acq_func,
)
self.log_header = f"[ACQUISITION : {self.__class__.__name__}] "
self.mc_every = get_Xnumber(mc_every, "d", self.n_d, int, "mc_every")
self.mc_every_i = 0
self.tmpdir = tmpdir
self.i = 0
self.acq_func_y_sigma = None
# Configure nested sampler
self.sampler = sampler
self._init_nested_sampler()
self.nlive_per_training = nlive_per_training
if nlive_per_dim_max is not None:
self.log(
"*Warning: 'nlive_per_dim_max' is deprecated. Use e.g. 'nlive_max: 25d'. "
"This will fail in the future."
)
self.nlive_max = nlive_per_dim_max * self.n_d
else:
self.nlive_max = get_Xnumber(nlive_max, "d", self.n_d, int, "nlive_max")
if num_repeats_per_dim is not None:
self.log(
"*Warning: 'num_repeats_per_dim' is deprecated. Use e.g. "
"'num_repeats: 5d'. This will fail in the future."
)
self.num_repeats = num_repeats_per_dim * self.n_d
else:
self.num_repeats = get_Xnumber(num_repeats, "d", self.n_d, int, "num_repeats")
self.precision_criterion_target = precision_criterion_target
self.nprior_per_nlive = nprior_per_nlive
self.max_ncalls = max_ncalls
# Pool for storing intermediate results during parallelised acquisition
self._X_mc, self._y_mc, self._sigma_y_mc, self._w_mc = None, None, None, None
self._X_mc_reweight, self._y_mc_reweight = None, None
self._sigma_y_mc_reweight, self._w_mc_reweight = None, None
self.is_last_MC_reweighted = None
self.pool = None
self._acq_mc = None
@property
def pool_size(self):
"""Size of the pool of points."""
if self.pool is None:
return None
return len(self.pool)
def _init_nested_sampler(self, sampler=None):
"""
Initializes the nested sampler, managing defaults with a recursive call.
Raises
------
``gpry.ns_interfaces.NestedSamplerNotInstalledError`` if the requested or fallback
sampler cannot be loaded, or ``ValueError`` if the sampler is not recognised.
"""
this_sampler = sampler or self.sampler
# Manage defaults
if this_sampler is None:
try:
self._init_nested_sampler("polychord")
return
except nsint.NestedSamplerNotInstalledError as excpt:
self.log(
f"Importing the default NS PolyChord failed (Err msg: {excpt}). "
"Defaulting to UltraNest."
)
self._init_nested_sampler("ultranest")
return
# Load the requested sampler
try:
self.sampler_interface = \
nsint._ns_interfaces[this_sampler.lower()](self.bounds_, self.verbose)
except (AttributeError, KeyError) as excpt:
if this_sampler.lower() != "uniform":
raise ValueError(
"No interface found for the requested nested sampler "
f"'{this_sampler}'. Use one of {list(nsint._ns_interfaces)}"
) from excpt
self.sampler = this_sampler
[docs]
def update_NS_precision(self, gpr):
"""
Updates NS precision parameters:
- num_repeats: constant for now
- nlive: `nlive_per_training` times the size of the training set, capped at
`nlive_max` (typically 25 * dimension).
- precision_criterion: constant for now.
"""
nlive = min(self.nlive_per_training * gpr.n, self.nlive_max)
return {
"nlive": nlive,
"num_repeats": self.num_repeats,
"precision_criterion": self.precision_criterion_target,
"nprior": int(self.nprior_per_nlive * nlive),
"max_ncalls": self.max_ncalls
}
[docs]
def log(self, msg, level=None):
"""
Print a message if its verbosity level is equal or lower than the given one (or
always if ``level=None``.
"""
if level is None or level <= self.verbose:
print(self.log_header + msg)
def _get_output_folder(self):
"""
Prepare a new output folder. Returns always with a ``/`` at the end.
If one was specified at init, it saves to a subfolder with an increasing index.
Otherwise, it creates a random one every time.
"""
if not mpi.is_main_process:
return None
if self.tmpdir is None:
# pylint: disable=consider-using-with
tmpdir = tempfile.TemporaryDirectory().name
else:
tmpdir = os.path.abspath(os.path.join(self.tmpdir, str(self.i)))
self.i += 1
if not tmpdir.endswith("/"):
tmpdir += "/"
return tmpdir
[docs]
def do_MC_sample(self, gpr, bounds, rng=None, sampler=None):
"""
Returns
-------
X, y, sigma_y, weights
May return None for any of y, sigma_y, weights
"""
if sampler is None:
sampler = self.sampler
if sampler.lower() == "uniform":
return self._do_MC_sample_uniform(gpr, bounds=bounds, rng=rng)
if sampler.lower() == "polychord":
return self._do_MC_sample_polychord(gpr, bounds=bounds, rng=rng)
if sampler.lower() == "ultranest":
return self._do_MC_sample_ultranest(gpr, bounds=bounds, rng=rng)
if sampler.lower() == "nessai":
return self._do_MC_sample_nessai(gpr, bounds=bounds, rng=rng)
raise ValueError(f"Sampler '{sampler}' not known.")
# For tests only.
# TODO: merge samples for >1 MPI processes.
def _do_MC_sample_uniform(self, gpr, bounds=None, rng=None):
if not mpi.is_main_process:
return None, None, None, None
proposer = UniformProposer(self.bounds_ if bounds is None else bounds)
n_total = 1000 * gpr.d
X = np.empty(shape=(n_total, gpr.d))
for i in range(n_total):
X[i] = proposer.get(rng=rng)
return X, None, None, None
def _do_MC_sample_polychord(self, gpr, bounds=None, rng=None):
# Update prior bounds
self.sampler_interface.set_prior(self.bounds_ if bounds is None else bounds)
# Update PolyChord precision settings
self.sampler_interface.set_precision(**self.update_NS_precision(gpr))
# Prepare seed for reproducibility (positive integer < 2^31); only rank 0 used.
seed = rng.integers(2**31 - 1) if rng is not None else None
# Output (PolyChord needs a "/" at the end).
# Run and get products
X_MC, y_MC, w_MC = self.sampler_interface.run(
lambda X: gpr.predict(np.atleast_2d(X), return_std=False, validate=False)[0],
out_dir=self._get_output_folder(),
keep_all=False,
seed=seed,
)
self.sampler_interface.delete_output()
# We will recompute y values, because quantities in PolyChord have to go through
# text i/o, and some precision may be lost -- so we do not return them.
y_MC = None
return X_MC, y_MC, None, w_MC
def _do_MC_sample_ultranest(self, gpr, bounds=None, rng=None):
# Initialise "likelihood" -- returns GPR value and deals with pooling/ranking
def logp(X):
"""
Returns the predicted value at a given point (-inf if prior=0).
"""
# Ultranest cannot deal with -np.inf
prev_miv = gpr.minus_inf_value
gpr.minus_inf_value = -1e-300
logp = gpr.predict(np.atleast_2d(X), return_std=False, validate=False)
gpr.minus_inf_value = prev_miv
return logp
# Update prior bounds
self.sampler_interface.set_prior(self.bounds_ if bounds is None else bounds)
# Update precision settings
prec_settings = {
k: v for k, v in self.update_NS_precision(gpr).items()
if k in ["nlive", "precision_criterion", "max_ncalls"]
}
self.sampler_interface.set_precision(**prec_settings)
# Seeding -- for now UltraNest does not accept an rng or a seed, only setting the
# np.random seed globally, which is dangerous!
# TODO: Create a PR for taking a custom RNG in UltraNest.
if rng is not None:
if mpi.is_main_process:
warnings.warn("Seeded runs are not supported for UltraNest.")
# Run and get products
X_MC, y_MC, w_MC = self.sampler_interface.run(
logp, out_dir=self._get_output_folder(), keep_all=False
)
self.sampler_interface.delete_output()
# We will recompute y values, because quantities in PolyChord have to go through
# text i/o, and some precision may be lost -- so we do not return them.
y_MC = None
return X_MC, y_MC, None, w_MC
# pylint: disable=import-outside-toplevel
def _do_MC_sample_nessai(self, gpr, bounds=None, rng=None):
if not mpi.is_main_process:
return None, None, None, None
if mpi.multiple_processes:
warnings.warn(
"Support for Nessai is experimental at the moment, and not MPI-compatible"
" (running in rank-0 process only)."
)
# Initialise "likelihood" -- returns GPR value and deals with pooling/ranking
def logp(X):
"""
Returns the predicted value at a given point (-inf if prior=0).
"""
return gpr.predict(X, return_std=False, validate=False)
# Update prior bounds
self.sampler_interface.set_prior(self.bounds_ if bounds is None else bounds)
# Update precision settings
prec_settings = {
k: v for k, v in self.update_NS_precision(gpr).items()
if k in ["nlive", "precision_criterion"]
}
self.sampler_interface.set_precision(**prec_settings)
# Prepare seed for reproducibility (positive integer < 2^31); only rank 0 used.
seed = rng.integers(2**31 - 1) if rng is not None else None
# Run and get products
X_MC, y_MC, w_MC = self.sampler_interface.run(
logp,
out_dir=self._get_output_folder(),
keep_all=False,
seed=seed,
)
self.sampler_interface.delete_output()
# We will recompute y values, because quantities in PolyChord have to go through
# text i/o, and some precision may be lost -- so we do not return them.
y_MC = None
return X_MC, y_MC, None, w_MC
def _set_MC_sample(self, X, y, sigma_y, w, ensure_y_sigma_y=False, gpr=None):
"""
Stores the MC sample as attributes.
If either `y` and/or `sigma_y` are passed as `None`, you can ensure their
calculation (in parallel) with `ensure_y_sigma=True`. In that case, a `gpr` is
needed.
Use ``last_MC_sample[_getdist]`` to retrieve it.
"""
self.is_last_MC_reweighted = False
self._X_mc, self._y_mc, self._sigma_y_mc, self._w_mc = X, y, sigma_y, w
if ensure_y_sigma_y:
self._y_mc, self._sigma_y_mc = mpi.compute_y_parallel(
gpr, self._X_mc, self._y_mc, self._sigma_y_mc, ensure_sigma_y=True
)
def _reweight_last_MC_sample(self, gpr, bounds=None, ensure_sigma_y=False):
"""Stores the MC sample as attributes. Use ``last_MC_sample`` to retrieve it."""
self.is_last_MC_reweighted = True
X_excpt, y_excpt = None, None
if mpi.is_main_process and self._X_mc is None:
X_excpt = ValueError("No samples yet!")
X_excpt = mpi.bcast(X_excpt)
if X_excpt is not None:
raise X_excpt
if mpi.is_main_process and self._y_mc is None:
y_excpt = ValueError("Original logp was not stored. Cannot reweight!")
y_excpt = mpi.bcast(y_excpt)
if y_excpt is not None:
raise y_excpt
# Ensure y and sigma_y (optional) are computed
self._X_mc_reweight = None
if mpi.is_main_process:
self._X_mc_reweight = np.copy(self._X_mc)
if bounds is not None:
# Keep points within new bounds (maybe none!)
i_within = is_in_bounds(self._X_mc_reweight, bounds, check_shape=False)
self._X_mc_reweight = self._X_mc_reweight[i_within]
# TODO: not handled: there could be 0 points within new bounds
self._y_mc_reweight, self._sigma_y_mc_reweight = mpi.compute_y_parallel(
gpr, self._X_mc_reweight, None, None, ensure_sigma_y=ensure_sigma_y
)
if mpi.is_main_process:
# Reweight, and drop 0 weights
with NumpyErrorHandling(all="ignore") as _:
y_mc = self._y_mc
w_mc = self._w_mc
if bounds is not None:
y_mc = y_mc[i_within]
w_mc = w_mc[i_within] if w_mc is not None else None
reweight_factor = np.exp(self._y_mc_reweight - y_mc)
w_mc_reweight = (
w_mc if w_mc is not None
else np.ones(shape=self._X_mc_reweight.shape[0])
) * reweight_factor
w_mc_reweight /= max(w_mc_reweight)
self._w_mc_reweight, self._X_mc_reweight, self._y_mc_reweight, \
self._sigma_y_mc_reweight = remove_0_weight_samples(
w_mc_reweight, self._X_mc_reweight,
self._y_mc_reweight, self._sigma_y_mc_reweight
)
[docs]
def last_MC_sample(self, copy=False, warn_reweight=True):
"""
Returns the last MC sample as ``(X, y, sigma_y, weights)``. ``y, sigma_y``
may be None if not computed while sampling. They can be generated with the gpr.
If ``weights`` is None, all samples should be assumed to have equal weights.
Prints a warning if it is a reweighted sample.
"""
if self.is_last_MC_reweighted:
if warn_reweight:
warnings.warn(
"This is a reweighted sample! (disable with `warn_reweight=False`)"
)
return_values = (
self._X_mc_reweight, self._y_mc_reweight,
self._sigma_y_mc_reweight, self._w_mc_reweight
)
else:
return_values = (self._X_mc, self._y_mc, self._sigma_y_mc, self._w_mc)
if copy:
return_values = tuple(
(np.copy(val) if val is not None else None) for val in return_values
)
return return_values
@property
def mean(self):
Xs, _, _, ws = self.last_MC_sample(copy=False, warn_reweight=False)
return np.average(Xs.T, weights=ws, axis=-1)
@property
def cov(self):
Xs, _, _, ws = self.last_MC_sample(copy=False, warn_reweight=False)
return np.cov(Xs.T, aweights=ws, ddof=0)
[docs]
def last_MC_sample_getdist(self, params, warn_reweight=True):
"""
Returns the last MC sample as a ``getdist.MCSamples`` instance.
Prints a warning if it is a reweighted sample.
"""
X, y, _, w = self.last_MC_sample(warn_reweight=warn_reweight)
samples_dict = {"w": w, "X": X, _name_logp: y}
return samples_dict_to_getdist(
samples_dict,
params=params,
bounds=self.bounds_,
sampler_type="nested",
)
[docs]
def multi_add(
self, gpr, n_points=1, bounds=None, rng=None, force_resample=False
):
r"""Method to query multiple points where the objective function
shall be evaluated.
The strategy which is used to query multiple points is by using
the :math:`f(x)\sim \mu(x)` strategy and and not changing the
hyperparameters of the model.
It runs NS on the mean of the GP model, tracking the value
of the acquisition function at every evaluation, and keeping a
pool of candidates which is re-sorted whenever a new good candidate
is found.
When run in parallel (MPI), returns the same values for all processes.
Parameters
----------
gpr : GaussianProcessRegressor
The GP Regressor which is used as surrogate model.
n_points : int, optional (default=1)
Number of points to be returned. A value large than 1 is useful if you can
evaluate your objective in parallel, and thus obtain more objective function
evaluations per unit of time.
bounds : np.array, optional
Bounds inside which to look for the next proposals, e.g. the GPR trust region.
If not defined, the prior bounds are used.
rng : int or numpy.random.Generator, optional
The generator used to perform the acquisition process. If an integer is given,
it is used as a seed for the default global numpy random number generator.
Returns
-------
X : numpy.ndarray, shape = (X_dim, n_points)
The X values of the found optima
y_lies : numpy.ndarray, shape = (n_points,)
The predicted values of the GP at the proposed sampling locations
fval : numpy.ndarray, shape = (n_points,)
The values of the acquisition function at X_opt
"""
# Check if n_points is positive and an integer
if not (isinstance(n_points, int) and n_points > 0):
raise ValueError(f"n_points should be int > 0, got {n_points}")
# Create (parallel) generator(s) if int passed as rng
rng = mpi.get_random_generator(rng)
# Gather an MC sample, only not-None for rank 0; bcasted by _split_and_compute_acq
if mpi.is_main_process:
start_sample = time()
mc_sample_this_time = not bool(self.mc_every_i % self.mc_every) or force_resample
if mc_sample_this_time:
self._set_MC_sample(
*self.do_MC_sample(gpr, bounds=bounds, rng=rng),
ensure_y_sigma_y=True, gpr=gpr
)
self._X_already_proposed = np.empty(shape=(0, gpr.d))
else:
self._reweight_last_MC_sample(gpr, bounds=bounds, ensure_sigma_y=True)
self.mc_every_i += 1
X_mc, y_mc, sigma_y_mc, _ = self.last_MC_sample(warn_reweight=False)
# Find indices of already used elements to exclude them.
# Needs to be here because _reweight_last_MC_sample changes the indices.
# Both the X's of the MC sample and the pool are assumed unique.
if mpi.is_main_process and self._X_already_proposed.size > 0:
i_already_proposed = []
for X_i in self._X_already_proposed:
i_this_one = np.flatnonzero(
np.all(np.isin(X_mc, X_i, assume_unique=True), axis=1)
)
if i_this_one.size > 0:
i_already_proposed.append(i_this_one[0])
X_mc = np.delete(X_mc, i_already_proposed, axis=0)
y_mc = np.delete(y_mc, i_already_proposed, axis=0)
sigma_y_mc = np.delete(sigma_y_mc, i_already_proposed, axis=0)
# Compute acq functions and missing quantities.
self.acq_func_y_sigma = partial(
self.acq_func.f, baseline=gpr.y_max,
noise_level=gpr.noise_level, zeta=self.acq_func.zeta)
# *Split* among MPI processes and compute acq func value (in parallel)
this_X, this_y, this_sigma_y, this_acq = \
self._split_and_compute_acq(X_mc, y_mc, sigma_y_mc)
if mpi.is_main_process:
what_we_did = (
f"Obtained new MC sample with {self.sampler}" if mc_sample_this_time
else "Re-evaluated previous MC sample"
)
self.log(
f"({(time()-start_sample):.2g} sec) {what_we_did}")
# Rank to get best points:
mpi.sync_processes()
if mpi.is_main_process:
start_rank = time()
args = (this_X, this_y, this_sigma_y, this_acq, n_points, gpr)
# TODO: facility to test speed of ranking methods
# CHECK: if no MPI, all methods starting with the same one should yield the same
# IMPLEMENT: non-parallel even if MPI
# TODO: update "auto" method default values
# if not hasattr(self, "totals"):
# self.totals = {}
for method, merge_method in [
# ("bulk", "bulk"),
# ("bulk", "single sort acq"),
("single sort acq", "bulk"), # seems to be the fastest
# ("bulk", "single sort y"),
# ("single sort acq", "single sort acq"),
# ("single sort y", "bulk"),
# ("single sort y", "single sort y"),
# ("single", "single"),
]:
# start = time()
merged_pool = self._parallel_rank_and_merge(
*args, method=method, merge_method=merge_method)
# delta = time() - start
# if (method, merge_method) not in self.totals:
# self.totals[(method, merge_method)] = 0
# self.totals[(method, merge_method)] += delta
# if mpi.is_main_process:
# print(
# f"TOOK: {delta:.2g} ; "
# f"TOTAL: {self.totals[(method, merge_method)]:.2g}; "
# f"methods: {(method, merge_method)}"
# )
# In case the pool is not full (not enough "good" points added), drop empty slots
merged_pool = merged_pool.copy(drop_empty=True)
X_pool, y_pool = merged_pool.X[:n_points], merged_pool.y[:n_points]
with np.errstate(divide='ignore'):
acq_pool = self.acq_func_y_sigma(y_pool, merged_pool.sigma[:n_points])
# Track the used ones, to ignore them until new MC sample drawn.
self._X_already_proposed = np.concatenate([self._X_already_proposed, X_pool])
mpi.sync_processes()
self.pool.reset_cache() # reduces size of pickled object
if mpi.is_main_process:
self.log(
f"({(time()-start_rank):.2g} sec) Ranked pool of candidates.")
return X_pool, y_pool, acq_pool
def _split_and_compute_acq(self, X, y, sigma_y):
"""
Scatters `(X, y, sigma_y)` between processes, and returns them together with the
acquisition function values, computed in parallel.
"""
# We don't use mpi.split_number_for_parallel_processes because for the ranking
# it is good to have similar y scaling in all MPI processes. But if we use that
# function, some processes get the top of the dist, and others the bottom, and
# the bottom one's scaling does not perform well with the add_one method, even
# after sorting, and the slowest MPI process sets the global speed
this_X = mpi.step_split(X)
this_y = mpi.step_split(y)
this_sigma_y = mpi.step_split(sigma_y)
with np.errstate(divide='ignore'):
this_acq = self.acq_func_y_sigma(this_y, this_sigma_y)
return this_X, this_y, this_sigma_y, this_acq
# Parallel version of the ranking of the MC points
def _parallel_rank_and_merge(
self, this_X, this_y, this_sigma_y, this_acq, n_points, gpr, method=None,
merge_method=None):
if method is None:
method = "auto"
# For dimensionalities 4 and smaller, bulk adding is expected to be faster.
if method.lower() == "auto":
method = "bulk" if gpr.d <= 4 else "single sort acq"
merge_method = "bulk"
# The size of the pool should be at least the amount of points to be acquired.
# If running several processes in parallel, it can be reduced down to the number
# of points to be evaluated per process, but with less guarantee to find an
# optimal set.
self.pool = RankedPool(
n_points, gpr=gpr, acq_func=self.acq_func_y_sigma, verbose=self.verbose - 3)
with np.errstate(divide='ignore'):
self.pool.add(this_X, this_y, this_sigma_y, this_acq, method=method)
merged_pool = self._merge_pools(n_points, gpr, method=merge_method)
return merged_pool
def _gather_pools(self):
"""
Merges the points in all pools, discarding the empty ones.
rank-0 process returns [X, y, sigma, acq], where the last two are the
unconditioned input ones.
"""
pool_X = mpi.gather(self.pool.X[:len(self.pool)])
pool_y = mpi.gather(self.pool.y[:len(self.pool)])
pool_sigma = mpi.gather(self.pool.sigma[:len(self.pool)])
pool_acq = mpi.gather(self.pool.acq[:len(self.pool)])
# Using the conditional acq value just to discard empty slots (acq=-inf)
# Later discarded (not returned), since they need to be recomputed anyway.
pool_acq_cond = mpi.gather(self.pool.acq_cond[:len(self.pool)])
if mpi.is_main_process:
# Discard unfilled positions
pool_acq_cond = np.concatenate(pool_acq_cond)
i_notnan = np.isfinite(pool_acq_cond)
pool_X = np.concatenate(pool_X)[i_notnan]
pool_y = np.concatenate(pool_y)[i_notnan]
pool_sigma = np.concatenate(pool_sigma)[i_notnan]
pool_acq = np.concatenate(pool_acq)[i_notnan]
return pool_X, pool_y, pool_sigma, pool_acq
return None, None, None, None
def _merge_pools(self, n_points, gpr, method=None):
"""
Merges the pools of parallel processes to find ``n_points`` optimal locations.
Returns all these locations for all processes.
Acquisition values returned are *unconditional*.
"""
if not mpi.multiple_processes: # no need to merge and re-sort
return self.pool
pool_X, pool_y, pool_sigma, pool_acq = self._gather_pools()
merged_pool = None
if mpi.is_main_process:
merged_pool = RankedPool(
n_points, gpr=gpr, acq_func=self.acq_func_y_sigma,
verbose=self.pool.verbose)
merged_pool.add(pool_X, pool_y, pool_sigma, pool_acq, method=method)
merged_pool = mpi.bcast(merged_pool)
return merged_pool
[docs]
class RankedPool():
"""
Keeps a ranked pool of sample proposal for Krigging-believer, given a GP regressor
and an acquisition function.
Parameters
----------
size : int
Number of points sampled proposals targeted.
gpr : GaussianProcessRegressor
The GP Regressor which is used as surrogate model.
acq_func : callable
Acquisition function used to rank the pool. Must be a function of ``(y, sigma)``
only, partially evaluated its hyperparameters, if necessary.
verbose : 1, 2, 3, optional (default: 1)
Level of verbosity. 3 prints Infos, Warnings and Errors, 2
Warnings and Errors, and 1 only Errors. Should be set to 2 or 3 if
problems arise.
"""
def __init__(self, size, gpr, acq_func, verbose=1):
self._gpr = gpr
self._acq_func = acq_func
self.verbose = verbose
# The pool should have one more element than the number of desired points.
self.X = np.zeros((size + 1, gpr.d))
self.y = np.zeros((size + 1))
# Condioned acquisition, used for ranking; -np.inf means "empty slot"
self.acq_cond = np.full((size + 1), -np.inf)
# Input quantities, only used at point, stored just for logging
self.sigma = np.zeros((size + 1))
self.acq = np.zeros((size + 1))
# Cached conditioned GPR's
self.reset_cache()
# Counter how many models have been cached, for efficieny checks
self.cache_counter = 0
def __len__(self):
return len(self.y) - 1
@property
def min_acq(self):
"""
Minimum acquisition function value in order for a point to be considered, i.e.
the conditioned acquisition function value of the last element in the pool.
NB: while the pool is not yet fool, empty points are assigned minus infinity as
acquisition function value, so one can still use the condition
``acq_value > RankedPool.min_acq`` in order to decide whether to add a point.
"""
return self.acq_cond[len(self) - 1]
# TODO: abstract these to a class
[docs]
def log(self, level=None, msg=""):
"""
Print a message if its verbosity level is equal or lower than the given one (or
always if ``level=None``.
"""
if level is None or level <= self.verbose:
print(msg)
[docs]
def str_point(self, X, y, sigma, acq, sigma_cond=None, acq_cond=None):
"""Retuns a standardised string to log a point."""
sigma_cond_str = f" (cond: {sigma_cond})" if sigma_cond is not None else ""
acq_cond_str = f" (cond: {acq_cond})" if acq_cond is not None else ""
return f"{X}, y = {y} +/- {sigma}{sigma_cond_str}; acq = {acq}{acq_cond_str}"
[docs]
def str_pool(
self, include_last=False, last_sorted=None, prefix=None, suffix_last=None):
"""Returns a string representation of the current pool."""
pool_str = ""
for i in range(len(self.X) + (-1 if not include_last else 0)):
pool_str += (
(prefix or "") + f"{i + 1} : " + self.str_point(
self.X[i], self.y[i], self.sigma[i], self.acq[i],
acq_cond=self.acq_cond[i]
) + (" [last sorted]" if i == last_sorted else "") + "\n"
)
return pool_str.rstrip("\n") + (
f" {suffix_last}" if include_last and suffix_last else "")
[docs]
def log_pool(
self, level=4, include_last=False, last_sorted=None, prefix=None,
suffix_last=None):
"""Prints the current pool."""
if self.verbose >= level:
self.log(level=level, msg=self.str_pool(
include_last=include_last, last_sorted=last_sorted, prefix=prefix,
suffix_last=suffix_last))
def __str__(self):
return self.str_pool(include_last=False)
[docs]
def add(self, X, y=None, sigma=None, acq=None, method="single sort acq"):
"""
Adds points to the pool.
Parameters
----------
X: np.ndarray (1- or 2-dimensional)
Position of the proposed sample.
y: np.ndarray (1 dimension fewer than X) or float, optional
Predicted value under the GPR.
sigma: np.ndarray (1 dimension fewer than X) or float, optional
Predicted standard deviation under the GPR. Will be computed if not passed.
acq: np.ndarray (1 dimension fewer than X) or float, optional
Acquisition function values (unconditioned). Will be computed if not passed.
method: {"single", "single sort acq", "single sort y", "bulk"}
Uses the one-by-one algorithm ("single", with pre-sorting according to X if
"single sort X"), or the bulk algorithm.
"""
X = np.atleast_2d(X)
if y is not None:
y = np.atleast_1d(y)
if sigma is not None:
sigma = np.atleast_1d(sigma)
if y is None:
y, sigma = self._gpr.predict(X, return_std=True, validate=False)
elif sigma is None:
sigma = self._gpr.predict_std(X, validate=False)
if acq is None:
acq = self._acq_func(y, sigma)
if method.lower() == "bulk":
self.add_bulk(X, y, sigma, acq)
elif method.lower().startswith("single"):
i_sort = None
if "sort" in method.lower():
i_sort = np.argsort(
{"acq": acq, "y": y}[method.lower().split()[-1]])[::-1]
# Descending order of unconditioned acq or unconditional mean prediction:
# minimizes the number of swaps: model caches + calculation of acq_cond
for i in (i_sort if i_sort is not None else range(len(X))):
self.add_one(X[i], y[i], sigma[i], acq[i])
else:
raise ValueError(f"Algorithm '{method}' not known.")
[docs]
def add_bulk(self, X, y, sigma, acq, i_start=0):
"""
Tries to fill the pull using a batch of points at once:
1. Compute their acquisition value conditioned to the position above (if any).
2. Pick the best and delete infinities (acq cannot grow with more conditioning).
3. Place it in the current position, and do a recursive call for the next one.
The advantage of this method with respect to ``add_one`` is that it can use
vectorization to compute the std's, but on the other hand it needs to compute
many more of them, so it will be better only up to some dimension and some amount
of training, and then ``add_one`` will take over.
"""
# Compute acq using the model just above (and cache it if needed)
if i_start == 0: # no need to condition
gpr = self._gpr
acq_cond = acq if isinstance(acq, np.ndarray) else np.array(acq)
else:
gpr = self.cache_model(i_start - 1)
sigma_cond = gpr.predict_std(X, validate=False)
acq_cond = self._acq_func(y, sigma_cond)
if acq_cond.size == 0:
self.log(
level=4,
msg=f"No finite acq points to fill the pool from [{i_start}] down."
)
return
# Find best
i_max = np.argmax(acq_cond)
acq_cond_max = acq_cond[i_max]
if acq_cond_max == np.inf:
self.log(
level=4,
msg=f"No finite acq points to fill the pool from [{i_start}] down."
)
return
self.X[i_start] = X[i_max]
self.y[i_start] = y[i_max]
self.sigma[i_start] = sigma[i_max]
self.acq[i_start] = acq[i_max]
self.acq_cond[i_start] = acq_cond_max
if i_start == len(self) - 1:
# Last position just filled. We are done.
return
# Remove infinities (cond acq will cannot grow when conditioning even more points)
i_finite_acq_cond = np.logical_not(acq_cond == -np.inf)
i_finite_acq_cond[i_max] = False # also remove point just added
self.add_bulk(
X[i_finite_acq_cond],
y[i_finite_acq_cond],
sigma[i_finite_acq_cond],
acq[i_finite_acq_cond],
i_start=i_start + 1,
)
[docs]
def add_one(self, X, y=None, sigma=None, acq=None, acq_nan_is_null=False):
"""
Tries to add one point to the pool:
1. Computes its acquisition function value.
(If the pools is not full, just adds it and re-sorts.)
2. Finds the provisional position of the point in the list, without KB.
Notice that once KB is taken into account, the KB-ranked position can only be
lower. (If the acq. fun. value is lower than the last point, it is discarded.)
3. Updates its aquisition function value KB-informed by the points above, finds
the new provisional position, and repeats until the position stabilises.
4. Sorts the list of points below the new one, recursively applying KB.
Parameters
----------
X: np.ndarray with 1 dimension
Position of the proposed sample.
y: float, optional
Predicted value under the GPR.
sigma: float, optional
Predicted standard deviation under the GPR.
acq: float, optional
Value of the acquisition function.
acq_nan_is_null: bool, optional (default: False)
Whether NaN's in the acquisition function should be interpreted as null value.
Raises
------
ValueError: if invalid acq. function value, unless ``acq_nan_is_null=True``.
"""
# Discard the point as early as possible!
# NB: The equals sign below takes care of the case in which we are trying to add a
# point with -inf acq. func. to a pool which is not full (min acq. func. = -inf)
if acq <= self.min_acq:
self.log(level=4, msg="[pool.add] Acq. func. value too small. Ignoring.")
return
X = np.atleast_2d(X)
if y is None: # assume sigma is also None
y, sigma = self._gpr.predict(X, return_std=True, validate=False)
y, sigma = y[0], sigma[0]
elif not hasattr(y, "__len__"):
y = np.array([y])
if sigma is None:
sigma = self._gpr.predict_std(X, validate=False)
if acq is None:
acq = self._acq_func(y, sigma)
if self.verbose >= 4:
self.log(
level=4, msg=("[pool.add] Checking point " +
self.str_point(X, y, sigma, acq)))
# Repeat the min acq test above to leave asap
if acq <= self.min_acq:
self.log(level=4, msg="[pool.add] Acq. func. value too small. Ignoring.")
return
if np.isnan(acq):
if not acq_nan_is_null:
raise ValueError(f"Acquisition function value not a number: {acq}")
acq = -np.inf
self.log(level=4, msg="[pool.add] Initial pool:")
self.log_pool(level=4, prefix="[pool.add] ")
# Find its position in the list, conditioned to those on top
# Shortcut: start from the bottom, ignore last element (just a placeholder)
i_new_last = len(self)
acq_cond = deepcopy(acq)
sigma_cond = sigma
while True:
try:
# Notice that we start always from the bottom of the list, in case a point
# with high acq. func. value is close to one of the top points, and its
# conditioned acq. func. value drops to a small value after conditioning.
# The equals sign below prevents -inf's from climbing up the list.
i_new = (len(self) -
next(i for i in range(len(self))
if self.acq_cond[-(i + 2)] >= acq_cond))
except StopIteration: # top of the list reached
i_new = 0
self.log(level=4, msg=f"[pool.add] Provisional position: [{i_new + 1}]")
# top, same as last or last: final ranking reached
if i_new in [0, i_new_last, len(self)]:
break
# Otherwise, compute conditioned acquisition value, using point above,
# and continue to the next iteration to re-rank
sigma_cond = self.gpr_cond[i_new - 1].predict_std(X, validate=False)[0]
# New acquisition should not be higher than the old one, since the new one
# corresponds to a model with more training points (though fake ones).
# This may happen anyway bc numerical errors, e.g. when the correlation
# length is really huge. Also, when alpha or the noise level for two cached
# models are different. We can just ignore it.
# (Sometimes, it's just ~1e6 relative differences, which is not worrying)
acq_cond = min(acq_cond, self._acq_func(y, sigma_cond))
i_new_last = i_new
if self.verbose >= 4: # avoid creating the f-strings
self.log(level=4,
msg=f"[pool.add] Updated conditional std: {sigma_cond}")
self.log(level=4,
msg=f"[pool.add] Updated conditional acquisition: {acq_cond}")
# The last position is just a place-holder: don't save it if it falls there.
if i_new >= len(self):
self.log(level=4, msg="[pool.add] Discarded!")
return
self.log(level=4, msg=f"[pool.add] Final position: [{i_new + 1}] of {len(self)}")
# Insert the new one in its place, and push the rest down one place.
# We track the conditioned acq. value (but not the sigma), to retain the
# information about whether each slot was empty (acq = -inf)
# (but not for the acq values, which will be updated below)
for pool, value in [(self.X, X), (self.y, y),
(self.sigma, sigma), (self.acq, acq),
(self.acq_cond, acq_cond)]:
pool[i_new + 1:] = pool[i_new:-1]
pool[i_new] = value
# If not in the last position, we can safely assume that it has finite value,
# since -inf's from conditional acq cannot climb.
assert self.acq_cond[i_new] > -np.inf
self.log(level=4, msg="[pool.add] Current unsorted pool:")
self.log_pool(level=4, include_last=True, last_sorted=i_new, prefix="[pool.add] ")
# Sort the sublist below the new element
self.sort(i_new + 1)
self.log(level=4, msg="[pool.add] The new pool, sorted:")
self.log_pool(level=4, include_last=True, prefix="[pool.add] ",
suffix_last="[unused]")
# Make sure that the last slot (buffer) is marked as empty
self.acq_cond[-1] = -np.inf
[docs]
def cache_model(self, i):
"""
Cache the GP model that contains the training set plus the pool points up to
position ``i`` (0-based), with predicted dummy y, keeping the GPR hyperparameters
unchanged.
Stores and returns the conditioned gpr (or the original one if ``i=-1``).
"""
# This function accounts for ~50% of the ranking time in add_one() (the rest is
# mostly predict_std()).
# Taking dim=8 as reference, deepcopy is ~1/3 and append_to_data ~2/3 of the cost.
# Possible optimization strategies:
# - Disable SVM in cached models (no need to copy, fit, or evaluate), assuming all
# passed points are finite [tested to improve <10% overall in add_one{}]
# - Copy model above instead of original one, and fit to single new point only
# [tested: no appreciable gain]
# - Keep a single cached model, adding points to it when going down the list.
# If there are very few inversions, we save the cost of copying [potentially
# ~30% this function, 15% overall in add_one()]
# - Disable the copying of stuff not needed to compute std.
# - At append_to_data, compute only what is strictly needed for std (I think the
# kernel gradient is the only such thing.
# - Create model anew with fixed given kernel and no SVM, and fit all points.
# In any case, the cost is now very low compared to nested sampling and hyperparam
# fitting.
if i < 0:
return self._gpr
self.log(level=4, msg=f"[pool.cache] Caching model [{i + 1}]")
self.gpr_cond[i] = deepcopy(self._gpr)
self.gpr_cond[i].append_to_data(
self.X[:i + 1], self.y[:i + 1], fit_gpr=False, fit_classifier=False
)
self.cache_counter += 1
return self.gpr_cond[i]
[docs]
def reset_cache(self):
"""
Deletes the cached GPR models when there are not needed any more.
"""
# No need to store the last element in the list (or the last buffer slot)
self.gpr_cond = [None] * len(self.X - 1)
def __getstate__(self):
return deepcopy(self).__dict__
# Remove references to external objects (gpr and acq_func) and cached gpr's
def __deepcopy__(self, memo=None):
attrs_ignore_at_copy = ["_gpr", "_acq_func", "gpr_cond"]
new = self.__class__.__new__(self.__class__)
new.__dict__ = {k: deepcopy(v) for k, v in self.__dict__.items()
if k not in attrs_ignore_at_copy}
return new
[docs]
def copy(self, drop_empty=False):
"""
Returns a copy of the pool, missing references to external objects.
If ``drop_empty=True`` (default: ``False``), the returned copy has its size
reduced to contain just the final set of finite conditioned acquisition points.
"""
copy_ = deepcopy(self)
if drop_empty:
try:
i_first_empty = next(
i for i, acq in enumerate(copy_.acq_cond[:-1]) if acq == -np.inf
)
except StopIteration:
return copy_
copy_.X = copy_.X[:i_first_empty]
copy_.y = copy_.y[:i_first_empty]
copy_.acq_cond = copy_.acq_cond[:i_first_empty]
copy_.sigma = copy_.sigma[:i_first_empty]
copy_.acq = copy_.acq[:i_first_empty]
return copy_
return copy_
[docs]
def sort(self, i_start=0):
"""
Sorts in descending order of acquisition function value, where the acq of the
``i``-th element (0-based) is conditioned on the GPR model that includes the
points with j<i with their predicted (mean) y.
If ``i_start!=0`` is given, assumes the upper elements in the list are already
sorted following this criterion.
This function assumes that the augmented model just abobe ``i_start`` has not
already been cached, and starts by caching it and computing acquisition function
values ``i_start`` down.
"""
# If beyond last position, nothing to do (the = case is the last buffer slot):
if i_start >= len(self):
self.log(
level=4,
msg="[pool.sort] Nothing to do (sorting beyond last position).")
return
self.log(
level=4,
msg=f"[pool.sort] Sorting the pool starting at [{i_start + 1}]",
)
upper_gpr_cond = self.cache_model(i_start - 1)
# If list not full (first sublist element's acq_cond=-inf), do nothing
if self.acq_cond[i_start] == -np.inf:
self.log(level=4, msg="[pool.sort] Nothing to do (list is not full yet).")
return
# Compute acq_cond for non-empty points (acq != -inf) only. Assume always sorted.
try:
i_1st_inf = next(i for i, ac in enumerate(self.acq_cond) if ac == -np.inf)
except StopIteration:
i_1st_inf = len(self) + 1
sigma_cond = upper_gpr_cond.predict_std(
self.X[i_start:i_1st_inf], validate=False)
# Cond acq cannot be higher than less cond one. This clipping takes care of
# numerical noise that may make it higher. In particular, keeps -inf if it was so.
acq_cond = np.clip(
self._acq_func(self.y[i_start:i_1st_inf], sigma_cond),
None, np.inf if i_start == 0 else self.acq_cond[i_start - 1]
)
if self.verbose >= 4: # avoid creating the f-strings
self.log(level=4, msg=f"[pool.sort] New conditioned std: {sigma_cond}")
self.log(level=4, msg=f"[pool.sort] New conditioned acq: {acq_cond}")
j_sort = np.argsort(-acq_cond) # descending order! -- This is a *sub*list index!
acq_cond_max = acq_cond[j_sort[0]]
# If the max found was -inf, no need to re-sort points: disable all and return
if acq_cond_max == -np.inf:
self.log(
level=4,
msg="[pool.sort] Nothing to do (all sublist elements have -inf acq)."
)
self.acq_cond[i_start:i_1st_inf] = -np.inf
return
self.log(level=4, msg=(
f"[pool.sort] New max acq_cond = {acq_cond_max} "
f"at position [{i_start + j_sort[0] + 1}]")
)
# Reorder (not enough to swap the max here, because new -inf may have been
# generated, and we need to push them too the bottom.
i_sort_partial = i_start + j_sort
self.X[i_start:i_1st_inf] = self.X[i_sort_partial]
self.y[i_start:i_1st_inf] = self.y[i_sort_partial]
self.sigma[i_start:i_1st_inf] = self.sigma[i_sort_partial]
self.acq[i_start:i_1st_inf] = self.acq[i_sort_partial]
# Strictly, we only need to assign the highest acq_cond, since the rest will be
# discarded, but logging is cleaner if all assigned sorted
self.acq_cond[i_start:i_1st_inf] = acq_cond[j_sort]
self.log(level=4, msg=f"[pool.sort] Partial sort up to {i_start + 1}:")
self.log_pool(
level=4, include_last=True, last_sorted=i_start, prefix="[pool.sort] ")
# Sort the sublist below (if empty or single element, taken care of inside)
self.sort(i_start + 1)