# masseffect.py 4.34 KB
import argparse
from os import path
import core.data
from utils import SubCommandRunner
import os


def utt2char(features: str, outfile: str):
    """Write an utt2char file derived from a masseffect features file.

    Every key returned by ``core.data.read_features`` produces one output
    line: the key with newline characters removed, a single space, then the
    key's second comma-separated field (used here as the character name).

    TODO: Don't forget to manage two cases: one with old ids, and an other with
    new ones.

    Args:
        features (str): path handed to ``core.data.read_features``
            (presumably the masseffect features file; confirm in core.data)
        outfile (str): path of the utt2char file to write (overwritten)
    """
    utterances = core.data.read_features(features)
    utt_ids = list(utterances.keys())

    with open(outfile, "w") as out:
        for utt_id in utt_ids:
            fields = utt_id.replace("\n", "").split(",")
            # fields[1] is the character part of the comma-separated id.
            out.write(f"{','.join(fields)} {fields[1]}\n")


def char2utt(features: str, outfile: str):
    """Generate a char2utt file from a masseffect features file.

    This command is a placeholder: it is registered on the command line but
    has no implementation yet.

    Args:
        features (str): path of the features file (currently unused)
        outfile (str): path of the output file (currently unused)

    Raises:
        NotImplementedError: always. It is a subclass of ``Exception``, so
            callers catching ``Exception`` keep working.
    """
    raise NotImplementedError("Not implemented yet")


def wavscp(datadir: str, outfile: str):
    """Generate the masseffect wav scp file from the directories.

    Each file found directly inside "audio_en-us" and "audio_fr-fr" produces
    one line ``<lang>,<character>,<record_id> <path>``. The three id parts
    are the 1st, 2nd and 4th comma-separated fields of the file name up to
    its first dot. English files are written first, then French ones; inside
    each directory the files are sorted by name so the output is
    deterministic.

    Args:
        datadir (str): path of the data directory where "audio_en-us" and "audio_fr-fr" are available
        outfile (str): path of the wav scp output file

    Raises:
        Exception: if one of the directory is not available
        IndexError: if a file name has fewer than four comma-separated fields
    """
    lang_dirs = [
        os.path.join(datadir, "audio_en-us"),
        os.path.join(datadir, "audio_fr-fr"),
    ]

    if not all(os.path.isdir(lang_dir) for lang_dir in lang_dirs):
        raise Exception("Directory audio_en-us or audio_fr-fr does not exist")

    # List both directories before opening the output file, so the output
    # file can never show up in its own listing.
    entries = []
    for lang_dir in lang_dirs:
        # Only the top level of each directory is listed (first os.walk item).
        _, _, filenames = next(os.walk(lang_dir))
        # os.walk yields names in arbitrary order; sort for a stable output.
        entries.extend((lang_dir, fn) for fn in sorted(filenames))

    with open(outfile, "w") as f:
        for lang_dir, fn in entries:
            fields = fn.split(".")[0].split(",")
            lang = fields[0]
            character = fields[1]
            record_id = fields[3]  # fields[2] is not part of the utterance id
            wav_path = os.path.join(lang_dir, fn)
            f.write(f"{lang},{character},{record_id} {wav_path}\n")


def changelabels(source: str, labels: str, outfile: str):
    """Rewrite the ids of a source file using labels from a labels file.

    For every id returned by ``core.data.read_id_values(source)``, the second
    comma-separated field of the id is replaced by the first element of the
    entry stored under the same id in ``core.data.read_labels(labels)``. The
    rewritten id and its unchanged values are then written to ``outfile``
    through ``core.data.write_line``.

    Args:
        source (str): path of the file whose ids must be changed
        labels (str): path of the file holding the labels
        outfile (str): path of the output file (overwritten)

    Raises:
        KeyError: if an id of the source file has no entry in the labels
            (assumes ``read_labels`` returns a plain dict; confirm in core.data)
    """
    values_by_id = core.data.read_id_values(source)
    labels_by_id = core.data.read_labels(labels)
    source_ids = list(values_by_id.keys())

    with open(outfile, "w") as out:
        for source_id in source_ids:
            fields = source_id.split(",")
            # Swap the second id field for the first label of this id.
            fields[1] = labels_by_id[source_id][0]
            relabelled_id = ",".join(fields)
            core.data.write_line(relabelled_id, values_by_id[source_id], out=out)


if __name__ == '__main__':
    # Command-line entry point: one sub-command per function of this module.
    # Each sub-parser stores its own name in "which" so the matching function
    # can be selected after parsing.

    # Main parser
    parser = argparse.ArgumentParser(description="...")
    subparsers = parser.add_subparsers(title="action")

    # utt2char
    parser_utt2char = subparsers.add_parser("utt2char", help="generate utt2char file")
    # Both paths are required: without them None would reach open().
    parser_utt2char.add_argument("--features", required=True, type=str, help="features file")
    parser_utt2char.add_argument("--outfile", required=True, type=str, help="output file")
    parser_utt2char.set_defaults(which="utt2char")

    # char2utt
    parser_char2utt = subparsers.add_parser("char2utt", help="generate char2utt file")
    parser_char2utt.add_argument("--features", required=True, type=str, help="features file")
    parser_char2utt.add_argument("--outfile", required=True, type=str, help="output file")
    parser_char2utt.set_defaults(which="char2utt")

    # wavscp
    parser_wavscp = subparsers.add_parser("wavscp", help="generate wav scp file")
    parser_wavscp.add_argument("--datadir", required=True, help="data directory of masseffect")
    parser_wavscp.add_argument("--outfile", default="wav.scp", help="wav.scp output file")
    parser_wavscp.set_defaults(which="wavscp")

    # Change labels
    parser_changelabels = subparsers.add_parser("changelabels", help="...")
    parser_changelabels.add_argument("--source", required=True, type=str, help="source file where we want to change ids.")
    parser_changelabels.add_argument("--labels", required=True, type=str, help="file with labels")
    parser_changelabels.add_argument("--outfile", required=True, type=str, help="Output file")
    parser_changelabels.set_defaults(which="changelabels")

    # Parse
    args = parser.parse_args()

    # Sub-commands are optional for argparse by default. When none is given,
    # "which" is never set, so fail with a usage message instead of an
    # AttributeError further down.
    if getattr(args, "which", None) is None:
        parser.error("an action is required: utt2char, char2utt, wavscp or changelabels")

    # Run commands
    runner = SubCommandRunner({
        "utt2char": utt2char,
        "char2utt": char2utt,
        "wavscp": wavscp,
        "changelabels": changelabels,
    })

    # NOTE(review): presumably calls the function registered under args.which
    # with the parsed arguments minus "which"; confirm in utils.SubCommandRunner.
    runner.run(args.which, vars(args), remove="which")