Blame view
volia/core/measures.py
7.18 KB
1f8612ebf repaired memory e... |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
'''
This module is a part of my library.
It aims to compute some measures for clustering.
'''
import numpy as np


def disequilibrium_(matrix1, matrix2, isGlobal=False, mod=None):
    '''
    Compute the disequilibrium matrix between two count matrices.

    Both matrices are first normalised, then subtracted cell by cell
    (matrix1 - matrix2).

    Parameters:
    - matrix1, matrix2: 2-D count matrices (numpy arrays or nested lists),
      one row per cluster.
    - isGlobal: selects the denominator used for the normalisation:
        - True : every cell is divided by the total sum of its matrix
        - False: every cell is divided by the sum of its own row
      A zero denominator yields 0 instead of a division error.
    - mod: optional whitespace separated list of transformations applied in
      order to the difference matrix:
        - "power": square every value
        - "abs"  : take the absolute value
        - "human": multiply by 100
      None or an empty string leaves the difference untouched.

    Return a tuple (mask, result):
    - mask  : boolean matrix, False where both input cells are 0
    - result: the (optionally transformed) difference matrix

    Raise ValueError when mod contains an unknown word.
    '''
    def normalise(counts, divider):
        ''' Divide counts by divider, leaving 0 where the divider is 0. '''
        counts = np.asarray(counts, dtype=float)
        return np.divide(counts, divider, out=np.zeros_like(counts), where=divider != 0)

    matrix1 = np.asarray(matrix1)
    matrix2 = np.asarray(matrix2)

    if isGlobal:
        dividers1 = matrix1.sum()
        dividers2 = matrix2.sum()
    else:
        # keepdims gives column vectors, so each row is divided by its own sum
        dividers1 = matrix1.sum(axis=1, keepdims=True)
        dividers2 = matrix2.sum(axis=1, keepdims=True)

    result = normalise(matrix1, dividers1) - normalise(matrix2, dividers2)
    mask = np.logical_not(np.logical_and(matrix2 == 0, matrix1 == 0))

    # A falsy mod (None or "") means that no transformation was requested.
    if mod:
        for word in mod.split():
            if word == "power":
                result = np.power(result, 2)
            elif word == "human":
                result = result * 100
            elif word == "abs":
                result = np.absolute(result)
            else:
                raise ValueError("Need to specify an accepted mod of the disequilibrium (\"power\", \"human\" or \"abs\"")

    return (mask, result)


def disequilibrium_mean_by_cluster(mask, matrix):
    '''
    Mean of the disequilibrium matrix for each cluster (row).

    For every row, the sum of the values of matrix is divided by the number
    of True cells on the same row of mask.
    A row without any True cell gets 0 instead of a NaN.

    Return a vector holding one value per row.
    '''
    row_sums = np.asarray(matrix, dtype=float).sum(axis=1)
    row_counts = np.asarray(mask).sum(axis=1)
    return np.divide(row_sums, row_counts, out=np.zeros_like(row_sums), where=row_counts != 0)


def disequilibrium(matrix1, matrix2, isGlobal=False):
    '''
    Disequilibrium matrix and disequilibrium value.

    Return a tuple with:
    - mask : boolean matrix, False where both input cells are 0
    - result_human : the difference matrix multiplied by 100
    - the disequilibrium value : the sum of the per cluster means of the
      squared difference matrix, divided by the number of rows of matrix1
    '''
    matrix1 = np.asarray(matrix1)
    mask, result = disequilibrium_(matrix1, matrix2, isGlobal)

    result_human = result * 100
    result_power = np.power(result, 2)

    return (
        mask,
        result_human,
        disequilibrium_mean_by_cluster(mask, result_power).sum() / matrix1.shape[0]
    )
aeff19f95 purity measure ad... |
91 |
def compute_count_matrix(y_truth, y_hat):
    '''
    Build the count matrix of a clustering.

    Parameters:
    - y_truth: 1-D sequence of integer encoded true classes
    - y_hat  : 1-D sequence of integer encoded predicted clusters

    Return a float matrix where the cell [k][c] is the number of elements
    assigned to the cluster k (y_hat) which belong to the class c (y_truth).

    Raise AssertionError when the two sequences have different lengths and
    ValueError when they are empty.
    '''
    y_truth = np.asarray(y_truth)
    y_hat = np.asarray(y_hat)

    # Check size of the lists.
    # Raised explicitly so that the check is not stripped by "python -O".
    if len(y_hat) != len(y_truth):
        raise AssertionError(
            f"Matrices should have the same length y_hat: {len(y_hat)}, "
            f"y_truth: {len(y_truth)}"
        )
    if len(y_hat) == 0:
        raise ValueError("Cannot build a count matrix from empty label lists")

    # Build count matrix
    count_matrix = np.zeros((y_hat.max() + 1, y_truth.max() + 1))
    # np.add.at is unbuffered: a (cluster, class) pair which appears several
    # times is counted several times.
    np.add.at(count_matrix, (y_hat, y_truth), 1)
    return count_matrix


def entropy_score(y_truth, y_hat):
    '''
    Need to use label encoder before giving y_hat and y_truth.
    Don't use one hot labels.

    Return a tuple with:
    - result_matrix : the matrix with the log multiplied probabilities
      -(P(x) * log2(P(x)))
    - result_vector : the vector summing the entropy of each class.
      Each value corresponds to a cluster.
    - result : the final entropy measure of the clustering, which is the
      mean of result_vector weighted by the size of each cluster

    Raise ValueError when a NaN value appears in the computation.
    '''
    # Build count matrix
    count_matrix = compute_count_matrix(y_truth, y_hat)

    # Build dividers vector
    dividers = count_matrix.sum(axis=1)

    # Probability of each class inside a cluster, 0 for an empty cluster
    row_totals = dividers[:, np.newaxis]
    matrix_divided = np.divide(
        count_matrix,
        row_totals,
        out=np.zeros_like(count_matrix),
        where=row_totals != 0
    )

    # The log is only computed on non empty cells to avoid log2(0)
    log_matrix = np.zeros(matrix_divided.shape)
    np.log2(matrix_divided, out=log_matrix, where=count_matrix != 0)

    result_matrix = -1 * np.multiply(matrix_divided, log_matrix)
    result_vector = result_matrix.sum(axis=1)

    if np.isnan(np.sum(result_vector)):
        # Raise instead of exiting: a library must not kill its caller.
        raise ValueError(
            "An error occured due to nan value\n"
            f"COUNT MATRIX\n{count_matrix}\n"
            f"MATRIX DIVIDED\n{matrix_divided}\n"
            f"RESULT MATRIX\n{result_matrix}\n"
            f"VECTOR MATRIX\n{result_vector}"
        )

    result = result_vector * dividers / dividers.sum()
    result = result.sum()
    return (result_matrix, result_vector, result)
aeff19f95 purity measure ad... |
150 |
def purity_score(y_truth, y_hat):
    '''
    Return three values in a dictionary:
    - purity_class_score: the purity score of the class (asp)
    - purity_cluster_score: the purity score of the cluster (acp)
    - K: the overall evaluation criterion (sqrt(asp * acp))

    y_truth and y_hat are sequences of integer encoded labels with the
    same length.

    This function is based on the following article:
    Unknown-multiple speaker clustering using HMM, J. Ajmera, H. Bourlard,
    I. Lapidot, I. McCowan
    '''
    def compute_purity_score(count_matrix, axis=0):
        '''
        Compute the purity along one axis of the count matrix.

        With axis=0 the purity is computed for each column, otherwise for
        each row.

        Return a tuple (vector_purity, scalar_purity):
        - vector_purity: for each column (or row), the sum of the squared
          counts divided by the squared total of this column (or row)
        - scalar_purity: the mean of vector_purity weighted by the totals
        '''
        totals = count_matrix.sum(axis=axis)
        # Put the totals back on the summed axis so that they broadcast
        # against the count matrix.
        dividers = np.square(np.expand_dims(totals, axis))
        count_matrix_squared = np.square(np.asarray(count_matrix, dtype=float))

        # A column (or row) without any element gets a purity of 0
        matrix_divided = np.divide(
            count_matrix_squared,
            dividers,
            out=np.zeros_like(count_matrix_squared),
            where=dividers != 0
        )

        vector_purity = np.sum(matrix_divided, axis=axis)
        scalar_purity = np.average(vector_purity, weights=totals)
        return (vector_purity, scalar_purity)

    # Arrays are built here so that lists of labels are accepted too
    count_matrix = compute_count_matrix(np.asarray(y_truth), np.asarray(y_hat))

    # Rows of the count matrix are clusters (y_hat), columns are classes
    _, purity_cluster_score = compute_purity_score(count_matrix, 1)
    _, purity_class_score = compute_purity_score(count_matrix, 0)

    K = np.sqrt(purity_cluster_score * purity_class_score)

    return {
        "purity_class_score": purity_class_score,
        "purity_cluster_score": purity_cluster_score,
        "K": K
    }
1f8612ebf repaired memory e... |
198 199 |
if __name__ == "__main__":
    # Manual smoke test of the entropy and purity measures.
    # Both tests run to completion: the early exit(1) calls stopped the
    # script before the second test and reported a failure status.
    print("Purity test #1")
    # Hypothesis
    y_hat = np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0])
    # Truth
    y = np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3])

    (result_matrix, result_vector, result) = entropy_score(y, y_hat)
    print(purity_score(y, y_hat))

    print("Purity test #2")
    # Hypothesis
    y_hat = np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0, 4, 4, 4])
    # Truth
    y = np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 0, 3, 3, 3])

    (result_matrix, result_vector, result) = entropy_score(y, y_hat)

    # Entropy results of the second test
    print("Result matrix: ")
    print(result_matrix)
    print("Result vector: ")
    print(result_vector)
    print("Result: ", result)