measures.py 7.18 KB
'''
This module is part of my library.
It computes evaluation measures for clustering.
'''

import numpy as np

def disequilibrium_(matrix1, matrix2, isGlobal=False, mod=None):
    '''
    Compute the cell-wise disequilibrium between two count matrices.

    Both matrices are normalised, then the normalised second matrix is
    subtracted from the normalised first one.

    Parameters:
    - matrix1, matrix2: 2-D arrays (or nested sequences) of counts sharing
      the same shape. Presumably rows are clusters and columns are classes
      -- TODO confirm against the callers.
    - isGlobal: lets the caller choose the denominator of the normalisation:
        - True : each cell is divided by the total of the whole matrix
        - False: each cell is divided by the total of its own row
    - mod: optional whitespace-separated list of transformations applied to
      the difference, in the order given:
        - "power": square every value
        - "human": multiply every value by 100
        - "abs"  : take the absolute value
      None or an empty string leaves the raw difference untouched.

    Return a tuple (mask, result):
    - mask  : boolean matrix, True where at least one of the two inputs has
      a non-zero count (see the review note in the body)
    - result: the (optionally transformed) difference matrix

    Raise ValueError when mod contains an unknown word.
    '''

    def normalise(counts, dividers):
        '''
        Divide counts by dividers, leaving 0 wherever the divider is 0.
        '''
        return np.divide(counts, dividers, out=np.zeros_like(counts), where=dividers != 0)

    # The builtin float replaces the np.float alias, which no longer exists
    # in current NumPy releases.
    counts1 = np.asarray(matrix1, dtype=float)
    counts2 = np.asarray(matrix2, dtype=float)

    if isGlobal:
        dividers1 = counts1.sum()
        dividers2 = counts2.sum()
    else:
        # keepdims keeps a column vector so the division broadcasts row by row.
        dividers1 = counts1.sum(axis=1, keepdims=True)
        dividers2 = counts2.sum(axis=1, keepdims=True)

    diff = normalise(counts1, dividers1) - normalise(counts2, dividers2)

    # NOTE(review): disequilibrium() unpacks a (mask, result) pair from this
    # function, but no mask computation is present in this file. The
    # definition below (cell populated in at least one input) is an
    # assumption -- confirm against the original implementation.
    mask = (counts1 != 0) | (counts2 != 0)

    result = diff

    # An empty or missing mod means "no transformation".
    if mod:
        for word in mod.split():
            if word == "power":
                result = np.power(result, 2)
            elif word == "human":
                result = result * 100
            elif word == "abs":
                result = np.absolute(result)
            else:
                raise ValueError("Need to specify an accepted mod of the disequilibrium (\"power\", \"human\" or \"abs\"")

    # NOTE(review): the file contained, at this point, the remains of a
    # separate "mean of disequilibrium" routine (per-cluster mean of a
    # disequilibrium matrix). Its def header and loop body were missing and
    # it read an undefined name `matrix`, so it could not run. It has been
    # left out here -- restore it from version control.
    return (mask, result)

def disequilibrium(matrix1, matrix2, isGlobal=False):
    '''
    Return the disequilibrium matrix of two count matrices as percentages.

    matrix1, matrix2 and isGlobal are forwarded unchanged to
    disequilibrium_(); the matrix it returns is multiplied by 100.

    Return a one-element tuple holding that percentage matrix.
    '''
    _, raw = disequilibrium_(matrix1, matrix2, isGlobal)
    percentages = raw * 100
    # NOTE(review): the squared matrix is computed but neither used nor
    # returned. It looks like the input of a scalar "disequilibrium value"
    # that is not produced here -- confirm what is expected in the result.
    squared = np.power(raw, 2)

    return (percentages,)

def compute_count_matrix(y_truth, y_hat):
    '''
    Build the count (contingency) matrix of predicted versus true labels.

    Cell [i][j] holds the number of positions where y_hat equals i and
    y_truth equals j, so rows follow y_hat and columns follow y_truth.

    Parameters:
    - y_truth: list or array of non-negative integer labels
    - y_hat  : list or array of non-negative integer labels, same length

    Return a float array of shape (max(y_hat) + 1, max(y_truth) + 1).

    Raise AssertionError when the two inputs have different lengths and
    ValueError when they are empty.
    '''
    hat = np.asarray(y_hat)
    truth = np.asarray(y_truth)

    # Check size of the lists. Raised explicitly (rather than with an assert
    # statement) so the check is still performed when Python runs with -O.
    if len(hat) != len(truth):
        raise AssertionError(f"Matrices should have the same length y_hat: {len(hat)}, y_truth: {len(truth)}")
    if len(hat) == 0:
        raise ValueError("Cannot build a count matrix from empty label lists")

    # Build count matrix. np.add.at accumulates repeated (row, column) pairs,
    # which plain fancy-index assignment would not.
    count_matrix = np.zeros((hat.max() + 1, truth.max() + 1))
    np.add.at(count_matrix, (hat, truth), 1)
    return count_matrix

def entropy_score(y_truth, y_hat):
    '''
    Compute the entropy of a clustering with respect to the true classes.

    Labels must be integer-encoded (use a label encoder first); one-hot
    labels are not supported.

    Return a tuple with:
    - result_matrix : the matrix of -P(x) * log2(P(x)), where P(x) is the
      share of a class inside a cluster (one row per cluster)
    - result_vector : the entropy of each cluster (row sums of result_matrix)
    - result : the final entropy measure of the clustering, i.e. the cluster
      entropies weighted by the relative size of each cluster

    Raise ValueError when a NaN appears in the cluster entropies; the message
    carries the intermediate matrices for debugging.
    '''
    # Build count matrix (rows follow y_hat, columns follow y_truth)
    count_matrix = np.asarray(compute_count_matrix(y_truth, y_hat), dtype=float)

    # Build dividers vector: number of elements in each cluster
    dividers = count_matrix.sum(axis=1)
    column_dividers = dividers[:, np.newaxis]

    # Row-wise probabilities; rows of empty clusters stay at 0.
    matrix_divided = np.divide(
        count_matrix,
        column_dividers,
        out=np.zeros_like(count_matrix),
        where=column_dividers != 0,
    )

    # log2 is only evaluated on non-empty cells, the others keep 0 so that
    # 0 * log2(0) contributes nothing instead of producing NaN.
    log_matrix = np.zeros(matrix_divided.shape)
    np.log2(matrix_divided, out=log_matrix, where=count_matrix != 0)
    result_matrix = -1 * np.multiply(matrix_divided, log_matrix)
    result_vector = result_matrix.sum(axis=1)

    if np.isnan(np.sum(result_vector)):
        # Raise instead of printing and terminating the interpreter, so that
        # the calling program decides how to react.
        raise ValueError(
            "An error occured due to nan value\n"
            f"COUNT MATRIX\n{count_matrix}\n"
            f"MATRIX DIVIDED\n{matrix_divided}\n"
            f"RESULT MATRIX\n{result_matrix}\n"
            f"VECTOR MATRIX\n{result_vector}"
        )

    result = (result_vector * dividers / dividers.sum()).sum()
    return (result_matrix, result_vector, result)

def purity_score(y_truth, y_hat):
    '''
    Compute the class purity, the cluster purity and their combination.

    Return three values in a dictionary:
    - purity_class_score: the purity score of the class (asp)
    - purity_cluster_score: the purity score of the cluster (acp)
    - K: the overall evaluation criterion (sqrt(asp * acp))

    This function is based on the following article:
    Unknown-multiple speaker clustering using HMM, J. Ajmera, H. Bourlard, I. Lapidot, I. McCowan
    '''

    def compute_purity_score(count_matrix, axis=0):
        '''
        Compute the purity along one axis of the count matrix.

        axis=1 sums each row (one value per y_hat label), axis=0 sums each
        column (one value per y_truth label).

        Return (vector_purity, scalar_purity): the purity of every group and
        their average weighted by the group sizes.
        '''
        counts = np.asarray(count_matrix, dtype=float)
        # keepdims keeps the totals broadcastable against the matrix.
        totals = counts.sum(axis=axis, keepdims=True)
        squared_totals = np.square(totals)

        # Squared share of every cell in its group; empty groups stay at 0.
        squared_shares = np.divide(
            np.square(counts),
            squared_totals,
            out=np.zeros_like(counts),
            where=squared_totals != 0,
        )
        vector_purity = squared_shares.sum(axis=axis)

        scalar_purity = np.average(vector_purity, weights=totals.ravel())
        return (vector_purity, scalar_purity)

    count_matrix = compute_count_matrix(y_truth, y_hat)
    _, purity_cluster_score = compute_purity_score(count_matrix, 1)
    _, purity_class_score = compute_purity_score(count_matrix, 0)

    K = np.sqrt(purity_cluster_score * purity_class_score)

    return {
        "purity_class_score": purity_class_score,
        "purity_cluster_score": purity_cluster_score,
        "K": K
    }

if __name__ == "__main__":
    # Manual check: run both measures on two hand-made labelings and print
    # everything. The early exit(1) calls were dropped because they made the
    # second case and all the result printing unreachable.
    demo_cases = (
        (
            "Purity test #1",
            # Hypothesis
            np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0]),
            # Truth
            np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3]),
        ),
        (
            "Purity test #2",
            # Hypothesis
            np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0, 4, 4, 4]),
            # Truth
            np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 0, 3, 3, 3]),
        ),
    )

    for title, y_hat, y in demo_cases:
        print(title)
        (result_matrix, result_vector, result) = entropy_score(y, y_hat)
        print(purity_score(y, y_hat))
        print("Result matrix: ")
        print(result_matrix)
        print("Result vector: ")
        print(result_vector)
        print("Result: ", result)