Source code for scedar.eda.slcs

import itertools

from collections import namedtuple

import numpy as np

from scipy.stats import ks_2samp

from sklearn.model_selection import train_test_split
import sklearn.utils

import matplotlib as mpl
import matplotlib.colors
import seaborn as sns

from collections import defaultdict

import xgboost as xgb

import inspect

from scedar.eda.sdm import SampleDistanceMatrix
from scedar.eda.plot import swarm
from scedar.eda.plot import heatmap
from scedar.eda import mdl
from scedar.eda import mtype
from scedar import utils

[docs]class SingleLabelClassifiedSamples(SampleDistanceMatrix): """Data structure of single label classified samples Attributes: _x (2D number array): (n_samples, n_features) data matrix. _d (2D number array): (n_samples, n_samples) distance matrix. _labs (list of labels): list of labels in the same type, int or str. _fids (list of feature IDs): list of feature IDs in the same type, int or str. _sids (list of sample IDs): list of sample IDs in the same type, int or str. _metric (str): Distance metric. Note: If sort by labels, the samples will be reordered, so that samples from left to right are from one label to another. """ # sid, lab, fid, x def __init__(self, x, labs, sids=None, fids=None, d=None, metric="cosine", use_pdist=True, nprocs=None): # sids: sample IDs. String or int. # labs: sample classified labels. String or int. # x: (n_samples, n_features) super(SingleLabelClassifiedSamples, self).__init__( x=x, d=d, metric=metric, use_pdist=use_pdist, sids=sids, fids=fids, nprocs=nprocs) mtype.check_is_valid_labs(labs) labs = np.array(labs) if self._sids.shape[0] != labs.shape[0]: raise ValueError("sids must have the same length as labs") self._labs = labs self._set_up_lab_rel_attrs() # keep a copy of original labels self._orig_labs = labs self._xgb_lut = {} return def _set_up_lab_rel_attrs(self): """Set up labels related attrs """ self._uniq_labs, self._uniq_lab_cnts = np.unique( self._labs, return_counts=True) # {lab: array([sid0, ...]), ...} sid_lut = {} for ulab in self._uniq_labs: sid_lut[ulab] = self._sids[self._labs == ulab] self._sid_lut = sid_lut # {sid1: lab1, ...} lab_lut = {} # sids only contain unique values for i in range(self._sids.shape[0]): lab_lut[self._sids[i]] = self._labs[i] self._lab_lut = lab_lut
[docs] def sort_by_labels(self): """ Return a copy with sorted sample indices by labels and distances. """ labels = np.array(self.labs) # slcs is empty if len(labels) == 0 or self._x.size == 0: return self.ind_x() uniq_labs = np.unique(labels) s_ind_lut = dict([(ulab, np.where(labels == ulab)[0]) for ulab in uniq_labs]) # sort within each label for ulab in uniq_labs: # get sample indices of that class s_inds = s_ind_lut[ulab] # sort that class by distance to the first sample # get a list of distances to the frist sample s_dist_to_s0_list = [self._d[s_inds[0], s_inds[i]] for i in range(len(s_inds))] # sort indices by distances sorted_s_inds = s_inds[np.argsort(s_dist_to_s0_list, kind="mergesort")] # update lut s_ind_lut[ulab] = sorted_s_inds # sort classes by distances of first samples # frist sample indices lab_fs_inds = [s_ind_lut[ulab][0] for ulab in uniq_labs] # distance of first samples to the first class first sample lab_fs_dist_to_fc_list = [self._d[lab_fs_inds[0], lab_fs_inds[i]] for i in range(len(lab_fs_inds))] sorted_ulabs = uniq_labs[np.argsort(lab_fs_dist_to_fc_list, kind="mergesort")] sorted_s_inds = np.concatenate([s_ind_lut[ulab] for ulab in sorted_ulabs]) return self.ind_x(sorted_s_inds)
[docs] def filter_min_class_n(self, min_class_n): uniq_lab_cnts = np.unique(self._labs, return_counts=True) nf_sid_ind = np.in1d( self._labs, (uniq_lab_cnts[0])[uniq_lab_cnts[1] >= min_class_n]) return self.ind_x(nf_sid_ind)
[docs] def labs_to_sids(self, labs): return tuple(tuple(self._sid_lut[y].tolist()) for y in labs)
[docs] def sids_to_labs(self, sids): return np.array([self._lab_lut[x] for x in sids])
[docs] def ind_x(self, selected_s_inds=None, selected_f_inds=None): """ Subset samples by (sample IDs, feature IDs). Parameters ---------- selected_s_inds: int array Index array of selected samples. If is None, select all. selected_f_inds: int array Index array of selected features. If is None, select all. Returns ------- subset: SingleLabelClassifiedSamples """ if selected_s_inds is None: selected_s_inds = slice(None, None) if selected_f_inds is None: selected_f_inds = slice(None, None) if self._use_pdist: return SingleLabelClassifiedSamples( x=self._x[selected_s_inds, :][:, selected_f_inds].copy(), d=self._d[selected_s_inds, :][:, selected_s_inds].copy(), labs=self._labs[selected_s_inds].tolist(), metric=self._metric, use_pdist=self._use_pdist, sids=self._sids[selected_s_inds].tolist(), fids=self._fids[selected_f_inds].tolist(), nprocs=self._nprocs) else: return SingleLabelClassifiedSamples( x=self._x[selected_s_inds, :][:, selected_f_inds].copy(), d=None, labs=self._labs[selected_s_inds].tolist(), metric=self._metric, use_pdist=self._use_pdist, sids=self._sids[selected_s_inds].tolist(), fids=self._fids[selected_f_inds].tolist(), nprocs=self._nprocs)
[docs] def merge_labels(self, orig_labs_to_merge, new_lab): """Merge selected labels into a new label Args: orig_labs_to_merge (list of unique labels): original labels to be merged into a new label new_lab (label): new label of the merged labels Returns: None Note: Update labels in place. """ if not mtype.is_valid_lab(new_lab): raise ValueError("new_lab, {}, must be str or int") mtype.check_is_valid_labs(orig_labs_to_merge) # all labs must be unique if len(orig_labs_to_merge) != len(np.unique(orig_labs_to_merge)): raise ValueError("orig_labs_to_merge must all be unique") for ulab in orig_labs_to_merge: if ulab not in self._uniq_labs: raise ValueError("label {} not in original unique " "labels".format(ulab)) updated_labs = self._labs.copy() for i, lab in enumerate(self._labs): if lab in orig_labs_to_merge: updated_labs[i] = new_lab self._labs = updated_labs self._set_up_lab_rel_attrs() return
[docs] def relabel(self, labels): """ Return a new SingleLabelClassifiedSamples with new labels. """ return SingleLabelClassifiedSamples( x=self._x.copy(), labs=labels, d=self._d.copy(), sids=self._sids.tolist(), fids=self._fids.tolist(), metric=self._metric, nprocs=self._nprocs)
[docs] def id_x(self, selected_sids=None, selected_fids=None): """ Subset samples by (sample IDs, feature IDs). Parameters ---------- selected_s_inds: int array Index array of selected samples. If is None, select all. selected_f_inds: int array Index array of selected features. If is None, select all. Returns ------- subset: SingleLabelClassifiedSamples """ if selected_sids is None: selected_s_inds = None else: selected_s_inds = self.s_id_to_ind(selected_sids) if selected_fids is None: selected_f_inds = None else: selected_f_inds = self.f_id_to_ind(selected_fids) return self.ind_x(selected_s_inds, selected_f_inds)
[docs] @staticmethod def select_labs_bool_inds(ref_labs, selected_labs): if selected_labs is None: return np.repeat(True, len(ref_labs)) if np.isscalar(selected_labs): selected_labs = [selected_labs] ref_uniq_labs = np.unique(ref_labs).tolist() if not all([x in ref_uniq_labs for x in selected_labs]): raise ValueError("selected_labs: {} are not all existed " "in the unique ref labels " "{}".format(selected_labs, ref_uniq_labs)) lab_selected_s_bool_inds = np.in1d(ref_labs, selected_labs) return lab_selected_s_bool_inds
[docs] def lab_x_bool_inds(self, selected_labs): return self.select_labs_bool_inds(self._labs, selected_labs)
[docs] def lab_x(self, selected_labs): lab_selected_s_bool_inds = self.lab_x_bool_inds(selected_labs) return self.ind_x(lab_selected_s_bool_inds)
@staticmethod def _xgb_train_runner(x, lab_inds, fids, test_size=0.3, num_boost_round=10, nprocs=1, random_state=None, silent=1, xgb_params=None): """ Run xgboost train with prepared data and parameters. Returns ------- sorted_fscore_list: list Ordered important features and their results. bst: xgb.Booster Fitted model. eval_stats: tuple Final test and train error. """ n_uniq_labs = len(np.unique(lab_inds)) # xgb only takes string as feature names str_fids = list(map(str, fids)) orig_fid_lut = dict(zip(str_fids, fids)) # Prepare default xgboost parameters xgb_random_state = 0 if random_state is None else random_state if xgb_params is None: # Use log2(n_features) as default depth. xgb_params = { "eta": 0.3, "max_depth": 6, "silent": silent, "nthread": nprocs, "alpha": 1, "lambda": 0, "seed": xgb_random_state } if n_uniq_labs == 2: # do binary classification xgb_params["objective"] = "binary:logistic" xgb_params["eval_metric"] = "error" else: # do multi-label classification xgb_params["num_class"] = n_uniq_labs xgb_params["objective"] = "multi:softmax" xgb_params["eval_metric"] = "merror" # split training and testing # random state determined by numpy train_x, test_x, train_labs, test_labs = train_test_split( x, lab_inds, test_size=test_size) # xgb datastructure to hold data and labels dtrain = xgb.DMatrix(train_x, train_labs, feature_names=str_fids) dtest = xgb.DMatrix(test_x, test_labs, feature_names=str_fids) # list of data to evaluate eval_list = [(dtest, "test"), (dtrain, "train")] evals_result = {} if silent: verbose_eval = False else: verbose_eval = True # bst is the train boost tree model bst = xgb.train(xgb_params, dtrain, num_boost_round, eval_list, evals_result=evals_result, verbose_eval=verbose_eval) # Turn dict to list # [ [('train...', float), ...], # [('test...', float), ...] ] eval_stats = [[(eval_name + " " + mname, mval_list[num_boost_round-1]) for mname, mval_list in eval_dict.items()] for eval_name, eval_dict in evals_result.items()] # {feature_name: fscore, ...} fscore_dict = bst.get_fscore() orig_fid_fscore_list = [(orig_fid_lut[istr_fid], ifscore) for istr_fid, ifscore in fscore_dict.items()] sorted_fscore_list = sorted(orig_fid_fscore_list, key=lambda t: t[1], reverse=True) return sorted_fscore_list, bst, eval_stats
[docs] def feature_importance_across_labs(self, selected_labs, test_size=0.3, num_boost_round=10, nprocs=1, random_state=None, silent=1, xgb_params=None, num_bootstrap_round=0, bootstrap_size=None, shuffle_features=False): """ Use xgboost to determine the importance of features determining the difference between samples with different labels. Run cross validation on dataset and obtain import features. Parameters ---------- selected_labs: label list Labels to compare using xgboost. test_size: float in range (0, 1) Ratio of samples to be used for testing num_bootstrap_round: int Do num_bootstrap_round times of simple bootstrapping if `num_bootstrap_round > 0`. bootstrap_size: int The number of samples for each bootstrapping run. shuffle_features: bool num_boost_round: int The number of rounds for xgboost training. random_state: int nprocs: int xgb_params: dict Parameters for xgboost run. If None, default will be used. If provided, they will be directly used for xgbooster. Returns ------- feature_importance_list: list of feature importance of each run [(feature_id, mean of fscore across all bootstrapping rounds, standard deviation of fscore across all bootstrapping rounds, number of times used all bootstrapping rounds), ...] bst_list: list of xgb Booster Fitted boost tree model Notes ----- If multiple features are highly correlated, they may not all show up in the resulting tree. You could try to reduce redundant features first before comparing different clusters, or you could also interpret the important features further after obtaining the important features. For details about xgboost parameters, check the following links: [1]\ complete-guide-parameter-tuning-xgboost-with-codes-python/ [2] [3] [4] [5]\ complete-guide-parameter-tuning-xgboost-with-codes-python/ """ num_boost_round = int(num_boost_round) if num_boost_round <= 0: raise ValueError("num_boost_round must >= 1") # This is for implementing caching in the future. selected_uniq_labs = np.unique(selected_labs).tolist() # subset SLCS lab_selected_slcs = self.lab_x(selected_uniq_labs) # unique labels in SLCS after subsetting # Since lab_x checks whether selected labels are all existing, # the unique labels of the subset is equivalent to input selected # labels. uniq_labs = lab_selected_slcs._uniq_labs.tolist() # convert labels into indices from 0 to n_classes n_uniq_labs = len(uniq_labs) if n_uniq_labs <= 1: raise ValueError("The number of unique labels should > 1. " "Provided uniq labs:" " {}".format(uniq_labs)) lab_ind_lut = dict(zip(uniq_labs, range(n_uniq_labs))) lab_inds = [lab_ind_lut[lab] for lab in lab_selected_slcs._labs] np.random.seed(random_state) # shuffle features if necessary fids = lab_selected_slcs.fids if shuffle_features: feature_inds = np.arange(lab_selected_slcs._x.shape[1]) feature_inds, fids = sklearn.utils.shuffle( feature_inds, fids) else: feature_inds = slice(None, None) # perform bootstrapping if necessary num_bootstrap_round = int(num_bootstrap_round) if num_bootstrap_round <= 0: # no bootstrapping # _xgb_train_runner returns (fscores, bst, eval_stats) fscores, bst, eval_stats = self._xgb_train_runner( lab_selected_slcs._x[:, feature_inds], lab_inds, fids, test_size=test_size, num_boost_round=num_boost_round, xgb_params=xgb_params, random_state=random_state, nprocs=nprocs, silent=silent) # make sorted_fs_list consistent to the bootstrapped one fs_list = [(t[0], t[1], 0, 1, [t[1]]) for t in fscores] sorted_fs_list = sorted(fs_list, key=lambda t: (t[3], t[1]), reverse=True) print(eval_stats) bst_list = [bst] else: # do bootstrapping # ([dict of scores], [list of bsts], dict of eval stats) fs_dict = defaultdict(list) bst_list = [] eval_stats_dict = defaultdict(list) if bootstrap_size is None: bootstrap_size = lab_selected_slcs._x.shape[0] sample_inds = np.arange(lab_selected_slcs._x.shape[0]) # bootstrapping rounds for i in range(num_bootstrap_round): # random state determined by numpy # ensure all labels present # initialize resample sample_indices and labels bs_s_inds, bs_lab_inds = sklearn.utils.resample( sample_inds, lab_inds, replace=True, n_samples=bootstrap_size) while len(np.unique(bs_lab_inds)) != n_uniq_labs: bs_s_inds, bs_lab_inds = sklearn.utils.resample( sample_inds, lab_inds, replace=True, n_samples=bootstrap_size) fscores, bst, eval_stats = self._xgb_train_runner( lab_selected_slcs._x[bs_s_inds, :][:, feature_inds], bs_lab_inds, fids, test_size=test_size, num_boost_round=num_boost_round, xgb_params=xgb_params, random_state=random_state, nprocs=nprocs, silent=silent) # Sum fscores for fid, fs in fscores: fs_dict[fid] += [fs] bst_list.append(bst) # est: eval stats tuple # [ [('train...', float), ...], # [('test...', float), ...] ] for elist in eval_stats: for ename, evalue in elist: eval_stats_dict[ename].append(evalue) if shuffle_features: feature_inds, fids = sklearn.utils.shuffle( feature_inds, fids) # score summary: average, std, times showed up fid_s_list = [] for fid, fs in fs_dict.items(): fid_s_list.append((fid, np.mean(fs), np.std(fs, ddof=0), len(fs), fs)) sorted_fs_list = sorted(fid_s_list, key=lambda t: (t[3], t[1]), reverse=True) # calculate mean +/- std of eval stats for ename, evalue_list in eval_stats_dict.items(): print("{}: mean {}, std {}".format( ename, np.mean(evalue_list), np.std(evalue_list, ddof=1))) # return same things for two branches return sorted_fs_list, bst_list
[docs] def feature_importance_distintuishing_labs(self, selected_labs, test_size=0.3, num_boost_round=10, nprocs=1, random_state=None, silent=1, xgb_params=None, num_bootstrap_round=0, bootstrap_size=None, shuffle_features=False): """ Use xgboost to compare selected labels and others. """ selected_s_bool_inds = self.lab_x_bool_inds(selected_labs) # binary labs distinguishing selected and non-selected io_bin_lab_arr = ["selected" if s else "non-selected" for s in selected_s_bool_inds] # create a new SLCS instance with new labels nl_slcs = self.relabel(io_bin_lab_arr) fi_res = nl_slcs.feature_importance_across_labs( ["selected", "non-selected"], test_size=test_size, num_boost_round=num_boost_round, nprocs=nprocs, random_state=random_state, silent=silent, xgb_params=xgb_params, num_bootstrap_round=num_bootstrap_round, bootstrap_size=bootstrap_size, shuffle_features=shuffle_features) return fi_res
[docs] def feature_importance_each_lab(self, test_size=0.3, num_boost_round=10, nprocs=1, random_state=None, silent=1, xgb_params=None, num_bootstrap_round=0, bootstrap_size=None, shuffle_features=False): """ Use xgboost to compare each label with others. Experimental. """ # Construct important feature lut # {ulab0: [if1, if2, ...], ...} ulab_fi_lut = defaultdict(list) for ulab in self._uniq_labs: # get bool indices of current label ulab_s_bool_inds = self.lab_x_bool_inds(ulab) # compare current label with other samples fi_res = self.feature_importance_distintuishing_labs( ulab, test_size=test_size, num_boost_round=num_boost_round, nprocs=nprocs, random_state=random_state, silent=silent, xgb_params=xgb_params, num_bootstrap_round=num_bootstrap_round, bootstrap_size=bootstrap_size, shuffle_features=shuffle_features) for fid in [t[0] for t in fi_res[0]]: fx = self.f_id_x_vec(fid) # current label values ulab_x = fx[ulab_s_bool_inds] # other values other_x = fx[np.logical_not(ulab_s_bool_inds)] # current lab mean ulab_x_mean = np.mean(ulab_x) # other mean other_x_mean = np.mean(other_x) # mean fold change ulab_mfc = (ulab_x_mean - other_x_mean) / ulab_x_mean # ks test result ks_res = ks_2samp(ulab_x, other_x) ulab_fi_lut[ulab].append((fid, ulab_mfc, ks_res.pvalue)) ulab_fi_lut[ulab].sort(key=lambda t: t[1], reverse=True) ulab_fi_lut[ulab] = [t for t in ulab_fi_lut[ulab]] return ulab_fi_lut
[docs] def tsne_plot(self, gradient=None, labels=None, selected_labels=None, shuffle_label_colors=False, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the last t-SNE projection with the provided gradient as color. """ if labels is None: labels = self.labs return super(SingleLabelClassifiedSamples, self).tsne_plot( gradient=gradient, labels=labels, selected_labels=selected_labels, shuffle_label_colors=shuffle_label_colors, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def tsne_feature_gradient_plot(self, fid, transform=None, labels=None, selected_labels=None, shuffle_label_colors=False, title=None, xlab=None, ylab=None, figsize=(20, 20), add_legend=True, n_txt_per_cluster=3, alpha=1, s=0.5, random_state=None, **kwargs): """ Plot the last t-SNE projection with the provided gradient as color. Parameters ---------- fid: feature id scalar ID of the feature to be used for gradient plot. transform: callable Map transform on feature before plotting. """ if labels is None: labels = self.labs return super(SingleLabelClassifiedSamples, self).tsne_feature_gradient_plot( fid=fid, transform=transform, labels=labels, selected_labels=selected_labels, shuffle_label_colors=shuffle_label_colors, title=title, xlab=xlab, ylab=ylab, figsize=figsize, add_legend=add_legend, n_txt_per_cluster=n_txt_per_cluster, alpha=alpha, s=s, random_state=random_state, **kwargs)
[docs] def feature_swarm_plot(self, fid, transform=None, labels=None, selected_labels=None, title=None, xlab=None, ylab=None, figsize=(10, 10)): f_ind = self.f_id_to_ind([fid])[0] fx = self.f_ind_x_vec(f_ind) if transform is not None: if callable(transform): fx = np.array(list(map(transform, fx))) else: raise ValueError("transform must be a callable") if labels is not None and len(labels) != fx.shape[0]: raise ValueError("labels ({}) must have same length as " "n_samples.".format(labels)) else: labels = self.labs return swarm(fx, labels=labels, selected_labels=selected_labels, title=title, xlab=xlab, ylab=ylab, figsize=figsize)
[docs] def dmat_heatmap(self, selected_labels=None, col_labels=None, transform=None, title=None, xlab=None, ylab=None, figsize=(10, 10), **kwargs): """ Plot distance matrix with rows colored by current labels. """ selected_s_bool_inds = self.lab_x_bool_inds(selected_labels) selected_labels = self._labs[selected_s_bool_inds].tolist() selected_d = self._d[selected_s_bool_inds, :][:, selected_s_bool_inds] return heatmap(selected_d, row_labels=selected_labels, col_labels=col_labels, transform=transform, title=title, xlab=xlab, ylab=ylab, figsize=figsize, **kwargs)
[docs] def xmat_heatmap(self, selected_labels=None, selected_fids=None, col_labels=None, transform=None, title=None, xlab=None, ylab=None, figsize=(10, 10), **kwargs): """ Plot x as heatmap. """ selected_s_bool_inds = self.lab_x_bool_inds(selected_labels) selected_s_ids = self._sids[selected_s_bool_inds] selected_slcs = self.id_x(selected_s_ids, selected_fids) return heatmap(selected_slcs._x, row_labels=selected_slcs.labs, col_labels=col_labels, transform=transform, title=title, xlab=xlab, ylab=ylab, figsize=figsize, **kwargs)
@property def labs(self): return self._labs.tolist() # Sort the clustered sample_ids with the reference order of another. # # Sort sids according to labs # If ref_sid_order is not None: # sort sids further according to ref_sid_order
[docs] def lab_sorted_sids(self, ref_sid_order=None): sep_lab_sid_list = [] sep_lab_list = [] for iter_lab in sorted(self._sid_lut.keys()): iter_sid_arr = self._sid_lut[iter_lab] sep_lab_sid_list.append(iter_sid_arr) sep_lab_list.append(np.repeat(iter_lab, len(iter_sid_arr))) if ref_sid_order is not None: mtype.check_is_valid_sfids(ref_sid_order) ref_sid_order = np.array(ref_sid_order) # sort r according to q # assumes: # - r contains all elements in q # - r is 1d np array def sort_flat_sids(query_sids, ref_sids): return ref_sids[np.in1d(ref_sids, query_sids)] # sort inner sid list but maintains the order as sep_lab_list sep_lab_sid_list = [sort_flat_sids(x, ref_sid_order) for x in sep_lab_sid_list] sep_lab_min_sid_list = [x[0] for x in sep_lab_sid_list] sorted_sep_lab_min_sid_list = list( sort_flat_sids(sep_lab_min_sid_list, ref_sid_order)) min_sid_sorted_sep_lab_ind_list = [ sep_lab_min_sid_list.index(x) for x in sorted_sep_lab_min_sid_list ] sep_lab_list = [sep_lab_list[i] for i in min_sid_sorted_sep_lab_ind_list] sep_lab_sid_list = [sep_lab_sid_list[i] for i in min_sid_sorted_sep_lab_ind_list] lab_sorted_sid_arr = np.concatenate(sep_lab_sid_list) lab_sorted_lab_arr = np.concatenate(sep_lab_list) # check sorted sids are the same set as original np.testing.assert_array_equal( np.sort(lab_sorted_sid_arr), np.sort(self._sids)) # check sorted labs are the same set as original np.testing.assert_array_equal( np.sort(lab_sorted_lab_arr), np.sort(self._labs)) # check sorted (sid, lab) matchings are the same set as original np.testing.assert_array_equal( lab_sorted_lab_arr[np.argsort(lab_sorted_sid_arr)], self._labs[np.argsort(self._sids)]) return (lab_sorted_sid_arr, lab_sorted_lab_arr)
# See how two clustering criteria match with each other. # When given q_slc_samples is not None, sids and labs are ignored. # When q_slc_samples is None, sids and labs must be provided
[docs] def cross_labs(self, q_slc_samples): if not isinstance(q_slc_samples, SingleLabelClassifiedSamples): raise TypeError("Query should be an instance of " "SingleLabelClassifiedSamples") try: ref_labs = np.array([self._lab_lut[x] for x in q_slc_samples.sids]) except KeyError as e: raise ValueError("query sid {} is not in ref sids.".format(e)) query_labs = np.array(q_slc_samples.labs) uniq_rlabs, uniq_rlab_cnts = np.unique(ref_labs, return_counts=True) cross_lab_lut = {} for i in range(len(uniq_rlabs)): # ref cluster i. query unique labs. ref_ci_quniq = tuple(map(list, np.unique( query_labs[np.where(np.array(ref_labs) == uniq_rlabs[i])], return_counts=True))) cross_lab_lut[uniq_rlabs[i]] = (uniq_rlab_cnts[i], tuple(map(tuple, ref_ci_quniq))) return cross_lab_lut
[docs]class MDLSingleLabelClassifiedSamples(SingleLabelClassifiedSamples): """ MDLSingleLabelClassifiedSamples inherits SingleLabelClassifiedSamples to offer MDL operations. Args: x (2d number array): data matrix labs (list of str or int): labels sids (list of str or int): sample ids fids (list of str or int): feature ids encode_type ("auto", "data", or "distance"): Type of values to encode. If "auto", encode data when n_features <= 100. mdl_method (mdl.Mdl): If None, use ZeroIGKdeMdl for encoded values with >= 50% zeros, and use GKdeMdl otherwise. d (2d number array): distance matrix metric (str): distance metric for scipy nprocs (int) Attributes: _mdl_method (.mdl.Mdl) """ LabMdlResult = namedtuple("LabMdlResult", ["ulab_mdl_sum", "ulab_s_inds", "ulab_cnts", "ulab_mdls", "cluster_mdl"]) def __init__(self, x, labs, sids=None, fids=None, encode_type="data", mdl_method=mdl.ZeroIGKdeMdl, d=None, metric="correlation", nprocs=None): super(MDLSingleLabelClassifiedSamples, self).__init__( x=x, labs=labs, sids=sids, fids=fids, d=d, metric=metric, nprocs=nprocs) # initialize encode type if encode_type not in ("auto", "data", "distance"): raise ValueError("encode_type must in " "('auto', 'data', 'distance')." "Provided: {}".format(encode_type)) if encode_type == "auto": if self._x.shape[1] > 100: encode_type = "distance" else: encode_type = "data" self._encode_type = encode_type # initialize mdl method if mdl_method is None: if encode_type == "data": ex = self._x else: ex = self._d if ex.size == 0: # empty matrix mdl_method = mdl.GKdeMdl else: n_nonzero = np.count_nonzero(ex) if n_nonzero / ex.size > 0.5: mdl_method = mdl.GKdeMdl else: mdl_method = mdl.ZeroIGKdeMdl self._mdl_method = mdl_method
[docs] @staticmethod def per_col_encoders(x, encode_type, mdl_method=mdl.ZeroIGKdeMdl, nprocs=1, verbose=False): """Compute mdl encoder for each column Args: x (2d number array) encode_type ("data" or "distance") mdl_method (mdl.Mdl) nprocs (int) verbose (bool) Returns: :obj: list of column mdl encoders of x """ # verbose is not implemented if not inspect.isclass(mdl_method): raise ValueError("method must be a subclass of eda.mdl.Mdl") if not issubclass(mdl_method, mdl.Mdl): raise ValueError("method must be a subclass of eda.mdl.Mdl") if x.ndim != 2: raise ValueError("x should be 2D. x.shape: {}".format(x.shape)) nprocs = max(int(nprocs), 1) if encode_type == "data": col_encoders = utils.parmap( lambda x1d: mdl_method(x1d), x.T, nprocs) elif encode_type == "distance": # distance s_inds = list(range(x.shape[0])) xs_for_map = [x[s_inds[i], s_inds[:i] + s_inds[i+1:]] for i in s_inds] def single_s_mdl_encoder(x1d): # copy indices for parallel processing return mdl_method(x1d) col_encoders = utils.parmap( single_s_mdl_encoder, xs_for_map, nprocs) else: raise NotImplementedError("unknown encode_type: " "{}".format(encode_type)) return col_encoders
[docs] def no_lab_mdl(self, nprocs=1, verbose=False): """Compute mdl of each feature without separating samples by labels Args: nprocs (int) verbose (bool): Not implemented Returns: float: mdl of matrix without separating samples by labels """ # TODO: implement verbose if self._encode_type == "data": col_encoders = self.per_col_encoders(self._x, self._encode_type, self._mdl_method, nprocs, verbose) col_mdl_sum = np.sum(list(map(lambda e: e.mdl, col_encoders))) elif self._encode_type == "distance": col_encoders = self.per_col_encoders(self._d, self._encode_type, self._mdl_method, nprocs, verbose) col_mdl_sum = 0 ulab_s_ind_list = [np.where(self._labs == ulab)[0].tolist() for ulab in self._uniq_labs] for s_inds in ulab_s_ind_list: n_s_inds = len(s_inds) rn_inds = list(range(n_s_inds)) # [(encoder, x), ...] enc_x_tups = [] for i in rn_inds: i_s_ind = s_inds[i] non_i_s_inds = s_inds[:i] + s_inds[i+1:] enc_x_tups.append((col_encoders[i_s_ind], self._d[non_i_s_inds, i_s_ind])) mdls = utils.parmap(lambda ext: ext[0].encode(ext[1]), enc_x_tups, nprocs) col_mdl_sum += sum(mdls) else: raise NotImplementedError("Do not change encode_type after init. " "Unknown encode type " "{}".format(self._encode_type)) return col_mdl_sum
[docs] @staticmethod def compute_cluster_mdl(labs, cl_mdl_scale_factor=1): """Additional MDL for encoding the cluster - labels are encoded by multinomial distribution - parameters are encoded by 32bit float np.log(2**32) = 22.18070977791825 - scaled by factor TODO: formalize parameter mdl """ uniq_labs, uniq_lab_cnts = np.unique(labs, return_counts=True) n_uniq_labs = len(uniq_lab_cnts) # make a flat list of labels int_labs = list(itertools.chain.from_iterable( [[i]*uniq_lab_cnts[i] for i in range(n_uniq_labs)])) mn_mdl = mdl.MultinomialMdl(int_labs).mdl param_mdl = 22.18070977791825 * n_uniq_labs return (mn_mdl + param_mdl) * cl_mdl_scale_factor
[docs] def lab_mdl(self, cl_mdl_scale_factor=1, nprocs=1, verbose=False, ret_internal=False): """Compute mdl of each feature after separating samples by labels Args: cl_mdl_scale_factor (float): multiplies cluster related mdl by this number nprocs (int) verbose (bool): Not implemented Returns: float: mdl of matrix after separating sampels by labels """ # compute cluster label overhead mdl cluster_mdl = self.compute_cluster_mdl( self._labs, cl_mdl_scale_factor=cl_mdl_scale_factor) # compute mdl for data points n_uniq_labs = self._uniq_labs.shape[0] ulab_s_ind_list = [np.where(self._labs == ulab)[0].tolist() for ulab in self._uniq_labs] # summarize mdls of all clusters if self._encode_type == "distance": ulab_mdls = [MDLSingleLabelClassifiedSamples( self._x[s_inds], labs=[0]*len(s_inds), encode_type=self._encode_type, mdl_method=self._mdl_method, d=self._d[s_inds][:, s_inds], metric=self._metric, nprocs=self._nprocs).no_lab_mdl() for s_inds in ulab_s_ind_list] elif self._encode_type == "data": # do not pass d ulab_mdls = [MDLSingleLabelClassifiedSamples( self._x[s_inds], labs=[0]*len(s_inds), encode_type=self._encode_type, mdl_method=self._mdl_method, metric=self._metric, nprocs=self._nprocs).no_lab_mdl() for s_inds in ulab_s_ind_list] else: raise NotImplementedError("Do not change encode_type after init. " "Unknown encode type " "{}".format(self._encode_type)) # add cluster overhead mdl to each cluster ulab_cnt_ratios = self._uniq_lab_cnts / np.int_(self._x.shape[0]) ulab_cl_mdls = [ulab_mdls[i] + cluster_mdl * ulab_cnt_ratios[i] for i in range(n_uniq_labs)] ulab_mdl_sum = np.sum(ulab_cl_mdls) lab_mdl_res = self.LabMdlResult(ulab_mdl_sum, ulab_s_ind_list, self._uniq_lab_cnts.tolist(), ulab_cl_mdls, cluster_mdl) if ret_internal: return lab_mdl_res, ulab_mdls else: return lab_mdl_res
[docs] def encode(self, qx, col_summary_func=sum, non_zero_only=False, nprocs=1, verbose=False): """Encode input array qx with fitted code without label Args: qx (2d np number array) col_summary_func (callable): function applied on column mdls non_zero_only (bool): whether to encode non-zero entries only nprocs (int) verbose (bool) Returns: float: mdl for encoding qx """ if not callable(col_summary_func): raise ValueError("col_summary_func must be callable") if self._encode_type == "data": ex = self._x elif self._encode_type == "distance": ex = self._d else: raise NotImplementedError("Do not change encode_type after init. " "Unknown encode type " "{}".format(self._encode_type)) if qx.ndim != 2 or qx.shape[1] != ex.shape[1]: raise ValueError("Array to encode should have the same number of" "columns as the encoded x") col_encoders = self.per_col_encoders( ex, self._encode_type, self._mdl_method, nprocs, verbose=verbose) ncols = ex.shape[1] q_x_cols = [] for i in range(ncols): x_col = qx[:, i] # mdl_method is valid after running per_col_encoders if non_zero_only: q_x_cols.append(x_col[np.nonzero(x_col)]) else: # all mdl methods here are valid # this branch is GKdeMdl q_x_cols.append(x_col) # (mdl, qx) tuple mdl_qxcol_tups = list(zip(col_encoders, q_x_cols)) encode_q_col_mdls = utils.parmap( lambda ext: ext[0].encode(ext[1]), mdl_qxcol_tups, nprocs=nprocs) encode_x_mdl = col_summary_func(encode_q_col_mdls) return encode_x_mdl