"""
This module contains general tools used in different parts of the code.
"""
from copy import deepcopy
from inspect import signature
import inspect
from typing import Mapping, Iterable
from warnings import warn
import numpy as np
from numpy import trace as tr
from numpy.linalg import det
from scipy.linalg import eigh
from scipy.special import gamma, erfc
from scipy.stats import chi2
from sklearn.utils import check_random_state as check_random_state_sklearn
[docs]
def kl_norm(mean_0, cov_0, mean_1, cov_1):
"""
Computes the KL divergence between two normal distributions defined by their means
and covariance matrices.
May raise ``numpy.linalg.LinAlgError``.
"""
cov_1_inv = np.linalg.inv(cov_1)
dim = len(mean_0)
with NumpyErrorHandling(all="ignore") as _:
return 0.5 * (
np.log(det(cov_1))
- np.log(det(cov_0))
- dim
+ tr(cov_1_inv @ cov_0)
+ (mean_1 - mean_0).T @ cov_1_inv @ (mean_1 - mean_0)
)
[docs]
def kl_mc(X, logq_func, logp=None, logp_func=None, weight=None):
"""
Computes KL(P||Q) divergence using Monte Carlo samples X of P, and the logpdf of Q.
All functions assumed vectorised.
The logpdf's must be both normalised, or with the same off-normalisation factor, e.g.
different normalised likelihods with the same prior which is much larger than the
mode.
"""
if logp is None and logp_func is None:
raise ValueError("Needs either logp at X, or a logp(X) function.")
if logp is None:
logp = logp_func(X)
mask = np.isfinite(logp)
logp = logp[mask]
logq_at_X = logq_func(X[mask])
if weight is None:
weight = np.ones(len(logp))
total_weight = len(logp)
else:
weight = weight[mask]
total_weight = np.sum(weight)
return np.dot(weight, logp - logq_at_X) / total_weight
[docs]
def is_valid_covmat(covmat):
"""Returns True for a Real, positive-definite, symmetric matrix."""
covmat = np.atleast_2d(covmat)
try:
if np.allclose(covmat.T, covmat) and np.all(eigh(covmat, eigvals_only=True) > 0):
return True
return False
except (AttributeError, np.linalg.LinAlgError):
return False
[docs]
def gaussian_distance(points, mean, covmat):
"""
Computes radial Gaussian distance in units of standar deviations
(Mahalanobis distance).
"""
dim = np.atleast_2d(points).shape[1]
mean = np.atleast_1d(mean)
covmat = np.atleast_2d(covmat)
assert mean.shape == (dim,) and covmat.shape == (dim, dim), (
f"Mean and/or covmat have wrong dimensionality: dim={dim}, "
f"mean.shape={mean.shape} and covmat.shape={covmat.shape}."
)
assert is_valid_covmat(covmat), "Covmat passed is not a valid covariance matrix."
# Transform to normalised gaussian
std_diag = np.diag(np.sqrt(np.diag(covmat)))
invstd_diag = np.linalg.inv(std_diag)
corrmat = invstd_diag.dot(covmat).dot(invstd_diag)
Lscalefree = np.linalg.cholesky(corrmat)
L = np.linalg.inv(std_diag).dot(Lscalefree)
points_transf = L.dot((points - mean).T).T
# Compute distance
return np.sqrt(np.sum(points_transf**2, axis=1))
[docs]
def nstd_of_1d_nstd(n1, d, warn_inf=True):
"""
Radius of (hyper)volume in units of std's of a multivariate Gaussian of dimension
``d`` for a credible (hyper)volume defined by the equivalent 1-dimensional
``n1``-sigma interval.
"""
nstd = np.sqrt(chi2.isf(erfc(n1 / np.sqrt(2)), d))
if warn_inf and not np.isfinite(nstd):
warn(f"Got -inf for n1={n1} and d={d}. This may cause errors.")
return nstd
[docs]
def delta_logp_of_1d_nstd(n1, d):
"""
Difference between the peak/mean of a Gaussian log-probability and the level
corresponding to the credible (hyper)volume defined by the equivalent 1-dimensional
``n1``-sigma interval in ``d`` dimensions.
"""
return 0.5 * nstd_of_1d_nstd(n1, d) ** 2
[docs]
def credibility_of_nstd(n, d):
"""
Posterior mass inside of the (hyper)volume of radius ``n`` (in units of std's) of a
multivariate Gaussian of dimension ``d``.
"""
return chi2.cdf(n**2, d)
[docs]
def volume_sphere(r, dim=3):
"""Volume of a sphere of radius ``r`` in dimension ``dim``."""
return np.pi ** (dim / 2) / gamma(dim / 2 + 1) * r**dim
[docs]
def check_random_state(seed, convert_to_random_state=False):
"""
Extension to sklearn.utils for numpy *Generators* to pass through.
Includes workaround from https://github.com/scikit-learn/scikit-learn/issues/16988
"""
if isinstance(seed, np.random.Generator):
if convert_to_random_state:
seed = np.random.RandomState(seed.bit_generator)
return seed
return check_random_state_sklearn(seed)
[docs]
def generic_params_names(n, prefix="x_"):
"""
Returns ``n`` generic parameter names (1-based) as ``[prefix]1``, ``[prefix]2``, etc.
"""
if (
not isinstance(n, int)
and not (isinstance(n, float) and np.isclose(n, int(n)))
or n <= 0
):
raise TypeError(f"'n' must be a positive integer. Got {n!r} of type {type(n)}).")
if not isinstance(prefix, str):
raise TypeError(
f"'prefix' must be a string. Got {prefix!r} of type {type(prefix)})."
)
return [prefix + str(i + 1) for i in range(int(n))]
[docs]
class NumpyErrorHandling:
"""
Context for manual handling of numpy errors (e.g. ignoring, just printing...).
NB: the call to ``deepcopy`` at init can become expensive if this ``with`` context
is used repeatedly. One may want to put it at an upper level then.
"""
def __init__(self, all):
self.all = all
self.error_handler = deepcopy(np.geterr())
def __enter__(self):
np.seterr(all=self.all)
def __exit__(self, error_type, error_value, error_traceback):
np.seterr(**self.error_handler)
if error_type is not None:
raise
[docs]
def get_Xnumber(value, X_letter, X_value=None, dtype=int, varname=None):
"""
Reads a value out of an X-number, e.g.: "5X" as 5 times the value of X.
If ``X_value`` is not defined, returns a tuple ``(value, has_X, X_power)``.
"""
not_allowed = [" ", ".", "-", "+", "e", "E", ",", ";"]
if X_letter in not_allowed:
raise ValueError(
f"X_letter not allowed: '{X_letter}'. It cannot any of {not_allowed}"
)
if not isinstance(dtype, type):
raise ValueError(f"'dtype' arg must be a type, not {type(dtype)}.")
if value == X_letter:
value = "1" + X_letter
# Avoid exceptions until the 'try' block
if isinstance(value, str) and X_letter in value:
has_X = True
num_value, X_power = value.split(X_letter)
if not num_value:
num_value = 1
if not X_power:
X_power = None
else:
has_X = False
num_value = value
X_power = None
try:
# Start with float. Impose dtype only at return
num_value = float(num_value)
if X_value is None: # special case: X value undefined (see docstring)
return (
dtype(num_value),
has_X,
X_power if X_power is None else float(X_power),
)
if has_X:
X_multiplier = X_value
if X_power is not None:
X_multiplier = X_multiplier ** float(X_power)
else:
X_multiplier = 1
return dtype(num_value * X_multiplier)
except (ValueError, TypeError) as excpt:
pre = f"Error setting variable '{varname}': " if varname else ""
raise ValueError(
pre + f"Could not convert {value} of type {type(value)} into type "
f"{dtype.__name__}. Pass either a string ending in '{X_letter}' or a valid "
f"{dtype.__name__} value."
) from excpt
[docs]
def check_candidates(gpr, new_X, tol=1e-8):
"""
Method for determining whether points which have been found by the
acquisition algorithm are already in the GP or appear multiple times
so that they can be removed.
Returns two boolean arrays, the first one indicating whether the point
is already in the GP and the second one indicating whether the point
appears multiple times.
"""
if gpr.preprocessing_X is not None:
new_X = gpr.preprocessing_X.transform(np.copy(new_X))
X_train = np.copy(gpr.X_train_)
new_X_r = np.round(new_X, decimals=int(-np.log10(tol)))
X_train_r = np.round(X_train, decimals=int(-np.log10(tol)))
in_training_set = np.any(np.all(X_train_r[:, None, :] == new_X_r, axis=2), axis=0)
unique_rows, indices, counts = np.unique(
new_X_r, axis=0, return_index=True, return_counts=True
)
is_duplicate = counts > 1
duplicates = np.isin(new_X_r, unique_rows[is_duplicate]).all(axis=1)
duplicates[indices[is_duplicate]] = False
return in_training_set, duplicates
[docs]
def is_in_bounds(points, bounds, check_shape=False):
"""
Checks if a point or set of points is within the given bounds.
Parameters
----------
points: numpy.ndarray
An (N, d) array of points to check
bounds: numpy.ndarray
An (d, 2) array of parameter bounds
check_shape : bool (default: False)
Whether to check for consistency of array shapes.
Returns
-------
numpy.ndarray:
A boolean array of length N indicating whether each point is within the bounds.
"""
points = np.atleast_2d(points)
if check_shape:
bounds = check_and_return_bounds(bounds)
if bounds.shape[0] != points.shape[1]:
raise ValueError(
"bounds and point appear to have different dimensionalities: "
f"{bounds.shape[0]} for bounds and {points.shape[1]} for point."
)
return np.all((points >= bounds[:, 0]) & (points <= bounds[:, 1]), axis=1)
[docs]
def check_and_return_bounds(bounds):
"""
Returns the passed bounds as a (dim, 2)-shaped array if it can be mapped to one,
and raises TypeError otherwise.
"""
try:
bounds_ = np.atleast_2d(bounds)
if bounds_.shape[1] != 2:
raise ValueError
except ValueError as excpt:
raise TypeError(
f"bounds must be a (dim, 2) array of bounds, but is {bounds}"
) from excpt
return bounds_
[docs]
def shrink_bounds(bounds, samples, factor=1):
"""
Reduces the given bounds to the minimal hypercube containing a set of `samples`.
If ``factor != 1``, the width is multiplied by that factor, while keeping the
hypercube centered.
If the samples span a longer region that that defined by the bounds, the given
bounds are preferred.
Parameters
----------
bounds: numpy.ndarray
An (d, 2) array of parameter bounds
samples: numpy.ndarray
An (N, d) array of sampling locations
factor: float
A factor by which to multiply the hypercube width
Returns
-------
numpy.ndarray:
An (d, 2) array of updated parameter bounds
Raises
------
TypeError:
If bounds or samples are not well formatted or inconsistent with each other.
"""
bounds = check_and_return_bounds(bounds)
try:
samples = np.atleast_2d(samples)
except ValueError as excpt:
raise TypeError(
"samples are not correctly formatted as an array with sha[e (nsamples, dim)"
) from excpt
if bounds.shape[0] != samples.shape[1]:
raise TypeError(
"bounds and samples appear to have different dimensionalities: "
f"{bounds.shape[0]} for bounds and {samples.shape[1]} for samples."
)
updated_bounds = np.empty(shape=bounds.shape, dtype=float)
# Find training bounds
updated_bounds[:, 0] = samples.min(axis=0)
updated_bounds[:, 1] = samples.max(axis=0)
# Apply factor
width = updated_bounds[:, 1] - updated_bounds[:, 0]
Delta = (factor - 1) / 2 * width
updated_bounds[:, 0] -= Delta
updated_bounds[:, 1] += Delta
# Restrict to prior
updated_bounds[:, 0] = np.array([updated_bounds[:, 0], bounds[:, 0]]).max(axis=0)
updated_bounds[:, 1] = np.array([updated_bounds[:, 1], bounds[:, 1]]).min(axis=0)
return updated_bounds
[docs]
def wrap_likelihood(loglike, ndim):
"""
Wraps a likelihood function to accept a single argument (a vector of parameters) if it takes
multiple arguments.
"""
sig = inspect.signature(loglike)
params = sig.parameters
# Count parameters by kind
positional = [
p for p in params.values()
if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
]
keyword = [
p for p in params.values()
if p.kind == inspect.Parameter.KEYWORD_ONLY
]
var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params.values())
var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values())
n_args = len(positional)
n_all = n_args + len(keyword)
# Case 1: function takes exactly ndim arguments (positional/keyword), wrap it
if n_args == ndim or n_all == ndim:
def wrapped(X):
return loglike(*X)
return wrapped
# Case 2: function takes one argument (e.g. already accepts a vector), pass through
elif n_args == 1 and not var_positional and not var_keyword:
return loglike
else:
raise ValueError(f"Function has signature {sig} which is incompatible with ndim={ndim}")
[docs]
def remove_0_weight_samples(weights, *arrays):
"""
Removes the elements of ``arrays`` (at axis 0) corresponding to the null ``weights``.
Returns a tuple with the non-null weights as the first element, and the rest of them
being copies of the given arrays without null-weighted samples in the order with which
they were passed. If an element of the list of arrays is ``None``, ``None`` is
returned in its place.
"""
i_zero_w = np.where(weights == 0)[0]
new_arrays = [np.delete(weights, i_zero_w)]
for array in arrays:
if array is None:
new_arrays.append(None)
elif array.shape[0] != len(weights):
raise ValueError("weights and some of the arrays have different lengths.")
else:
new_arrays.append(np.delete(array, i_zero_w, axis=0))
return new_arrays
[docs]
def mean_covmat_from_samples(X, w=None):
"""
Returns an estimation of the mean and covariance of a set ``X`` of points, using their
``logp`` as weights.
"""
mean = np.average(X, weights=w, axis=0)
cov = np.cov(X.T, aweights=w)
return mean, cov
[docs]
def mean_covmat_from_evals(X, logp):
"""
Returns an estimation of the mean and covariance of a set ``X`` of points, using their
``logp`` as weights.
"""
weights = np.exp(logp - max(logp))
weights_, X_ = remove_0_weight_samples(weights, X)
mean = np.average(X_, axis=0, weights=weights_)
covmat = np.cov(X_.T, aweights=weights_, ddof=0)
return mean, covmat