Source code for scedar.eda.sdm

import numpy as np

import scipy.cluster.hierarchy as sch
import scipy.spatial as spspatial
import scipy.sparse as spsparse

import sklearn as skl
import sklearn.metrics
import sklearn.manifold
from sklearn.neighbors import NearestNeighbors
import sklearn.preprocessing
from sklearn.decomposition import PCA
from sklearn.decomposition import TruncatedSVD

import warnings

import random

from collections import defaultdict

import networkx as nx

from fa2 import ForceAtlas2

import igraph as ig

from umap import UMAP

import nmslib

import scedar
from scedar.eda.plot import cluster_scatter
from scedar.eda.plot import heatmap
from scedar.eda.plot import hist_dens_plot
from scedar.eda.plot import networkx_graph
from scedar.eda.sfm import SampleFeatureMatrix
from scedar.eda.mdl import MultinomialMdl
from scedar import utils
from scedar.eda import mtype

[docs]class SampleDistanceMatrix(SampleFeatureMatrix): """ SampleDistanceMatrix: data with pairwise distance matrix Parameters ---------- x : ndarray or list data matrix (n_samples, n_features) d : ndarray or list or None distance matrix (n_samples, n_samples) If is None, d will be computed with x, metric, and nprocs. metric : string distance metric use_pdist : boolean to use the pairwise distance matrix or not. The pairwise distance matrix may be too large to save for datasets with a large number of cells. sids : homogenous list of int or string sample ids. Should not contain duplicated elements. fids : homogenous list of int or string feature ids. Should not contain duplicated elements. nprocs : int the number of processes for computing pairwise distance matrix Attributes ---------- _x : ndarray data matrix (n_samples, n_features) _d : ndarray distance matrix (n_samples, n_samples) _metric : string distance metric _sids : ndarray sample ids. _fids : ndarray sample ids. _tsne_lut: dict lookup table for previous tsne calculations. Each run has an indexed entry, {(param_str, index) : tsne_res} _last_tsne: float array The last *stored* tsne results. In no tsne performed before, a run with default parameters will be performed. _hnsw_index_lut: {string_index_parameters: hnsw_index} _last_k: int The last *k* used for s_knns computation. _last_knns: (knn_indices, knn_distances) The last computed s_knns. _knn_ng_lut: dict {(k, aff_scale): knn_graph} """ def __init__(self, x, d=None, metric="cosine", use_pdist=True, sids=None, fids=None, nprocs=None): super(SampleDistanceMatrix, self).__init__(x=x, sids=sids, fids=fids) if d is not None: if not use_pdist: raise ValueError("pdist cannot be provided when " "use pdist = False") try: d = np.array(d, copy=False, dtype="float64") except ValueError as e: raise ValueError("d must be float. {}".format(e)) if (d.ndim != 2 or d.shape[0] != d.shape[1] or d.shape[0] != self._x.shape[0]): # check provided distance matrix shape raise ValueError("d should have shape (n_samples, n_samples)") d = self.num_correct_dist_mat(d) else: if metric == "precomputed": raise ValueError("metric cannot be precomputed when " "d is None.") if nprocs is None: self._nprocs = 1 else: self._nprocs = max(int(nprocs), 1) self._lazy_load_d = d self._tsne_lut = {} self._lazy_load_last_tsne = None self._metric = metric self._use_pdist = use_pdist # Sorted distance matrix. Each column ascending from top to bottom. self._lazy_load_col_sorted_d = None self._lazy_load_col_argsorted_d = None self._knn_ng_lut = {} # sklearn.decomposition.PCA instance # use truncated svd for sparse if self._is_sparse: self._pca_n_components = min(100, self._x.shape[1] - 1) else: self._pca_n_components = min( 100, self._x.shape[0], self._x.shape[1]) self._lazy_load_skd_pca = None self._lazy_load_pca_x = None # umap self._lazy_load_umap_x = None # knn conn mat related self._last_k = None self._last_knns = None self._hnsw_index_lut = {}
[docs] def to_classified(self, labels): """Convert to SingleLabelClassifiedSamples Args: labels (list of labels): sample labels. Returns: SingleLabelClassifiedSamples """ slcs = scedar.eda.SingleLabelClassifiedSamples( self._x, labels, sids=self.sids, fids=self.fids, d=None, metric=self._metric, use_pdist=self._use_pdist, nprocs=self._nprocs) # pairwise distance matrix slcs._lazy_load_d = self._lazy_load_d # tsne slcs._tsne_lut = self._tsne_lut.copy() slcs._lazy_load_last_tsne = self._lazy_load_last_tsne # sorted distance matrix slcs._lazy_load_col_sorted_d = self._lazy_load_col_sorted_d slcs._lazy_load_col_argsorted_d = self._lazy_load_col_argsorted_d # knn graph slcs._knn_ng_lut = self._knn_ng_lut.copy() # pca slcs._pca_n_components = self._pca_n_components slcs._lazy_load_skd_pca = self._lazy_load_skd_pca slcs._lazy_load_pca_x = self._lazy_load_pca_x slcs._hnsw_index_lut = self._hnsw_index_lut.copy() return slcs
[docs] def sort_features(self, fdist_metric="cosine", optimal_ordering=False): sorted_f_inds = HClustTree.sort_x_by_d( self._x.T, metric=fdist_metric, optimal_ordering=optimal_ordering, nprocs=self._nprocs) self._x = self._x[:, sorted_f_inds] self._fids = self._fids[sorted_f_inds] return
# numerically correct dmat
[docs] @staticmethod def num_correct_dist_mat(dmat, upper_bound=None): if (not isinstance(dmat, np.ndarray) or dmat.ndim != 2 or dmat.shape[0] != dmat.shape[1]): # check distance matrix shape raise ValueError("dmat must be a 2D (n_samples, n_samples)" " np array") try: # Distance matrix diag vals should be close to 0. np.testing.assert_allclose(dmat[np.diag_indices(dmat.shape[0])], 0, atol=1e-5) except AssertionError as e: warnings.warn("distance matrix might not be numerically " "correct. diag vals " "should be close to 0. {}".format(e)) try: # distance matrix should be approximately symmetric np.testing.assert_allclose(dmat[np.triu_indices_from(dmat)], dmat.T[np.triu_indices_from(dmat)], rtol=0.001) except AssertionError as e: warnings.warn("distance matrix might not be numerically " "correct. should be approximately " "symmetric. {}".format(e)) dmat[dmat < 0] = 0 dmat[np.diag_indices(dmat.shape[0])] = 0 if upper_bound is not None: upper_bound = float(upper_bound) dmat[dmat > upper_bound] = upper_bound dmat[np.triu_indices_from(dmat)] = dmat.T[np.triu_indices_from(dmat)] return dmat
[docs] def put_tsne(self, str_params, res): """ Put t-SNE results into the lookup table. """ if type(str_params) != str: raise ValueError("Unknown key type: {}".format(str_params)) curr_store_ind = len(self._tsne_lut) + 1 tsne_params_key = (str_params, curr_store_ind) res_copy = res.copy() self._tsne_lut[tsne_params_key] = res_copy self._lazy_load_last_tsne = res_copy
[docs] def get_tsne_kv(self, key): """ Get t-SNE results from the lookup table. Return None if non-existent. Returns ------- res_tuple: tuple (key, val) pair of tsne result. """ if type(key) == int: # tuple key (param_str, ind) for tk in self._tsne_lut: if key == tk[1]: return (tk, self._tsne_lut[tk]) elif type(key) == str: for tk in self._tsne_lut: if key == tk[0]: return (tk, self._tsne_lut[tk]) else: raise ValueError("Unknown key type: {}".format(key)) # key cannot be found return None
[docs] def tsne(self, store_res=True, **kwargs): """ Run t-SNE on distance matrix. Parameters ---------- store_res: bool Store the results in lookup table or not. **kwargs Keyword arguments passed to tsne computation. Returns ------- tsne_res: float array t-SNE projections, (n_samples, m dimensions). """ # TODO: make parameter keys consistent such that same set of # parameters but different order will sill be the same. # check input args if self._use_pdist: if ("metric" in kwargs and kwargs["metric"] not in ("precomputed", self._metric)): raise ValueError("If you want to calculate t-SNE of a " "different " "metric than the instance metric, create " "another " "instance of the desired metric.") else: kwargs["metric"] = "precomputed" else: if "metric" in kwargs: if kwargs["metric"] == "precomputed": raise ValueError("Metric cannot be precomputed when " "use_pdist is False") elif kwargs["metric"] != self._metric: raise ValueError("If you want to calculate t-SNE of a " "different " "metric than the instance metric, create " "another " "instance of the desired metric.") else: kwargs["metric"] = self._metric str_params = str(sorted(kwargs.items())) tsne_kv = self.get_tsne_kv(str_params) if tsne_kv is None: if self._x.shape[0] == 0: tsne_res = np.empty((0, 0)) elif self._x.shape[0] == 1: tsne_res = np.zeros((1, 2)) else: if self._use_pdist: tsne_res = tsne(self._d, **kwargs) else: if self._is_sparse: tsne_res = tsne(self._x.toarray(), **kwargs) else: tsne_res = tsne(self._x, **kwargs) else: tsne_res = tsne_kv[1] if store_res: self.put_tsne(str_params, tsne_res) return tsne_res
[docs] def par_tsne(self, param_list, store_res=True, nprocs=1): """ Run t-SNE with multiple sets of parameters parallely. Parameters ---------- param_list: list of dict List of parameters being passed to t-SNE. nprocs: int Number of processes. Returns ------- tsne_res_list: list of float arrays List of t-SNE results of corresponding parameter set. Notes ----- Parallel running results cannot be stored during the run, because racing conditions may happen. """ nprocs = min(int(nprocs), len(param_list)) # single run tsne def srun_tsne(param_dict): return self.tsne(store_res=False, **param_dict) resl = utils.parmap(srun_tsne, param_list, nprocs) if store_res: for i in range(len(param_list)): self.put_tsne(str(param_list[i]), resl[i]) return resl
[docs] def tsne_plot(self, gradient=None, labels=None, selected_labels=None, plot_different_markers=False, label_markers=None, shuffle_label_colors=False, xlim=None, ylim=None, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the last t-SNE projection with the provided gradient as color. Gradient is None by default. """ # labels are checked in cluster_scatter return cluster_scatter(self._last_tsne, labels=labels, selected_labels=selected_labels, plot_different_markers=plot_different_markers, label_markers=label_markers, shuffle_label_colors=shuffle_label_colors, gradient=gradient, xlim=xlim, ylim=ylim, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def tsne_feature_gradient_plot(self, fid, transform=None, labels=None, selected_labels=None, plot_different_markers=False, label_markers=None, shuffle_label_colors=False, xlim=None, ylim=None, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the last t-SNE projection with the provided gradient as color. Parameters ---------- fid: feature id scalar ID of the feature to be used for gradient plot. transform: callable Map transform on feature before plotting. labels: label array Labels assigned to each point, (n_samples,). selected_labels: label array Show gradient only for selected labels. Do not show non-selected. """ if mtype.is_valid_sfid(fid): fid = [fid] f_ind = self.f_id_to_ind(fid)[0] else: raise ValueError("Invalid fid {}." "Currently only support 1 " "feature gradient plot.".format(fid)) fx = self.f_ind_x_vec(f_ind, transform=transform) if labels is not None and len(labels) != fx.shape[0]: raise ValueError("labels ({}) must have same length as " "n_samples.".format(labels)) return cluster_scatter(self._last_tsne, labels=labels, selected_labels=selected_labels, plot_different_markers=plot_different_markers, label_markers=label_markers, shuffle_label_colors=shuffle_label_colors, gradient=fx, xlim=xlim, ylim=ylim, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def pca_plot(self, component_ind_pair=(0, 1), gradient=None, labels=None, selected_labels=None, plot_different_markers=False, label_markers=None, shuffle_label_colors=False, xlim=None, ylim=None, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the PCA projection with the provided gradient as color. Gradient is None by default. """ # labels are checked in cluster_scatter return cluster_scatter(self._pca_x[:, component_ind_pair], labels=labels, selected_labels=selected_labels, plot_different_markers=plot_different_markers, label_markers=label_markers, shuffle_label_colors=shuffle_label_colors, gradient=gradient, xlim=xlim, ylim=ylim, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def pca_feature_gradient_plot(self, fid, component_ind_pair=(0, 1), transform=None, labels=None, selected_labels=None, plot_different_markers=False, label_markers=None, shuffle_label_colors=False, xlim=None, ylim=None, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the last PCA projection with the provided gradient as color. Parameters ---------- component_ind_pair: tuple of two ints Indices of the components to plot. fid: feature id scalar ID of the feature to be used for gradient plot. transform: callable Map transform on feature before plotting. labels: label array Labels assigned to each point, (n_samples,). selected_labels: label array Show gradient only for selected labels. Do not show non-selected. """ if mtype.is_valid_sfid(fid): fid = [fid] f_ind = self.f_id_to_ind(fid)[0] else: raise ValueError("Invalid fid {}." "Currently only support 1 " "feature gradient plot.".format(fid)) fx = self.f_ind_x_vec(f_ind, transform=transform) if labels is not None and len(labels) != fx.shape[0]: raise ValueError("labels ({}) must have same length as " "n_samples.".format(labels)) return cluster_scatter(self._pca_x[:, component_ind_pair], labels=labels, selected_labels=selected_labels, plot_different_markers=plot_different_markers, label_markers=label_markers, shuffle_label_colors=shuffle_label_colors, gradient=fx, xlim=xlim, ylim=ylim, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def umap(self, use_pca=True, n_neighbors=5, n_components=2, n_epochs=None, learning_rate=1.0, init='spectral', min_dist=0.1, spread=1.0, set_op_mix_ratio=1.0, local_connectivity=1.0, repulsion_strength=1.0, negative_sample_rate=5, transform_queue_size=4.0, a=None, b=None, random_state=None, metric_kwds=None, angular_rp_forest=False, target_n_neighbors=-1, target_metric='categorical', target_metric_kwds=None, target_weight=0.5, transform_seed=42, verbose=False): if use_pca: umap_x = UMAP( n_neighbors=n_neighbors, n_components=n_components, metric=self._metric, n_epochs=n_epochs, learning_rate=learning_rate, init=init, min_dist=min_dist, spread=spread, set_op_mix_ratio=set_op_mix_ratio, local_connectivity=local_connectivity, repulsion_strength=repulsion_strength, negative_sample_rate=negative_sample_rate, transform_queue_size=transform_queue_size, a=a, b=b, random_state=random_state, metric_kwds=metric_kwds, angular_rp_forest=angular_rp_forest, target_n_neighbors=target_n_neighbors, target_metric=target_metric, target_metric_kwds=target_metric_kwds, target_weight=target_weight, transform_seed=transform_seed, verbose=verbose).fit_transform(self._pca_x) else: if self._use_pdist: umap_x = UMAP( n_neighbors=n_neighbors, n_components=n_components, metric='precomputed', n_epochs=n_epochs, learning_rate=learning_rate, init=init, min_dist=min_dist, spread=spread, set_op_mix_ratio=set_op_mix_ratio, local_connectivity=local_connectivity, repulsion_strength=repulsion_strength, negative_sample_rate=negative_sample_rate, transform_queue_size=transform_queue_size, a=a, b=b, random_state=random_state, metric_kwds=metric_kwds, angular_rp_forest=angular_rp_forest, target_n_neighbors=target_n_neighbors, target_metric=target_metric, target_metric_kwds=target_metric_kwds, target_weight=target_weight, transform_seed=transform_seed, verbose=verbose).fit_transform(self._d) else: umap_x = UMAP( n_neighbors=n_neighbors, n_components=n_components, metric=self._metric, n_epochs=n_epochs, learning_rate=learning_rate, init=init, min_dist=min_dist, spread=spread, set_op_mix_ratio=set_op_mix_ratio, local_connectivity=local_connectivity, repulsion_strength=repulsion_strength, negative_sample_rate=negative_sample_rate, transform_queue_size=transform_queue_size, a=a, b=b, random_state=random_state, metric_kwds=metric_kwds, angular_rp_forest=angular_rp_forest, target_n_neighbors=target_n_neighbors, target_metric=target_metric, target_metric_kwds=target_metric_kwds, target_weight=target_weight, transform_seed=transform_seed, verbose=verbose).fit_transform(self._x) self._lazy_load_umap_x = umap_x return umap_x
[docs] def umap_plot(self, component_ind_pair=(0, 1), gradient=None, labels=None, selected_labels=None, plot_different_markers=False, label_markers=None, shuffle_label_colors=False, xlim=None, ylim=None, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the UMAP projection with the provided gradient as color. Gradient is None by default. TODO: refactor plotting interface. Merge multiple plotting methods into one. """ # labels are checked in cluster_scatter return cluster_scatter(self._umap_x[:, component_ind_pair], labels=labels, selected_labels=selected_labels, plot_different_markers=plot_different_markers, label_markers=label_markers, shuffle_label_colors=shuffle_label_colors, gradient=gradient, xlim=xlim, ylim=ylim, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def umap_feature_gradient_plot(self, fid, component_ind_pair=(0, 1), transform=None, labels=None, selected_labels=None, plot_different_markers=False, label_markers=None, shuffle_label_colors=False, xlim=None, ylim=None, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the last UMAP projection with the provided gradient as color. Parameters ---------- component_ind_pair: tuple of two ints Indices of the components to plot. fid: feature id scalar ID of the feature to be used for gradient plot. transform: callable Map transform on feature before plotting. labels: label array Labels assigned to each point, (n_samples,). selected_labels: label array Show gradient only for selected labels. Do not show non-selected. """ if mtype.is_valid_sfid(fid): fid = [fid] f_ind = self.f_id_to_ind(fid)[0] else: raise ValueError("Invalid fid {}." "Currently only support 1 " "feature gradient plot.".format(fid)) fx = self.f_ind_x_vec(f_ind, transform=transform) if labels is not None and len(labels) != fx.shape[0]: raise ValueError("labels ({}) must have same length as " "n_samples.".format(labels)) return cluster_scatter(self._umap_x[:, component_ind_pair], labels=labels, selected_labels=selected_labels, plot_different_markers=plot_different_markers, label_markers=label_markers, shuffle_label_colors=shuffle_label_colors, gradient=fx, xlim=xlim, ylim=ylim, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def ind_x(self, selected_s_inds=None, selected_f_inds=None): """ Subset samples by (sample IDs, feature IDs). Parameters ---------- selected_s_inds: int array Index array of selected samples. If is None, select all. selected_f_inds: int array Index array of selected features. If is None, select all. Returns ------- subset: SampleDistanceMatrix """ if selected_s_inds is None: selected_s_inds = slice(None, None) if selected_f_inds is None: selected_f_inds = slice(None, None) if self._use_pdist: return SampleDistanceMatrix( x=self._x[selected_s_inds, :][:, selected_f_inds].copy(), d=self._d[selected_s_inds, :][:, selected_s_inds].copy(), metric=self._metric, use_pdist=self._use_pdist, sids=self._sids[selected_s_inds].tolist(), fids=self._fids[selected_f_inds].tolist(), nprocs=self._nprocs) else: return SampleDistanceMatrix( x=self._x[selected_s_inds, :][:, selected_f_inds].copy(), d=None, metric=self._metric, use_pdist=self._use_pdist, sids=self._sids[selected_s_inds].tolist(), fids=self._fids[selected_f_inds].tolist(), nprocs=self._nprocs)
[docs] def id_x(self, selected_sids=None, selected_fids=None): """ Subset samples by (sample IDs, feature IDs). Parameters ---------- selected_s_inds: int array Index array of selected samples. If is None, select all. selected_f_inds: int array Index array of selected features. If is None, select all. Returns ------- subset: SampleDistanceMatrix """ if selected_sids is None: selected_s_inds = None else: selected_s_inds = self.s_id_to_ind(selected_sids) if selected_fids is None: selected_f_inds = None else: selected_f_inds = self.f_id_to_ind(selected_fids) return self.ind_x(selected_s_inds, selected_f_inds)
[docs] def s_ith_nn_d(self, i): """ Computes the distances of the i-th nearest neighbor of all samples. """ return self._col_sorted_d[i, :]
[docs] def s_ith_nn_ind(self, i): """ Computes the sample indices of the i-th nearest neighbor of all samples. """ return self._col_argsorted_d[i, :]
[docs] def s_ith_nn_d_dist(self, i, xlab=None, ylab=None, title=None, figsize=(5, 5), ax=None, **kwargs): """ Plot the distances of the i-th nearest neighbor of all samples. """ x = self.s_ith_nn_d(i) return hist_dens_plot(x, title=title, xlab=xlab, ylab=ylab, figsize=figsize, ax=ax, **kwargs)
[docs] def s_knn_ind_lut(self, k, metric=None, use_pca=False, use_hnsw=False, index_params=None, query_params=None, verbose=False): """ Computes the lookup table for sample i and its KNN indices, i.e. `{i : [1st_NN_ind, 2nd_NN_ind, ..., nth_NN_ind], ...}` """ if k < 0 or k > self._x.shape[0] - 1: raise ValueError("k ({}) should >= 0 and <= n_samples-1".format(k)) k = int(k) if self._use_pdist: # each column is its KNN index from 1 to k knn_ind_arr = self._col_argsorted_d[1:k+1, :].copy() knn_order_ind_lut = dict(zip(range(knn_ind_arr.shape[1]), knn_ind_arr.T.tolist())) return knn_order_ind_lut else: if self._last_k is None or self._last_k < k: if k < 10: compute_k = min(10, max(self._x.shape[0] - 1, 0)) else: compute_k = k targets, distances = self.s_knns( compute_k, metric=metric, use_pca=use_pca, use_hnsw=use_hnsw, index_params=index_params, query_params=query_params, verbose=verbose) else: # if two samples are identical, their distance is 0 targets, distances = self._last_knns knn_order_ind_lut = defaultdict(list) for i in range(len(targets)): # create a key val pair knn_order_ind_lut[i] for j in range(k): knn_order_ind_lut[i].append(targets[i][j]) return knn_order_ind_lut
[docs] def s_knns(self, k, metric=None, use_pca=False, use_hnsw=False, index_params=None, query_params=None, verbose=False): """ Computes the k-nearest neighbors (KNNs) of samples. Parameters ---------- k: int The number of nearest neighbors. metric: {'cosine', 'euclidean', None} If none, self._metric is used. use_pca: bool Use PCA for nearest neighbors or not. use_hnsw: bool Use Hierarchical Navigable Small World graph to compute approximate nearest neighbor. index_params: dict Parameters used by HNSW in indexing. efConstruction: int Default 100. Higher value improves the quality of a constructed graph and leads to higher accuracy of search. However this also leads to longer indexing times. The reasonable range of values is 100-2000. M: int Default 5. Higher value leads to better recall and shorter retrieval times, at the expense of longer indexing time. The reasonable range of values is 5-100. delaunay_type: {0, 1, 2, 3} Default 2. Pruning heuristic, which affects the trade-off between retrieval performance and indexing time. The default is usually quite good. post: {0, 1, 2} Default 0. The amount and type of postprocessing applied to the constructed graph. 0 means no processing. 2 means more processing. indexThreadQty: int Default self._nprocs. The number of threads used. query_params: dict Parameters used by HNSW in querying. efSearch: int Default 100. Higher value improves recall at the expense of longer retrieval time. The reasonable range of values is 100-2000. verbose: bool Returns ------- knn_indices: list of numpy arrays The i-th array is the KNN indices of the i-th sample. knn_distances: list of numpy arrays The i-th array is the KNN distances of the i-th sample. """ if k < 1: raise ValueError("k should >= 1") if use_hnsw: targets, distances = self._s_knns_hnsw( k=k, metric=metric, use_pca=use_pca, index_params=index_params, query_params=query_params, verbose=verbose) else: if index_params is not None: raise ValueError("index_params are not used with " "use_hnsw=False.") if query_params is not None: raise ValueError("query_params are not used with " "use_hnsw=False.") targets, distances = self._s_knns_skl( k, metric=metric, use_pca=use_pca, verbose=verbose) self._last_k = k self._last_knns = (targets, distances) return targets, distances
[docs] def s_knn_connectivity_matrix(self, k, metric=None, use_pca=False, use_hnsw=False, index_params=None, query_params=None, verbose=False): """ Computes the connectivity matrix of KNN of samples. If an entry `(i, j)` has value 0, node `i` is not in node `j`'s KNN. If an entry `(i, j)` has value != 0, node `i` is in `j`'s KNN, and their distance is the entry value. If two NNs have distance euqal to 0, 0 will be replaced by -np.inf. Parameters ---------- k: int The number of nearest neighbors. metric: {'cosine', 'euclidean', None} If none, self._metric is used. use_pca: bool Use PCA for nearest neighbors or not. use_hnsw: bool Use Hierarchical Navigable Small World graph to compute approximate nearest neighbor. index_params: dict Parameters used by HNSW in indexing. efConstruction: int Default 100. Higher value improves the quality of a constructed graph and leads to higher accuracy of search. However this also leads to longer indexing times. The reasonable range of values is 100-2000. M: int Default 5. Higher value leads to better recall and shorter retrieval times, at the expense of longer indexing time. The reasonable range of values is 5-100. delaunay_type: {0, 1, 2, 3} Default 2. Pruning heuristic, which affects the trade-off between retrieval performance and indexing time. The default is usually quite good. post: {0, 1, 2} Default 0. The amount and type of postprocessing applied to the constructed graph. 0 means no processing. 2 means more processing. indexThreadQty: int Default self._nprocs. The number of threads used. query_params: dict Parameters used by HNSW in querying. efSearch: int Default 100. Higher value improves recall at the expense of longer retrieval time. The reasonable range of values is 100-2000. Returns ------- knn_conn_mat: float array (n_samples, n_samles) Non-zero entries are nearest neighbors (NNs). The values are distances. If two NNs have distance euqal to 0, 0 will be replaced by -np.inf. """ targets, distances = self.s_knns( k=k, metric=metric, use_pca=use_pca, use_hnsw=use_hnsw, index_params=index_params, query_params=query_params, verbose=verbose) sources = [np.repeat(i, len(targets[i])) for i in range(len(targets))] sources_1d = np.concatenate(sources, axis=0) targets_1d = np.concatenate(targets, axis=0) distances_1d = np.concatenate(distances, axis=0) distances_1d[distances_1d == 0] = -np.inf knn_conn_mat = spsparse.coo_matrix( (distances_1d, (sources_1d, targets_1d)), shape=(self._x.shape[0], self._x.shape[0])).tocsr() return knn_conn_mat
def _s_knns_hnsw(self, k, metric=None, use_pca=False, index_params=None, query_params=None, verbose=False): if k < 1: raise ValueError("k should >= 1.") if metric is None: metric = self._metric if use_pca: data_x = self._pca_x is_sparse = False if metric == "euclidean": metric = "l2" elif metric == "cosine": metric = "cosinesimil" else: raise ValueError( "HNSW only supports cosine and euclidean distance") else: data_x = self._x is_sparse = self._is_sparse if metric == "euclidean": if is_sparse: metric = "l2_sparse" else: metric = "l2" elif metric == "cosine": if is_sparse: metric = "cosinesimil_sparse_fast" else: metric = "cosinesimil" else: raise ValueError( "HNSW only supports cosine and euclidean distance") if is_sparse: data_type = nmslib.DataType.SPARSE_VECTOR else: data_type = nmslib.DataType.DENSE_VECTOR if index_params is None: index_params = { "efConstruction": 100, "M": 5, "delaunay_type": 2, "post": 0, "indexThreadQty": self._nprocs } if query_params is None: query_params = { "efSearch": 100 } # create index ind_pm_key = sorted([(k, v) for k, v in index_params.items() if k in ["efConstruction", "M", "delaunay_type", "post"]]) ind_pm_key.append(("metric", metric)) ind_pm_key.append(("use_pca", use_pca)) str_ind_pm_key = str(ind_pm_key) if str_ind_pm_key in self._hnsw_index_lut: hnsw = self._hnsw_index_lut[str_ind_pm_key] else: hnsw = nmslib.init(method="hnsw", space=metric, data_type=data_type) hnsw.addDataPointBatch(data_x) hnsw.createIndex(index_params, print_progress=verbose) self._hnsw_index_lut[str_ind_pm_key] = hnsw # query KNN hnsw.setQueryTimeParams(query_params) # k nearest neighbors # hnsw query may include self. compute_k = k + 1 knns = hnsw.knnQueryBatch( data_x, k=compute_k, num_threads=self._nprocs) # print(knns) # construct knn conn mat. knn_targets_sep_l = [] knn_weights_sep_l = [] # need benchmark for i in range(len(knns)): i_targets = knns[i][0] # TODO: warn if query result length is different from compute_k # TODO: add arg. If true, raise error when length is wrong, # otherwise warn. i_weights = knns[i][1] # Note that the query result mey have < compute_k neighbors. for j in range(len(i_weights)): if i_targets[j] == i: i_targets = np.delete(i_targets, j) i_weights = np.delete(i_weights, j) break else: # there is no self in the knn list i_targets = np.delete(i_targets, -1) i_weights = np.delete(i_weights, -1) knn_targets_sep_l.append(i_targets) knn_weights_sep_l.append(i_weights) return knn_targets_sep_l, knn_weights_sep_l def _s_knns_skl(self, k, metric=None, use_pca=False, verbose=False): """ Runner for exact KNN using sklearn. """ if metric is None: metric = self._metric if use_pca: if verbose: print("Construct {} distance KNN graph on PCs.".format( metric)) nn_ins = NearestNeighbors( n_neighbors=k, metric=metric).fit(self._pca_x) else: if verbose: print("Construct {} distance KNN graph on raw data.".format( metric)) if self._use_pdist: nn_ins = NearestNeighbors( n_neighbors=k, metric="precomputed").fit(self._d) else: # TODO: better error message. # some metrics are not supported here. nn_ins = NearestNeighbors( n_neighbors=k, metric=metric).fit(self._x) distances, targets = nn_ins.kneighbors( None, n_neighbors=k, return_distance=True) targets = list(targets) distances = list(distances) return targets, distances
[docs] @staticmethod def knn_conn_mat_to_aff_graph(knn_conn_mat, aff_scale=1): sources, targets = knn_conn_mat.nonzero() weights = knn_conn_mat[sources, targets].A1 weights[weights == -np.inf] = 0 weights = (weights.max() - weights) * aff_scale graph = ig.Graph(edges=list(zip(sources, targets)), directed=False, edge_attrs={"weight": weights}) return graph
[docs] def s_knn_graph(self, k, gradient=None, labels=None, different_label_markers=True, aff_scale=1, iterations=2000, figsize=(20, 20), node_size=30, alpha=0.05, random_state=None, init_pos=None, node_with_labels=False, fa2_kwargs=None, nx_draw_kwargs=None): """ Draw KNN graph of SampleDistanceMatrix. Graph layout using forceatlas2 for its speed on large graph. Parameters ---------- k: int gradient: float array (n_samples,) color gradient labels: label list (n_samples,) labels different_label_markers: bool whether plot different labels with different markers aff_scale: float Affinity is calculated by `(max(distance) - distance) * aff_scale` iterations: int ForceAtlas2 iterations figsize: (float, float) node_size: float alpha: float random_state: int init_pos: float array Initial position of ForceAtlas2, (n_samples, 2). node_with_labels: bool fa2_kwargs: dict nx_draw_kwargs: dict Returns ------- fig: matplotlib figure KNN graph. """ # TODO: Docstring. Feature gradient. # (n_samples, n_samples). Non-neighbor entries are 0. knn_ng_param_key = (k, aff_scale) if knn_ng_param_key in self._knn_ng_lut: ng = self._knn_ng_lut[knn_ng_param_key] else: knn_d_csr = self.s_knn_connectivity_matrix(k) # affinity shoud negatively correlate to distance = (knn_d_csr.max() - * aff_scale # Undirected graph ng = nx.Graph() ng = nx.from_scipy_sparse_matrix( knn_d_csr, parallel_edges=False, create_using=ng, edge_attribute="weight") self._knn_ng_lut[knn_ng_param_key] = ng if fa2_kwargs is None: fa2_kwargs = {} else: fa2_kwargs = fa2_kwargs.copy() random.seed(random_state) forceatlas2 = ForceAtlas2( # Dissuade hubs outboundAttractionDistribution=fa2_kwargs.pop( "outboundAttractionDistribution", True), edgeWeightInfluence=fa2_kwargs.pop("edgeWeightInfluence", 1.0), # Performance # Tolerance jitterTolerance=fa2_kwargs.pop("jitterTolerance", 1.0), barnesHutOptimize=fa2_kwargs.pop("barnesHutOptimize", True), barnesHutTheta=fa2_kwargs.pop("barnesHutTheta", 1.2), # Tuning scalingRatio=fa2_kwargs.pop("scalingRatio", 2.0), strongGravityMode=fa2_kwargs.pop("strongGravityMode", True), gravity=fa2_kwargs.pop("gravity", 1.0), # Log verbose=fa2_kwargs.pop("verbose", False), **fa2_kwargs) knn_fa2pos = forceatlas2.forceatlas2_networkx_layout( ng, pos=init_pos, iterations=iterations) if nx_draw_kwargs is None: nx_draw_kwargs = {} else: nx_draw_kwargs = nx_draw_kwargs.copy() fig = networkx_graph(ng, knn_fa2pos, alpha=alpha, figsize=figsize, gradient=gradient, labels=labels, different_label_markers=different_label_markers, node_size=node_size, node_with_labels=node_with_labels, nx_draw_kwargs=nx_draw_kwargs) return fig
@property def d(self): return self._d.tolist() @property def _d(self): if self._use_pdist: if self._is_sparse: sparse_compatible_metrics = ["cityblock", "cosine", "euclidean", "l1", "l2", "manhattan"] if self._metric not in sparse_compatible_metrics: raise ValueError("Only the following metrics are " "supportted " "for sparse matrices, {}".format( sparse_compatible_metrics)) if self._lazy_load_d is None: if self._x.size == 0: self._lazy_load_d = np.zeros((self._x.shape[0], self._x.shape[0])) else: if self._metric == "cosine": pdmat = self.cosine_pdist(self._x) elif self._metric == "correlation": pdmat = self.correlation_pdist(self._x) else: pdmat = skl.metrics.pairwise.pairwise_distances( self._x, metric=self._metric, n_jobs=self._nprocs) self._lazy_load_d = self.num_correct_dist_mat(pdmat) else: raise ValueError("This SDM instance does not use pdist.") return self._lazy_load_d
[docs] @staticmethod def cosine_pdist(x): """ Compute pairwise cosine pdist for x (n_samples, n_features). Adapted from Waylon Flinn's post on . Cosine distance is undefined if one of the vectors contain only 0s. Parameters ---------- x: ndarray (n_samples, n_features) Returns ------- d: ndarray Pairwise distance matrix, (n_samples, n_samples). """ # pairwise dot product matrix if spsparse.issparse(x): pdot_prod =, x.T).toarray() else: pdot_prod =, x.T) # diagonal values are self dot product, i.e. squared and sum. squared_sum = np.diag(pdot_prod) # inverse squared sum # when a sample has all 0s, its squared sum is also 0, thus inverse # is infinite. By sklearn pairwise_distances convention, # Zero vectors, e.g. [0, 0, 0, 0, 0], have cosine distance 0 to # themselves, but 1 distance to any other vectors including another # zero vector. In correlation distance, zero vectors have 0 distances # to themselves, but nan to others. # if it doesn't occur, set it's inverse magnitude to zero (instead of # inf) inv_squared_sum = [1 / ss if ss != 0 else 0 for ss in squared_sum] # square root of squared sum is l2 norm. inv_l2norm = np.sqrt(inv_squared_sum) # cosine similarity (elementwise multiply by inverse l2norm product) cosine_sim = pdot_prod * inv_l2norm cosine_sim = cosine_sim.T * inv_l2norm cosine_sim[np.diag_indices_from(cosine_sim)] = 1 return 1 - cosine_sim
[docs] @staticmethod def correlation_pdist(x): """ Compute pairwise correlation pdist for x (n_samples, n_features). Adapted from Waylon Flinn's post on . Parameters ---------- x: ndarray (n_samples, n_features) Returns ------- d: ndarray Pairwise distance matrix, (n_samples, n_samples). """ centered_x = x - x.mean(axis=1).reshape(x.shape[0], 1) return SampleDistanceMatrix.cosine_pdist(centered_x)
@property def _col_sorted_d(self): # Use mergesort to be stable if self._lazy_load_col_sorted_d is None: self._lazy_load_col_sorted_d = np.sort(self._d, kind="mergesort", axis=0) return self._lazy_load_col_sorted_d @property def _col_argsorted_d(self): # Use mergesort to be stable if self._lazy_load_col_argsorted_d is None: self._lazy_load_col_argsorted_d = np.argsort(self._d, kind="mergesort", axis=0) return self._lazy_load_col_argsorted_d @property def _last_tsne(self): if self._lazy_load_last_tsne is None: self._lazy_load_last_tsne = self.tsne() return self._lazy_load_last_tsne @property def _skd_pca(self): if self._lazy_load_skd_pca is None: if self._is_sparse: # use TruncatedSVD for sparse matrix self._lazy_load_skd_pca = TruncatedSVD( n_components=self._pca_n_components, random_state=17) else: # use standard PCA for dense matrix self._lazy_load_skd_pca = PCA( n_components=self._pca_n_components, svd_solver="auto", random_state=17) return self._lazy_load_skd_pca @property def _pca_x(self): if self._lazy_load_pca_x is None: self._lazy_load_pca_x = self._skd_pca.transform(self._x) return self._lazy_load_pca_x @property def _umap_x(self): if self._lazy_load_umap_x is None: self._lazy_load_umap_x = self.umap(random_state=17) return self._lazy_load_umap_x @property def metric(self): return self._metric @property def tsne_lut(self): return dict((key, val.copy()) for key, val in self._tsne_lut.items())
# x : (n_samples, n_features) or (n_samples, n_samples) # If metric is 'precomputed', x must be a pairwise distance matrix
[docs]def tsne(x, n_components=2, perplexity=30.0, early_exaggeration=12.0, learning_rate=200.0, n_iter=1000, n_iter_without_progress=300, min_grad_norm=1e-07, metric="euclidean", init="random", verbose=0, random_state=None, method="barnes_hut", angle=0.5): x_tsne = sklearn.manifold.TSNE( n_components=n_components, perplexity=perplexity, early_exaggeration=early_exaggeration, learning_rate=learning_rate, n_iter=n_iter, n_iter_without_progress=n_iter_without_progress, min_grad_norm=min_grad_norm, metric=metric, init=init, verbose=verbose, random_state=random_state, method=method, angle=angle).fit_transform(x) return x_tsne
[docs]class HClustTree(object): """ Hierarchical clustering tree. Implement simple tree operation routines. HCT is binary unbalanced tree. Attributes ---------- node : scipy.cluster.hierarchy.ClusterNode current node prev : HClustTree parent of current node """ def __init__(self, node, prev=None): super(HClustTree, self).__init__() self._node = node self._prev = prev if node is None: left = None right = None else: left = node.get_left() right = node.get_right() self._left = left self._right = right @property def prev(self): return self._prev
[docs] def count(self): if self._node is None: return 0 else: return self._node.get_count()
[docs] def left(self): return HClustTree(self._left, self)
[docs] def left_count(self): return self.left().count()
[docs] def right(self): return HClustTree(self._right, self)
[docs] def right_count(self): return self.right().count()
[docs] def leaf_ids(self): """Returns the list of leaf IDs from left to right Returns: :obj:`list` of leaf IDs """ if self._node is None: return [] else: return self._node.pre_order(lambda xn: xn.get_id())
[docs] def left_leaf_ids(self): return self.left().leaf_ids()
[docs] def right_leaf_ids(self): return self.right().leaf_ids()
[docs] def bi_partition(self, soft_min_subtree_size=1, return_subtrees=False): """ soft_min_subtree_size: when curr tree size < 2 * soft_min_subtree_size, it is impossible to have a bipartition with a minimum sub tree size bigger than soft_min_subtree_size. In this case, return the first partition. When soft_min_subtree_size = 1, the performance is the same as taking the first bipartition. When curr size = 1, the first bipartition gives (1, 0). Because curr size < 2 * soft_min_subtree_size, it goes directly to return. When curr size = 2, the first bipartition guarantees to give (1, 1), with the invariant that parent nodes of leaves always have 2 child nodes. This also goes directly to return. When curr size >= 3, the first bipartition guarantees to give two subtrees with size >= 1, with the same invariant in size = 2. """ soft_min_subtree_size = int(soft_min_subtree_size) if soft_min_subtree_size < 1: raise ValueError("soft_min_subtree_size should >= 1") lst = self.left() rst = self.right() if (self.count() >= 2 * soft_min_subtree_size and lst.count() != rst.count()): # cut is not balanced if lst.count() < rst.count(): min_st = lst max_st = rst min_side = "left" else: min_st = rst max_st = lst min_side = "right" # Invariants: # 1. min_st < max_st # 2. min_st size >= 1, which implies that max_st size >= 2 # 3. parent node of a leaf has two child nodes. while min_st.count() < soft_min_subtree_size: # increase min_st size until it is larger than min_st_size # or the max st size if min_side == "left": max_spl_st = max_st.left() max_const_st = max_st.right() else: # min side is right max_spl_st = max_st.right() max_const_st = max_st.left() if max_spl_st.count() <= 1: if max_spl_st.count() <= 0: # count < 1 or count == 0 # This should not happen given max_st > min_st >= 1 raise NotImplementedError("Unexpected branch reached") # split side only has 1 node # create an empty tree min_merge_st = min_st max_merge_st = max_spl_st else: # split max_spl_st if min_side == "left": max_merge_st = max_spl_st.right() min_merge_st_node = sch.ClusterNode(, left=min_st._node, right=max_spl_st._left, dist=0, count=min_st.count() + max_spl_st.left_count()) else: max_merge_st = max_spl_st.left() min_merge_st_node = sch.ClusterNode(, left=max_spl_st._right, right=min_st._node, dist=0, count=min_st.count() + max_spl_st.right_count()) min_merge_st = HClustTree(min_merge_st_node, None) if min_side == "left": merge_st_node = sch.ClusterNode(, left=min_merge_st._node, right=max_merge_st._node, dist=0, count=min_merge_st.count() + max_merge_st.count()) else: merge_st_node = sch.ClusterNode(, left=max_merge_st._node, right=min_merge_st._node, dist=0, count=min_merge_st.count() + max_merge_st.count()) merge_st = HClustTree(merge_st_node, self) min_merge_st._prev = merge_st max_merge_st._prev = merge_st max_const_st._prev = self if max_const_st.count() >= merge_st.count(): max_st = max_const_st min_st = merge_st else: # max_st > min_st max_st = merge_st min_st = max_const_st # reverse min side min_side = "left" if min_side == "right" else "right" # obtained a min size cut if min_side == "left": lst = min_st rst = max_st else: lst = max_st rst = min_st # instance is modified. self._left = lst._node self._right = rst._node labs, sids = self.cluster_id_to_lab_list( [lst.leaf_ids(), rst.leaf_ids()], self.leaf_ids()) if return_subtrees: return labs, sids, lst, rst else: return labs, sids
[docs] def n_round_bipar_cnt(self, n): assert n > 0 nr_bipar_cnt_list = [] curr_hct_list = [self] curr_hct_cnt_list = [] next_hct_list = [] for i in range(n): for iter_hct in curr_hct_list: iter_left = iter_hct.left() iter_right = iter_hct.right() next_hct_list += [iter_left, iter_right] curr_hct_cnt_list += [iter_left.count(), iter_right.count()] nr_bipar_cnt_list.append(curr_hct_cnt_list) curr_hct_list = next_hct_list next_hct_list = [] curr_hct_cnt_list = [] return nr_bipar_cnt_list
[docs] @staticmethod def cluster_id_to_lab_list(cl_sid_list, sid_list): """ Convert nested clustered ID list into cluster label list. For example, convert `[[0, 1, 2], [3,4]]` to `[0, 0, 0, 1, 1]` according to id_arr `[0, 1, 2, 3, 4]` Parameters cl_sid_list: list[list[id]] Nested list with each sublist as a sert of IDs from a cluster. sid_list: list[id] Flat list of lists. """ # checks uniqueness # This guarantees that clusters are all non-empty mtype.check_is_valid_sfids(sid_list) if type(cl_sid_list) != list: raise ValueError( "cl_sid_list must be a list: {}".format(cl_sid_list)) for x in cl_sid_list: mtype.check_is_valid_sfids(x) cl_id_mlist = np.concatenate(cl_sid_list).tolist() mtype.check_is_valid_sfids(cl_id_mlist) # np.unique returns sorted unique values if sorted(sid_list) != sorted(cl_id_mlist): raise ValueError( "sid_list should have the same ids as cl_sid_list.") cl_ind_lut = {} # iter_cl_ind : cluster index # iter_cl_sids: individual cluster list for iter_cl_ind, iter_cl_sids in enumerate(cl_sid_list): for sid in iter_cl_sids: cl_ind_lut[sid] = iter_cl_ind lab_list = [cl_ind_lut[x] for x in sid_list] return lab_list, sid_list
[docs] @staticmethod def hclust_linkage(dmat, linkage="complete", n_eval_rounds=None, is_euc_dist=False, optimal_ordering=False, verbose=False): dmat = np.array(dmat, dtype="float") dmat = SampleDistanceMatrix.num_correct_dist_mat(dmat) n = dmat.shape[0] if linkage == "auto": try_linkages = ("single", "complete", "average", "weighted") if is_euc_dist: try_linkages += ("centroid", "median", "ward") if n_eval_rounds is None: n_eval_rounds = int(np.ceil(np.log2(n))) else: n_eval_rounds = int(np.ceil(max(np.log2(n), n_eval_rounds))) ltype_mdl_list = [] for iter_ltype in try_linkages: iter_lhct = HClustTree.hclust_tree(dmat, linkage=iter_ltype) iter_nbp_cnt_list = iter_lhct.n_round_bipar_cnt(n_eval_rounds) iter_nbp_mdl_arr = np.array(list(map( lambda x: MultinomialMdl(np.array(x)).mdl, iter_nbp_cnt_list))) iter_nbp_mdl = np.sum( iter_nbp_mdl_arr / np.arange(1, n_eval_rounds + 1)) ltype_mdl_list.append(iter_nbp_mdl) linkage = try_linkages[ltype_mdl_list.index(max(ltype_mdl_list))] if verbose: print(linkage, tuple(zip(try_linkages, ltype_mdl_list)), sep="\n") dmat_sf = spspatial.distance.squareform(dmat) hac_z = sch.linkage(dmat_sf, method=linkage, optimal_ordering=optimal_ordering) return hac_z
[docs] @staticmethod def hclust_tree(dmat, linkage="complete", n_eval_rounds=None, is_euc_dist=False, optimal_ordering=False, verbose=False): hac_z = HClustTree.hclust_linkage( dmat=dmat, linkage=linkage, n_eval_rounds=n_eval_rounds, is_euc_dist=is_euc_dist, optimal_ordering=optimal_ordering, verbose=verbose) return HClustTree.hct_from_lkg(hac_z)
[docs] @staticmethod def hct_from_lkg(hac_z): return HClustTree(sch.to_tree(hac_z))
[docs] @staticmethod def sort_x_by_d(x, dmat=None, metric="cosine", linkage="auto", n_eval_rounds=None, optimal_ordering=False, nprocs=None, verbose=False): dmat = SampleDistanceMatrix(x, d=dmat, metric=metric, nprocs=nprocs)._d xhct = HClustTree.hclust_tree(dmat, linkage="auto", is_euc_dist=(metric == "euclidean"), optimal_ordering=optimal_ordering) return xhct.leaf_ids()