Blame view

egs/sprakbanken_swe/s5/local/sprak2kaldi.py 5.95 KB
8dcb6dfcb   Yannick Estève   first commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
  #!/usr/bin/env python
  '''
  # Copyright 2013-2014 Mirsk Digital Aps  (Author: Andreas Kirkedal)
  
  # Licensed under the Apache License, Version 2.0 (the "License");
  # you may not use this file except in compliance with the License.
  # You may obtain a copy of the License at
  #
  #  http://www.apache.org/licenses/LICENSE-2.0
  
  # THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  # KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
  # WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
  # MERCHANTABLITY OR NON-INFRINGEMENT.
  # See the Apache 2 License for the specific language governing permissions and
  # limitations under the License.
  
  '''
  from __future__ import print_function
  
  
  import sys
  import codecs
  import os
  import shutil
  from sprakparser import Session
  
  n = 0
  
  
  ### Utility functions
  def find_ext_folders(topfolder, extfolderlist, file_ext):
      '''Collect, recursively, the folders that hold files ending in file_ext.

      topfolder     -- directory whose entries are scanned
      extfolderlist -- list that is extended in place with matching folders
      file_ext      -- extension to look for, including the dot (e.g. ".spl")

      Nothing is returned; results are appended to extfolderlist. A folder is
      appended at most once: scanning of that folder stops at the first
      matching file, so entries that os.listdir() yields after that file
      (sub-folders included) are not visited.
      '''

      for entry in os.listdir(topfolder):
          full_path = os.path.join(topfolder, entry)

          # Descend into sub-folders before looking at anything else.
          if os.path.isdir(full_path):
              find_ext_folders(full_path, extfolderlist, file_ext)
              continue

          # Anything that is neither a directory nor a regular file is ignored.
          if not os.path.isfile(full_path):
              continue

          _, extension = os.path.splitext(entry)
          if extension == file_ext:
              extfolderlist.append(topfolder)
              return
  
  
  def create_parallel_file_list(session, sndlist, txtlist):
      '''Write one transcription file per recording and log the file pairs.

      For every entry of session.record_states except the first, a text file
      holding the transcription (recording[0]) is created in
      session.sessiondir, provided the sound file recording[1] exists in
      session.wavdir. The path of the text file is written as one line to
      txtlist and the path of the sound file as one line to sndlist, so the
      two lists stay aligned line by line. Sound files are not copied, which
      saves disk space.

      session -- object providing sessiondir, speaker_id, wavdir,
                 record_states and create_filename(number, extension)
      sndlist -- writable text stream receiving the sound file paths
      txtlist -- writable text stream receiving the transcription file paths

      Side effects: session.sessiondir is created; if a non-empty directory of
      that name already exists, the module-level counter n is incremented and
      appended to both session.sessiondir and session.speaker_id to make them
      unique. A session directory that ends up empty is removed again.
      '''
      # The counter is module-level so that it is not reset on every call.
      global n

      shadow = False
      if os.path.exists(session.sessiondir):
          # The directory name is not unique; an empty one is simply reused.
          if len(os.listdir(session.sessiondir)) != 0:
              n += 1
              session.sessiondir = "{}_{}".format(session.sessiondir, n)
              session.speaker_id = "{}_{}".format(session.speaker_id, n)
              os.mkdir(session.sessiondir)
              shadow = True
      else:
          os.mkdir(session.sessiondir)

      for recnum, recording in enumerate(session.record_states):
          if recnum == 0:     # skip the first recording of silence
              continue
          oldsound = os.path.join(session.wavdir, recording[1])

          # Some wavdirs are empty, check for files
          if not os.path.exists(oldsound):
              continue

          # Create the transcription file; the context manager guarantees the
          # handle is closed even if the write fails.
          txtout = session.create_filename(recnum + 1, "txt")
          txtline = os.path.join(session.sessiondir, txtout)
          with codecs.open(txtline, "w", "utf8") as fout:
              fout.write(recording[0] + "\n")

          # Only the paths written to the lists are changed here; the file
          # created above keeps its original path.
          # NOTE(review): presumably the directory containing a space is
          # renamed elsewhere in the recipe -- confirm.
          txtline = txtline.replace('791213 8232', '791213_8232')
          oldsound = oldsound.replace('791213 8232', '791213_8232')
          txtlist.write(txtline + "\n")   # write lists of txt files
          sndlist.write(oldsound + "\n")  # write lists of recordings

      if len(os.listdir(session.sessiondir)) == 0:  # Remove dir if it is empty
          os.rmdir(session.sessiondir)
          if shadow:
              # Give the unused suffix back to the counter.
              n -= 1
  
  
  def make_speech_corpus(top, dest, txtdest, snddest, srcfolder):
      '''Process every .spl file in srcfolder that describes a usable session.

      top       -- accepted for the caller's convenience; not used here
      dest      -- directory under which the session directories are named
      txtdest   -- stream handed on as the transcription list
      snddest   -- stream handed on as the sound file list
      srcfolder -- folder whose .spl files are processed in sorted order

      Each .spl file is turned into a Session. Sessions lacking a speaker id,
      a wav directory or at least two record states are skipped. For the
      others a directory name "<dest>/<filestem>.<speaker_id>" is stored on
      the session before it is passed to create_parallel_file_list().
      '''

      entries = sorted(os.listdir(srcfolder))
      source_root = os.path.abspath(srcfolder)

      for entry in entries:
          _, extension = os.path.splitext(entry)
          if extension != ".spl":
              continue

          # Not every spl file is complete: some contain errors from manual
          # editing and some point to recordings that do not exist in the
          # corpus, so the parsed session is checked for the key information.
          session = Session(source_root, entry)
          unusable = (
              session.speaker_id == ""                # no speaker
              or not session.wavdir                   # no matching directory
              or len(session.record_states) < 2       # unsure whether this has an effect
          )
          if unusable:
              continue

          stem_path = os.path.join(dest, session.filestem)
          session.sessiondir = stem_path + "." + session.speaker_id

          create_parallel_file_list(session, snddest, txtdest)
  
  if __name__ == '__main__':
      # Script entry point: sprak2kaldi.py <corpus project dir> <destination dir>
      # Writes the files "sndlist" and "txtlist" into the destination directory
      # and lets make_speech_corpus() process every folder containing .spl files.

      try:
          topfolder = sys.argv[1]
          dest = sys.argv[2]
      except IndexError:
          print('Usage: python3 sprak2kaldi.py <corpus project dir> <processed corpus project subdir>')
          print('E.g. python3 sprak2kaldi.py /path/to/data/local/data/0565-1  /path/to/data/local/data/corpus_processed/0565-1' )
          sys.exit('exit 1')

      # Always start from an empty destination directory.
      if os.path.exists(dest):
          try:
              shutil.rmtree(dest)
          except OSError:
              print('Failed to remove ' + dest)
              sys.exit('Must remove ' + dest + ' to proceed corpus preparation.')
      # Create the directory whether or not it existed before, so the list
      # files below can always be opened.
      os.makedirs(dest)

      ## Find the subdirectories containing '.spl' files. These files contain information that
      #  pairs a recording with speaker information, id and script
      spldirs = []
      find_ext_folders(topfolder, spldirs, ".spl")

      # The context managers close both lists even if processing fails midway.
      with codecs.open(os.path.join(dest, "sndlist"), "w", "utf8") as sndlist, \
              codecs.open(os.path.join(dest, "txtlist"), "w", "utf8") as txtlist:
          for folder in spldirs:
              make_speech_corpus(topfolder, dest, txtlist, sndlist, folder)