From f75a8434458f58d3dc0155113b1b0072e26dffc9 Mon Sep 17 00:00:00 2001
From: kouloumos
Date: Thu, 2 Nov 2023 20:39:10 +0200
Subject: [PATCH] refactor code into classes

All the logic has been moved and slightly refactored into separate classes
for better readability and maintainability of the codebase.
(redundant code will be removed with the next commit, alongside tests)

An overview of the classes:
- `Transcription` is the main class that contains `Transcript`s.
- Each `Transcript` holds a `Source` that we want to transcribe; it is
  either `Audio` or `Video`.

What the flow looks like (a usage sketch follows the diffstat below):
- We initialize a `Transcription` object that holds all the related
  configuration for the current transcription process.
- We can add as many sources as we want to the current `transcription`
  with `transcription.add_transcription_source(...)`.
- When we are ready, we call `transcription.start()`, which:
  - produces an audio file by processing the source. This step is
    responsible for any downloads or conversions that need to happen.
  - produces the transcription by processing the audio file. This step
    includes any summarization, chapter generation, or diarization that
    we might have configured.
  - writes the transcription to a markdown file.
  - can open a PR at the bitcointranscripts repo.
  - can push the transcript to a Queuer backend.
---
 app/transcript.py    | 389 ++++++++++++++++++++++++++++++++++++++++++
 app/transcription.py | 234 ++++++++++++++++++++++
 transcriber.py       |  81 +++++----
 3 files changed, 663 insertions(+), 41 deletions(-)
 create mode 100644 app/transcript.py
 create mode 100644 app/transcription.py
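A minimal usage sketch of the flow described above (not part of the diff; the
option values, URL, title, and speakers are illustrative placeholders, and only
a subset of the available options is shown):

    from app.transcription import Transcription

    transcription = Transcription(
        loc="misc",        # target directory in the bitcointranscripts hierarchy
        model="tiny",      # whisper model; pass deepgram=True to use Deepgram instead
        chapters=True,     # generate chapters when the source provides them
        markdown=True,     # write the resulting transcript to a markdown file
    )
    transcription.add_transcription_source(
        source_file="https://www.youtube.com/watch?v=<video-id>",  # placeholder URL
        title="Example talk",
        speakers="Alice, Bob",
    )
    transcription.start()     # process source -> transcribe -> markdown / PR / queue
    transcription.clean_up()

If the resulting transcript is pushed to the Queuer backend (the default when
neither `--noqueue` nor `--PR` is supplied), `push_to_queue` expects a local
`.env` file defining `QUEUE_ENDPOINT` and `BEARER_TOKEN` (placeholder values):

    QUEUE_ENDPOINT=https://queuer.example.org
    BEARER_TOKEN=<token>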
diff --git a/app/transcript.py b/app/transcript.py
new file mode 100644
index 0000000..371e515
--- /dev/null
+++ b/app/transcript.py
@@ -0,0 +1,389 @@
+import json
+import logging
+import os
+import shutil
+import tempfile
+from datetime import datetime
+from urllib.parse import parse_qs, urlparse
+
+import pytube
+import requests
+import static_ffmpeg
+import whisper
+import yt_dlp
+from clint.textui import progress
+from moviepy.editor import VideoFileClip
+
+from app import __app_name__, __version__, application
+
+
+class Transcript:
+    def __init__(self, source, test_mode=False):
+        self.source = source
+        self.test_mode = test_mode
+        self.logger = logging.getLogger(__app_name__)
+
+    def create_transcript(self):
+        result = ""
+        for x in self.result:
+            result = result + x[2] + " "
+
+        return result
+
+    def process_source(self, tmp_dir=None):
+        tmp_dir = tmp_dir if tmp_dir is not None else tempfile.mkdtemp()
+        self.audio_file = self.source.process(tmp_dir)
+        self.title = self.source.title if self.source.title else self.audio_file[:-4]
+        return self.audio_file, tmp_dir
+
+    def transcribe(self, working_dir, generate_chapters, summarize_transcript, service, diarize, upload, model_output_dir, test_transcript=None):
+
+        def process_mp3():
+            """using whisper"""
+            self.logger.info("Transcribing audio to text using whisper ...")
+            try:
+                my_model = whisper.load_model(service)
+                result = my_model.transcribe(self.audio_file)
+                data = []
+                for x in result["segments"]:
+                    data.append(tuple((x["start"], x["end"], x["text"])))
+                data_path = application.generate_srt(
+                    data, self.title, model_output_dir)
+                if upload:
+                    application.upload_file_to_s3(data_path)
+                return data
+            except Exception as e:
+                self.logger.error(
+                    f"(whisper,{service}) Error transcribing audio to text: {e}")
+                return
+
+        def write_chapters_file():
+            """Write out the chapter file based on simple MP4 format (OGM)"""
+            try:
+                if generate_chapters and len(self.source.chapters) > 0:
+                    self.logger.info("Chapters detected")
+                    chapters_file = os.path.join(working_dir, os.path.basename(
+                        self.audio_file)[:-4] + ".chapters")
+
+                    with open(chapters_file, "w") as fo:
+                        for current_chapter in self.source.chapters:
+                            fo.write(
+                                f"CHAPTER{current_chapter[0]}="
+                                f"{current_chapter[1]}\n"
+                                f"CHAPTER{current_chapter[0]}NAME="
+                                f"{current_chapter[2]}\n"
+                            )
+                        fo.close()
+                    return True
+                else:
+                    return False
+            except Exception as e:
+                raise Exception(f"Error writing chapters file: {e}")
+
+        try:
+            self.summary = None
+            if self.test_mode:
+                self.result = test_transcript if test_transcript is not None else "test-mode"
+                return self.result
+            if not self.audio_file:
+                # TODO give audio file path as argument
+                raise Exception(
+                    "audio file is missing, you need to process_source() first")
+
+            has_chapters = write_chapters_file()
+            self.result = None
+            if service == "deepgram" or summarize_transcript:
+                deepgram_resp = application.process_mp3_deepgram(
+                    self.audio_file, summarize_transcript, diarize)
+                self.result = application.get_deepgram_transcript(
+                    deepgram_resp, diarize, self.title, upload, model_output_dir)
+
+                if summarize_transcript:
+                    self.summary = application.get_deepgram_summary(
+                        deepgram_resp)
+
+                if service == "deepgram" and has_chapters:
+                    if diarize:
+                        self.result = application.combine_deepgram_chapters_with_diarization(
+                            deepgram_data=deepgram_resp, chapters=self.source.chapters
+                        )
+                    else:
+                        self.result = application.combine_deepgram_with_chapters(
+                            deepgram_data=deepgram_resp, chapters=self.source.chapters
+                        )
+
+            if not service == "deepgram":
+                # whisper
+                self.result = process_mp3()
+                if has_chapters:
+                    # this is only available for videos, for now
+                    self.result = application.combine_chapter(
+                        chapters=self.source.chapters,
+                        transcript=self.result,
+                        working_dir=working_dir
+                    )
+                else:
+                    # finalize transcript
+                    self.result = self.create_transcript()
+
+            return self.result
+
+        except Exception as e:
+            raise Exception(f"Error while transcribing audio source: {e}")
+
+    def write_to_file(self, working_dir, transcript_by):
+        """Writes transcript to a markdown file and returns its path
+        This file is submitted as part of the Pull Request to the
+        bitcointranscripts repo
+        """
+
+        def process_metadata(key, value):
+            if value:
+                value = value.strip()
+                value = [item.strip() for item in value.split(",")]
+                return f"{key}: {value}\n"
+            return ""
+
+        self.logger.info("Creating markdown file with transcription...")
+        try:
+            # Add metadata prefix
+            meta_data = (
+                "---\n"
+                f"title: {self.title}\n"
+                f"transcript_by: {transcript_by} via TBTBTC v{__version__}\n"
+            )
+            if not self.source.local:
+                meta_data += f"media: {self.source.source_file}\n"
+            meta_data += process_metadata("tags", self.source.tags)
+            meta_data += process_metadata("speakers", self.source.speakers)
+            meta_data += process_metadata("categories",
+                                          self.source.category)
+            if self.summary:
+                meta_data += f"summary: {self.summary}\n"
+            if self.source.event_date:
+                meta_data += f"date: {self.source.event_date}\n"
+            meta_data += "---\n"
+            # Write to file
+            file_name = self.title.replace(" ", "-")
+            file_name_with_ext = os.path.join(
+                working_dir, file_name + ".md")
+            with open(file_name_with_ext, "a") as opf:
+                opf.write(meta_data + "\n")
+                opf.write(self.result + "\n")
+                opf.close()
+            return os.path.abspath(file_name_with_ext)
+        except Exception as e:
+            self.logger.error(f"Error writing to file: {e}")
+
+
+class Source:
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        self.source_file = source_file
+        self.local = local
+        self.__config_event_date(date)
+        self.title = title
+        self.tags = tags
+        self.category = category
+        self.speakers = speakers
+        self.logger = logging.getLogger(__app_name__)
+
+    def __config_event_date(self, date):
+        self.event_date = None
+        if date:
+            try:
+                self.event_date = datetime.strptime(date, "%Y-%m-%d").date()
+            except ValueError as e:
+                raise ValueError(f"Supplied date is invalid: {e}")
+        return
+
+    def initialize(self):
+        try:
+            # FFMPEG installed on first use.
+            self.logger.debug("Initializing FFMPEG...")
+            static_ffmpeg.add_paths()
+            self.logger.debug("Initialized FFMPEG")
+        except Exception as e:
+            raise Exception("Error initializing")
+
+
+class Audio(Source):
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        super().__init__(source_file, local, title, date, tags, category, speakers)
+        self.type = "audio"
+
+    def process(self, working_dir):
+        """Process audio"""
+
+        def download_audio():
+            """Helper method to download an audio file"""
+            if self.local:
+                raise Exception(f"{self.source_file} is a local file")
+            if self.title is None:
+                raise Exception("Please supply a title for the audio file")
+            self.logger.info(f"Downloading audio file: {self.source_file}")
+            try:
+                audio = requests.get(self.source_file, stream=True)
+                with open(os.path.join(working_dir, self.title + ".mp3"), "wb") as f:
+                    total_length = int(audio.headers.get("content-length"))
+                    for chunk in progress.bar(
+                        audio.iter_content(chunk_size=1024),
+                        expected_size=(total_length / 1024) + 1,
+                    ):
+                        if chunk:
+                            f.write(chunk)
+                            f.flush()
+                filename = os.path.join(working_dir, self.title + ".mp3")
+                return os.path.abspath(filename)
+            except Exception as e:
+                raise Exception(f"Error downloading audio file: {e}")
+
+        try:
+            self.logger.info(f"Audio file detected: '{self.title}'")
+            if not self.local:
+                # download audio file from the internet
+                filename = download_audio()
+                abs_path = os.path.abspath(path=filename)
+                self.logger.info(f"Audio file stored in: {abs_path}")
+            else:
+                # calculate the absolute path of the local audio file
+                filename = self.source_file.split("/")[-1]
+                abs_path = os.path.abspath(self.source_file)
+            self.logger.info(f"Processing audio file: {abs_path}")
+            if filename.endswith("wav"):
+                self.initialize()
+                abs_path = application.convert_wav_to_mp3(
+                    abs_path=abs_path, filename=filename, working_dir=working_dir
+                )
+            # return the audio file that is now ready for transcription
+            return abs_path
+
+        except Exception as e:
+            raise Exception(f"Error processing audio file: {e}")
+
+
+class Video(Source):
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        super().__init__(source_file, local, title, date, tags, category, speakers)
+        self.type = "video"
+        self.__config_source()
+
+    def __config_source(self):
+        if not self.local:
+            # calculate youtube url
+            video_id = self.source_file  # user gave just the youtube video id
+            if "watch?v=" in self.source_file:
+                parsed_url = urlparse(self.source_file)
+                video_id = parse_qs(parsed_url.query)["v"][0]
+            elif "youtu.be" in self.source_file or "embed" in self.source_file:
+                video_id = self.source_file.split("/")[-1]
+            self.source_file = "https://www.youtube.com/watch?v=" + video_id
+            if self.event_date is None:
+                # get the date from the published youtube video
+                video = pytube.YouTube(self.source_file)
+                self.event_date = str(video.publish_date).split(" ")[0]
+
+    def process(self, working_dir):
+        """Process video"""
+
+        def download_video():
+            """Helper method to download a YouTube video"""
+            if self.local:
+                raise Exception(f"{self.source_file} is a local file")
+            try:
+                # download video from youtube
+                self.logger.info(f"Downloading video: {self.source_file}")
+
+                ydl_opts = {
+                    "format": "18",
+                    "outtmpl": os.path.join(working_dir, "videoFile.%(ext)s"),
+                    "nopart": True,
+                    "writeinfojson": True,
+                }
+                with yt_dlp.YoutubeDL(ydl_opts) as ytdl:
+                    ytdl.download([self.source_file])
+
+                with open(os.path.join(working_dir, "videoFile.info.json")) as file:
+                    info = ytdl.sanitize_info(json.load(file))
+                    name = info["title"].replace("/", "-")
+                    file.close()
+
+                os.rename(
+                    os.path.join(working_dir, "videoFile.mp4"),
+                    os.path.join(working_dir, name + ".mp4"),
+                )
+
+                return os.path.abspath(os.path.join(working_dir, name + ".mp4"))
+            except Exception as e:
+                shutil.rmtree(working_dir)
+                raise Exception(f"Error downloading video: {e}")
+
+        def convert_video_to_mp3(video_file):
+            try:
+                clip = VideoFileClip(video_file)
+                self.logger.info(f"Converting {video_file} to mp3...")
+                clip.audio.write_audiofile(
+                    os.path.join(working_dir, video_file.split("/")[-1][:-4] + ".mp3")
+                )
+                clip.close()
+                self.logger.info("Video converted to mp3")
+                return os.path.join(working_dir, video_file.split("/")[-1][:-4] + ".mp3")
+            except Exception as e:
+                raise Exception(f"Error converting video to mp3: {e}")
+
+        def extract_chapters_from_downloaded_video_metadata():
+            try:
+                list_of_chapters = []
+                with open(f"{working_dir}/videoFile.info.json", "r") as f:
+                    info = json.load(f)
+                if "chapters" not in info:
+                    self.logger.info("No chapters found for downloaded video")
+                    return list_of_chapters
+                for index, x in enumerate(info["chapters"]):
+                    name = x["title"]
+                    start = x["start_time"]
+                    list_of_chapters.append((str(index), start, str(name)))
+
+                return list_of_chapters
+            except Exception as e:
+                self.logger.error(
+                    f"Error reading downloaded video's metadata: {e}")
+                return []
+
+        try:
+            if not self.local:
+                abs_path = download_video()
+                self.chapters = extract_chapters_from_downloaded_video_metadata()
+            else:
+                abs_path = os.path.abspath(self.source_file)
+
+            self.initialize()
+            audio_file = convert_video_to_mp3(abs_path)
+            return audio_file
+
+        except Exception as e:
+            raise Exception(f"Error processing video file: {e}")
+
+
+class Playlist(Source):
+    def __init__(self, source_file, local, title, date, tags, category, speakers):
+        super().__init__(source_file, local, title, date, tags, category, speakers)
+        self.type = "playlist"
+        self.videos = []
+        self.__config_source()
+
+    def __config_source(self):
+        if not self.local:
+            # calculate youtube url
+            playlist_id = self.source_file  # user gave just the youtube playlist id
+            if self.source_file.startswith("http") or self.source_file.startswith("www"):
+                parsed_url = urlparse(self.source_file)
+                playlist_id = parse_qs(parsed_url.query)["list"][0]
+            url = "https://www.youtube.com/playlist?list=" + playlist_id
+            videos = pytube.Playlist(url)
+            if videos is None:
+                raise Exception(f"Playlist '{url}' is empty")
+            for source_file in videos:
+                source = Video(source_file, self.local, self.title,
+                               self.event_date, self.tags, self.category, self.speakers)
+                self.videos.append(source)
diff --git a/app/transcription.py b/app/transcription.py
new file mode 100644
index 0000000..e8f16eb
--- /dev/null
+++ b/app/transcription.py
@@ -0,0 +1,234 @@
+import json
+import logging
+import os
+import re
+import tempfile
+import time
+from datetime import datetime
+
+from dotenv import dotenv_values
+import pytube
+from pytube.exceptions import PytubeError
+import requests
+
+from app.transcript import Transcript, Audio, Video, Playlist
+from app import __app_name__, __version__, application
+
+
+class Transcription:
+    def __init__(self, loc="test/test", model="tiny", chapters=False, pr=False, summarize=False, deepgram=False, diarize=False, upload=False, model_output_dir="local_models/", nocleanup=False, queue=True, markdown=False, username=None, test_mode=False, working_dir=None):
+        self.model = model
+        self.transcript_by = "username" if test_mode else self.__get_username()
+        # location in the bitcointranscripts hierarchy
+        self.loc = loc.strip("/")
+        self.generate_chapters = chapters
+        self.open_pr = pr
+        self.summarize_transcript = summarize
+        self.service = "deepgram" if deepgram else model
+        self.diarize = diarize
+        self.upload = upload
+        self.model_output_dir = model_output_dir
+        self.transcripts = []
+        self.nocleanup = nocleanup
+        # during testing we do not have/need a queuer backend
+        self.queue = queue if not test_mode else False
+        # during testing we need to create the markdown for validation purposes
+        self.markdown = markdown or test_mode
+        self.test_mode = test_mode
+        self.logger = logging.getLogger(__app_name__)
+        self.tmp_dir = working_dir if working_dir is not None else tempfile.mkdtemp()
+
+        self.logger.info(f"Temp directory: {self.tmp_dir}")
+
+    def _create_subdirectory(self, subdir_name):
+        """Helper method to create subdirectories within the central temp directory"""
+        subdir_path = os.path.join(self.tmp_dir, subdir_name)
+        os.makedirs(subdir_path)
+        return subdir_path
+
+    def __get_username(self):
+        try:
+            if os.path.isfile(".username"):
+                with open(".username", "r") as f:
+                    username = f.read()
+                    f.close()
+            else:
+                print("What is your github username?")
+                username = input()
+                with open(".username", "w") as f:
+                    f.write(username)
+                    f.close()
+            return username
+        except Exception as e:
+            raise Exception("Error getting username")
+
+    def _calculate_source_type(self, source):
+        def check_if_playlist(media):
+            """Helper function to check if a source is a playlist
+            based on its file name"""
+            try:
+                if (
+                    media.startswith("PL")
+                    or media.startswith("UU")
+                    or media.startswith("FL")
+                    or media.startswith("RD")
+                ):
+                    return True
+                playlists = list(pytube.Playlist(media).video_urls)
+                if type(playlists) is not list:
+                    return False
+                return True
+            except Exception as e:
+                return False
+
+        def check_if_video(media):
+            """Helper function to check if a source is a video
+            based on its file name"""
+            if media.endswith(".mp4"):
+                return True
+            if re.search(r"^([\dA-Za-z_-]{11})$", media):
+                return True
+            try:
+                pytube.YouTube(media)
+                return True
+            except PytubeError as e:
+                raise Exception(f"Pytube Error: {e}")
+
+        if source.endswith(".mp3") or source.endswith(".wav"):
+            return "audio"
+        elif check_if_playlist(source):
+            return "playlist"
+        elif check_if_video(source):
+            return "video"
+        else:
+            raise Exception(f"Invalid source: {source}")
+
+    def add_transcription_source(self, source_file, title=None, date=None, tags=None, category=None, speakers=None):
+        """Adds a source for transcription.
+        The type of the source is calculated based on its file name.
+        """
+
+        # check if source is a local file
+        local = False
+        if os.path.isfile(source_file):
+            local = True
+        # initialize source
+        source_type = self._calculate_source_type(source_file)
+        if source_type == "audio":
+            source = Audio(source_file, local, title,
+                           date, tags, category, speakers)
+            self.transcripts.append(Transcript(source, self.test_mode))
+        elif source_type == "playlist":
+            source = Playlist(source_file, local, title,
+                              date, tags, category, speakers)
+            for video in source.videos:
+                self.transcripts.append(Transcript(video, self.test_mode))
+        elif source_type == "video":
+            source = Video(source_file, local, title,
+                           date, tags, category, speakers)
+            self.transcripts.append(Transcript(source, self.test_mode))
+        else:
+            raise Exception(f"Invalid source: {source_file}")
+
+    def push_to_queue(self, transcript: Transcript, payload=None):
+        """Push the resulting transcript to a Queuer backend"""
+        def write_to_json(json_data):
+            time_in_str = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
+            file_path = os.path.join(
+                self.model_output_dir, f"{transcript.title}_{time_in_str}_payload.json"
+            )
+            with open(file_path, "w") as json_file:
+                json.dump(json_data, json_file, indent=4)
+            return file_path
+        try:
+            if payload is None:
+                # No payload has been given directly
+                # Construct the payload with the resulting transcript
+                payload = {
+                    "content": {
+                        "title": transcript.title,
+                        "transcript_by": f"{self.transcript_by} via TBTBTC v{__version__}",
+                        "categories": transcript.source.category,
+                        "tags": transcript.source.tags,
+                        "speakers": transcript.source.speakers,
+                        "loc": self.loc,
+                        "body": transcript.result,
+                    }
+                }
+                # Handle optional metadata fields
+                if transcript.source.event_date:
+                    payload["content"]["date"] = transcript.source.event_date if type(
+                        transcript.source.event_date) is str else transcript.source.event_date.strftime("%Y-%m-%d")
+                if not transcript.source.local:
+                    payload["content"]["media"] = transcript.source.source_file
+            # Check if the user opted out from sending the payload to the Queuer
+            if not self.queue:
+                # payload will not be sent to the Queuer backend
+                if self.test_mode:
+                    # queuer is disabled by default when testing but we still
+                    # return the payload to be used for testing purposes
+                    return payload
+                else:
+                    # store payload in case the user wants to manually send it to the queuer
+                    payload_json_file = write_to_json(payload)
+                    self.logger.info(
+                        f"Transcript not added to the queue, payload stored at: {payload_json_file}")
+                    return payload_json_file
+            # Push the payload with the resulting transcript to the Queuer backend
+            config = dotenv_values(".env")
+            if "QUEUE_ENDPOINT" not in config:
+                raise Exception(
+                    "To push to a queue you need to define a 'QUEUE_ENDPOINT' in your .env file")
+            if "BEARER_TOKEN" not in config:
+                raise Exception(
+                    "To push to a queue you need to define a 'BEARER_TOKEN' in your .env file")
+            url = config["QUEUE_ENDPOINT"] + "/api/transcripts"
+            headers = {
+                'Authorization': f'Bearer {config["BEARER_TOKEN"]}',
+                'Content-Type': 'application/json'
+            }
+            resp = requests.post(url, json=payload, headers=headers)
+            if resp.status_code == 200:
+                self.logger.info("Transcript added to queue")
+            return resp
+        except Exception as e:
+            self.logger.error(f"Transcript not added to queue: {e}")
+
+    def start(self, test_transcript=None):
+        self.result = []
+        try:
+            for transcript in self.transcripts:
+                tmp_dir = self._create_subdirectory(
+                    f"transcript{len(self.result) + 1}")
+                audio_file, _ = transcript.process_source(tmp_dir)
+                result = transcript.transcribe(
+                    tmp_dir,
+                    self.generate_chapters,
+                    self.summarize_transcript,
+                    self.service,
+                    self.diarize,
+                    self.upload,
+                    self.model_output_dir,
+                    test_transcript=test_transcript
+                )
+                if self.markdown:
+                    transcription_md_file = transcript.write_to_file(
+                        self.model_output_dir if not self.test_mode else tmp_dir,
+                        self.transcript_by)
+                if self.open_pr:
+                    application.create_pr(
+                        absolute_path=transcription_md_file,
+                        loc=self.loc,
+                        username=self.transcript_by,
+                        curr_time=str(round(time.time() * 1000)),
+                        title=transcript.title,
+                    )
+                else:
+                    self.push_to_queue(transcript)
+                self.result.append(transcription_md_file)
+            return self.result
+        except Exception as e:
+            raise Exception(f"Error with the transcription: {e}") from e
+
+    def clean_up(self):
+        self.logger.info("Cleaning up...")
+        application.clean_up(self.tmp_dir)
diff --git a/transcriber.py b/transcriber.py
index 063d962..d67ab01 100644
--- a/transcriber.py
+++ b/transcriber.py
@@ -1,9 +1,11 @@
 import logging
-from datetime import datetime
+import tempfile
 
 import click
 
 from app import __app_name__, __version__, application
+from app.transcript import Transcript
+from app.transcription import Transcription
 
 
 def setup_logger():
@@ -110,21 +112,12 @@ def print_help(ctx, param, value):
     default=False,
     help="Supply this flag if you want to generate chapters for the transcript",
 )
-@click.option(
-    "-h",
-    "--help",
-    is_flag=True,
-    callback=print_help,
-    expose_value=False,
-    is_eager=True,
-    help="Show the application's help and exit.",
-)
 @click.option(
     "-p",
     "--PR",
     is_flag=True,
     default=False,
-    help="Supply this flag if you want to generate a payload",
+    help="Supply this flag if you want to open a PR at the bitcointranscripts repo",
 )
 @click.option(
     "-D",
@@ -171,6 +164,24 @@ def print_help(ctx, param, value):
     help="Supply this flag if you want to upload processed model files to AWS "
     "S3",
 )
+@click.option(
+    "--nocleanup",
+    is_flag=True,
+    default=False,
+    help="Do not remove temp files on exit",
+)
+@click.option(
+    "--noqueue",
+    is_flag=True,
+    default=False,
+    help="Do not push the resulting transcript to the Queuer backend",
+)
+@click.option(
+    "--markdown",
+    is_flag=True,
+    default=False,
+    help="Create a markdown file for the resulting transcript",
+)
 def add(
     source: str,
     loc: str,
@@ -188,6 +199,9 @@ def add(
     upload: bool,
     verbose: bool,
     model_output_dir: str,
+    nocleanup: bool,
+    noqueue: bool,
+    markdown: bool
 ) -> None:
     """Supply a YouTube video id and directory for transcription. \n
     Note: The https links need to be wrapped in quotes when running the command
@@ -199,51 +213,36 @@
         logger.setLevel(logging.DEBUG)
     else:
         logger.setLevel(logging.WARNING)
+    tmp_dir = tempfile.mkdtemp()
     logger.info(
         "This tool will convert Youtube videos to mp3 files and then "
         "transcribe them to text using Whisper. "
     )
+
     try:
-        username = application.get_username()
-        loc = loc.strip("/")
-        event_date = None
-        if date:
-            try:
-                event_date = datetime.strptime(date, "%Y-%m-%d").date()
-            except ValueError as e:
-                logger.error("Supplied date is invalid: ", e)
-                return
-        (source_type, local) = application.check_source_type(source=source)
-        if source_type is None:
-            logger.error("Invalid source")
-            return
-        filename, tmp_dir = application.process_source(
-            source=source,
-            title=title,
-            event_date=event_date,
-            tags=tags,
-            category=category,
-            speakers=speakers,
+        transcription = Transcription(
             loc=loc,
             model=model,
-            username=username,
             chapters=chapters,
             pr=pr,
             summarize=summarize,
-            source_type=source_type,
             deepgram=deepgram,
             diarize=diarize,
             upload=upload,
             model_output_dir=model_output_dir,
-            verbose=verbose,
-            local=local
+            nocleanup=nocleanup,
+            queue=not noqueue,
+            markdown=markdown,
+            working_dir=tmp_dir
+        )
+        transcription.add_transcription_source(
+            source_file=source, title=title, date=date, tags=tags, category=category, speakers=speakers,
         )
-        if filename:
-            """INITIALIZE GIT AND OPEN A PR"""
-            logger.info("Transcription complete")
-        logger.info("Cleaning up...")
-        application.clean_up(tmp_dir)
+        transcription.start()
+        if nocleanup:
+            logger.info("Not cleaning up temp files...")
+        else:
+            transcription.clean_up()
     except Exception as e:
         logger.error(e)
-        logger.error("Cleaning up...")
+        logger.info(f"Exited with error, not cleaning up temp files: {tmp_dir}")