Source code for mdance.cluster.equal

import warnings

import numpy as np
from sklearn.cluster import kmeans_plusplus, MiniBatchKMeans
from sklearn.metrics import calinski_harabasz_score, davies_bouldin_score

from mdance.tools.bts import align_traj
from mdance.tools.bts import calculate_comp_sim
from mdance.tools.bts import calculate_medoid
from mdance.tools.bts import diversity_selection
from mdance.tools.bts import extended_comparison


[docs]class ExtendedQuality: """ Extended quality clustering algorithm is an extension of the radial threshold algorithm. It grows clusters from seeds and can rejects low density clusters. Parameters ---------- data : array-like of shape (n_samples, n_features) A feature array. metric : str, default='MSD' The metric to when calculating distance between *n* objects in an array. It must be an options allowed by :func:`mdance.tools.bts.extended_comparison`. N_atoms : int Number of atoms in the system used for normalization. ``N_atoms=1`` for non-Molecular Dynamics datasets. threshold : float The distance between the seed of the subcluster and a new sample should be lesser than the threshold. n_seeds : {float, int} Number of seeds to be used per iteration. Default is 1. float: Real number between (0, 1). Indicates the % of the total data. int: Number of seeds. seed_method : {'comp_sim', 'greedy', 'medoid', 'mini_batch_kmeans', 'vanilla'} Method used to select the initial seeds. check_sim : bool, default False If True, validates the proposed cluster against a similarity threshold to ensure it meets acceptable criteria. reject_lowd : bool, default False If True, will reject low density clusters if they are below the minimum cluster size. align_method: {'uni', 'kron', None}, optional Alignment method used for the data. Default is None, which means no alignment. 'uni' is a uniform alignment method. 'kron' is a Kronecker alignment method. percentage : int, default=10 Percentage of the dataset to be used for the initial selection of the initial centers. (**kwargs) sim_threshold : float The largest similarity value that is acceptable for the proposed cluster. (**kwargs) min_samples : {float, int}, default=10 Minimum number of data points required in a cluster. (**kwargs) float: Real number between (0, 1). Indicates the % of the total data. int: Number of data points. """ def __init__(self, data, threshold, metric, N_atoms, seed_method='greedy', n_seeds=1, check_sim=False, reject_lowd=True, **kwargs): self.data = data self.threshold = threshold self.metric = metric self.N_atoms = N_atoms self.seed_method = seed_method self.n_seeds = n_seeds self.check_sim = check_sim self.reject_lowd = reject_lowd self.n_objects = len(self.data) # Alignment method criteria self.align_method = kwargs.get('align_method', None) self.align_method = self._check_align_method() # Percentage criteria if self.seed_method == 'comp_sim': self.percentage = kwargs.get('percentage', 10) self.percentage = self._check_percentage() # Number of seeds criteria self.n_seeds = self._check_n_seeds() # sim_threshold criteria if self.check_sim: self.sim_threshold = kwargs.get('sim_threshold') self.sim_threshold = self._check_sim_threshold() # Minimum cluster size criteria if self.reject_lowd: self.min_samples = kwargs.get('min_samples', 10) self.min_samples = self._check_min_samples() def _check_align_method(self): """ Checks the ``align_method`` attribute. Raises ------ ValueError If align_method is not one of the following: [``uni``, ``kron``, ``None``]. Returns ------- str Alignment method. """ if self.align_method not in ['uni', 'kron', None]: raise ValueError(f"Invalid alignment method. Must be one of the following: ['uni', 'kron', None].") return self.align_method def _check_percentage(self): """ Checks the ``percentage`` attribute. Raises ------ ValueError If ``percentage`` is not an integer or not ``[0, 100]``. Returns ------- int Percentage of the dataset to be used for the initial selection """ if not 0 <= self.percentage <= 100 or not isinstance(self.percentage, int): raise ValueError("percentage must be an integer [0, 100].") return self.percentage def _check_n_seeds(self): """ Checks the ``n_seeds`` attribute. Raises ------ ValueError If ``n_seeds`` is not an integer or not less than the number of objects. Returns ------- int Number of seeds to be used. """ if 0 < self.n_seeds < 1: self.n_seeds = int(self.n_objects * self.n_seeds) elif isinstance(self.n_seeds, int): if self.n_seeds >= self.n_objects: raise ValueError(f"n_seeds, {self.n_seeds} must be less than the n_objects, {self.n_objects}.") self.n_seeds = self.n_seeds return self.n_seeds def _check_sim_threshold(self): """ Checks the ``sim_threshold`` attribute. Raises ------ ValueError If ``sim_threshold`` is not specified when ``check_sim`` is True. Returns ------- float Threshold for the similarity of the proposed cluster. """ if not self.sim_threshold: raise ValueError("sim_threshold must be specified if check_msd is True.") self.sim_threshold = self.sim_threshold return self.sim_threshold def _check_min_samples(self): """ Checks the ``min_samples`` attribute. Raises ------ ValueError If ``min_samples`` is not specified when ``reject_lowd`` is True. ValueError If ``min_samples`` is not an integer or not less than the number of objects. Returns ------- int Minimum number of data points in a cluster. """ if not self.min_samples: raise ValueError("min_samples must be specified if reject_lowd is True.") elif 0 < self.min_samples < 1: self.min_samples = int(self.n_objects * self.min_samples) elif isinstance(self.min_samples, int): if self.min_samples > self.n_objects: raise ValueError(f"min_samples, {self.min_samples} must be less than the n_objects, {self.n_objects}.") self.min_samples = self.min_samples return self.min_samples
[docs] def run(self): """ Run the ExtendedQuality algorithm. Returns ------- dict Key: iteration number, value: numpy.ndarray of the cluster members. """ cluster_dict = self.grow_clusters() non_empty_clusters = {k: v for k, v in cluster_dict.items() if v} sorted_clusters = {k: v for k, v in sorted(non_empty_clusters.items(), key=lambda item: len(item[1]), reverse=True)} renumbered_clusters = {} for i, v in enumerate(sorted_clusters.values()): renumbered_clusters[i] = v if not renumbered_clusters: warnings.warn(f"threshold {self.threshold}. All clusters are empty. sim_threshold may be too low.") return renumbered_clusters
[docs] def comp_sim_seeds(self): """ Selects the inital centers based on the diversity in the high density region of the data using the *n*-ary similarity. Returns ------- numpy.ndarray (n_seeds, n_features) array of the initial seeds. Notes ----- A complementary similarity is calculated for each point in the dataset. Next, the top n% of the points are selected for diversity selection. The first ``n_seeds`` number of points are selected as the seeds. """ n_max = int(self.percentage * self.n_objects / 100) comp_sim = calculate_comp_sim(self.data, self.metric, self.N_atoms) sorted_indices = np.argsort(comp_sim) top_comp_sim_indices = sorted_indices[-n_max:] top_cc_data = self.data[top_comp_sim_indices] medoids_indices = diversity_selection(top_cc_data, 100, self.metric, self.N_atoms, 'comp_sim', 'medoid') seeds = top_cc_data[medoids_indices] return seeds
[docs] def greedy_seeds(self): """ Select the initial centers using the greedy *k*-means++ algorithm. (Arthur and Vassilvitskii, 2007). Returns ------- numpy.ndarray (n_seeds, n_features) array of the initial seeds. """ centers, indices = kmeans_plusplus(self.data, n_clusters=self.n_seeds, random_state=42) seeds = self.data[indices] return seeds
[docs] def find_medoids(self): """ Finds the seeds by selecting the medoids using the complementary similarity. Returns ------- numpy.ndarray (n_seeds, n_features) array of the initial seeds. Notes ----- A complementary similarity is calculated for each point in the dataset. Then, the first ``n_seeds`` number of points are selected as the seeds. """ comp_sim = calculate_comp_sim(self.data, self.metric, self.N_atoms) sorted_indices = np.argsort(comp_sim) medoid_indices = sorted_indices[-self.n_seeds:] seeds = self.data[medoid_indices] return seeds
[docs] def mini_batch_kmeans_seeds(self): """ Select the initial centers using the mini-batch *k*-means algorithm. Returns ------- numpy.ndarray (n_seeds, n_features) array of the initial seeds. """ mbk = MiniBatchKMeans(n_clusters=self.n_seeds, random_state=42) mbk.fit(self.data) seeds = mbk.cluster_centers_ return seeds
[docs] def vanilla_seeds(self): """ Select the initial centers using the vanilla *k*-means++ algorithm. Returns ------- numpy.ndarray (n_seeds, n_features) array of the initial seeds. """ centers, indices = kmeans_plusplus(self.data, n_clusters=self.n_seeds, random_state=42, n_local_trials=1) seeds = self.data[indices] return seeds
def _choose_seed_method(self): """ Chooses the seed method based on the ``seed_method`` attribute. Raises ------ ValueError If ``seed_method`` is not one of the following: [``comp_sim``, ``greedy``, ``medoid``, ``mini_batch_kmeans``, ``vanilla``]. Returns ------- numpy.ndarray (n_seeds, n_features) array of the initial seeds. """ if self.seed_method == 'comp_sim': seeds = self.comp_sim_seeds() elif self.seed_method == 'greedy': seeds = self.greedy_seeds() elif self.seed_method == 'medoid': seeds = self.find_medoids() elif self.seed_method == 'mini_batch_kmeans': seeds = self.mini_batch_kmeans_seeds() elif self.seed_method == 'vanilla': seeds = self.vanilla_seeds() else: raise ValueError(f"Invalid seed method. Must be one of the following: ['comp_sim', 'greedy', 'medoid', 'mini_batch_kmeans', 'vanilla'].") return seeds
[docs] def grow_clusters(self): """ The heart of the ``ExtendedQuality`` algorithm. Returns ------- dict Key: iteration number, value: numpy.ndarray of the cluster members. Notes ----- 1. Initial seeds are selected using the method in ``seed_method`` attribute. 2. Each seed proposes a cluster by adding available objects within the radial threshold. 3. The winner seed cluster is the most dense cluster. If there are multiple, the one with the lowest similarity is chosen. 4. Objects in the winner seed cluster are removed from the data. 5. If ``check_sim`` is True, clusters above the similarity threshold are rejected. 6. if ``reject_lowd`` is True, clusters below the ``min_samples`` are rejected. 7. Repeat steps 1-6 until there are less than 2 objects left in the data because it is not possible to determine seeds with 2 or less objects. """ seed_clusters = {} iteration = 0 while len(self.data) > 2 and len(self.data) > self.n_seeds: # Select the initial seeds all_seeds = self._choose_seed_method() # Propose clusters for each seed candidate_cluster = [] for index, seed in enumerate(all_seeds): current_cluster = [] for i, candidate in enumerate(self.data): value = extended_comparison(np.array([seed, candidate]), data_type='full', metric=self.metric, N_atoms=self.N_atoms) if value <= self.threshold: current_cluster.append(candidate) candidate_cluster.append(current_cluster) # Find the winner cluster max_indices = [i for i, cluster in enumerate(candidate_cluster) \ if len(cluster) == max(len(c) for c in candidate_cluster)] max_clusters = [candidate_cluster[i] for i in max_indices] if len(max_indices) == 1: winner = max_clusters[0] seed_clusters[iteration] = winner else: sim_threshold = 9999 index = len(max_clusters) + 1 for i, c in enumerate(max_clusters): sim = extended_comparison(np.array(c), data_type='full', metric=self.metric, N_atoms=self.N_atoms) if sim < sim_threshold: sim_threshold = sim index = i if sim_threshold == 9999: index = 0 winner = max_clusters[index] seed_clusters[iteration] = winner # Remove winner cluster from the original dataset for i in winner: self.data = self.data[~np.all(self.data == i, axis=1)] # Check similarity and reject low density clusters if not self.check_sim or self._check_sim(winner): seed_clusters = self._low_density_termination(seed_clusters, winner, iteration) self.data = align_traj(self.data, self.N_atoms, align_method=self.align_method) iteration += 1 else: seed_clusters[iteration] = [] break return seed_clusters
def _check_sim(self, current_cluster): """ Checks the similarity of the proposed cluster. Parameters ---------- current_cluster : list List of points in the current cluster. Returns ------- bool True if the similarity value is less than the ``sim_threshold``, False otherwise. """ sim = extended_comparison(np.array(current_cluster), data_type='full', metric=self.metric, N_atoms=self.N_atoms) if sim <= self.sim_threshold: return True else: return False def _low_density_termination(self, seed_clusters, winner, iteration): """ Removes low density clusters if it is below the minimum cluster size. Parameters ---------- seed_clusters : dict Winner clusters from all previous iterations are stored in this dictionary. winner : list Winner cluster from current iteration that is to be checked for rejection. iteration : int Iteration number. Returns ------- dict Updated seed_clusters dictionary. Key: iteration number, value: numpy.ndarray of the cluster members. """ if self.reject_lowd: if len(winner) >= self.min_samples: seed_clusters[iteration] = winner else: seed_clusters[iteration] = [] else: seed_clusters[iteration] = winner return seed_clusters
[docs] def calculate_populations(self, clusters): """ Calculate the populations of the clusters. Returns ------- dict Key: cluster number, value: cluster population. """ each_top10_frac = [] top10_total_items = 0 for i, v in enumerate(clusters.values()): if i < 10: each_top10_frac.append('%.6f' % (len(v) / self.n_objects)) top10_total_items += len(v) top10_fraction = '%.6f' % (top10_total_items / self.n_objects) pop_list = [self.threshold, len(clusters), top10_fraction] + each_top10_frac return pop_list
[docs] def calculate_best_frames(self, clusters, n_structures=10, sorted_by='frame'): """ Extract the best n structures for each cluster. Parameters ---------- clusters : dict Dictionary of the clusters and their corresponding indices. n_structures : int, default=10 Number of structures to be extracted for each cluster. sorted_by : {'frame', 'similarity'}, default='frame' Sort the best structures by frame number or similarity value. Returns ------- numpy.ndarray Array of the best frames. """ best_frames = [] for v in clusters.values(): medoid_index = calculate_medoid(np.array(v), metric=self.metric, N_atoms=self.N_atoms) medoid = v[medoid_index] msd_to_medoid = [] for i, frame in enumerate(v): msd_to_medoid.append((i, extended_comparison(np.array([frame, medoid]), data_type='full', metric=self.metric, N_atoms=self.N_atoms))) msd_to_medoid = np.array(msd_to_medoid) if sorted_by == 'frame': sorted_indices = np.argsort(msd_to_medoid[:, 0]) elif sorted_by == 'similarity': sorted_indices = np.argsort(msd_to_medoid[:, 1]) best_n_structures = [v[idx] for idx in sorted_indices[:n_structures]] best_frames.append(best_n_structures) return np.array(best_frames)
[docs] def find_best_frames_indices(self, best_frames, sieve): """ Find the indices of the best frames. Parameters ---------- best_frames : numpy.ndarray Array of the best frames. sieve : int The sieve value used to select the frames. Returns ------- numpy.ndarray Array of the best frames indices. """ best_frames_indices = [] for i, frame in enumerate(self.data): i = i * sieve for k, v in enumerate(best_frames): if any((frame == x).all() for x in v): best_frames_indices.append((i, k)) break best_frames_indices = np.array(best_frames_indices) best_frames_indices = best_frames_indices[best_frames_indices[:, 1].argsort()] return best_frames_indices
[docs] def labels(self, clusters, sieve): """ Assigns labels to the clusters. Parameters ---------- clusters : dict Dictionary of the clusters and their corresponding indices. Returns ------- numpy.ndarray Array of the cluster labels. """ frame_vs_cluster = [] for i, frame in enumerate(self.data): i = i * sieve for k, v in clusters.items(): if any((frame == x).all() for x in v): frame_vs_cluster.append((i, k)) break frame_vs_cluster = np.array(frame_vs_cluster) return frame_vs_cluster
[docs]def compute_scores(results): """ Compute the Calinski-Harabasz (CH) and Davies-Bouldin (DB) scores for the clusters. Parameters ---------- data : array-like of shape (n_samples, n_features) Input dataset. results : dict Dictionary of the clusters and their corresponding indices. Returns ------- tuple A tuple of the CH and DB scores in that order. Notes ----- Labels are assigned based on number of clusters. If there is only one cluster, the CH and DB scores cannot be calculated and None is returned. Example ---- >>> import numpy as np >>> from sklearn.metrics import calinski_harabasz_score, davies_bouldin_score >>> from mdance.cluster.equal import compute_scores >>> data = np.array([[1, 2], [1, 4], [1, 0], ... [4, 2], [4, 4], [4, 0]]) >>> results = {0: [0, 1, 2], 1: [3, 4, 5]} >>> ch, db = compute_scores(data, results) >>> print(ch, db) 3.375 0.8888888888888888 """ data = np.array([frame for cluster in results.values() for frame in cluster]) labels = [] count = 0 for v in results.values(): labels.extend([count] * len(v)) count += 1 labels = np.array(labels) if len(np.unique(labels)) == 1 or data.shape[0] == 0: return None, None else: ch_score = calinski_harabasz_score(data, labels) db_score = davies_bouldin_score(data, labels) return ch_score, db_score