Source code for scedar.cluster.community_mirac
import numpy as np
from collections import defaultdict
from scedar.cluster import MIRAC
from scedar.cluster import Community
class CommunityMIRAC(object):
    """
    CommunityMIRAC: Community + MIRAC clustering

    Run community clustering with high resolution to get a large number of
    clusters. Then, run MIRAC on the community clusters.

    Args
    ----
    x : float array
        Data matrix.
    d : float array
        Distance matrix.
    sids : sid list
        List of sample ids.
    fids : fid list
        List of feature ids.
    nprocs : int > 0
        The number of processes/cores used for community clustering.
    verbose : bool
        Print progress or not.

    Attributes
    ----------
    _x : float array
        Data matrix.
    _d : float array
        Distance matrix.
    _sids : sid list
        List of sample ids.
    _fids : fid list
        List of feature ids.
    _nprocs : int > 0
        The number of processes/cores used for community clustering.
    _verbose : bool
        Print progress or not.
    _cm_res : cluster.Community
        Community clustering result. None until run_community is called.
    _cm_clp_x : array
        Data array with samples collapsed by community clustering labels.
        For each cluster, the mean of all samples is a row in this array.
        None until run_community is called.
    _mirac_res : cluster.MIRAC
        MIRAC clustering results on _cm_clp_x. None until run_mirac is
        called.
    _labs : list
        Final label of each sample, obtained by mapping each sample's
        community label to the MIRAC label of that community. None until
        run_mirac is called. Exposed (as a copy) through the ``labs``
        property.
    """

    def __init__(self, x, d=None, sids=None, fids=None,
                 nprocs=1, verbose=False):
        super().__init__()
        self._x = x
        self._d = d
        self._sids = sids
        self._fids = fids
        self._nprocs = nprocs
        self._verbose = verbose
        # Results are filled in lazily by run_community / run_mirac.
        self._cm_res = None
        self._cm_clp_x = None
        self._mirac_res = None
        self._labs = None

    def run_community(self, graph=None, metric="cosine",
                      use_pdist=False, k=15, use_pca=True, use_hnsw=True,
                      index_params=None, query_params=None, aff_scale=1,
                      partition_method="RBConfigurationVertexPartition",
                      resolution=100, random_state=None, n_iter=2,
                      nprocs=None):
        """
        Run community clustering and collapse the data by community.

        Builds a ``Community`` object from the stored data and stores it in
        ``_cm_res``; then stores the per-community mean rows of ``_x`` in
        ``_cm_clp_x``.

        All parameters except ``nprocs`` are forwarded unchanged to
        ``Community``; see that class for their meaning.

        Args
        ----
        nprocs : int > 0 or None
            Number of processes passed to ``Community``. When None, the
            value given at construction time is used.
        """
        if nprocs is None:
            nprocs = self._nprocs
        self._cm_res = Community(x=self._x, d=self._d, graph=graph,
                                 metric=metric, sids=self._sids,
                                 fids=self._fids,
                                 use_pdist=use_pdist, k=k, use_pca=use_pca,
                                 use_hnsw=use_hnsw, index_params=index_params,
                                 query_params=query_params,
                                 aff_scale=aff_scale,
                                 partition_method=partition_method,
                                 resolution=resolution,
                                 random_state=random_state,
                                 n_iter=n_iter, nprocs=nprocs,
                                 verbose=self._verbose)
        if self._verbose:
            print("Community cluster: {}".format(
                self._cm_res._la_res.summary()))
        self._cm_clp_x = self.collapse_clusters(self._x, self._cm_res.labs)

    def run_mirac(self, metric="cosine", hac_tree=None, cl_mdl_scale_factor=1,
                  min_cl_n=25, encode_type="auto", mdl_method=None,
                  min_split_mdl_red_ratio=0.2, soft_min_subtree_size=1,
                  linkage="complete", optimal_ordering=False,
                  dim_reduct_method=None, nprocs=None):
        """
        Run MIRAC on the collapsed community clusters and merge the labels.

        Builds a ``MIRAC`` object on ``_cm_clp_x``, stores it in
        ``_mirac_res`` and then refreshes the per-sample labels.

        All parameters except ``nprocs`` are forwarded unchanged to
        ``MIRAC``; see that class for their meaning.

        Args
        ----
        nprocs : int > 0 or None
            Number of processes passed to ``MIRAC``. When None, the value
            given at construction time is used.

        Raises
        ------
        ValueError
            If community clustering has not been run yet.
        """
        if self._cm_clp_x is None:
            raise ValueError("Need to run community clustering first.")
        if nprocs is None:
            nprocs = self._nprocs
        # NOTE(review): self._sids is forwarded as-is, although the collapsed
        # matrix has one row per community cluster rather than one per
        # sample. Confirm MIRAC accepts this when sids is not None.
        self._mirac_res = MIRAC(
            self._cm_clp_x, metric=metric,
            sids=self._sids, fids=self._fids,
            hac_tree=hac_tree, nprocs=nprocs,
            cl_mdl_scale_factor=cl_mdl_scale_factor,
            min_cl_n=min_cl_n, encode_type=encode_type,
            mdl_method=mdl_method,
            min_split_mdl_red_ratio=min_split_mdl_red_ratio,
            soft_min_subtree_size=soft_min_subtree_size,
            linkage=linkage, optimal_ordering=optimal_ordering,
            dim_reduct_method=dim_reduct_method,
            verbose=self._verbose)
        self._merge_labels()

    def _merge_labels(self):
        """
        Compute per-sample labels from the two clustering levels.

        Each sample's community label is used as an index into the MIRAC
        labels (one per community); the result is stored in ``_labs``.
        """
        l1_cm_labs = self._cm_res.labs
        l2_mirac_labs = self._mirac_res.labs
        self._labs = [l2_mirac_labs[i] for i in l1_cm_labs]

    def tune_mirac(self, cl_mdl_scale_factor=1, min_cl_n=25,
                   min_split_mdl_red_ratio=0.2,
                   soft_min_subtree_size=1, verbose=False):
        """
        Re-tune the existing MIRAC result and refresh the merged labels.

        The arguments are passed positionally, in the order of this
        signature, to ``tune_parameters`` of the stored MIRAC result.

        Raises
        ------
        ValueError
            If MIRAC has not been run yet.
        """
        if self._mirac_res is None:
            raise ValueError("Need to run MIRAC first.")
        self._mirac_res.tune_parameters(cl_mdl_scale_factor, min_cl_n,
                                        min_split_mdl_red_ratio,
                                        soft_min_subtree_size, verbose)
        self._merge_labels()

    def run(self, graph=None, metric="cosine",
            use_pdist=False, k=15, use_pca=True, use_hnsw=True,
            index_params=None, query_params=None, aff_scale=1,
            partition_method="RBConfigurationVertexPartition",
            resolution=100, random_state=None, n_iter=2,
            hac_tree=None, cl_mdl_scale_factor=1,
            min_cl_n=25, encode_type="auto", mdl_method=None,
            min_split_mdl_red_ratio=0.2,
            soft_min_subtree_size=1,
            linkage="complete", optimal_ordering=False, nprocs=None,
            dim_reduct_method=None):
        """
        Run community clustering followed by MIRAC.

        Calls ``run_community`` and then ``run_mirac``. ``metric`` and
        ``nprocs`` are given to both steps; every other argument is
        forwarded to the step whose signature declares it.

        ``dim_reduct_method`` is placed last in the signature so that
        existing positional calls keep their meaning; it is forwarded to
        ``run_mirac``.
        """
        self.run_community(
            graph=graph, metric=metric,
            use_pdist=use_pdist, k=k, use_pca=use_pca, use_hnsw=use_hnsw,
            index_params=index_params, query_params=query_params,
            aff_scale=aff_scale,
            partition_method=partition_method, nprocs=nprocs,
            resolution=resolution, random_state=random_state, n_iter=n_iter)
        self.run_mirac(
            metric=metric, hac_tree=hac_tree,
            cl_mdl_scale_factor=cl_mdl_scale_factor,
            min_cl_n=min_cl_n, encode_type=encode_type,
            mdl_method=mdl_method,
            min_split_mdl_red_ratio=min_split_mdl_red_ratio,
            soft_min_subtree_size=soft_min_subtree_size, nprocs=nprocs,
            linkage=linkage, optimal_ordering=optimal_ordering,
            dim_reduct_method=dim_reduct_method)

    @staticmethod
    def collapse_clusters(data_x, cluster_labs):
        """
        Collapse the rows of a data matrix into one mean row per cluster.

        Args
        ----
        data_x : array
            Data matrix indexed as ``data_x[row_indices, :]``; the selected
            rows must support ``.mean(axis=0)``.
        cluster_labs : label list
            Cluster label of each row. The distinct labels must be exactly
            0, 1, ..., n_clusters - 1.

        Returns
        -------
        array
            Array with one row per cluster, ordered by label; row ``i`` is
            the mean of the rows of ``data_x`` labelled ``i``.

        Raises
        ------
        ValueError
            If ``cluster_labs`` is empty or its distinct labels are not
            exactly 0 .. n_clusters - 1.
        """
        uniq_labs = sorted(set(cluster_labs))
        if not uniq_labs:
            raise ValueError("cluster_labs must not be empty.")
        if uniq_labs != list(range(len(uniq_labs))):
            raise ValueError("labels must be integers from 0 to the number "
                             "of clusters minus one. There should be no "
                             "missing ones.")
        # Group row indices by label.
        cl_lab_sinds_lut = defaultdict(list)
        for s_ind, s_lab in enumerate(cluster_labs):
            cl_lab_sinds_lut[s_lab].append(s_ind)
        cl_stat_vecs = []
        # Iterate labels in sorted order so that row i belongs to label i.
        for lab in uniq_labs:
            s_inds = cl_lab_sinds_lut[lab]
            s_inds_x_mean = data_x[s_inds, :].mean(axis=0)
            if s_inds_x_mean.ndim == 2:
                # Matrix-like containers keep the reduced axis, giving a
                # (1, n_features) result; flatten it to a 1-D vector.
                assert s_inds_x_mean.shape[0] == 1
                s_inds_x_mean = np.asarray(s_inds_x_mean).ravel()
            cl_stat_vecs.append(s_inds_x_mean)
        return np.vstack(cl_stat_vecs)

    @property
    def labs(self):
        """
        Copy of the merged per-sample labels.

        Raises
        ------
        AttributeError
            If labels have not been computed yet.
        """
        if self._labs is None:
            # Same exception type as calling .copy() on None, but with an
            # actionable message.
            raise AttributeError(
                "Labels are not available. Need to run MIRAC first.")
        return self._labs.copy()