Blame view
volia/clustering_modules/kmeans_mahalanobis.py
4.68 KB
4152e83df
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
import pickle
import warnings

import matplotlib.pyplot as plt
import numpy as np
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE

from abstract_clustering import AbstractClustering


class kmeansMahalanobis():
    """
    K-means clustering where each cluster k owns a centroid C[k] and a matrix
    L[k] (pseudo-inverse of the cluster covariance) used in the quadratic-form
    distance (x - C[k])^T L[k] (x - C[k]).

    Attributes set by fit() or load():
        C: array (K, d) of centroids.
        L: array (K, d, d) of per-cluster distance matrices.
        K: number of clusters.
    """

    def __init__(self):
        """
        Create an untrained model; C, L and K stay None until fit() or load().
        """
        self.C = None
        self.L = None
        self.K = None

    def predict(self, features):
        """
        Assign each sample to its closest cluster.

        @param features: 2-D array (N, d) of samples.
        @return: 1-D array (N,) with the index of the closest cluster per sample.
        """
        return np.argmin(self._distance_matrix(features), axis=1)

    def load(self, model_path):
        """
        Restore C, L and K from a file written by save().

        @param model_path: path of the pickle file to read.
        @return: None
        @raise Exception: if the file unpickles to None.
        """
        # NOTE(security): pickle can execute arbitrary code while loading;
        # only load model files that come from a trusted source.
        # The file must be opened in binary mode and handed to pickle.load.
        with open(model_path, "rb") as f:
            data = pickle.load(f)
        if data is None:
            raise Exception("Le modèle n'a pas pu être chargé")
        self.C = data["C"]
        self.L = data["L"]
        self.K = data["K"]

    def save(self, modelpath: str):
        """
        Pickle C, L and K into a file.

        @param modelpath: path of the file to write.
        @return: None
        """
        data = {
            "C": self.C,
            "L": self.L,
            "K": self.K
        }
        with open(modelpath, "wb") as f:
            pickle.dump(data, f)

    def fit(self, features, K: int, max_iter: int = 10, debug_plot_every: int = 1):
        """
        Train the model on the given samples.

        @param features: 2-D array (N, d) of samples.
        @param K: number of clusters.
        @param max_iter: maximum number of iterations before giving up.
        @param debug_plot_every: write a t-SNE debug plot every that many
            iterations; 0 disables the plots.
        @return: None
        """
        self._train(features, K, max_iter=max_iter, debug_plot_every=debug_plot_every)

    def _initialize_model(self, X, number_clusters):
        """
        Pick initial centroids among the samples and identity matrices.

        @param X: 2-D array (N, d) of samples.
        @param number_clusters: number of clusters.
        @return: tuple (C, L) with C of shape (number_clusters, d) and
            L of shape (number_clusters, d, d).
        """
        n_samples, d = X.shape[0], X.shape[1]
        # Draw distinct samples so that two clusters do not start on the same
        # point; fall back to replacement only when there are too few samples.
        chosen = np.random.choice(
            n_samples, number_clusters, replace=number_clusters > n_samples
        )
        C = X[chosen]
        # Centroids are overwritten with means later: an integer array would
        # truncate them and the equality-based stop test would never succeed.
        if not np.issubdtype(C.dtype, np.floating):
            C = C.astype(float)
        L = np.zeros((number_clusters, d, d))
        for k in range(number_clusters):
            L[k] = np.identity(d)
        return C, L

    def _dist(self, a, b, l):
        """
        Quadratic-form distance (a - b)^T l (a - b); no square root is taken.

        @param a: first vector (d elements).
        @param b: second vector (d elements).
        @param l: (d, d) matrix of the quadratic form.
        @return: scalar value of the quadratic form.
        """
        diff = np.reshape(a, (-1, 1)) - np.reshape(b, (-1, 1))
        return np.transpose(diff).dot(l).dot(diff)[0][0]

    def _distance_matrix(self, features):
        """
        Distance of every sample to every cluster, using self.C and self.L.

        @param features: 2-D array (N, d) of samples.
        @return: array (N, K); entry [n, k] equals
            _dist(features[n], self.C[k], self.L[k]).
        """
        distances = np.zeros((features.shape[0], self.K))
        for k in range(self.K):
            diff = features - self.C[k]
            # Row-wise quadratic form: sum_ij diff_i * L_ij * diff_j.
            distances[:, k] = np.sum(diff.dot(self.L[k]) * diff, axis=1)
        return distances

    def _plot_iteration(self, iteration, points, clusters, centers):
        """
        Save a 2-D scatter plot of one iteration as "test_<iteration>.pdf".

        @param iteration: iteration number, used in the file name.
        @param points: array (N, 2) of sample coordinates.
        @param clusters: array (N,) of cluster indices used as colours.
        @param centers: array (K, 2) of centroid coordinates.
        @return: None
        """
        fig = plt.figure()
        ax = fig.add_subplot(111)
        scatter = ax.scatter(points[:, 0], points[:, 1], c=clusters, s=50)
        ax.scatter(centers[:, 0], centers[:, 1], s=50, c='red', marker='+')
        ax.set_xlabel('x')
        ax.set_ylabel('y')
        plt.colorbar(scatter)
        plt.savefig("test_" + str(iteration) + ".pdf")
        # Release the figure, otherwise one figure per iteration stays alive.
        plt.close(fig)

    def _train(self, features, K: int, max_iter: int = 10, debug_plot_every: int = 1):
        """
        Alternate assignment and update steps until the centroids stop moving.

        @param features: 2-D array (N, d) of samples.
        @param K: number of clusters.
        @param max_iter: maximum number of iterations; a RuntimeWarning is
            emitted if the centroids are still moving after that many.
        @param debug_plot_every: write a t-SNE debug plot every that many
            iterations; 0 disables the plots.
        @return: None
        """
        X = features
        N = X.shape[0]

        self.C, self.L = self._initialize_model(X, K)
        self.K = K

        for i in range(max_iter):
            print("Iteration: ", i)

            # Assignment step: closest cluster of each sample.
            closest_cluster = np.argmin(self._distance_matrix(X), axis=1)

            if debug_plot_every and i % debug_plot_every == 0:
                # -- Debug tool: embed samples and centroids together in 2-D --
                X_embedded = TSNE(n_components=2).fit_transform(
                    np.concatenate((X, self.C), axis=0)
                )
                self._plot_iteration(
                    i,
                    X_embedded[:N],
                    closest_cluster,
                    X_embedded[N:]
                )

            # Update step.
            converged = True
            for k in range(K):
                # Samples currently assigned to cluster k.
                X_sub = X[closest_cluster == k]
                if X_sub.shape[0] == 0:
                    # Empty cluster: keep its previous centroid and matrix.
                    continue

                C_new = np.mean(X_sub, axis=0)

                # Covariance of the cluster around its new centroid, then its
                # pseudo-inverse as the new distance matrix.
                centered = X_sub - C_new
                covariance = centered.T.dot(centered) / X_sub.shape[0]
                L_new = np.linalg.pinv(covariance)

                # Training stops once no centroid changed during a full pass.
                if not np.array_equal(self.C[k], C_new):
                    converged = False
                self.C[k] = C_new
                self.L[k] = L_new

            if converged:
                return

        # Keep the last model instead of terminating the whole process.
        warnings.warn(
            "kmeansMahalanobis did not converge after "
            + str(max_iter) + " iterations",
            RuntimeWarning,
        )