-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
vosk_helper.py
153 lines (123 loc) · 5.64 KB
/
vosk_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import tarfile
import tempfile
import zipfile
import numpy as np
import requests
import soundfile as sf
import vosk
import configuration
import ffmpeg
from configuration import *
# Base ffmpeg invocation shared by helpers; banner and non-error logs suppressed
ffmpeg_base_command_list = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
# Silence Vosk's internal (Kaldi) logging
vosk.SetLogLevel(-1)
# Module-level cache for the loaded Vosk model, populated lazily on first use
vosk_model_path = ''
vosk_model = None
# Function to download and cache the Vosk model
def download_and_cache_vosk_model(model_dir="vosk_model_cache"):
    """Download the configured Vosk model archive (if absent), extract it, and return its path.

    Args:
        model_dir: Directory used to cache both the downloaded archive and the
            extracted model.

    Returns:
        Path to the model folder inside the extraction directory (first entry,
        sorted), or the extraction directory itself when it has no subfolder.

    Raises:
        requests.HTTPError: If the download responds with an HTTP error status.
    """
    # Ensure the cache directory exists (no-op if already present)
    os.makedirs(model_dir, exist_ok=True)

    # Extract the archive filename from the configured URL
    model_filename = get_config().vad.vosk_url.split("/")[-1]
    model_path = os.path.join(model_dir, model_filename)

    # If the model archive is already downloaded, skip the download
    if not os.path.exists(model_path):
        logger.info(
            f"Downloading the Vosk model from {get_config().vad.vosk_url}... This will take a while if using large model, ~1G")
        response = requests.get(get_config().vad.vosk_url, stream=True)
        # Fail fast on HTTP errors instead of caching an error page as a "model"
        response.raise_for_status()
        # Stream to a .part file first so an interrupted download is never
        # mistaken for a complete archive on the next run.
        partial_path = model_path + ".part"
        try:
            with open(partial_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        file.write(chunk)
            os.replace(partial_path, model_path)  # atomic publish of the finished archive
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)
        logger.info("Download complete.")

    # Extract the model if it's a zip or tar.gz archive
    model_extract_path = os.path.join(model_dir, "vosk_model")
    if not os.path.exists(model_extract_path):
        logger.info("Extracting the Vosk model...")
        if model_filename.endswith(".zip"):
            with zipfile.ZipFile(model_path, "r") as zip_ref:
                zip_ref.extractall(model_extract_path)
        elif model_filename.endswith(".tar.gz"):
            with tarfile.open(model_path, "r:gz") as tar_ref:
                tar_ref.extractall(model_extract_path)
        else:
            logger.info("Unknown archive format. Model extraction skipped.")
        logger.info(f"Model extracted to {model_extract_path}.")
    else:
        logger.info(f"Model already extracted at {model_extract_path}.")

    # Return the actual model folder inside the extraction directory; sorting
    # makes the pick deterministic if the directory ever holds several entries.
    extracted_folders = sorted(os.listdir(model_extract_path))
    if extracted_folders:
        # Assuming the first folder is the model
        return os.path.join(model_extract_path, extracted_folders[0])
    return model_extract_path  # No subfolder: the extraction dir is the model
# Use Vosk to detect voice activity with timestamps in the audio
def detect_voice_with_vosk(input_audio):
    """Detect voice activity (word-level timestamps) in an audio file using Vosk.

    Args:
        input_audio: Path to the source audio file (any format ffmpeg can read).

    Returns:
        Tuple ``(voice_activity, total_duration)`` where ``voice_activity`` is a
        list of ``{'text', 'start', 'end'}`` dicts and ``total_duration`` is in
        seconds. Returns ``(None, 0)`` when the recognition is judged unreliable
        (no word with confidence >= 0.90, only one unique word, or only filler
        words).
    """
    global vosk_model_path, vosk_model

    # Create a unique temp path for the 16-bit WAV conversion. delete=False is
    # required: with the default delete=True the file would vanish as soon as
    # the handle is closed, and the original code also leaked the converted WAV
    # on every call — cleanup now happens in the finally block below.
    with tempfile.NamedTemporaryFile(dir=configuration.temp_directory, suffix='.wav', delete=False) as tmp:
        temp_wav = tmp.name
    try:
        ffmpeg.convert_audio_to_wav(input_audio, temp_wav)

        # Lazily download and load the model on first use, then cache globally
        if not vosk_model_path or not vosk_model:
            vosk_model_path = download_and_cache_vosk_model()
            vosk_model = vosk.Model(vosk_model_path)

        # Open the audio file
        with sf.SoundFile(temp_wav) as audio_file:
            recognizer = vosk.KaldiRecognizer(vosk_model, audio_file.samplerate)
            voice_activity = []
            total_duration = len(audio_file) / audio_file.samplerate  # total duration in seconds

            recognizer.SetWords(True)

            # Feed the audio to the recognizer in chunks; only FinalResult is
            # consumed, so intermediate AcceptWaveform results are ignored.
            while True:
                data = audio_file.buffer_read(4000, dtype='int16')
                if len(data) == 0:
                    break
                # Convert buffer to bytes using NumPy
                recognizer.AcceptWaveform(np.frombuffer(data, dtype='int16').tobytes())

            final_result = json.loads(recognizer.FinalResult())

        if 'result' in final_result:
            should_use = False
            unique_words = set()
            for word in final_result['result']:
                if word['conf'] >= .90:
                    logger.debug(word)
                    should_use = True
                    unique_words.add(word['word'])
            # Reject degenerate recognitions: a single repeated word, or
            # nothing but Japanese filler sounds.
            if len(unique_words) == 1 or all(item in ['えー', 'ん'] for item in unique_words):
                should_use = False
            if not should_use:
                return None, 0
            for word in final_result['result']:
                voice_activity.append({
                    'text': word['word'],
                    'start': word['start'],
                    'end': word['end']
                })

        # Return the detected voice activity and the total duration
        return voice_activity, total_duration
    finally:
        # Always remove the temporary WAV, even when recognition fails
        try:
            os.remove(temp_wav)
        except OSError:
            pass
# Example usage of Vosk with trimming
def process_audio_with_vosk(input_audio, output_audio):
    """Trim ``input_audio`` to its detected speech span and write ``output_audio``.

    Runs Vosk voice-activity detection, then cuts the audio from the first
    detected word's start to the last detected word's end plus the configured
    end offset.

    Returns:
        True when speech was detected and the trimmed file was written,
        False when no voice activity was found.
    """
    activity, duration = detect_voice_with_vosk(input_audio)
    if not activity:
        logger.info("No voice activity detected in the audio.")
        return False

    # Speech span: first word start to last word end. ``activity`` is known
    # non-empty here, so the fallback branches of these conditionals are inert.
    start_time = activity[0]['start'] if activity else 0
    end_time = activity[-1]['end'] if activity else duration

    if get_config().vad.trim_beginning:
        logger.info(f"Trimmed Beginning of Audio to {start_time}")
    logger.info(f"Trimmed End of Audio to {end_time} seconds:")

    # Pad the end by the configured offset and cut with FFmpeg
    ffmpeg.trim_audio(input_audio, start_time, end_time + get_config().audio.end_offset, output_audio)
    logger.info(f"Trimmed audio saved to: {output_audio}")
    return True
def get_vosk_model():
    """Eagerly download/load the Vosk model into the module-level globals."""
    global vosk_model_path, vosk_model
    path = download_and_cache_vosk_model()
    vosk_model_path = path
    vosk_model = vosk.Model(path)
    logger.info(f"Using Vosk model from {vosk_model_path}")