'''
Clustering measures.

This module is part of the library. It computes measures used to evaluate
a clustering: the disequilibrium between two count matrices and the
entropy of a clustering with respect to ground-truth labels.
'''

import numpy as np


def _safe_divide(numerator, denominator):
    '''
    Element-wise division that yields 0 wherever the denominator is 0.

    Both arguments are converted to float arrays and broadcast against
    each other, so the denominator may be a scalar, a column vector or an
    array of the same shape as the numerator.
    '''
    numerator = np.asarray(numerator, dtype=float)
    denominator = np.asarray(denominator, dtype=float)
    out = np.zeros(np.broadcast(numerator, denominator).shape, dtype=float)
    # `where` leaves the pre-filled zeros untouched for null denominators.
    np.divide(numerator, denominator, out=out, where=denominator != 0)
    return out


def disequilibrium_(matrix1, matrix2, isGlobal=False, mod=None):
    '''
    Compute the disequilibrium for all the clusters.

    The disequilibrium is the element-wise difference between two
    normalised 2-D count matrices (one row per cluster).

    Parameters:
    - matrix1, matrix2 : 2-D array-likes of counts with the same shape.
    - isGlobal : selects the denominator used to normalise each matrix:
        - True  : every value is divided by the total sum of its matrix;
        - False : every value is divided by the sum of its own row.
      A null denominator yields 0 instead of a division error.
    - mod : optional string of space-separated transformations applied,
      in order, to the difference:
        - "power" : square the values;
        - "human" : multiply the values by 100;
        - "abs"   : take the absolute value.
      None or an empty string applies no transformation.

    Returns a tuple (mask, result):
    - mask   : boolean matrix, False only where both input matrices are 0;
    - result : the (possibly transformed) difference matrix.

    Raises ValueError (a subclass of Exception) on an unknown mod word.
    '''
    counts1 = np.asarray(matrix1, dtype=float)
    counts2 = np.asarray(matrix2, dtype=float)

    if isGlobal:
        dividers1 = counts1.sum()
        dividers2 = counts2.sum()
    else:
        # keepdims gives column vectors so each row is divided by its sum.
        dividers1 = counts1.sum(axis=1, keepdims=True)
        dividers2 = counts2.sum(axis=1, keepdims=True)

    result = _safe_divide(counts1, dividers1) - _safe_divide(counts2, dividers2)

    # A cell is relevant as soon as one of the two matrices is non-zero.
    mask = np.logical_or(counts1 != 0, counts2 != 0)

    if mod:
        for word in mod.split():
            if word == "power":
                result = np.power(result, 2)
            elif word == "human":
                result = result * 100
            elif word == "abs":
                result = np.absolute(result)
            else:
                raise ValueError(
                    "Need to specify an accepted mod of the disequilibrium "
                    "(\"power\", \"human\" or \"abs\")"
                )
    return (mask, result)


def disequilibrium_mean_by_cluster(mask, matrix):
    '''
    Mean of the disequilibrium for each cluster.

    Parameters:
    - mask   : boolean 2-D matrix flagging the cells that count in the
      mean (as returned by disequilibrium_).
    - matrix : 2-D disequilibrium matrix computed from the number of
      occurrences belonging to a class, for each cluster.

    Returns a vector with one value per row: the row sum of matrix divided
    by the number of True cells in the matching mask row. A row whose mask
    is entirely False gets 0 rather than NaN, so that an empty cluster
    does not invalidate the values aggregated from this vector.
    '''
    row_totals = np.asarray(matrix, dtype=float).sum(axis=1)
    row_counts = np.asarray(mask).sum(axis=1)
    return _safe_divide(row_totals, row_counts)


def disequilibrium(matrix1, matrix2, isGlobal=False):
    '''
    Disequilibrium matrix and disequilibrium value.

    Returns a tuple (mask, result_human, value):
    - mask         : boolean matrix, False only where both inputs are 0;
    - result_human : the disequilibrium matrix multiplied by 100;
    - value        : sum of the per-cluster means of the squared
      disequilibrium, divided by the number of rows of matrix1.
    '''
    mask, result = disequilibrium_(matrix1, matrix2, isGlobal)
    result_human = result * 100
    result_power = np.power(result, 2)
    nb_clusters = np.asarray(matrix1).shape[0]

    return (
        mask,
        result_human,
        disequilibrium_mean_by_cluster(mask, result_power).sum() / nb_clusters
    )


def compute_count_matrix(y_hat, y_truth):
    '''
    Build the count matrix of a clustering.

    Parameters:
    - y_hat   : 1-D sequence of integer cluster labels (hypothesis).
    - y_truth : 1-D sequence of integer class labels (truth).
    Labels are used directly as indices, so they are expected to be
    non-negative integers (e.g. produced by a label encoder).

    Returns a float matrix of shape (max(y_hat) + 1, max(y_truth) + 1)
    where cell [k][c] is the number of items with cluster k and class c.

    Raises AssertionError when the sequences differ in length and
    ValueError when they are empty.
    '''
    # Check size of the lists
    assert len(y_hat) == len(y_truth), f"Matrices should have the same length y_hat: {len(y_hat)}, y_truth: {len(y_truth)}"

    # Accept plain lists as well as numpy arrays.
    clusters = np.asarray(y_hat)
    classes = np.asarray(y_truth)
    if clusters.size == 0:
        raise ValueError("Cannot build a count matrix from empty label lists")

    # Build count matrix
    count_matrix = np.zeros((clusters.max() + 1, classes.max() + 1))
    # np.add.at accumulates repeated (cluster, class) pairs correctly,
    # unlike a plain fancy-indexed `+= 1`.
    np.add.at(count_matrix, (clusters, classes), 1)
    return count_matrix


def entropy_score(y_truth, y_hat):
    '''
    Entropy of a clustering with respect to the truth labels.

    Need to use a label encoder before giving y_hat and y_truth.
    Don't use one hot labels.

    Return a tuple with:
    - result_matrix : the matrix with the log multiplied probabilities
      (-P(x) * log2(P(x))), one row per cluster, one column per class
    - result_vector : the vector summing the entropy of each class.
      Each value corresponds to a cluster.
    - result : the final entropy measure of the clustering, i.e. the
      per-cluster entropies weighted by the relative cluster sizes

    Raises ValueError if a NaN appears in the per-cluster entropies.
    '''
    # Build count matrix
    count_matrix = compute_count_matrix(y_hat, y_truth)

    # Build dividers vector (number of items in each cluster)
    dividers = count_matrix.sum(axis=1)

    # Probability of each class inside each cluster (0 for empty clusters)
    matrix_divided = _safe_divide(count_matrix, dividers[:, np.newaxis])

    # log2 is only evaluated on non-null counts; other cells stay at 0.
    log_matrix = np.zeros(matrix_divided.shape)
    np.log2(matrix_divided, out=log_matrix, where=count_matrix != 0)
    result_matrix = -1 * np.multiply(matrix_divided, log_matrix)
    result_vector = result_matrix.sum(axis=1)

    if np.isnan(np.sum(result_vector)):
        # Report the intermediate values through the exception instead of
        # printing them and terminating the interpreter.
        raise ValueError(
            "An error occured due to nan value\n"
            f"COUNT MATRIX\n{count_matrix}\n"
            f"MATRIX DIVIDED\n{matrix_divided}\n"
            f"RESULT MATRIX\n{result_matrix}\n"
            f"VECTOR MATRIX\n{result_vector}"
        )

    result = result_vector * dividers / dividers.sum()
    result = result.sum()
    return (result_matrix, result_vector, result)


if __name__ == "__main__":
    # Hypothesis
    y_hat = np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0])
    # Truth
    y = np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3])

    (result_matrix, result_vector, result) = entropy_score(y, y_hat)

    print("Result matrix: ")
    print(result_matrix)
    print("Result vector: ")
    print(result_vector)
    print("Result: ", result)