# -*- coding: utf-8 -*-
from __future__ import print_function

from collections import namedtuple

import keras
import numpy
from keras.layers.core import Dense, Dropout, Activation
from keras.models import Sequential
from keras.optimizers import SGD
from sklearn.metrics import precision_recall_fscore_support as perf

# Predictions kept from the best epoch (selected on dev precision).
save_tuple = namedtuple("save_tuple", ["pred_train", "pred_dev", "pred_test"])

def train_mlp(x_train, y_train, x_dev, y_dev, x_test, y_test, hidden_size,
              input_activation="relu", hidden_activation="relu",
              output_activation="softmax", loss="mse", init="lecun_uniform",
              dropouts=None, sgd=None, epochs=1200, batch_size=16,
              fit_verbose=1, test_verbose=0, save_pred=False, keep_histo=False):
    """Train an MLP one epoch at a time, scoring dev and test after each
    epoch and keeping the predictions of the best epoch on dev."""
    # Work on copies: the pop() calls below would otherwise mutate the
    # caller's lists across successive runs.
    hidden_size = list(hidden_size)
    if dropouts:
        dropouts = list(dropouts)
    model = Sequential()
    if len(hidden_size) != 0:
        if not dropouts:
            dropouts = [0.25] * len(hidden_size)
        # Input layer -> first hidden layer.
        previous_size = hidden_size.pop(0)
        model.add(Dense(previous_size, input_dim=x_train.shape[1], init=init))
        model.add(Activation(input_activation))
        if dropouts:
            drop_prob = dropouts.pop(0)
            if drop_prob > 0:
                model.add(Dropout(drop_prob))
        # Remaining hidden layers.
        for hidden in hidden_size:
            model.add(Dense(hidden, input_dim=previous_size, init=init))
            model.add(Activation(hidden_activation))
            if dropouts:
                drop_prob = dropouts.pop(0)
                if drop_prob > 0:
                    model.add(Dropout(drop_prob))
            previous_size = hidden
        # Output layer.
        model.add(Dense(y_train.shape[1], input_dim=previous_size, init=init))
    else:
        # No hidden layer: the model reduces to a linear map + output activation.
        model.add(Dense(y_train.shape[1], input_dim=x_train.shape[1], init=init))
        if dropouts:
            model.add(Dropout(dropouts.pop(0)))
    model.add(Activation(output_activation))
    if not sgd:
        sgd = SGD(lr=0.01, decay=0, momentum=0.9)
    model.compile(loss=loss, optimizer=sgd)

    scores_dev = []
    scores_test = []
    save = None
    best_dev = None
    for i in range(epochs):
        # Fit a single epoch at a time so dev/test can be scored after each one.
        hist = model.fit(x_train, y_train, nb_epoch=1,
                         batch_size=batch_size, verbose=fit_verbose)
        pred_train = model.predict(x_train)
        pred_dev = model.predict(x_dev)
        pred_test = model.predict(x_test)
        # perf() returns (precision, recall, f-score, support); the +1 shifts
        # argmax indexes to 1-based labels on both sides, leaving scores unchanged.
        scores_dev.append(perf(numpy.argmax(y_dev, axis=1) + 1,
                               numpy.argmax(pred_dev, axis=1) + 1,
                               average='micro'))
        scores_test.append(perf(numpy.argmax(y_test, axis=1) + 1,
                                numpy.argmax(pred_test, axis=1) + 1,
                                average='micro'))
        # Keep the predictions of the epoch with the best dev precision so far
        # (the original condition only compared against the previous epoch).
        if best_dev is None or scores_dev[-1][0] > best_dev:
            best_dev = scores_dev[-1][0]
            print("Save {}".format(i))
            save = save_tuple(pred_train, pred_dev, pred_test)
    res = [scores_dev, scores_test]
    if save_pred:
        res.append(save)
    if keep_histo:
        # Only the last epoch's history survives, since fit() is re-called per epoch.
        res.append(hist)
    return res
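
# A minimal usage sketch (assumed, not part of the original code): train a
# small MLP on random one-hot data. Shapes and hyper-parameters here are
# purely illustrative.
#
#     from keras.utils import np_utils
#     rng = numpy.random.RandomState(0)
#     x = rng.rand(100, 20)
#     y = np_utils.to_categorical(rng.randint(0, 3, size=100), 3)
#     scores_dev, scores_test = train_mlp(x, y, x, y, x, y,
#                                         hidden_size=[64, 32],
#                                         epochs=5, fit_verbose=0)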

def autoencoder(train, dev, hidden_size, input_activation="tanh",
                out_activation="relu", loss="mse"):
    """Train a one-hidden-layer autoencoder and return its encoder half."""
    sgd = SGD(lr=0.01, momentum=0.9)
    autoencode = Sequential()
    autoencode.add(Dense(hidden_size, input_dim=train.shape[1],
                         init='uniform', activation=input_activation))
    autoencode.add(Dense(train.shape[1], input_dim=hidden_size,
                         init="uniform", activation=out_activation))
    autoencode.compile(optimizer=sgd, loss=loss)
    print("compiled")
    # Note: with nb_epoch=8 and patience=20, early stopping never triggers.
    autoencode.fit(train, train, nb_epoch=8, batch_size=1,
                   callbacks=[keras.callbacks.EarlyStopping(
                       monitor='val_loss', patience=20, verbose=0)],
                   validation_data=(dev, dev), verbose=1)
    print("fitted")
    # Rebuild only the input->hidden layer, reusing the trained weights
    # (get_weights()[:2] = the first Dense layer's W and b): this is the encoder.
    encoder = Sequential()
    encoder.add(Dense(hidden_size, input_dim=train.shape[1], init='uniform',
                      activation=input_activation,
                      weights=autoencode.get_weights()[:2]))
    encoder.compile(optimizer=sgd, loss=loss)
    return encoder
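
# A minimal usage sketch (assumed): pre-train on unlabelled features, then
# use the returned encoder to produce hidden representations, e.g. as input
# to train_mlp above.
#
#     rng = numpy.random.RandomState(0)
#     feats = rng.rand(200, 50)
#     enc = autoencoder(feats[:150], feats[150:], hidden_size=10)
#     hidden_repr = enc.predict(feats)  # shape (200, 10)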