import numpy as np
import scipy.stats as spstats
from abc import ABC, abstractmethod
# TODO: implement histogram mdl with proper discretization.
[docs]def np_number_1d(x, dtype=np.dtype("f8"), copy=True):
"""Convert x to 1d np number array
Args:
x (1d sequence of values convertable to np.number)
dtype (np number type): default to 64-bit float
copy (bool): passed to np.array()
Returns:
xarr (1d np.number array)
Raises:
ValueError: If x is not convertable to provided dtype or non-1d.
If dtype is not subdtype of np number.
"""
if not np.issubdtype(dtype, np.number):
raise ValueError("dtype must be a type of numpy number")
xarr = np.array(x, dtype=dtype, copy=copy)
if xarr.ndim != 1:
raise ValueError("x should be 1D array. "
"x.shape: {}".format(xarr.shape))
return xarr
[docs]class Mdl(ABC):
"""Minimum description length abstract base class
Interface of various mdl schemas. Subclasses must implement mdl property
and encode method.
Attributes:
_x (1D np.number array): data used for fit mdl
_n (np.int): number of points in x
"""
@abstractmethod
def __init__(self, x, dtype=np.dtype("f8"), copy=True):
"""Initialize
Args:
x (1D np.number array): data used for fit mdl
dtype (np.dtype): default to 64-bit float
copy (bool): passed to np.array()
"""
self._x = np_number_1d(x, dtype=dtype, copy=copy)
# avoid divide by 0 exception
self._n = np.int_(self._x.shape[0])
[docs] @abstractmethod
def encode(self, x):
"""Encode another 1D number array with fitted code
Args:
x (1D np.number array): data to encode
"""
raise NotImplementedError
@property
def x(self):
return self._x.copy()
@property
@abstractmethod
def mdl(self):
raise NotImplementedError
[docs]class MultinomialMdl(Mdl):
""" Encode discrete values using multinomial distribution
Args:
x (1D np.number array): data used for fit mdl
dtype (np.dtype): default to 64-bit float
copy (bool): passed to np.array()
Note:
When x only has 1 uniq value. Encode the the number of values only.
"""
def __init__(self, x, dtype=np.dtype("f8"), copy=True):
super().__init__(x, dtype=dtype, copy=copy)
uniq_vals, uniq_val_cnts = np.unique(x, return_counts=True)
self._n_uniq = len(uniq_vals)
self._uniq_vals = uniq_vals
self._uniq_val_cnts = uniq_val_cnts
# make division by 0 valid.
self._uniq_val_ps = uniq_val_cnts / self._n
# create a lut for unique vals and ps
self._uniq_val_p_lut = dict(zip(uniq_vals, self._uniq_val_ps))
if len(self._uniq_vals) > 1:
mdl = (-np.log(self._uniq_val_ps) * self._uniq_val_cnts).sum()
elif len(self._uniq_vals) == 1:
mdl = np.log(self._n)
else:
# len(x) == 0
mdl = 0
self._mdl = mdl
return
[docs] def encode(self, qx, use_adjescent_when_absent=False):
"""Encode another 1D float array with fitted code
Args:
qx (1d float array): query data
use_adjescent_when_absent (bool): whether to use adjascent value
to compute query mdl. If not, uniform mdl is used. If
adjascent values have same distance to query value, choose the
one with smaller mdl.
Returns:
qmdl (float)
"""
qx = np_number_1d(qx, copy=False)
if qx.size == 0:
return 0
# Encode with 32bit float
unif_q_val_mdl = np.log(max(np.max(np.abs(qx))*2, 1))
if self._n == 0:
# uniform
return qx.size * unif_q_val_mdl
q_uniq_vals, q_uniq_val_cnts = np.unique(qx, return_counts=True)
q_mdl = 0
for uval, ucnt in zip(q_uniq_vals, q_uniq_val_cnts):
uval_p = self._uniq_val_p_lut.get(uval)
if uval_p is None:
if use_adjescent_when_absent:
uind = np.searchsorted(self._uniq_vals, uval)
if uind <= 0:
# uval lower than minimum
uval_p = self._uniq_val_ps[0]
elif uind >= self._n_uniq:
# uval higher than maximum
uval_p = self._uniq_val_ps[-1]
else:
# uval within range [1, _n_uniq-1]
# abs diff between uval and left val
l_diff = np.abs(self._uniq_vals[uind-1] - uval)
# abs diff between uval and right avl
r_diff = np.abs(self._uniq_vals[uind] - uval)
if l_diff < r_diff:
# closer to left
uval_p = self._uniq_val_ps[uind-1]
elif l_diff > r_diff:
# closer to right
uval_p = self._uniq_val_ps[uind]
else:
# same distance, choose max p
uval_p = max(self._uniq_val_ps[uind-1],
self._uniq_val_ps[uind])
uval_mdl = -np.log(uval_p)
else:
uval_mdl = unif_q_val_mdl
else:
uval_mdl = -np.log(uval_p)
q_mdl += uval_mdl * ucnt
return q_mdl
@property
def mdl(self):
return self._mdl
[docs]class ZeroIMdl(Mdl):
"""Encode an indicator vector of 0s and non-0s
"""
def __init__(self, x, dtype=np.dtype("f8"), copy=True):
super().__init__(x, dtype=dtype, copy=copy)
self._x_equal_zero = self._x == 0
self._mn_encoder = MultinomialMdl(self._x == 0, dtype=np.dtype("i1"),
copy=False)
# log(3) to encode 3 conditions:
# - empty
# - all non-0s or 0s
# - mixed non-0s and 0s
self._mdl = self._mn_encoder.mdl + np.log(3)
[docs] def encode(self, qx):
qx = np_number_1d(qx, copy=False)
return self._mn_encoder.encode(qx == 0) + np.log(3)
@property
def mdl(self):
return self._mdl
[docs]class ZeroIMultinomialMdl(Mdl):
def __init__(self, x, dtype=np.dtype("f8"), copy=True):
super().__init__(x, dtype=dtype, copy=copy)
self._x_nonzero = self._x[np.nonzero(self._x)]
self._k = self._x_nonzero.shape[0]
self._zi_encoder = ZeroIMdl(self._x, dtype=dtype, copy=False)
self._zi_mdl = self._zi_encoder.mdl
self._mn_encoder = MultinomialMdl(self._x_nonzero, dtype=dtype,
copy=False)
self._mn_mdl = self._mn_encoder.mdl
self._mdl = self._zi_mdl + self._mn_mdl
[docs] def encode(self, qx, use_adjescent_when_absent=False):
qx = np_number_1d(qx)
qx_nonzero = qx[np.nonzero(qx)]
qzi_mdl = self._zi_encoder.encode(qx)
qmn_mdl = self._mn_encoder.encode(
qx_nonzero, use_adjescent_when_absent=use_adjescent_when_absent)
return qzi_mdl + qmn_mdl
@property
def mdl(self):
return self._mdl
[docs]class GKdeMdl(Mdl):
"""Use Gaussian kernel density estimation to compute mdl
Args:
x (1D np.number array): data used for fit mdl
bandwidth_method: string
KDE bandwidth estimation method bing passed to
`scipy.stats.gaussian_kde`.
Types:
* `"scott"`: Scott's rule of thumb.
* `"silverman"`: Silverman"s rule of thumb.
* `constant`: constant will be timed by x.std(ddof=1) internally,
because scipy times bw_method value by std. "Scipy weights its
bandwidth by the ovariance of the input data" [3].
* `callable`: scipy calls the function on self
dtype (np.dtype): default to 64-bit float
copy (bool): passed to np.array()
Attributes:
_x (1d float array): data to fit
_n (int): number of elements in data
_bw_method (str): bandwidth method
_kde (:obj:`scipy kde`)
_logdens (1d float array): log density
"""
def __init__(self, x, kde_bw_method="scott", dtype=np.dtype("f8"),
copy=True):
super().__init__(x, dtype=dtype, copy=copy)
self._bw_method = kde_bw_method
if self._n == 0:
kde = None
logdens = None
bw_factor = None
# no non-zery vals. Indicator encoded by zi mdl.
kde_mdl = 0
else:
try:
logdens, kde = self.gaussian_kde_logdens(
self._x, bandwidth_method=self._bw_method,
ret_kernel=True)
# log(2) to encode kde or not
kde_mdl = -logdens.sum() + np.log(2)
bw_factor = kde.factor
except Exception as e:
kde = None
logdens = None
bw_factor = None
# encode just single value or multiple values
# fall back on uniform encoding
unif_sval_mdl = np.log(max(np.max(np.abs(self._x))*2, 1))
kde_mdl = unif_sval_mdl * self._n
self._bw_factor = bw_factor
self._kde = kde
self._logdens = logdens
self._mdl = kde_mdl
[docs] def encode(self, qx, mdl_scale_factor=1):
"""Encode query data using fitted KDE code
Args:
qx (1d float array)
mdl_scale_factor (number): times mdl by this number
Returns:
float: mdl
"""
qx = np_number_1d(qx, copy=False)
if qx.size == 0:
return 0
unif_sval_mdl = np.log(max(np.max(np.abs(qx))*2, 1))
if self._kde is None:
return unif_sval_mdl * len(qx) * mdl_scale_factor
else:
logdens = self._kde.logpdf(qx)
return -logdens.sum() * mdl_scale_factor
@property
def bandwidth(self):
if self._bw_factor is None:
return None
else:
return self._bw_factor * self._x.std(ddof=1)
@property
def mdl(self):
return self._mdl
@property
def kde(self):
return self._kde
[docs] @staticmethod
def gaussian_kde_logdens(x, bandwidth_method="scott",
ret_kernel=False):
"""
Estimate Gaussian kernel density estimation bandwidth for input `x`.
Parameters
----------
x: float array of shape `(n_samples)` or `(n_samples, n_features)`
Data points for KDE estimation.
bandwidth_method: string
KDE bandwidth estimation method bing passed to
`scipy.stats.gaussian_kde`.
"""
# This package uses (n_samples, n_features) convention
# scipy uses (n_featues, n_samples) convention
# so it is necessary to reshape the data
if x.ndim == 1:
x = x.reshape(1, -1)
elif x.ndim == 2:
x = x.T
else:
raise ValueError("x should be 1/2D array. "
"x.shape: {}".format(x.shape))
kde = spstats.gaussian_kde(x, bw_method=bandwidth_method)
logdens = np.log(kde.evaluate(x))
if ret_kernel:
return (logdens, kde)
else:
return logdens
[docs]class ZeroIGKdeMdl(Mdl):
"""
Zero indicator Gaussian KDE MDL
Encode the 0s and non-0s using bernoulli distribution.
Then, encode non-0s using gaussian kde. Finally, one ternary val indicates
all 0s, all non-0s, or otherwise
Args:
x (1D np.number array): data used for fit mdl
bandwidth_method: string
KDE bandwidth estimation method bing passed to
`scipy.stats.gaussian_kde`.
Types:
* `"scott"`: Scott's rule of thumb.
* `"silverman"`: Silverman"s rule of thumb.
* `constant`: constant will be timed by x.std(ddof=1) internally,
because scipy times bw_method value by std. "Scipy weights its
bandwidth by the ovariance of the input data" [3].
* `callable`: scipy calls the function on self
dtype (np.dtype): default to 64-bit float
copy (bool): passed to np.array()
References:
[1] https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.gaussian_kde.html
[2] https://en.wikipedia.org/wiki/Kernel_density_estimation
[3] https://jakevdp.github.io/blog/2013/12/01/kernel-density-estimation/
[4] https://github.com/scipy/scipy/blob/v1.0.0/scipy/stats/kde.py#L42-L564
""" # noqa
def __init__(self, x, kde_bw_method="scott", dtype=np.dtype("f8"),
copy=True):
super().__init__(x, dtype=dtype, copy=copy)
self._x_nonzero = self._x[np.nonzero(self._x)]
self._k = self._x_nonzero.shape[0]
self._bw_method = kde_bw_method
self._zi_encoder = ZeroIMdl(self._x, dtype=dtype, copy=False)
self._zi_mdl = self._zi_encoder.mdl
self._kde_encoder = GKdeMdl(self._x_nonzero, kde_bw_method,
dtype=dtype, copy=False)
self._kde_mdl = self._kde_encoder.mdl
self._mdl = self._zi_mdl + self._kde_mdl
[docs] def encode(self, qx):
"""Encode qx
Args:
qx (1d np number array)
Returns:
mdl (float)
"""
qx = np_number_1d(qx, copy=False)
qx_nonzero = qx[np.nonzero(qx)]
qzi_mdl = self._zi_encoder.encode(qx)
qkde_mdl = self._kde_encoder.encode(qx_nonzero)
return qzi_mdl + qkde_mdl
@property
def zi_mdl(self):
return self._zi_mdl
@property
def bandwidth(self):
return self._kde_encoder.bandwidth
@property
def kde(self):
return self._kde_encoder.kde
@property
def kde_mdl(self):
return self._kde_mdl
@property
def mdl(self):
return self._mdl
@property
def x_nonzero(self):
return self._x_nonzero.copy()