"""
This module contains several classes and methods for calculating different
convergence criterions which can be used to determine if the BO loop has
converged.
"""
import sys
import inspect
from copy import deepcopy
from warnings import warn
from abc import ABCMeta, abstractmethod
import numpy as np
from gpry.mc import cobaya_generate_gp_model_input, mcmc_info_from_run
from gpry.tools import kl_norm, is_valid_covmat, nstd_of_1d_nstd, mean_covmat_from_evals, \
credibility_of_nstd
from gpry import mpi
# Policies and default ("necessary") for convergence criteria
_all_convergence_policies_dict = {
"n": "necessary",
"s": "sufficient",
"ns": "necessary and sufficient",
"m": "monitor",
}
_default_convergence_policy = "n"
[docs]
class ConvergenceCheckError(Exception):
"""
Exception to be raised when the computation of the convergence criterion failed.
"""
def builtin_names():
"""
Lists all names of all built-in convergence criteria.
"""
list_conv_names = [
name
for name, obj in inspect.getmembers(sys.modules[__name__])
if (
issubclass(obj.__class__, ConvergenceCriterion.__class__)
and obj is not ConvergenceCriterion
)
]
return list_conv_names
[docs]
class ConvergenceCriterion(metaclass=ABCMeta):
"""Base class for all convergence criteria (CCs). A CC quantifies the
convergence of the GP surrogate model. If this value goes below a certain,
user-set value we consider the GP to have converged to the true posterior
distribution.
Currently several CCs are supported which should be versatile enough for
most tasks. If however one wants to specify a custom CC
it should be a class which inherits from this abstract class.
This class needs to be of the format::
from gpry.convergence import ConvergenceCriterion
class Custom_convergence_criterion(ConvergenceCriterion):
def __init__(self, prior_bounds, params):
# prior_bounds should be a list of prior bounds for all parameters.
# As a minimal requirement this method should set a number
# at which the algorithm is considered to have converged.
# Furthermore this method should initialize empty lists in
# which we can write the values of the convergence criterion
# as well as the total number of posterior evaluations and the
# number of accepted posterior evaluations. This allows for
# easy tracking/plotting of the convergence.
self.values = []
self.n_posterior_evals = []
self.n_accepted_evals = []
self.limit = ... # stores the limit for convergence
def is_converged(self, gp, gp_2=None, new_X=None, new_y=None, pred_y=None):
# Basically a wrapper for the 'criterion_value' method which
# returns True if the convergence criterion is met and False
# otherwise.
def criterion_value():
# Returns the value of the convergence criterion. Should also
# append the current value and the number of posterior
# evaluations to the corresponding variables.
"""
@abstractmethod
def __init__(self, prior_bounds, params):
"""Sets all relevant initial parameters from the 'params' dict."""
self.values = []
self.n_posterior_evals = []
self.n_accepted_evals = []
self._set_convergence_policy(params)
[docs]
def get_history(self):
"""Returns the two lists containing the values of the convergence
criterion at each step as well as the total number of evaluations and
the number of accepted evaluations.
"""
try:
values = self.values
n_posterior_evals = self.n_posterior_evals
n_accepted_evals = self.n_accepted_evals
except Exception as excpt:
raise AttributeError(
"The convergence criterion does not save its convergence history."
) from excpt
if len(values) == 0 or len(n_posterior_evals) == 0:
raise ValueError(
"Make sure to call the convergence criterion "
"before getting it's history."
)
return values, n_posterior_evals, n_accepted_evals
[docs]
@abstractmethod
def is_converged(
self, gp, gp_2=None, new_X=None, new_y=None, pred_y=None, acquisition=None
):
"""
Returns False if the algorithm hasn't converged and True if it has.
If gp_2 is None the last GP is taken from the model instance.
"""
[docs]
@abstractmethod
def criterion_value(self, gp, gp_2=None):
"""
Returns the value of the convergence criterion for the current
gp. If gp_2 is None the last GP is taken from the model instance.
"""
@property
def last_value(self):
"""Last value of the convergence criterion."""
return deepcopy(self.values[-1])
@property
def is_MPI_aware(self):
"""
Should return True if the convergence criterion should run in multiple processes
using MPI communication.
"""
return False
def _set_convergence_policy(self, params):
# pylint: disable=attribute-defined-outside-init
self._convergence_policy = (params or {}).get(
"policy", _default_convergence_policy
)
try:
self._convergence_policy = self._convergence_policy.lower()
if self._convergence_policy not in _all_convergence_policies_dict:
raise ValueError()
except (AttributeError, ValueError) as excpt:
raise ValueError(
"Convergence 'policy' must be one of the following strings: "
f"{_all_convergence_policies_dict}. Got {self._convergence_policy}."
) from excpt
@property
def convergence_policy(self):
"""
Returns a string describing the convergence policy.
"""
return self._convergence_policy
@property
def convergence_policy_MPI(self):
"""
Returns a string describing the convergence policy (MPI-wrapped!)
"""
if self.is_MPI_aware or mpi.is_main_process:
convergence_policy = self._convergence_policy
else:
convergence_policy = None
return mpi.bcast(convergence_policy)
[docs]
def is_converged_MPIwrapped(self, *args, **kwargs):
"""
MPI-aware wrapper for calling is_converged inside the runner.
Returns convergence criterion value for process 0.
If fails, raises ConvergenceCheckError in all processes.
"""
mpi.sync_processes() # for timing
failed = False
has_converged = None
err_msg = None
if self.is_MPI_aware or mpi.is_main_process:
try:
has_converged = self.is_converged(*args, **kwargs)
except ConvergenceCheckError as excpt:
failed = True
err_msg = str(excpt)
if any(mpi.allgather(failed)):
# Take lowest-rank non-null error message
err_msg = [msg for msg in mpi.allgather(err_msg) if msg is not None][0]
raise ConvergenceCheckError(err_msg)
return mpi.bcast(has_converged)
class DummyMPIConvergeCriterion(ConvergenceCriterion):
"""
Class to be held by non-0 rank MPI processes if the corresponding criterion is not
MPI-aware, in order to simplify the code.
"""
def __init__(self):
pass
def criterion_value(self, *args, **kwargs):
raise TypeError("This method should not be called for this class.")
def is_converged(self, *args, **kwargs):
raise TypeError("This method should not be called for this class.")
@property
def last_value(self):
"""Last value of the convergence criterion."""
return np.nan
[docs]
class DontConverge(ConvergenceCriterion):
"""
This convergence criterion is mainly for testing purposes and always
returns False when ``is_converged`` is called. Use this method together
with the ``max_points`` and ``max_accepted`` keys in the options dict to stop
the BO loop at a set number of iterations.
"""
def __init__(self, prior_bounds=None, params=None):
self.values = []
self.limit = np.nan
self.thres = []
self.n_posterior_evals = []
self.n_accepted_evals = []
self.prior_bounds = prior_bounds
# Explicitly set "necessary" as policy
self._set_convergence_policy({"policy": "n"})
[docs]
def criterion_value(self, gp, gp_2=None):
self.values.append(np.nan)
self.thres.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
return np.nan
[docs]
def is_converged(
self, gp, gp_2=None, new_X=None, new_y=None, pred_y=None, acquisition=None
):
self.criterion_value(gp, gp_2)
return False
[docs]
class GaussianKL(ConvergenceCriterion):
"""
This criterion estimates convergence as stability of the Gaussian-approximated,
single-mode KL divergence of surrogate posterior samples between runs.
If a valid GPAcquisition instance is passed to ``is_converged``, mean and covariance
will be extracted from it. Otherwise, it estimates the mean and covariance by running
an MCMC sampler on the GP (slow).
In the second case, this convergence criterion is MPI-aware, such that it will run as
many parallel MCMC chains as running processes to improve the estimation of the mean
and covariance.
Parameters
----------
prior_bounds : list
List of prior bounds.
params : dict
Dict with the following keys:
* ``"limit"``: Value of the KL divergence for which we consider the algorithm
converged (default ``2e-2``).
* ``"limit_times"``: Number of consecutive times that the KL divergence must be
lower than the ``limit`` parameter (default ``2``).
* ``"n_draws"``: Number of steps of the MCMC chain (default: ignored in favour of
``"n_draws_per_dimsquared"``).
* ``"n_draws_per_dimsquared"``: idem, as a factor of the dimensionality squared
(default 10).
* ``"max_reused"``: number of times a sample can be reweighted and reused (may
miss new high-value regions) (default 4).
"""
@property
def is_MPI_aware(self):
return True
def __init__(self, prior_bounds, params):
self.prior_bounds = prior_bounds
self.mean = None
self.cov = None
# The limit cannot be too strict, since the NS sample is not so stable itself
self.limit = params.get("limit", 2e-2)
d = len(self.prior_bounds)
# Needs to at least encompass 2 full MC samples -- TODO: fix in run.py at init
self.limit_times = int(np.round(params.get("limit_times", d)))
self._set_convergence_policy(params)
self.values = []
self.thres = []
self.n_posterior_evals = []
self.n_accepted_evals = []
# Number of MCMC chains to generate samples
if params.get("n_draws") and params.get("n_draws_per_dimsquared"):
raise ValueError(
"Pass either 'n_draws' or 'n_draws_per_dimsquared', not both"
)
if params.get("n_draws"):
self._n_draws = int(params.get("n_draws"))
else:
self.n_draws_per_dimsquared = params.get("n_draws_per_dimsquared", 10)
# Max times a sample can be reweighted and reused (we may miss new high regions)
self.max_reused = params.get("max_reused", 4)
self.n_reused = 0
# We'll some hight MCMC temperature, to get the tails right
self.temperature = 2
# Prepare Cobaya's input
self.paramnames = [f"x_{i + 1}" for i in range(d)]
self.cobaya_input = None
# Save last sample
self._last_info = {}
self._last_collection = None
def _get_new_mean_and_cov(self, gp, acquisition=None):
try:
return self._get_new_mean_and_cov_from_acquisition(acquisition)
except AttributeError:
warn(
"Could not get sample from acquisition object. "
"Running MC process to get mean and covmat."
)
return self._get_new_mean_and_cov_from_mc(gp)
[docs]
def _get_new_mean_and_cov_from_acquisition(self, acquisition):
"""
Tries to extract the mean and covmat from the acquisition object.
Raises AttributeError for null acquisition object or it not having samples.
"""
mean, cov = None, None
attr_error, num_error = None, None
if mpi.is_main_process:
try:
X, _, _, w = acquisition.last_MC_sample(warn_reweight=False)
except AttributeError as excpt:
attr_error = excpt
else:
try:
mean = np.average(X, weights=w, axis=0)
cov = np.atleast_2d(np.cov(X.T, aweights=w, ddof=0))
except (ValueError, TypeError) as excpt:
num_error = excpt
attr_error = mpi.bcast(attr_error)
if attr_error:
raise AttributeError from attr_error # all processes!
num_error = mpi.bcast(num_error)
if num_error:
raise ConvergenceCheckError(
f"Numerical error when computing new mean and cov: {num_error}"
) from num_error
return mpi.bcast((mean, cov) if mpi.is_main_process else None)
def _get_new_mean_and_cov_from_mc(self, gp):
self.thres.append(self.limit)
cov_mcmc = None
if mpi.is_main_process:
reused = False
if self._last_collection is not None:
points = self._last_collection[self.paramnames].to_numpy(np.float64)
old_gp_values = -0.5 * self._last_collection["chi2"].to_numpy(np.float64)
new_gp_values = gp.predict(points)
weights = self._last_collection["weight"].to_numpy(np.float64)
logratio = new_gp_values - old_gp_values
logratio -= max(logratio)
reweights = weights * np.exp(logratio)
# Remove points with very small weight: more numerically stable
i_nonzero = np.argwhere(reweights > 1e-8).T[0]
reweights = reweights[i_nonzero]
points = points[i_nonzero]
mean_reweighted = np.average(points, weights=reweights, axis=0)
cov_reweighted = np.cov(points.T, aweights=reweights)
cov_mcmc = cov_reweighted
# Use max of them
try:
kl_reweight = max(
kl_norm(mean_reweighted, cov_reweighted, self.mean, self.cov),
kl_norm(self.mean, self.cov, mean_reweighted, cov_reweighted),
)
except np.linalg.LinAlgError as excpt:
raise ConvergenceCheckError(
f"Could not compute KL norm: {excpt}."
) from excpt
# If very small, we've probably found nothing yet, so nothing new
# But assume that if we have hit 10 * limit, we are right on track
min_kl = self.limit * 1e-2 if max(self.values) < 10 * self.limit else 0
# If larger than the difference with the last one, bad
max_kl = self.values[-1]
if min_kl < kl_reweight < max_kl and self.n_reused < self.max_reused:
self.n_reused += 1
reused = True
if mpi.multiple_processes:
reused = mpi.bcast(reused if mpi.is_main_process else None)
if reused:
if mpi.multiple_processes:
mean_reweighted, cov_reweighted = mpi.bcast(
(mean_reweighted, cov_reweighted) if mpi.is_main_process else None
)
return mean_reweighted, cov_reweighted
# No previous mcmc sample, or reweighted mean+cov too different
self._last_info, samples = self._sample_mcmc(gp, covmat=cov_mcmc)
# Compute mean and cov, and broadcast
if mpi.is_main_process:
mean_new, cov_new = samples.mean(), samples.cov()
# Only main process caches this one, to save memory
self._last_collection = samples
# Broadcast results
if mpi.multiple_processes:
mean_new, cov_new = mpi.bcast(
(mean_new, cov_new) if mpi.is_main_process else None
)
return mean_new, cov_new
# pylint: disable=import-outside-toplevel
def _sample_mcmc(self, gpr, covmat=None):
from cobaya.model import get_model
from cobaya.sampler import get_sampler
from cobaya.log import LoggedError
# Update Cobaya's input: mcmc's proposal covmat and log-likelihood
self.cobaya_input = cobaya_generate_gp_model_input(
gpr, self.prior_bounds, self.paramnames
)
# Supress Cobaya's output
# (set to True for debug output, or comment out for normal output)
self.cobaya_input["debug"] = 50
# Create model and sampler
model = get_model(self.cobaya_input)
if covmat is not None and is_valid_covmat(covmat):
cov = covmat
else:
cov = self.cov
sampler_info = mcmc_info_from_run(model, gpr, cov=cov)
sampler_info["mcmc"]["temperature"] = self.temperature
high_prec_threshold = (self.values[-1] < 1) if len(self.values) > 0 else False
# Relax stopping criterion if not yet well converged
sampler_info["mcmc"].update(
{
"Rminus1_stop": (0.01 if high_prec_threshold else 0.2),
"Rminus1_cl_stop": (0.2 if high_prec_threshold else 0.5),
}
)
mcmc_sampler = get_sampler(sampler_info, model)
try:
mcmc_sampler.run()
success = True
except LoggedError:
success = False
if mpi.multiple_processes:
success = all(mpi.allgather(success))
if not success:
raise ConvergenceCheckError
updated_info = model.info()
updated_info["sampler"] = {"mcmc": mcmc_sampler.info()}
samples = mcmc_sampler.samples(combined=True, skip_samples=0.33)
samples.reset_temperature()
return updated_info, samples
[docs]
def criterion_value(self, gp, gp_2=None, acquisition=None):
try:
mean_new, cov_new = self._get_new_mean_and_cov(gp, acquisition=acquisition)
except ConvergenceCheckError as excpt:
self.values.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(
f"Error when computing mean and covmat: {excpt}"
) from excpt
if gp_2 is not None:
# TODO: Nothing yet to do with gp2
pass
if self.mean is None or self.cov is None:
# Nothing to compare to! But save mean, cov for next call
self.mean, self.cov = mean_new, cov_new
self.values.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError("No previous call: cannot compute criterion yet.")
else:
mean_old, cov_old = np.copy(self.mean), np.copy(self.cov)
# Compute the KL divergence (gaussian approx) with the previous iteration
try:
kl = kl_norm(mean_new, cov_new, mean_old, cov_old)
if kl < 0:
raise ValueError("Negative KL -> undefined")
self.mean = mean_new
self.cov = cov_new
self.values.append(kl)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
except Exception as excpt:
kl = np.nan
self.mean = mean_new
self.cov = cov_new
self.values.append(kl)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(f"Computation error in KL: {excpt}") from excpt
return kl
[docs]
def is_converged(
self, gp, gp_2=None, new_X=None, new_y=None, pred_y=None, acquisition=None
):
self.criterion_value(gp, gp_2, acquisition)
try:
if np.all(np.abs(np.array(self.values[-self.limit_times:])) < self.limit):
return True
except IndexError:
pass
return False
# Safe copying and pickling
def __getstate__(self):
return deepcopy(self).__dict__
def __deepcopy__(self, memo=None):
# Remove non-picklable gp model likelihood
if self.cobaya_input and "likelihood" in self.cobaya_input:
like = list(self.cobaya_input["likelihood"])[0]
self.cobaya_input["likelihood"][like]["external"] = True
if self._last_info and "likelihood" in self._last_info:
self._last_info["likelihood"][like]["external"] = True
new = (lambda cls: cls.__new__(cls))(self.__class__)
new.__dict__ = {k: deepcopy(v) for k, v in self.__dict__.items() if k != "log"}
return new
[docs]
class GaussianKLTrain(GaussianKL):
"""
This criterion is not aimed at estimating convergence, but at discarding cases in
which a MC sample from the GPR (the last one obtained by the acquisition step, if it
exists, otherwise computed on the fly) would not sample the mode mapped by the
training set, but instead some overshooting or large baseline plateau. It compares the
Gaussian approximation of the last MC sample by the acquisition step with the mean and
covariance matrix computed from the training set using probabilities as weights.
Since its a check in the current iteration, by default it is enough for this criterion
to be satisfied in the last step, and with a high tolerance, since it affects extreme
cases only.
At the moment, it assumes that there is a single mode.
If a valid GPAcquisition instance is passed to ``is_converged``, mean and covariance
will be extracted from it. Otherwise, it estimates the mean and covariance by running
an MCMC sampler on the GP (slow).
In the second case, this convergence criterion is MPI-aware, such that it will run as
many parallel MCMC chains as running processes to improve the estimation of the mean
and covariance.
Parameters
----------
prior_bounds : list
List of prior bounds.
params : dict
Dict with the following keys:
* ``"limit"``: Value of the KL divergence for which we consider the algorithm
converged (default ``2e-2``).
* ``"limit_times"``: Number of consecutive times that the KL divergence must be
lower than the ``limit`` parameter (default ``2``).
* ``"n_draws"``: Number of steps of the MCMC chain (default: ignored in favour of
``"n_draws_per_dimsquared"``).
* ``"n_draws_per_dimsquared"``: idem, as a factor of the dimensionality squared
(default 10).
* ``"max_reused"``: number of times a sample can be reweighted and reused (may
miss new high-value regions) (default 4).
"""
def __init__(self, prior_bounds, params):
params = params or {}
if params.get("limit") is None:
params["limit"] = len(prior_bounds)
if params.get("limit_times") is None:
params["limit_times"] = 2
super().__init__(prior_bounds, params)
def _get_mean_and_cov_from_training(self, gp):
return mean_covmat_from_evals(gp.X_train, gp.y_train)
[docs]
def criterion_value(self, gp, gp_2=None, acquisition=None):
try:
mean_new, cov_new = self._get_new_mean_and_cov(gp, acquisition=acquisition)
except ConvergenceCheckError as excpt:
self.values.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(
f"Error when computing mean and covmat: {excpt}"
) from excpt
try:
mean_training, cov_training = self._get_mean_and_cov_from_training(gp)
except Exception as excpt:
self.values.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(
f"Error when computing mean and covmat from training: {excpt}"
) from excpt
if gp_2 is not None:
# TODO: Nothing yet to do with gp2
pass
# Compute the KL divergence (gaussian approx) between with the previous iteration
try:
kl = kl_norm(mean_new, cov_new, mean_training, cov_training)
if kl < 0:
raise ValueError("Negative KL -> undefined")
self.mean = mean_new
self.cov = cov_new
self.values.append(kl)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
except Exception as excpt:
kl = np.nan
self.mean = mean_new
self.cov = cov_new
self.values.append(kl)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(f"Computation error in KL: {excpt}") from excpt
return kl
[docs]
class TrainAlignment(GaussianKL):
"""
This criterion is not aimed at estimating convergence, but at discarding cases in
which a MC sample from the GPR (the last one obtained by the acquisition step, if it
exists, otherwise computed on the fly) would not sample the mode mapped by the
training set, but instead some overshooting or large baseline plateau.
It computes the minimum central confidence level of the mean of the training set with
respect to a Gaussian approximation of the surrogate posterior.
Its maximum value is obviously 1, and for the kind of test that this criterion
addresses a value below 0.5 should be enough. It's minimum value is clipped at 0.001,
to avoid spoiling the convergence plots with numerical noise.
Since its a check in the current iteration, by default it is enough for this criterion
to be satisfied in the last step, and with a high tolerance, since it affects extreme
cases only.
At the moment, it assumes that there is a single mode.
If a valid GPAcquisition instance is passed to ``is_converged``, mean and covariance
will be extracted from it. Otherwise, it estimates the mean and covariance by running
an MCMC sampler on the GP (slow).
In the second case, this convergence criterion is MPI-aware, such that it will run as
many parallel MCMC chains as running processes to improve the estimation of the mean
and covariance.
Parameters
----------
prior_bounds : list
List of prior bounds.
params : dict
Dict with the following keys:
* ``"frac_training"``: fraction, starting from the latest, of the training set to
be used (default: 1)
* ``"limit"``: Probability mass within the minimum CL enclosing the training mean
(default ``0.5``).
* ``"limit_times"``: Number of consecutive times that the criterion must be
fulfilled (default ``1``).
* ``"n_draws"``: Number of steps of the MCMC chain (default: ignored in favour of
``"n_draws_per_dimsquared"``).
* ``"n_draws_per_dimsquared"``: idem, as a factor of the dimensionality squared
(default 10).
* ``"max_reused"``: number of times a sample can be reweighted and reused (may
miss new high-value regions) (default 4).
"""
def __init__(self, prior_bounds, params):
params = params or {}
self.frac_training = params.get("frac_training", 1)
if params.get("limit") is None:
params["limit"] = 0.5
if params.get("limit_times") is None:
params["limit_times"] = 1
super().__init__(prior_bounds, params)
def _get_mean_from_training(self, gp):
Nfrac = int(gp.n * self.frac_training)
return mean_covmat_from_evals(gp.X_train[-Nfrac:], gp.y_train[-Nfrac:])[0]
@staticmethod
def criterion_value_from_means_cov(mean1, mean2, cov):
mean_diff = mean1 - mean2
chi2 = mean_diff @ np.linalg.inv(cov) @ mean_diff
return credibility_of_nstd(np.sqrt(chi2), len(mean1))
[docs]
def criterion_value(self, gp, gp_2=None, acquisition=None):
try:
mean_new, cov_new = self._get_new_mean_and_cov(gp, acquisition=acquisition)
except ConvergenceCheckError as excpt:
self.values.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(
f"Error when computing mean and covmat: {excpt}"
) from excpt
try:
mean_training = self._get_mean_from_training(gp)
except Exception as excpt:
self.values.append(np.nan)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(
f"Error when computing mean and covmat from training: {excpt}"
) from excpt
if gp_2 is not None:
# TODO: Nothing yet to do with gp2
pass
# Compute the CL corresponding to the Chi-squared of the mean of the training set
try:
eps = self.criterion_value_from_means_cov(mean_new, mean_training, cov_new)
if eps < 0:
raise ValueError("Negative credibility -> undefined")
eps = max(eps, 1e-3) # cap to avoid plotting numerical noise
self.mean = mean_new
self.cov = cov_new
self.values.append(eps)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
except Exception as excpt:
eps = np.nan
self.mean = mean_new
self.cov = cov_new
self.values.append(eps)
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
raise ConvergenceCheckError(
f"Computation error in train mean alignment: {excpt}"
) from excpt
return eps
[docs]
class CorrectCounter(ConvergenceCriterion):
r"""
This convergence criterion determines convergence by requiring that the GP's
predictions of the posterior values in the last :math:`n` steps are correct up to a
certain threshold. This condition is fulfilled if
.. math::
|f(x)-\overline{f}_{\mathrm{GP}}(x)| < (f_{\mathrm{max}}(x) - f(x)) \cdot r + a
where the parameters :math:`r` and :math:`a` are the relative and absolute
tolerances controlled by the `reltol` and `abstol` parameters.
We set the "value" of the criterion to be the maximum difference of the GP
prediction and the true posterior in the last batch of accepted evaluations.
Furthermore this class contains an internal list `thres` which contains the
threshold values corresponding to this difference.
Parameters
----------
prior_bounds : list
List of prior bounds.
params : dict
Dict with the following keys:
* ``"n_correct"``: Number of consecutive samples which need to be under the
threshold (default ``max(4, 0.5*N_d)``)
* ``"reltol"``: Relative tolerance parameter (default ``0.01``)
* ``"abstol"``: Absolute tolerance parameter (default ``"0.01s"``)
* ``"verbose"``: Verbosity
.. note::
The ``"reltol"`` and ``"abstol"`` parameters can be passed as a string ending
with either ``"l"`` or ``"s"``. In this case the value of this parameter is
scaled with the number of dimensions as either linear (``"l"``) or square
(``"s"``) of the depth of the :math:`\chi^2` of the :math:`1-\sigma`-contour
assuming a gaussian distribution.
"""
def __init__(self, prior_bounds, params):
d = len(prior_bounds)
self.ncorrect = params.get("n_correct", max(4, np.ceil(0.5 * d)))
reltol = params.get("reltol", 0.01)
if isinstance(reltol, str):
try:
assert reltol[-1] == "l" or reltol[-1] == "s" or reltol[-1] == "r"
if reltol[-1] == "l":
reltol = float(reltol[:-1]) * nstd_of_1d_nstd(1, d)
elif reltol[-1] == "s":
reltol = float(reltol[:-1]) * nstd_of_1d_nstd(1, d) ** 2.0
elif reltol[-1] == "r":
reltol = float(reltol[:-1]) * np.sqrt(nstd_of_1d_nstd(1, d))
except Exception as excpt:
raise ValueError(
"The 'reltol' parameter can either be a number "
+ f"or a string with a number followed by 'l' or 's'. Got {reltol}"
) from excpt
self.reltol = reltol
abstol = params.get("abstol", "0.01s")
if isinstance(abstol, str):
try:
assert abstol[-1] == "l" or abstol[-1] == "s" or abstol[-1] == "r"
if abstol[-1] == "l":
abstol = float(abstol[:-1]) * nstd_of_1d_nstd(1, d)
elif abstol[-1] == "s":
abstol = float(abstol[:-1]) * nstd_of_1d_nstd(1, d) ** 2.0
elif abstol[-1] == "r":
abstol = float(abstol[:-1]) * np.sqrt(nstd_of_1d_nstd(1, d))
except Exception as excpt:
raise ValueError(
"The 'abstol' parameter can either be a number "
+ f"or a string with a number followed by 'l' or 's'. Got {abstol}"
) from excpt
self.abstol = abstol
self.verbose = params.get("verbose", 0)
self._set_convergence_policy(params)
self.values = []
self.n_posterior_evals = []
self.n_accepted_evals = []
self.thres = []
self.n_pred = 0
[docs]
def is_converged(
self, gp, gp_2=None, new_X=None, new_y=None, pred_y=None, acquisition=None
):
self.criterion_value(gp, new_X=new_X, new_y=new_y, pred_y=pred_y)
return self.n_pred > self.ncorrect
[docs]
def criterion_value(self, gp, gp_2=None, new_X=None, new_y=None, pred_y=None):
n_new = len(new_y)
assert n_new == len(pred_y)
max_val = 0
max_diff = 0
max_thres = 0
for yn, yl in zip(new_y, pred_y):
# Remove warning case that does not trigger any condition below
if yn == -np.inf:
continue
# rel_difference = np.abs((yl - gp.y_max) / (yn - gp.y_max) - 1.)
diff = np.abs(yl - yn)
thres = np.abs(yn - gp.y_max) * self.reltol + self.abstol
if diff / thres > max_val:
max_val = diff / thres
max_diff = diff
max_thres = thres
if diff < thres:
self.n_pred += 1
if self.verbose > 0:
print(f"Already {self.n_pred} correctly predicted \n")
else:
self.n_pred = 0
if self.verbose > 0:
print("Mispredict...")
self.values.append(max_diff if n_new > 0 else self.values[-1])
self.thres.append(max_thres if n_new > 0 else self.thres[-1])
self.n_posterior_evals.append(gp.n_total)
self.n_accepted_evals.append(gp.n)
return max_val if n_new > 0 else self.values[-1]
@property
def limit(self):
"""Limit for the criterion value (changes along iterations for this criterion)."""
return self.thres[-1]