# Ryan Turner (turnerry@iro.umontreal.ca)
from __future__ import absolute_import, division, print_function
from builtins import range
from sys import version_info
import numpy as np
import scipy.interpolate as si
from scipy.special import logsumexp
STRICT_SPACING = False
def clip_chk(a, a_min, a_max):
    """Clip `a` to ``[a_min, a_max]`` and assert clipping had only a
    round-off level effect.

    Parameters
    ----------
    a : ndarray
        Array of values to clip.
    a_min : scalar
        Lower clipping bound.
    a_max : scalar
        Upper clipping bound.

    Returns
    -------
    a_clip : ndarray
        `a` clipped to ``[a_min, a_max]``.
    """
    clipped = np.clip(a, a_min, a_max)
    # Any element that changed must have already been numerically at a bound.
    unchanged = a == clipped
    near_lo = np.isclose(a, a_min)
    near_hi = np.isclose(a, a_max)
    assert np.all(unchanged | near_lo | near_hi)
    return clipped
def one_hot(y, n_labels):
    """Same functionality as `sklearn.preprocessing.OneHotEncoder` but avoids
    extra dependency.

    Parameters
    ----------
    y : ndarray of type int, shape (n_samples,)
        Integers in range ``[0, n_labels)`` to be one-hot encoded.
    n_labels : int
        Number of labels, must be >= 1. This is not inferred from `y` because
        some labels may not be found in small data chunks.

    Returns
    -------
    y_bin : ndarray of type bool, shape (n_samples, n_labels)
        One hot encoding of `y`, with size ``(len(y), n_labels)``
    """
    N, = y.shape
    assert n_labels >= 1
    assert y.dtype.kind == "i"  # bool would confuse np indexing
    assert np.all(0 <= y) and np.all(y < n_labels)

    y_bin = np.zeros((N, n_labels), dtype=bool)
    # Fancy indexing: set one True per row at the column given by y.
    y_bin[np.arange(N), y] = True
    return y_bin
def normalize(log_pred_prob):
    """Normalize log probability distributions for classification.

    Parameters
    ----------
    log_pred_prob : ndarray, shape (n_samples, n_labels)
        Each row corresponds to a categorical distribution with unnormalized
        probabilities in log scale. Therefore, the number of columns must be at
        least 1.

    Returns
    -------
    log_pred_prob : ndarray, shape (n_samples, n_labels)
        A row-wise normalized (``exp(log_pred_prob)`` sums to 1 on each row)
        version of the input.
    """
    assert log_pred_prob.ndim == 2
    assert log_pred_prob.shape[1] >= 1  # Otherwise, can't make it sum to 1

    # Subtracting the row-wise log-sum-exp normalizes in a numerically stable
    # way without leaving log space.
    normalizer = logsumexp(log_pred_prob, axis=1, keepdims=True)
    log_pred_prob = log_pred_prob - normalizer
    return log_pred_prob
def epsilon_noise(x, default_epsilon=1e-10, max_epsilon=1.0):
    """Add a small amount of noise to a vector such that the output vector has
    all unique values. The ordering of the resulting vector remains the
    same: ``argsort(output) = argsort(input)`` if input values are unique.

    Parameters
    ----------
    x : ndarray, shape (n_samples,)
        Input vector to be noise corrupted. Must have all finite values.
    default_epsilon : float
        Default noise to add for singleton lists, must be > 0.0.
    max_epsilon : float
        Maximum amount of noise corruption regardless of scale found in `x`.

    Returns
    -------
    x : ndarray, shape (n_samples,)
        Noise corrupted version of input. All values are unique with
        probability 1. The ordering is the same as the input if the input
        values are all unique.
    """
    assert x.ndim == 1
    assert np.all(np.isfinite(x))

    # Noise scale: smallest gap between distinct values (or default for
    # empty/singleton input), capped at max_epsilon.
    u_x = np.unique(x)
    delta = default_epsilon if len(u_x) <= 1 else np.min(np.diff(u_x))
    delta = np.minimum(max_epsilon, delta)
    assert 0.0 < delta and delta <= max_epsilon

    # Uniform noise in (-delta/2, delta/2) cannot re-order distinct values.
    x = x + delta * (np.random.rand(len(x)) - 0.5)
    return x
# ============================================================================
# Area for annoying routines where we need use version_info and write
# different versions for Py 2 and Py3.
# ============================================================================
def _remove_chars_py2(x_str, del_chars):
    """Python 2 variant of `_remove_chars_py3`; see that function for the
    parameter documentation.

    Uses the Python 2 ``str.translate(table, deletechars)`` signature; under
    Python 3 this call raises ``TypeError``, so this function is only selected
    when ``version_info[0] < 3`` (see `remove_chars` dispatch below).
    """
    x_str = x_str.translate(None, del_chars)
    return x_str
def _remove_chars_py3(x_str, del_chars):
"""Utility to remove specified characters from string.
Parameters
----------
x_str : str
Generic input string.
del_chars : str
String containing characters we would like to remove.
Returns
-------
x_str : str
Generic input string after removing characters in `del_chars`.
"""
translator = str.maketrans("", "", del_chars)
x_str = x_str.translate(translator)
return x_str
# Dispatch on the interpreter major version: str.translate has incompatible
# signatures between Py2 and Py3. We can probably remove this soon since Py2
# is dead and keep only the Py3 implementation.
remove_chars = _remove_chars_py3 if version_info[0] >= 3 else _remove_chars_py2
# ============================================================================
# Interpolation utils
# Everything in here will be obsolete once various enhancements get merged
# into scipy for scipy.interpolate.interp1d.
# ============================================================================
def unique_take_last(xp, yp=None):
    """Take unique points in a sorted list `xp`. When duplicates occur take the
    last element and its corresponding element in an auxiliary list `yp`.

    This function is useful for taking a set of points and making a proper step
    function from them. A step function is ambiguous when there are multiple
    points at the same x coordinate. Similar functionality can be obtained from
    `np.unique` but it takes the first rather than last element when duplicates
    occur.

    Parameters
    ----------
    xp : ndarray, shape (n_samples,)
        A sorted list of points.
    yp : None or ndarray of shape (n_samples,)
        Optional points that must be kept along with the x points. If `xp`
        are points on the x-axis, then yp are the y coordinate points.

    Returns
    -------
    xp : ndarray, shape (m_samples,)
        Input `xp` after removing extra points. m_samples <= n_samples.
    yp : ndarray, shape (m_samples,)
        Input `yp` after removing extra points. m_samples <= n_samples.
    """
    assert xp.ndim == 1
    assert not np.any(np.isnan(xp))
    assert yp is None or xp.shape == yp.shape
    # Bug fix: the original re-checked xp here instead of validating yp.
    assert yp is None or (not np.any(np.isnan(yp)))

    # Get deltas to determine unique points, and check pre-sorted exactly
    deltas = np.diff(xp)
    assert np.all(deltas >= 0)

    # Keep an element iff its successor is strictly larger; the final element
    # is always kept, which realizes the take-last-duplicate policy.
    idx = [] if xp.size == 0 else np.concatenate((deltas > 0, [True]))
    xp = xp[idx]
    yp = None if yp is None else yp[idx]
    return xp, yp
def cummax_strict(x, copy=True):
    """Minimally increase array elements to make the array strictly increasing.

    Parameters
    ----------
    x : ndarray, shape (n_samples,)
        A list of points.
    copy : bool
        If False, modify x in place.

    Returns
    -------
    x : ndarray, shape (n_samples,)
        A list of points that are now *strictly* sorted. If `x` was already
        sorted then the new points will be as minimally changed as the
        floating point representation allows.
    """
    assert x.ndim == 1

    x = np.copy(x) if copy else x
    # Push each element just above its predecessor using the smallest
    # representable float increment (nextafter towards +inf).
    for ii in range(1, len(x)):
        x[ii] = np.maximum(np.nextafter(x[ii - 1], np.inf), x[ii])
    assert np.all(np.diff(x) > 0)
    return x
def eval_step_func(x_grid, xp, yp, ival=None, assume_sorted=False, skip_unique_chk=False):
    """Evaluate a stepwise function. Based on the ECDF class in statsmodels.
    The function is assumed to be cadlag (like a CDF function).

    This is a non-OOP equivalent to class:
    `statsmodels.distributions.empirical_distribution.StepFunction`
    with ``side='right'`` option to be like a CDF.

    Parameters
    ----------
    x_grid : ndarray, shape (n_grid,)
        Values to evaluate the stepwise function at.
    xp : ndarray, shape (n_samples,)
        Points at which the step function changes. Typically of type float.
    yp : ndarray, shape (n_samples,)
        The new values at each of the steps
    ival : scalar or None
        Initial value for step function, e.g., the value of the step function
        at -inf. If None, we just require that all `x_grid` values are after
        the first step.
    assume_sorted : bool
        Set to True if `xp` is already sorted in increasing order. This skips
        sorting for computational speed.
    skip_unique_chk : bool
        Assume all values in `xp` are sorted and unique. Setting to True skips
        checking this condition for speed.

    Returns
    -------
    y_grid : ndarray, shape (n_grid,)
        Step function defined by `xp` and `yp` evaluated at the points in
        `x_grid`.
    """
    assert x_grid.ndim == 1
    assert xp.ndim == 1 and xp.shape == yp.shape

    if not assume_sorted:
        idx = np.argsort(xp)
        xp, yp = xp[idx], yp[idx]

    # Step function not well defined if xp grid has duplicates
    assert skip_unique_chk or np.all(np.diff(xp) > 0.0)

    if ival is None:
        # This will have error if xp is empty
        assert len(x_grid) == 0 or np.all(xp[0] <= x_grid)
    else:
        assert np.ndim(ival) == 0  # ival must be scalar (was compared to 0.0)
        # Prepend a virtual step at -inf so queries before xp[0] yield ival.
        xp = np.concatenate(([-np.inf], xp))
        yp = np.concatenate(([ival], yp))

    # side='right' gives cadlag behavior: at a step point, take the new value.
    idx = np.searchsorted(xp, x_grid, side="right") - 1
    y_grid = yp[idx]
    return y_grid
def _interp1d(x_grid, xp, yp, kind="linear"):
    """Wrap `scipy.interpolate.interp1d` so it can handle ``'previous'`` like
    MATLAB's `interp1` function. ``'next'`` may be added here in the future.

    This wrapper does not support extrapolation at the moment. Scipy ENH such
    as #6718 may be merged to scipy master in the near future and make this
    wrapper obsolete.

    Parameters
    ----------
    x_grid : ndarray, shape (n_grid,)
        Values to evaluate the stepwise function at.
    xp : ndarray, shape (n_samples,)
        Points at which the step function changes. Typically of type float.
    yp : ndarray, shape (n_samples,)
        The new values at each of the steps
    kind : str
        Type of interpolation scheme, must be ``'previous'`` or any method that
        `scipy.interpolate.interp1d` can process such as ``'linear'``.

    Returns
    -------
    y_grid : ndarray, shape (n_grid,)
        Interpolation `xp` and `yp` evaluated at the points in `x_grid`.
    """
    assert x_grid.ndim == 1
    assert xp.ndim == 1 and xp.shape == yp.shape
    assert xp.size >= 2  # at least 2 points need to do area
    assert np.all(np.diff(xp) >= 0)

    if kind == "previous":
        # Collapse duplicate x values to their final occurrence, since a step
        # function is ambiguous when x points overlap.
        xp_u, yp_u = unique_take_last(xp, yp)
        return eval_step_func(x_grid, xp_u, yp_u, assume_sorted=True)

    if kind == "next":
        # eval_step_func could easily be adapted for this case, but we have no
        # need for it right now.
        raise NotImplementedError

    # Generic scipy path. interp1d appears to do the right thing with points
    # on top of each other when assume_sorted=True and the data are sorted,
    # but STRICT_SPACING lets us force exactly unique points to be extra safe.
    x_knots = cummax_strict(xp) if STRICT_SPACING else xp
    interp_f = si.interp1d(x_knots, yp, kind=kind, assume_sorted=True)
    return interp_f(x_grid)
# Make a version of interp1d that supports vectorized inputs: broadcasts over
# extra leading dimensions of the (n)-shaped x_grid and (m)-shaped xp/yp
# arguments, while `kind` (positional index 3 / keyword) stays a scalar option.
interp1d = np.vectorize(_interp1d, otypes=(float,), excluded=("kind", 3), signature="(n),(m),(m)->(n)")
def area(x_curve, y_curve, kind):
    """Compute area under function in vectorized way.

    Parameters
    ----------
    x_curve : ndarray, shape (n_boot, n_thresholds)
        The sample points corresponding to the y values. Must be sorted.
    y_curve : ndarray, shape (n_boot, n_thresholds)
        Input array to integrate. Must be same size as `x_curve`. Operation
        performed independently for each row.
    kind : {'linear', 'previous'}
        Type of interpolation scheme to turn points into lines.

    Returns
    -------
    auc : ndarray, shape (n_boot,)
        Area under curve. Has same length as `x_curve` has rows.
    """
    # Note: has some tests in perf_curves_test in addition to util_test.
    assert x_curve.ndim == 2
    assert x_curve.shape[1] >= 2  # at least 2 points need to do area
    assert not np.any(np.isnan(x_curve))
    assert y_curve.shape == x_curve.shape
    assert not np.any(np.isnan(y_curve))

    if kind == "previous":
        with np.errstate(invalid="ignore"):
            # Use nansum so we consider inf y_curve for 0 width as 0 area
            auc = np.nansum(y_curve[:, :-1] * np.diff(x_curve, axis=1), axis=1)
    elif kind == "linear":
        # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
        trapz = getattr(np, "trapezoid", getattr(np, "trapz", None))
        auc = trapz(y_curve, x_curve, axis=1)
    else:
        # 'next' could easily be added, but others would be a pain. We could
        # simply use interp1d to make fine grid then use previous to get area.
        raise NotImplementedError
    # Make sure we have legit area, this could happen with inf & -inf in curves
    assert not np.any(np.isnan(auc))
    return auc