# 6.63 KB  (stray file-size text, presumably residue from where this file was copied; not code)
import argparse
from os import path, mkdir
from utils import SubCommandRunner
from import read_features, read_lst, read_labels
import numpy as np
from sklearn.cluster import KMeans
import pickle
from clustering_modules.kmeans import kmeans

from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import v_measure_score, homogeneity_score, completeness_score

import core.measures
import json

    "k-means": kmeans()

    "entropy": core.measures.entropy_score,
    "purity": core.measures.purity_score,
    "v-measure": v_measure_score,
    "homogeneity": homogeneity_score,
    "completeness": completeness_score,

def disequilibrium_run():
    """
    Placeholder for the "disequilibrium" sub-command; currently does nothing.

    NOTE(review): the original body is not visible. The function takes no
    parameters while the "disequilibrium" parser declares several options
    (--features, --lstrain, --lstest, --model); confirm the intended
    signature before implementing.
    """

def measure_run(measure: str, features: str, lst: str, truelabels: str, model: str, modeltype: str):
    """
    Score a clustering against ground-truth labels with one or more measures.

    @param measure: measure name(s), each a key of EVALUATION_METHODS; a single
                    name given as a plain string is accepted too
    @param features: features file, read with read_features
    @param lst: list file, read with read_lst; its keys select the samples
    @param truelabels: labels file, read with read_labels
    @param model: currently unused (see NOTE below)
    @param modeltype: key of CLUSTERING_METHODS selecting the clustering module
    @return: dict mapping each requested measure name to its score
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(measure, str):
        measure = [measure]

    module = CLUSTERING_METHODS[modeltype]
    # NOTE(review): `model` is never used here; presumably the stored model
    # should be loaded into `module` before predicting -- confirm against the
    # clustering module's API.

    # Inputs and predictions do not depend on the measure: compute them once.
    feats_dict = read_features(features)
    labels_dict = read_labels(truelabels)
    lst_keys = list(read_lst(lst))
    feats = np.asarray([feats_dict[key] for key in lst_keys])
    Y_pred = module.predict(feats)

    # The encoder must be fitted before it can transform; calling transform()
    # on a fresh LabelEncoder raises NotFittedError.
    Y_truth = LabelEncoder().fit_transform(
        [labels_dict[key][0] for key in lst_keys])

    scores = {}
    for ms in measure:
        scores[ms] = EVALUATION_METHODS[ms](Y_truth, Y_pred)

    # Previously the scores were computed and then discarded.
    return scores


def kmeans_run(features: str, lst: str, k:int, kmax: int, klist, output: str):
    """
    Fit k-means on the listed samples and pickle the fitted model(s).

    @param features: output features
    @param lst: list file
    @param k: k (kmin if kmax specified)
    @param kmax: maximum k to compute
    @param klist: list of k values to compute, ignore k value
    @param output: output file if neither kmax nor klist is specified, else
                   output directory (created if missing); models are written
                   there as "clustering_<k>.pkl"
    @raise Exception: if output is an existing directory in single-k mode, or
                      an existing file in multi-k mode
    """
    # -- READ FILES --
    features_dict = read_features(features)
    lst_dict = read_lst(lst)
    X = np.asarray([features_dict[x] for x in lst_dict])

    multi_values = kmax is not None or klist is not None

    # Exception cases
    if not multi_values and path.isdir(output):
        raise Exception("The \"output\" is an existing directory while the system is waiting the path of a file.")

    if multi_values and path.isfile(output):
        raise Exception("The \"output\" is an existing file while the system is waiting the path of a directory.")

    # Mono value case
    if not multi_values:
        _fit_and_dump_kmeans(X, k, output)
        return

    # Multi values cases write one file per k into the output directory.
    # The body of this check was missing; creating the directory is the
    # evident intent.
    if not path.isdir(output):
        mkdir(output)

    # Multi values case with kmax
    if kmax is not None:
        for n_clusters in range(k, kmax + 1):
            _fit_and_dump_kmeans(
                X, n_clusters,
                path.join(output, "clustering_" + str(n_clusters) + ".pkl"))

    # Second multi values case with klist (values may arrive as strings)
    if klist is not None:
        for value in klist:
            n_clusters = int(value)
            _fit_and_dump_kmeans(
                X, n_clusters,
                path.join(output, "clustering_" + str(n_clusters) + ".pkl"))


def _fit_and_dump_kmeans(X, n_clusters, destination):
    """Fit a KMeans model with n_clusters on X and pickle it to destination."""
    print(f"Computing clustering with k={n_clusters}")
    model = KMeans(n_clusters=n_clusters, n_init=10, random_state=0).fit(X)
    # Context manager so the file handle is closed (and flushed) deterministically.
    with open(destination, "wb") as handle:
        pickle.dump(model, handle)

if __name__ == "__main__":
    # Command-line entry point: one sub-command per action, each dispatched
    # to the matching *_run function through SubCommandRunner.
    #
    # NOTE(review): several lines of this block were missing (the openings of
    # the --measure / --modeltype arguments and the runner call). They are
    # reconstructed from the surviving fragments and from the signatures of
    # the *_run functions; every reconstructed piece is flagged below.

    # Main parser
    parser = argparse.ArgumentParser(description="Clustering methods to apply")
    subparsers = parser.add_subparsers(title="action")

    # kmeans
    parser_kmeans = subparsers.add_parser(
        "kmeans", help="Compute clustering using k-means algorithm")

    parser_kmeans.add_argument("--features", required=True, type=str, help="Features file (works with list)")
    parser_kmeans.add_argument("--lst", required=True, type=str, help="List file (.lst)")
    parser_kmeans.add_argument("-k", default=2, type=int,
                               help="number of clusters to compute. It is kmin if kmax is specified.")
    parser_kmeans.add_argument("--kmax", default=None, type=int, help="if specified, k is kmin.")
    parser_kmeans.add_argument("--klist", nargs="+",
                               help="List of k values to test. As kmax, activate the multi values mod.")
    parser_kmeans.add_argument("--output", default=".kmeans", help="output file if only k. Output directory if multiple kmax specified.")
    # NOTE(review): reconstructed -- the runner call removes a "which" entry
    # from the arguments, so each sub-parser presumably records its name there.
    parser_kmeans.set_defaults(which="kmeans")

    # measure
    parser_measure = subparsers.add_parser(
        "measure", help="compute the entropy")

    # NOTE(review): the opening of this call was missing. measure_run iterates
    # over `measure`, hence nargs="+"; required=True and the help text are
    # assumptions.
    parser_measure.add_argument("--measure", required=True, nargs="+",
                                choices=[key for key in EVALUATION_METHODS],
                                help="...")
    parser_measure.add_argument("--features", required=True, type=str, help="...")
    parser_measure.add_argument("--lst", required=True, type=str, help="...")
    parser_measure.add_argument("--truelabels", required=True, type=str, help="...")
    parser_measure.add_argument("--model", required=True, type=str, help="...")
    # NOTE(review): the opening of this call was missing. required=True is an
    # assumption, chosen because measure_run indexes CLUSTERING_METHODS with
    # this value unconditionally.
    parser_measure.add_argument("--modeltype", required=True,
                                choices=[key for key in CLUSTERING_METHODS],
                                help="type of model for learning")
    parser_measure.set_defaults(which="measure")

    # disequilibrium
    parser_disequilibrium = subparsers.add_parser(
        "disequilibrium", help="...")

    parser_disequilibrium.add_argument("--features", required=True, type=str, help="...")
    parser_disequilibrium.add_argument("--lstrain", required=True, type=str, help="...")
    parser_disequilibrium.add_argument("--lstest", required=True, type=str, help="...")
    parser_disequilibrium.add_argument("--model", required=True, type=str, help="...")
    # NOTE(review): the opening and closing of this call were missing; the
    # option name, required=True and the help text are assumptions.
    parser_disequilibrium.add_argument("--modeltype", required=True,
                                choices=["kmeans", "2", "3"],
                                help="...")
    parser_disequilibrium.set_defaults(which="disequilibrium")

    # Parse
    args = parser.parse_args()

    # Run commands
    runner = SubCommandRunner({
        "kmeans": kmeans_run,
        "measure": measure_run,
        "disequilibrium": disequilibrium_run
    })
    # NOTE(review): the original statement was truncated to
    # `}), args.__dict__, remove="which")`. A run(choice, args, remove=...)
    # method is assumed -- confirm against utils.SubCommandRunner.
    runner.run(args.which, args.__dict__, remove="which")