# measures.py 5.07 KB
'''
This module is part of my library.
It provides evaluation measures for clustering results
(disequilibrium between two clusterings, and entropy).
'''

import numpy as np

def disequilibrium_(matrix1, matrix2, isGlobal=False, mod=None):
    '''
    Compute the element-wise disequilibrium between two count matrices.

    Both matrices are normalised, then the normalised matrices are
    subtracted (matrix1 - matrix2).

    Parameters:
        - matrix1, matrix2 : 2-D array-likes of the same shape. Rows are
          treated as clusters by the other functions of this module.
        - isGlobal : selects the denominator of the normalisation:
            - True : every value is divided by the sum of the whole matrix
            - False : every value is divided by the sum of its own row
          A zero denominator produces 0 instead of a division error.
        - mod : None, or a whitespace-separated list of post-processing
          steps applied in the given order:
            - "power" : square the values
            - "human" : multiply the values by 100
            - "abs" : take the absolute value
          None or an empty string leaves the difference untouched.

    Return a tuple with:
        - mask : boolean array, False where both inputs are 0
        - result : the (post-processed) difference matrix

    Raise ValueError when mod contains an unknown word.
    '''

    def normalize(counts, divider):
        '''
        Divide counts by divider, writing 0 wherever the divider is 0.
        '''
        return np.divide(counts, divider, out=np.zeros_like(counts), where=divider != 0)

    matrix1 = np.asarray(matrix1)
    matrix2 = np.asarray(matrix2)

    # Work on float copies so integer counts are not truncated by the division.
    counts1 = matrix1.astype(float)
    counts2 = matrix2.astype(float)

    if isGlobal:
        dividers1 = counts1.sum()
        dividers2 = counts2.sum()
    else:
        # keepdims gives column vectors that broadcast row by row.
        dividers1 = counts1.sum(axis=1, keepdims=True)
        dividers2 = counts2.sum(axis=1, keepdims=True)

    result = normalize(counts1, dividers1) - normalize(counts2, dividers2)

    # An entry is meaningful as soon as one of the two inputs is non-zero.
    mask = np.logical_or(matrix1 != 0, matrix2 != 0)

    # None and "" both mean "no post-processing".
    if mod:
        for word in mod.split():
            if word == "power":
                result = np.power(result, 2)
            elif word == "human":
                result = result * 100
            elif word == "abs":
                result = np.absolute(result)
            else:
                raise ValueError("Need to specify an accepted mod of the disequilibrium (\"power\", \"human\" or \"abs\")")
    return (mask, result)



def disequilibrium_mean_by_cluster(mask, matrix):
    '''
    Mean of the disequilibrium for each row (cluster) of matrix.

    Parameters:
        - mask : array with one row per row of matrix; the sum of each
          row is used as the denominator of the mean (for a boolean mask
          this is the number of True entries).
        - matrix : the disequilibrium values, one row per cluster.

    Return a 1-D float array with one value per row: the sum of the row
    of matrix divided by the sum of the matching row of mask. A row whose
    mask sums to 0 gets 0 instead of a NaN produced by a division by zero.
    '''
    matrix = np.asarray(matrix)
    mask = np.asarray(mask)

    results = np.zeros(len(matrix))
    for i, row in enumerate(matrix):
        valid_count = mask[i].sum()
        # Skip rows without any valid entry: 0/0 would give NaN and
        # contaminate every aggregate computed from this vector.
        if valid_count != 0:
            results[i] = row.sum() / valid_count
    return results


def disequilibrium(matrix1, matrix2, isGlobal=False):
    '''
    Disequilibrium matrix and disequilibrium value.

    Return a tuple with:
        - the mask returned by disequilibrium_
        - the raw difference matrix multiplied by 100
        - the sum of the per-cluster means of the squared difference,
          divided by the number of rows of matrix1
    '''
    mask, raw_diff = disequilibrium_(matrix1, matrix2, isGlobal)

    squared_diff = np.power(raw_diff, 2)
    percent_diff = raw_diff * 100

    per_cluster = disequilibrium_mean_by_cluster(mask, squared_diff)
    score = per_cluster.sum() / matrix1.shape[0]

    return (mask, percent_diff, score)


def compute_count_matrix(y_hat, y_truth):
    '''
    Build the count matrix of two label vectors.

    Parameters:
        - y_hat : sequence of non-negative integer labels, used as row index
        - y_truth : sequence of non-negative integer labels of the same
          length, used as column index

    Return a float matrix of shape (max(y_hat) + 1, max(y_truth) + 1)
    where the cell [i][j] holds the number of positions k such that
    y_hat[k] == i and y_truth[k] == j. Two empty sequences give an
    empty (0, 0) matrix.

    Raise AssertionError when the two sequences differ in length.
    '''
    # Explicit raise instead of `assert` so the check survives `python -O`
    # while callers still see the same exception type.
    if len(y_hat) != len(y_truth):
        raise AssertionError(f"Matrices should have the same length y_hat: {len(y_hat)}, y_truth: {len(y_truth)}")

    y_hat = np.asarray(y_hat)
    y_truth = np.asarray(y_truth)

    # max() of an empty array is undefined: there is nothing to count.
    if y_hat.size == 0:
        return np.zeros((0, 0))

    count_matrix = np.zeros((y_hat.max() + 1, y_truth.max() + 1))
    # Unbuffered add: repeated (row, column) pairs are each counted.
    np.add.at(count_matrix, (y_hat, y_truth), 1)
    return count_matrix


def entropy_score(y_truth, y_hat):
    '''
    Entropy of a clustering with respect to the true classes.

    Need to use a label encoder before giving y_hat and y_truth.
    Don't use one hot labels.

    Parameters:
        - y_truth : integer class labels
        - y_hat : integer cluster labels, same length as y_truth

    Return a tuple with:
        - result_matrix : the matrix of -P(x) * log2(P(x)), one row per
          cluster and one column per class, where P(x) is the count of
          the cell divided by the sum of its row
        - result_vector : the sum of each row of result_matrix. Each value
          corresponds to a cluster.
        - result : the final entropy measure of the clustering, i.e. the
          mean of result_vector weighted by the size of each cluster

    Raise ValueError when a NaN appears in the per-cluster entropies.
    '''
    # Build count matrix (rows: clusters, columns: classes)
    count_matrix = np.asarray(compute_count_matrix(y_hat, y_truth), dtype=float)

    # Size of each cluster, also kept as a column so it broadcasts row by row
    cluster_sizes = count_matrix.sum(axis=1)
    sizes_column = cluster_sizes[:, np.newaxis]

    # Empty clusters keep a probability of 0 instead of dividing by zero
    probabilities = np.divide(
        count_matrix,
        sizes_column,
        out=np.zeros_like(count_matrix),
        where=sizes_column != 0,
    )

    # log2 is only evaluated on non-empty cells; the others stay at 0 so
    # that 0 * log2(0) contributes 0
    log_matrix = np.zeros(probabilities.shape)
    np.log2(probabilities, out=log_matrix, where=count_matrix != 0)

    result_matrix = -1 * np.multiply(probabilities, log_matrix)
    result_vector = result_matrix.sum(axis=1)

    if np.isnan(np.sum(result_vector)):
        # Report the intermediate values through the exception rather than
        # printing them and terminating the whole interpreter.
        raise ValueError(
            "An error occured due to nan value\n"
            f"COUNT MATRIX\n{count_matrix}\n"
            f"MATRIX DIVIDED\n{probabilities}\n"
            f"RESULT MATRIX\n{result_matrix}\n"
            f"VECTOR MATRIX\n{result_vector}"
        )

    weighted = result_vector * cluster_sizes / cluster_sizes.sum()
    result = weighted.sum()
    return (result_matrix, result_vector, result)



if __name__ == "__main__":
    # Small demonstration: 12 samples, 4 clusters, 4 classes.
    # Hypothesis
    y_hat = np.asarray([0, 1, 2, 0, 1, 0, 3, 2, 2, 3, 3, 0])
    # Truth
    y = np.asarray([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3])

    # The function of this module is entropy_score (truth first, then
    # hypothesis); no function named `entropy` exists here.
    (result_matrix, result_vector, result) = entropy_score(y, y_hat)

    print("Result matrix: ")
    print(result_matrix)
    print("Result vector: ")
    print(result_vector)
    print("Result: ", result)