# kmeans_multidistance.py (2.92 KB)
import pickle
from abstract_clustering import AbstractClustering
from KMeans_Multidistance.KMeans_Class import KMeans
from random import seed
from random import randint
import numpy as np
from sklearn.metrics import pairwise_distances

class kmeansMultidistance():
    """
    Multi-restart wrapper around the project ``KMeans`` implementation.

    ``fit`` trains ``ninit`` models, each with its own random seed, and keeps
    the one whose summed sample-to-assigned-centroid distance is smallest.
    The retained model and its centroids can be saved to / loaded from a
    pickle file and used to assign new samples to clusters.
    """

    def __init__(self, distance="cosine"):
        """
        @param distance: name of the distance metric; it is forwarded both to
            ``KMeans`` and to ``sklearn.metrics.pairwise_distances``.
        """
        self.kmeans_model = None  # Best model found by fit() (or restored by load())
        self.centroids = None     # Centroids of the best model
        self.distance = distance
        self.seed = None          # Seed that produced the best model
        self.seeds = None         # Every seed tried during the last fit()

    def predict(self, features):
        """
        Assign each sample to a cluster using the best model's centroids.

        @param features: samples to assign; passed unchanged to
            ``KMeans.assign_clusters`` as ``data``.
        @return: whatever ``KMeans.assign_clusters`` returns.
        @raise AttributeError: if no model has been fitted or loaded yet.
        """
        if self.kmeans_model is None:
            # Same exception type an unfitted instance always produced
            # (attribute access on None), but with an actionable message.
            raise AttributeError(
                "kmeansMultidistance has no model yet: call fit() or load() before predict()"
            )
        return self.kmeans_model.assign_clusters(data=features, centroids=self.centroids, distance=self.kmeans_model.distance)

    def load(self, model_path: str):
        """
        Restore a model previously written by ``save``.

        Sets ``kmeans_model`` and ``centroids`` from the file and takes
        ``distance`` from the restored model. ``seed`` and ``seeds`` are not
        stored in the file and are left untouched.

        @param model_path: path of the pickle file to read.
        @return: None
        """
        # SECURITY: pickle.load can execute arbitrary code embedded in the
        # file. Only load model files that come from a trusted source.
        with open(model_path, "rb") as f:
            data = pickle.load(f)
            self.kmeans_model = data["kmeans_model"]
            self.centroids = data["centroids"]
            self.distance = self.kmeans_model.distance

    def save(self, model_path: str):
        """
        Write the current best model and its centroids to a pickle file.

        @param model_path: path of the pickle file to write.
        @return: None
        """
        with open(model_path, "wb") as f:
            pickle.dump({
                "kmeans_model": self.kmeans_model,
                "centroids": self.centroids
            }, f)

    def fit(self, features, k: int, tol: float, ninit: int, maxiter: int=300, debug: bool=False):
        """
        Train ``ninit`` k-means models and keep the one with the lowest loss.

        The loss of a run is the sum, over all samples, of the distance between
        the sample and the centroid it was assigned to, measured with
        ``self.distance``. On success ``kmeans_model``, ``centroids``, ``seed``
        and ``seeds`` are updated.

        Note: the global ``random`` generator is re-seeded from system entropy
        before the per-run seeds are drawn.

        @param features: training samples. For the "mahalanobis" distance it
            must expose ``.T`` (e.g. a NumPy array); presumably rows are
            samples and columns are features -- verify against callers.
        @param k: number of clusters, forwarded to ``KMeans``.
        @param tol: accepted for interface compatibility; it is currently NOT
            forwarded to ``KMeans`` and has no effect.
        @param ninit: number of independent runs (must be >= 1).
        @param maxiter: maximum number of iterations, forwarded to ``KMeans``.
        @param debug: forwarded to ``KMeans`` as ``verbose``.
        @return: None
        @raise ValueError: if ``ninit`` is smaller than 1.
        """
        # Fail fast, before the previously fitted model is discarded.
        if ninit < 1:
            raise ValueError("ninit must be at least 1, got %r" % (ninit,))

        # Initialization
        self.kmeans_model = None
        self.centroids = None
        self.seed = None

        # Draw one seed per run up front so they are all recorded
        seed()
        self.seeds = [randint(1, 100000) for _ in range(ninit)]

        # Extra metric arguments depend only on the data, not on the run, so
        # the inverse covariance is computed once instead of once per restart.
        # (Assumes KMeans.fit does not modify ``features`` in place.)
        metric_kwds = {}
        if self.distance == "mahalanobis":
            metric_kwds["VI"] = np.linalg.pinv(np.cov(features.T)).T

        # Learning k-means models
        results = []
        for run_seed in self.seeds:
            model = KMeans(k=k,
                           maxiter=maxiter,
                           distance=self.distance,
                           record_heterogeneity=[],
                           verbose=debug,
                           seed=run_seed)
            centroids, closest_cluster = model.fit(features)

            # Distance from every sample to every centroid
            distances = pairwise_distances(features, centroids, metric=self.distance, **metric_kwds)

            # Loss: keep only the distance to the assigned centroid, then sum
            loss = np.sum(distances[np.arange(len(distances)), closest_cluster])

            results.append({
                "model": model,
                "centroids": centroids,
                "seed": run_seed,
                "loss": loss
            })

        # min() returns the first run among ties, like the original index lookup
        best = min(results, key=lambda result: result["loss"])
        self.kmeans_model = best["model"]
        self.centroids = best["centroids"]
        self.seed = best["seed"]