import numpy as np
from scedar import eda
from scedar import utils
class RareSampleDetection(object):
    """
    K nearest neighbor detection of rare samples

    Perform the rare sample detection procedure in parallel, with each
    combination of parameters as a process. Because this procedure runs
    iteratively, parallelizing each individual parameter combination run is
    not implemented.

    Stores the results for further lookup.

    Parameters
    ----------
    sdm: SampleDistanceMatrix or its subclass

    Attributes
    ----------
    _sdm: SampleDistanceMatrix
    _res_lut: dict
        lookup table of KNN rare sample detection results, keyed by
        (k, d_cutoff, n_iter) parameter tuples.
    """
    def __init__(self, sdm):
        super(RareSampleDetection, self).__init__()
        self._sdm = sdm
        # Cache of previously computed results, so repeated calls with the
        # same (k, d_cutoff, n_iter) tuple do not recompute.
        self._res_lut = {}

    def _no_pdist_rare_s_detect(self, k, d_cutoff, n_iter, metric=None,
                                use_pca=False, use_hnsw=False,
                                index_params=None, query_params=None):
        """
        KNN rare sample detection with a scalar tuple of parameters, without
        a precomputed pairwise distance matrix. KNNs are recomputed on the
        reduced dataset after each filtering iteration.

        Returns
        -------
        tuple
            (kept sample indices as a list, progress_list). progress_list
            holds the kept original indices after each iteration; its last
            entry equals the returned kept indices.
        """
        param_key = (k, d_cutoff, n_iter)
        if param_key in self._res_lut:
            return self._res_lut[param_key]
        curr_sdm = self._sdm
        curr_knn_s_inds, curr_knn_distances = curr_sdm.s_knns(
            k, metric=metric, use_pca=use_pca, use_hnsw=use_hnsw,
            index_params=index_params, query_params=query_params)
        # Maximum k-th nearest neighbor distance over all samples. Starting
        # point of the linearly decreasing distance cutoff schedule.
        d_max = max([x[-1] for x in curr_knn_distances])
        # The dataset is reduced by removing samples, but the data matrix
        # itself is not edited. Thus, no need to copy.
        # TODO: optimize to keep track of the removed samples.
        curr_s_inds = np.arange(len(curr_knn_s_inds))
        progress_list = [curr_s_inds.tolist()]
        for i in range(1, n_iter+1):
            # Cutoff decreases linearly from just below d_max down to
            # d_cutoff at the final iteration (i == n_iter).
            i_d_cutoff = (d_cutoff +
                          (n_iter - i) / n_iter * max(0, d_max - d_cutoff))
            kept_curr_s_inds = []
            for s_ind in range(len(curr_knn_s_inds)):
                # k-th nearest neighbor distance of sample s_ind
                if curr_knn_distances[s_ind][-1] <= i_d_cutoff:
                    kept_curr_s_inds.append(s_ind)
            curr_s_inds = curr_s_inds[kept_curr_s_inds]
            progress_list.append(curr_s_inds.tolist())
            if len(curr_s_inds) == 0:
                # All samples filtered out; no further iteration possible.
                return curr_s_inds.tolist(), progress_list
            # no change of value. No need to copy.
            # NOTE(review): unlike the pdist variant, k is not clamped here;
            # if fewer than k+1 samples remain, s_knns(k) may be unable to
            # find k neighbors -- confirm upstream guarantees.
            curr_sdm = eda.SampleDistanceMatrix(
                curr_sdm._x[kept_curr_s_inds, :], d=None,
                metric=curr_sdm._metric, use_pdist=False,
                nprocs=curr_sdm._nprocs)
            curr_knn_s_inds, curr_knn_distances = curr_sdm.s_knns(
                k, metric=metric, use_pca=use_pca, use_hnsw=use_hnsw,
                index_params=index_params, query_params=query_params)
        # the last one of progress_list is equal to the curr_s_inds
        return curr_s_inds.tolist(), progress_list

    def _pdist_rare_s_detect(self, k, d_cutoff, n_iter, metric=None,
                             use_pca=False, use_hnsw=False,
                             index_params=None, query_params=None):
        """
        KNN rare sample detection with scalar tuple of parameters.

        Keep the same interface as the _no_pdist_rare_s_detect. The following
        parameters are not used, [metric, use_pca, use_hnsw, index_params,
        query_params].

        Returns
        -------
        tuple
            (kept sample indices as a list, progress_list). progress_list
            holds the kept original indices after each iteration; its last
            entry equals the returned kept indices.
        """
        # TODO: copy lookup results
        param_key = (k, d_cutoff, n_iter)
        if param_key in self._res_lut:
            return self._res_lut[param_key]
        # curr_dist_mat being reduced by removing samples, but the data matrix
        # is not edited. Thus, no need to copy.
        curr_dist_mat = self._sdm._d
        # Each column sorted ascending; row k then holds every sample's k-th
        # nearest neighbor distance (row 0 is the self-distance).
        sorted_curr_dist_mat = np.sort(curr_dist_mat, axis=0)
        d_max = sorted_curr_dist_mat[k, :].max()
        curr_s_inds = np.arange(curr_dist_mat.shape[0])
        progress_list = [curr_s_inds.tolist()]
        for i in range(1, n_iter+1):
            # Cutoff decreases linearly from just below d_max down to
            # d_cutoff at the final iteration (i == n_iter).
            i_d_cutoff = (d_cutoff +
                          (n_iter - i) / n_iter * max(0, d_max - d_cutoff))
            # Clamp k so it never exceeds the remaining sample count - 1.
            i_k = min(curr_dist_mat.shape[0]-1, k)
            kept_curr_s_inds = []
            # each column is sorted. Not in-place.
            # size of curr_dist_mat is updated each iteration
            for s_ind in range(curr_dist_mat.shape[0]):
                # i_k'th neighbor distance of sample s_ind
                if sorted_curr_dist_mat[i_k, s_ind] <= i_d_cutoff:
                    kept_curr_s_inds.append(s_ind)
            # no change of value. No need to copy.
            curr_dist_mat = curr_dist_mat[np.ix_(kept_curr_s_inds,
                                                 kept_curr_s_inds)]
            sorted_curr_dist_mat = np.sort(curr_dist_mat, axis=0)
            curr_s_inds = curr_s_inds[kept_curr_s_inds]
            progress_list.append(curr_s_inds.tolist())
        # the last one of progress_list is equal to the curr_s_inds
        return curr_s_inds.tolist(), progress_list

    def detect_rare_samples(self, k, d_cutoff, n_iter, nprocs=1, metric=None,
                            use_pca=False, use_hnsw=False,
                            index_params=None, query_params=None):
        """
        KNN rare sample detection with multiple parameter combinations

        Assuming that there are at least k samples look similar in this
        dataset, the samples with less than k similar neighbors may be
        rare. The rare samples can either be really distinct from the general
        population or caused by technical errors.

        This procedure iteratively detects samples according to their k-th
        nearest neighbors. The samples most distinct from its k-th nearest
        neighbors are detected first. Then, the left samples are detected
        by less stringent distance cutoff. The distance cutoff decreases
        linearly from maximum distance to d_cutoff with n_iter iterations.

        Parameters
        ----------
        k: int list or scalar
            K nearest neighbors to detect rare samples.
        d_cutoff: float list or scalar
            Samples with >= d_cutoff distances are distinct from each other.
            Minimum (>=) distance to be called as rare.
        n_iter: int list or scalar
            N progressive iNN detections on the dataset.
        metric: {'cosine', 'euclidean', None}
            If none, self._sdm._metric is used.
        use_pca: bool
            Use PCA for nearest neighbors or not.
        use_hnsw: bool
            Use Hierarchical Navigable Small World graph to compute
            approximate nearest neighbor.
        index_params: dict
            Parameters used by HNSW in indexing.

            efConstruction: int
                Default 100. Higher value improves the quality of a constructed
                graph and leads to higher accuracy of search. However this also
                leads to longer indexing times. The reasonable range of values
                is 100-2000.
            M: int
                Default 5. Higher value leads to better recall and shorter
                retrieval times, at the expense of longer indexing time. The
                reasonable range of values is 5-100.
            delaunay_type: {0, 1, 2, 3}
                Default 2. Pruning heuristic, which affects the trade-off
                between retrieval performance and indexing time. The default
                is usually quite good.
            post: {0, 1, 2}
                Default 0. The amount and type of postprocessing applied to the
                constructed graph. 0 means no processing. 2 means more
                processing.
            indexThreadQty: int
                Default self._nprocs. The number of threads used.
        query_params: dict
            Parameters used by HNSW in querying.

            efSearch: int
                Default 100. Higher value improves recall at the expense of
                longer retrieval time. The reasonable range of values is
                100-2000.
        nprocs: int
            N processes to run all parameter tuples.

        Returns
        -------
        res_list
            Indices of non-rare samples of each corresponding parameter
            tuple.

        Notes
        -----
        If parameters are provided as lists of equal length n, the n
        corresponding parameter tuples will be executed parallely.

        Example:

        `k = [10, 15, 20]`

        `d_cutoff = [1, 2, 3]`

        `n_iter = [10, 20, 30]`

        `(k, d_cutoff, n_iter)` tuples `(10, 1, 10), (15, 2, 20), (20, 3, 30)`
        will be tried parallely with nprocs.
        """
        # Convert scalar to list
        if np.isscalar(k):
            k_list = [k]
        else:
            k_list = list(k)
        if np.isscalar(d_cutoff):
            d_cutoff_list = [d_cutoff]
        else:
            d_cutoff_list = list(d_cutoff)
        if np.isscalar(n_iter):
            n_iter_list = [n_iter]
        else:
            n_iter_list = list(n_iter)
        # Check all param lists have the same length
        if not (len(k_list) == len(d_cutoff_list) == len(n_iter_list)):
            raise ValueError("Parameter should have the same length."
                             "k: {}, d_cutoff: {}, n_iter: {}.".format(
                                 k, d_cutoff, n_iter))
        n_param_tups = len(k_list)
        # type check all parameters
        for i in range(n_param_tups):
            if k_list[i] < 1 or k_list[i] > self._sdm._x.shape[0] - 1:
                raise ValueError("k should be >= 1 and <= n_samples-1. "
                                 "k: {}".format(k))
            else:
                k_list[i] = int(k_list[i])
            if d_cutoff_list[i] <= 0:
                raise ValueError("d_cutoff should be > 0. "
                                 "d_cutoff: {}".format(d_cutoff))
            else:
                d_cutoff_list[i] = float(d_cutoff_list[i])
            if n_iter_list[i] < 1:
                raise ValueError("n_iter should be >= 1. "
                                 "n_iter: {}".format(n_iter))
            else:
                n_iter_list[i] = int(n_iter_list[i])
        param_tups = [(k_list[i], d_cutoff_list[i], n_iter_list[i],
                       metric, use_pca, use_hnsw, index_params,
                       query_params)
                      for i in range(n_param_tups)]
        nprocs = int(nprocs)
        # Never spawn more workers than parameter tuples.
        nprocs = min(nprocs, n_param_tups)
        # returns (filtered_sdm, progress_list (list of kept indices))
        if self._sdm._use_pdist:
            res_list = utils.parmap(
                lambda ptup: self._pdist_rare_s_detect(*ptup),
                param_tups, nprocs)
        else:
            res_list = utils.parmap(
                lambda ptup: self._no_pdist_rare_s_detect(*ptup),
                param_tups, nprocs)
        for i in range(n_param_tups):
            # only use k, d, and n_iter for res saving
            param_key = param_tups[i][:3]
            if param_key not in self._res_lut:
                self._res_lut[param_key] = res_list[i]
        return [res[0] for res in res_list]