Blame view

volia/core/measures.py 7.12 KB
 ```1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90``` ``` ''' This module is a part of my library. It aims to compute some measures for clustering. ''' import numpy as np def disequilibrium_(matrix1, matrix2, isGlobal=False, mod=None): ''' Compute disequilibrium for all the clusters. The disequilibrium is compute from the difference between two clustering sets. isGlobal permet à l'utilisateur de choisir le dénominateur de la fonction : - True : divise la valeur par le nombre d'élément du cluster - False : divise la valeur par le nombre d'élément total withPower permet à l'utilisateur de décider d'appliquer un carré 2 ou une valeur absolue. ''' def divide_line(a, divider): ''' Sub function used for dividing matrix by a vector line by line. ''' return np.divide(a, divider, out=np.zeros_like(a), where=divider!=0) dividers1 = 0 dividers2 = 0 if isGlobal: dividers1 = matrix1.sum() dividers2 = matrix2.sum() else: dividers1 = matrix1.sum(axis=1) dividers2 = matrix2.sum(axis=1) matrix1_divided = np.apply_along_axis(divide_line, 0, np.asarray(matrix1, dtype=np.float), dividers1) matrix2_divided = np.apply_along_axis(divide_line, 0, np.asarray(matrix2, dtype=np.float), dividers2) diff = matrix1_divided - matrix2_divided mask = np.logical_not(np.logical_and(matrix2==0, matrix1==0)) result = diff if mod != None or mod == "": for word in mod.split(" "): if word == "power": result = np.power(result,2) elif word == "human": result = result * 100 elif word == "abs": result = np.absolute(result) else: raise Exception("Need to specify an accepted mod of the disequilibrium (\"power\", \"human\" or \"abs\"") return (mask, result) def disequilibrium_mean_by_cluster(mask, matrix): ''' Mean of disequilibrium matrix is the disequilibrium calculated from number of occurences belonging to a class, for each cluster. ''' nb_k = len(matrix) results = np.zeros((nb_k)) for i in range(nb_k): results[i] = matrix[i].sum() / mask[i].sum() return results def disequilibrium(matrix1, matrix2, isGlobal=False): ''' Disequilibrium matrix And Disequilibrium value ''' mask, result = disequilibrium_(matrix1, matrix2, isGlobal) result_human = result * 100 result_power = np.power(result, 2) return ( mask, result_human, disequilibrium_mean_by_cluster(mask, result_power).sum()/matrix1.shape[0] ) ``` `91` ``` def compute_count_matrix(y_truth, y_hat): ``` ```92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121``` ``` ''' Check the size of the lists with assertion ''' # Check size of the lists assert len(y_hat) == len(y_truth), f"Matrices should have the same length y_hat: {len(y_hat)}, y_truth: {len(y_truth)}" # Build count matrix count_matrix = np.zeros((max(y_hat+1), max(y_truth+1))) for i in range(len(y_hat)): count_matrix[y_hat[i]][y_truth[i]] += 1 return count_matrix def entropy_score(y_truth, y_hat): ''' Need to use label encoder before givin y_hat and y_truth Don't use one hot labels Return a tuple with: - result_matrix : the matrix with the log multiplied probabilities (P(x) * log(P(x))) - result_vector : the vector avec summing entropy of each class. Each value corresponds to a cluster. - result : the final entropy measure of the clustering ''' def divide_line(a, divider): ''' Sub function used for dividing matrix by a vector line by line. ''' return np.divide(a, divider, out=np.zeros_like(a), where=divider!=0) # Build count matrix ``` `122` ``` count_matrix = compute_count_matrix(y_truth, y_hat) ``` ```123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148``` ``` # Build dividers vector dividers = count_matrix.sum(axis=1) matrix_divided = np.apply_along_axis(divide_line, 0, np.asarray(count_matrix, dtype=np.float), dividers) log_matrix = np.zeros(matrix_divided.shape) np.log2(matrix_divided, out=log_matrix, where=count_matrix != 0) result_matrix = -1 * np.multiply(matrix_divided, log_matrix) result_vector = result_matrix.sum(axis=1) result_vector.sum() if np.isnan(np.sum(result_vector)): print("COUNT MATRIX") print(count_matrix) print("MATRIX DIVIDED") print(matrix_divided) print("RESULT MATRIX") print(result_matrix) print("VECTOR MATRIX") print(result_vector) print("An error occured due to nan value, some values are printed before") exit(1) result = result_vector * dividers / dividers.sum() result = result.sum() ``` `149` ``` return result ``` `150` `151` ``` def purity_score(y_truth, y_hat): ``` ```152 153 154 155 156 157 158 159 160``` ``` ''' Return three values in a dictionary: - purity_class_score: the purity score of the class (asp) - purity_cluster_score: the purity score of the cluster (acp) - K: the overall evaluation criterion (sqrt(asp * acp)) This function is based on the following article: Unknown-multiple speaker clustering using HMM, J. Ajmera, H. Bourlard, I. Lapidot, I. McCowan ''' ``` ```161 162 163 164 165 166 167 168``` ``` def divide_line(a, divider): ''' Sub function used for dividing matrix by a vector line by line. ''' return np.divide(a, divider, out=np.zeros_like(a), where=divider!=0) def compute_purity_score(count_matrix, axis=0): ``` ```169 170 171 172``` ``` if axis==0: other_axis = 1 else: other_axis = 0 ``` ```173 174``` ``` count_per_row = count_matrix.sum(axis=axis) dividers = np.square(count_per_row) ``` `175` `176` ``` count_matrix_squared = np.square(count_matrix) ``` `177` ``` matrix_divided = np.apply_along_axis(divide_line, other_axis, np.asarray(count_matrix_squared, dtype=np.float), dividers) ``` ```178 179 180``` ``` vector_purity = np.sum(matrix_divided, axis=axis) scalar_purity = np.average(vector_purity, weights=count_per_row) ``` `181` ``` return scalar_purity ``` ```182 183 184``` ``` count_matrix = compute_count_matrix(y_truth, y_hat) ``` ```185 186 187``` ``` purity_cluster_score = compute_purity_score(count_matrix, 1) purity_class_score = compute_purity_score(count_matrix, 0) ``` ```188 189 190 191``` ``` K = np.sqrt(purity_cluster_score * purity_class_score) for i in range(count_matrix.shape[0]): ``` ```192 193 194 195 196 197 198 199``` ``` for j in range(count_matrix.shape[1]): count_matrix[i][j] count_matrix[i] return { "purity_class_score": purity_class_score, "purity_cluster_score": purity_cluster_score, "K": K } ``` ```200 201``` ``` if __name__ == "__main__": ``` `202` ``` print("Purity test #1") ``` ```203 204 205 206``` ``` # Hypothesis y_hat = np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0]) # Truth y = np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3]) ``` `207` ``` (result_matrix, result_vector, result) = entropy_score(y, y_hat) ``` `208` ``` print(purity_score(y, y_hat)) ``` `209` ```210 211 212 213 214 215``` ``` exit(1) print("Purity test #2") # Hypothesis y_hat = np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0, 4, 4, 4]) # Truth y = np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 0, 3, 3, 3]) ``` `216` `217` ``` (result_matrix, result_vector, result) = entropy_score(y, y_hat) ``` `218` ``` exit(1) ``` ```219 220 221 222 223``` ``` print("Result matrix: ") print(result_matrix) print("Result vector: ") print(result_vector) print("Result: ", result) ```