Optimize S3 read writes on transcription flow #14

Open · wants to merge 1 commit into main
174 changes: 88 additions & 86 deletions ml_stack/transcription/src/transcribe.py
@@ -4,10 +4,10 @@
from __future__ import print_function

import glob
import gzip
import json
import os
import random
import string
import uuid
from urllib.parse import urlparse

import boto3
@@ -38,91 +38,76 @@ def get_model(cls):
return cls.model

@classmethod
def transcribe(cls, chunks_path, task, s3_output_uri):
model = cls.get_model()
temp_loc = generate_random_string(length=20)
if not os.path.exists(nfs_path + temp_loc):
os.makedirs(nfs_path + temp_loc)

if(task=="translate_transcribe"):
print("Translation+Transcribe task")
voice_files = list_files_in_directory(chunks_path)

for voice_file in voice_files:
result = " "
print(voice_file)
segments, info = model.transcribe(voice_file, beam_size=5, task="translate")

for segment in segments:
result += segment.text

out_file = nfs_path + temp_loc + '/'+ extract_filename_without_extension(voice_file)+'.translated.txt'
with open(out_file, 'w') as file:
file.write(result)

upload_file_to_s3(out_file, s3_output_uri)
segments, info = model.transcribe(voice_file, beam_size=5)

for segment in segments:
result +=segment.text

out_file = nfs_path + temp_loc + '/'+ extract_filename_without_extension(voice_file)+'.original.txt'
with open(out_file, 'w') as file:
file.write(result)

upload_file_to_s3(out_file, s3_output_uri)

elif(task=="translate"):
print("Translation task only")
voice_files = list_files_in_directory(chunks_path)

for voice_file in voice_files:
result = " "
print(voice_file)
segments, info = model.transcribe(voice_file, beam_size=5, task="translate")

for segment in segments:
result += segment.text

out_file = nfs_path + temp_loc + '/'+ extract_filename_without_extension(voice_file)+'.translated.txt'
with open(out_file, 'w') as file:
file.write(result)

upload_file_to_s3(out_file, s3_output_uri)
def get_file_info(cls, model, voice_file_name: str, task: str) -> dict:
"""Perform required task for given file"""
segments, info = model.transcribe(voice_file_name, beam_size=5, task=task)
text = ' '
for segment in segments:
text += segment.text
return {
'text': text,
'language': info.language,
'language_probability': info.language_probability,
}

else:
print("Transcribe task only")
voice_files = list_files_in_directory(chunks_path)
@classmethod
def process_chunks(cls, model, audio_chunk_path: str, task: str, chunk_prefix: str) -> dict:
"""Process all audio files in given path & provide information as a dict.

Sample:
audio_chunk_path structure
/tmp/advdsd/
0.wav
1.wav
2.wav
task - `translate`
chunk_prefix - `translated_`

Output:
{
'translated_0': {'text': 'Hi', 'language': 'en', 'language_probability': 0.9},
'translated_1': {'text': 'How are you?', 'language': 'en', 'language_probability': 0.89},
'translated_2': {'text': 'I am good', 'language': 'en', 'language_probability': 0.85}
}

"""
voice_files = list_files_in_directory(audio_chunk_path)
chunk_wise_data_map = {}
for file in voice_files:
chunk_name = file.split('/')[-1].replace('.wav', '')
chunk_wise_data_map[f'{chunk_prefix}{chunk_name}'] = cls.get_file_info(model, file, task)
return chunk_wise_data_map

for voice_file in voice_files:
result = " "
print(voice_file)
segments, info = model.transcribe(voice_file, beam_size=5)
for segment in segments:
result += segment.text
@classmethod
def transcribe(cls, chunks_path, task, s3_output_uri) -> str:
"""Process all audio files in given path & upload the output as json gzip file in provided S3 location"""
model = cls.get_model()
transcription_info, translation_info = {}, {}

out_file = nfs_path + temp_loc + '/'+ extract_filename_without_extension(voice_file)+'.original.txt'
with open(out_file, 'w') as file:
file.write(result)
if task == 'translate_transcribe':
transcription_info = cls.process_chunks(model, chunks_path, 'transcribe', 'original_')
translation_info = cls.process_chunks(model, chunks_path, 'translate', 'translated_')
elif task == 'translate':
translation_info = cls.process_chunks(model, chunks_path, 'translate', 'translated_')
else:
# By default perform transcribe task
transcription_info = cls.process_chunks(model, chunks_path, 'transcribe', 'original_')

upload_file_to_s3(out_file, s3_output_uri)
# Prepare final data to be dumped to s3 as a gzip file
final_data = {'transcription_info': transcription_info, 'translation_info': translation_info}
dump_file = f'{nfs_path}op_{generate_random_string()}.json.gz'
with gzip.open(dump_file, 'wb') as f:
f.write(json.dumps(final_data, ensure_ascii=False).encode('utf-8'))

return "OK"
upload_file_to_s3(dump_file, s3_output_uri)
return 'OK'


app = flask.Flask(__name__)


def generate_random_string(length=20):
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for _ in range(length))
return random_string


def extract_filename_without_extension(file_path):
base_name = os.path.basename(file_path)
filename_without_extension, _ = os.path.splitext(base_name)
return filename_without_extension
def generate_random_string() -> str:
return uuid.uuid4().hex


def upload_file_to_s3(file_path, s3_uri):
@@ -131,9 +116,6 @@ def upload_file_to_s3(file_path, s3_uri):
bucket_name = parsed_uri.netloc
key = parsed_uri.path.lstrip('/')

# Use the local file's name as the key (filename) in S3
key = os.path.join(key, os.path.basename(file_path))

# Initialize a Boto3 S3 client with AWS credentials
s3 = boto3.client('s3')

@@ -162,13 +144,28 @@ def download_s3_bucket_from_uri(s3_uri, local_folder):
os.makedirs(local_folder)

# Download each object from the S3 bucket to the local folder
object_count = 0
for obj in objects.get('Contents', []):
# Construct the local file path by removing the key prefix
local_file_path = os.path.join(local_folder, obj['Key'].replace(key_prefix, '', 1))

# Download the file from S3
s3.download_file(bucket_name, obj['Key'], local_file_path)
print(f"Downloaded: {obj['Key']} to {local_file_path}")
object_count += 1
return object_count



def clear_directory(path: str):
    """Delete all files under the given path (best-effort cleanup)"""
    try:
        for file in list_files_in_directory(path):
            os.remove(file)
    except FileNotFoundError:
        pass
    except Exception:
        # Best-effort: a failed cleanup must not fail the request
        pass


def list_files_in_directory(directory):
@@ -184,18 +181,23 @@ def ping():
@app.route("/invocations", methods=["POST"])
def transcribe():
res = None

text = request.data.decode("utf-8")
data = json.loads(text)

input_location = data['input_location']
task = data['task']
output_location = data['output_location']
print(f"Input chunks location: {input_location}")
print(f'Input chunks location: {input_location}')

chunk_folder_path = f'{nfs_path}{generate_random_string()}'
objects_count = download_s3_bucket_from_uri(input_location, chunk_folder_path)
print(f'Downloaded {objects_count} files from S3')

chunk_folder_path = nfs_path+generate_random_string(20)
download_s3_bucket_from_uri(input_location, chunk_folder_path)
res = TranslateService.transcribe(chunk_folder_path, task, output_location)
print(f"Completed {task} for {input_location}")
return flask.Response(response=res, status=200, mimetype="text/plain")
print(f'Completed {task} for {input_location}')

# Clear storage
clear_directory(chunk_folder_path)

return flask.Response(response=res, status=200, mimetype='text/plain')

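Reviewer note: with this change the service writes one consolidated gzip JSON object per request instead of one .txt object per chunk. A minimal sketch of how a downstream consumer might fetch and decode that output, assuming it was uploaded to the s3_output_uri passed to transcribe() (the bucket and key below are illustrative):

import gzip
import json

import boto3


def read_transcription_output(bucket: str, key: str) -> dict:
    """Fetch and decode the consolidated gzip JSON written by TranslateService.transcribe()."""
    s3 = boto3.client('s3')
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    # One S3 read now yields every chunk's result for both tasks
    return json.loads(gzip.decompress(body).decode('utf-8'))


data = read_transcription_output('my-bucket', 'jobs/abc/txt_chunks/data.json.gz')
print(data['transcription_info'].get('original_0', {}).get('text'))
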
2 changes: 1 addition & 1 deletion server/lambdas/check_input_file_type.py
@@ -68,7 +68,7 @@ def handler(event, context):
event['input_file'] = file_name_without_extn
event['output_s3_key'] = output_key
event['audio_chunks_s3_key'] = output_key + "/chunks/"
event['txt_chunks_s3_key'] = output_key + "/txt_chunks/"
event['txt_chunks_s3_key'] = output_key + "/txt_chunks/data.json.gz"
event['diarization_file'] = file_name_without_extn + '.diarization.txt'
event['output_file'] = file_name_without_extn + ".json"
event['original_transcription_file'] = chat_transcript_file_path
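Reviewer note: txt_chunks_s3_key now names the single consolidated gzip object rather than a prefix holding many per-chunk .txt files, so downstream steps issue one S3 read per job instead of one per chunk. A quick illustration with an assumed output_key value:

output_key = 'jobs/abc'  # assumed example value
old_key = output_key + "/txt_chunks/"              # before: a prefix of per-chunk .txt objects
new_key = output_key + "/txt_chunks/data.json.gz"  # after: one gzip object with all chunk results
print(old_key, new_key)
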
66 changes: 31 additions & 35 deletions server/lambdas/combine_transcription_files.py
@@ -1,23 +1,18 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import boto3
import gzip
import json
import pickle
import re
import time
import boto3
import server_constants
import time

print("Loading Combing Transcription Files...")
s3_client = boto3.client("s3")
tmp_prefix = '/tmp/'

spacermilli = 2000
def millisec(time_str):
spl = time_str.split(":")
s = int((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2])) * 1000)
return s


def combine_txt_transcriptions(TRANSCRIPTION_FILE_NAME, groups, txt_chunks_s3_key, language, BUCKET, output_s3_key):
fn_start = time.time()
gidx = -1
@@ -31,34 +26,35 @@ def combine_txt_transcriptions(TRANSCRIPTION_FILE_NAME, groups, txt_chunks_s3_key, language, BUCKET, output_s3_key):
local_transcription_file = tmp_prefix + TRANSCRIPTION_FILE_NAME
text_file = open(local_transcription_file, "w")

for g in groups:
shift = re.findall("[0-9]+:[0-9]+:[0-9]+\.[0-9]+", string=g[0])[0]
# Start time in the original video
shift = millisec(shift) - spacermilli
shift = max(shift, 0)
# Download transcription model output file
tmp_output_path = tmp_prefix + 'data.json.gz'
s3_client.download_file(BUCKET, f"{output_s3_key}/{txt_chunks_s3_key}", tmp_output_path)
with gzip.open(tmp_output_path, 'rb') as file:
model_output = json.load(file)

is_translation = language not in {'original', 'en'}
original_txt_prefix, translated_txt_prefix = 'original_', 'translated_'

for group in groups:
speaker = group[0].split()[-1]
if speaker not in server_constants.SPEAKERS:
continue
speaker_str = server_constants.SPEAKERS[speaker]
# Strip (), commas and other special characters from the speaker label
speaker_str = re.sub("[!^:(),']", "", str(speaker_str))
gidx += 1

file_prefix = language
if language != 'original' and language != 'en':
file_prefix = "translated"
# Download chunks locally to combine
download_file_path = f"{txt_chunks_s3_key}{gidx}.{file_prefix}.txt"
tmp_chunk_path = f"{tmp_prefix}{gidx}.{file_prefix}.txt"

s3_client.download_file(BUCKET, download_file_path, tmp_chunk_path)
captions = open(tmp_chunk_path, "r+")

if captions:
speaker = g[0].split()[-1]
if speaker in server_constants.SPEAKERS:
speaker = server_constants.SPEAKERS[speaker]
# Removing speaker text with (), comma and other characters
speaker_str = re.sub("[!^:(),']", "", str(speaker))
for c in captions:
s = speaker_str + ":" + str(c)
res = re.sub(r"[!\n]", "", s)
text_file.write(res + "\n")
captions.close()
original_text_info = model_output.get(f'{original_txt_prefix}{gidx}', {})
translated_text_info = model_output.get(f'{translated_txt_prefix}{gidx}', {})

chunk_info = translated_text_info if is_translation else original_text_info
chunk_txt = re.sub(r'[!\n]', '', chunk_info.get('text', '')).strip()

if not chunk_txt:
continue

chunk_str = speaker_str + ":" + str(chunk_txt)
res = re.sub(r"[!\n]", "", chunk_str)
text_file.write(f'{res}\n')

group_file.close()
text_file.close()
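
Reviewer note: a minimal sketch of the per-group lookup this lambda now performs against the consolidated model output, reusing the sample structure from the process_chunks docstring (the text values are illustrative):

# Sample consolidated output, as documented in process_chunks
model_output = {
    'original_0': {'text': 'Hola', 'language': 'es', 'language_probability': 0.95},
    'translated_0': {'text': 'Hi', 'language': 'en', 'language_probability': 0.9},
}


def chunk_text(output: dict, gidx: int, is_translation: bool) -> str:
    """One dict lookup replaces one S3 download per diarization group."""
    prefix = 'translated_' if is_translation else 'original_'
    return output.get(f'{prefix}{gidx}', {}).get('text', '').strip()


print(chunk_text(model_output, 0, is_translation=True))  # -> Hi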