import wave
import json
import sys
import subprocess
import shlex

import numpy as np
import pympi
from deepspeech import Model, version

# This code is essentially taken from the DeepSpeech native_client repository:
# https://github.com/mozilla/DeepSpeech/blob/master/native_client/python/client.py
# What is not from there was written by Niko Partanen in collaboration with
# the research project Language Documentation meets Language Technology:
# The Next Step in the Description of Komi and The Language Bank of Finland.
# [email protected]
#
# The idea is that we just give the script a file, e.g. 'recording_session.wav',
# and it creates an Elan file called 'recording_session.eaf'. For now all ASR
# output goes into a tier called 'deepspeech'; further work is needed to clean
# up the output and to place it into the Elan file in the best possible way.
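#
# A sketch of how this might be run (assuming the script is saved as, say,
# transcribe_to_elan.py; that name is hypothetical):
#
#     python transcribe_to_elan.py
#
# The audio file, model and scorer paths are hard-coded further down, so
# adapt those lines before running.
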
def metadata_to_string(metadata):
    return ''.join(token.text for token in metadata.tokens)

def words_from_candidate_transcript(metadata):
    word = ""
    word_list = []
    word_start_time = 0
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        # Append character to word if it's not a space
        if token.text != " ":
            if len(word) == 0:
                # Log the start time of the new word
                word_start_time = token.start_time
            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if token.text == " " or i == len(metadata.tokens) - 1:
            word_duration = token.start_time - word_start_time
            if word_duration < 0:
                word_duration = 0
            each_word = dict()
            each_word["word"] = word
            each_word["start_time"] = round(word_start_time, 4)
            each_word["duration"] = round(word_duration, 4)
            word_list.append(each_word)
            # Reset
            word = ""
            word_start_time = 0
    return word_list

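# Each element returned by words_from_candidate_transcript is a plain dict;
# an illustrative (not real) example:
#     {"word": "...", "start_time": 1.02, "duration": 0.35}
# Times are in seconds, rounded to four decimals.
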
def metadata_json_output(metadata):
    json_result = dict()
    json_result["transcripts"] = [{
        "confidence": transcript.confidence,
        "words": words_from_candidate_transcript(transcript),
    } for transcript in metadata.transcripts]
    return json.dumps(json_result, indent=2)

def convert_samplerate(audio_path, desired_sample_rate):
    sox_cmd = 'sox {} --type raw --bits 16 --channels 1 --rate {} --encoding signed-integer --endian little --compression 0.0 --no-dither - '.format(
        shlex.quote(audio_path), desired_sample_rate)
    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError('SoX returned non-zero status: {}'.format(e.stderr))
    except OSError as e:
        raise OSError(e.errno, 'SoX not found, use {}hz files or install it: {}'.format(desired_sample_rate, e.strerror))
    return desired_sample_rate, np.frombuffer(output, np.int16)

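# Note that convert_samplerate shells out to SoX, so the sox binary must be
# installed and on PATH; its output is raw 16-bit little-endian mono PCM,
# which matches the buffer format the model expects.
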
# Here we specify the input file, model and scorer
audio_file = "test/0627_113435.wav"
model = "models/kpv/output_graph.pb"
scorer = "models/kpv/balanced_kenlm.scorer"

ds = Model(model)
ds.enableExternalScorer(scorer)
desired_sample_rate = ds.sampleRate()
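# sampleRate() reads the expected input rate from the model itself, so we do
# not hard-code it; released DeepSpeech models typically expect 16 kHz audio.
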
fin = wave.open(audio_file, 'rb')
fs_orig = fin.getframerate()
if fs_orig != desired_sample_rate:
    print('Warning: original sample rate ({}) is different than {}hz. Resampling might produce erratic speech recognition.'.format(fs_orig, desired_sample_rate), file=sys.stderr)
    fs_new, audio = convert_samplerate(audio_file, desired_sample_rate)
else:
    audio = np.frombuffer(fin.readframes(fin.getnframes()), np.int16)
fin.close()

# Here we do the speech recognition
result = ds.sttWithMetadata(audio, 1)
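# The second argument of sttWithMetadata is the number of candidate
# transcripts to return; with 1, result.transcripts[0] is the only
# (and thus most likely) candidate.
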
# Here we create the Elan file
elan_file = pympi.Elan.Eaf()
elan_file.add_linguistic_type(lingtype='deepspeechT', timealignable=True, graphicreferences=False)
elan_file.add_linked_file(file_path=audio_file, mimetype='audio/x-wav')
elan_file.add_tier(tier_id='deepspeech', ling='deepspeechT')

# ELAN annotations are in milliseconds, so convert from seconds here
for word in words_from_candidate_transcript(result.transcripts[0]):
    start_ms = int(word['start_time'] * 1000)
    end_ms = start_ms + int(word['duration'] * 1000)
    elan_file.add_annotation(id_tier='deepspeech', start=start_ms, end=end_ms, value=word['word'])

# pympi's Eaf() constructor adds an empty 'default' tier that we do not need
elan_file.remove_tier(id_tier='default')
elan_file.to_file(file_path=audio_file.replace(".wav", ".eaf"))
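# The resulting .eaf is written next to the wav file and links back to it, so
# it should open directly in ELAN with the audio attached (assuming the path
# still resolves on the machine where ELAN is run).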