"""
Module gathering general plots.
(Other plots are in methods of some classes, e.g. Progress contains the timings plot.)
"""
import warnings
from typing import Sequence, Mapping
from numbers import Number
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm
from tqdm import tqdm
from gpry.gpr import GaussianProcessRegressor
from gpry.mc import process_gdsamples
from gpry.tools import (
credibility_of_nstd,
nstd_of_1d_nstd,
volume_sphere,
gaussian_distance,
delta_logp_of_1d_nstd,
generic_params_names,
)
# Use latex labels when available
plt.rcParams["text.usetex"] = True
_plot_dist_fontsize = 7
[docs]
def simple_latex_sci_notation(string):
"""
If ``string`` contains a ``%g`` or ``%e`` number representation, substitutes the ``e``
for a power of 10.
It does *not* add dollars around the string.
NB: it assumes that the string passed contains a single number, and nothing else.
"""
if "e" not in string:
return string
sigfigs, exp = string.split("e")
sign = "" if exp.startswith("+") else "-"
return f"{sigfigs}\\cdot 10^{{{sign}{exp[1:].lstrip('0')}}}"
[docs]
def param_samples_for_slices(X, i, bounds, n=200):
"""
From an array of points `X = [X^i] = [[X^1_1, X^1_2,...], [X^2_1,...], ...]`, it
generates a list of points per sample, where the `i` coordinate is sliced within the
region defined by `bounds`, and the rest of them are kept fixed.
"""
X = np.atleast_2d(X)
Xs_i = np.linspace(bounds[0], bounds[1], n)
X_slices = np.empty(shape=(X.shape[0], n, X.shape[1]), dtype=float)
for j, X_j in enumerate(X):
X_slices[j, :, :] = n * [X_j]
X_slices[:, :, i] = Xs_i
return X_slices
[docs]
def prepare_slices_func(func, X_fiducial, bounds, indices=None, n=50):
"""
Prepare slices of the given function,
Parameters
----------
func : callable
Function for which to prepare slices. It needs to take arguments in the way
``X_fiducial`` is passed: ``func(*X_fiducial)`` if ``X_fiducual`` is a list, or
``func(**X_fiducial)`` if X_fiducial is a dictionary.
X_fiducial : array-like, shape = (n_dimensions), or dict
Fiducial point for the slices: slice ``i`` corresponds to fixing all parameters
but that with index ``i``, which is evaluaded on a grid within its bounds. It can
be a dictionary with arguments of ``func`` as keys.
bounds : array-like, shape = (n_dimensions, 2), or dict
Bounds for the slices per parameter.
indices : list, optional
A list of integers (if ``X_fiducial`` is a list) or parameter names (if
``X_fiducial`` is a dict), denoting the parameters for which the slices will be
prepared (all of them, if left unspecified).
n : int
Number of samples per slice (default: 50). Careful if the posterior is slow!
Returns
-------
indices, params, Xs, ys : list(int), list(str) len=len(indices), array-like
shape=(len(indices), n, dim)), array-like shape=(len(indices, n))
"""
# Parse and check input
if isinstance(X_fiducial, Mapping):
is_kwarg = True
dim = len(X_fiducial)
X_fiducial = np.array(list(X_fiducial.values()))
params = list(X_fiducial)
if indices is None:
indices = params
try: # Assumes indices is a list of str
indices = [params.index(p) for p in indices]
except ValueError as excpt:
raise ValueError(
"`indices` is not a list of parameter names, or contains names not in "
"`X_fiducial`."
) from excpt
try: # Assumes bonds is a Mapping
bounds = [bounds[p] for p in params]
except (TypeError, IndexError) as excpt:
raise ValueError(
"`bounds` is not a dict, or bounds could not be founds for all "
"parameters."
) from excpt
else:
is_kwarg = False
X_fiducial = np.atleast_1d(X_fiducial)
dim = len(X_fiducial)
params = generic_params_names(dim)
if indices is None:
indices = list(range(dim))
# Assumes indices is a list of int
if not all((isinstance(i, int) and i < dim) for i in indices):
raise ValueError(
"`indices` is not a list of integer indices, or contains indices larger "
"than the length of `X_fiducial`."
)
# Assumes bonds is a (2d) list
if len(bounds) < len(params):
raise ValueError("`bounds` is not a list of bounds of the right lenght.")
if not isinstance(n, int) or n < 2:
raise ValueError("`n` must be a positive integer > 2.")
# Prepare and evaluate slices
Xs, ys = np.empty(shape=(len(indices), n, dim)), np.empty(shape=(len(indices), n))
for j, index in enumerate(indices):
Xs[j] = param_samples_for_slices([X_fiducial], index, bounds[index], n=n)[0]
progress_bar_desc = f"Slicing param {j + 1} of {len(indices)}"
for k, x in tqdm(enumerate(Xs[j]), total=n, desc=progress_bar_desc):
try:
if is_kwarg:
x_arg = dict(zip(params, x))
ys[j][k] = func(**x_arg)
else:
x_arg = x
ys[j][k] = func(*x_arg)
except TypeError as excpt:
raise TypeError(
f"Could not call the target function with arguments {x_arg}. Maybe "
"`X_fiducial` contained keys that are not argument names of the "
f"function? Err msg: {excpt}"
) from excpt
except Exception as excpt: # pylint: disable=broad-exception-caught
warnings.warn(
f"The function failed when called with arguments {x_arg}. Using NaN."
f"Err masg: {excpt}"
)
ys[j][k] = np.nan
return indices, [params[i] for i in indices], Xs, ys
# NB: use this one in the future to reformulate the other ones: more generic
[docs]
def plot_slices_func(
func, X_fiducial, bounds, indices=None, n=50, fig_kwargs=None, labels=None,
):
"""
Plot slices of the given function,
Parameters
----------
func : callable
Function for which to prepare slices. It needs to take arguments in the way
``X_fiducial`` is passed: ``func(*X_fiducial)`` if ``X_fiducual`` is a list, or
``func(**X_fiducial)`` if X_fiducial is a dictionary.
X_fiducial : array-like, shape = (n_dimensions), or dict
Fiducial point for the slices: slice ``i`` corresponds to fixing all parameters
but that with index ``i``, which is evaluaded on a grid within its bounds. It can
be a dictionary with arguments of ``func`` as keys.
bounds : array-like, shape = (n_dimensions, 2), or dict
Bounds for the slices per parameter.
indices : list, optional
A list of integers (if ``X_fiducial`` is a list) or parameter names (if
``X_fiducial`` is a dict), denoting the parameters for which the slices will be
prepared (all of them, if left unspecified).
n : int
Number of samples per slice (default: 50). Careful if the posterior is slow!
fig_kwargs : dict, optional
Dict of kw arguments to pass to the `subplots` constructor. Only ``layout``,
``dpi`` considered safe.
labels : lst(str), optional
Strings (possibly Latex) to use for axes labels. Length cases: None or len=0:
plain parameter names for x labels and no y label; len=1: used as y label, plain
names for x labels; len=len(indices): used as x labels; len=len(indices)+1: used
as x labels and y label, in that order.
Returns
-------
fig, axarr: figure and array of axes used for the plot.
"""
indices, params, Xs, ys = prepare_slices_func(
func, X_fiducial, bounds, indices=indices, n=n
)
if not isinstance(labels, Sequence) or len(labels) == 0:
x_labels = params
y_label = None
elif len(labels) == 1:
x_labels = params
y_label = labels[0]
elif len(labels) == len(params):
x_labels = labels
y_label = None
elif len(labels) == len(params) + 1:
x_labels = labels[:-1]
y_label = labels[-1]
else:
raise ValueError("Value for `labels` not recognised, or length not valid.")
fig_kwargs_defaults = {
"nrows": 1,
"ncols": len(indices),
"layout": "constrained",
"figsize": (4 * len(indices), 2),
"dpi": 200,
}
fig_kwargs_defaults.update(fig_kwargs or {})
fig, axes = plt.subplots(**fig_kwargs_defaults)
if not isinstance(axes, Sequence):
axes = [axes]
color = "tab:blue"
for j, (i, p) in enumerate(zip(indices, params)):
axes[j].axvline(X_fiducial[i], c="0.75", ls="--")
axes[j].plot(Xs[j, :, i], ys[j], c=color)
axes[j].scatter(Xs[j, :, i], ys[j], marker=".", s=10, c=color)
axes[j].set_xlabel(x_labels[j])
axes[j].set_ylabel(y_label)
return fig, axes
[docs]
def plot_slices(truth, gpr, acquisition, X=None, reference=None):
"""
Plots slices along parameter coordinates for a series `X` of given points (the GPR
training set if not specified). For each coordinate, there is a slice per point,
leaving all coordinates of that point fixed except for the one being sliced.
Lines are coloured according to the value of the mean GP at points X.
# TODO: make acq func optional
"""
params = truth.params
fig, axes = plt.subplots(
nrows=2,
ncols=len(params),
sharex="col",
layout="constrained",
figsize=(4 * len(params), 4),
dpi=200,
)
# Define X to plot
if X is None:
X = gpr.X_train.copy()
y = gpr.y_train.copy()
else:
y = gpr.predict(X)
min_y, max_y = min(y), max(y)
norm_y = lambda y: (y - min_y) / (max_y - min_y)
prior_bounds = truth.prior_bounds
Xs_for_plots = dict(
(p, param_samples_for_slices(X, i, prior_bounds[i], n=200))
for i, p in enumerate(params)
)
if reference is not None:
reference = _prepare_reference(reference, truth)
cmap = matplotlib.colormaps["viridis"]
for i, p in enumerate(params):
for j, Xs_j in enumerate(Xs_for_plots[p]):
cmap_norm = cmap(norm_y(y[j]))
alpha = 1
# TODO: could cut by half # of GP evals by reusing for acq func
axes[0, i].plot(Xs_j[:, i], gpr.predict(Xs_j), c=cmap_norm, alpha=alpha)
axes[0, i].scatter(X[j][i], y[j], color=cmap_norm, alpha=alpha)
axes[0, i].set_ylabel(r"$\log(p)$")
acq_values = acquisition(Xs_j, gpr)
axes[1, i].plot(Xs_j[:, i], acq_values, c=cmap_norm, alpha=alpha)
axes[1, i].set_ylabel(r"$\alpha(\mu,\sigma)$")
label = truth.labels[i] if truth.labels is not None else p
if label != p:
label = "$" + label + "$"
axes[1, i].set_xlabel(label)
bounds = (reference or {}).get(p)
if bounds is not None:
for ax in axes[:, i]:
if len(bounds) == 5:
ax.axvspan(
bounds[0], bounds[4], facecolor="tab:blue", alpha=0.2, zorder=-99
)
ax.axvspan(
bounds[1], bounds[3], facecolor="tab:blue", alpha=0.2, zorder=-99
)
ax.axvline(bounds[2], c="tab:blue", alpha=0.3, ls="--")
[docs]
def plot_slices_reference(truth, gpr, X, plot_truth=True, reference=None):
"""
Plots slices of the gpr model and true log-posterior (if ``plot_truth=True``) along
parameter coordinates for a given point ``X``, leaving all coordinates of that point
fixed except for the one being sliced.
"""
params = truth.params
fig, axes = plt.subplots(
nrows=1,
ncols=len(params),
sharex="col",
layout="constrained",
figsize=(4 * len(params), 2),
dpi=200,
)
prior_bounds = truth.prior_bounds
if X is None:
if reference is None:
raise ValueError("Needs at least a reference point or a reference sample.")
# TODO: if reference given as a sample, take best point from it.
X_array = np.array([X[p] for p in params])
# y_gpr_centre = gpr.predict(np.atleast_2d(X_array))[0]
if plot_truth:
y_truth_centre = truth.logp(X_array)
Xs_for_plots, ys_gpr_for_plot, sigmas_gpr_for_plot, ys_truth_for_plot = {}, {}, {}, {}
for i, p in enumerate(params):
Xs_for_plots[p] = param_samples_for_slices([X_array], i, prior_bounds[i], n=200)[
0
]
ys_gpr_for_plot[p], sigmas_gpr_for_plot[p] = gpr.predict(
Xs_for_plots[p], return_std=True
)
if plot_truth:
ys_truth_for_plot[p] = np.array([truth.logp(x) for x in Xs_for_plots[p]])
# Training set referenced to X and div by X, for distance-based transparency
X_train_diff = (gpr.X_train - X_array) / X_array
if reference is not None:
reference = _prepare_reference(reference, truth)
for i, p in enumerate(params):
axes[i].fill_between(
Xs_for_plots[p][:, i],
ys_gpr_for_plot[p] - 2 * sigmas_gpr_for_plot[p],
ys_gpr_for_plot[p] + 2 * sigmas_gpr_for_plot[p],
color="0.5",
alpha=0.25,
edgecolor="none",
)
axes[i].fill_between(
Xs_for_plots[p][:, i],
ys_gpr_for_plot[p] - 1 * sigmas_gpr_for_plot[p],
ys_gpr_for_plot[p] + 1 * sigmas_gpr_for_plot[p],
color="0.5",
alpha=0.25,
edgecolor="none",
)
if plot_truth:
axes[i].plot(Xs_for_plots[p][:, i], ys_truth_for_plot[p], ls="--")
axes[i].scatter(X[p], y_truth_centre, marker="*")
axes[i].set_ylabel(r"$\log(p)$")
label = truth.labels[i] if truth.labels is not None else p
if label != p:
label = "$" + label + "$"
axes[i].set_xlabel(label)
# If there is an infinities classifier, use if for the lower bound on y:
diff_min_logp = getattr(gpr, "diff_threshold", None)
if diff_min_logp is not None:
try:
max_y = max(ys_gpr_for_plot[p])
upper_y = max_y
if plot_truth:
upper_y = max(max_y, *ys_truth_for_plot[p])
axes[i].set_ylim(
max_y - 1.05 * diff_min_logp, upper_y + 0.05 * diff_min_logp
)
except ValueError as e:
print(
f"ERROR when setting y-lims for '{p}': max(y) was "
f"{max(ys_gpr_for_plot[p])}, diff_threshold was {diff_min_logp}, "
f"lower_bound was {max(ys_gpr_for_plot[p]) - 1.05 * diff_min_logp}, "
f"upper bound was {upper_y}, MSG was {e}"
)
# Add training set
dists = np.sqrt(np.sum(np.power(np.delete(X_train_diff, i, axis=-1), 2), axis=-1))
dists_relative = dists / max(dists)
axes[i].scatter(
gpr.X_train[:, i],
gpr.y_train,
marker=".",
alpha=1 - dists_relative,
zorder=-9,
)
bounds = (reference or {}).get(p)
if bounds is not None:
if len(bounds) == 5:
axes[i].axvspan(
bounds[0], bounds[4], facecolor="tab:blue", alpha=0.2, zorder=-99
)
axes[i].axvspan(
bounds[1], bounds[3], facecolor="tab:blue", alpha=0.2, zorder=-99
)
axes[i].axvline(bounds[2], c="tab:blue", alpha=0.3, ls="--")
[docs]
def plot_corner_getdist(
mc_samples,
params=None,
bounds=None,
filled=None,
training=None,
training_highlight_last=False,
markers=None,
output=None,
output_dpi=200,
subplot_size=2,
):
"""
Creates a corner plot the given MC samples, and optionally shows evaluation locations.
If called repeatedly, it may leak memory, unfortunately. To avoid this, execute it
like this:
.. code:: python
# Temporarily switch to Agg backend
prev_backend = matplotlib.get_backend()
matplotlib.use("Agg")
try:
plot_corner_getdist(...)
except:
...
finally:
# Switch back to prev backend
mpl.use(prev_backend)
Parameters
----------
mc_samples: dict(str, (cobaya.SampleCollection, getdist.MCSamples, str))
Dict of MC samples, with their plot label as key, and the sample as value, either
as GetDist or Cobaya samples, or as a path where there are samples saved.
params : list(str), optional
List of parameter names to be plotted, by default all of the ones in the first
MC sample, included derived ones like probability densities.
bounds :array-like, shape = (len(params), 2), list(array-like, shape = (2))
Dict or list (sorted as ``params``) of parameter bounds.
filled : dict(str, bool), list
Dictionary with labels as keys specifying the `filled` property of the contours.
Contours are filled by default when unspecified (including key missing for a
passed sample). If it is a list, the same order as in ``mc_samples`` is assumed.
training : GaussianProcessRegressor, dict(str or tuple, GaussianProcessRegressor), optional
If a GPR is passed, it plots the training samples (including the discarded ones)
on top of the contours. Samples outside the axes ranges are not plotted.
The parameters of the GPR need to be assumed, since the GPR does not save names:
if a GPR is passed, the sampled parameters of the first MC sample will be used; if
a single-key dict is passed with a str as a key, it will used the parameter names
from the MC sample with that label; if the key is a tuple of strings, they will be
used as parameters
subplot_size : float, default = 2
Size of each subplot in the corner plot.
output : str, optional (default=None)
Path, including name and extension, of the saved figure.
Not saved if left unspecified.
output_dpi : int (default: 200)
The resolution of the generated plot in DPI.
Returns
-------
getdist.plots.GetDistPlotter object containing the figure.
"""
# TODO: manage whether to plot log-post/like, add kwarg opt
if not isinstance(mc_samples, Mapping):
raise TypeError(
"The first argument must be a list of MC samples with the sample legend "
"labels as keys."
)
gdsamples_dict = process_gdsamples(mc_samples)
# Prepare training samples early -- fail asap.
training_params = None
if training is not None:
if isinstance(training, Mapping):
training_key = list(training)[0]
training = list(training.values())[0]
if isinstance(training_key, tuple):
training_params = training_key
else: # assumed key corresponding to passed mcsamples
if training_key not in mc_samples:
raise ValueError(
"`training` passed as dict, but key not found in mc_samples."
)
training_params = \
gdsamples_dict[training_key].getParamNames().getRunningNames()
elif isinstance(training, GaussianProcessRegressor):
# Use first MC passed
training_params = \
list(gdsamples_dict.values())[0].getParamNames().getRunningNames()
else:
raise TypeError("'training' is not a GaussianProcessRegressor instance.")
import getdist.plots as gdplt # pylint: disable=import-outside-toplevel
gdplot = gdplt.get_subplot_plotter(subplot_size=subplot_size, auto_close=True)
gdplot.settings.line_styles = 'tab10'
gdplot.settings.solid_colors = 'tab10'
triang_args = [list(gdsamples_dict.values())]
if params is not None:
# GetDist failsafe: can only plot the params of the last sample in the input
at_most_params = \
list(gdsamples_dict.values())[-1].getParamNames().getRunningNames()
params = [p for p in params if p in at_most_params]
triang_args.append(params)
if isinstance(bounds, Mapping):
bounds = {p: bounds.get(p) for p in params}
elif isinstance(bounds, (Sequence, np.ndarray)):
if len(bounds) != len(params):
raise ValueError("`bounds` and `params` have different number of elements.")
bounds = {p: bounds[i] for i, p in enumerate(params)}
if isinstance(filled, Sequence):
# Assume it refers to the first samples if not complete; the rest are filled.
filled = {
name: (filled[i] if i < len(filled) else True)
for i, name in enumerate(gdsamples_dict)
}
triang_kwargs = {
"legend_labels": list(gdsamples_dict),
"filled": [(filled or {}).get(k, True) for k in gdsamples_dict],
"param_limits": bounds or {},
"markers": markers,
}
try:
gdplot.triangle_plot(*triang_args, **triang_kwargs)
except Exception as excpt:
raise ValueError(
f"Could not do corner plot. GetDist err. msg.: {excpt}"
) from excpt
if training is not None and training.d > 1:
getdist_add_training(
gdplot, training_params, training, highlight_last=training_highlight_last
)
if output is not None:
plt.savefig(output, dpi=output_dpi)
return gdplot
[docs]
def getdist_add_training(
getdist_plot,
params,
gpr,
colormap="viridis",
marker=".",
marker_inf="x",
highlight_last=False,
):
"""
Adds the training points to a GetDist triangle plot, coloured according to
their log-posterior value.
Parameters
----------
getdist_plot : `GetDist triangle plot <https://getdist.readthedocs.io/en/latest/plots.html?highlight=triangle_plot#getdist.plots.GetDistPlotter.triangle_plot>`_
Contains the marginalized contours and potentially other things.
params : list(str)
The assumed parameter names for the GPR samples. Need to be a subset of the ones
plotted by the GetDistPlotter.
gpr : GaussianProcessRegressor
The trained GP Regressor containing the samples.
colormap : matplotlib colormap, optional (default="viridis")
Color map from which to get the color scale to represent the GP model value for
the training points.
marker : matplotlib marker, optional (default=".")
Marker to be used for the training points.
marker_inf : matplotlib marker, optional (default=".")
Marker to be used for the non-finite training points.
highlight_last: bool (default=False)
Draw a red circle around the points added in the last iteration
Returns
-------
The GetDist triangle plot with the added training points.
"""
# Gather axes and bounds
d = len(params)
ax_dict = {}
bounds = [None] * len(params)
for i, pi in enumerate(params):
for j, pj in enumerate(params):
ax = getdist_plot.get_axes_for_params(pi, pj, ordered=True)
if not ax:
continue
ax_dict[(i, j)] = ax
bounds[i] = ax.get_xlim()
bounds[j] = ax.get_ylim()
# Now reduce the set of points to the ones within ranges
# (needed to get good limits for the colorbar of the log-posterior)
Xs_finite = np.copy(gpr.X_train)
ys_finite = np.copy(gpr.y_train)
Xs_infinite = np.copy(gpr.X_train_infinite)
for i, (mini, maxi) in enumerate(bounds):
i_within_finite = np.argwhere(
np.logical_or(mini < Xs_finite[:, i], Xs_finite[:, i] < maxi)
)
Xs_finite = np.atleast_2d(np.squeeze(Xs_finite[i_within_finite]))
ys_finite = np.atleast_1d(np.squeeze(ys_finite[i_within_finite]))
i_within_infinite = np.argwhere(
np.logical_or(mini < Xs_infinite[:, i], Xs_infinite[:, i] < maxi)
)
Xs_infinite = np.atleast_2d(np.squeeze(Xs_infinite[i_within_infinite]))
if highlight_last:
Xs_last = gpr.last_appended[0]
i_within_last = np.argwhere(
np.logical_or(mini < Xs_last[:, i], Xs_last[:, i] < maxi)
)
Xs_last = np.atleast_2d(np.squeeze(Xs_last[i_within_last]))
if len(Xs_finite) == 0 and len(Xs_infinite) == 0: # no points within plotting ranges
return getdist_plot
# Create colormap with appropriate limits
cmap = matplotlib.colormaps[colormap]
if len(Xs_finite):
Ncolors = 256
color_bounds = np.linspace(min(ys_finite), max(ys_finite), Ncolors)
norm = matplotlib.colors.BoundaryNorm(color_bounds, Ncolors)
# Add points
for (i, j), ax in ax_dict.items():
# 1st -inf points, so the are displayed in the background of the finite ones.
# and we give them low zorder anyway, so that they lay behind the contours
if len(Xs_infinite) > 0:
points_infinite = Xs_infinite[:, [i, j]]
ax.scatter(
*points_infinite.T, marker=marker_inf, s=20, c="k", alpha=0.3, zorder=-99
)
if len(Xs_finite) > 0:
points_finite = Xs_finite[:, [i, j]]
ax.scatter(
*points_finite.T, marker=marker, c=norm(ys_finite), alpha=0.3, cmap=cmap
)
if highlight_last and len(Xs_last) > 0:
points_last = Xs_last[:, [i, j]]
ax.scatter(
*points_last.T,
marker="o",
c=len(points_last) * [[0, 0, 0, 0]],
edgecolor="r",
lw=0.5,
)
# Colorbar
if len(Xs_finite) > 0 and not np.isclose(min(ys_finite), max(ys_finite)):
getdist_plot.fig.colorbar(
cm.ScalarMappable(norm=norm, cmap=cmap),
label=r"$\log(p)$",
ax=getdist_plot.fig.add_axes(
[1 - 0.2 / d, 1 - 0.85 / d, 0.5 / d, 0.5 / d],
frame_on=False,
xticks=[],
yticks=[],
),
ticks=np.linspace(min(ys_finite), max(ys_finite), 5),
location="left",
)
return getdist_plot
[docs]
def plot_convergence(
convergence_criterion,
evaluations="total",
marker="",
axes=None,
ax_labels=True,
legend_loc="upper right",
):
"""
Plots the value of the convergence criterion as function of the number of
(accepted) training points.
Parameters
----------
convergence_criterion : The instance of the convergence criterion which has
been called in the BO loop
evaluations : "total" or "accepted"
Whether to plot the total number of posterior evaluations or only the
accepted steps.
marker : matplotlib marker, optional (default="")
Marker used for the plot. Will be passed to ``matplotlib.pyplot.plot``.
axes : matplotlib axes, optional
Axes to be used, if passed.
ax_labels : bool, optional (default: True)
Add axes labels.
legend_loc : str (default: "upper right")
Location of the legend.
Returns
-------
The plot convergence criterion vs. number of training points
"""
if not isinstance(convergence_criterion, Sequence):
convergence_criterion = [convergence_criterion]
if axes is None:
fig, axes = plt.subplots()
else:
fig = axes.get_figure()
for i, cc in enumerate(convergence_criterion):
color = plt.rcParams["axes.prop_cycle"].by_key()["color"][i]
values, n_posterior_evals, n_accepted_evals = cc.get_history()
name = cc.__class__.__name__
n_evals = np.array(
{"total": n_posterior_evals, "accepted": n_accepted_evals}[evaluations],
dtype=int,
)
try:
axes.plot(n_evals, values, marker=marker, color=color, label=name)
except KeyError as excpt:
raise ValueError(
"'evaluations' must be either 'total' or 'accepted'."
) from excpt
if hasattr(cc, "limit"):
axes.axhline(cc.limit, ls="--", lw=1, c=color)
if ax_labels:
axes.set_xlabel(f"{evaluations} number of posterior evaluations")
axes.set_ylabel("Value of convergence criterion")
axes.set_yscale("log")
axes.grid(axis="y")
axes.legend(loc=legend_loc, prop={"size": _plot_dist_fontsize})
return fig, axes
def _prepare_reference(
reference,
truth,
):
"""
Turns `reference` into a dict with parameters as keys and a list of 5 numbers as
values: two lower bounds, a central value, and two upper bounds, e.g. percentiles
5, 25, 50, 75, 95.
If getdist.MCSamples passed, bounds are by default 68% and 95%, and the central value
is the mean.
"""
# Ensure it is a dict
try:
from getdist import MCSamples # pylint: disable=import-outside-toplevel
if isinstance(reference, MCSamples):
means = reference.getMeans()
margstats = reference.getMargeStats()
bounds = {}
for p in truth.params:
# NB: numerOfName doest not use renames; needs to find "original" name
p_in_ref = reference.paramNames.parWithName(p).name
i_p = reference.paramNames.numberOfName(p_in_ref)
# by default lims/contours are [68, 95, 99]
try:
lims = margstats.parWithName(p).limits
except AttributeError as excpt:
raise ValueError(
f"Could not find parameter {p} in reference sample, which "
f"includes {reference.getParamNames().list()})"
) from excpt
bounds[p] = [
lims[1].lower,
lims[0].lower,
means[i_p],
lims[0].upper,
lims[1].upper,
]
reference = bounds
except ModuleNotFoundError: # getdist not installed
return None
if not isinstance(reference, Mapping):
# Assume parameters in order; check right number of them
if len(reference) != truth.d:
raise ValueError(
"reference must be a list containing bounds per parameter for all of them"
", or a dict with parameters as keys and these same values."
)
reference = dict(zip(truth.params, reference))
# Ensure it contains all parameters and 5 numbers (or None's) per parameter
for p in truth.params:
if p not in reference:
reference[p] = [None] * 5
values = reference[p]
if isinstance(values, Number):
values = [values]
if len(values) == 1:
reference[p] = [None, None] + list(values) + [None, None]
elif len(values) != 5:
raise ValueError(
"the elements of reference must be either a single central value, or a "
"list of 5 elements: [lower_bound_2, lower_bound_1, central_value, "
"upper_bound_2, upper_bound_1]."
)
return reference
[docs]
def plot_trace(
truth,
gpr,
convergence_criterion,
progress,
colormap="viridis",
reference=None,
):
"""
Plots the evolution of the run along true model evaluations, showing evolution of the
convergence criterion and the values of the log-posterior and the individual
parameters.
Can take a reference sample or reference bounds (dict with parameters as keys and 5
sorted bounds as values, or alternatively just a central value).
"""
X = gpr.X_train_all
y = gpr.y_train_all
if gpr.infinities_classifier is not None:
y_finite = gpr.infinities_classifier.y_finite
else:
y_finite = np.full(shape=len(y), fill_value=True)
if reference is not None:
reference = _prepare_reference(reference, truth)
fig, axes = plt.subplots(
nrows=2 + truth.d,
ncols=1,
sharex=True,
layout="constrained",
figsize=(min(4, 0.3 * len(X)), 1.5 * (2 + X.shape[1])),
dpi=400,
)
i_eval = list(range(1, 1 + len(X)))
# TOP: convergence plot
try:
plot_convergence(
convergence_criterion,
evaluations="total",
marker="",
axes=axes[0],
ax_labels=False,
legend_loc="lower left",
)
except ValueError: # no criterion computed yet
pass
axes[0].set_ylabel("Conv. crit.")
# 2nd: posterior plot
kwargs_accepted = {
"marker": ".",
"linewidths": 0.1,
"edgecolor": "0.1",
"cmap": colormap,
}
axes[1].scatter(i_eval, y, c=np.where(y_finite, y, np.inf), **kwargs_accepted)
# Gaussian contours
dashdotdotted = (0, (3, 5, 1, 5, 1, 5))
nsigmas_styles = {1: "-", 2: "--", 5: "-.", 10: ":", 20: dashdotdotted}
y_min_plot, y_max_plot = axes[1].get_ylim()
y_max = np.max(y)
for ns, nsls in nsigmas_styles.items():
y_ns = y_max - delta_logp_of_1d_nstd(ns, truth.d)
if y_ns > y_min_plot:
axes[1].axhline(
y_ns,
ls=nsls,
c="0.3",
lw=0.75,
zorder=-1,
label=f"{ns}-$\\sigma$ (Gauss. approx.)",
)
axes[1].set_ylabel(r"$\log(p)$")
axes[1].grid(axis="y")
axes[1].legend(loc="lower left", prop={"size": _plot_dist_fontsize})
# Kernel scales
output_scale, length_scales = gpr.scales
scales_kwargs = {
"verticalalignment": "center",
"horizontalalignment": "right",
"fontsize": _plot_dist_fontsize,
"bbox": {
"facecolor": "white",
"alpha": 0.5,
},
}
axes[1].text(
0.965,
0.12,
f"Output scale: ${simple_latex_sci_notation(f'{output_scale:.2g}')}$",
transform=axes[1].transAxes,
**scales_kwargs,
)
# NEXT: parameters plots
for i, p in enumerate(truth.params):
label = truth.labels[i] if truth.labels else p
ax = axes[i + 2]
if gpr.infinities_classifier is not None and sum(y_finite) < len(X):
ax.scatter(
i_eval,
X[:, i],
marker="x",
c=np.where(y_finite, None, 0.5),
cmap="gray",
vmin=0,
vmax=1,
s=20,
)
ax.scatter(
i_eval,
X[:, i],
c=np.where(y_finite, y, np.inf),
**kwargs_accepted,
)
bounds = (reference or {}).get(p)
if bounds is not None:
if len(bounds) == 5:
ax.axhspan(
bounds[0], bounds[4], facecolor="tab:blue", alpha=0.2, zorder=-99
)
ax.axhspan(
bounds[1], bounds[3], facecolor="tab:blue", alpha=0.2, zorder=-99
)
ax.axhline(bounds[2], c="tab:blue", alpha=0.3, ls="--")
ax.set_ylabel("$" + label + "$" if label != p else p)
ax.grid(axis="y")
# Length scales
ax.text(
0.965,
0.12,
f"Length scale: ${simple_latex_sci_notation(f'{length_scales[i]:.2g}')}$",
transform=ax.transAxes,
**scales_kwargs,
)
# Format common x-axis
axes[0].set_xlim(0, len(X) + 0.5)
axes[-1].set_xlabel("Number of posterior evaluations")
n_train = progress.data["n_total"][1]
for ax in axes:
ax.axvspan(0, n_train + 0.5, facecolor="0.85", zorder=-999)
for n_iteration in progress.data["n_total"][1:]:
ax.axvline(n_iteration + 0.5, ls="--", c="0.75", lw=0.75, zorder=-9)
# TODO: make sure the x ticks are int
[docs]
def plot_distance_distribution(
points, mean, covmat, density=False, show_added=True, ax=None
):
"""
Plots a histogram of the distribution of points with respect to the number of standard
deviations. Confidence level boundaries (Gaussian approximantion, dimension-dependent)
are shown too.
Parameters
----------
points: array-like, with shape ``(N_points, N_dimensions)``, or GPR instance
Points to be used for the histogram.
mean: array-like, ``(N_dimensions)``.
Mean of the distribution.
covmat: array-like, ``(N_dimensions, N_dimensions)``.
Covariance matrix of the distribution.
density: bool (default: False)
If ``True``, bin height is normalised to the (hyper)volume of the (hyper)spherical
shell corresponding to each standard deviation.
show_added: bool (default True)
Colours the stacks depending on how early or late the corresponding points were
added (bluer stacks represent newer points).
ax: matplotlib axes
If provided, they will be used for the plot.
Returns
-------
Tuple of current figure and axes ``(fig, ax)``.
"""
if isinstance(points, GaussianProcessRegressor):
points = points.X_train
dim = np.atleast_2d(points).shape[1]
radial_distances = gaussian_distance(points, mean, covmat)
bins = list(range(0, int(np.ceil(np.max(radial_distances))) + 1))
num_or_dens = "Density" if density else "Number"
if density:
volumes = [
volume_sphere(bins[i], dim) - volume_sphere(bins[i - 1], dim)
for i in range(1, len(bins))
]
weights = [1 / volumes[int(np.floor(r))] for r in radial_distances]
else:
weights = np.ones(len(radial_distances))
if ax is None:
fig, ax = plt.subplots()
else:
fig = ax.get_figure()
title_str = f"{num_or_dens} of points per standard deviation"
if show_added:
title_str += " (bluer=newer)"
cmap = plt.get_cmap("Spectral")
colors = [cmap(i / len(points)) for i in range(len(points))]
ax.hist(
np.atleast_2d(radial_distances),
bins=bins,
weights=np.atleast_2d(weights),
color=colors,
stacked=True,
)
else:
ax.hist(radial_distances, bins=bins, weights=weights)
ax.set_title(title_str)
# cls = [credibility_of_nstd(s, 1) for s in [1, 2, 3, 4]] # use 1d cl's as reference
nstds = [1, 2, 3, 4]
linestyles = ["-", "--", "-.", ":"]
for nstd, ls in zip(nstds, linestyles):
std_of_cl = nstd_of_1d_nstd(nstd, dim)
if std_of_cl < max(radial_distances):
ax.axvline(
std_of_cl,
c="0.75",
ls=ls,
zorder=-99,
label=f"${100 * credibility_of_nstd(std_of_cl, dim):.2f}\\%$ prob mass",
)
ax.set_ylabel(f"{num_or_dens} of points")
ax.set_xlabel("Number of standard deviations")
ax.legend(loc="upper right")
return (fig, ax)
def _plot_2d_model_acquisition(gpr, acquisition, last_points=None, res=200):
"""
Contour plots for model prediction and acquisition function value of a 2d model.
If ``last_points`` passed, they are highlighted.
"""
if gpr.d != 2:
warnings.warn("This plots are only possible in 2d.")
return
# TODO: option to restrict bounds to the min square containing traning samples,
# with some padding
bounds = gpr.bounds
x = np.linspace(bounds[0][0], bounds[0][1], res)
y = np.linspace(bounds[1][0], bounds[1][1], res)
X, Y = np.meshgrid(x, y)
xx = np.ascontiguousarray(np.vstack([X.reshape(X.size), Y.reshape(Y.size)]).T)
model_mean = gpr.predict(xx)
# TODO: maybe change this one below if __call__ method added to GP_acquisition
acq_value = acquisition(xx, gpr, eval_gradient=False)
# maybe show the next max of acquisition
acq_max = xx[np.argmax(acq_value)]
fig, ax = plt.subplots(1, 2, figsize=(8, 4))
cmap = [plt.get_cmap("magma"), plt.get_cmap("viridis")]
label = ["Model mean (log-posterior)", "Acquisition function value"]
for i, Z in enumerate([model_mean, acq_value]):
ax[i].set_title(label[i])
# Boost the upper limit to avoid truncation errors.
Z = np.clip(Z, min(Z[np.isfinite(Z)]), max(Z[np.isfinite(Z)]))
levels = np.arange(min(Z) * 0.99, max(Z) * 1.01, (max(Z) - min(Z)) / 500)
Z = Z.reshape(*X.shape)
norm = cm.colors.Normalize(vmax=Z.max(), vmin=Z.min())
# # Background of the same color as the bottom of the colormap, to avoid "gaps"
# plt.gca().set_facecolor(cmap[i].colors[0])
ax[i].contourf(X, Y, Z, levels, cmap=plt.get_cmap(cmap[i], 256), norm=norm)
points = ax[i].scatter(
*gpr.X_train.T, edgecolors="deepskyblue", marker=r"$\bigcirc$"
)
# Plot position of next best sample
point_max = ax[i].scatter(*acq_max, marker="x", color="k")
if last_points is not None:
points_last = ax[i].scatter(
*last_points.T, edgecolors="violet", marker=r"$\bigcirc$"
)
# Bounds
ax[i].set_xlim(bounds[0][0], bounds[0][1])
ax[i].set_ylim(bounds[1][0], bounds[1][1])
# Remove ticks, for ilustrative purposes only
# ax[i].set_xticks([], minor=[])
# ax[i].set_yticks([], minor=[])
legend_labels = {points: "Training points"}
if last_points is not None:
legend_labels[points_last] = "Points added in last iteration."
legend_labels[point_max] = "Next optimal location"
fig.legend(
list(legend_labels), list(legend_labels.values()), loc="lower center", ncol=99
)
plt.subplots_adjust(left=0.1, right=0.9, bottom=0.15)
def _plot_2d_model_acquisition_finite(gpr, acquisition, last_points=None, res=200):
"""
Contour plots for model prediction and acquisition function value of a 2d model.
If ``last_points`` passed, they are highlighted.
"""
if gpr.d != 2:
warnings.warn("This plots are only possible in 2d.")
return
# TODO: option to restrict bounds to the min square containing traning samples,
# with some padding
bounds = gpr.bounds
x = np.linspace(bounds[0][0], bounds[0][1], res)
y = np.linspace(bounds[1][0], bounds[1][1], res)
X, Y = np.meshgrid(x, y)
xx = np.ascontiguousarray(np.vstack([X.reshape(X.size), Y.reshape(Y.size)]).T)
model_mean = gpr.predict(xx)
# TODO: maybe change this one below if __call__ method added to GP_acquisition
acq_value = acquisition(xx, gpr, eval_gradient=False)
# maybe show the next max of acquisition
acq_max = xx[np.argmax(acq_value)]
fig, ax = plt.subplots(1, 2, figsize=(8, 4))
cmap = [plt.get_cmap("magma"), plt.get_cmap("viridis")]
label = ["Model mean (log-posterior)", "Acquisition function value"]
for i, Z in enumerate([model_mean, acq_value]):
ax[i].set_title(label[i])
# Boost the upper limit to avoid truncation errors.
Z_finite = Z[np.isfinite(Z)]
# Z_clipped = np.clip(Z_finite, min(Z[np.isfinite(Z)]), max(Z[np.isfinite(Z)]))
# Z_sort = np.sort(Z_finite)[::-1]
top_x_perc = np.sort(Z_finite)[::-1][: int(len(Z_finite) * 0.5)]
relevant_range = max(top_x_perc) - min(top_x_perc)
levels = np.linspace(
max(Z_finite) - 1.99 * relevant_range,
max(Z_finite) + 0.01 * relevant_range,
500,
)
Z[np.isfinite(Z)] = np.clip(Z_finite, min(levels), max(levels))
Z = Z.reshape(*X.shape)
norm = cm.colors.Normalize(vmax=max(levels), vmin=min(levels))
ax[i].set_facecolor("grey")
# # Background of the same color as the bottom of the colormap, to avoid "gaps"
# plt.gca().set_facecolor(cmap[i].colors[0])
ax[i].contourf(X, Y, Z, levels, cmap=plt.get_cmap(cmap[i], 256), norm=norm)
points = ax[i].scatter(
*gpr.X_train.T, edgecolors="deepskyblue", marker=r"$\bigcirc$"
)
# Plot position of next best sample
point_max = ax[i].scatter(*acq_max, marker="x", color="k")
if last_points is not None:
points_last = ax[i].scatter(
*last_points.T, edgecolors="violet", marker=r"$\bigcirc$"
)
# Bounds
ax[i].set_xlim(bounds[0][0], bounds[0][1])
ax[i].set_ylim(bounds[1][0], bounds[1][1])
# Remove ticks, for ilustrative purposes only
# ax[i].set_xticks([], minor=[])
# ax[i].set_yticks([], minor=[])
legend_labels = {points: "Training points"}
if last_points is not None:
legend_labels[points_last] = "Points added in last iteration."
legend_labels[point_max] = "Next optimal location"
fig.legend(
list(legend_labels), list(legend_labels.values()), loc="lower center", ncol=99
)
plt.subplots_adjust(left=0.1, right=0.9, bottom=0.15)
def _plot_2d_model_acquisition_std(gpr, acquisition, last_points=None, res=200):
"""
Contour plots for model prediction and acquisition function value of a 2d model.
If ``last_points`` passed, they are highlighted.
"""
if gpr.d != 2:
warnings.warn("This plots are only possible in 2d.")
return
# TODO: option to restrict bounds to the min square containing traning samples,
# with some padding
bounds = gpr.bounds
x = np.linspace(bounds[0][0], bounds[0][1], res)
y = np.linspace(bounds[1][0], bounds[1][1], res)
X, Y = np.meshgrid(x, y)
xx = np.ascontiguousarray(np.vstack([X.reshape(X.size), Y.reshape(Y.size)]).T)
model_mean, model_std = gpr.predict(xx, return_std=True)
# TODO: maybe change this one below if __call__ method added to GP_acquisition
acq_value = acquisition(xx, gpr, eval_gradient=False)
# maybe show the next max of acquisition
acq_max = xx[np.argmax(acq_value)]
fig, ax = plt.subplots(1, 3, figsize=(12, 4))
cmap = [plt.get_cmap("magma"), plt.get_cmap("viridis"), plt.get_cmap("magma")]
label = ["Model mean (log-posterior)", "Acquisition function value", "Model std dev."]
for i, Z in enumerate([model_mean, acq_value]):
ax[i].set_title(label[i])
# Boost the upper limit to avoid truncation errors.
Z_finite = Z[np.isfinite(Z)]
# Z_clipped = np.clip(Z_finite, min(Z[np.isfinite(Z)]), max(Z[np.isfinite(Z)]))
# Z_sort = np.sort(Z_finite)[::-1]
top_x_perc = np.sort(Z_finite)[::-1][: int(len(Z_finite) * 0.5)]
relevant_range = max(top_x_perc) - min(top_x_perc)
levels = np.linspace(
max(Z_finite) - 1.99 * relevant_range,
max(Z_finite) + 0.01 * relevant_range,
500,
)
Z[np.isfinite(Z)] = np.clip(Z_finite, min(levels), max(levels))
Z = Z.reshape(*X.shape)
norm = cm.colors.Normalize(vmax=max(levels), vmin=min(levels))
ax[i].set_facecolor("grey")
# # Background of the same color as the bottom of the colormap, to avoid "gaps"
# plt.gca().set_facecolor(cmap[i].colors[0])
ax[i].contourf(X, Y, Z, levels, cmap=cm.get_cmap(cmap[i], 256), norm=norm)
points = ax[i].scatter(
*gpr.X_train.T, edgecolors="deepskyblue", marker=r"$\bigcirc$"
)
# Plot position of next best sample
point_max = ax[i].scatter(*acq_max, marker="x", color="k")
if last_points is not None:
points_last = ax[i].scatter(
*last_points.T, edgecolors="violet", marker=r"$\bigcirc$"
)
# Bounds
ax[i].set_xlim(bounds[0][0], bounds[0][1])
ax[i].set_ylim(bounds[1][0], bounds[1][1])
# Remove ticks, for ilustrative purposes only
# ax[i].set_xticks([], minor=[])
# ax[i].set_yticks([], minor=[])
ax[2].set_title(label[2])
Z = model_std
Z_finite = Z[np.isfinite(model_mean)]
Z[~np.isfinite(model_mean)] = -np.inf
minz = min(Z_finite)
zrange = max(Z_finite) - minz
levels = np.linspace(minz, minz + (zrange if zrange > 0 else 0.00001), 500)
# Z[np.isfinite(model_mean)] = np.clip(Z_finite, min(levels), max(levels))
Z = Z.reshape(*X.shape)
norm = cm.colors.Normalize(vmax=max(levels), vmin=min(levels))
ax[2].set_facecolor("grey")
ax[2].contourf(X, Y, Z, levels, cmap=plt.get_cmap(cmap[2], 256), norm=norm)
points = ax[2].scatter(*gpr.X_train.T, edgecolors="deepskyblue", marker=r"$\bigcirc$")
# Plot position of next best sample
point_max = ax[2].scatter(*acq_max, marker="x", color="k")
if last_points is not None:
points_last = ax[2].scatter(
*last_points.T, edgecolors="violet", marker=r"$\bigcirc$"
)
# Bounds
ax[2].set_xlim(bounds[0][0], bounds[0][1])
ax[2].set_ylim(bounds[1][0], bounds[1][1])
legend_labels = {points: "Training points"}
if last_points is not None:
legend_labels[points_last] = "Points added in last iteration."
legend_labels[point_max] = "Next optimal location"
fig.legend(
list(legend_labels), list(legend_labels.values()), loc="lower center", ncol=99
)
plt.subplots_adjust(left=0.1, right=0.9, bottom=0.15)