Source code for scedar.eda.sfm

import numpy as np

import scipy.sparse as spsp

from sklearn.preprocessing import StandardScaler

from scedar import utils

from scedar.eda.plot import regression_scatter
from scedar.eda.plot import hist_dens_plot
from scedar.eda import mtype
from scedar.eda import stats


class SampleFeatureMatrix(object):
    """
    SampleFeatureMatrix is a (n_samples, n_features) matrix.

    In this package, we are only interested in float features as measured
    expression levels.

    Parameters
    ----------
    x : {array-like, sparse matrix}
        data matrix (n_samples, n_features)
    sids : homogeneous list of int or string
        sample ids. Should not contain duplicated elements.
    fids : homogeneous list of int or string
        feature ids. Should not contain duplicated elements.

    Attributes
    ----------
    _x : {array-like, sparse matrix}
        data matrix (n_samples, n_features)
    _is_sparse : boolean
        whether the data matrix is a sparse matrix or not
    _sids : ndarray
        sample ids.
    _fids : ndarray
        feature ids.
    """

    def __init__(self, x, sids=None, fids=None):
        super(SampleFeatureMatrix, self).__init__()
        if x is None:
            raise ValueError("x cannot be None")
        else:
            if spsp.issparse(x):
                x = spsp.csr_matrix(x, dtype="float64")
            else:
                try:
                    x = np.array(x, copy=False, dtype="float64")
                except ValueError as e:
                    raise ValueError("Features must be float. {}".format(e))

        if x.ndim != 2:
            raise ValueError("x has shape (n_samples, n_features)")

        if sids is None:
            sids = list(range(x.shape[0]))
        else:
            mtype.check_is_valid_sfids(sids)
            if len(sids) != x.shape[0]:
                raise ValueError("x has shape (n_samples, n_features)")

        if fids is None:
            fids = list(range(x.shape[1]))
        else:
            mtype.check_is_valid_sfids(fids)
            if len(fids) != x.shape[1]:
                raise ValueError("x has shape (n_samples, n_features)")

        self._x = x
        self._sids = np.array(sids)
        self._fids = np.array(fids)

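A minimal usage sketch, not part of the scedar source: constructing a
SampleFeatureMatrix from a small dense matrix and from a sparse one. The toy
values and the names sfm and sfm_sparse are illustrative assumptions; scedar,
numpy, and scipy are assumed to be installed.

import numpy as np
import scipy.sparse as spsp
from scedar.eda.sfm import SampleFeatureMatrix

# Dense input is converted to a float64 ndarray internally.
x = np.array([[0., 1., 2.],
              [3., 4., 5.]])
sfm = SampleFeatureMatrix(x, sids=["s0", "s1"], fids=["g0", "g1", "g2"])

# Sparse input is kept sparse (stored as CSR); IDs default to 0..n-1.
sfm_sparse = SampleFeatureMatrix(spsp.csr_matrix(x))
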
    def s_id_to_ind(self, selected_sids):
        """
        Convert a list of sample IDs into sample indices.
        """
        sid_list = self.sids
        return [sid_list.index(i) for i in selected_sids]

    def f_id_to_ind(self, selected_fids):
        """
        Convert a list of feature IDs into feature indices.
        """
        fid_list = self.fids
        return [fid_list.index(i) for i in selected_fids]

    def ind_x(self, selected_s_inds=None, selected_f_inds=None):
        """
        Subset samples and features by indices.

        Parameters
        ----------
        selected_s_inds : int array
            Index array of selected samples. If None, select all.
        selected_f_inds : int array
            Index array of selected features. If None, select all.

        Returns
        -------
        subset : SampleFeatureMatrix
        """
        if selected_s_inds is None:
            selected_s_inds = slice(None, None)

        if selected_f_inds is None:
            selected_f_inds = slice(None, None)

        return SampleFeatureMatrix(
            x=self._x[selected_s_inds, :][:, selected_f_inds].copy(),
            sids=self._sids[selected_s_inds].tolist(),
            fids=self._fids[selected_f_inds].tolist())

    def id_x(self, selected_sids=None, selected_fids=None):
        """
        Subset samples and features by IDs.

        Parameters
        ----------
        selected_sids : id array
            ID array of selected samples. If None, select all.
        selected_fids : id array
            ID array of selected features. If None, select all.

        Returns
        -------
        subset : SampleFeatureMatrix
        """
        if selected_sids is None:
            selected_s_inds = None
        else:
            selected_s_inds = self.s_id_to_ind(selected_sids)

        if selected_fids is None:
            selected_f_inds = None
        else:
            selected_f_inds = self.f_id_to_ind(selected_fids)

        return self.ind_x(selected_s_inds, selected_f_inds)

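Continuing the sketch above with the toy sfm object, ID-to-index conversion
and subsetting look like this; the expected values in the comments follow
directly from the toy matrix.

# Map IDs to positional indices.
print(sfm.s_id_to_ind(["s1"]))          # [1]
print(sfm.f_id_to_ind(["g2", "g0"]))    # [2, 0]

# Subset by indices or by IDs; both return a new SampleFeatureMatrix.
sub_by_ind = sfm.ind_x(selected_s_inds=[0], selected_f_inds=[0, 2])
sub_by_id = sfm.id_x(selected_sids=["s0"], selected_fids=["g0", "g2"])
print(sub_by_id.fids)                   # ['g0', 'g2']
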
    @staticmethod
    def filter_1d_inds(f, x):
        # f_inds: filtered index
        if f is None:
            f_inds = slice(None, None)
        else:
            if callable(f):
                f_inds = [f(ix) for ix in x]
            else:
                f_inds = f
        return f_inds

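An illustrative look at the three filter forms accepted by filter_1d_inds,
continuing the toy sketch (the vals array is an assumption): None keeps
everything, a callable acts as an element-wise predicate, and any other value
is used directly as an index.

vals = np.array([0., 5., 10.])
keep_all = SampleFeatureMatrix.filter_1d_inds(None, vals)
mask = SampleFeatureMatrix.filter_1d_inds(lambda v: v > 1, vals)
picks = SampleFeatureMatrix.filter_1d_inds([2, 0], vals)
print(vals[keep_all])   # [ 0.  5. 10.]
print(vals[mask])       # [ 5. 10.]
print(vals[picks])      # [10.  0.]
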
    def s_ind_x_pair(self, xs_ind, ys_ind, feature_filter=None):
        x = self._x[xs_ind, :]
        y = self._x[ys_ind, :]
        if self._is_sparse:
            x = x.todense().A1
            y = y.todense().A1

        if callable(feature_filter):
            f_inds = self.filter_1d_inds(
                lambda pair: feature_filter(pair[0], pair[1]), zip(x, y))
        else:
            f_inds = self.filter_1d_inds(feature_filter, zip(x, y))

        xf = x[f_inds]
        yf = y[f_inds]
        return xf, yf

    def s_ind_regression_scatter(self, xs_ind, ys_ind, feature_filter=None,
                                 xlab=None, ylab=None, title=None, **kwargs):
        """
        Regression plot on two samples with xs_ind and ys_ind.

        Parameters
        ----------
        xs_ind : int
            Sample index of x.
        ys_ind : int
            Sample index of y.
        feature_filter : bool array, or int array, or callable(x, y)
            If feature_filter is bool / int array, directly select features
            with it. If feature_filter is callable, it will be applied on
            each (x, y) value tuple.
        xlab : str
        ylab : str
        title : str
        """
        xf, yf = self.s_ind_x_pair(xs_ind, ys_ind, feature_filter)
        if xlab is None:
            xlab = self._sids[xs_ind]

        if ylab is None:
            ylab = self._sids[ys_ind]

        return regression_scatter(x=xf, y=yf, xlab=xlab, ylab=ylab,
                                  title=title, **kwargs)

    def s_id_regression_scatter(self, xs_id, ys_id, feature_filter=None,
                                xlab=None, ylab=None, title=None, **kwargs):
        """
        Regression plot on two samples with xs_id and ys_id.

        Parameters
        ----------
        xs_id : int
            Sample ID of x.
        ys_id : int
            Sample ID of y.
        feature_filter : bool array, or int array, or callable(x, y)
            If feature_filter is bool / int array, directly select features
            with it. If feature_filter is callable, it will be applied on
            each (x, y) value tuple.
        xlab : str
        ylab : str
        title : str
        """
        xs_ind, ys_ind = self.s_id_to_ind([xs_id, ys_id])
        return self.s_ind_regression_scatter(xs_ind, ys_ind,
                                              feature_filter=feature_filter,
                                              xlab=xlab, ylab=ylab,
                                              title=title, **kwargs)

    def f_ind_x_pair(self, xf_ind, yf_ind, sample_filter=None):
        x = self._x[:, xf_ind]
        y = self._x[:, yf_ind]
        if self._is_sparse:
            x = x.todense().A1
            y = y.todense().A1

        if callable(sample_filter):
            s_inds = self.filter_1d_inds(
                lambda pair: sample_filter(pair[0], pair[1]), zip(x, y))
        else:
            s_inds = self.filter_1d_inds(sample_filter, zip(x, y))

        xf = x[s_inds]
        yf = y[s_inds]
        return (xf, yf)

    def f_ind_regression_scatter(self, xf_ind, yf_ind, sample_filter=None,
                                 xlab=None, ylab=None, title=None, **kwargs):
        """
        Regression plot on two features with xf_ind and yf_ind.

        Parameters
        ----------
        xf_ind : int
            Feature index of x.
        yf_ind : int
            Feature index of y.
        sample_filter : bool array, or int array, or callable(x, y)
            If sample_filter is bool / int array, directly select samples
            with it. If sample_filter is callable, it will be applied on
            each (x, y) value tuple.
        xlab : str
        ylab : str
        title : str
        """
        xf, yf = self.f_ind_x_pair(xf_ind, yf_ind, sample_filter)
        if xlab is None:
            xlab = self._fids[xf_ind]

        if ylab is None:
            ylab = self._fids[yf_ind]

        return regression_scatter(x=xf, y=yf, xlab=xlab, ylab=ylab,
                                  title=title, **kwargs)

    def f_id_regression_scatter(self, xf_id, yf_id, sample_filter=None,
                                xlab=None, ylab=None, title=None, **kwargs):
        """
        Regression plot on two features with xf_id and yf_id.

        Parameters
        ----------
        xf_id : int
            Feature ID of x.
        yf_id : int
            Feature ID of y.
        sample_filter : bool array, or int array, or callable(x, y)
            If sample_filter is bool / int array, directly select samples
            with it. If sample_filter is callable, it will be applied on
            each (x, y) value tuple.
        xlab : str
        ylab : str
        title : str
        """
        xf_ind, yf_ind = self.f_id_to_ind([xf_id, yf_id])
        return self.f_ind_regression_scatter(xf_ind, yf_ind,
                                              sample_filter=sample_filter,
                                              xlab=xlab, ylab=ylab,
                                              title=title, **kwargs)

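A plotting sketch on the same toy sfm (matplotlib is assumed to be
available); the returned objects come from scedar's regression_scatter
helper. With only a few toy points the fit is not meaningful, this only
shows the call pattern.

# Sample "s0" against sample "s1" over all features.
fig1 = sfm.s_id_regression_scatter("s0", "s1")

# Feature 0 against feature 1 over samples, keeping pairs with positive sum.
fig2 = sfm.f_ind_regression_scatter(
    0, 1, sample_filter=lambda x, y: x + y > 0, xlab="g0", ylab="g1")
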
    def s_ind_x_vec(self, s_ind, feature_filter=None):
        """
        Access a single vector of a sample.
        """
        x = self._x[s_ind, :]
        if self._is_sparse:
            x = x.todense().A1

        f_inds = self.filter_1d_inds(feature_filter, x)
        xf = x[f_inds]
        return xf

    def s_ind_dist(self, s_ind, feature_filter=None, xlab=None, ylab=None,
                   title=None, figsize=(5, 5), ax=None, **kwargs):
        xf = self.s_ind_x_vec(s_ind, feature_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

    def s_id_dist(self, s_id, feature_filter=None, xlab=None, ylab=None,
                  title=None, figsize=(5, 5), ax=None, **kwargs):
        s_ind = self.s_id_to_ind([s_id])[0]
        return self.s_ind_dist(s_ind, feature_filter=feature_filter,
                               title=title, xlab=xlab, ylab=ylab,
                               figsize=figsize, ax=ax, **kwargs)

    def f_ind_x_vec(self, f_ind, sample_filter=None, transform=None):
        """
        Access a single vector of a feature.
        """
        x = self._x[:, f_ind]
        if self._is_sparse:
            x = x.todense().A1

        s_inds = self.filter_1d_inds(sample_filter, x)
        xf = x[s_inds]
        if transform is not None:
            if callable(transform):
                xf = np.array(list(map(transform, xf)))
            else:
                raise ValueError("transform must be a callable")
        return xf

    def f_id_x_vec(self, f_id, sample_filter=None):
        f_ind = self.f_id_to_ind([f_id])[0]
        return self.f_ind_x_vec(f_ind, sample_filter=sample_filter)

    def f_ind_dist(self, f_ind, sample_filter=None, xlab=None, ylab=None,
                   title=None, figsize=(5, 5), ax=None, **kwargs):
        xf = self.f_ind_x_vec(f_ind, sample_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

    def f_id_dist(self, f_id, sample_filter=None, xlab=None, ylab=None,
                  title=None, figsize=(5, 5), ax=None, **kwargs):
        f_ind = self.f_id_to_ind([f_id])[0]
        return self.f_ind_dist(f_ind, sample_filter=sample_filter,
                               title=title, xlab=xlab, ylab=ylab,
                               figsize=figsize, ax=ax, **kwargs)

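Vector access and distribution plots on the toy sfm; the expected values in
the comments follow from the toy matrix, and the distribution call assumes
matplotlib is available.

# Values of feature "g1" across all samples.
print(sfm.f_id_x_vec("g1"))                                 # [1. 4.]

# Values of sample 0, keeping only features >= 1, then its histogram.
print(sfm.s_ind_x_vec(0, feature_filter=lambda v: v >= 1))  # [1. 2.]
fig = sfm.s_ind_dist(0, xlab="expression", title="sample s0")
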
    def f_sum(self, f_sum_filter=None):
        """
        For each sample, compute the sum of all features.

        Returns
        -------
        rowsum : float array
            (filtered_n_samples,)
        """
        rowsum = self._x.sum(axis=1)
        if self._is_sparse:
            rowsum = rowsum.A1

        s_inds = self.filter_1d_inds(f_sum_filter, rowsum)
        rowsumf = rowsum[s_inds]
        return rowsumf

    def f_sum_dist(self, f_sum_filter=None, xlab=None, ylab=None,
                   title=None, figsize=(5, 5), ax=None, **kwargs):
        """
        Plot the distribution of the feature sum of each sample,
        (n_samples,).
        """
        xf = self.f_sum(f_sum_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

    def s_sum(self, s_sum_filter=None):
        """
        For each feature, compute the sum of all samples.

        Returns
        -------
        xf : float array
            (filtered_n_features,)
        """
        colsum = self._x.sum(axis=0)
        if self._is_sparse:
            colsum = colsum.A1

        f_inds = self.filter_1d_inds(s_sum_filter, colsum)
        colsumf = colsum[f_inds]
        return colsumf

    def s_sum_dist(self, s_sum_filter=None, xlab=None, ylab=None,
                   title=None, figsize=(5, 5), ax=None, **kwargs):
        """
        Plot the distribution of the sample sum of each feature,
        (n_features,).
        """
        xf = self.s_sum(s_sum_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

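Row and column sums on the toy sfm; f_sum sums over features for each sample
and s_sum sums over samples for each feature, and the filters apply to the
computed sums. Expected values in the comments follow from the toy matrix.

print(sfm.f_sum())                                # [ 3. 12.]
print(sfm.s_sum())                                # [3. 5. 7.]
print(sfm.f_sum(f_sum_filter=lambda s: s >= 10))  # [12.]
fig = sfm.s_sum_dist(xlab="per-feature total")
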
    def f_cv(self, f_cv_filter=None):
        """
        For each sample, compute the coefficient of variation of all
        features.

        Returns
        -------
        xf : float array
            (filtered_n_samples,)
        """
        if self._x.shape[1] == 0:
            return np.repeat(np.nan, self._x.shape[0])

        ss = StandardScaler(with_mean=False).fit(self._x.T)
        n_fts = self._x.shape[1]
        rowsd = np.sqrt(ss.var_ * (n_fts / (n_fts - 1)))
        rowmean = ss.mean_
        rowcv = rowsd / rowmean
        s_inds = self.filter_1d_inds(f_cv_filter, rowcv)
        rowcvf = rowcv[s_inds]
        return rowcvf

    def f_cv_dist(self, f_cv_filter=None, xlab=None, ylab=None,
                  title=None, figsize=(5, 5), ax=None, **kwargs):
        """
        Plot the distribution of the feature coefficient of variation of
        each sample, (n_samples,).
        """
        xf = self.f_cv(f_cv_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

    def s_cv(self, s_cv_filter=None):
        """
        For each feature, compute the coefficient of variation of all
        samples.

        Returns
        -------
        xf : float array
            (n_features,)
        """
        if self._x.shape[1] == 0:
            return np.array([])

        ss = StandardScaler(with_mean=False).fit(self._x)
        n_sps = self._x.shape[0]
        colsd = np.sqrt(ss.var_ * (n_sps / (n_sps - 1)))
        colmean = ss.mean_
        colcv = colsd / colmean
        f_inds = self.filter_1d_inds(s_cv_filter, colcv)
        colcvf = colcv[f_inds]
        return colcvf

    def s_cv_dist(self, s_cv_filter=None, xlab=None, ylab=None,
                  title=None, figsize=(5, 5), ax=None, **kwargs):
        """
        Plot the distribution of the sample coefficient of variation of
        each feature, (n_features,).
        """
        xf = self.s_cv(s_cv_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

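A sketch of the coefficient-of-variation helpers on the toy sfm; f_cv is
sd / mean across features for each sample and s_cv is the analogue across
samples for each feature. Exact values depend on the sample-size correction
above, so none are asserted here.

print(sfm.f_cv())   # one CV per sample
print(sfm.s_cv())   # one CV per feature
fig = sfm.s_cv_dist(xlab="CV across samples")
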
    def f_n_above_threshold(self, closed_threshold):
        """
        For each sample, compute the number of features above a closed
        threshold.
        """
        row_ath_sum = (self._x >= closed_threshold).sum(axis=1)
        if self._is_sparse:
            row_ath_sum = row_ath_sum.A1
        return row_ath_sum

    def f_n_above_threshold_dist(self, closed_threshold, xlab=None,
                                 ylab=None, title=None, figsize=(5, 5),
                                 ax=None, **kwargs):
        """
        Plot the distribution of the number of above-threshold features of
        each sample, (n_samples,).
        """
        xf = self.f_n_above_threshold(closed_threshold)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

    def s_n_above_threshold(self, closed_threshold):
        """
        For each feature, compute the number of samples above a closed
        threshold.
        """
        col_ath_sum = (self._x >= closed_threshold).sum(axis=0)
        if self._is_sparse:
            col_ath_sum = col_ath_sum.A1
        return col_ath_sum

    def s_n_above_threshold_dist(self, closed_threshold, xlab=None,
                                 ylab=None, title=None, figsize=(5, 5),
                                 ax=None, **kwargs):
        """
        Plot the distribution of the number of above-threshold samples of
        each feature, (n_features,).
        """
        xf = self.s_n_above_threshold(closed_threshold)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

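Counting entries at or above a threshold on the toy sfm; "closed" means the
threshold value itself is included, and the expected counts in the comments
follow from the toy matrix.

print(sfm.f_n_above_threshold(2))   # features >= 2 per sample: [1 3]
print(sfm.s_n_above_threshold(2))   # samples >= 2 per feature: [1 1 2]
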
    def f_gc(self, f_gc_filter=None):
        """
        For each sample, compute the Gini coefficients of all features.

        Returns
        -------
        xf : float array
            (filtered_n_samples,)
        """
        rowgc = []
        for i in range(self._x.shape[0]):
            if self._is_sparse:
                i_x = self._x[i, :].todense().A1
            else:
                i_x = self._x[i, :]
            rowgc.append(stats.gc1d(i_x))
        rowgc = np.array(rowgc)

        s_inds = self.filter_1d_inds(f_gc_filter, rowgc)
        rowgcf = rowgc[s_inds]
        return rowgcf

    def f_gc_dist(self, f_gc_filter=None, xlab=None, ylab=None,
                  title=None, figsize=(5, 5), ax=None, **kwargs):
        """
        Plot the distribution of the feature Gini coefficient of each
        sample, (n_samples,).
        """
        xf = self.f_gc(f_gc_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

    def s_gc(self, s_gc_filter=None):
        """
        For each feature, compute the Gini coefficient of all samples.

        Returns
        -------
        xf : float array
            (n_features,)
        """
        colgc = []
        for i in range(self._x.shape[1]):
            if self._is_sparse:
                i_x = self._x[:, i].todense().A1
            else:
                i_x = self._x[:, i]
            colgc.append(stats.gc1d(i_x))
        colgc = np.array(colgc)

        f_inds = self.filter_1d_inds(s_gc_filter, colgc)
        colgcf = colgc[f_inds]
        return colgcf

    def s_gc_dist(self, s_gc_filter=None, xlab=None, ylab=None,
                  title=None, figsize=(5, 5), ax=None, **kwargs):
        """
        Plot the distribution of the sample Gini coefficients of each
        feature, (n_features,).
        """
        xf = self.s_gc(s_gc_filter)
        return hist_dens_plot(xf, title=title, xlab=xlab, ylab=ylab,
                              figsize=figsize, ax=ax, **kwargs)

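A sketch of the Gini-coefficient helpers on the toy sfm; the values come
from stats.gc1d, which is not shown on this page, so none are asserted here.
Higher values indicate more unequal distributions.

print(sfm.f_gc())   # one Gini coefficient per sample
print(sfm.s_gc())   # one Gini coefficient per feature
fig = sfm.f_gc_dist(xlab="Gini coefficient")
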
    @property
    def sids(self):
        return self._sids.tolist()

    @property
    def fids(self):
        return self._fids.tolist()

    @property
    def x(self):
        if self._is_sparse:
            return self._x.copy()
        else:
            return self._x.tolist()

    @property
    def _is_sparse(self):
        return spsp.issparse(self._x)
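
Finally, the read-only properties on the toy objects from the first sketch:
x returns a nested Python list for dense data and a copy of the matrix for
sparse data, while sids and fids always return plain Python lists.

print(sfm.sids)        # ['s0', 's1']
print(sfm.x)           # [[0.0, 1.0, 2.0], [3.0, 4.0, 5.0]]
print(sfm_sparse.x)    # a copied scipy CSR matrix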