Skip to content

Commit

Permalink
refactor code into classes
Browse files Browse the repository at this point in the history
All the logic has been moved and slightly refactored into separate
classes for better readability and maintainability of the codebase.
(redundant code will be removed with the next commit alongside tests)

An overview of the classes:
- `Transcription` is the main class that contains `Transcripts`.
- Each `Transcript` holds a `Source` which we want to transcribe
 and it is either `Audio` or `Video`.

What the flow looks like:
- We initialize a `Transcription` object that holds all the
related configurations for the current transcription process
- We can add as many sources as we want to the current `transcription`
with `transcription.add_transcription_source(...)`
- when we are ready, we `transcription.start()`, which:
     - produces an audio file by processing the source. This step is
     responsible for any downloads or conversions that need to happen.
     - produces the transcription by processing the audio file. This step
     includes any summarization, chapter generation, or diarization that we
     might have configured.
     - writes the transcription to a markdown file.
     - can open a PR to the repo.
     - can push the transcript to a Queuer backend.
  • Loading branch information
kouloumos committed Nov 7, 2023
1 parent d9a8009 commit f75a843
Show file tree
Hide file tree
Showing 3 changed files with 662 additions and 41 deletions.
389 changes: 389 additions & 0 deletions app/transcript.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,389 @@
import json
import logging
import os
import shutil
import tempfile
from datetime import datetime
from urllib.parse import parse_qs, urlparse

import pytube
import requests
import static_ffmpeg
import whisper
import yt_dlp
from clint.textui import progress
from moviepy.editor import VideoFileClip

from app import __app_name__, __version__, application


class Transcript:
    """Pairs one source with the transcript that is produced from it.

    Typical call order is `process_source()` -> `transcribe()` ->
    `write_to_file()`.

    Attributes:
        source: object exposing `process(tmp_dir)`, `title` and the metadata
            fields read by `write_to_file` (`local`, `source_file`, `tags`,
            `speakers`, `category`, `event_date`).
        test_mode: when true, `transcribe()` skips all real work.
        audio_file: path returned by `source.process()`, or None until
            `process_source()` has run.
        title: title used in the metadata and for the output file name.
        result: the transcript produced by `transcribe()`.
        summary: summary produced by `transcribe()` when requested.
    """

    def __init__(self, source, test_mode=False):
        self.source = source
        self.test_mode = test_mode
        self.logger = logging.getLogger(__app_name__)
        # Declare all state up front so that calling the methods out of order
        # triggers the intended error paths instead of an AttributeError.
        self.audio_file = None
        self.title = getattr(source, "title", None)
        self.result = None
        self.summary = None

    def create_transcript(self):
        """Join the text (third item) of every segment in `self.result`.

        Each segment text is followed by a single space, so the returned
        string ends with a trailing space.
        """
        return "".join(segment[2] + " " for segment in self.result)

    def process_source(self, tmp_dir=None):
        """Turn the source into an audio file and settle on a title.

        Args:
            tmp_dir: directory handed to `source.process()`; a fresh
                temporary directory is created when it is None.

        Returns:
            A `(audio_file, tmp_dir)` tuple.
        """
        tmp_dir = tmp_dir if tmp_dir is not None else tempfile.mkdtemp()
        self.audio_file = self.source.process(tmp_dir)
        if self.source.title:
            self.title = self.source.title
        else:
            # Fall back to the file name only: using the full path here would
            # leak the directory into the title and into the markdown file
            # name built from it.
            self.title = os.path.splitext(
                os.path.basename(self.audio_file))[0]
        return self.audio_file, tmp_dir

    def _transcribe_with_whisper(self, service, upload, model_output_dir):
        """Transcribe `self.audio_file` with the whisper model named `service`.

        Returns a list of `(start, end, text)` tuples, or None when anything
        fails (the error is logged, not raised).
        """
        self.logger.info("Transcribing audio to text using whisper ...")
        try:
            my_model = whisper.load_model(service)
            result = my_model.transcribe(self.audio_file)
            data = [
                (segment["start"], segment["end"], segment["text"])
                for segment in result["segments"]
            ]
            data_path = application.generate_srt(
                data, self.title, model_output_dir)
            if upload:
                application.upload_file_to_s3(data_path)
            return data
        except Exception as e:
            self.logger.error(
                f"(wisper,{service}) Error transcribing audio to text: {e}")
            return None

    def _write_chapters_file(self, working_dir, generate_chapters, chapters):
        """Write the chapters in simple MP4 (OGM) format next to the audio.

        Returns True when a chapters file was written, False when chapter
        generation is disabled or there are no chapters.
        """
        try:
            if not (generate_chapters and len(chapters) > 0):
                return False
            self.logger.info("Chapters detected")
            audio_stem = os.path.splitext(
                os.path.basename(self.audio_file))[0]
            chapters_file = os.path.join(
                working_dir, audio_stem + ".chapters")
            with open(chapters_file, "w") as fo:
                for current_chapter in chapters:
                    fo.write(
                        f"CHAPTER{current_chapter[0]}="
                        f"{current_chapter[1]}\n"
                        f"CHAPTER{current_chapter[0]}NAME="
                        f"{current_chapter[2]}\n"
                    )
            return True
        except Exception as e:
            raise Exception(f"Error writing chapters file: {e}") from e

    def transcribe(self, working_dir, generate_chapters, summarize_transcript, service, diarize, upload, model_output_dir, test_transcript=None):
        """Produce the transcript for the processed audio file.

        Args:
            working_dir: directory for the chapters file and chapter merging.
            generate_chapters: write and merge chapters when the source has any.
            summarize_transcript: request a summary (always goes through
                deepgram, whatever `service` is).
            service: "deepgram", otherwise treated as a whisper model name.
            diarize: forwarded to the deepgram helpers.
            upload: forwarded to the helpers that store the model output.
            model_output_dir: where the raw model output is written.
            test_transcript: value to return in test mode.

        Returns:
            The transcript, also stored in `self.result`. In test mode this is
            `test_transcript`, or "test-mode" when that is None.

        Raises:
            Exception: wrapping any failure, including a missing audio file.
        """
        try:
            self.summary = None
            if self.test_mode:
                self.result = test_transcript if test_transcript is not None else "test-mode"
                return self.result
            if not self.audio_file:
                # TODO give audio file path as argument
                raise Exception(
                    "audio file is missing, you need to process_source() first")

            # Not every source defines chapters; treat a missing attribute as
            # "no chapters" instead of failing.
            chapters = getattr(self.source, "chapters", None) or []
            has_chapters = self._write_chapters_file(
                working_dir, generate_chapters, chapters)
            self.result = None
            if service == "deepgram" or summarize_transcript:
                deepgram_resp = application.process_mp3_deepgram(
                    self.audio_file, summarize_transcript, diarize)
                self.result = application.get_deepgram_transcript(
                    deepgram_resp, diarize, self.title, upload, model_output_dir)

                if summarize_transcript:
                    self.summary = application.get_deepgram_summary(
                        deepgram_resp)

                if service == "deepgram" and has_chapters:
                    if diarize:
                        self.result = application.combine_deepgram_chapters_with_diarization(
                            deepgram_data=deepgram_resp, chapters=chapters
                        )
                    else:
                        self.result = application.combine_deepgram_with_chapters(
                            deepgram_data=deepgram_resp, chapters=chapters
                        )

            if not service == "deepgram":
                # whisper
                self.result = self._transcribe_with_whisper(
                    service, upload, model_output_dir)
                if self.result is None:
                    # The whisper helper logs and returns None on failure;
                    # stop here with a clear message rather than failing
                    # later while iterating None.
                    raise Exception("whisper transcription produced no result")
                if has_chapters:
                    # this is only available for videos, for now
                    self.result = application.combine_chapter(
                        chapters=chapters,
                        transcript=self.result,
                        working_dir=working_dir
                    )
                else:
                    # finalize transcript
                    self.result = self.create_transcript()

            return self.result

        except Exception as e:
            raise Exception(
                f"Error while transcribing audio source: {e}") from e

    def write_to_file(self, working_dir, transcript_by):
        """Writes transcript to a markdown file and returns its path
        This file is submitted as part of the Pull Request to the
        bitcointranscripts repo

        The file is opened in append mode and is named after the title with
        spaces replaced by dashes. On failure the error is logged and None is
        returned.
        """

        def process_metadata(key, value):
            # Comma-separated values are rendered as a list, e.g.
            # "tags: ['a', 'b']"; empty values produce no line at all.
            if value:
                value = value.strip()
                value = [item.strip() for item in value.split(",")]
                return f"{key}: {value}\n"
            return ""

        self.logger.info("Creating markdown file with transcription...")
        try:
            # Add metadata prefix
            meta_data = (
                "---\n"
                f"title: {self.title}\n"
                f"transcript_by: {transcript_by} via TBTBTC v{__version__}\n"
            )
            if not self.source.local:
                meta_data += f"media: {self.source.source_file}\n"
            meta_data += process_metadata("tags", self.source.tags)
            meta_data += process_metadata("speakers", self.source.speakers)
            meta_data += process_metadata("categories",
                                          self.source.category)
            if self.summary:
                meta_data += f"summary: {self.summary}\n"
            if self.source.event_date:
                meta_data += f"date: {self.source.event_date}\n"
            meta_data += "---\n"
            # Write to file
            file_name = self.title.replace(" ", "-")
            file_name_with_ext = os.path.join(
                working_dir, file_name + ".md")
            with open(file_name_with_ext, "a") as opf:
                opf.write(meta_data + "\n")
                opf.write(self.result + "\n")
            return os.path.abspath(file_name_with_ext)
        except Exception as e:
            self.logger.error(f"Error writing to file: {e}")
            return None


class Source:
    """Base class holding the metadata shared by every kind of source.

    Attributes:
        source_file: path or URL of the media.
        local: whether `source_file` is a local file.
        event_date: a `datetime.date`, or None when no date was supplied.
        title, tags, category, speakers: stored as given.
        chapters: list of chapters; empty unless a subclass fills it in.
    """

    def __init__(self, source_file, local, title, date, tags, category, speakers):
        self.source_file = source_file
        self.local = local
        self.__config_event_date(date)
        self.title = title
        self.tags = tags
        self.category = category
        self.speakers = speakers
        # Every source exposes `chapters` so consumers can read it without
        # caring about the concrete subclass.
        self.chapters = []
        self.logger = logging.getLogger(__app_name__)

    def __config_event_date(self, date):
        """Store `date` as `self.event_date`.

        Accepts a "YYYY-MM-DD" string, or an already parsed date/datetime
        object (so that a value read back from another source can be reused).
        Falsy values leave `event_date` as None.

        Raises:
            ValueError: when a string does not match "%Y-%m-%d".
        """
        # Local import: the module only imports the `datetime` class.
        from datetime import date as _date_type

        self.event_date = None
        if not date:
            return
        # `datetime` is a subclass of `date`, so it must be checked first.
        if isinstance(date, datetime):
            self.event_date = date.date()
            return
        if isinstance(date, _date_type):
            self.event_date = date
            return
        try:
            self.event_date = datetime.strptime(date, "%Y-%m-%d").date()
        except ValueError as e:
            raise ValueError(f"Supplied date is invalid: {e}") from e

    def initialize(self):
        """Make FFMPEG available via `static_ffmpeg.add_paths()`.

        Raises:
            Exception: when the call fails; the original error is chained and
                included in the message.
        """
        try:
            # FFMPEG installed on first use.
            self.logger.debug("Initializing FFMPEG...")
            static_ffmpeg.add_paths()
            self.logger.debug("Initialized FFMPEG")
        except Exception as e:
            raise Exception(f"Error initializing FFMPEG: {e}") from e


class Audio(Source):
    """An audio source: a local file, or a remote file that is downloaded."""

    def __init__(self, source_file, local, title, date, tags, category, speakers):
        super().__init__(source_file, local, title, date, tags, category, speakers)
        self.type = "audio"

    def process(self, working_dir):
        """Return the absolute path of an audio file ready for transcription.

        Remote sources are downloaded into `working_dir` as "<title>.mp3".
        Files with a ".wav" extension are handed to
        `application.convert_wav_to_mp3` and its return value is used instead.

        Raises:
            Exception: wrapping any download or processing failure.
        """

        def download_audio():
            """Helper method to download an audio file"""
            if self.local:
                raise Exception(f"{self.source_file} is a local file")
            if self.title is None:
                raise Exception("Please supply a title for the audio file")
            self.logger.info(f"Downloading audio file: {self.source_file}")
            target = os.path.join(working_dir, self.title + ".mp3")
            try:
                with requests.get(self.source_file, stream=True) as audio:
                    # Fail on 4xx/5xx instead of saving an error page as mp3.
                    audio.raise_for_status()
                    chunks = audio.iter_content(chunk_size=1024)
                    content_length = audio.headers.get("content-length")
                    if content_length is not None:
                        # The progress bar needs a size; servers that omit
                        # the header are downloaded without a bar.
                        chunks = progress.bar(
                            chunks,
                            expected_size=(int(content_length) / 1024) + 1,
                        )
                    with open(target, "wb") as f:
                        for chunk in chunks:
                            if chunk:
                                f.write(chunk)
                return os.path.abspath(target)
            except Exception as e:
                raise Exception(f"Error downloading audio file: {e}") from e

        try:
            self.logger.info(f"Audio file detected: '{self.title}'")
            if not self.local:
                # download audio file from the internet
                filename = download_audio()
                abs_path = os.path.abspath(filename)
                self.logger.info(f"Audio file stored in: {abs_path}")
            else:
                # calculate the absolute path of the local audio file
                filename = os.path.basename(self.source_file)
                abs_path = os.path.abspath(self.source_file)
            self.logger.info(f"Processing audio file: {abs_path}")
            # Match the real extension only, in any letter case.
            if filename.lower().endswith(".wav"):
                self.initialize()
                abs_path = application.convert_wav_to_mp3(
                    abs_path=abs_path, filename=filename, working_dir=working_dir
                )
            # return the audio file that is now ready for transcription
            return abs_path

        except Exception as e:
            raise Exception(f"Error processing audio file: {e}") from e


class Video(Source):
    """A video source: a local file or a YouTube video.

    For remote sources `source_file` is normalised to a full
    "https://www.youtube.com/watch?v=<id>" URL at construction time.
    """

    def __init__(self, source_file, local, title, date, tags, category, speakers):
        super().__init__(source_file, local, title, date, tags, category, speakers)
        self.type = "video"
        # Local videos never populate chapters; default to "none" so the
        # attribute always exists.
        self.chapters = []
        self.__config_source()

    def __config_source(self):
        """Normalise the YouTube URL and fill in a missing event date.

        Accepts a bare video id, a "watch?v=" URL, a "youtu.be" short URL or
        an "embed" URL. When no date was supplied, the publish date reported
        by pytube is stored as a "YYYY-MM-DD" string.
        """
        if self.local:
            return
        # calculate youtube url
        video_id = self.source_file  # user gave just the youtube video id
        if "watch?v=" in self.source_file:
            parsed_url = urlparse(self.source_file)
            video_id = parse_qs(parsed_url.query)["v"][0]
        elif "youtu.be" in self.source_file or "embed" in self.source_file:
            # Take the last path segment only, so that a query string such as
            # "?t=30" does not end up inside the video id.
            path = urlparse(self.source_file).path
            video_id = path.rstrip("/").split("/")[-1]
        self.source_file = "https://www.youtube.com/watch?v=" + video_id
        if self.event_date is None:
            # get the date from the published youtube video
            video = pytube.YouTube(self.source_file)
            publish_date = video.publish_date
            if publish_date is not None:
                self.event_date = str(publish_date).split(" ")[0]

    def process(self, working_dir):
        """Return the path of an mp3 extracted from the video.

        Remote videos are downloaded into `working_dir` first and their
        chapters are read from the downloaded metadata into `self.chapters`.

        Raises:
            Exception: wrapping any download or conversion failure.
        """

        def download_video():
            """Helper method to download a YouTube video"""
            if self.local:
                raise Exception(f"{self.source_file} is a local file")
            try:
                # download video from youtube
                self.logger.info(f"Downloading video: {self.source_file}")

                ydl_opts = {
                    "format": "18",
                    "outtmpl": os.path.join(working_dir, "videoFile.%(ext)s"),
                    "nopart": True,
                    "writeinfojson": True,
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ytdl:
                    ytdl.download([self.source_file])

                with open(os.path.join(working_dir, "videoFile.info.json")) as file:
                    info = ytdl.sanitize_info(json.load(file))
                # "/" would be read as a directory separator in the file name
                name = info["title"].replace("/", "-")

                target = os.path.join(working_dir, name + ".mp4")
                os.rename(os.path.join(working_dir, "videoFile.mp4"), target)

                return os.path.abspath(target)
            except Exception as e:
                # Best-effort cleanup: a failure to remove the directory must
                # not hide the download error itself.
                shutil.rmtree(working_dir, ignore_errors=True)
                raise Exception(f"Error downloading video: {e}") from e

        def convert_video_to_mp3(video_file):
            """Write the video's audio track to "<video name>.mp3"."""
            try:
                stem = os.path.splitext(os.path.basename(video_file))[0]
                mp3_path = os.path.join(working_dir, stem + ".mp3")
                clip = VideoFileClip(video_file)
                try:
                    if clip.audio is None:
                        raise Exception("video has no audio track")
                    self.logger.info(f"Converting {video_file} to mp3...")
                    clip.audio.write_audiofile(mp3_path)
                finally:
                    # Release the clip even when the conversion fails.
                    clip.close()
                self.logger.info("Video converted to mp3")
                return mp3_path
            except Exception as e:
                raise Exception(f"Error converting video to mp3: {e}") from e

        def extract_chapters_from_downloaded_video_metadata():
            """Return `(index, start_time, title)` tuples from the info json.

            Returns an empty list when there are no chapters or the metadata
            cannot be read (the latter is logged as an error).
            """
            try:
                info_path = os.path.join(working_dir, "videoFile.info.json")
                with open(info_path, "r") as f:
                    info = json.load(f)
                # The key may be absent or present with a null value.
                chapters = info.get("chapters") or []
                if not chapters:
                    self.logger.info("No chapters found for downloaded video")
                    return []
                return [
                    (str(index), chapter["start_time"], str(chapter["title"]))
                    for index, chapter in enumerate(chapters)
                ]
            except Exception as e:
                self.logger.error(
                    f"Error reading downloaded video's metadata: {e}")
                return []

        try:
            if not self.local:
                abs_path = download_video()
                self.chapters = extract_chapters_from_downloaded_video_metadata()
            else:
                abs_path = os.path.abspath(self.source_file)

            self.initialize()
            audio_file = convert_video_to_mp3(abs_path)
            return audio_file

        except Exception as e:
            raise Exception(f"Error processing video file: {e}") from e


class Playlist(Source):
    """A YouTube playlist, expanded into one `Video` source per entry.

    Attributes:
        videos: the `Video` sources built from the playlist; stays empty for
            local sources.
    """

    def __init__(self, source_file, local, title, date, tags, category, speakers):
        super().__init__(source_file, local, title, date, tags, category, speakers)
        self.type = "playlist"
        self.videos = []
        self.__config_source()

    def __config_source(self):
        """Resolve the playlist URL and create a `Video` for each entry.

        `source_file` may be a bare playlist id or a URL (starting with
        "http" or "www") carrying a "list" query parameter. Title, date, tags,
        category and speakers of the playlist are passed on to every video.

        Raises:
            Exception: when the playlist contains no videos.
        """
        if self.local:
            return
        # calculate youtube url
        playlist_id = self.source_file  # user gave just the youtube playlist id
        if self.source_file.startswith(("http", "www")):
            parsed_url = urlparse(self.source_file)
            playlist_id = parse_qs(parsed_url.query)["list"][0]
        url = "https://www.youtube.com/playlist?list=" + playlist_id
        # Materialise the entries so that an empty playlist is detected; the
        # playlist object itself is never None.
        video_urls = list(pytube.Playlist(url))
        if not video_urls:
            raise Exception(f"Playlist '{url}' is empty")
        # `self.event_date` has already been parsed into a date object; hand
        # the videos the "YYYY-MM-DD" string form the constructor parses.
        event_date = (
            self.event_date.isoformat() if self.event_date is not None else None
        )
        for video_url in video_urls:
            source = Video(video_url, self.local, self.title,
                           event_date, self.tags, self.category, self.speakers)
            self.videos.append(source)
Loading

0 comments on commit f75a843

Please sign in to comment.