Blame view
volia/clustering_modules/kmeans_mahalanobis.py
4.67 KB
4152e83df Addind kmeans mah... |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import logging
import pickle

import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import KMeans  # noqa: F401  (unused here; kept in case other modules import it from this file)
from sklearn.manifold import TSNE

from abstract_clustering import AbstractClustering  # noqa: F401  (unused here; kept for the same reason)

logger = logging.getLogger(__name__)


class kmeansMahalanobis:
    """
    K-means style clustering where every cluster has its own distance metric.

    Each cluster ``k`` is described by a centroid ``C[k]`` and a ``d x d``
    matrix ``L[k]``. The distance from a point ``x`` to cluster ``k`` is the
    quadratic form ``(x - C[k])^T . L[k] . (x - C[k])``. During training
    ``L[k]`` is set to the inverse of the covariance matrix of the points
    assigned to the cluster (squared Mahalanobis distance); it starts as the
    identity matrix.
    """

    def __init__(self):
        """
        Create an untrained model; call ``fit`` or ``load`` before ``predict``.
        """
        self.C = None  # centroids, shape (K, d)
        self.L = None  # per-cluster metric matrices, shape (K, d, d)
        self.K = None  # number of clusters

    def predict(self, features):
        """
        Assign every sample to its closest cluster.

        @param features: 2-D array of shape (N, d), one sample per row.
        @return: 1-D array of length N holding, for each sample, the index
                 of the cluster with the smallest distance.
        """
        distances = self._distance_matrix(features)
        logger.debug("Distances:\n%s", distances)
        return np.argmin(distances, axis=1)

    def load(self, model_path):
        """
        Restore ``C``, ``L`` and ``K`` from a file written by ``save``.

        @param model_path: path of the pickle file to read.
        @return: None
        """
        # SECURITY: unpickling can execute arbitrary code; only load model
        # files coming from a trusted source.
        with open(model_path, "rb") as f:
            data = pickle.load(f)
        if data is None:
            raise Exception("Le modèle n'a pas pu être chargé")
        self.C = data["C"]
        self.L = data["L"]
        self.K = data["K"]

    def save(self, modelpath: str):
        """
        Serialize ``C``, ``L`` and ``K`` to ``modelpath`` with pickle.

        @param modelpath: path of the file to write (overwritten if present).
        @return: None
        """
        data = {
            "C": self.C,
            "L": self.L,
            "K": self.K,
        }
        with open(modelpath, "wb") as f:
            pickle.dump(data, f)

    def fit(self, features, K: int, *, max_iter: int = 10,
            debug_plots: bool = False):
        """
        Train the model on ``features`` with ``K`` clusters.

        @param features: 2-D array of shape (N, d), one sample per row.
        @param K: number of clusters.
        @param max_iter: maximum number of assignment/update iterations.
        @param debug_plots: when True, save a t-SNE scatter plot of every
                            iteration (see ``_plot_iteration``).
        @return: None
        """
        self._train(features, K, max_iter=max_iter, debug_plots=debug_plots)

    def _initialize_model(self, X, number_clusters):
        """
        Build the initial centroids and metric matrices.

        @param X: 2-D array of shape (N, d).
        @param number_clusters: number of clusters to create.
        @return: tuple ``(C, L)``; ``C`` holds ``number_clusters`` rows drawn
                 at random from ``X`` (as floats) and ``L`` holds one
                 ``d x d`` identity matrix per cluster.
        """
        n_samples, d = X.shape
        # Draw distinct rows whenever there are enough samples so that two
        # clusters never start on the same point; fall back to sampling with
        # replacement only when more clusters than samples are requested.
        with_replacement = number_clusters > n_samples
        chosen = np.random.choice(
            n_samples, number_clusters, replace=with_replacement
        )
        # Cast to float: with integer features the centroid updates would
        # otherwise be silently truncated when written back into C.
        C = X[chosen].astype(float)
        L = np.tile(np.identity(d), (number_clusters, 1, 1))
        return C, L

    def _dist(self, a, b, l):
        """
        Quadratic-form distance ``(a - b)^T . l . (a - b)`` between two points.

        This is the squared Euclidean distance when ``l`` is the identity and
        the squared Mahalanobis distance when ``l`` is an inverse covariance
        matrix. No square root is taken.

        @param a: first point, d values.
        @param b: second point, d values.
        @param l: ``d x d`` metric matrix.
        @return: scalar distance.
        """
        delta = np.reshape(a, (-1, 1)) - np.reshape(b, (-1, 1))
        return np.transpose(delta).dot(l).dot(delta)[0][0]

    def _distance_matrix(self, features):
        """
        Compute the distance of every sample to every cluster.

        Same quantity as calling ``_dist`` for each (sample, cluster) pair,
        but vectorised over the samples.

        @param features: 2-D array of shape (N, d).
        @return: array of shape (N, K).
        """
        features = np.asarray(features)
        distances = np.zeros((features.shape[0], self.K))
        for k in range(self.K):
            diff = features - self.C[k]
            # Row-wise diff^T . L[k] . diff
            distances[:, k] = np.sum(diff.dot(self.L[k]) * diff, axis=1)
        return distances

    def _plot_iteration(self, iteration, points, clusters, centers):
        """
        Save a 2-D scatter plot of one training iteration.

        The first two columns of ``points`` are plotted, coloured by
        ``clusters``; ``centers`` are drawn as red '+' markers. The figure is
        written to ``test_<iteration>.pdf`` in the current directory.

        @param iteration: iteration number, used in the file name.
        @param points: 2-D array with at least two columns.
        @param clusters: per-point cluster index, used as colour.
        @param centers: 2-D array with at least two columns.
        @return: None
        """
        fig = plt.figure()
        ax = fig.add_subplot(111)
        scatter = ax.scatter(points[:, 0], points[:, 1], c=clusters, s=50)
        ax.scatter(centers[:, 0], centers[:, 1], s=50, c='red', marker='+')
        ax.set_xlabel('x')
        ax.set_ylabel('y')
        plt.colorbar(scatter)
        plt.savefig("test_" + str(iteration) + ".pdf")
        # Release the figure; otherwise one figure per iteration stays alive.
        plt.close(fig)

    def _train(self, features, K: int, max_iter: int = 10,
               debug_plots: bool = False):
        """
        Alternate assignment and update steps until the centroids are stable.

        Sets ``self.C``, ``self.L`` and ``self.K``. Training stops when no
        centroid changes during an iteration, or after ``max_iter``
        iterations (a warning is logged in that case).

        @param features: 2-D array of shape (N, d), one sample per row.
        @param K: number of clusters (must be >= 1).
        @param max_iter: maximum number of iterations.
        @param debug_plots: when True, run t-SNE on samples + centroids at
                            every iteration and save the plot.
        @return: None
        """
        if K < 1:
            raise ValueError("K must be at least 1")

        X = np.asarray(features, dtype=float)
        n_samples = X.shape[0]

        self.C, self.L = self._initialize_model(X, K)
        self.K = K

        for iteration in range(max_iter):
            logger.debug("Iteration: %d", iteration)

            # Assignment step.
            distances = self._distance_matrix(X)
            logger.debug("Distances:\n%s", distances)
            closest_cluster = np.argmin(distances, axis=1)

            if debug_plots:
                # Project samples and centroids together so that they share
                # the same 2-D embedding.
                embedded = TSNE(n_components=2).fit_transform(
                    np.concatenate((X, self.C), axis=0)
                )
                self._plot_iteration(
                    iteration,
                    embedded[:n_samples],
                    closest_cluster,
                    embedded[n_samples:],
                )

            # Update step.
            converged = True
            for k in range(K):
                members = X[closest_cluster == k]
                if members.shape[0] == 0:
                    # An empty cluster has no mean/covariance (would be NaN);
                    # keep its previous centroid and metric.
                    continue

                C_new = members.mean(axis=0)

                # Covariance of the members around the new centroid.
                centered = members - C_new
                covariance = centered.T.dot(centered) / members.shape[0]
                try:
                    L_new = np.linalg.inv(covariance)
                except np.linalg.LinAlgError:
                    # Singular covariance (e.g. a single-member cluster):
                    # keep the previous metric rather than aborting the fit.
                    logger.warning(
                        "Singular covariance for cluster %d; "
                        "keeping its previous metric", k
                    )
                    L_new = self.L[k]

                if not np.array_equal(self.C[k], C_new):
                    converged = False

                self.C[k] = C_new
                self.L[k] = L_new

            if converged:
                return

        logger.warning(
            "kmeansMahalanobis did not converge after %d iterations", max_iter
        )