"""
Classes to generate random proposals within the prior or trust bounds, used to generate
initial samples for the active learning cycle, or as starting points from which to
optimize the acquisition function.
"""
from abc import ABCMeta, abstractmethod
from functools import partial
from math import inf
from warnings import warn
import scipy.stats
import numpy as np
from gpry.tools import check_random_state, is_in_bounds
from gpry import mpi
[docs]
def check_in_bounds(get_method):
"""
Decorator for ``get`` methods of ``Proposer`` sub-classes, that call the method until
the returned proposal falls within the ``bounds`` defined as an attribute.
Print a warning every 1000 failed attempts.
It does not need to be used if the returned proposals are guaranteed to fulfil it.
"""
def wrapper(self, *args, **kwargs):
i = 0
x = np.nan
while not is_in_bounds(x, self.bounds, check_shape=False)[0]:
i += 1
if not i % 1000:
warn(
f"[{self.__class__.__name__}] Could produce a proposal"
f" within the given bounds after {i} tries."
)
x = get_method(self, *args, **kwargs)
return x
return wrapper
[docs]
class Proposer(metaclass=ABCMeta):
"""
Base proposer class. All other proposers inherit from it. If you want to define your
own custom proposer it should also inherit from it.
"""
[docs]
@abstractmethod
def get(self, rng=None):
"""
Returns a random sample (given a certain random state) in the parameter space for
getting initial training samples or the acquisition function to be optimized from.
If the output is not guaranteed by construction to be within the bounds defined in
the ``bounds`` attribute, decorated this method with ``check_in_bounds``.
Parameters
----------
rng : int or numpy.random.Generator, optional
The generator used to propose points. If an integer is given, it is used as a
seed for the default global numpy random number generator.
"""
# pylint: disable=attribute-defined-outside-init
[docs]
def update_bounds(self, bounds):
"""
Updates the bounds for the proposal.
Parameters
----------
bounds : array
Bounds in which to optimize the acquisition function, assumed to be of shape
(d,2) for d dimensional prior
"""
self.bounds = np.atleast_2d(bounds)
[docs]
def update(self, gpr):
"""
Updates the internal GP instance if it has been updated with new data.
Parameters
----------
gpr : GaussianProcessRegressor
The gpr instance that has been updated.
"""
[docs]
class InitialPointProposer(metaclass=ABCMeta):
"""
Base proposer class for all proposers which work for initial point generation.
"""
[docs]
class ReferenceProposer(Proposer, InitialPointProposer):
"""
Generates proposals from the "reference" distribution defined in the Truth. If no
reference distribution is defined it defaults to the prior.
Parameters
----------
truth : Truth
The true model from which to draw the samples.
"""
def __init__(self, truth, bounds=None):
self.truth = truth
self.update_bounds(bounds if bounds is not None else truth.prior_bounds)
[docs]
@check_in_bounds
def get(self, rng=None):
return self.truth.ref_sample(rng)
[docs]
class PriorProposer(Proposer, InitialPointProposer):
"""
Generates proposals from the prior of the problem.
Parameters
----------
truth : Truth
The true model from which to draw the samples.
"""
def __init__(self, truth, bounds=None):
self.truth = truth
self.update_bounds(bounds if bounds is not None else truth.prior_bounds)
[docs]
@check_in_bounds
def get(self, rng=None):
return self.truth.prior_sample(rng)
[docs]
class PartialProposer(Proposer, InitialPointProposer):
"""
Combines any of the other proposers with a :class:`UniformProposer` with
a fraction drawn from the uniform proposer to encourage exploration.
.. warning::
If you want to use this proposer for initial point generation make sure that your
true_proposer is compatible.
Parameters
----------
bounds : array-like, shape=(n_dims,2)
Array of bounds of the prior [lower, upper] along each dimension.
true_proposer: Proposer
The initialized Proposer instance to use instead of uniform for a
fraction of samples.
random_proposal_fraction : float, between 0 and 1, optional (default=0.25)
The fraction of proposals that is drawn from the UniformProposer.
"""
# Either sample from true_proposer, or give a random prior sample
def __init__(self, bounds, true_proposer, random_proposal_fraction=0.25):
if random_proposal_fraction > 1.0 or random_proposal_fraction < 0.0:
raise ValueError(
"Cannot pass a fraction outside of [0,1]. "
f"You passed 'random_proposal_fraction={random_proposal_fraction}'"
)
if not isinstance(true_proposer, Proposer):
raise ValueError("The true proposer needs to be a valid proposer.")
self.rpf = random_proposal_fraction
# TODO: Make this a sample of the prior instead of uniform hypercube.
self.random_proposer = UniformProposer(bounds)
self.true_proposer = true_proposer
# Not decorating it, assuming the individual ones fulfil the in-bounds criterion,
# either by construction or because of their own ``check_in_bounds`` decorator.
[docs]
def get(self, rng=None):
rng = check_random_state(rng)
if rng.random() > self.rpf:
return self.true_proposer.get(rng=rng)
return self.random_proposer.get(rng=rng)
[docs]
def update(self, gpr):
self.true_proposer.update(gpr)
[docs]
def update_bounds(self, bounds):
self.random_proposer.update_bounds(bounds)
self.true_proposer.update_bounds(bounds)
[docs]
class MeanCovProposer(Proposer, InitialPointProposer):
"""
Generates proposals from a multivariate normal distribution given a mean
vector and covariance matrix.
Parameters
----------
mean : array-like, shape=(n_dims,)
Mean-vector of the multivariate normal distribution.
cov : array-like, shape=(n_dims, n_dims)
Covariance matrix of the multivariate normal distribution.
.. note::
We conduct no explicit checks on whether the covariance matrix you
provide is singular. Make sure that your matrix is a valid
covariance matrix!
include_mean : bool (defaulf: False)
If True, returns the mean in the first call to ``get`` (only for the 1st MPI rank)
"""
def __init__(self, bounds, mean, cov, include_mean=False):
self.update_bounds(bounds)
self._mean_used = not include_mean
self._mean = np.array(mean)
self.proposal_function = scipy.stats.multivariate_normal(
mean=mean, cov=cov, allow_singular=True
).rvs
[docs]
@check_in_bounds
def get(self, rng=None):
if not self._mean_used:
self._mean_used = True
if mpi.is_main_process:
return self._mean
return self.proposal_function(random_state=rng)
[docs]
class CentroidsProposer(Proposer):
"""
Proposes points at the centroids of subsets of dim-1 training points. It
perturbs some of the proposals away from the centroids to encourage
exploration.
bounds : array-like, shape=(n_dims,2)
Array of bounds of the prior [lower, upper] along each dimension.
lambda: float, optional (default=1)
Controls the scale of the perturbation of samples. Lower values
correspond to more exploration.
"""
def __init__(self, bounds, lambd=1.0):
self.training = None
self.training_ = None # in-bounds subset
self.update_bounds(bounds)
# TODO: adapt lambda to dimensionality!
# e.g. 1 seems to work well for d=2, and ~0.5 for d=30
self.kicking_pdf = scipy.stats.expon(scale=1 / lambd)
@property
def d(self):
"""Dimensionality of the prior."""
return len(self.bounds)
# No need for check_in_bounds, by construction (uses np.clip)
[docs]
def get(self, rng=None):
rng = check_random_state(rng)
m = self.d + 1
# If possible, get points inside bounds, otherwise outside (1st iteration(s))
try:
subset = self.training_[
rng.choice(len(self.training_), size=m, replace=False)
]
except ValueError: # m > len(training_)
subset = self.training[rng.choice(len(self.training), size=m, replace=False)]
centroid = np.average(subset, axis=0)
# perturb the point: per dimension, add a random multiple of the difference
# between the centroid and one of the points.
kick = -centroid + np.array(
[
subset[j][i]
for i, j in enumerate(rng.choice(m, size=self.d, replace=False))
]
)
kick *= self.kicking_pdf.rvs(self.d, random_state=rng)
# This might have to be modified if the optimizer can't deal with
# points which are exactly on the edges.
return np.clip(centroid + kick, self.bounds[:, 0], self.bounds[:, 1])
[docs]
def update(self, gpr):
# Get training locations from gpr and save them
self.training = np.copy(gpr.X_train)
[docs]
def update_bounds(self, bounds):
super().update_bounds(bounds)
# Save training samples inside new bounds
if self.training is None:
return
self.training_ = self.training[is_in_bounds(self.training, bounds)]
# FROM HERE ON, UNUSED, POSSIBLY OUTDATED (and Cobaya-dependent) #########################
[docs]
class MeanAutoCovProposer(Proposer, InitialPointProposer):
"""
Does the same as :class:`MeanCovProposer` but tries to get an automatically
generated covariance matrix from Cobaya's `Cosmo Input Generator`.
Parameters
----------
mean : array-like, shape=(n_dims,)
Mean-vector of the multivariate normal distribution.
model_info : dict
The info dictionary used to generate the model.
"""
def __init__(self, mean, model_info):
from cobaya.cosmo_input.autoselect_covmat import get_best_covmat
from cobaya.tools import resolve_packages_path
cmat_dir = get_best_covmat(model_info, packages_path=resolve_packages_path())
if np.any(d != 0 for d in cmat_dir["covmat"].shape):
self.proposal_function = scipy.stats.multivariate_normal(
mean=mean, cov=cmat_dir["covmat"], allow_singular=True
).rvs
else:
# TODO :: how to gracefully fall back if autocovmat not found
raise Exception("Autocovmat is not valid")
# UNDEFINED: model
self.proposal_function = model.prior.sample
[docs]
@check_in_bounds
def get(self, rng=None):
return self.proposal_function(random_state=rng)
[docs]
class SmallChainProposer(Proposer):
"""
Uses a short MCMC chain starting from a random training point of the GP to
generate proposals. Runs a chain of `npoints` length and takes the `nsteps`
last points as proposals. Once the proposals have been exhausted or the GP
has been fit with new data a new chain is run from a different starting
location.
Parameters
----------
bounds : array-like, shape=(n_dims,2)
Array of bounds of the prior [lower, upper] along each dimension.
n_points : int, optional (default=100)
The max number of samples that is generated from the chain
nsteps : int, optional (default=10)
The number of MC steps that is taken from the end of the chain.
nretries : int, optional (default=3)
The number of times that the MC chain is restarted if it fails.
If the chain fails `nretries` times a warning is printed and samples
from a uniform distribution are returned.
"""
def __init__(self, bounds, npoints=100, nsteps=20, nretries=3):
self.samples = []
self.update_bounds(bounds)
self.nretries = nretries
self.npoints = npoints
self.nsteps = nsteps
[docs]
def update_bounds(self, bounds):
super().update_bounds(bounds)
self.random_proposer = UniformProposer(bounds)
[docs]
@check_in_bounds
def get(self, rng=None):
if len(self.samples) > 0:
last, self.samples = self.samples[-1], self.samples[:-1]
return last
else:
self.resample(rng)
last, self.samples = self.samples[-1], self.samples[:-1]
return last
[docs]
def resample(self, rng=None):
rng = check_random_state(rng)
for i in range(self.nretries):
this_i = rng.choice(range(len(self.gpr.X_train)))
this_X = np.copy(self.gpr.X_train[this_i])
logpost = self.gpr.y_train[this_i]
from cobaya.model import LogPosterior
self.sampler.current_point.add(this_X, LogPosterior(logpost=logpost))
# reset random state and number of samples
self.sampler._rng = rng
self.sampler.collection.reset()
try:
self.sampler.run()
points = self.sampler.products()["sample"][self.parnames].values
self.samples = points[:: -self.nsteps]
return
except Exception:
pass
# if resample_tries failed raise Warning and pass uniform points
print("[proposer] WARNING: MC chain got stuck. Taking random uniform points")
self.samples = np.empty((self.nsteps, len(self.bounds)))
for i in range(self.nsteps):
self.samples[i] = self.random_proposer.get()
[docs]
def update(self, gpr):
self.samples = []
from gpry.mc import mc_sample_from_gp_cobaya
surr_info, sampler = mc_sample_from_gp_cobaya(
gpr,
self.bounds,
sampler="mcmc",
run=False,
sampler_options={"max_samples": self.npoints, "max_tries": 10 * self.npoints},
)
self.sampler = sampler
self.parnames = list(surr_info["params"])
self.gpr = gpr